Browse Source

Merge pull request #7835 from SomberNight/202205_lnpeer_async_process_message

lnpeer: make `process_message` async
master
ghost43 2 years ago committed by GitHub
parent
commit
697968bcbb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 23
      electrum/lnpeer.py

23
electrum/lnpeer.py

@ -198,7 +198,7 @@ class Peer(Logger):
self.pong_event.clear() self.pong_event.clear()
await self.pong_event.wait() await self.pong_event.wait()
def process_message(self, message: bytes): async def _process_message(self, message: bytes) -> None:
try: try:
message_type, payload = decode_msg(message) message_type, payload = decode_msg(message)
except UnknownOptionalMsgType as e: except UnknownOptionalMsgType as e:
@ -236,9 +236,22 @@ class Peer(Logger):
# raw message is needed to check signature # raw message is needed to check signature
if message_type in ['node_announcement', 'channel_announcement', 'channel_update']: if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
payload['raw'] = message payload['raw'] = message
execution_result = f(*args) # note: the message handler might be async or non-async. In either case, by default,
# we wait for it to complete before we return, i.e. before the next message is processed.
if asyncio.iscoroutinefunction(f): if asyncio.iscoroutinefunction(f):
asyncio.ensure_future(self.taskgroup.spawn(execution_result)) await f(*args)
else:
f(*args)
def non_blocking_msg_handler(func):
"""Makes a message handler non-blocking: while processing the message,
the message_loop keeps processing subsequent incoming messages asynchronously.
"""
assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'
@functools.wraps(func)
async def wrapper(self: 'Peer', *args, **kwargs):
return await self.taskgroup.spawn(func(self, *args, **kwargs))
return wrapper
def on_warning(self, payload): def on_warning(self, payload):
chan_id = payload.get("channel_id") chan_id = payload.get("channel_id")
@ -602,7 +615,7 @@ class Peer(Logger):
except (OSError, asyncio.TimeoutError, HandshakeFailed) as e: except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
async for msg in self.transport.read_messages(): async for msg in self.transport.read_messages():
self.process_message(msg) await self._process_message(msg)
if self.DELAY_INC_MSG_PROCESSING_SLEEP: if self.DELAY_INC_MSG_PROCESSING_SLEEP:
# rate-limit message-processing a bit, to make it harder # rate-limit message-processing a bit, to make it harder
# for a single peer to bog down the event loop / cpu: # for a single peer to bog down the event loop / cpu:
@ -944,6 +957,7 @@ class Peer(Logger):
# set db to None, because we do not want to write updates until channel is saved # set db to None, because we do not want to write updates until channel is saved
return StoredDict(chan_dict, None, []) return StoredDict(chan_dict, None, [])
@non_blocking_msg_handler
async def on_open_channel(self, payload): async def on_open_channel(self, payload):
"""Implements the channel acceptance flow. """Implements the channel acceptance flow.
@ -2278,6 +2292,7 @@ class Peer(Logger):
raise Exception('The remote peer did not send their final signature. The channel may not have been be closed') raise Exception('The remote peer did not send their final signature. The channel may not have been be closed')
return txid return txid
@non_blocking_msg_handler
async def on_shutdown(self, chan: Channel, payload): async def on_shutdown(self, chan: Channel, payload):
# TODO: A receiving node: if it hasn't received a funding_signed (if it is a # TODO: A receiving node: if it hasn't received a funding_signed (if it is a
# funder) or a funding_created (if it is a fundee): # funder) or a funding_created (if it is a fundee):

Loading…
Cancel
Save