diff --git a/electrum/commands.py b/electrum/commands.py index 6058eed70..9de9b3a6a 100644 --- a/electrum/commands.py +++ b/electrum/commands.py @@ -187,7 +187,7 @@ class Commands: kwargs.pop('wallet') coro = f(*args, **kwargs) - fut = asyncio.run_coroutine_threadsafe(coro, asyncio.get_event_loop()) + fut = asyncio.run_coroutine_threadsafe(coro, util.get_asyncio_loop()) result = fut.result() if self._callback: diff --git a/electrum/daemon.py b/electrum/daemon.py index 8bb8fc6b9..cf8d96481 100644 --- a/electrum/daemon.py +++ b/electrum/daemon.py @@ -124,7 +124,7 @@ def request(config: SimpleConfig, endpoint, args=(), timeout=60): rpc_user, rpc_password = get_rpc_credentials(config) server_url = 'http://%s:%d' % (host, port) auth = aiohttp.BasicAuth(login=rpc_user, password=rpc_password) - loop = asyncio.get_event_loop() + loop = util.get_asyncio_loop() async def request_coroutine(): if socktype == 'unix': connector = aiohttp.UnixConnector(path=path) @@ -467,7 +467,7 @@ class Daemon(Logger): if 'wallet_path' in config.cmdline_options: self.logger.warning("Ignoring parameter 'wallet_path' for daemon. " "Use the load_wallet command instead.") - self.asyncio_loop = asyncio.get_event_loop() + self.asyncio_loop = util.get_asyncio_loop() self.network = None if not config.get('offline'): self.network = Network(config, daemon=self) diff --git a/electrum/exchange_rate.py b/electrum/exchange_rate.py index aacc6dee1..5e033e4ea 100644 --- a/electrum/exchange_rate.py +++ b/electrum/exchange_rate.py @@ -148,7 +148,7 @@ class ExchangeBase(Logger): if h is None: h = self.read_historical_rates(ccy, cache_dir) if h is None or h['timestamp'] < time.time() - 24*3600: - asyncio.get_event_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir)) + util.get_asyncio_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir)) def history_ccys(self) -> Sequence[str]: return [] @@ -471,7 +471,7 @@ def get_exchanges_and_currencies(): for name, klass in exchanges.items(): exchange = klass(None, None) await group.spawn(get_currencies_safe(name, exchange)) - loop = asyncio.get_event_loop() + loop = util.get_asyncio_loop() try: loop.run_until_complete(query_all_exchanges_for_their_ccys_over_network()) except Exception as e: diff --git a/electrum/gui/kivy/main_window.py b/electrum/gui/kivy/main_window.py index 70d17a434..163fe9a60 100644 --- a/electrum/gui/kivy/main_window.py +++ b/electrum/gui/kivy/main_window.py @@ -394,7 +394,7 @@ class ElectrumWindow(App, Logger): self.is_exit = False self.wallet = None # type: Optional[Abstract_Wallet] self.pause_time = 0 - self.asyncio_loop = asyncio.get_event_loop() + self.asyncio_loop = util.get_asyncio_loop() self.password = None self._use_single_password = False self.resume_dialog = None diff --git a/electrum/lnworker.py b/electrum/lnworker.py index 42098ad32..91a442589 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -496,7 +496,7 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]): # Try DNS-resolving the host (if needed). This is simply so that # the caller gets a nice exception if it cannot be resolved. try: - await asyncio.get_event_loop().getaddrinfo(host, port) + await asyncio.get_running_loop().getaddrinfo(host, port) except socket.gaierror: raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)')) # add peer diff --git a/electrum/network.py b/electrum/network.py index 8b9c4aca0..cb0c5e1c1 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -273,7 +273,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): init_retry_delay_urgent=1, ) - self.asyncio_loop = asyncio.get_event_loop() + self.asyncio_loop = util.get_asyncio_loop() assert self.asyncio_loop.is_running(), "event loop not running" assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}" @@ -381,9 +381,11 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): self.channel_db = None self.path_finder = None - def run_from_another_thread(self, coro, *, timeout=None): - assert util.get_running_loop() != self.asyncio_loop, 'must not be called from network thread' - fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop) + @classmethod + def run_from_another_thread(cls, coro, *, timeout=None): + loop = util.get_asyncio_loop() + assert util.get_running_loop() != loop, 'must not be called from asyncio thread' + fut = asyncio.run_coroutine_threadsafe(coro, loop) return fut.result(timeout) @staticmethod @@ -1321,7 +1323,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): assert util.get_running_loop() != network.asyncio_loop loop = network.asyncio_loop else: - loop = asyncio.get_event_loop() + loop = util.get_asyncio_loop() coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop) # note: _send_http_on_proxy has its own timeout, so no timeout here: return coro.result() diff --git a/electrum/sql_db.py b/electrum/sql_db.py index 3d9dba7de..a6459fc06 100644 --- a/electrum/sql_db.py +++ b/electrum/sql_db.py @@ -26,7 +26,6 @@ class SqlDB(Logger): def __init__(self, asyncio_loop: asyncio.BaseEventLoop, path, commit_interval=None): Logger.__init__(self) self.asyncio_loop = asyncio_loop - asyncio.set_event_loop(asyncio_loop) self.stopping = False self.stopped_event = asyncio.Event() self.path = path diff --git a/electrum/tests/__init__.py b/electrum/tests/__init__.py index dbfc9ada0..b9c7eefe1 100644 --- a/electrum/tests/__init__.py +++ b/electrum/tests/__init__.py @@ -6,6 +6,7 @@ import shutil import electrum import electrum.logging from electrum import constants +from electrum import util # Set this locally to make the test suite run faster. @@ -37,9 +38,12 @@ class ElectrumTestCase(SequentialTestCase): def setUp(self): super().setUp() + self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() self.electrum_path = tempfile.mkdtemp() def tearDown(self): + self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) + self._loop_thread.join(timeout=1) super().tearDown() shutil.rmtree(self.electrum_path) diff --git a/electrum/tests/test_commands.py b/electrum/tests/test_commands.py index 053976749..c16d2ccd6 100644 --- a/electrum/tests/test_commands.py +++ b/electrum/tests/test_commands.py @@ -2,7 +2,6 @@ import unittest from unittest import mock from decimal import Decimal -from electrum.util import create_and_start_event_loop from electrum.commands import Commands, eval_bool from electrum import storage, wallet from electrum.wallet import restore_wallet_from_text @@ -18,14 +17,8 @@ class TestCommands(ElectrumTestCase): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) - def tearDown(self): - super().tearDown() - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) - def test_setconfig_non_auth_number(self): self.assertEqual(7777, Commands._setconfig_normalize_value('rpcport', "7777")) self.assertEqual(7777, Commands._setconfig_normalize_value('rpcport', '7777')) @@ -135,14 +128,8 @@ class TestCommandsTestnet(TestCaseForTestnet): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) - def tearDown(self): - super().tearDown() - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) - def test_convert_xkey(self): cmds = Commands(config=self.config) xpubs = { diff --git a/electrum/tests/test_lnpeer.py b/electrum/tests/test_lnpeer.py index 9ba096a1c..de9bf535a 100644 --- a/electrum/tests/test_lnpeer.py +++ b/electrum/tests/test_lnpeer.py @@ -15,13 +15,14 @@ from aiorpcx import timeout_after, TaskTimeout import electrum import electrum.trampoline from electrum import bitcoin +from electrum import util from electrum import constants from electrum.network import Network from electrum.ecc import ECPrivkey from electrum import simple_config, lnutil from electrum.lnaddr import lnencode, LnAddr, lndecode from electrum.bitcoin import COIN, sha256 -from electrum.util import bh2u, create_and_start_event_loop, NetworkRetryManager, bfh, OldTaskGroup +from electrum.util import bh2u, NetworkRetryManager, bfh, OldTaskGroup from electrum.lnpeer import Peer from electrum.lnutil import LNPeerAddr, Keypair, privkey_to_pubkey from electrum.lnutil import PaymentFailure, LnFeatures, HTLCOwner @@ -62,7 +63,7 @@ class MockNetwork: user_config = {} user_dir = tempfile.mkdtemp(prefix="electrum-lnpeer-test-") self.config = simple_config.SimpleConfig(user_config, read_user_dir_function=lambda: user_dir) - self.asyncio_loop = asyncio.get_event_loop() + self.asyncio_loop = util.get_asyncio_loop() self.channel_db = ChannelDB(self) self.channel_db.data_loaded.set() self.path_finder = LNPathFinder(self.channel_db) @@ -368,7 +369,6 @@ class TestPeer(TestCaseForTestnet): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop() self._lnworkers_created = [] # type: List[MockLNWallet] def tearDown(self): @@ -379,8 +379,6 @@ class TestPeer(TestCaseForTestnet): self._lnworkers_created.clear() run(cleanup_lnworkers()) - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) super().tearDown() def prepare_peers(self, alice_channel: Channel, bob_channel: Channel): @@ -1361,4 +1359,4 @@ class TestPeer(TestCaseForTestnet): def run(coro): - return asyncio.run_coroutine_threadsafe(coro, loop=asyncio.get_event_loop()).result() + return asyncio.run_coroutine_threadsafe(coro, loop=util.get_asyncio_loop()).result() diff --git a/electrum/tests/test_lnrouter.py b/electrum/tests/test_lnrouter.py index 09b045f6e..1f466bd38 100644 --- a/electrum/tests/test_lnrouter.py +++ b/electrum/tests/test_lnrouter.py @@ -4,7 +4,8 @@ import tempfile import shutil import asyncio -from electrum.util import bh2u, bfh, create_and_start_event_loop +from electrum import util +from electrum.util import bh2u, bfh from electrum.lnutil import ShortChannelID from electrum.lnonion import (OnionHopsDataSingle, new_onion_packet, process_onion_packet, _decode_onion_error, decode_onion_error, @@ -32,7 +33,6 @@ class Test_LNRouter(TestCaseForTestnet): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) def tearDown(self): @@ -40,8 +40,6 @@ class Test_LNRouter(TestCaseForTestnet): if self.cdb: self.cdb.stop() asyncio.run_coroutine_threadsafe(self.cdb.stopped_event.wait(), self.asyncio_loop).result() - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) super().tearDown() def prepare_graph(self): @@ -64,7 +62,7 @@ class Test_LNRouter(TestCaseForTestnet): """ class fake_network: config = self.config - asyncio_loop = asyncio.get_event_loop() + asyncio_loop = util.get_asyncio_loop() trigger_callback = lambda *args: None register_callback = lambda *args: None interface = None diff --git a/electrum/tests/test_lntransport.py b/electrum/tests/test_lntransport.py index 4d355bb83..c24b38e32 100644 --- a/electrum/tests/test_lntransport.py +++ b/electrum/tests/test_lntransport.py @@ -1,5 +1,6 @@ import asyncio +from electrum import util from electrum.ecc import ECPrivkey from electrum.lnutil import LNPeerAddr from electrum.lntransport import LNResponderTransport, LNTransport @@ -38,11 +39,11 @@ class TestLNTransport(ElectrumTestCase): assert num_bytes == 66 return bytes.fromhex('00b9e3a702e93e3a9948c2ed6e5fd7590a6e1c3a0344cfc9d5b57357049aa22355361aa02e55a8fc28fef5bd6d71ad0c38228dc68b1c466263b47fdf31e560e139ba') transport = LNResponderTransport(ls_priv, Reader(), Writer()) - asyncio.get_event_loop().run_until_complete(transport.handshake(epriv=e_priv)) + asyncio.run_coroutine_threadsafe( + transport.handshake(epriv=e_priv), self.asyncio_loop).result() @needs_test_with_all_chacha20_implementations def test_loop(self): - loop = asyncio.get_event_loop() responder_shaked = asyncio.Event() server_shaked = asyncio.Event() responder_key = ECPrivkey.generate_random_key() @@ -96,4 +97,4 @@ class TestLNTransport(ElectrumTestCase): server.close() await server.wait_closed() - loop.run_until_complete(f()) + asyncio.run_coroutine_threadsafe(f(), self.asyncio_loop).result() diff --git a/electrum/tests/test_network.py b/electrum/tests/test_network.py index b433a5e26..66d905b76 100644 --- a/electrum/tests/test_network.py +++ b/electrum/tests/test_network.py @@ -8,6 +8,7 @@ from electrum import blockchain from electrum.interface import Interface, ServerAddr from electrum.crypto import sha256 from electrum.util import bh2u +from electrum import util from . import ElectrumTestCase @@ -17,7 +18,9 @@ class MockTaskGroup: class MockNetwork: taskgroup = MockTaskGroup() - asyncio_loop = asyncio.get_event_loop() + + def __init__(self): + self.asyncio_loop = util.get_asyncio_loop() class MockInterface(Interface): def __init__(self, config): @@ -66,7 +69,8 @@ class TestNetwork(ElectrumTestCase): self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) ifa = self.interface - self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) + fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop()) + self.assertEqual(('fork', 8), fut.result()) self.assertEqual(self.interface.q.qsize(), 0) def test_fork_conflict(self): @@ -80,7 +84,8 @@ class TestNetwork(ElectrumTestCase): self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) ifa = self.interface - self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) + fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop()) + self.assertEqual(('fork', 8), fut.result()) self.assertEqual(self.interface.q.qsize(), 0) def test_can_connect_during_backward(self): @@ -93,7 +98,8 @@ class TestNetwork(ElectrumTestCase): self.interface.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) self.interface.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) ifa = self.interface - self.assertEqual(('catchup', 5), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=4))) + fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=4), util.get_asyncio_loop()) + self.assertEqual(('catchup', 5), fut.result()) self.assertEqual(self.interface.q.qsize(), 0) def mock_fork(self, bad_header): @@ -113,7 +119,8 @@ class TestNetwork(ElectrumTestCase): self.interface.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) self.interface.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) ifa = self.interface - self.assertEqual(('catchup', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=6))) + fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=6), util.get_asyncio_loop()) + self.assertEqual(('catchup', 7), fut.result()) self.assertEqual(self.interface.q.qsize(), 0) diff --git a/electrum/tests/test_storage_upgrade.py b/electrum/tests/test_storage_upgrade.py index 47f64869e..e5c582db0 100644 --- a/electrum/tests/test_storage_upgrade.py +++ b/electrum/tests/test_storage_upgrade.py @@ -18,15 +18,6 @@ from .test_wallet import WalletTestCase # TODO hw wallet with client version 2.6.x (single-, and multiacc) class TestStorageUpgrade(WalletTestCase): - def setUp(self): - super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() - - def tearDown(self): - super().tearDown() - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) - def testnet_wallet(func): # note: it's ok to modify global network constants in subclasses of SequentialTestCase def wrapper(self, *args, **kwargs): diff --git a/electrum/tests/test_wallet.py b/electrum/tests/test_wallet.py index 74c3bca57..fa49c7c4c 100644 --- a/electrum/tests/test_wallet.py +++ b/electrum/tests/test_wallet.py @@ -241,12 +241,9 @@ class TestWalletPassword(WalletTestCase): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() def tearDown(self): super().tearDown() - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) def test_update_password_of_imported_wallet(self): wallet_str = '{"addr_history":{"1364Js2VG66BwRdkaoxAaFtdPb1eQgn8Dr":[],"15CyDgLffJsJgQrhcyooFH4gnVDG82pUrA":[],"1Exet2BhHsFxKTwhnfdsBMkPYLGvobxuW6":[]},"addresses":{"change":[],"receiving":["1364Js2VG66BwRdkaoxAaFtdPb1eQgn8Dr","1Exet2BhHsFxKTwhnfdsBMkPYLGvobxuW6","15CyDgLffJsJgQrhcyooFH4gnVDG82pUrA"]},"keystore":{"keypairs":{"0344b1588589958b0bcab03435061539e9bcf54677c104904044e4f8901f4ebdf5":"L2sED74axVXC4H8szBJ4rQJrkfem7UMc6usLCPUoEWxDCFGUaGUM","0389508c13999d08ffae0f434a085f4185922d64765c0bff2f66e36ad7f745cc5f":"L3Gi6EQLvYw8gEEUckmqawkevfj9s8hxoQDFveQJGZHTfyWnbk1U","04575f52b82f159fa649d2a4c353eb7435f30206f0a6cb9674fbd659f45082c37d559ffd19bea9c0d3b7dcc07a7b79f4cffb76026d5d4dff35341efe99056e22d2":"5JyVyXU1LiRXATvRTQvR9Kp8Rx1X84j2x49iGkjSsXipydtByUq"},"type":"imported"},"pruned_txo":{},"seed_version":13,"stored_height":-1,"transactions":{},"tx_fees":{},"txi":{},"txo":{},"use_encryption":false,"verified_tx3":{},"wallet_type":"standard","winpos-qt":[100,100,840,405]}' diff --git a/electrum/tests/test_wallet_vertical.py b/electrum/tests/test_wallet_vertical.py index 7037e77e0..130b72e9a 100644 --- a/electrum/tests/test_wallet_vertical.py +++ b/electrum/tests/test_wallet_vertical.py @@ -9,15 +9,17 @@ import copy from electrum import storage, bitcoin, keystore, bip32, slip39, wallet from electrum import Transaction from electrum import SimpleConfig +from electrum import util from electrum.address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT from electrum.wallet import (sweep, Multisig_Wallet, Standard_Wallet, Imported_Wallet, restore_wallet_from_text, Abstract_Wallet, BumpFeeStrategy) from electrum.util import ( - bfh, bh2u, create_and_start_event_loop, NotEnoughFunds, UnrelatedTransactionException, + bfh, bh2u, NotEnoughFunds, UnrelatedTransactionException, UserFacingException) from electrum.transaction import (TxOutput, Transaction, PartialTransaction, PartialTxOutput, PartialTxInput, tx_from_any, TxOutpoint) from electrum.mnemonic import seed_type +from electrum.network import Network from electrum.plugins.trustedcoin import trustedcoin @@ -1369,14 +1371,7 @@ class TestWalletSending(TestCaseForTestnet): raise Exception("unexpected txid") def has_internet_connection(self): return True - def run_from_another_thread(self, coro, *, timeout=None): - loop, stop_loop, loop_thread = create_and_start_event_loop() - fut = asyncio.run_coroutine_threadsafe(coro, loop) - try: - return fut.result(timeout) - finally: - loop.call_soon_threadsafe(stop_loop.set_result, 1) - loop_thread.join(timeout=1) + run_from_another_thread = Network.run_from_another_thread def get_local_height(self): return 0 def blockchain(self): @@ -1429,14 +1424,7 @@ class TestWalletSending(TestCaseForTestnet): raise Exception("unexpected txid") def has_internet_connection(self): return True - def run_from_another_thread(self, coro, *, timeout=None): - loop, stop_loop, loop_thread = create_and_start_event_loop() - fut = asyncio.run_coroutine_threadsafe(coro, loop) - try: - return fut.result(timeout) - finally: - loop.call_soon_threadsafe(stop_loop.set_result, 1) - loop_thread.join(timeout=1) + run_from_another_thread = Network.run_from_another_thread def get_local_height(self): return 0 def blockchain(self): @@ -1844,8 +1832,8 @@ class TestWalletSending(TestCaseForTestnet): network = NetworkMock() dest_addr = 'tb1q3ws2p0qjk5vrravv065xqlnkckvzcpclk79eu2' sweep_coro = sweep(privkeys, network=network, config=self.config, to_address=dest_addr, fee=5000, locktime=1325785, tx_version=1) - loop = asyncio.get_event_loop() - tx = loop.run_until_complete(sweep_coro) + loop = util.get_asyncio_loop() + tx = asyncio.run_coroutine_threadsafe(sweep_coro, loop).result() tx_copy = tx_from_any(tx.serialize()) self.assertEqual('010000000129349e5641d79915e9d0282fdbaee8c3df0b6731bab9d70bf626e8588bde24ac010000004847304402206bf0d0a93abae0d5873a62ebf277a5dd2f33837821e8b93e74d04e19d71b578002201a6d729bc159941ef5c4c9e5fe13ece9fc544351ba531b00f68ba549c8b38a9a01fdffffff01b82e0f00000000001600148ba0a0bc12b51831f58c7ea8607e76c5982c071fd93a1400', @@ -2199,14 +2187,7 @@ class TestWalletSending(TestCaseForTestnet): raise Exception("unexpected txid") def has_internet_connection(self): return True - def run_from_another_thread(self, coro, *, timeout=None): - loop, stop_loop, loop_thread = create_and_start_event_loop() - fut = asyncio.run_coroutine_threadsafe(coro, loop) - try: - return fut.result(timeout) - finally: - loop.call_soon_threadsafe(stop_loop.set_result, 1) - loop_thread.join(timeout=1) + run_from_another_thread = Network.run_from_another_thread def get_local_height(self): return 0 def blockchain(self): diff --git a/electrum/util.py b/electrum/util.py index fb0657cb8..eaf584792 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -1031,7 +1031,7 @@ def parse_URI(uri: str, on_pr: Callable = None, *, loop=None) -> dict: request = await pr.get_payment_request(r) if on_pr: on_pr(request) - loop = loop or asyncio.get_event_loop() + loop = loop or get_asyncio_loop() asyncio.run_coroutine_threadsafe(get_payment_request(), loop) return out @@ -1319,7 +1319,6 @@ class NetworkJobOnDefaultServer(Logger, ABC): """ def __init__(self, network: 'Network'): Logger.__init__(self) - asyncio.set_event_loop(network.asyncio_loop) self.network = network self.interface = None # type: Interface self._restart_lock = asyncio.Lock() @@ -1384,9 +1383,41 @@ class NetworkJobOnDefaultServer(Logger, ABC): return s +_asyncio_event_loop = None # type: Optional[asyncio.AbstractEventLoop] +def get_asyncio_loop() -> asyncio.AbstractEventLoop: + """Returns the global asyncio event loop we use.""" + if _asyncio_event_loop is None: + raise Exception("event loop not created yet") + return _asyncio_event_loop + + def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop, asyncio.Future, threading.Thread]: + global _asyncio_event_loop + if _asyncio_event_loop is not None: + raise Exception("there is already a running event loop") + + # asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710) + # We set a custom event loop policy purely to be compatible with code that + # relies on asyncio.get_event_loop(). + # - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__, + # and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420 + class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy): + def get_event_loop(self): + # In case electrum is being used as a library, there might be other + # event loops in use besides ours. To minimise interfering with those, + # if there is a loop running in the current thread, return that: + running_loop = get_running_loop() + if running_loop is not None: + return running_loop + # Otherwise, return our global loop: + return get_asyncio_loop() + asyncio.set_event_loop_policy(MyEventLoopPolicy()) + + loop = asyncio.new_event_loop() + _asyncio_event_loop = loop + def on_exception(loop, context): """Suppress spurious messages it appears we cannot control.""" SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|' @@ -1396,13 +1427,21 @@ def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop, return loop.default_exception_handler(context) - loop = asyncio.get_event_loop() + def run_event_loop(): + try: + loop.run_until_complete(stopping_fut) + finally: + # clean-up + global _asyncio_event_loop + _asyncio_event_loop = None + loop.set_exception_handler(on_exception) # loop.set_debug(1) stopping_fut = loop.create_future() - loop_thread = threading.Thread(target=loop.run_until_complete, - args=(stopping_fut,), - name='EventLoop') + loop_thread = threading.Thread( + target=run_event_loop, + name='EventLoop', + ) loop_thread.start() return loop, stopping_fut, loop_thread @@ -1558,7 +1597,7 @@ class CallbackManager: on the event loop. """ if self.asyncio_loop is None: - self.asyncio_loop = asyncio.get_event_loop() + self.asyncio_loop = get_asyncio_loop() assert self.asyncio_loop.is_running(), "event loop not running" with self.callback_lock: callbacks = self.callbacks[event][:] @@ -1643,7 +1682,7 @@ class NetworkRetryManager(Generic[_NetAddrType]): class MySocksProxy(aiorpcx.SOCKSProxy): async def open_connection(self, host=None, port=None, **kwargs): - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() reader = asyncio.StreamReader(loop=loop) protocol = asyncio.StreamReaderProtocol(reader, loop=loop) transport, _ = await self.create_connection( @@ -1753,6 +1792,7 @@ class nullcontext: def get_running_loop() -> Optional[asyncio.AbstractEventLoop]: + """Returns the asyncio event loop that is *running in this thread*, if any.""" try: return asyncio.get_running_loop() except RuntimeError: