From f52bf71740d141ac02ab507ef4078d99ac208870 Mon Sep 17 00:00:00 2001 From: Matt Whitlock Date: Thu, 10 Feb 2022 05:49:51 -0500 Subject: [PATCH] WalletService: reduce polling overhead The transaction_monitor function in WalletService was polling the most recent 100 transactions from the BlockchainInterface every 5 seconds. This was egregious and inefficient. Introduce a _yield_new_transactions function that remembers the newest previously yielded transaction between invocations and yields only transactions newer than that one. Use the new function in transaction_monitor. Avoid repeatedly deserializing confirmed transactions that remain in the set of monitored transactions after being confirmed either due to "confirmed" callbacks that return False or because callbacks have timed out and have been unregistered without cleaning the set of monitored transactions. Change the active_txids list to an active_txs dict whose keys are the txids and whose values are the deserialized transactions. Use this new dict as a cache to avoid repeatedly deserializing confirmed transactions. --- jmclient/jmclient/wallet_service.py | 88 ++++++++++++++++------------- 1 file changed, 50 insertions(+), 38 deletions(-) diff --git a/jmclient/jmclient/wallet_service.py b/jmclient/jmclient/wallet_service.py index d0b7952..ce5d325 100644 --- a/jmclient/jmclient/wallet_service.py +++ b/jmclient/jmclient/wallet_service.py @@ -1,6 +1,7 @@ #! /usr/bin/env python import collections +import itertools import time import sys from decimal import Decimal @@ -83,9 +84,9 @@ class WalletService(Service): # transactions we are actively monitoring, # i.e. they are not new but we want to track: - self.active_txids = [] + self.active_txs = {} # to ensure transactions are only processed once: - self.processed_txids = [] + self.processed_txids = set() self.set_autofreeze_warning_cb() @@ -285,6 +286,23 @@ class WalletService(Service): self.autofreeze_warning_cb(utxo) self.disable_utxo(*utxo) + def _yield_new_transactions(self): + """ Constrains the sequence generated by bci._yield_transactions so + that it stops just before it would yield the newest transaction + previously yielded by _yield_new_transactions. + """ + since_txid = self.last_seen_txid + last = True + for tx in self.bci._yield_transactions(): + if 'txid' in tx: + txid = tx['txid'] + if txid == since_txid: + return + if last: + self.last_seen_txid = txid + last = False + yield tx + def transaction_monitor(self): """Keeps track of any changes in the wallet (new transactions). Intended to be run as a twisted task.LoopingCall so that this @@ -294,33 +312,19 @@ class WalletService(Service): if not self.update_blockheight(): return - txlist = self.bci.list_transactions(100) - if not txlist: - return - - new_txs = [] - for x in txlist: - # process either (a) a completely new tx or - # (b) a tx that reached unconf status but we are still - # waiting for conf (active_txids) - if "txid" not in x: - continue - if x['txid'] in self.active_txids or x['txid'] not in self.old_txs: - new_txs.append(x) - # reset for next polling event: - self.old_txs = set(x['txid'] for x in txlist if "txid" in x) # for this invocation of transaction_monitor, we *don't* want # to call `gettransaction` more than once per txid, even if the # `listtransactions` result has multiple instances for different # wallet labels; so we use a temporary variable to cache. - gettx_results = {} - for tx in new_txs: - txid = tx["txid"] - if txid not in gettx_results: - res = self.bci.get_transaction(hextobin(txid)) - gettx_results[txid] = res - else: - res = gettx_results[txid] + seen_txids = set() + wallet_name = self.get_wallet_name() + for txid in itertools.chain(list(self.active_txs), reversed( + [tx['txid'] for tx in self._yield_new_transactions() + if 'txid' in tx])): + if txid in seen_txids: + continue + seen_txids.add(txid) + res = self.bci.get_transaction(hextobin(txid)) if not res: continue confs = res["confirmations"] @@ -330,13 +334,18 @@ class WalletService(Service): if confs < 0: jlog.info( "Transaction: " + txid + " has a conflict, abandoning.") + self.active_txs.pop(txid, None) continue if confs == 0: + if txid in self.active_txs: + # avoid processing an unconfirmed tx we've already seen + continue height = None else: height = self.current_blockheight - confs + 1 - txd = self.bci.get_deser_from_gettransaction(res) + txd = self.active_txs.get(txid) or \ + self.bci.get_deser_from_gettransaction(res) if txd is None: continue removed_utxos, added_utxos = self.wallet.process_new_tx(txd, height) @@ -347,14 +356,16 @@ class WalletService(Service): # is absurdly fast, this is considered acceptable compared with # additional complexity. self.log_new_tx(removed_utxos, added_utxos, txid) - self.processed_txids.append(txid) + self.processed_txids.add(txid) # first fire 'all' type callbacks, irrespective of if the # transaction pertains to anything known (but must # have correct label per above); filter on this Joinmarket wallet label, # or the external monitoring label: - if (self.bci.is_address_labeled(tx, self.get_wallet_name()) or - self.bci.is_address_labeled(tx, self.EXTERNAL_WALLET_LABEL)): + if (any(self.bci.is_address_labeled(addr, wallet_name) or + self.bci.is_address_labeled(addr, + self.EXTERNAL_WALLET_LABEL) + for addr in res["details"])): for f in self.callbacks["all"]: # note we need no return value as we will never # remove these from the list @@ -373,10 +384,10 @@ class WalletService(Service): # Note also that it's entirely possible that there are only removals, # not additions, to the utxo set, specifically in sweeps to external # addresses. In this case, since removal can by definition only - # happen once, we must allow entries in self.active_txids through the + # happen once, we must allow txids in self.active_txs through the # filter. if len(added_utxos) > 0 or len(removed_utxos) > 0 \ - or txid in self.active_txids: + or txid in self.active_txs: if confs == 0: for k in possible_keys: if k in self.callbacks["unconfirmed"]: @@ -385,7 +396,7 @@ class WalletService(Service): if f(txd, txid): self.callbacks["unconfirmed"][k].remove(f) # keep monitoring for conf > 0: - self.active_txids.append(txid) + self.active_txs[txid] = txd elif confs > 0: for k in possible_keys: if k in self.callbacks["confirmed"]: @@ -393,7 +404,7 @@ class WalletService(Service): if f(txd, txid, confs): self.callbacks["confirmed"][k].remove(f) if txid in self.active_txids: - self.active_txids.remove(txid) + self.active_txs.pop(txid, None) def check_callback_called(self, txinfo, callback, cbtype, msg): """ Intended to be a deferred Task to be scheduled some @@ -406,7 +417,7 @@ class WalletService(Service): if callback in self.callbacks[cbtype][txinfo]: # the callback was not called, drop it and warn self.callbacks[cbtype][txinfo].remove(callback) - # TODO - dangling txids in self.active_txids will + # TODO - dangling txids in self.active_txs will # be caused by this, but could also happen for # other reasons; possibly add logic to ensure that # this never occurs, although their presence should @@ -454,8 +465,9 @@ class WalletService(Service): self.sync_unspent() # Don't attempt updates on transactions that existed # before startup - self.old_txs = [x['txid'] for x in self.bci.list_transactions(100) - if "txid" in x] + self.last_seen_txid = next( + (tx['txid'] for tx in self.bci._yield_transactions() + if 'txid' in tx), None) if isinstance(self.bci, BitcoinCoreNoHistoryInterface): self.bci.set_wallet_no_history(self.wallet) return self.synced @@ -668,13 +680,13 @@ class WalletService(Service): to this wallet, as a list. """ res = [] - processed_txids = [] + processed_txids = set() for r in self.bci._yield_transactions(): txid = r["txid"] if txid not in processed_txids: tx = self.bci.get_transaction(hextobin(txid)) res.append(self.bci.get_deser_from_gettransaction(tx)) - processed_txids.append(txid) + processed_txids.add(txid) return res def get_transaction(self, txid):