@ -231,7 +231,7 @@ class OnionLineProtocolFactory(protocol.ServerFactory):
proto . message ( message )
return True
class OnionClientFactory ( protocol . Reconnecting ClientFactory) :
class OnionClientFactory ( protocol . ClientFactory ) :
""" We define a distinct protocol factory for outbound connections.
Notably , this factory supports only * one * protocol instance at a time .
"""
@ -267,8 +267,8 @@ class OnionClientFactory(protocol.ReconnectingClientFactory):
if self . directory and not self . mc . give_up :
if reactor . running :
log . info ( ' Attempting to reconnect... ' )
protocol . Reconnecting ClientFactory. clientConnectionLost ( self ,
connector , reason )
protocol . ClientFactory . clientConnectionLost ( self ,
connector , reason )
def clientConnectionFailed ( self , connector , reason ) :
log . info ( ' Onion client connection failed: ' + str ( reason ) )
@ -276,8 +276,8 @@ class OnionClientFactory(protocol.ReconnectingClientFactory):
if self . directory and not self . mc . give_up :
if reactor . running :
log . info ( ' Attempting to reconnect... ' )
protocol . Reconnecting ClientFactory. clientConnectionFailed ( self ,
connector , reason )
protocol . ClientFactory . clientConnectionFailed ( self ,
connector , reason )
def register_connection ( self , p : OnionLineProtocol ) - > None :
self . proto_client = p
self . connection_callback ( )
@ -302,6 +302,8 @@ class OnionClientFactory(protocol.ReconnectingClientFactory):
self . message_receive_callback ( message )
class OnionPeer ( object ) :
""" Class encapsulating a peer we connect to.
"""
def __init__ ( self , messagechannel : ' OnionMessageChannel ' ,
socks5_host : str , socks5_port : int ,
@ -513,33 +515,31 @@ class OnionPeer(object):
self . port )
self . reconnecting_service = ClientService ( onionEndpoint , self . factory )
# if we want to actually do something about an unreachable host,
# we have to force t.a.i.ClientService to give up after the timeout:
# we have to force t.a.i.ClientService to give up after the timeout
d = self . reconnecting_service . whenConnected ( failAfterFailures = 1 )
d . addErrback ( self . respond_to_connection_failure )
d . addCallbacks ( self . respond_to_connection_success ,
self . respond_to_connection_failure )
self . reconnecting_service . startService ( )
def respond_to_connection_failure ( self , failure ) :
def respond_to_connection_success ( self , proto ) - > None :
self . connecting = False
def respond_to_connection_failure ( self , failure ) - > None :
self . connecting = False
# the error will be one of these if we just fail
# to connect to the other side.
f = failure . trap ( HostUnreachableError , SocksError , GeneralServerFailureError )
# if this is a non-dir reachable peer, we just record
# the failure and explicitly give up:
if not self . directory :
log . info ( " We failed to connect to peer {} ; giving up " . format (
self . peer_location ( ) ) )
self . reconnecting_service . stopService ( )
else :
# in this case, the still-running ClientService will
# just keep trying:
log . warn ( " We failed to connect to directory {} ; trying "
" again. " . format ( self . peer_location ( ) ) )
failure . trap ( HostUnreachableError , SocksError , GeneralServerFailureError )
comment = " " if self . directory else " ; giving up. "
log . info ( f " Failed to connect to peer { self . peer_location ( ) } { comment } " )
self . reconnecting_service . stopService ( )
def register_connection ( self ) - > None :
self . messagechannel . register_connection ( self . peer_location ( ) ,
direction = 1 )
def register_disconnection ( self ) - > None :
# for non-directory peers, just stop
self . reconnecting_service . stopService ( )
self . messagechannel . register_disconnection ( self . peer_location ( ) )
def try_to_connect ( self ) - > None :
@ -583,19 +583,20 @@ class OnionPeerPassive(OnionPeer):
class OnionDirectoryPeer ( OnionPeer ) :
delay = 4.0
def try_to_connect ( self ) - > None :
# Delay deliberately expands out to very
# long times as yg-s tend to be very long
# running bots:
self . delay * = 1.5
if self . delay > 10000 :
log . warn ( " Cannot connect to directory node peer: {} "
" after 20 attempts, giving up. " . format ( self . peer_location ( ) ) )
return
try :
self . connect ( )
except OnionPeerConnectionError :
reactor . callLater ( self . delay , self . try_to_ connect)
# We will only expand delay 20 times max
# (4 * 1.5^19 = 8867.3)
if self . delay < 8868 :
self . delay * = 1.5
# randomize by a few seconds to minimize bursty-ness locally
jitter = random . randint ( - 1 , 5 )
log . info ( f " Going to reattempt connection to { self . peer_location ( ) } in "
f " { self . delay + jitter } seconds. " )
reactor . callLater ( self . delay + jitter , self . connect )
def register_connection ( self ) - > None :
self . messagechannel . update_directory_map ( self , connected = True )
@ -604,7 +605,14 @@ class OnionDirectoryPeer(OnionPeer):
def register_disconnection ( self ) - > None :
self . messagechannel . update_directory_map ( self , connected = False )
super ( ) . register_disconnection ( )
# for directory peers, we persist in trying to establish
# a connection, but with backoff:
self . try_to_connect ( )
def respond_to_connection_failure ( self , failure ) - > None :
super ( ) . respond_to_connection_failure ( failure )
# same logic as for register_disconnection
self . try_to_connect ( )
class OnionMessageChannel ( MessageChannel ) :
""" Sends messages to other nodes of the same type over Tor