From a92dede490545b04bbfb199064abd48aa46e3116 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Thu, 17 Mar 2022 17:14:51 +0100 Subject: [PATCH] lnpeer: some rework of error/warning message handling - rm the `_get_channel_ids` abstraction as each of its usages needs subtle differences. Some code duplication is preferable in this case. - raise exceptions in `wait_for_message`, so that callers such as the GUI can show user-feedback - on_error/on_warning were dropping messages with temp_chan_ids if they were not stored in `temp_id_to_id` - which was only done once the mapping was known (so the normal chan_id was known). To fix this, we now store temp_chan_ids into `temp_id_to_id` early. - `schedule_force_closing` only works if the chan_id is already in `channels` related: https://github.com/spesmilo/electrum/pull/7645 (and related commits) ----- example before commit: ``` D/P | lnpeer.Peer.[LNWallet, 03933884aa-3b53e4ab] | Sending OPEN_CHANNEL D/P | lnpeer.Peer.[LNWallet, 03933884aa-3b53e4ab] | Received ERROR I/P | lnpeer.Peer.[LNWallet, 03933884aa-3b53e4ab] | remote peer sent error [DO NOT TRUST THIS MESSAGE]: invalid funding_satoshis=10000 sat (min=400000 sat max=1500000000 sat) E | gui.qt.main_window.[test_segwit_2] | Could not open channel Traceback (most recent call last): File "...\electrum\electrum\util.py", line 1160, in wrapper return await func(*args, **kwargs) File "...\electrum\electrum\lnpeer.py", line 661, in wrapper return await func(self, *args, **kwargs) File "...\electrum\electrum\lnpeer.py", line 742, in channel_establishment_flow payload = await self.wait_for_message('accept_channel', temp_channel_id) # File "...\electrum\electrum\lnpeer.py", line 315, in wait_for_message name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT) File "...\Python39\lib\asyncio\tasks.py", line 468, in wait_for await waiter asyncio.exceptions.CancelledError During handling of the above exception, another exception occurred: Traceback (most recent call last): File "...\Python39\lib\asyncio\tasks.py", line 492, in wait_for fut.result() asyncio.exceptions.CancelledError The above exception was the direct cause of the following exception: Traceback (most recent call last): File "...\electrum\electrum\gui\qt\util.py", line 914, in run result = task.task() File "...\electrum\electrum\gui\qt\main_window.py", line 1875, in task return self.wallet.lnworker.open_channel( File "...\electrum\electrum\lnworker.py", line 1075, in open_channel chan, funding_tx = fut.result() File "...\Python39\lib\concurrent\futures\_base.py", line 445, in result return self.__get_result() File "...\Python39\lib\concurrent\futures\_base.py", line 390, in __get_result raise self._exception File "...\electrum\electrum\util.py", line 1160, in wrapper return await func(*args, **kwargs) File "...\electrum\electrum\lnworker.py", line 1006, in _open_channel_coroutine chan, funding_tx = await asyncio.wait_for(coro, LN_P2P_NETWORK_TIMEOUT) File "...\Python39\lib\asyncio\tasks.py", line 494, in wait_for raise exceptions.TimeoutError() from exc asyncio.exceptions.TimeoutError ``` example after commit: ``` D/P | lnpeer.Peer.[LNWallet, 03933884aa-ff3a866f] | Sending OPEN_CHANNEL D/P | lnpeer.Peer.[LNWallet, 03933884aa-ff3a866f] | Received ERROR I/P | lnpeer.Peer.[LNWallet, 03933884aa-ff3a866f] | remote peer sent error [DO NOT TRUST THIS MESSAGE]: invalid funding_satoshis=10000 sat (min=400000 sat max=1500000000 sat). chan_id=124ca21fa6aa2993430ad71f465f0d44731ef87f7478e4b31327e4459b5a3988 E | lnworker.LNWallet.[test_segwit_2] | Exception in _open_channel_coroutine: GracefulDisconnect('remote peer sent error [DO NOT TRUST THIS MESSAGE]: invalid funding_satoshis=10000 sat (min=400000 sat max=1500000000 sat)') Traceback (most recent call last): File "...\electrum\electrum\util.py", line 1160, in wrapper return await func(*args, **kwargs) File "...\electrum\electrum\lnworker.py", line 1006, in _open_channel_coroutine chan, funding_tx = await asyncio.wait_for(coro, LN_P2P_NETWORK_TIMEOUT) File "...\Python39\lib\asyncio\tasks.py", line 481, in wait_for return fut.result() File "...\electrum\electrum\lnpeer.py", line 673, in wrapper return await func(self, *args, **kwargs) File "...\electrum\electrum\lnpeer.py", line 755, in channel_establishment_flow payload = await self.wait_for_message('accept_channel', temp_channel_id) File "...\electrum\electrum\lnpeer.py", line 326, in wait_for_message raise GracefulDisconnect( electrum.interface.GracefulDisconnect: remote peer sent error [DO NOT TRUST THIS MESSAGE]: invalid funding_satoshis=10000 sat (min=400000 sat max=1500000000 sat) I/P | lnpeer.Peer.[LNWallet, 03933884aa-ff3a866f] | Disconnecting: GracefulDisconnect() ``` --- electrum/gui/qt/main_window.py | 1 + electrum/lnpeer.py | 91 +++++++++++++++++++++------------- 2 files changed, 58 insertions(+), 34 deletions(-) diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py index 4d133b477..78e2106fb 100644 --- a/electrum/gui/qt/main_window.py +++ b/electrum/gui/qt/main_window.py @@ -1880,6 +1880,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): password=password) def on_failure(exc_info): type_, e, traceback = exc_info + #self.logger.error("Could not open channel", exc_info=exc_info) self.show_error(_('Could not open channel: {}').format(repr(e))) WaitingDialog(self, _('Opening channel...'), task, self.on_open_channel_success, on_failure) diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index 19eac2043..ae0e56645 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -99,7 +99,7 @@ class Peer(Logger): # gossip uses a single queue to preserve message order self.gossip_queue = asyncio.Queue() self.ordered_message_queues = defaultdict(asyncio.Queue) # for messages that are ordered - self.temp_id_to_id = {} # to forward error messages + self.temp_id_to_id = {} # type: Dict[bytes, Optional[bytes]] # to forward error messages self.funding_created_sent = set() # for channels in PREOPENING self.funding_signed_sent = set() # for channels in PREOPENING self.shutdown_received = {} # chan_id -> asyncio.Future() @@ -224,36 +224,42 @@ class Peer(Logger): if asyncio.iscoroutinefunction(f): asyncio.ensure_future(self.taskgroup.spawn(execution_result)) - def _get_channel_ids(self, channel_id): - # if channel_id is all zero: MUST fail all channels with the sending node. - # otherwise: MUST fail the channel referred to by channel_id, if that channel is with the sending node. - # if no existing channel is referred to by `channel_id: MUST ignore the message. - if channel_id == bytes(32): - return self.channels.keys() - elif channel_id in self.temp_id_to_id: - return [self.temp_id_to_id[channel_id]] - elif channel_id in self.channels: - return [channel_id] - else: - return [] - def on_warning(self, payload): - # TODO: we could need some reconnection logic here -> delayed reconnect - self.logger.info(f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}") - channel_ids = self._get_channel_ids(payload.get("channel_id")) - for cid in channel_ids: - self.ordered_message_queues[cid].put_nowait((None, {'warning': payload['data']})) - if channel_ids: - raise GracefulDisconnect + chan_id = payload.get("channel_id") + self.logger.info(f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: " + f"{payload['data'].decode('ascii')}. chan_id={chan_id.hex()}") + if chan_id in self.channels: + self.ordered_message_queues[chan_id].put_nowait((None, {'warning': payload['data']})) + elif chan_id in self.temp_id_to_id: + chan_id = self.temp_id_to_id[chan_id] or chan_id + self.ordered_message_queues[chan_id].put_nowait((None, {'warning': payload['data']})) + else: + # if no existing channel is referred to by channel_id: + # - MUST ignore the message. + return + raise GracefulDisconnect def on_error(self, payload): - self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}") - channel_ids = self._get_channel_ids(payload.get("channel_id")) - for cid in channel_ids: - self.schedule_force_closing(cid) - self.ordered_message_queues[cid].put_nowait((None, {'error': payload['data']})) - if channel_ids: - raise GracefulDisconnect + chan_id = payload.get("channel_id") + self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: " + f"{payload['data'].decode('ascii')}. chan_id={chan_id.hex()}") + if chan_id in self.channels: + self.schedule_force_closing(chan_id) + self.ordered_message_queues[chan_id].put_nowait((None, {'error': payload['data']})) + elif chan_id in self.temp_id_to_id: + chan_id = self.temp_id_to_id[chan_id] or chan_id + self.ordered_message_queues[chan_id].put_nowait((None, {'error': payload['data']})) + elif chan_id == bytes(32): + # if channel_id is all zero: + # - MUST fail all channels with the sending node. + for cid in self.channels: + self.schedule_force_closing(cid) + self.ordered_message_queues[cid].put_nowait((None, {'error': payload['data']})) + else: + # if no existing channel is referred to by channel_id: + # - MUST ignore the message. + return + raise GracefulDisconnect async def send_warning(self, channel_id: bytes, message: str = None, *, close_connection=True): """Sends a warning and disconnects if close_connection. @@ -298,8 +304,11 @@ class Peer(Logger): # MUST fail the channel(s) referred to by the error message: # we may violate this with force_close_channel if force_close_channel: - for cid in self._get_channel_ids(channel_id): + if channel_id in self.channels: self.schedule_force_closing(channel_id) + elif channel_id == bytes(32): + for cid in self.channels: + self.schedule_force_closing(cid) raise GracefulDisconnect def on_ping(self, payload): @@ -310,11 +319,15 @@ class Peer(Logger): pass async def wait_for_message(self, expected_name, channel_id): - # errors and warnings are sent to the queue with name set to None, so that this task terminates q = self.ordered_message_queues[channel_id] name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT) - if name is None: - raise GracefulDisconnect + # raise exceptions for errors/warnings, so that the caller sees them + if payload.get('error'): + raise GracefulDisconnect( + f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['error'].decode('ascii')}") + elif payload.get('warning'): + raise GracefulDisconnect( + f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: {payload['warning'].decode('ascii')}") if name != expected_name: raise Exception(f"Received unexpected '{name}'") return payload @@ -663,7 +676,6 @@ class Peer(Logger): self.lnworker.wallet.set_reserved_state_of_address(addr, reserved=False) return wrapper - @log_exceptions @temporarily_reserve_funding_tx_change_address async def channel_establishment_flow( self, *, @@ -714,6 +726,10 @@ class Peer(Logger): ) per_commitment_point_first = secret_to_pubkey( int.from_bytes(per_commitment_secret_first, 'big')) + + # store the temp id now, so that it is recognized for e.g. 'error' messages + # TODO: this is never cleaned up; the dict grows unbounded until disconnect + self.temp_id_to_id[temp_channel_id] = None self.send_message( "open_channel", temporary_channel_id=temp_channel_id, @@ -897,6 +913,9 @@ class Peer(Logger): push_msat = payload['push_msat'] feerate = payload['feerate_per_kw'] # note: we are not validating this temp_chan_id = payload['temporary_channel_id'] + # store the temp id now, so that it is recognized for e.g. 'error' messages + # TODO: this is never cleaned up; the dict grows unbounded until disconnect + self.temp_id_to_id[temp_chan_id] = None open_channel_tlvs = payload.get('open_channel_tlvs') channel_type = open_channel_tlvs.get('channel_type') if open_channel_tlvs else None @@ -1018,6 +1037,7 @@ class Peer(Logger): channel_id=channel_id, signature=sig_64, ) + self.temp_id_to_id[temp_chan_id] = channel_id self.funding_signed_sent.add(chan.channel_id) chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig) chan.set_state(ChannelState.OPENING) @@ -1040,7 +1060,10 @@ class Peer(Logger): channels_with_peer.extend(self.temp_id_to_id.values()) if channel_id not in channels_with_peer: raise ValueError(f"channel {channel_id.hex()} does not belong to this peer") - self.lnworker.schedule_force_closing(channel_id) + if channel_id in self.channels: + self.lnworker.schedule_force_closing(channel_id) + else: + self.logger.warning(f"tried to force-close channel {channel_id.hex()} but it is not in self.channels yet") def on_channel_reestablish(self, chan, msg): their_next_local_ctn = msg["next_commitment_number"]