From af2c9b081c9dd4f781e9f52d5fa8c397383df342 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Mon, 3 Jun 2024 18:36:08 +0000 Subject: [PATCH] util: add AsyncHangDetector, and use it for lnpeer._process_message --- electrum/lnpeer.py | 8 ++++++-- electrum/util.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index 58bd75f0e..c29c37526 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -22,7 +22,7 @@ from . import ecc from .ecc import ecdsa_sig64_from_r_and_s, ecdsa_der_sig_from_ecdsa_sig64, ECPubkey from . import constants from .util import (bfh, log_exceptions, ignore_exceptions, chunks, OldTaskGroup, - UnrelatedTransactionException, error_text_bytes_to_safe_str) + UnrelatedTransactionException, error_text_bytes_to_safe_str, AsyncHangDetector) from . import transaction from .bitcoin import make_op_return, DummyAddress from .transaction import PartialTxOutput, match_script_against_template, Sighash @@ -240,7 +240,11 @@ class Peer(Logger): # note: the message handler might be async or non-async. In either case, by default, # we wait for it to complete before we return, i.e. before the next message is processed. if asyncio.iscoroutinefunction(f): - await f(*args) + async with AsyncHangDetector( + message=f"message handler still running for {message_type.upper()}", + logger=self.logger, + ): + await f(*args) else: f(*args) diff --git a/electrum/util.py b/electrum/util.py index c4422deb5..40aff7dfb 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -22,6 +22,7 @@ # SOFTWARE. import binascii import concurrent.futures +import logging import os, sys, re, json from collections import defaultdict, OrderedDict from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any, @@ -488,6 +489,35 @@ def profiler(func=None, *, min_threshold: Union[int, float, None] = None): return do_profile +class AsyncHangDetector: + """Context manager that logs every `n` seconds if encapsulated context still has not exited.""" + + def __init__( + self, + *, + period_sec: int = 15, + message: str, + logger: logging.Logger = None, + ): + self.period_sec = period_sec + self.message = message + self.logger = logger or _logger + + async def _monitor(self): + # note: this assumes that the event loop itself is not blocked + t0 = time.monotonic() + while True: + await asyncio.sleep(self.period_sec) + t1 = time.monotonic() + self.logger.info(f"{self.message} (after {t1 - t0:.2f} sec)") + + async def __aenter__(self): + self.mtask = asyncio.create_task(self._monitor()) + + async def __aexit__(self, exc_type, exc, tb): + self.mtask.cancel() + + def android_ext_dir(): from android.storage import primary_external_storage_path return primary_external_storage_path()