diff --git a/electrum/interface.py b/electrum/interface.py index f45ec070e..d27a99efa 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -489,7 +489,7 @@ class Interface(Logger): self.logger.warning(f"disconnecting due to {repr(e)}") self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True) finally: - self.got_disconnected.set() + self.got_disconnected.set() # set this ASAP, ideally before any awaits await self.network.connection_down(self) # if was not 'ready' yet, schedule waiting coroutines: self.ready.cancel() @@ -534,6 +534,9 @@ class Interface(Logger): self.ready.set_result(1) + def is_connected_and_ready(self) -> bool: + return self.ready.done() and not self.got_disconnected.is_set() + async def _save_certificate(self) -> None: if not os.path.exists(self.cert_path): # we may need to retry this a few times, in case the handshake hasn't completed diff --git a/electrum/network.py b/electrum/network.py index 4d7235eb8..044ea425c 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -437,7 +437,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): def is_connected(self): interface = self.interface - return interface is not None and interface.ready.done() + return interface is not None and interface.is_connected_and_ready() def is_connecting(self): return self.connection_status == 'connecting' @@ -707,8 +707,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): i = self.interfaces[server] if old_interface != i: + if not i.is_connected_and_ready(): + return self.logger.info(f"switching to {server}") - assert i.ready.done(), "interface we are switching to is not ready yet" blockchain_updated = i.blockchain != self.blockchain() self.interface = i await i.taskgroup.spawn(self._request_server_info(i)) @@ -1195,7 +1196,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): await self.taskgroup.spawn(self._run_new_interface(self.default_server)) async def main(): - self.logger.info("starting taskgroup.") + self.logger.info(f"starting taskgroup ({hex(id(taskgroup))}).") try: # note: if a task finishes with CancelledError, that # will NOT raise, and the group will keep the other tasks running @@ -1203,9 +1204,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): await group.spawn(self._maintain_sessions()) [await group.spawn(job) for job in self._jobs] except Exception as e: - self.logger.exception("taskgroup died.") + self.logger.exception(f"taskgroup died ({hex(id(taskgroup))}).") finally: - self.logger.info("taskgroup stopped.") + self.logger.info(f"taskgroup stopped ({hex(id(taskgroup))}).") asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop) util.trigger_callback('network_updated') @@ -1238,13 +1239,13 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): util.trigger_callback('network_updated') async def _ensure_there_is_a_main_interface(self): - if self.is_connected(): + if self.interface: return # if auto_connect is set, try a different server if self.auto_connect and not self.is_connecting(): await self._switch_to_random_interface() # if auto_connect is not set, or still no main interface, retry current - if not self.is_connected() and not self.is_connecting(): + if not self.interface and not self.is_connecting(): if self._can_retry_addr(self.default_server, urgent=True): await self.switch_to_interface(self.default_server) @@ -1271,15 +1272,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface) while True: - try: - await maybe_start_new_interfaces() - await maintain_healthy_spread_of_connected_servers() - await maintain_main_interface() - except asyncio.CancelledError: - # suppress spurious cancellations - group = self.taskgroup - if not group or group.joined: - raise + await maybe_start_new_interfaces() + await maintain_healthy_spread_of_connected_servers() + await maintain_main_interface() await asyncio.sleep(0.1) @classmethod