From 8d454277d8d192c5cdb51b49fa6c43969b283e14 Mon Sep 17 00:00:00 2001 From: Matt Whitlock Date: Thu, 10 Feb 2022 04:25:25 -0500 Subject: [PATCH 1/3] BitcoinCoreInterface: improve _yield_transactions Alter the BitcoinCoreInterface._yield_transactions generator to be nicer in cases where only a small number of transactions need to be examined. Geometrically ramp the requested number of transactions in each successive request, starting from 1. Since new transactions can appear in the Bitcoin Core wallet between RPC calls, overlap successive request ranges and resume yielding transactions after finding the previously yielded transaction to avoid yielding duplicates. The listtransactions RPC of Bitcoin Core can return the same (txid,vout) as both a "category":"receive" and a "category":"send". Therefore, use (txid,vout,category) as the sync key to ensure that all distinct elements are yielded. Delete the unused wallet_name parameter from _yield_transactions, per suggestion by Adam Gibson. --- jmclient/jmclient/blockchaininterface.py | 40 +++++++++++++++++------- jmclient/jmclient/wallet_service.py | 7 ++--- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/jmclient/jmclient/blockchaininterface.py b/jmclient/jmclient/blockchaininterface.py index 2402ff7..65a4cfc 100644 --- a/jmclient/jmclient/blockchaininterface.py +++ b/jmclient/jmclient/blockchaininterface.py @@ -292,18 +292,36 @@ class BitcoinCoreInterface(BlockchainInterface): self.import_addresses(addresses - imported_addresses, wallet_name) return import_needed - def _yield_transactions(self, wallet_name): - batch_size = 1000 - iteration = 0 + def _yield_transactions(self): + """ Generates a lazily fetched sequence of transactions seen in the + wallet (under any label/account), yielded in newest-first order. Care + is taken to avoid yielding duplicates even when new transactions are + actively being added to the wallet while the iteration is ongoing. + """ + num, skip = 1, 0 + txs = self.list_transactions(num, skip) + if not txs: + return + yielded_tx = txs[0] + yield yielded_tx while True: - new = self._rpc( - 'listtransactions', - ["*", batch_size, iteration * batch_size, True]) - for tx in new: - yield tx - if len(new) < batch_size: + num *= 2 + txs = self.list_transactions(num, skip) + if not txs: + return + try: + idx = [(tx['txid'], tx['vout'], tx['category']) for tx in txs + ].index((yielded_tx['txid'], yielded_tx['vout'], + yielded_tx['category'])) + except ValueError: + skip += num + continue + for tx in reversed(txs[:idx]): + yielded_tx = tx # inefficient but more obvious + yield yielded_tx + if len(txs) < num: return - iteration += 1 + skip += num - 1 def get_deser_from_gettransaction(self, rpcretval): """Get full transaction deserialization from a call @@ -641,7 +659,7 @@ class BitcoinCoreNoHistoryInterface(BitcoinCoreInterface, RegtestBitcoinCoreMixi assert desc_str.startswith("addr(") return desc_str[5:desc_str.find(")")] - def _yield_transactions(self, wallet_name): + def _yield_transactions(self): for u in self.scan_result["unspents"]: tx = {"category": "receive", "address": self._get_addr_from_desc(u["desc"])} diff --git a/jmclient/jmclient/wallet_service.py b/jmclient/jmclient/wallet_service.py index 332523e..d0b7952 100644 --- a/jmclient/jmclient/wallet_service.py +++ b/jmclient/jmclient/wallet_service.py @@ -669,8 +669,7 @@ class WalletService(Service): """ res = [] processed_txids = [] - for r in self.bci._yield_transactions( - self.get_wallet_name()): + for r in self.bci._yield_transactions(): txid = r["txid"] if txid not in processed_txids: tx = self.bci.get_transaction(hextobin(txid)) @@ -720,7 +719,7 @@ class WalletService(Service): if isinstance(self.wallet, FidelityBondMixin): tx_receive = [] burner_txes = [] - for tx in self.bci._yield_transactions(wallet_name): + for tx in self.bci._yield_transactions(): if tx['category'] == 'receive': tx_receive.append(tx) elif tx["category"] == "send": @@ -743,7 +742,7 @@ class WalletService(Service): else: #not fidelity bond wallet, significantly faster sync used_addresses_gen = set(tx['address'] - for tx in self.bci._yield_transactions(wallet_name) + for tx in self.bci._yield_transactions() if tx['category'] == 'receive') # needed for address-reuse check: self.used_addresses = used_addresses_gen From f52bf71740d141ac02ab507ef4078d99ac208870 Mon Sep 17 00:00:00 2001 From: Matt Whitlock Date: Thu, 10 Feb 2022 05:49:51 -0500 Subject: [PATCH 2/3] WalletService: reduce polling overhead The transaction_monitor function in WalletService was polling the most recent 100 transactions from the BlockchainInterface every 5 seconds. This was egregious and inefficient. Introduce a _yield_new_transactions function that remembers the newest previously yielded transaction between invocations and yields only transactions newer than that one. Use the new function in transaction_monitor. Avoid repeatedly deserializing confirmed transactions that remain in the set of monitored transactions after being confirmed either due to "confirmed" callbacks that return False or because callbacks have timed out and have been unregistered without cleaning the set of monitored transactions. Change the active_txids list to an active_txs dict whose keys are the txids and whose values are the deserialized transactions. Use this new dict as a cache to avoid repeatedly deserializing confirmed transactions. --- jmclient/jmclient/wallet_service.py | 88 ++++++++++++++++------------- 1 file changed, 50 insertions(+), 38 deletions(-) diff --git a/jmclient/jmclient/wallet_service.py b/jmclient/jmclient/wallet_service.py index d0b7952..ce5d325 100644 --- a/jmclient/jmclient/wallet_service.py +++ b/jmclient/jmclient/wallet_service.py @@ -1,6 +1,7 @@ #! /usr/bin/env python import collections +import itertools import time import sys from decimal import Decimal @@ -83,9 +84,9 @@ class WalletService(Service): # transactions we are actively monitoring, # i.e. they are not new but we want to track: - self.active_txids = [] + self.active_txs = {} # to ensure transactions are only processed once: - self.processed_txids = [] + self.processed_txids = set() self.set_autofreeze_warning_cb() @@ -285,6 +286,23 @@ class WalletService(Service): self.autofreeze_warning_cb(utxo) self.disable_utxo(*utxo) + def _yield_new_transactions(self): + """ Constrains the sequence generated by bci._yield_transactions so + that it stops just before it would yield the newest transaction + previously yielded by _yield_new_transactions. + """ + since_txid = self.last_seen_txid + last = True + for tx in self.bci._yield_transactions(): + if 'txid' in tx: + txid = tx['txid'] + if txid == since_txid: + return + if last: + self.last_seen_txid = txid + last = False + yield tx + def transaction_monitor(self): """Keeps track of any changes in the wallet (new transactions). Intended to be run as a twisted task.LoopingCall so that this @@ -294,33 +312,19 @@ class WalletService(Service): if not self.update_blockheight(): return - txlist = self.bci.list_transactions(100) - if not txlist: - return - - new_txs = [] - for x in txlist: - # process either (a) a completely new tx or - # (b) a tx that reached unconf status but we are still - # waiting for conf (active_txids) - if "txid" not in x: - continue - if x['txid'] in self.active_txids or x['txid'] not in self.old_txs: - new_txs.append(x) - # reset for next polling event: - self.old_txs = set(x['txid'] for x in txlist if "txid" in x) # for this invocation of transaction_monitor, we *don't* want # to call `gettransaction` more than once per txid, even if the # `listtransactions` result has multiple instances for different # wallet labels; so we use a temporary variable to cache. - gettx_results = {} - for tx in new_txs: - txid = tx["txid"] - if txid not in gettx_results: - res = self.bci.get_transaction(hextobin(txid)) - gettx_results[txid] = res - else: - res = gettx_results[txid] + seen_txids = set() + wallet_name = self.get_wallet_name() + for txid in itertools.chain(list(self.active_txs), reversed( + [tx['txid'] for tx in self._yield_new_transactions() + if 'txid' in tx])): + if txid in seen_txids: + continue + seen_txids.add(txid) + res = self.bci.get_transaction(hextobin(txid)) if not res: continue confs = res["confirmations"] @@ -330,13 +334,18 @@ class WalletService(Service): if confs < 0: jlog.info( "Transaction: " + txid + " has a conflict, abandoning.") + self.active_txs.pop(txid, None) continue if confs == 0: + if txid in self.active_txs: + # avoid processing an unconfirmed tx we've already seen + continue height = None else: height = self.current_blockheight - confs + 1 - txd = self.bci.get_deser_from_gettransaction(res) + txd = self.active_txs.get(txid) or \ + self.bci.get_deser_from_gettransaction(res) if txd is None: continue removed_utxos, added_utxos = self.wallet.process_new_tx(txd, height) @@ -347,14 +356,16 @@ class WalletService(Service): # is absurdly fast, this is considered acceptable compared with # additional complexity. self.log_new_tx(removed_utxos, added_utxos, txid) - self.processed_txids.append(txid) + self.processed_txids.add(txid) # first fire 'all' type callbacks, irrespective of if the # transaction pertains to anything known (but must # have correct label per above); filter on this Joinmarket wallet label, # or the external monitoring label: - if (self.bci.is_address_labeled(tx, self.get_wallet_name()) or - self.bci.is_address_labeled(tx, self.EXTERNAL_WALLET_LABEL)): + if (any(self.bci.is_address_labeled(addr, wallet_name) or + self.bci.is_address_labeled(addr, + self.EXTERNAL_WALLET_LABEL) + for addr in res["details"])): for f in self.callbacks["all"]: # note we need no return value as we will never # remove these from the list @@ -373,10 +384,10 @@ class WalletService(Service): # Note also that it's entirely possible that there are only removals, # not additions, to the utxo set, specifically in sweeps to external # addresses. In this case, since removal can by definition only - # happen once, we must allow entries in self.active_txids through the + # happen once, we must allow txids in self.active_txs through the # filter. if len(added_utxos) > 0 or len(removed_utxos) > 0 \ - or txid in self.active_txids: + or txid in self.active_txs: if confs == 0: for k in possible_keys: if k in self.callbacks["unconfirmed"]: @@ -385,7 +396,7 @@ class WalletService(Service): if f(txd, txid): self.callbacks["unconfirmed"][k].remove(f) # keep monitoring for conf > 0: - self.active_txids.append(txid) + self.active_txs[txid] = txd elif confs > 0: for k in possible_keys: if k in self.callbacks["confirmed"]: @@ -393,7 +404,7 @@ class WalletService(Service): if f(txd, txid, confs): self.callbacks["confirmed"][k].remove(f) if txid in self.active_txids: - self.active_txids.remove(txid) + self.active_txs.pop(txid, None) def check_callback_called(self, txinfo, callback, cbtype, msg): """ Intended to be a deferred Task to be scheduled some @@ -406,7 +417,7 @@ class WalletService(Service): if callback in self.callbacks[cbtype][txinfo]: # the callback was not called, drop it and warn self.callbacks[cbtype][txinfo].remove(callback) - # TODO - dangling txids in self.active_txids will + # TODO - dangling txids in self.active_txs will # be caused by this, but could also happen for # other reasons; possibly add logic to ensure that # this never occurs, although their presence should @@ -454,8 +465,9 @@ class WalletService(Service): self.sync_unspent() # Don't attempt updates on transactions that existed # before startup - self.old_txs = [x['txid'] for x in self.bci.list_transactions(100) - if "txid" in x] + self.last_seen_txid = next( + (tx['txid'] for tx in self.bci._yield_transactions() + if 'txid' in tx), None) if isinstance(self.bci, BitcoinCoreNoHistoryInterface): self.bci.set_wallet_no_history(self.wallet) return self.synced @@ -668,13 +680,13 @@ class WalletService(Service): to this wallet, as a list. """ res = [] - processed_txids = [] + processed_txids = set() for r in self.bci._yield_transactions(): txid = r["txid"] if txid not in processed_txids: tx = self.bci.get_transaction(hextobin(txid)) res.append(self.bci.get_deser_from_gettransaction(tx)) - processed_txids.append(txid) + processed_txids.add(txid) return res def get_transaction(self, txid): From 8fe5b7b712ed9409d75907e34c1702a1e2d96213 Mon Sep 17 00:00:00 2001 From: Matt Whitlock Date: Fri, 4 Mar 2022 01:45:29 -0500 Subject: [PATCH 3/3] WalletService: clean up and fix callback handling * Migrate callbacks registered under provisional (tx-output tuples) keys as soon as their txids are known. Leave a txid breadcrumb in place so register_callbacks and check_callback_called can find it and the migrated list of callbacks. * Invoke callbacks via list comprehensions, retaining only the callbacks that return False. The old code was buggy, as it was removing elements from the callback lists while iterating over them, which would cause callbacks to be skipped. * The existing code would fail to call any "confirmed" callbacks for a remove-only transaction if no "unconfirmed" callbacks had been registered for that transaction, and it would discontinue monitoring of a transaction after just one "confirmed" callback had returned True, even if other "confirmed" callbacks returned False to remain registered. This commit overhauls the logic to fix all of these bugs. * Delete emptied callback lists from the dicts to be tidy. --- jmclient/jmclient/wallet_service.py | 94 ++++++++++++++++++----------- 1 file changed, 60 insertions(+), 34 deletions(-) diff --git a/jmclient/jmclient/wallet_service.py b/jmclient/jmclient/wallet_service.py index ce5d325..0b09d35 100644 --- a/jmclient/jmclient/wallet_service.py +++ b/jmclient/jmclient/wallet_service.py @@ -75,10 +75,11 @@ class WalletService(Service): # Dicts of registered callbacks, by type # and then by txinfo, for events # on transactions. - self.callbacks = {} - self.callbacks["all"] = [] - self.callbacks["unconfirmed"] = {} - self.callbacks["confirmed"] = {} + self.callbacks = { + "all": [], # note: list, not dict + "unconfirmed": {}, + "confirmed": {}, + } self.restart_callback = None @@ -201,9 +202,12 @@ class WalletService(Service): # note that in this case, txid is ignored. self.callbacks["all"].extend(callbacks) elif cb_type in ["unconfirmed", "confirmed"]: - if txinfo not in self.callbacks[cb_type]: - self.callbacks[cb_type][txinfo] = [] - self.callbacks[cb_type][txinfo].extend(callbacks) + if callbacks: + reg = self.callbacks[cb_type].setdefault(txinfo, []) + if isinstance(reg, str): + # found a txid breadcrumb for this txinfo + reg = self.callbacks[cb_type].setdefault(reg, []) + reg.extend(callbacks) else: assert False, "Invalid argument: " + cb_type @@ -371,10 +375,16 @@ class WalletService(Service): # remove these from the list f(txd, txid) - # The tuple given as the second possible key for the dict - # is such because txid is not always available - # at the time of callback registration). - possible_keys = [txid, tuple((x.scriptPubKey, x.nValue) for x in txd.vout)] + # txid is not always available at the time of callback registration. + # Migrate any callbacks registered under the provisional key, and + # leave a txid breadcrumb so check_callback_called can find it. + txos = tuple((x.scriptPubKey, x.nValue) for x in txd.vout) + for cb_type in ["unconfirmed", "confirmed"]: + callbacks = self.callbacks[cb_type] + reg = callbacks.get(txos) + if isinstance(reg, list): + callbacks.setdefault(txid, [])[:0] = reg + callbacks[txos] = txid # note that len(added_utxos) > 0 is not a sufficient condition for # the tx being new, since wallet.add_new_utxos will happily re-add @@ -389,22 +399,26 @@ class WalletService(Service): if len(added_utxos) > 0 or len(removed_utxos) > 0 \ or txid in self.active_txs: if confs == 0: - for k in possible_keys: - if k in self.callbacks["unconfirmed"]: - for f in self.callbacks["unconfirmed"][k]: - # True implies success, implies removal: - if f(txd, txid): - self.callbacks["unconfirmed"][k].remove(f) - # keep monitoring for conf > 0: - self.active_txs[txid] = txd + callbacks = [f for f in + self.callbacks["unconfirmed"].pop(txid, []) + if not f(txd, txid)] + if callbacks: + self.callbacks["unconfirmed"][txid] = callbacks + else: + self.callbacks["unconfirmed"].pop(txos, None) + if self.callbacks["confirmed"].get(txid): + # keep monitoring for conf > 0: + self.active_txs[txid] = txd elif confs > 0: - for k in possible_keys: - if k in self.callbacks["confirmed"]: - for f in self.callbacks["confirmed"][k]: - if f(txd, txid, confs): - self.callbacks["confirmed"][k].remove(f) - if txid in self.active_txids: - self.active_txs.pop(txid, None) + callbacks = [f for f in + self.callbacks["confirmed"].pop(txid, []) + if not f(txd, txid, confs)] + if callbacks: + self.callbacks["confirmed"][txid] = callbacks + else: + self.callbacks["confirmed"].pop(txos, None) + # no more callbacks registered; stop monitoring tx + self.active_txs.pop(txid, None) def check_callback_called(self, txinfo, callback, cbtype, msg): """ Intended to be a deferred Task to be scheduled some @@ -413,15 +427,27 @@ class WalletService(Service): If the callback was previously called, return True, otherwise False. """ assert cbtype in ["unconfirmed", "confirmed"] - if txinfo in self.callbacks[cbtype]: - if callback in self.callbacks[cbtype][txinfo]: + callbacks = self.callbacks[cbtype] + if isinstance(txinfo, str): + txid = txinfo + reg = callbacks.get(txid) + else: + txid = None + reg = callbacks.get(txinfo) + if isinstance(reg, str): + # found a txid breadcrumb for this txinfo + txid = reg + reg = callbacks.get(txid) + if reg: + if callback in reg: # the callback was not called, drop it and warn - self.callbacks[cbtype][txinfo].remove(callback) - # TODO - dangling txids in self.active_txs will - # be caused by this, but could also happen for - # other reasons; possibly add logic to ensure that - # this never occurs, although their presence should - # not cause a functional error. + reg.remove(callback) + if not reg: + del callbacks[txinfo] + if txid: + callbacks.pop(txid, None) + # no more callbacks registered; stop monitoring tx + self.active_txs.pop(txid, None) jlog.info("Timed out: " + msg) return False # if callback is not in the list, it was already