diff --git a/electrum/exchange_rate.py b/electrum/exchange_rate.py index 801ce47a8..f4268905f 100644 --- a/electrum/exchange_rate.py +++ b/electrum/exchange_rate.py @@ -10,7 +10,7 @@ import decimal from decimal import Decimal from typing import Sequence, Optional, Mapping, Dict, Union, Any -from aiorpcx.curio import timeout_after, TaskTimeout +from aiorpcx.curio import timeout_after, TaskTimeout, ignore_after import aiohttp from . import util @@ -18,6 +18,7 @@ from .bitcoin import COIN from .i18n import _ from .util import (ThreadJob, make_dir, log_exceptions, OldTaskGroup, make_aiohttp_session, resource_path, EventListener, event_listener, to_decimal) +from .util import NetworkRetryManager from .network import Network from .simple_config import SimpleConfig from .logging import Logger @@ -34,8 +35,9 @@ CCY_PRECISIONS = {'BHD': 3, 'BIF': 0, 'BYR': 0, 'CLF': 4, 'CLP': 0, 'BTC': 8, 'LTC': 8, 'XRP': 6, 'ETH': 18, } -POLL_PERIOD_SPOT_RATE = 150 # approx. every 2.5 minutes, try to refresh spot price -EXPIRY_SPOT_RATE = 600 # spot price becomes stale after 10 minutes +SPOT_RATE_REFRESH_TARGET = 150 # approx. every 2.5 minutes, try to refresh spot price +SPOT_RATE_CLOSE_TO_STALE = 450 # try harder to fetch an update if price is getting old +SPOT_RATE_EXPIRY = 600 # spot price becomes stale after 10 minutes -> we no longer show/use it class ExchangeBase(Logger): @@ -83,13 +85,16 @@ class ExchangeBase(Logger): self._quotes = await self.get_rates(ccy) assert all(isinstance(rate, (Decimal, type(None))) for rate in self._quotes.values()), \ f"fx rate must be Decimal, got {self._quotes}" - self._quotes_timestamp = time.time() - self.logger.info("received fx quotes") except (aiohttp.ClientError, asyncio.TimeoutError) as e: self.logger.info(f"failed fx quotes: {repr(e)}") + self.on_quotes() except Exception as e: self.logger.exception(f"failed fx quotes: {repr(e)}") - self.on_quotes() + self.on_quotes() + else: + self.logger.info("received fx quotes") + self._quotes_timestamp = time.time() + self.on_quotes(received_new_data=True) def read_historical_rates(self, ccy: str, cache_dir: str) -> Optional[dict]: filename = os.path.join(cache_dir, self.name() + '_'+ ccy) @@ -169,7 +174,7 @@ class ExchangeBase(Logger): rate = self._quotes.get(ccy) if rate is None: return Decimal('NaN') - if self._quotes_timestamp + EXPIRY_SPOT_RATE < time.time(): + if self._quotes_timestamp + SPOT_RATE_EXPIRY < time.time(): # Our rate is stale. Probably better to return no rate than an incorrect one. return Decimal('NaN') return Decimal(rate) @@ -504,10 +509,17 @@ def get_exchanges_by_ccy(history=True): return dictinvert(d) -class FxThread(ThreadJob, EventListener): +class FxThread(ThreadJob, EventListener, NetworkRetryManager[str]): def __init__(self, *, config: SimpleConfig): ThreadJob.__init__(self) + NetworkRetryManager.__init__( + self, + max_retry_delay_normal=SPOT_RATE_REFRESH_TARGET, + init_retry_delay_normal=SPOT_RATE_REFRESH_TARGET, + max_retry_delay_urgent=SPOT_RATE_REFRESH_TARGET, + init_retry_delay_urgent=1, + ) # note: we poll every 5 seconds for action, so we won't attempt connections more frequently than that. self.config = config self.register_callbacks() self.ccy = self.get_currency() @@ -522,6 +534,7 @@ class FxThread(ThreadJob, EventListener): @event_listener def on_event_proxy_set(self, *args): + self._clear_addr_retry_times() self._trigger.set() @staticmethod @@ -559,17 +572,28 @@ class FxThread(ThreadJob, EventListener): async def run(self): while True: - # every few minutes, refresh spot price - try: - async with timeout_after(POLL_PERIOD_SPOT_RATE): - await self._trigger.wait() - self._trigger.clear() - # we were manually triggered, so get historical rates - if self.is_enabled() and self.has_history(): - self.exchange.get_historical_rates(self.ccy, self.cache_dir) - except TaskTimeout: - pass - if self.is_enabled(): + # keep polling and see if we should refresh spot price or historical prices + manually_triggered = False + async with ignore_after(5): + await self._trigger.wait() + self._trigger.clear() + manually_triggered = True + if not self.is_enabled(): + continue + if manually_triggered and self.has_history(): # maybe refresh historical prices + self.exchange.get_historical_rates(self.ccy, self.cache_dir) + now = time.time() + if not manually_triggered and self.exchange._quotes_timestamp + SPOT_RATE_REFRESH_TARGET > now: + continue # last quote still fresh + # If the last quote is relatively recent, we poll at fixed time intervals. + # Once it gets close to cache expiry, we change to an exponential backoff, to try to get + # a quote before it expires. Also, on Android, we might come back from a sleep after a long time, + # with the last quote close to expiry or already expired, in that case we go into exponential backoff. + is_urgent = self.exchange._quotes_timestamp + SPOT_RATE_CLOSE_TO_STALE < now + addr_name = "spot-urgent" if is_urgent else "spot" # this separates retry-counters + if self._can_retry_addr(addr_name, urgent=is_urgent): + self._trying_addr_now(addr_name) + # refresh spot price await self.exchange.update_safe(self.ccy) def is_enabled(self) -> bool: @@ -599,6 +623,7 @@ class FxThread(ThreadJob, EventListener): self.on_quotes() def trigger_update(self): + self._clear_addr_retry_times() loop = util.get_asyncio_loop() loop.call_soon_threadsafe(self._trigger.set) @@ -614,7 +639,9 @@ class FxThread(ThreadJob, EventListener): self.trigger_update() self.exchange.read_historical_rates(self.ccy, self.cache_dir) - def on_quotes(self): + def on_quotes(self, *, received_new_data: bool = False): + if received_new_data: + self._clear_addr_retry_times() util.trigger_callback('on_quotes') def on_history(self):