@ -8,7 +8,7 @@ import time
import csv
import decimal
from decimal import Decimal
from typing import Sequence , Optional , Mapping , Dict , Union , Any
from typing import Sequence , Optional , Mapping , Dict , Union , Any , Tuple
from aiorpcx . curio import timeout_after , TaskTimeout , ignore_after
import aiohttp
@ -97,30 +97,49 @@ class ExchangeBase(Logger):
self . _quotes_timestamp = time . time ( )
self . on_quotes ( received_new_data = True )
def read_historical_rates ( self , ccy : str , cache_dir : str ) - > Optional [ dict ] :
filename = os . path . join ( cache_dir , self . name ( ) + ' _ ' + ccy )
@staticmethod
def _read_historical_rates_from_file (
* , exchange_name : str , ccy : str , cache_dir : str ,
) - > Tuple [ Optional [ dict ] , Optional [ float ] ] :
filename = os . path . join ( cache_dir , f " { exchange_name } _ { ccy } " )
if not os . path . exists ( filename ) :
return None
return None , None
timestamp = os . stat ( filename ) . st_mtime
try :
with open ( filename , ' r ' , encoding = ' utf-8 ' ) as f :
h = json . loads ( f . read ( ) )
except Exception :
return None
return None , None
if not h : # e.g. empty dict
return None
return None , None
# cast rates to str
h = { date_str : str ( rate ) for ( date_str , rate ) in h . items ( ) }
return h , timestamp
def read_historical_rates ( self , ccy : str , cache_dir : str ) - > Optional [ dict ] :
h , timestamp = self . _read_historical_rates_from_file (
exchange_name = self . name ( ) ,
ccy = ccy ,
cache_dir = cache_dir ,
)
h [ ' timestamp ' ] = timestamp
self . _history [ ccy ] = h
self . on_history ( )
return h
@staticmethod
def _write_historical_rates_to_file (
* , exchange_name : str , ccy : str , cache_dir : str , history : Dict [ str , str ] ,
) - > None :
filename = os . path . join ( cache_dir , f " { exchange_name } _ { ccy } " )
with open ( filename , ' w ' , encoding = ' utf-8 ' ) as f :
f . write ( json . dumps ( history ) )
@log_exceptions
async def get_historical_rates_safe ( self , ccy : str , cache_dir : str ) - > None :
try :
self . logger . info ( f " requesting fx history for { ccy } " )
h = await self . request_history ( ccy )
h_new = await self . request_history ( ccy )
self . logger . info ( f " received fx history for { ccy } " )
except ( aiohttp . ClientError , asyncio . TimeoutError , OSError ) as e :
self . logger . info ( f " failed fx history: { repr ( e ) } " )
@ -129,10 +148,16 @@ class ExchangeBase(Logger):
self . logger . exception ( f " failed fx history: { repr ( e ) } " )
return
# cast rates to str
h = { date_str : str ( rate ) for ( date_str , rate ) in h . items ( ) }
filename = os . path . join ( cache_dir , self . name ( ) + ' _ ' + ccy )
with open ( filename , ' w ' , encoding = ' utf-8 ' ) as f :
f . write ( json . dumps ( h ) )
h_new = { date_str : str ( rate ) for ( date_str , rate ) in h_new . items ( ) }
# merge old history and new history. resolve duplicate dates using new data.
h_old , _timestamp = self . _read_historical_rates_from_file (
exchange_name = self . name ( ) , ccy = ccy , cache_dir = cache_dir ,
)
h = { * * h_old , * * h_new }
# write merged data to disk cache
self . _write_historical_rates_to_file (
exchange_name = self . name ( ) , ccy = ccy , cache_dir = cache_dir , history = h ,
)
h [ ' timestamp ' ] = time . time ( )
self . _history [ ccy ] = h
self . on_history ( )
@ -352,8 +377,12 @@ class CoinGecko(ExchangeBase):
return CURRENCIES [ self . name ( ) ]
async def request_history ( self , ccy ) :
num_days = 365
# Setting `num_days = "max"` started erroring (around 2024-04) with:
# > Your request exceeds the allowed time range. Public API users are limited to querying
# > historical data within the past 365 days. Upgrade to a paid plan to enjoy full historical data access
history = await self . get_json ( ' api.coingecko.com ' ,
' /api/v3/coins/bitcoin/market_chart?vs_currency= %s &days=max ' % ccy )
f " /api/v3/coins/bitcoin/market_chart?vs_currency= { ccy } &days= { num_days } " )
return dict ( [ ( timestamp_to_datetime ( h [ 0 ] / 1000 , utc = True ) . strftime ( ' % Y- % m- %d ' ) , str ( h [ 1 ] ) )
for h in history [ ' prices ' ] ] )