diff --git a/jmclient/jmclient/blockchaininterface.py b/jmclient/jmclient/blockchaininterface.py index 2402ff7..65a4cfc 100644 --- a/jmclient/jmclient/blockchaininterface.py +++ b/jmclient/jmclient/blockchaininterface.py @@ -292,18 +292,36 @@ class BitcoinCoreInterface(BlockchainInterface): self.import_addresses(addresses - imported_addresses, wallet_name) return import_needed - def _yield_transactions(self, wallet_name): - batch_size = 1000 - iteration = 0 + def _yield_transactions(self): + """ Generates a lazily fetched sequence of transactions seen in the + wallet (under any label/account), yielded in newest-first order. Care + is taken to avoid yielding duplicates even when new transactions are + actively being added to the wallet while the iteration is ongoing. + """ + num, skip = 1, 0 + txs = self.list_transactions(num, skip) + if not txs: + return + yielded_tx = txs[0] + yield yielded_tx while True: - new = self._rpc( - 'listtransactions', - ["*", batch_size, iteration * batch_size, True]) - for tx in new: - yield tx - if len(new) < batch_size: + num *= 2 + txs = self.list_transactions(num, skip) + if not txs: + return + try: + idx = [(tx['txid'], tx['vout'], tx['category']) for tx in txs + ].index((yielded_tx['txid'], yielded_tx['vout'], + yielded_tx['category'])) + except ValueError: + skip += num + continue + for tx in reversed(txs[:idx]): + yielded_tx = tx # inefficient but more obvious + yield yielded_tx + if len(txs) < num: return - iteration += 1 + skip += num - 1 def get_deser_from_gettransaction(self, rpcretval): """Get full transaction deserialization from a call @@ -641,7 +659,7 @@ class BitcoinCoreNoHistoryInterface(BitcoinCoreInterface, RegtestBitcoinCoreMixi assert desc_str.startswith("addr(") return desc_str[5:desc_str.find(")")] - def _yield_transactions(self, wallet_name): + def _yield_transactions(self): for u in self.scan_result["unspents"]: tx = {"category": "receive", "address": self._get_addr_from_desc(u["desc"])} diff --git a/jmclient/jmclient/wallet_service.py b/jmclient/jmclient/wallet_service.py index 332523e..0b09d35 100644 --- a/jmclient/jmclient/wallet_service.py +++ b/jmclient/jmclient/wallet_service.py @@ -1,6 +1,7 @@ #! /usr/bin/env python import collections +import itertools import time import sys from decimal import Decimal @@ -74,18 +75,19 @@ class WalletService(Service): # Dicts of registered callbacks, by type # and then by txinfo, for events # on transactions. - self.callbacks = {} - self.callbacks["all"] = [] - self.callbacks["unconfirmed"] = {} - self.callbacks["confirmed"] = {} + self.callbacks = { + "all": [], # note: list, not dict + "unconfirmed": {}, + "confirmed": {}, + } self.restart_callback = None # transactions we are actively monitoring, # i.e. they are not new but we want to track: - self.active_txids = [] + self.active_txs = {} # to ensure transactions are only processed once: - self.processed_txids = [] + self.processed_txids = set() self.set_autofreeze_warning_cb() @@ -200,9 +202,12 @@ class WalletService(Service): # note that in this case, txid is ignored. self.callbacks["all"].extend(callbacks) elif cb_type in ["unconfirmed", "confirmed"]: - if txinfo not in self.callbacks[cb_type]: - self.callbacks[cb_type][txinfo] = [] - self.callbacks[cb_type][txinfo].extend(callbacks) + if callbacks: + reg = self.callbacks[cb_type].setdefault(txinfo, []) + if isinstance(reg, str): + # found a txid breadcrumb for this txinfo + reg = self.callbacks[cb_type].setdefault(reg, []) + reg.extend(callbacks) else: assert False, "Invalid argument: " + cb_type @@ -285,6 +290,23 @@ class WalletService(Service): self.autofreeze_warning_cb(utxo) self.disable_utxo(*utxo) + def _yield_new_transactions(self): + """ Constrains the sequence generated by bci._yield_transactions so + that it stops just before it would yield the newest transaction + previously yielded by _yield_new_transactions. + """ + since_txid = self.last_seen_txid + last = True + for tx in self.bci._yield_transactions(): + if 'txid' in tx: + txid = tx['txid'] + if txid == since_txid: + return + if last: + self.last_seen_txid = txid + last = False + yield tx + def transaction_monitor(self): """Keeps track of any changes in the wallet (new transactions). Intended to be run as a twisted task.LoopingCall so that this @@ -294,33 +316,19 @@ class WalletService(Service): if not self.update_blockheight(): return - txlist = self.bci.list_transactions(100) - if not txlist: - return - - new_txs = [] - for x in txlist: - # process either (a) a completely new tx or - # (b) a tx that reached unconf status but we are still - # waiting for conf (active_txids) - if "txid" not in x: - continue - if x['txid'] in self.active_txids or x['txid'] not in self.old_txs: - new_txs.append(x) - # reset for next polling event: - self.old_txs = set(x['txid'] for x in txlist if "txid" in x) # for this invocation of transaction_monitor, we *don't* want # to call `gettransaction` more than once per txid, even if the # `listtransactions` result has multiple instances for different # wallet labels; so we use a temporary variable to cache. - gettx_results = {} - for tx in new_txs: - txid = tx["txid"] - if txid not in gettx_results: - res = self.bci.get_transaction(hextobin(txid)) - gettx_results[txid] = res - else: - res = gettx_results[txid] + seen_txids = set() + wallet_name = self.get_wallet_name() + for txid in itertools.chain(list(self.active_txs), reversed( + [tx['txid'] for tx in self._yield_new_transactions() + if 'txid' in tx])): + if txid in seen_txids: + continue + seen_txids.add(txid) + res = self.bci.get_transaction(hextobin(txid)) if not res: continue confs = res["confirmations"] @@ -330,13 +338,18 @@ class WalletService(Service): if confs < 0: jlog.info( "Transaction: " + txid + " has a conflict, abandoning.") + self.active_txs.pop(txid, None) continue if confs == 0: + if txid in self.active_txs: + # avoid processing an unconfirmed tx we've already seen + continue height = None else: height = self.current_blockheight - confs + 1 - txd = self.bci.get_deser_from_gettransaction(res) + txd = self.active_txs.get(txid) or \ + self.bci.get_deser_from_gettransaction(res) if txd is None: continue removed_utxos, added_utxos = self.wallet.process_new_tx(txd, height) @@ -347,23 +360,31 @@ class WalletService(Service): # is absurdly fast, this is considered acceptable compared with # additional complexity. self.log_new_tx(removed_utxos, added_utxos, txid) - self.processed_txids.append(txid) + self.processed_txids.add(txid) # first fire 'all' type callbacks, irrespective of if the # transaction pertains to anything known (but must # have correct label per above); filter on this Joinmarket wallet label, # or the external monitoring label: - if (self.bci.is_address_labeled(tx, self.get_wallet_name()) or - self.bci.is_address_labeled(tx, self.EXTERNAL_WALLET_LABEL)): + if (any(self.bci.is_address_labeled(addr, wallet_name) or + self.bci.is_address_labeled(addr, + self.EXTERNAL_WALLET_LABEL) + for addr in res["details"])): for f in self.callbacks["all"]: # note we need no return value as we will never # remove these from the list f(txd, txid) - # The tuple given as the second possible key for the dict - # is such because txid is not always available - # at the time of callback registration). - possible_keys = [txid, tuple((x.scriptPubKey, x.nValue) for x in txd.vout)] + # txid is not always available at the time of callback registration. + # Migrate any callbacks registered under the provisional key, and + # leave a txid breadcrumb so check_callback_called can find it. + txos = tuple((x.scriptPubKey, x.nValue) for x in txd.vout) + for cb_type in ["unconfirmed", "confirmed"]: + callbacks = self.callbacks[cb_type] + reg = callbacks.get(txos) + if isinstance(reg, list): + callbacks.setdefault(txid, [])[:0] = reg + callbacks[txos] = txid # note that len(added_utxos) > 0 is not a sufficient condition for # the tx being new, since wallet.add_new_utxos will happily re-add @@ -373,27 +394,31 @@ class WalletService(Service): # Note also that it's entirely possible that there are only removals, # not additions, to the utxo set, specifically in sweeps to external # addresses. In this case, since removal can by definition only - # happen once, we must allow entries in self.active_txids through the + # happen once, we must allow txids in self.active_txs through the # filter. if len(added_utxos) > 0 or len(removed_utxos) > 0 \ - or txid in self.active_txids: + or txid in self.active_txs: if confs == 0: - for k in possible_keys: - if k in self.callbacks["unconfirmed"]: - for f in self.callbacks["unconfirmed"][k]: - # True implies success, implies removal: - if f(txd, txid): - self.callbacks["unconfirmed"][k].remove(f) - # keep monitoring for conf > 0: - self.active_txids.append(txid) + callbacks = [f for f in + self.callbacks["unconfirmed"].pop(txid, []) + if not f(txd, txid)] + if callbacks: + self.callbacks["unconfirmed"][txid] = callbacks + else: + self.callbacks["unconfirmed"].pop(txos, None) + if self.callbacks["confirmed"].get(txid): + # keep monitoring for conf > 0: + self.active_txs[txid] = txd elif confs > 0: - for k in possible_keys: - if k in self.callbacks["confirmed"]: - for f in self.callbacks["confirmed"][k]: - if f(txd, txid, confs): - self.callbacks["confirmed"][k].remove(f) - if txid in self.active_txids: - self.active_txids.remove(txid) + callbacks = [f for f in + self.callbacks["confirmed"].pop(txid, []) + if not f(txd, txid, confs)] + if callbacks: + self.callbacks["confirmed"][txid] = callbacks + else: + self.callbacks["confirmed"].pop(txos, None) + # no more callbacks registered; stop monitoring tx + self.active_txs.pop(txid, None) def check_callback_called(self, txinfo, callback, cbtype, msg): """ Intended to be a deferred Task to be scheduled some @@ -402,15 +427,27 @@ class WalletService(Service): If the callback was previously called, return True, otherwise False. """ assert cbtype in ["unconfirmed", "confirmed"] - if txinfo in self.callbacks[cbtype]: - if callback in self.callbacks[cbtype][txinfo]: + callbacks = self.callbacks[cbtype] + if isinstance(txinfo, str): + txid = txinfo + reg = callbacks.get(txid) + else: + txid = None + reg = callbacks.get(txinfo) + if isinstance(reg, str): + # found a txid breadcrumb for this txinfo + txid = reg + reg = callbacks.get(txid) + if reg: + if callback in reg: # the callback was not called, drop it and warn - self.callbacks[cbtype][txinfo].remove(callback) - # TODO - dangling txids in self.active_txids will - # be caused by this, but could also happen for - # other reasons; possibly add logic to ensure that - # this never occurs, although their presence should - # not cause a functional error. + reg.remove(callback) + if not reg: + del callbacks[txinfo] + if txid: + callbacks.pop(txid, None) + # no more callbacks registered; stop monitoring tx + self.active_txs.pop(txid, None) jlog.info("Timed out: " + msg) return False # if callback is not in the list, it was already @@ -454,8 +491,9 @@ class WalletService(Service): self.sync_unspent() # Don't attempt updates on transactions that existed # before startup - self.old_txs = [x['txid'] for x in self.bci.list_transactions(100) - if "txid" in x] + self.last_seen_txid = next( + (tx['txid'] for tx in self.bci._yield_transactions() + if 'txid' in tx), None) if isinstance(self.bci, BitcoinCoreNoHistoryInterface): self.bci.set_wallet_no_history(self.wallet) return self.synced @@ -668,14 +706,13 @@ class WalletService(Service): to this wallet, as a list. """ res = [] - processed_txids = [] - for r in self.bci._yield_transactions( - self.get_wallet_name()): + processed_txids = set() + for r in self.bci._yield_transactions(): txid = r["txid"] if txid not in processed_txids: tx = self.bci.get_transaction(hextobin(txid)) res.append(self.bci.get_deser_from_gettransaction(tx)) - processed_txids.append(txid) + processed_txids.add(txid) return res def get_transaction(self, txid): @@ -720,7 +757,7 @@ class WalletService(Service): if isinstance(self.wallet, FidelityBondMixin): tx_receive = [] burner_txes = [] - for tx in self.bci._yield_transactions(wallet_name): + for tx in self.bci._yield_transactions(): if tx['category'] == 'receive': tx_receive.append(tx) elif tx["category"] == "send": @@ -743,7 +780,7 @@ class WalletService(Service): else: #not fidelity bond wallet, significantly faster sync used_addresses_gen = set(tx['address'] - for tx in self.bci._yield_transactions(wallet_name) + for tx in self.bci._yield_transactions() if tx['category'] == 'receive') # needed for address-reuse check: self.used_addresses = used_addresses_gen