From 1c20a29a2205c97f0ee4e7373653831567be7bc6 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Tue, 20 Dec 2022 16:15:24 +0000 Subject: [PATCH] Revert "wallet.is_up_to_date: fix flickering during sync due to race" This reverts commit dc6c4814068809b6f6d2df9aac0fa90d827a15f2 as it introduced its own issue: while add_address was running on one thread, synchronizer._reset could be running on another, and by the time the "enqueue" coro would run, it would use a new add_queue and addr would not be in requested_addrs anymore... ``` I/w | wallet.Standard_Wallet.[test_segwit_2] | starting taskgroup. I | lnworker.LNWallet.[test_segwit_2] | starting taskgroup. E/i | interface.[testnet.qtornado.com:51002] | Exception in run: KeyError('tb1q3wmgf8n5eettnj50pzgnfrrpdpjmwn37x7nzsc5780kk4je9v4hspym8mu') Traceback (most recent call last): File ".../electrum/electrum/util.py", line 1243, in wrapper return await func(*args, **kwargs) File ".../electrum/electrum/interface.py", line 506, in wrapper_func return await func(self, *args, **kwargs) File ".../electrum/electrum/interface.py", line 529, in run await self.open_session(ssl_context) File ".../electrum/electrum/interface.py", line 679, in open_session async with self.taskgroup as group: File ".../aiorpcX/aiorpcx/curio.py", line 304, in __aexit__ await self.join() File ".../electrum/electrum/util.py", line 1339, in join task.result() File ".../electrum/electrum/synchronizer.py", line 80, in _run_tasks async with taskgroup as group: File ".../aiorpcX/aiorpcx/curio.py", line 304, in __aexit__ await self.join() File ".../electrum/electrum/util.py", line 1339, in join task.result() File ".../electrum/electrum/synchronizer.py", line 127, in subscribe_to_address self.requested_addrs.remove(addr) KeyError: 'tb1q3wmgf8n5eettnj50pzgnfrrpdpjmwn37x7nzsc5780kk4je9v4hspym8mu' ``` --- electrum/address_synchronizer.py | 2 +- electrum/scripts/watch_address.py | 4 ++-- electrum/synchronizer.py | 21 ++++++++++----------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 32dfd0820..bb5863db8 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -212,7 +212,7 @@ class AddressSynchronizer(Logger, EventListener): self.db.history[address] = [] self.set_up_to_date(False) if self.synchronizer: - self.synchronizer.add_address(address) + self.synchronizer.add(address) def get_conflicting_transactions(self, tx_hash, tx: Transaction, include_self=False): """Returns a set of transaction hashes from the wallet history that are diff --git a/electrum/scripts/watch_address.py b/electrum/scripts/watch_address.py index 4403d44ec..297ec2ea2 100755 --- a/electrum/scripts/watch_address.py +++ b/electrum/scripts/watch_address.py @@ -32,12 +32,12 @@ class Notifier(SynchronizerBase): async def main(self): # resend existing subscriptions if we were restarted for addr in self.watched_addresses: - self.add_address(addr) + await self._add_address(addr) # main loop while True: addr = await self.watch_queue.get() self.watched_addresses.add(addr) - self.add_address(addr) + await self._add_address(addr) async def _on_address_status(self, addr, status): print_msg(f"addr {addr}, status {status}") diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index 2c434e7ba..b5e9b0ed2 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -83,17 +83,16 @@ class SynchronizerBase(NetworkJobOnDefaultServer): # we are being cancelled now self.session.unsubscribe(self.status_queue) - def add_address(self, addr: str) -> None: - """Add an address to subscribe to. - Thread-safe. When the method returns, the Synchronizer (e.g. is_up_to_date) will know about the address. - """ + def add(self, addr): + # FIXME is_up_to_date does not take addr into account until _add_address executes + asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop) + + async def _add_address(self, addr: str): + # note: this method is async as add_queue.put_nowait is not thread-safe. if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}") if addr in self.requested_addrs: return self.requested_addrs.add(addr) - async def enqueue(): - # note: this method is async as add_queue.put_nowait is not thread-safe. - self.add_queue.put_nowait(addr) - asyncio.run_coroutine_threadsafe(enqueue(), self.asyncio_loop) + self.add_queue.put_nowait(addr) async def _on_address_status(self, addr, status): """Handle the change of the status of an address.""" @@ -247,7 +246,7 @@ class Synchronizer(SynchronizerBase): await self._request_missing_txs(history, allow_server_not_finding_tx=True) # add addresses to bootstrap for addr in random_shuffled_copy(self.adb.get_addresses()): - self.add_address(addr) + await self._add_address(addr) # main loop while True: await asyncio.sleep(0.1) @@ -273,12 +272,12 @@ class Notifier(SynchronizerBase): async def main(self): # resend existing subscriptions if we were restarted for addr in self.watched_addresses: - self.add_address(addr) + await self._add_address(addr) # main loop while True: addr, url = await self._start_watching_queue.get() self.watched_addresses[addr].append(url) - self.add_address(addr) + await self._add_address(addr) async def start_watching_addr(self, addr: str, url: str): await self._start_watching_queue.put((addr, url))