diff --git a/jmclient/jmclient/wallet_service.py b/jmclient/jmclient/wallet_service.py index d0b7952..ce5d325 100644 --- a/jmclient/jmclient/wallet_service.py +++ b/jmclient/jmclient/wallet_service.py @@ -1,6 +1,7 @@ #! /usr/bin/env python import collections +import itertools import time import sys from decimal import Decimal @@ -83,9 +84,9 @@ class WalletService(Service): # transactions we are actively monitoring, # i.e. they are not new but we want to track: - self.active_txids = [] + self.active_txs = {} # to ensure transactions are only processed once: - self.processed_txids = [] + self.processed_txids = set() self.set_autofreeze_warning_cb() @@ -285,6 +286,23 @@ class WalletService(Service): self.autofreeze_warning_cb(utxo) self.disable_utxo(*utxo) + def _yield_new_transactions(self): + """ Constrains the sequence generated by bci._yield_transactions so + that it stops just before it would yield the newest transaction + previously yielded by _yield_new_transactions. + """ + since_txid = self.last_seen_txid + last = True + for tx in self.bci._yield_transactions(): + if 'txid' in tx: + txid = tx['txid'] + if txid == since_txid: + return + if last: + self.last_seen_txid = txid + last = False + yield tx + def transaction_monitor(self): """Keeps track of any changes in the wallet (new transactions). Intended to be run as a twisted task.LoopingCall so that this @@ -294,33 +312,19 @@ class WalletService(Service): if not self.update_blockheight(): return - txlist = self.bci.list_transactions(100) - if not txlist: - return - - new_txs = [] - for x in txlist: - # process either (a) a completely new tx or - # (b) a tx that reached unconf status but we are still - # waiting for conf (active_txids) - if "txid" not in x: - continue - if x['txid'] in self.active_txids or x['txid'] not in self.old_txs: - new_txs.append(x) - # reset for next polling event: - self.old_txs = set(x['txid'] for x in txlist if "txid" in x) # for this invocation of transaction_monitor, we *don't* want # to call `gettransaction` more than once per txid, even if the # `listtransactions` result has multiple instances for different # wallet labels; so we use a temporary variable to cache. - gettx_results = {} - for tx in new_txs: - txid = tx["txid"] - if txid not in gettx_results: - res = self.bci.get_transaction(hextobin(txid)) - gettx_results[txid] = res - else: - res = gettx_results[txid] + seen_txids = set() + wallet_name = self.get_wallet_name() + for txid in itertools.chain(list(self.active_txs), reversed( + [tx['txid'] for tx in self._yield_new_transactions() + if 'txid' in tx])): + if txid in seen_txids: + continue + seen_txids.add(txid) + res = self.bci.get_transaction(hextobin(txid)) if not res: continue confs = res["confirmations"] @@ -330,13 +334,18 @@ class WalletService(Service): if confs < 0: jlog.info( "Transaction: " + txid + " has a conflict, abandoning.") + self.active_txs.pop(txid, None) continue if confs == 0: + if txid in self.active_txs: + # avoid processing an unconfirmed tx we've already seen + continue height = None else: height = self.current_blockheight - confs + 1 - txd = self.bci.get_deser_from_gettransaction(res) + txd = self.active_txs.get(txid) or \ + self.bci.get_deser_from_gettransaction(res) if txd is None: continue removed_utxos, added_utxos = self.wallet.process_new_tx(txd, height) @@ -347,14 +356,16 @@ class WalletService(Service): # is absurdly fast, this is considered acceptable compared with # additional complexity. self.log_new_tx(removed_utxos, added_utxos, txid) - self.processed_txids.append(txid) + self.processed_txids.add(txid) # first fire 'all' type callbacks, irrespective of if the # transaction pertains to anything known (but must # have correct label per above); filter on this Joinmarket wallet label, # or the external monitoring label: - if (self.bci.is_address_labeled(tx, self.get_wallet_name()) or - self.bci.is_address_labeled(tx, self.EXTERNAL_WALLET_LABEL)): + if (any(self.bci.is_address_labeled(addr, wallet_name) or + self.bci.is_address_labeled(addr, + self.EXTERNAL_WALLET_LABEL) + for addr in res["details"])): for f in self.callbacks["all"]: # note we need no return value as we will never # remove these from the list @@ -373,10 +384,10 @@ class WalletService(Service): # Note also that it's entirely possible that there are only removals, # not additions, to the utxo set, specifically in sweeps to external # addresses. In this case, since removal can by definition only - # happen once, we must allow entries in self.active_txids through the + # happen once, we must allow txids in self.active_txs through the # filter. if len(added_utxos) > 0 or len(removed_utxos) > 0 \ - or txid in self.active_txids: + or txid in self.active_txs: if confs == 0: for k in possible_keys: if k in self.callbacks["unconfirmed"]: @@ -385,7 +396,7 @@ class WalletService(Service): if f(txd, txid): self.callbacks["unconfirmed"][k].remove(f) # keep monitoring for conf > 0: - self.active_txids.append(txid) + self.active_txs[txid] = txd elif confs > 0: for k in possible_keys: if k in self.callbacks["confirmed"]: @@ -393,7 +404,7 @@ class WalletService(Service): if f(txd, txid, confs): self.callbacks["confirmed"][k].remove(f) if txid in self.active_txids: - self.active_txids.remove(txid) + self.active_txs.pop(txid, None) def check_callback_called(self, txinfo, callback, cbtype, msg): """ Intended to be a deferred Task to be scheduled some @@ -406,7 +417,7 @@ class WalletService(Service): if callback in self.callbacks[cbtype][txinfo]: # the callback was not called, drop it and warn self.callbacks[cbtype][txinfo].remove(callback) - # TODO - dangling txids in self.active_txids will + # TODO - dangling txids in self.active_txs will # be caused by this, but could also happen for # other reasons; possibly add logic to ensure that # this never occurs, although their presence should @@ -454,8 +465,9 @@ class WalletService(Service): self.sync_unspent() # Don't attempt updates on transactions that existed # before startup - self.old_txs = [x['txid'] for x in self.bci.list_transactions(100) - if "txid" in x] + self.last_seen_txid = next( + (tx['txid'] for tx in self.bci._yield_transactions() + if 'txid' in tx), None) if isinstance(self.bci, BitcoinCoreNoHistoryInterface): self.bci.set_wallet_no_history(self.wallet) return self.synced @@ -668,13 +680,13 @@ class WalletService(Service): to this wallet, as a list. """ res = [] - processed_txids = [] + processed_txids = set() for r in self.bci._yield_transactions(): txid = r["txid"] if txid not in processed_txids: tx = self.bci.get_transaction(hextobin(txid)) res.append(self.bci.get_deser_from_gettransaction(tx)) - processed_txids.append(txid) + processed_txids.add(txid) return res def get_transaction(self, txid):