From 29d8d8de2619bba9f87c2cc6ca3e68ae2b33031f Mon Sep 17 00:00:00 2001 From: SomberNight Date: Fri, 10 Jun 2022 18:55:55 +0200 Subject: [PATCH] wallet: (fix) cannot just piggyback on adb.is_up_to_date() The wallet needs its own up_to_date logic: - the adb being up_to_date means all its addresses are synced - but an HD wallet might decide to roll the gap limit and generate new addresses - the adb does not know about this... - the wallet should be considered *not* up_to_date - relatedly, it is now the wallet that decides to reset the network request counters - note that wallet.main() was never cleaned up previously. - now wallet gets its own taskgroup, which is cleaned up in wallet.stop. Follow-up to adb refactor: https://github.com/spesmilo/electrum/commit/121d8732f1e1d97f70a0f72221b4acca1e818319 --- electrum/address_synchronizer.py | 12 +++--- electrum/wallet.py | 66 +++++++++++++++++++++++--------- 2 files changed, 53 insertions(+), 25 deletions(-) diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 1bc933166..8624caf38 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -672,12 +672,6 @@ class AddressSynchronizer(Logger): with self.lock: status_changed = self._up_to_date != up_to_date self._up_to_date = up_to_date - # reset sync state progress indicator - if up_to_date: - if self.synchronizer: - self.synchronizer.reset_request_counters() - if self.verifier: - self.verifier.reset_request_counters() # fire triggers util.trigger_callback('adb_set_up_to_date', self) if status_changed: @@ -686,6 +680,12 @@ class AddressSynchronizer(Logger): def is_up_to_date(self): return self._up_to_date + def reset_netrequest_counters(self) -> None: + if self.synchronizer: + self.synchronizer.reset_request_counters() + if self.verifier: + self.verifier.reset_request_counters() + def get_history_sync_state_details(self) -> Tuple[int, int]: nsent, nans = 0, 0 if self.synchronizer: diff --git a/electrum/wallet.py b/electrum/wallet.py index 13664b100..ee38265a4 100644 --- a/electrum/wallet.py +++ b/electrum/wallet.py @@ -47,13 +47,13 @@ import threading import enum import asyncio -from aiorpcx import timeout_after, TaskTimeout, ignore_after +from aiorpcx import timeout_after, TaskTimeout, ignore_after, run_in_thread from .i18n import _ from .bip32 import BIP32Node, convert_bip32_intpath_to_strpath, convert_bip32_path_to_list_of_uint32 from .crypto import sha256 from . import util -from .util import (NotEnoughFunds, UserCancelled, profiler, OldTaskGroup, +from .util import (NotEnoughFunds, UserCancelled, profiler, OldTaskGroup, ignore_exceptions, format_satoshis, format_fee_satoshis, NoDynamicFeeEstimates, WalletFileException, BitcoinException, InvalidPassword, format_time, timestamp_to_datetime, Satoshis, @@ -79,7 +79,7 @@ from .invoices import PR_PAID, PR_UNPAID, PR_UNKNOWN, PR_EXPIRED, PR_UNCONFIRMED from .contacts import Contacts from .interface import NetworkException from .mnemonic import Mnemonic -from .logging import get_logger +from .logging import get_logger, Logger from .lnworker import LNWallet from .paymentrequest import PaymentRequest from .util import read_json_file, write_json_file, UserFacingException @@ -267,7 +267,7 @@ class TxWalletDetails(NamedTuple): is_lightning_funding_tx: bool -class Abstract_Wallet(ABC): +class Abstract_Wallet(ABC, Logger): """ Wallet classes are created to handle various address generation methods. Completion states (watching-only, single account, no seed, etc) are handled inside classes. @@ -292,6 +292,7 @@ class Abstract_Wallet(ABC): # load addresses needs to be called before constructor for sanity checks db.load_addresses(self.wallet_type) self.keystore = None # type: Optional[KeyStore] # will be set by load_keystore + Logger.__init__(self) self.network = None self.adb = AddressSynchronizer(db, config) @@ -300,6 +301,8 @@ class Abstract_Wallet(ABC): self.lock = self.adb.lock self.transaction_lock = self.adb.transaction_lock + self.taskgroup = OldTaskGroup() + # saved fields self.use_change = db.get('use_change', True) self.multiple_change = db.get('multiple_change', False) @@ -321,25 +324,45 @@ class Abstract_Wallet(ABC): self.contacts = Contacts(self.db) self._coin_price_cache = {} + # true when synchronized. this is stricter than adb.is_up_to_date(): + # to-be-generated (HD) addresses are also considered here (gap-limit-roll-forward) + self._up_to_date = False + self.lnworker = None self.load_keystore() self.test_addresses_sanity() # callbacks - util.register_callback(self.on_adb_set_up_to_date, ['adb_set_up_to_date']) util.register_callback(self.on_adb_added_tx, ['adb_added_tx']) util.register_callback(self.on_adb_added_verified_tx, ['adb_added_verified_tx']) util.register_callback(self.on_adb_removed_verified_tx, ['adb_removed_verified_tx']) - async def main(self): - from aiorpcx import run_in_thread - # calls synchronize + @ignore_exceptions # don't kill outer taskgroup + async def main_loop(self): + self.logger.info("starting taskgroup.") + try: + async with self.taskgroup as group: + await group.spawn(asyncio.Event().wait) # run forever (until cancel) + await group.spawn(self.do_synchronize_loop()) + except Exception as e: + self.logger.exception("taskgroup died.") + finally: + self.logger.info("taskgroup stopped.") + + async def do_synchronize_loop(self): + """Generates new deterministic addresses if needed (gap limit roll-forward), + and sets up_to_date. + """ while True: + # polling. + # TODO if adb had "up_to_date_changed" asyncio.Event(), we could *also* trigger on that. + # The polling would still be useful as often need to gen new addrs while adb.is_up_to_date() is False await asyncio.sleep(0.1) # note: we only generate new HD addresses if the existing ones - # have history that are mined and SPV-verified. This inherently couples - # the Sychronizer and the Verifier. + # have history that are mined and SPV-verified. num_new_addrs = await run_in_thread(self.synchronize) up_to_date = self.adb.is_up_to_date() and num_new_addrs == 0 + if self.is_up_to_date() != up_to_date: + self.set_up_to_date(up_to_date) def save_db(self): if self.storage: @@ -398,32 +421,37 @@ class Abstract_Wallet(ABC): async def stop(self): """Stop all networking and save DB to disk.""" - util.unregister_callback(self.on_adb_set_up_to_date) util.unregister_callback(self.on_adb_added_tx) util.unregister_callback(self.on_adb_added_verified_tx) util.unregister_callback(self.on_adb_removed_verified_tx) try: async with ignore_after(5): - await self.adb.stop() if self.network: if self.lnworker: await self.lnworker.stop() self.lnworker = None + await self.adb.stop() + await self.taskgroup.cancel_remaining() finally: # even if we get cancelled if any([ks.is_requesting_to_be_rewritten_to_wallet_file for ks in self.get_keystores()]): self.save_keystore() self.save_db() - def is_up_to_date(self): - return self.adb.is_up_to_date() + def is_up_to_date(self) -> bool: + return self._up_to_date - def on_adb_set_up_to_date(self, event, adb): - if adb != self.adb: - return - if adb.is_up_to_date(): + def set_up_to_date(self, up_to_date: bool) -> None: + with self.lock: + status_changed = self._up_to_date != up_to_date + self._up_to_date = up_to_date + if up_to_date: + self.adb.reset_netrequest_counters() # sync progress indicator self.save_db() + # fire triggers util.trigger_callback('wallet_updated', self) util.trigger_callback('status') + if status_changed: + self.logger.info(f'set_up_to_date: {up_to_date}') def on_adb_added_tx(self, event, adb, tx_hash): if self.adb != adb: @@ -460,7 +488,7 @@ class Abstract_Wallet(ABC): def start_network(self, network): self.network = network if network: - asyncio.run_coroutine_threadsafe(self.main(), self.network.asyncio_loop) + asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop) self.adb.start_network(network) if self.lnworker: self.lnworker.start_network(network)