From a278b91288a117903ad943c5320a190217175135 Mon Sep 17 00:00:00 2001 From: Adam Gibson Date: Mon, 18 Apr 2022 23:26:15 +0100 Subject: [PATCH 1/2] Directories forward disconnection events Prior to this commit, passive observation of orderbook updates by bots such as ob-watcher was not able to keep track of nicks/bots that had disconnected and were no longer available, because non-directory peers do not get the network level disconnection event. After this commit, the directory nodes forward a variant of the "peerlist" control message, that was previously used only to send connection information from one non-directory peer to another (so that they could establish a new p2p connection if desired). Now a new version of that message adds an extra "D" field to indicate that this message is being used to inform that the relevant peer has disconnected from this directory node. Bubbling up the on_nick_leave event: it is already the case that the final on_nick_leave callback is only triggered by the disappearance of a nick from all available message channels (so that as long as a nick is perceived as being in at least one available message channel, its current offers are still maintained in the local database); this logic is now repeated one layer down, because the disconnection of a nick from one directory node may not mean it is shut down, so we only pass the on_nick_leave callback from the OnionMessageChannel object up to the MessageChannelCollection callback when the active_directories dict indicates that this particular nick is no longer available on any of our configured directory nodes. --- jmdaemon/jmdaemon/onionmc.py | 106 ++++++++++++++++++++++++++--------- 1 file changed, 80 insertions(+), 26 deletions(-) diff --git a/jmdaemon/jmdaemon/onionmc.py b/jmdaemon/jmdaemon/onionmc.py index 8e6f63f..d80e000 100644 --- a/jmdaemon/jmdaemon/onionmc.py +++ b/jmdaemon/jmdaemon/onionmc.py @@ -700,8 +700,8 @@ class OnionMessageChannel(MessageChannel): self.wait_for_directories_loop = None # this dict plays the same role as `active_channels` in `MessageChannelCollection`. - # it has structure {nick: set(),..} where set() has elements that are dicts: - # {OnionPeer: bool}. + # it has structure {nick1: {}, nick2: {}, ...} where the inner dicts are: + # {OnionDirectoryPeer1: bool, OnionDirectoryPeer2: bool, ...}. # Entries get updated with changing connection status of directories, # allowing us to decide where to send each message we want to send when we have no # direct connection. @@ -1057,6 +1057,33 @@ class OnionMessageChannel(MessageChannel): return self.send_peers(peer, peer_filter=[peer_from]) + def on_nick_leave_directory(self, nick: str, dir_peer: OnionPeer) -> None: + """ This is called in response to a disconnection control + message from a directory, telling us that a certain nick has left. + We update this connection status in the active_directories map, + and fire the MessageChannel.on_nick_leave when we see all the + connections are lost. + Note that `on_nick_leave` can be triggered in two ways; both here, + and also via `self.register_disconnection`, which occurs for peers + to whom we are directly connected. Calling it multiple times is not + harmful, but remember that the on_nick_leave event only bubbles up + above the message channel layer once *all* message channels trigger + on_nick_leave (in case we are using another message channel as well + as this one, like IRC). + """ + if not nick in self.active_directories: + return + if not dir_peer in self.active_directories[nick]: + log.info("Directory {} is telling us that {} has left, but we " + "didn't know about them. Ignoring.".format( + dir_peer.peer_location(), nick)) + return + log.debug("Directory {} has lost connection to: {}".format( + dir_peer.peer_location(), nick)) + self.active_directories[nick][dir_peer] = False + if all([x is False for x in self.active_directories[nick].values()]): + self.on_nick_leave(nick, self) + def process_control_message(self, peerid: str, msgtype: int, msgval: str) -> bool: """ Triggered by a directory node feeding us @@ -1084,10 +1111,27 @@ class OnionMessageChannel(MessageChannel): return True try: peerlist = msgval.split(",") - for peer in peerlist: + for peer_in_list in peerlist: + # directories should send us peerstrings that include + # nick;host:port;D where "D" indicates that the directory + # is signalling this peer as having left. Otherwise, without + # the third field, we treat it as a "join" event. + try: + nick, hostport, disconnect_code = peer_in_list.split( + NICK_PEERLOCATOR_SEPARATOR) + if not disconnect_code == "D": + continue + self.on_nick_leave_directory(nick, peer) + continue + except ValueError: + # old code does not recognize this "D"; it will + # swallow the message in `add_peer`, ignoring + # the message as invalid because it has three fields + # instead of two. + pass # defaults mean we just add the peer, not # add or alter its connection status: - self.add_peer(peer, with_nick=True) + self.add_peer(peer_in_list, with_nick=True) except Exception as e: log.debug("Incorrectly formatted peer list: {}, " "ignoring, {}".format(msgval, e)) @@ -1118,6 +1162,12 @@ class OnionMessageChannel(MessageChannel): msgval = self.get_peer_by_id(msgval).peer_location() self.add_peer(msgval, connection=False, overwrite_connection=True) + if self.self_as_peer.directory: + # We propagate the control message as a "peerlist" with + # the "D" flag: + disconnected_peer = self.get_peer_by_id(msgval) + for p in self.get_connected_nondirectory_peers(): + self.send_peers(p, peer_filter=[disconnected_peer], d=True) # bubble up the disconnection event to the abstract # message channel logic: if self.on_nick_leave: @@ -1310,8 +1360,6 @@ class OnionMessageChannel(MessageChannel): try: nick, peer = peerdata.split(NICK_PEERLOCATOR_SEPARATOR) except Exception as e: - # TODO: as of now, this is not an error, but expected. - # Don't log? Do something else? log.debug("Received invalid peer identifier string: {}, {}".format( peerdata, e)) return @@ -1361,7 +1409,7 @@ class OnionMessageChannel(MessageChannel): return [p for p in self.peers if p.directory and p.status() == \ PEER_STATUS_HANDSHAKED] - def get_connected_nondirectory_peers(self) -> list: + def get_connected_nondirectory_peers(self) -> List[OnionPeer]: return [p for p in self.peers if (not p.directory) and p.status() == \ PEER_STATUS_HANDSHAKED] @@ -1397,7 +1445,7 @@ class OnionMessageChannel(MessageChannel): """ CONTROL MESSAGES SENT BY US """ def send_peers(self, requesting_peer: OnionPeer, - peer_filter: List[OnionPeer]) -> None: + peer_filter: List[OnionPeer], d: bool=False) -> None: """ This message is sent by directory peers, currently only when a privmsg has to be forwarded to them. It could also be sent by directories to non-directory peers @@ -1414,24 +1462,30 @@ class OnionMessageChannel(MessageChannel): "Cannot send peer list to unhandshaked peer") peerlist = set() peer_filter_exists = len(peer_filter) > 0 - for p in self.get_connected_nondirectory_peers(): - # don't send a peer to itself - if p == requesting_peer: - continue - if peer_filter_exists and p not in peer_filter: - continue - if p.status() != PEER_STATUS_HANDSHAKED: - # don't advertise what is not online. - continue - # peers that haven't sent their nick yet are not - # privmsg-reachable; don't send them - if p.nick == "": - continue - if p.peer_location() == NOT_SERVING_ONION_HOSTNAME: - # if a connection has no reachable destination, - # don't forward it - continue - peerlist.add(p.get_nick_peerlocation_ser()) + if d is False: + for p in self.get_connected_nondirectory_peers(): + # don't send a peer to itself + if p == requesting_peer: + continue + if peer_filter_exists and p not in peer_filter: + continue + if p.status() != PEER_STATUS_HANDSHAKED: + # don't advertise what is not online. + continue + # peers that haven't sent their nick yet are not + # privmsg-reachable; don't send them + if p.nick == "": + continue + if p.peer_location() == NOT_SERVING_ONION_HOSTNAME: + # if a connection has no reachable destination, + # don't forward it + continue + peerlist.add(p.get_nick_peerlocation_ser()) + else: + # since the peer may already be removed from self.peers, + # we don't limit except by filter: + for p in peer_filter: + peerlist.add(p.get_nick_peerlocation_ser() + NICK_PEERLOCATOR_SEPARATOR + "D") # For testing: dns won't usually participate: peerlist.add(self.self_as_peer.get_nick_peerlocation_ser()) # don't send an empty set (will not be possible unless From 706bdb0a0379ed6ed841ee02c7fc77dff0c22b7b Mon Sep 17 00:00:00 2001 From: Adam Gibson Date: Wed, 20 Apr 2022 15:08:41 +0100 Subject: [PATCH 2/2] address review of @PulpCattel --- jmdaemon/jmdaemon/onionmc.py | 40 +++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/jmdaemon/jmdaemon/onionmc.py b/jmdaemon/jmdaemon/onionmc.py index d80e000..1265939 100644 --- a/jmdaemon/jmdaemon/onionmc.py +++ b/jmdaemon/jmdaemon/onionmc.py @@ -1081,7 +1081,7 @@ class OnionMessageChannel(MessageChannel): log.debug("Directory {} has lost connection to: {}".format( dir_peer.peer_location(), nick)) self.active_directories[nick][dir_peer] = False - if all([x is False for x in self.active_directories[nick].values()]): + if not any(self.active_directories[nick].values()): self.on_nick_leave(nick, self) def process_control_message(self, peerid: str, msgtype: int, @@ -1119,15 +1119,12 @@ class OnionMessageChannel(MessageChannel): try: nick, hostport, disconnect_code = peer_in_list.split( NICK_PEERLOCATOR_SEPARATOR) - if not disconnect_code == "D": + if disconnect_code != "D": continue self.on_nick_leave_directory(nick, peer) continue except ValueError: - # old code does not recognize this "D"; it will - # swallow the message in `add_peer`, ignoring - # the message as invalid because it has three fields - # instead of two. + # just means this message is not of the 'disconnect' type pass # defaults mean we just add the peer, not # add or alter its connection status: @@ -1167,7 +1164,8 @@ class OnionMessageChannel(MessageChannel): # the "D" flag: disconnected_peer = self.get_peer_by_id(msgval) for p in self.get_connected_nondirectory_peers(): - self.send_peers(p, peer_filter=[disconnected_peer], d=True) + self.send_peers(p, peer_filter=[disconnected_peer], + disconnect=True) # bubble up the disconnection event to the abstract # message channel logic: if self.on_nick_leave: @@ -1360,6 +1358,12 @@ class OnionMessageChannel(MessageChannel): try: nick, peer = peerdata.split(NICK_PEERLOCATOR_SEPARATOR) except Exception as e: + # old code does not recognize messages with "D" as a third + # field; they will swallow the message here, ignoring + # the message as invalid because it has three fields + # instead of two. + # (We still use the catch-all `Exception`, for the usual reason + # of not wanting to make assumptions about external input). log.debug("Received invalid peer identifier string: {}, {}".format( peerdata, e)) return @@ -1445,24 +1449,28 @@ class OnionMessageChannel(MessageChannel): """ CONTROL MESSAGES SENT BY US """ def send_peers(self, requesting_peer: OnionPeer, - peer_filter: List[OnionPeer], d: bool=False) -> None: + peer_filter: List[OnionPeer], disconnect: bool=False) -> None: """ This message is sent by directory peers, currently - only when a privmsg has to be forwarded to them. It - could also be sent by directories to non-directory peers - according to some other algorithm. - If peer_filter is specified, only those peers will be sent. + only when a privmsg has to be forwarded to them, or a peer has + disconnected. It could also be sent by directories to non-directory + peers according to some other algorithm. + The message is sent *to* `requesting_peer`. + If `peer_filter` is specified, only those peers will be sent. + If `disconnect` is True, we append "D" to every entry, which + indicates to the receiver that the peer being sent has left, + not that that peer is available. The peerlist message should have this format: (1) entries comma separated - (2) each entry is serialized nick then the NICK_PEERLOCATOR_SEPARATOR - then host:port - (3) Peers that do not have a reachable location are not sent. + (2) each entry a two- or three- element list, separated by NICK_PEERLOCATOR_SEPARATOR, + [nick, host:port] or same with ["D"] added at the end. + For the case disconnect=False, peers that do not have a reachable location are not sent. """ if not requesting_peer.status() == PEER_STATUS_HANDSHAKED: raise OnionPeerConnectionError( "Cannot send peer list to unhandshaked peer") peerlist = set() peer_filter_exists = len(peer_filter) > 0 - if d is False: + if disconnect is False: for p in self.get_connected_nondirectory_peers(): # don't send a peer to itself if p == requesting_peer: