Browse Source

Merge #1180: WalletService: reduce polling overhead

8fe5b7b WalletService: clean up and fix callback handling (Matt Whitlock)
f52bf71 WalletService: reduce polling overhead (Matt Whitlock)
8d45427 BitcoinCoreInterface: improve _yield_transactions (Matt Whitlock)
master
Adam Gibson 4 years ago
parent
commit
15e9dc03bf
No known key found for this signature in database
GPG Key ID: 141001A1AF77F20B
  1. 40
      jmclient/jmclient/blockchaininterface.py
  2. 179
      jmclient/jmclient/wallet_service.py

40
jmclient/jmclient/blockchaininterface.py

@ -292,18 +292,36 @@ class BitcoinCoreInterface(BlockchainInterface):
self.import_addresses(addresses - imported_addresses, wallet_name)
return import_needed
def _yield_transactions(self, wallet_name):
batch_size = 1000
iteration = 0
def _yield_transactions(self):
""" Generates a lazily fetched sequence of transactions seen in the
wallet (under any label/account), yielded in newest-first order. Care
is taken to avoid yielding duplicates even when new transactions are
actively being added to the wallet while the iteration is ongoing.
"""
num, skip = 1, 0
txs = self.list_transactions(num, skip)
if not txs:
return
yielded_tx = txs[0]
yield yielded_tx
while True:
new = self._rpc(
'listtransactions',
["*", batch_size, iteration * batch_size, True])
for tx in new:
yield tx
if len(new) < batch_size:
num *= 2
txs = self.list_transactions(num, skip)
if not txs:
return
try:
idx = [(tx['txid'], tx['vout'], tx['category']) for tx in txs
].index((yielded_tx['txid'], yielded_tx['vout'],
yielded_tx['category']))
except ValueError:
skip += num
continue
for tx in reversed(txs[:idx]):
yielded_tx = tx # inefficient but more obvious
yield yielded_tx
if len(txs) < num:
return
iteration += 1
skip += num - 1
def get_deser_from_gettransaction(self, rpcretval):
"""Get full transaction deserialization from a call
@ -641,7 +659,7 @@ class BitcoinCoreNoHistoryInterface(BitcoinCoreInterface, RegtestBitcoinCoreMixi
assert desc_str.startswith("addr(")
return desc_str[5:desc_str.find(")")]
def _yield_transactions(self, wallet_name):
def _yield_transactions(self):
for u in self.scan_result["unspents"]:
tx = {"category": "receive", "address":
self._get_addr_from_desc(u["desc"])}

179
jmclient/jmclient/wallet_service.py

@ -1,6 +1,7 @@
#! /usr/bin/env python
import collections
import itertools
import time
import sys
from decimal import Decimal
@ -74,18 +75,19 @@ class WalletService(Service):
# Dicts of registered callbacks, by type
# and then by txinfo, for events
# on transactions.
self.callbacks = {}
self.callbacks["all"] = []
self.callbacks["unconfirmed"] = {}
self.callbacks["confirmed"] = {}
self.callbacks = {
"all": [], # note: list, not dict
"unconfirmed": {},
"confirmed": {},
}
self.restart_callback = None
# transactions we are actively monitoring,
# i.e. they are not new but we want to track:
self.active_txids = []
self.active_txs = {}
# to ensure transactions are only processed once:
self.processed_txids = []
self.processed_txids = set()
self.set_autofreeze_warning_cb()
@ -200,9 +202,12 @@ class WalletService(Service):
# note that in this case, txid is ignored.
self.callbacks["all"].extend(callbacks)
elif cb_type in ["unconfirmed", "confirmed"]:
if txinfo not in self.callbacks[cb_type]:
self.callbacks[cb_type][txinfo] = []
self.callbacks[cb_type][txinfo].extend(callbacks)
if callbacks:
reg = self.callbacks[cb_type].setdefault(txinfo, [])
if isinstance(reg, str):
# found a txid breadcrumb for this txinfo
reg = self.callbacks[cb_type].setdefault(reg, [])
reg.extend(callbacks)
else:
assert False, "Invalid argument: " + cb_type
@ -285,6 +290,23 @@ class WalletService(Service):
self.autofreeze_warning_cb(utxo)
self.disable_utxo(*utxo)
def _yield_new_transactions(self):
""" Constrains the sequence generated by bci._yield_transactions so
that it stops just before it would yield the newest transaction
previously yielded by _yield_new_transactions.
"""
since_txid = self.last_seen_txid
last = True
for tx in self.bci._yield_transactions():
if 'txid' in tx:
txid = tx['txid']
if txid == since_txid:
return
if last:
self.last_seen_txid = txid
last = False
yield tx
def transaction_monitor(self):
"""Keeps track of any changes in the wallet (new transactions).
Intended to be run as a twisted task.LoopingCall so that this
@ -294,33 +316,19 @@ class WalletService(Service):
if not self.update_blockheight():
return
txlist = self.bci.list_transactions(100)
if not txlist:
return
new_txs = []
for x in txlist:
# process either (a) a completely new tx or
# (b) a tx that reached unconf status but we are still
# waiting for conf (active_txids)
if "txid" not in x:
continue
if x['txid'] in self.active_txids or x['txid'] not in self.old_txs:
new_txs.append(x)
# reset for next polling event:
self.old_txs = set(x['txid'] for x in txlist if "txid" in x)
# for this invocation of transaction_monitor, we *don't* want
# to call `gettransaction` more than once per txid, even if the
# `listtransactions` result has multiple instances for different
# wallet labels; so we use a temporary variable to cache.
gettx_results = {}
for tx in new_txs:
txid = tx["txid"]
if txid not in gettx_results:
seen_txids = set()
wallet_name = self.get_wallet_name()
for txid in itertools.chain(list(self.active_txs), reversed(
[tx['txid'] for tx in self._yield_new_transactions()
if 'txid' in tx])):
if txid in seen_txids:
continue
seen_txids.add(txid)
res = self.bci.get_transaction(hextobin(txid))
gettx_results[txid] = res
else:
res = gettx_results[txid]
if not res:
continue
confs = res["confirmations"]
@ -330,13 +338,18 @@ class WalletService(Service):
if confs < 0:
jlog.info(
"Transaction: " + txid + " has a conflict, abandoning.")
self.active_txs.pop(txid, None)
continue
if confs == 0:
if txid in self.active_txs:
# avoid processing an unconfirmed tx we've already seen
continue
height = None
else:
height = self.current_blockheight - confs + 1
txd = self.bci.get_deser_from_gettransaction(res)
txd = self.active_txs.get(txid) or \
self.bci.get_deser_from_gettransaction(res)
if txd is None:
continue
removed_utxos, added_utxos = self.wallet.process_new_tx(txd, height)
@ -347,23 +360,31 @@ class WalletService(Service):
# is absurdly fast, this is considered acceptable compared with
# additional complexity.
self.log_new_tx(removed_utxos, added_utxos, txid)
self.processed_txids.append(txid)
self.processed_txids.add(txid)
# first fire 'all' type callbacks, irrespective of if the
# transaction pertains to anything known (but must
# have correct label per above); filter on this Joinmarket wallet label,
# or the external monitoring label:
if (self.bci.is_address_labeled(tx, self.get_wallet_name()) or
self.bci.is_address_labeled(tx, self.EXTERNAL_WALLET_LABEL)):
if (any(self.bci.is_address_labeled(addr, wallet_name) or
self.bci.is_address_labeled(addr,
self.EXTERNAL_WALLET_LABEL)
for addr in res["details"])):
for f in self.callbacks["all"]:
# note we need no return value as we will never
# remove these from the list
f(txd, txid)
# The tuple given as the second possible key for the dict
# is such because txid is not always available
# at the time of callback registration).
possible_keys = [txid, tuple((x.scriptPubKey, x.nValue) for x in txd.vout)]
# txid is not always available at the time of callback registration.
# Migrate any callbacks registered under the provisional key, and
# leave a txid breadcrumb so check_callback_called can find it.
txos = tuple((x.scriptPubKey, x.nValue) for x in txd.vout)
for cb_type in ["unconfirmed", "confirmed"]:
callbacks = self.callbacks[cb_type]
reg = callbacks.get(txos)
if isinstance(reg, list):
callbacks.setdefault(txid, [])[:0] = reg
callbacks[txos] = txid
# note that len(added_utxos) > 0 is not a sufficient condition for
# the tx being new, since wallet.add_new_utxos will happily re-add
@ -373,27 +394,31 @@ class WalletService(Service):
# Note also that it's entirely possible that there are only removals,
# not additions, to the utxo set, specifically in sweeps to external
# addresses. In this case, since removal can by definition only
# happen once, we must allow entries in self.active_txids through the
# happen once, we must allow txids in self.active_txs through the
# filter.
if len(added_utxos) > 0 or len(removed_utxos) > 0 \
or txid in self.active_txids:
or txid in self.active_txs:
if confs == 0:
for k in possible_keys:
if k in self.callbacks["unconfirmed"]:
for f in self.callbacks["unconfirmed"][k]:
# True implies success, implies removal:
if f(txd, txid):
self.callbacks["unconfirmed"][k].remove(f)
callbacks = [f for f in
self.callbacks["unconfirmed"].pop(txid, [])
if not f(txd, txid)]
if callbacks:
self.callbacks["unconfirmed"][txid] = callbacks
else:
self.callbacks["unconfirmed"].pop(txos, None)
if self.callbacks["confirmed"].get(txid):
# keep monitoring for conf > 0:
self.active_txids.append(txid)
self.active_txs[txid] = txd
elif confs > 0:
for k in possible_keys:
if k in self.callbacks["confirmed"]:
for f in self.callbacks["confirmed"][k]:
if f(txd, txid, confs):
self.callbacks["confirmed"][k].remove(f)
if txid in self.active_txids:
self.active_txids.remove(txid)
callbacks = [f for f in
self.callbacks["confirmed"].pop(txid, [])
if not f(txd, txid, confs)]
if callbacks:
self.callbacks["confirmed"][txid] = callbacks
else:
self.callbacks["confirmed"].pop(txos, None)
# no more callbacks registered; stop monitoring tx
self.active_txs.pop(txid, None)
def check_callback_called(self, txinfo, callback, cbtype, msg):
""" Intended to be a deferred Task to be scheduled some
@ -402,15 +427,27 @@ class WalletService(Service):
If the callback was previously called, return True, otherwise False.
"""
assert cbtype in ["unconfirmed", "confirmed"]
if txinfo in self.callbacks[cbtype]:
if callback in self.callbacks[cbtype][txinfo]:
callbacks = self.callbacks[cbtype]
if isinstance(txinfo, str):
txid = txinfo
reg = callbacks.get(txid)
else:
txid = None
reg = callbacks.get(txinfo)
if isinstance(reg, str):
# found a txid breadcrumb for this txinfo
txid = reg
reg = callbacks.get(txid)
if reg:
if callback in reg:
# the callback was not called, drop it and warn
self.callbacks[cbtype][txinfo].remove(callback)
# TODO - dangling txids in self.active_txids will
# be caused by this, but could also happen for
# other reasons; possibly add logic to ensure that
# this never occurs, although their presence should
# not cause a functional error.
reg.remove(callback)
if not reg:
del callbacks[txinfo]
if txid:
callbacks.pop(txid, None)
# no more callbacks registered; stop monitoring tx
self.active_txs.pop(txid, None)
jlog.info("Timed out: " + msg)
return False
# if callback is not in the list, it was already
@ -454,8 +491,9 @@ class WalletService(Service):
self.sync_unspent()
# Don't attempt updates on transactions that existed
# before startup
self.old_txs = [x['txid'] for x in self.bci.list_transactions(100)
if "txid" in x]
self.last_seen_txid = next(
(tx['txid'] for tx in self.bci._yield_transactions()
if 'txid' in tx), None)
if isinstance(self.bci, BitcoinCoreNoHistoryInterface):
self.bci.set_wallet_no_history(self.wallet)
return self.synced
@ -668,14 +706,13 @@ class WalletService(Service):
to this wallet, as a list.
"""
res = []
processed_txids = []
for r in self.bci._yield_transactions(
self.get_wallet_name()):
processed_txids = set()
for r in self.bci._yield_transactions():
txid = r["txid"]
if txid not in processed_txids:
tx = self.bci.get_transaction(hextobin(txid))
res.append(self.bci.get_deser_from_gettransaction(tx))
processed_txids.append(txid)
processed_txids.add(txid)
return res
def get_transaction(self, txid):
@ -720,7 +757,7 @@ class WalletService(Service):
if isinstance(self.wallet, FidelityBondMixin):
tx_receive = []
burner_txes = []
for tx in self.bci._yield_transactions(wallet_name):
for tx in self.bci._yield_transactions():
if tx['category'] == 'receive':
tx_receive.append(tx)
elif tx["category"] == "send":
@ -743,7 +780,7 @@ class WalletService(Service):
else:
#not fidelity bond wallet, significantly faster sync
used_addresses_gen = set(tx['address']
for tx in self.bci._yield_transactions(wallet_name)
for tx in self.bci._yield_transactions()
if tx['category'] == 'receive')
# needed for address-reuse check:
self.used_addresses = used_addresses_gen

Loading…
Cancel
Save