diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 32dfd0820..bb5863db8 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -212,7 +212,7 @@ class AddressSynchronizer(Logger, EventListener): self.db.history[address] = [] self.set_up_to_date(False) if self.synchronizer: - self.synchronizer.add_address(address) + self.synchronizer.add(address) def get_conflicting_transactions(self, tx_hash, tx: Transaction, include_self=False): """Returns a set of transaction hashes from the wallet history that are diff --git a/electrum/scripts/watch_address.py b/electrum/scripts/watch_address.py index 4403d44ec..297ec2ea2 100755 --- a/electrum/scripts/watch_address.py +++ b/electrum/scripts/watch_address.py @@ -32,12 +32,12 @@ class Notifier(SynchronizerBase): async def main(self): # resend existing subscriptions if we were restarted for addr in self.watched_addresses: - self.add_address(addr) + await self._add_address(addr) # main loop while True: addr = await self.watch_queue.get() self.watched_addresses.add(addr) - self.add_address(addr) + await self._add_address(addr) async def _on_address_status(self, addr, status): print_msg(f"addr {addr}, status {status}") diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index 2c434e7ba..b5e9b0ed2 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -83,17 +83,16 @@ class SynchronizerBase(NetworkJobOnDefaultServer): # we are being cancelled now self.session.unsubscribe(self.status_queue) - def add_address(self, addr: str) -> None: - """Add an address to subscribe to. - Thread-safe. When the method returns, the Synchronizer (e.g. is_up_to_date) will know about the address. - """ + def add(self, addr): + # FIXME is_up_to_date does not take addr into account until _add_address executes + asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop) + + async def _add_address(self, addr: str): + # note: this method is async as add_queue.put_nowait is not thread-safe. if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}") if addr in self.requested_addrs: return self.requested_addrs.add(addr) - async def enqueue(): - # note: this method is async as add_queue.put_nowait is not thread-safe. - self.add_queue.put_nowait(addr) - asyncio.run_coroutine_threadsafe(enqueue(), self.asyncio_loop) + self.add_queue.put_nowait(addr) async def _on_address_status(self, addr, status): """Handle the change of the status of an address.""" @@ -247,7 +246,7 @@ class Synchronizer(SynchronizerBase): await self._request_missing_txs(history, allow_server_not_finding_tx=True) # add addresses to bootstrap for addr in random_shuffled_copy(self.adb.get_addresses()): - self.add_address(addr) + await self._add_address(addr) # main loop while True: await asyncio.sleep(0.1) @@ -273,12 +272,12 @@ class Notifier(SynchronizerBase): async def main(self): # resend existing subscriptions if we were restarted for addr in self.watched_addresses: - self.add_address(addr) + await self._add_address(addr) # main loop while True: addr, url = await self._start_watching_queue.get() self.watched_addresses[addr].append(url) - self.add_address(addr) + await self._add_address(addr) async def start_watching_addr(self, addr: str, url: str): await self._start_watching_queue.put((addr, url))