Browse Source

lnpeer: make process_message async

This allows making any message handler async in lnpeer.

Note: `process_message` is only called from `_message_loop`.
There are(would be) basically three types of message handlers:
1. "traditional blocking msg handlers". non-async ones. When these handlers are called, `process_message` naturally blocks until the handler returns, which means `_message_loop` also blocks until the message is fully processed before starting the next iteration.
2. "async blocking msg handlers". async ones where we want the previous property, i.e. we want the `_message_loop` to wait until the handler finishes. We await the handler inside `process_message`, and `_message_loop` awaits `process_message`.
3. "async non-blocking msg handlers". async message handlers that can be spawned e.g. onto `Peer.taskgroup` and the loop is free to start processing subsequent messages. e.g. msg handlers that start a negotiation, such as `on_shutdown` and `on_open_channel`.

Any non-async message handler (`def on_...`) automatically goes into category 1.
An async message handler, by default, goes into category 2, "blocking";
to go into category 3 ("non-blocking"), we use the `runs_in_taskgroup` function decorator.
master
SomberNight 4 years ago
parent
commit
45e08ada61
No known key found for this signature in database
GPG Key ID: B33B5F232C6271E9
  1. 23
      electrum/lnpeer.py

23
electrum/lnpeer.py

@ -198,7 +198,7 @@ class Peer(Logger):
self.pong_event.clear() self.pong_event.clear()
await self.pong_event.wait() await self.pong_event.wait()
def process_message(self, message: bytes): async def _process_message(self, message: bytes) -> None:
try: try:
message_type, payload = decode_msg(message) message_type, payload = decode_msg(message)
except UnknownOptionalMsgType as e: except UnknownOptionalMsgType as e:
@ -236,9 +236,22 @@ class Peer(Logger):
# raw message is needed to check signature # raw message is needed to check signature
if message_type in ['node_announcement', 'channel_announcement', 'channel_update']: if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
payload['raw'] = message payload['raw'] = message
execution_result = f(*args) # note: the message handler might be async or non-async. In either case, by default,
# we wait for it to complete before we return, i.e. before the next message is processed.
if asyncio.iscoroutinefunction(f): if asyncio.iscoroutinefunction(f):
asyncio.ensure_future(self.taskgroup.spawn(execution_result)) await f(*args)
else:
f(*args)
def non_blocking_msg_handler(func):
"""Makes a message handler non-blocking: while processing the message,
the message_loop keeps processing subsequent incoming messages asynchronously.
"""
assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'
@functools.wraps(func)
async def wrapper(self: 'Peer', *args, **kwargs):
return await self.taskgroup.spawn(func(self, *args, **kwargs))
return wrapper
def on_warning(self, payload): def on_warning(self, payload):
chan_id = payload.get("channel_id") chan_id = payload.get("channel_id")
@ -602,7 +615,7 @@ class Peer(Logger):
except (OSError, asyncio.TimeoutError, HandshakeFailed) as e: except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
async for msg in self.transport.read_messages(): async for msg in self.transport.read_messages():
self.process_message(msg) await self._process_message(msg)
if self.DELAY_INC_MSG_PROCESSING_SLEEP: if self.DELAY_INC_MSG_PROCESSING_SLEEP:
# rate-limit message-processing a bit, to make it harder # rate-limit message-processing a bit, to make it harder
# for a single peer to bog down the event loop / cpu: # for a single peer to bog down the event loop / cpu:
@ -944,6 +957,7 @@ class Peer(Logger):
# set db to None, because we do not want to write updates until channel is saved # set db to None, because we do not want to write updates until channel is saved
return StoredDict(chan_dict, None, []) return StoredDict(chan_dict, None, [])
@non_blocking_msg_handler
async def on_open_channel(self, payload): async def on_open_channel(self, payload):
"""Implements the channel acceptance flow. """Implements the channel acceptance flow.
@ -2278,6 +2292,7 @@ class Peer(Logger):
raise Exception('The remote peer did not send their final signature. The channel may not have been be closed') raise Exception('The remote peer did not send their final signature. The channel may not have been be closed')
return txid return txid
@non_blocking_msg_handler
async def on_shutdown(self, chan: Channel, payload): async def on_shutdown(self, chan: Channel, payload):
# TODO: A receiving node: if it hasn't received a funding_signed (if it is a # TODO: A receiving node: if it hasn't received a funding_signed (if it is a
# funder) or a funding_created (if it is a fundee): # funder) or a funding_created (if it is a fundee):

Loading…
Cancel
Save