@ -83,15 +83,17 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
# we are being cancelled now
# we are being cancelled now
self . session . unsubscribe ( self . status_queue )
self . session . unsubscribe ( self . status_queue )
def add ( self , addr ) :
def add_address ( self , addr : str ) - > None :
asyncio . run_coroutine_threadsafe ( self . _add_address ( addr ) , self . asyncio_loop )
""" Add an address to subscribe to.
Thread - safe . When the method returns , the Synchronizer ( e . g . is_up_to_date ) will know about the address .
async def _add_address ( self , addr : str ) :
"""
# note: this method is async as add_queue.put_nowait is not thread-safe.
if not is_address ( addr ) : raise ValueError ( f " invalid bitcoin address { addr } " )
if not is_address ( addr ) : raise ValueError ( f " invalid bitcoin address { addr } " )
if addr in self . requested_addrs : return
if addr in self . requested_addrs : return
self . requested_addrs . add ( addr )
self . requested_addrs . add ( addr )
async def enqueue ( ) :
# note: this method is async as add_queue.put_nowait is not thread-safe.
self . add_queue . put_nowait ( addr )
self . add_queue . put_nowait ( addr )
asyncio . run_coroutine_threadsafe ( enqueue ( ) , self . asyncio_loop )
async def _on_address_status ( self , addr , status ) :
async def _on_address_status ( self , addr , status ) :
""" Handle the change of the status of an address. """
""" Handle the change of the status of an address. """
@ -245,7 +247,7 @@ class Synchronizer(SynchronizerBase):
await self . _request_missing_txs ( history , allow_server_not_finding_tx = True )
await self . _request_missing_txs ( history , allow_server_not_finding_tx = True )
# add addresses to bootstrap
# add addresses to bootstrap
for addr in random_shuffled_copy ( self . adb . get_addresses ( ) ) :
for addr in random_shuffled_copy ( self . adb . get_addresses ( ) ) :
await self . _ add_address( addr )
self . add_address ( addr )
# main loop
# main loop
while True :
while True :
await asyncio . sleep ( 0.1 )
await asyncio . sleep ( 0.1 )
@ -271,12 +273,12 @@ class Notifier(SynchronizerBase):
async def main ( self ) :
async def main ( self ) :
# resend existing subscriptions if we were restarted
# resend existing subscriptions if we were restarted
for addr in self . watched_addresses :
for addr in self . watched_addresses :
await self . _ add_address( addr )
self . add_address ( addr )
# main loop
# main loop
while True :
while True :
addr , url = await self . _start_watching_queue . get ( )
addr , url = await self . _start_watching_queue . get ( )
self . watched_addresses [ addr ] . append ( url )
self . watched_addresses [ addr ] . append ( url )
await self . _ add_address( addr )
self . add_address ( addr )
async def start_watching_addr ( self , addr : str , url : str ) :
async def start_watching_addr ( self , addr : str , url : str ) :
await self . _start_watching_queue . put ( ( addr , url ) )
await self . _start_watching_queue . put ( ( addr , url ) )