From 45e08ada6131a1758df6683746516938e43ed933 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Mon, 30 May 2022 15:15:31 +0200 Subject: [PATCH] lnpeer: make process_message async This allows making any message handler async in lnpeer. Note: `process_message` is only called from `_message_loop`. There are(would be) basically three types of message handlers: 1. "traditional blocking msg handlers". non-async ones. When these handlers are called, `process_message` naturally blocks until the handler returns, which means `_message_loop` also blocks until the message is fully processed before starting the next iteration. 2. "async blocking msg handlers". async ones where we want the previous property, i.e. we want the `_message_loop` to wait until the handler finishes. We await the handler inside `process_message`, and `_message_loop` awaits `process_message`. 3. "async non-blocking msg handlers". async message handlers that can be spawned e.g. onto `Peer.taskgroup` and the loop is free to start processing subsequent messages. e.g. msg handlers that start a negotiation, such as `on_shutdown` and `on_open_channel`. Any non-async message handler (`def on_...`) automatically goes into category 1. An async message handler, by default, goes into category 2, "blocking"; to go into category 3 ("non-blocking"), we use the `runs_in_taskgroup` function decorator. --- electrum/lnpeer.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index e4138c1a5..3f90bfcbf 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -198,7 +198,7 @@ class Peer(Logger): self.pong_event.clear() await self.pong_event.wait() - def process_message(self, message: bytes): + async def _process_message(self, message: bytes) -> None: try: message_type, payload = decode_msg(message) except UnknownOptionalMsgType as e: @@ -236,9 +236,22 @@ class Peer(Logger): # raw message is needed to check signature if message_type in ['node_announcement', 'channel_announcement', 'channel_update']: payload['raw'] = message - execution_result = f(*args) + # note: the message handler might be async or non-async. In either case, by default, + # we wait for it to complete before we return, i.e. before the next message is processed. if asyncio.iscoroutinefunction(f): - asyncio.ensure_future(self.taskgroup.spawn(execution_result)) + await f(*args) + else: + f(*args) + + def non_blocking_msg_handler(func): + """Makes a message handler non-blocking: while processing the message, + the message_loop keeps processing subsequent incoming messages asynchronously. + """ + assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine' + @functools.wraps(func) + async def wrapper(self: 'Peer', *args, **kwargs): + return await self.taskgroup.spawn(func(self, *args, **kwargs)) + return wrapper def on_warning(self, payload): chan_id = payload.get("channel_id") @@ -602,7 +615,7 @@ class Peer(Logger): except (OSError, asyncio.TimeoutError, HandshakeFailed) as e: raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e async for msg in self.transport.read_messages(): - self.process_message(msg) + await self._process_message(msg) if self.DELAY_INC_MSG_PROCESSING_SLEEP: # rate-limit message-processing a bit, to make it harder # for a single peer to bog down the event loop / cpu: @@ -944,6 +957,7 @@ class Peer(Logger): # set db to None, because we do not want to write updates until channel is saved return StoredDict(chan_dict, None, []) + @non_blocking_msg_handler async def on_open_channel(self, payload): """Implements the channel acceptance flow. @@ -2278,6 +2292,7 @@ class Peer(Logger): raise Exception('The remote peer did not send their final signature. The channel may not have been be closed') return txid + @non_blocking_msg_handler async def on_shutdown(self, chan: Channel, payload): # TODO: A receiving node: if it hasn't received a funding_signed (if it is a # funder) or a funding_created (if it is a fundee):