From 29a0560f98c85a7c804401d79e2d26119259eb93 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Wed, 21 Dec 2022 15:23:11 +0000 Subject: [PATCH] rework AddressSynchronizer.is_up_to_date - AddressSynchronizer no longer has its own state re up_to_date, it defers to Synchronizer/Verifier instead - Synchronizer is now tracking address sync states throughout their lifecycle: and Synchronizer.is_up_to_date() checks all states - Synchronizer.add_queue (internal) is removed as it was redundant - should fix wallet.is_up_to_date flickering during sync due to race related: dc6c4814068809b6f6d2df9aac0fa90d827a15f2 1c20a29a2205c97f0ee4e7373653831567be7bc6 --- electrum/address_synchronizer.py | 15 ++--- electrum/synchronizer.py | 100 +++++++++++++++++-------------- 2 files changed, 60 insertions(+), 55 deletions(-) diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index bb5863db8..cc45c5826 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -84,8 +84,6 @@ class AddressSynchronizer(Logger, EventListener): self.unverified_tx = defaultdict(int) # type: Dict[str, int] # txid -> height. Access with self.lock. # Txs the server claims are in the mempool: self.unconfirmed_tx = defaultdict(int) # type: Dict[str, int] # txid -> height. Access with self.lock. - # true when synchronized - self._up_to_date = False # considers both Synchronizer and Verifier # thread local storage for caching stuff self.threadlocal_cache = threading.local() @@ -210,9 +208,9 @@ class AddressSynchronizer(Logger, EventListener): def add_address(self, address): if address not in self.db.history: self.db.history[address] = [] - self.set_up_to_date(False) if self.synchronizer: self.synchronizer.add(address) + self.up_to_date_changed() def get_conflicting_transactions(self, tx_hash, tx: Transaction, include_self=False): """Returns a set of transaction hashes from the wallet history that are @@ -677,17 +675,14 @@ class AddressSynchronizer(Logger, EventListener): # local transaction return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0) - def set_up_to_date(self, up_to_date): - with self.lock: - status_changed = self._up_to_date != up_to_date - self._up_to_date = up_to_date + def up_to_date_changed(self) -> None: # fire triggers util.trigger_callback('adb_set_up_to_date', self) - if status_changed: - self.logger.info(f'set_up_to_date: {up_to_date}') def is_up_to_date(self): - return self._up_to_date + if not self.synchronizer or not self.verifier: + return False + return self.synchronizer.is_up_to_date() and self.verifier.is_up_to_date() def reset_netrequest_counters(self) -> None: if self.synchronizer: diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index b5e9b0ed2..adb8d480f 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -65,18 +65,18 @@ class SynchronizerBase(NetworkJobOnDefaultServer): def _reset(self): super()._reset() + self._adding_addrs = set() self.requested_addrs = set() + self._handling_addr_statuses = set() self.scripthash_to_address = {} self._processed_some_notifications = False # so that we don't miss them # Queues - self.add_queue = asyncio.Queue() self.status_queue = asyncio.Queue() async def _run_tasks(self, *, taskgroup): await super()._run_tasks(taskgroup=taskgroup) try: async with taskgroup as group: - await group.spawn(self.send_subscriptions()) await group.spawn(self.handle_status()) await group.spawn(self.main()) finally: @@ -84,43 +84,44 @@ class SynchronizerBase(NetworkJobOnDefaultServer): self.session.unsubscribe(self.status_queue) def add(self, addr): - # FIXME is_up_to_date does not take addr into account until _add_address executes + if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}") + self._adding_addrs.add(addr) # this lets is_up_to_date already know about addr asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop) async def _add_address(self, addr: str): - # note: this method is async as add_queue.put_nowait is not thread-safe. - if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}") - if addr in self.requested_addrs: return - self.requested_addrs.add(addr) - self.add_queue.put_nowait(addr) + try: + if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}") + if addr in self.requested_addrs: return + self.requested_addrs.add(addr) + await self.taskgroup.spawn(self._subscribe_to_address, addr) + finally: + self._adding_addrs.discard(addr) # ok for addr not to be present async def _on_address_status(self, addr, status): - """Handle the change of the status of an address.""" + """Handle the change of the status of an address. + Should remove addr from self._handling_addr_statuses when done. + """ raise NotImplementedError() # implemented by subclasses - async def send_subscriptions(self): - async def subscribe_to_address(addr): - h = address_to_scripthash(addr) - self.scripthash_to_address[h] = addr - self._requests_sent += 1 - try: - async with self._network_request_semaphore: - await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue) - except RPCError as e: - if e.message == 'history too large': # no unique error code - raise GracefulDisconnect(e, log_level=logging.ERROR) from e - raise - self._requests_answered += 1 - self.requested_addrs.remove(addr) - - while True: - addr = await self.add_queue.get() - await self.taskgroup.spawn(subscribe_to_address, addr) + async def _subscribe_to_address(self, addr): + h = address_to_scripthash(addr) + self.scripthash_to_address[h] = addr + self._requests_sent += 1 + try: + async with self._network_request_semaphore: + await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue) + except RPCError as e: + if e.message == 'history too large': # no unique error code + raise GracefulDisconnect(e, log_level=logging.ERROR) from e + raise + self._requests_answered += 1 async def handle_status(self): while True: h, status = await self.status_queue.get() addr = self.scripthash_to_address[h] + self._handling_addr_statuses.add(addr) + self.requested_addrs.discard(addr) # ok for addr not to be present await self.taskgroup.spawn(self._on_address_status, addr, status) self._processed_some_notifications = True @@ -142,6 +143,7 @@ class Synchronizer(SynchronizerBase): def _reset(self): super()._reset() + self._init_done = False self.requested_tx = {} self.requested_histories = set() self._stale_histories = dict() # type: Dict[str, asyncio.Task] @@ -150,22 +152,29 @@ class Synchronizer(SynchronizerBase): return self.adb.diagnostic_name() def is_up_to_date(self): - return (not self.requested_addrs + return (self._init_done + and not self._adding_addrs + and not self.requested_addrs + and not self._handling_addr_statuses and not self.requested_histories and not self.requested_tx - and not self._stale_histories) + and not self._stale_histories + and self.status_queue.empty()) async def _on_address_status(self, addr, status): - history = self.adb.db.get_addr_history(addr) - if history_status(history) == status: - return - # No point in requesting history twice for the same announced status. - # However if we got announced a new status, we should request history again: - if (addr, status) in self.requested_histories: - return - # request address history - self.requested_histories.add((addr, status)) - self._stale_histories.pop(addr, asyncio.Future()).cancel() + try: + history = self.adb.db.get_addr_history(addr) + if history_status(history) == status: + return + # No point in requesting history twice for the same announced status. + # However if we got announced a new status, we should request history again: + if (addr, status) in self.requested_histories: + return + # request address history + self.requested_histories.add((addr, status)) + self._stale_histories.pop(addr, asyncio.Future()).cancel() + finally: + self._handling_addr_statuses.discard(addr) h = address_to_scripthash(addr) self._requests_sent += 1 async with self._network_request_semaphore: @@ -236,7 +245,7 @@ class Synchronizer(SynchronizerBase): self.logger.info(f"received tx {tx_hash} height: {tx_height} bytes: {len(raw_tx)}") async def main(self): - self.adb.set_up_to_date(False) + self.adb.up_to_date_changed() # request missing txns, if any for addr in random_shuffled_copy(self.adb.db.get_history()): history = self.adb.db.get_addr_history(addr) @@ -248,16 +257,17 @@ class Synchronizer(SynchronizerBase): for addr in random_shuffled_copy(self.adb.get_addresses()): await self._add_address(addr) # main loop + self._init_done = True + prev_uptodate = False while True: await asyncio.sleep(0.1) - hist_done = self.is_up_to_date() - spv_done = self.adb.verifier.is_up_to_date() if self.adb.verifier else True - up_to_date = hist_done and spv_done + up_to_date = self.adb.is_up_to_date() # see if status changed - if (up_to_date != self.adb.is_up_to_date() + if (up_to_date != prev_uptodate or up_to_date and self._processed_some_notifications): self._processed_some_notifications = False - self.adb.set_up_to_date(up_to_date) + self.adb.up_to_date_changed() + prev_uptodate = up_to_date class Notifier(SynchronizerBase):