@ -65,18 +65,18 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
def _reset ( self ) :
super ( ) . _reset ( )
self . _adding_addrs = set ( )
self . requested_addrs = set ( )
self . _handling_addr_statuses = set ( )
self . scripthash_to_address = { }
self . _processed_some_notifications = False # so that we don't miss them
# Queues
self . add_queue = asyncio . Queue ( )
self . status_queue = asyncio . Queue ( )
async def _run_tasks ( self , * , taskgroup ) :
await super ( ) . _run_tasks ( taskgroup = taskgroup )
try :
async with taskgroup as group :
await group . spawn ( self . send_subscriptions ( ) )
await group . spawn ( self . handle_status ( ) )
await group . spawn ( self . main ( ) )
finally :
@ -84,43 +84,44 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
self . session . unsubscribe ( self . status_queue )
def add ( self , addr ) :
# FIXME is_up_to_date does not take addr into account until _add_address executes
if not is_address ( addr ) : raise ValueError ( f " invalid bitcoin address { addr } " )
self . _adding_addrs . add ( addr ) # this lets is_up_to_date already know about addr
asyncio . run_coroutine_threadsafe ( self . _add_address ( addr ) , self . asyncio_loop )
async def _add_address ( self , addr : str ) :
# note: this method is async as add_queue.put_nowait is not thread-safe.
if not is_address ( addr ) : raise ValueError ( f " invalid bitcoin address { addr } " )
if addr in self . requested_addrs : return
self . requested_addrs . add ( addr )
self . add_queue . put_nowait ( addr )
try :
if not is_address ( addr ) : raise ValueError ( f " invalid bitcoin address { addr } " )
if addr in self . requested_addrs : return
self . requested_addrs . add ( addr )
await self . taskgroup . spawn ( self . _subscribe_to_address , addr )
finally :
self . _adding_addrs . discard ( addr ) # ok for addr not to be present
async def _on_address_status ( self , addr , status ) :
""" Handle the change of the status of an address. """
""" Handle the change of the status of an address.
Should remove addr from self . _handling_addr_statuses when done .
"""
raise NotImplementedError ( ) # implemented by subclasses
async def send_subscriptions ( self ) :
async def subscribe_to_address ( addr ) :
h = address_to_scripthash ( addr )
self . scripthash_to_address [ h ] = addr
self . _requests_sent + = 1
try :
async with self . _network_request_semaphore :
await self . session . subscribe ( ' blockchain.scripthash.subscribe ' , [ h ] , self . status_queue )
except RPCError as e :
if e . message == ' history too large ' : # no unique error code
raise GracefulDisconnect ( e , log_level = logging . ERROR ) from e
raise
self . _requests_answered + = 1
self . requested_addrs . remove ( addr )
while True :
addr = await self . add_queue . get ( )
await self . taskgroup . spawn ( subscribe_to_address , addr )
async def _subscribe_to_address ( self , addr ) :
h = address_to_scripthash ( addr )
self . scripthash_to_address [ h ] = addr
self . _requests_sent + = 1
try :
async with self . _network_request_semaphore :
await self . session . subscribe ( ' blockchain.scripthash.subscribe ' , [ h ] , self . status_queue )
except RPCError as e :
if e . message == ' history too large ' : # no unique error code
raise GracefulDisconnect ( e , log_level = logging . ERROR ) from e
raise
self . _requests_answered + = 1
async def handle_status ( self ) :
while True :
h , status = await self . status_queue . get ( )
addr = self . scripthash_to_address [ h ]
self . _handling_addr_statuses . add ( addr )
self . requested_addrs . discard ( addr ) # ok for addr not to be present
await self . taskgroup . spawn ( self . _on_address_status , addr , status )
self . _processed_some_notifications = True
@ -142,6 +143,7 @@ class Synchronizer(SynchronizerBase):
def _reset ( self ) :
super ( ) . _reset ( )
self . _init_done = False
self . requested_tx = { }
self . requested_histories = set ( )
self . _stale_histories = dict ( ) # type: Dict[str, asyncio.Task]
@ -150,22 +152,29 @@ class Synchronizer(SynchronizerBase):
return self . adb . diagnostic_name ( )
def is_up_to_date ( self ) :
return ( not self . requested_addrs
return ( self . _init_done
and not self . _adding_addrs
and not self . requested_addrs
and not self . _handling_addr_statuses
and not self . requested_histories
and not self . requested_tx
and not self . _stale_histories )
and not self . _stale_histories
and self . status_queue . empty ( ) )
async def _on_address_status ( self , addr , status ) :
history = self . adb . db . get_addr_history ( addr )
if history_status ( history ) == status :
return
# No point in requesting history twice for the same announced status.
# However if we got announced a new status, we should request history again:
if ( addr , status ) in self . requested_histories :
return
# request address history
self . requested_histories . add ( ( addr , status ) )
self . _stale_histories . pop ( addr , asyncio . Future ( ) ) . cancel ( )
try :
history = self . adb . db . get_addr_history ( addr )
if history_status ( history ) == status :
return
# No point in requesting history twice for the same announced status.
# However if we got announced a new status, we should request history again:
if ( addr , status ) in self . requested_histories :
return
# request address history
self . requested_histories . add ( ( addr , status ) )
self . _stale_histories . pop ( addr , asyncio . Future ( ) ) . cancel ( )
finally :
self . _handling_addr_statuses . discard ( addr )
h = address_to_scripthash ( addr )
self . _requests_sent + = 1
async with self . _network_request_semaphore :
@ -236,7 +245,7 @@ class Synchronizer(SynchronizerBase):
self . logger . info ( f " received tx { tx_hash } height: { tx_height } bytes: { len ( raw_tx ) } " )
async def main ( self ) :
self . adb . set_ up_to_date( False )
self . adb . up_to_date_changed ( )
# request missing txns, if any
for addr in random_shuffled_copy ( self . adb . db . get_history ( ) ) :
history = self . adb . db . get_addr_history ( addr )
@ -248,16 +257,17 @@ class Synchronizer(SynchronizerBase):
for addr in random_shuffled_copy ( self . adb . get_addresses ( ) ) :
await self . _add_address ( addr )
# main loop
self . _init_done = True
prev_uptodate = False
while True :
await asyncio . sleep ( 0.1 )
hist_done = self . is_up_to_date ( )
spv_done = self . adb . verifier . is_up_to_date ( ) if self . adb . verifier else True
up_to_date = hist_done and spv_done
up_to_date = self . adb . is_up_to_date ( )
# see if status changed
if ( up_to_date != self . adb . is_up_to_date ( )
if ( up_to_date != prev_uptodate
or up_to_date and self . _processed_some_notifications ) :
self . _processed_some_notifications = False
self . adb . set_up_to_date ( up_to_date )
self . adb . up_to_date_changed ( )
prev_uptodate = up_to_date
class Notifier ( SynchronizerBase ) :