Browse Source

WalletService: reduce polling overhead

The transaction_monitor function in WalletService was polling the most
recent 100 transactions from the BlockchainInterface every 5 seconds.
This was egregious and inefficient. Introduce a _yield_new_transactions
function that remembers the newest previously yielded transaction
between invocations and yields only transactions newer than that one.
Use the new function in transaction_monitor.

Avoid repeatedly deserializing confirmed transactions that remain in the
set of monitored transactions after being confirmed either due to
"confirmed" callbacks that return False or because callbacks have timed
out and have been unregistered without cleaning the set of monitored
transactions. Change the active_txids list to an active_txs dict whose
keys are the txids and whose values are the deserialized transactions.
Use this new dict as a cache to avoid repeatedly deserializing confirmed
transactions.
master
Matt Whitlock 4 years ago
parent
commit
f52bf71740
  1. 86
      jmclient/jmclient/wallet_service.py

86
jmclient/jmclient/wallet_service.py

@ -1,6 +1,7 @@
#! /usr/bin/env python
import collections
import itertools
import time
import sys
from decimal import Decimal
@ -83,9 +84,9 @@ class WalletService(Service):
# transactions we are actively monitoring,
# i.e. they are not new but we want to track:
self.active_txids = []
self.active_txs = {}
# to ensure transactions are only processed once:
self.processed_txids = []
self.processed_txids = set()
self.set_autofreeze_warning_cb()
@ -285,6 +286,23 @@ class WalletService(Service):
self.autofreeze_warning_cb(utxo)
self.disable_utxo(*utxo)
def _yield_new_transactions(self):
""" Constrains the sequence generated by bci._yield_transactions so
that it stops just before it would yield the newest transaction
previously yielded by _yield_new_transactions.
"""
since_txid = self.last_seen_txid
last = True
for tx in self.bci._yield_transactions():
if 'txid' in tx:
txid = tx['txid']
if txid == since_txid:
return
if last:
self.last_seen_txid = txid
last = False
yield tx
def transaction_monitor(self):
"""Keeps track of any changes in the wallet (new transactions).
Intended to be run as a twisted task.LoopingCall so that this
@ -294,33 +312,19 @@ class WalletService(Service):
if not self.update_blockheight():
return
txlist = self.bci.list_transactions(100)
if not txlist:
return
new_txs = []
for x in txlist:
# process either (a) a completely new tx or
# (b) a tx that reached unconf status but we are still
# waiting for conf (active_txids)
if "txid" not in x:
continue
if x['txid'] in self.active_txids or x['txid'] not in self.old_txs:
new_txs.append(x)
# reset for next polling event:
self.old_txs = set(x['txid'] for x in txlist if "txid" in x)
# for this invocation of transaction_monitor, we *don't* want
# to call `gettransaction` more than once per txid, even if the
# `listtransactions` result has multiple instances for different
# wallet labels; so we use a temporary variable to cache.
gettx_results = {}
for tx in new_txs:
txid = tx["txid"]
if txid not in gettx_results:
seen_txids = set()
wallet_name = self.get_wallet_name()
for txid in itertools.chain(list(self.active_txs), reversed(
[tx['txid'] for tx in self._yield_new_transactions()
if 'txid' in tx])):
if txid in seen_txids:
continue
seen_txids.add(txid)
res = self.bci.get_transaction(hextobin(txid))
gettx_results[txid] = res
else:
res = gettx_results[txid]
if not res:
continue
confs = res["confirmations"]
@ -330,13 +334,18 @@ class WalletService(Service):
if confs < 0:
jlog.info(
"Transaction: " + txid + " has a conflict, abandoning.")
self.active_txs.pop(txid, None)
continue
if confs == 0:
if txid in self.active_txs:
# avoid processing an unconfirmed tx we've already seen
continue
height = None
else:
height = self.current_blockheight - confs + 1
txd = self.bci.get_deser_from_gettransaction(res)
txd = self.active_txs.get(txid) or \
self.bci.get_deser_from_gettransaction(res)
if txd is None:
continue
removed_utxos, added_utxos = self.wallet.process_new_tx(txd, height)
@ -347,14 +356,16 @@ class WalletService(Service):
# is absurdly fast, this is considered acceptable compared with
# additional complexity.
self.log_new_tx(removed_utxos, added_utxos, txid)
self.processed_txids.append(txid)
self.processed_txids.add(txid)
# first fire 'all' type callbacks, irrespective of if the
# transaction pertains to anything known (but must
# have correct label per above); filter on this Joinmarket wallet label,
# or the external monitoring label:
if (self.bci.is_address_labeled(tx, self.get_wallet_name()) or
self.bci.is_address_labeled(tx, self.EXTERNAL_WALLET_LABEL)):
if (any(self.bci.is_address_labeled(addr, wallet_name) or
self.bci.is_address_labeled(addr,
self.EXTERNAL_WALLET_LABEL)
for addr in res["details"])):
for f in self.callbacks["all"]:
# note we need no return value as we will never
# remove these from the list
@ -373,10 +384,10 @@ class WalletService(Service):
# Note also that it's entirely possible that there are only removals,
# not additions, to the utxo set, specifically in sweeps to external
# addresses. In this case, since removal can by definition only
# happen once, we must allow entries in self.active_txids through the
# happen once, we must allow txids in self.active_txs through the
# filter.
if len(added_utxos) > 0 or len(removed_utxos) > 0 \
or txid in self.active_txids:
or txid in self.active_txs:
if confs == 0:
for k in possible_keys:
if k in self.callbacks["unconfirmed"]:
@ -385,7 +396,7 @@ class WalletService(Service):
if f(txd, txid):
self.callbacks["unconfirmed"][k].remove(f)
# keep monitoring for conf > 0:
self.active_txids.append(txid)
self.active_txs[txid] = txd
elif confs > 0:
for k in possible_keys:
if k in self.callbacks["confirmed"]:
@ -393,7 +404,7 @@ class WalletService(Service):
if f(txd, txid, confs):
self.callbacks["confirmed"][k].remove(f)
if txid in self.active_txids:
self.active_txids.remove(txid)
self.active_txs.pop(txid, None)
def check_callback_called(self, txinfo, callback, cbtype, msg):
""" Intended to be a deferred Task to be scheduled some
@ -406,7 +417,7 @@ class WalletService(Service):
if callback in self.callbacks[cbtype][txinfo]:
# the callback was not called, drop it and warn
self.callbacks[cbtype][txinfo].remove(callback)
# TODO - dangling txids in self.active_txids will
# TODO - dangling txids in self.active_txs will
# be caused by this, but could also happen for
# other reasons; possibly add logic to ensure that
# this never occurs, although their presence should
@ -454,8 +465,9 @@ class WalletService(Service):
self.sync_unspent()
# Don't attempt updates on transactions that existed
# before startup
self.old_txs = [x['txid'] for x in self.bci.list_transactions(100)
if "txid" in x]
self.last_seen_txid = next(
(tx['txid'] for tx in self.bci._yield_transactions()
if 'txid' in tx), None)
if isinstance(self.bci, BitcoinCoreNoHistoryInterface):
self.bci.set_wallet_no_history(self.wallet)
return self.synced
@ -668,13 +680,13 @@ class WalletService(Service):
to this wallet, as a list.
"""
res = []
processed_txids = []
processed_txids = set()
for r in self.bci._yield_transactions():
txid = r["txid"]
if txid not in processed_txids:
tx = self.bci.get_transaction(hextobin(txid))
res.append(self.bci.get_deser_from_gettransaction(tx))
processed_txids.append(txid)
processed_txids.add(txid)
return res
def get_transaction(self, txid):

Loading…
Cancel
Save