Browse Source

Only attempt to connect to peers we send to

Prior to this commit, a taker bot that received the orderbook on the
onion message channel would attempt to connect to every maker bot that
had sent an offer, which is not just inefficient but appears to trigger
rate limiting in Tor itself, throwing errors in the SOCKS proxy, at
numbers of around 80 makers, in testing (but could easily happen at
lower levels).
After this commit, we only opportunistically attempt to connect to
makers (using the same OnionPeer.try_to_connect method as before) when
we have already decided to send them a message (e.g. chosen to fill
their offer). This is done by calling try_to_connect only in
OnionMessageChannel._privmsg, and only to the right kind of peer.
The downside of this is that we will send more messages via directory
(at least one additional, for each maker), but clearly the alternative
is not viable.
As part of this update, we ensure that connection attempts are not
accidentally duplicated (see OnionPeer.connecting).
master
Adam Gibson 4 years ago
parent
commit
cc6d341593
No known key found for this signature in database
GPG Key ID: 141001A1AF77F20B
  1. 64
      jmdaemon/jmdaemon/onionmc.py

64
jmdaemon/jmdaemon/onionmc.py

@ -9,7 +9,8 @@ from twisted.protocols import basic
from twisted.application.internet import ClientService
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.internet.address import IPv4Address, IPv6Address
from txtorcon.socks import TorSocksEndpoint, HostUnreachableError
from txtorcon.socks import (TorSocksEndpoint, HostUnreachableError,
SocksError, GeneralServerFailureError)
log = get_log()
@ -329,6 +330,9 @@ class OnionPeer(object):
# the reconnecting service allows auto-reconnection to
# some peers:
self.reconnecting_service = None
# don't try to connect more than once
# TODO: prefer state machine update
self.connecting = False
def set_alternate_location(self, location_string: str) -> None:
self.alternate_location = location_string
@ -362,6 +366,7 @@ class OnionPeer(object):
self._status = destn_status
# the handshakes are always initiated by a client:
if destn_status == PEER_STATUS_CONNECTED:
self.connecting = False
log.info("We, {}, are calling the handshake callback as client.".format(
self.messagechannel.self_as_peer.peer_location()))
self.handshake_callback(self)
@ -465,6 +470,9 @@ class OnionPeer(object):
""" This method is called to connect, over Tor, to the remote
peer at the given onion host/port.
"""
if self.connecting:
return
self.connecting = True
if self._status in [PEER_STATUS_HANDSHAKED, PEER_STATUS_CONNECTED]:
return
if not (self.hostname and self.port > 0):
@ -497,9 +505,10 @@ class OnionPeer(object):
self.reconnecting_service.startService()
def respond_to_connection_failure(self, failure):
# the error should be of this type specifically, if the onion
# is down, or was configured wrong:
failure.trap(HostUnreachableError)
self.connecting = False
# the error will be one of these if we just fail
# to connect to the other side.
f = failure.trap(HostUnreachableError, SocksError, GeneralServerFailureError)
# if this is a non-dir reachable peer, we just record
# the failure and explicitly give up:
if not self.directory:
@ -737,6 +746,19 @@ class OnionMessageChannel(MessageChannel):
else:
self._send(dp, msg)
def should_try_to_connect(self, peer: OnionPeer) -> bool:
if not peer:
return False
if peer.peer_location() == NOT_SERVING_ONION_HOSTNAME:
return False
if peer.directory:
return False
if peer == self.self_as_peer:
return False
if peer.status() in [PEER_STATUS_CONNECTED, PEER_STATUS_HANDSHAKED]:
return False
return True
def _privmsg(self, nick: str, cmd: str, msg:str) -> None:
# in certain test scenarios the directory may try to transfer
# commitments to itself:
@ -746,8 +768,15 @@ class OnionMessageChannel(MessageChannel):
return
encoded_privmsg = OnionCustomMessage(self.get_privmsg(nick, cmd, msg),
JM_MESSAGE_TYPES["privmsg"])
peer = self.get_peer_by_nick(nick)
if not peer or peer.status() != PEER_STATUS_HANDSHAKED:
peer_exists = self.get_peer_by_nick(nick, conn_only=False)
peer_sendable = self.get_peer_by_nick(nick)
# opportunistically connect to peers that have talked to us
# (evidenced by the peer existing, which must be because we got
# a `peerlist` message for it), and that we want to talk to
# (evidenced by the call to this function)
if self.should_try_to_connect(peer_exists):
reactor.callLater(0.0, peer_exists.try_to_connect)
if not peer_sendable:
# If we are trying to message a peer via their nick, we
# may not yet have a connection; then we just
# forward via directory nodes.
@ -755,12 +784,12 @@ class OnionMessageChannel(MessageChannel):
"sending via directory.".format(nick))
try:
# TODO change this to redundant or switching
peer = self.get_connected_directory_peers()[0]
peer_sendable = self.get_connected_directory_peers()[0]
except Exception as e:
log.warn("Failed to send privmsg because no "
"directory peer is connected. Error: {}".format(repr(e)))
return
self._send(peer, encoded_privmsg)
self._send(peer_sendable, encoded_privmsg)
def _announce_orders(self, offerlist: list) -> None:
for offer in offerlist:
@ -862,8 +891,14 @@ class OnionMessageChannel(MessageChannel):
def get_directory_peers(self) -> list:
return [p for p in self.peers if p.directory is True]
def get_peer_by_nick(self, nick:str) -> Union[OnionPeer, None]:
for p in self.get_all_connected_peers():
def get_peer_by_nick(self, nick:str, conn_only:bool=True) -> Union[OnionPeer, None]:
""" Return an OnionPeer object matching the given Joinmarket
nick; if `conn_only` is True, we restrict to only those peers
in state PEER_STATUS_HANDSHAKED, else we allow any peer.
If no such peer can be found, return None.
"""
plist = self.get_all_connected_peers() if conn_only else self.peers
for p in plist:
if p.nick == nick:
return p
@ -1260,15 +1295,6 @@ class OnionMessageChannel(MessageChannel):
temp_p.update_status(PEER_STATUS_DISCONNECTED)
if with_nick:
temp_p.set_nick(nick)
if not connection:
# Here, we are not currently connected. We
# try to connect asynchronously. We don't pay attention
# to any return. This attempt is one-shot and opportunistic,
# for non-dns, but will retry with exp-backoff for dns.
# Notice this is only possible for non-dns to other non-dns,
# since dns will never reach this point without an active
# connection.
reactor.callLater(0.0, temp_p.try_to_connect)
return temp_p
else:
p = self.get_peer_by_id(temp_p.peer_location())

Loading…
Cancel
Save