Browse Source

WalletService: reduce polling overhead

The transaction_monitor function in WalletService was polling the most
recent 100 transactions from the BlockchainInterface every 5 seconds.
This was egregious and inefficient. Introduce a _yield_new_transactions
function that remembers the newest previously yielded transaction
between invocations and yields only transactions newer than that one.
Use the new function in transaction_monitor.

Avoid repeatedly deserializing confirmed transactions that remain in the
set of monitored transactions after being confirmed either due to
"confirmed" callbacks that return False or because callbacks have timed
out and have been unregistered without cleaning the set of monitored
transactions. Change the active_txids list to an active_txs dict whose
keys are the txids and whose values are the deserialized transactions.
Use this new dict as a cache to avoid repeatedly deserializing confirmed
transactions.
master
Matt Whitlock 4 years ago
parent
commit
f52bf71740
  1. 88
      jmclient/jmclient/wallet_service.py

88
jmclient/jmclient/wallet_service.py

@ -1,6 +1,7 @@
#! /usr/bin/env python #! /usr/bin/env python
import collections import collections
import itertools
import time import time
import sys import sys
from decimal import Decimal from decimal import Decimal
@ -83,9 +84,9 @@ class WalletService(Service):
# transactions we are actively monitoring, # transactions we are actively monitoring,
# i.e. they are not new but we want to track: # i.e. they are not new but we want to track:
self.active_txids = [] self.active_txs = {}
# to ensure transactions are only processed once: # to ensure transactions are only processed once:
self.processed_txids = [] self.processed_txids = set()
self.set_autofreeze_warning_cb() self.set_autofreeze_warning_cb()
@ -285,6 +286,23 @@ class WalletService(Service):
self.autofreeze_warning_cb(utxo) self.autofreeze_warning_cb(utxo)
self.disable_utxo(*utxo) self.disable_utxo(*utxo)
def _yield_new_transactions(self):
""" Constrains the sequence generated by bci._yield_transactions so
that it stops just before it would yield the newest transaction
previously yielded by _yield_new_transactions.
"""
since_txid = self.last_seen_txid
last = True
for tx in self.bci._yield_transactions():
if 'txid' in tx:
txid = tx['txid']
if txid == since_txid:
return
if last:
self.last_seen_txid = txid
last = False
yield tx
def transaction_monitor(self): def transaction_monitor(self):
"""Keeps track of any changes in the wallet (new transactions). """Keeps track of any changes in the wallet (new transactions).
Intended to be run as a twisted task.LoopingCall so that this Intended to be run as a twisted task.LoopingCall so that this
@ -294,33 +312,19 @@ class WalletService(Service):
if not self.update_blockheight(): if not self.update_blockheight():
return return
txlist = self.bci.list_transactions(100)
if not txlist:
return
new_txs = []
for x in txlist:
# process either (a) a completely new tx or
# (b) a tx that reached unconf status but we are still
# waiting for conf (active_txids)
if "txid" not in x:
continue
if x['txid'] in self.active_txids or x['txid'] not in self.old_txs:
new_txs.append(x)
# reset for next polling event:
self.old_txs = set(x['txid'] for x in txlist if "txid" in x)
# for this invocation of transaction_monitor, we *don't* want # for this invocation of transaction_monitor, we *don't* want
# to call `gettransaction` more than once per txid, even if the # to call `gettransaction` more than once per txid, even if the
# `listtransactions` result has multiple instances for different # `listtransactions` result has multiple instances for different
# wallet labels; so we use a temporary variable to cache. # wallet labels; so we use a temporary variable to cache.
gettx_results = {} seen_txids = set()
for tx in new_txs: wallet_name = self.get_wallet_name()
txid = tx["txid"] for txid in itertools.chain(list(self.active_txs), reversed(
if txid not in gettx_results: [tx['txid'] for tx in self._yield_new_transactions()
res = self.bci.get_transaction(hextobin(txid)) if 'txid' in tx])):
gettx_results[txid] = res if txid in seen_txids:
else: continue
res = gettx_results[txid] seen_txids.add(txid)
res = self.bci.get_transaction(hextobin(txid))
if not res: if not res:
continue continue
confs = res["confirmations"] confs = res["confirmations"]
@ -330,13 +334,18 @@ class WalletService(Service):
if confs < 0: if confs < 0:
jlog.info( jlog.info(
"Transaction: " + txid + " has a conflict, abandoning.") "Transaction: " + txid + " has a conflict, abandoning.")
self.active_txs.pop(txid, None)
continue continue
if confs == 0: if confs == 0:
if txid in self.active_txs:
# avoid processing an unconfirmed tx we've already seen
continue
height = None height = None
else: else:
height = self.current_blockheight - confs + 1 height = self.current_blockheight - confs + 1
txd = self.bci.get_deser_from_gettransaction(res) txd = self.active_txs.get(txid) or \
self.bci.get_deser_from_gettransaction(res)
if txd is None: if txd is None:
continue continue
removed_utxos, added_utxos = self.wallet.process_new_tx(txd, height) removed_utxos, added_utxos = self.wallet.process_new_tx(txd, height)
@ -347,14 +356,16 @@ class WalletService(Service):
# is absurdly fast, this is considered acceptable compared with # is absurdly fast, this is considered acceptable compared with
# additional complexity. # additional complexity.
self.log_new_tx(removed_utxos, added_utxos, txid) self.log_new_tx(removed_utxos, added_utxos, txid)
self.processed_txids.append(txid) self.processed_txids.add(txid)
# first fire 'all' type callbacks, irrespective of if the # first fire 'all' type callbacks, irrespective of if the
# transaction pertains to anything known (but must # transaction pertains to anything known (but must
# have correct label per above); filter on this Joinmarket wallet label, # have correct label per above); filter on this Joinmarket wallet label,
# or the external monitoring label: # or the external monitoring label:
if (self.bci.is_address_labeled(tx, self.get_wallet_name()) or if (any(self.bci.is_address_labeled(addr, wallet_name) or
self.bci.is_address_labeled(tx, self.EXTERNAL_WALLET_LABEL)): self.bci.is_address_labeled(addr,
self.EXTERNAL_WALLET_LABEL)
for addr in res["details"])):
for f in self.callbacks["all"]: for f in self.callbacks["all"]:
# note we need no return value as we will never # note we need no return value as we will never
# remove these from the list # remove these from the list
@ -373,10 +384,10 @@ class WalletService(Service):
# Note also that it's entirely possible that there are only removals, # Note also that it's entirely possible that there are only removals,
# not additions, to the utxo set, specifically in sweeps to external # not additions, to the utxo set, specifically in sweeps to external
# addresses. In this case, since removal can by definition only # addresses. In this case, since removal can by definition only
# happen once, we must allow entries in self.active_txids through the # happen once, we must allow txids in self.active_txs through the
# filter. # filter.
if len(added_utxos) > 0 or len(removed_utxos) > 0 \ if len(added_utxos) > 0 or len(removed_utxos) > 0 \
or txid in self.active_txids: or txid in self.active_txs:
if confs == 0: if confs == 0:
for k in possible_keys: for k in possible_keys:
if k in self.callbacks["unconfirmed"]: if k in self.callbacks["unconfirmed"]:
@ -385,7 +396,7 @@ class WalletService(Service):
if f(txd, txid): if f(txd, txid):
self.callbacks["unconfirmed"][k].remove(f) self.callbacks["unconfirmed"][k].remove(f)
# keep monitoring for conf > 0: # keep monitoring for conf > 0:
self.active_txids.append(txid) self.active_txs[txid] = txd
elif confs > 0: elif confs > 0:
for k in possible_keys: for k in possible_keys:
if k in self.callbacks["confirmed"]: if k in self.callbacks["confirmed"]:
@ -393,7 +404,7 @@ class WalletService(Service):
if f(txd, txid, confs): if f(txd, txid, confs):
self.callbacks["confirmed"][k].remove(f) self.callbacks["confirmed"][k].remove(f)
if txid in self.active_txids: if txid in self.active_txids:
self.active_txids.remove(txid) self.active_txs.pop(txid, None)
def check_callback_called(self, txinfo, callback, cbtype, msg): def check_callback_called(self, txinfo, callback, cbtype, msg):
""" Intended to be a deferred Task to be scheduled some """ Intended to be a deferred Task to be scheduled some
@ -406,7 +417,7 @@ class WalletService(Service):
if callback in self.callbacks[cbtype][txinfo]: if callback in self.callbacks[cbtype][txinfo]:
# the callback was not called, drop it and warn # the callback was not called, drop it and warn
self.callbacks[cbtype][txinfo].remove(callback) self.callbacks[cbtype][txinfo].remove(callback)
# TODO - dangling txids in self.active_txids will # TODO - dangling txids in self.active_txs will
# be caused by this, but could also happen for # be caused by this, but could also happen for
# other reasons; possibly add logic to ensure that # other reasons; possibly add logic to ensure that
# this never occurs, although their presence should # this never occurs, although their presence should
@ -454,8 +465,9 @@ class WalletService(Service):
self.sync_unspent() self.sync_unspent()
# Don't attempt updates on transactions that existed # Don't attempt updates on transactions that existed
# before startup # before startup
self.old_txs = [x['txid'] for x in self.bci.list_transactions(100) self.last_seen_txid = next(
if "txid" in x] (tx['txid'] for tx in self.bci._yield_transactions()
if 'txid' in tx), None)
if isinstance(self.bci, BitcoinCoreNoHistoryInterface): if isinstance(self.bci, BitcoinCoreNoHistoryInterface):
self.bci.set_wallet_no_history(self.wallet) self.bci.set_wallet_no_history(self.wallet)
return self.synced return self.synced
@ -668,13 +680,13 @@ class WalletService(Service):
to this wallet, as a list. to this wallet, as a list.
""" """
res = [] res = []
processed_txids = [] processed_txids = set()
for r in self.bci._yield_transactions(): for r in self.bci._yield_transactions():
txid = r["txid"] txid = r["txid"]
if txid not in processed_txids: if txid not in processed_txids:
tx = self.bci.get_transaction(hextobin(txid)) tx = self.bci.get_transaction(hextobin(txid))
res.append(self.bci.get_deser_from_gettransaction(tx)) res.append(self.bci.get_deser_from_gettransaction(tx))
processed_txids.append(txid) processed_txids.add(txid)
return res return res
def get_transaction(self, txid): def get_transaction(self, txid):

Loading…
Cancel
Save