@ -180,32 +180,50 @@ class WalletRPCTestBaseFB(WalletRPCTestBase):
# we are using fresh (empty) wallets for these tests
wallet_structure = [ 0 , 0 , 0 , 0 , 0 ]
class ClientNotifTestProto ( ClientTProtocol ) :
def sendAuth ( self ) :
task . deferLater ( reactor , self . factory . delay ,
self . factory . callbackfn )
super ( ) . sendAuth ( )
class ClientNotifTestFactory ( WebSocketClientFactory ) :
def __init__ ( self , * args , * * kwargs ) :
if " delay " in kwargs :
self . delay = kwargs . pop ( " delay " , None )
if " callbackfn " in kwargs :
self . callbackfn = kwargs . pop ( " callbackfn " , None )
super ( ) . __init__ ( * args , * * kwargs )
class TrialTestWRPC_WS ( WalletRPCTestBase , unittest . TestCase ) :
""" class for testing websocket subscriptions/events etc.
"""
def test_notif ( self ) :
# simulate the daemon already having created
# a valid token (which it usually does when
# starting the WalletService:
self . daemon . wss_factory . valid_token = encoded_token
self . client_factory = WebSocketClientFactory (
" ws://127.0.0.1: " + str ( self . wss_port ) )
self . client_factory . protocol = ClientTProtocol
# once the websocket connection is established, and auth
# is sent, our custom clientfactory will fire the tx
# notification via the callback passed as argument here;
# and we wait for the receipt in the code below:
self . client_factory = ClientNotifTestFactory (
" ws://127.0.0.1: " + str ( self . wss_port ) ,
delay = 0.1 , callbackfn = self . fire_tx_notif )
self . client_factory . protocol = ClientNotifTestProto
self . client_connector = connectWS ( self . client_factory )
d = task . deferLater ( reactor , 0.1 , self . fire_tx_notif )
# create a small delay between the instruction to send
# the notification, and the checking of its receipt,
# otherwise the client will be queried before the notification
# arrived. We will try a few times before giving up.
self . attempt_receipt_counter = 0
d . addCallback ( self . wait_to_receive )
return d
return task . deferLater ( reactor , 0.0 , self . wait_to_receive )
def wait_to_receive ( self , res ) :
def wait_to_receive ( self ) :
d = task . deferLater ( reactor , 0.1 , self . checkNotifs )
return d
def checkNotifs ( self ) :
# We wait and monitor if the notification has been received,
# but give up after 10 attempts spaced by 0.2 seconds each.
# It should usually succeed on the first try.
if self . attempt_receipt_counter > 10 :
assert False
if not self . client_factory . notifs == 1 :