@ -10,7 +10,7 @@ import decimal
from decimal import Decimal
from typing import Sequence , Optional , Mapping , Dict , Union , Any
from aiorpcx . curio import timeout_after , TaskTimeout
from aiorpcx . curio import timeout_after , TaskTimeout , ignore_after
import aiohttp
from . import util
@ -18,6 +18,7 @@ from .bitcoin import COIN
from . i18n import _
from . util import ( ThreadJob , make_dir , log_exceptions , OldTaskGroup ,
make_aiohttp_session , resource_path , EventListener , event_listener , to_decimal )
from . util import NetworkRetryManager
from . network import Network
from . simple_config import SimpleConfig
from . logging import Logger
@ -34,8 +35,9 @@ CCY_PRECISIONS = {'BHD': 3, 'BIF': 0, 'BYR': 0, 'CLF': 4, 'CLP': 0,
' BTC ' : 8 , ' LTC ' : 8 , ' XRP ' : 6 , ' ETH ' : 18 ,
}
POLL_PERIOD_SPOT_RATE = 150 # approx. every 2.5 minutes, try to refresh spot price
EXPIRY_SPOT_RATE = 600 # spot price becomes stale after 10 minutes
SPOT_RATE_REFRESH_TARGET = 150 # approx. every 2.5 minutes, try to refresh spot price
SPOT_RATE_CLOSE_TO_STALE = 450 # try harder to fetch an update if price is getting old
SPOT_RATE_EXPIRY = 600 # spot price becomes stale after 10 minutes -> we no longer show/use it
class ExchangeBase ( Logger ) :
@ -83,13 +85,16 @@ class ExchangeBase(Logger):
self . _quotes = await self . get_rates ( ccy )
assert all ( isinstance ( rate , ( Decimal , type ( None ) ) ) for rate in self . _quotes . values ( ) ) , \
f " fx rate must be Decimal, got { self . _quotes } "
self . _quotes_timestamp = time . time ( )
self . logger . info ( " received fx quotes " )
except ( aiohttp . ClientError , asyncio . TimeoutError ) as e :
self . logger . info ( f " failed fx quotes: { repr ( e ) } " )
self . on_quotes ( )
except Exception as e :
self . logger . exception ( f " failed fx quotes: { repr ( e ) } " )
self . on_quotes ( )
self . on_quotes ( )
else :
self . logger . info ( " received fx quotes " )
self . _quotes_timestamp = time . time ( )
self . on_quotes ( received_new_data = True )
def read_historical_rates ( self , ccy : str , cache_dir : str ) - > Optional [ dict ] :
filename = os . path . join ( cache_dir , self . name ( ) + ' _ ' + ccy )
@ -169,7 +174,7 @@ class ExchangeBase(Logger):
rate = self . _quotes . get ( ccy )
if rate is None :
return Decimal ( ' NaN ' )
if self . _quotes_timestamp + EXPIRY_ SPOT_RATE < time . time ( ) :
if self . _quotes_timestamp + SPOT_RATE_EXPIRY < time . time ( ) :
# Our rate is stale. Probably better to return no rate than an incorrect one.
return Decimal ( ' NaN ' )
return Decimal ( rate )
@ -504,10 +509,17 @@ def get_exchanges_by_ccy(history=True):
return dictinvert ( d )
class FxThread ( ThreadJob , EventListener ) :
class FxThread ( ThreadJob , EventListener , NetworkRetryManager [ str ] ) :
def __init__ ( self , * , config : SimpleConfig ) :
ThreadJob . __init__ ( self )
NetworkRetryManager . __init__ (
self ,
max_retry_delay_normal = SPOT_RATE_REFRESH_TARGET ,
init_retry_delay_normal = SPOT_RATE_REFRESH_TARGET ,
max_retry_delay_urgent = SPOT_RATE_REFRESH_TARGET ,
init_retry_delay_urgent = 1 ,
) # note: we poll every 5 seconds for action, so we won't attempt connections more frequently than that.
self . config = config
self . register_callbacks ( )
self . ccy = self . get_currency ( )
@ -522,6 +534,7 @@ class FxThread(ThreadJob, EventListener):
@event_listener
def on_event_proxy_set ( self , * args ) :
self . _clear_addr_retry_times ( )
self . _trigger . set ( )
@staticmethod
@ -559,17 +572,28 @@ class FxThread(ThreadJob, EventListener):
async def run ( self ) :
while True :
# every few minutes, refresh spot price
try :
async with timeout_after ( POLL_PERIOD_SPOT_RATE ) :
await self . _trigger . wait ( )
self . _trigger . clear ( )
# we were manually triggered, so get historical rates
if self . is_enabled ( ) and self . has_history ( ) :
self . exchange . get_historical_rates ( self . ccy , self . cache_dir )
except TaskTimeout :
pass
if self . is_enabled ( ) :
# keep polling and see if we should refresh spot price or historical prices
manually_triggered = False
async with ignore_after ( 5 ) :
await self . _trigger . wait ( )
self . _trigger . clear ( )
manually_triggered = True
if not self . is_enabled ( ) :
continue
if manually_triggered and self . has_history ( ) : # maybe refresh historical prices
self . exchange . get_historical_rates ( self . ccy , self . cache_dir )
now = time . time ( )
if not manually_triggered and self . exchange . _quotes_timestamp + SPOT_RATE_REFRESH_TARGET > now :
continue # last quote still fresh
# If the last quote is relatively recent, we poll at fixed time intervals.
# Once it gets close to cache expiry, we change to an exponential backoff, to try to get
# a quote before it expires. Also, on Android, we might come back from a sleep after a long time,
# with the last quote close to expiry or already expired, in that case we go into exponential backoff.
is_urgent = self . exchange . _quotes_timestamp + SPOT_RATE_CLOSE_TO_STALE < now
addr_name = " spot-urgent " if is_urgent else " spot " # this separates retry-counters
if self . _can_retry_addr ( addr_name , urgent = is_urgent ) :
self . _trying_addr_now ( addr_name )
# refresh spot price
await self . exchange . update_safe ( self . ccy )
def is_enabled ( self ) - > bool :
@ -599,6 +623,7 @@ class FxThread(ThreadJob, EventListener):
self . on_quotes ( )
def trigger_update ( self ) :
self . _clear_addr_retry_times ( )
loop = util . get_asyncio_loop ( )
loop . call_soon_threadsafe ( self . _trigger . set )
@ -614,7 +639,9 @@ class FxThread(ThreadJob, EventListener):
self . trigger_update ( )
self . exchange . read_historical_rates ( self . ccy , self . cache_dir )
def on_quotes ( self ) :
def on_quotes ( self , * , received_new_data : bool = False ) :
if received_new_data :
self . _clear_addr_retry_times ( )
util . trigger_callback ( ' on_quotes ' )
def on_history ( self ) :