diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index bb5863db8..32dfd0820 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -212,7 +212,7 @@ class AddressSynchronizer(Logger, EventListener): self.db.history[address] = [] self.set_up_to_date(False) if self.synchronizer: - self.synchronizer.add(address) + self.synchronizer.add_address(address) def get_conflicting_transactions(self, tx_hash, tx: Transaction, include_self=False): """Returns a set of transaction hashes from the wallet history that are diff --git a/electrum/scripts/watch_address.py b/electrum/scripts/watch_address.py index 297ec2ea2..4403d44ec 100755 --- a/electrum/scripts/watch_address.py +++ b/electrum/scripts/watch_address.py @@ -32,12 +32,12 @@ class Notifier(SynchronizerBase): async def main(self): # resend existing subscriptions if we were restarted for addr in self.watched_addresses: - await self._add_address(addr) + self.add_address(addr) # main loop while True: addr = await self.watch_queue.get() self.watched_addresses.add(addr) - await self._add_address(addr) + self.add_address(addr) async def _on_address_status(self, addr, status): print_msg(f"addr {addr}, status {status}") diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index 1e8ee4f69..2c434e7ba 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -83,15 +83,17 @@ class SynchronizerBase(NetworkJobOnDefaultServer): # we are being cancelled now self.session.unsubscribe(self.status_queue) - def add(self, addr): - asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop) - - async def _add_address(self, addr: str): - # note: this method is async as add_queue.put_nowait is not thread-safe. + def add_address(self, addr: str) -> None: + """Add an address to subscribe to. + Thread-safe. When the method returns, the Synchronizer (e.g. is_up_to_date) will know about the address. + """ if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}") if addr in self.requested_addrs: return self.requested_addrs.add(addr) - self.add_queue.put_nowait(addr) + async def enqueue(): + # note: this method is async as add_queue.put_nowait is not thread-safe. + self.add_queue.put_nowait(addr) + asyncio.run_coroutine_threadsafe(enqueue(), self.asyncio_loop) async def _on_address_status(self, addr, status): """Handle the change of the status of an address.""" @@ -245,7 +247,7 @@ class Synchronizer(SynchronizerBase): await self._request_missing_txs(history, allow_server_not_finding_tx=True) # add addresses to bootstrap for addr in random_shuffled_copy(self.adb.get_addresses()): - await self._add_address(addr) + self.add_address(addr) # main loop while True: await asyncio.sleep(0.1) @@ -271,12 +273,12 @@ class Notifier(SynchronizerBase): async def main(self): # resend existing subscriptions if we were restarted for addr in self.watched_addresses: - await self._add_address(addr) + self.add_address(addr) # main loop while True: addr, url = await self._start_watching_queue.get() self.watched_addresses[addr].append(url) - await self._add_address(addr) + self.add_address(addr) async def start_watching_addr(self, addr: str, url: str): await self._start_watching_queue.put((addr, url))