From 87f9c513b97238a1ea1350cab2ddecd9995396d5 Mon Sep 17 00:00:00 2001 From: zebra-lucky Date: Wed, 3 Dec 2025 03:54:26 +0200 Subject: [PATCH] fix yieldgenerator.py, wallet_rpc.py, test_wallet_rpc.py --- src/jmclient/wallet_rpc.py | 4 ++- src/jmclient/yieldgenerator.py | 32 +++++++++++++------- test/jmclient/test_wallet_rpc.py | 51 +++++++++++++++++++++++++------- 3 files changed, 64 insertions(+), 23 deletions(-) diff --git a/src/jmclient/wallet_rpc.py b/src/jmclient/wallet_rpc.py index e002258..f4ab940 100644 --- a/src/jmclient/wallet_rpc.py +++ b/src/jmclient/wallet_rpc.py @@ -937,7 +937,9 @@ class JMWalletDaemon(Service): self.services["maker"].addSetup(setup_set_coinjoin_state) # Service startup now checks and updates coinjoin state, # assuming setup is successful: - self.services["maker"].startService() + exc = await self.services["maker"].startService() + if exc: + raise exc return make_jmwalletd_response(request, status=202) diff --git a/src/jmclient/yieldgenerator.py b/src/jmclient/yieldgenerator.py index 8f7d772..211cc85 100644 --- a/src/jmclient/yieldgenerator.py +++ b/src/jmclient/yieldgenerator.py @@ -8,7 +8,7 @@ import abc import base64 from twisted.python.log import startLogging from twisted.application.service import Service -from twisted.internet import task +from twisted.internet import task, defer from optparse import OptionParser from jmbase import get_log from jmclient import (Maker, jm_single, load_program_config, @@ -289,6 +289,7 @@ class YieldGeneratorBasic(YieldGenerator): class YieldGeneratorService(Service): + def __init__(self, wallet_service, daemon_host, daemon_port, yg_config): self.wallet_service = wallet_service self.daemon_host = daemon_host @@ -300,6 +301,7 @@ class YieldGeneratorService(Service): self.setup_fns = [] self.cleanup_fns = [] + @defer.inlineCallbacks def startService(self): """ We instantiate the Maker class only here as its constructor will automatically @@ -309,23 +311,23 @@ class YieldGeneratorService(Service): no need to check this here. """ for setup in self.setup_fns: - # we do not catch Exceptions in setup, - # deliberately; this must be caught and distinguished + # exceptions returned from startService + # this must be caught and distinguished # by whoever started the service. - setup_res = setup() - if asyncio.iscoroutine(setup_res): - raise Exception('YieldGeneratorService can not have ' - 'asyncio setup functions') + try: + setup_res = setup() + if asyncio.iscoroutine(setup_res): + yield defer.Deferred.fromCoroutine(setup_res) + except Exception as e: + return e # TODO genericise to any YG class: self.yieldgen = YieldGeneratorBasic(self.wallet_service, self.yg_config) self.clientfactory = JMClientProtocolFactory(self.yieldgen, proto_type="MAKER") - wallet = self.wallet_service.wallet # here 'start_reactor' does not start the reactor but instantiates # the connection to the daemon backend; note daemon=False, i.e. the daemon # backend is assumed to be started elsewhere; we just connect to it with a client. - start_reactor(self.daemon_host, self.daemon_port, self.clientfactory, - rs=False, gui=True) + start_reactor(self.daemon_host, self.daemon_port, self.clientfactory, rs=False) # monitor the Maker object, just to check if it's still in an "up" state, marked # by the aborted instance var: self.monitor_loop = task.LoopingCall(self.monitor) @@ -353,6 +355,7 @@ class YieldGeneratorService(Service): """ self.cleanup_fns.append(cleanup) + @defer.inlineCallbacks def stopService(self): """ TODO need a method exposed to gracefully shut down a maker bot. @@ -362,7 +365,14 @@ class YieldGeneratorService(Service): self.clientfactory.proto_client.request_mc_shutdown() super().stopService() for cleanup in self.cleanup_fns: - cleanup() + try: + cleanup_res = cleanup() + if asyncio.iscoroutine(cleanup_res): + yield defer.Deferred.fromCoroutine(cleanup_res) + except Exception as e: + str_e = str(e) + err_msg = f'error {str_e}' if str_e else 'error' + jlog.warn(f'stopService cleanup_fn {cleanup} {err_msg}') def isRunning(self): return self.running == 1 diff --git a/test/jmclient/test_wallet_rpc.py b/test/jmclient/test_wallet_rpc.py index 7677878..df6c8ef 100644 --- a/test/jmclient/test_wallet_rpc.py +++ b/test/jmclient/test_wallet_rpc.py @@ -28,7 +28,7 @@ from jmclient import ( storage, ) from jmclient.wallet_rpc import api_version_string, CJ_MAKER_RUNNING, CJ_NOT_RUNNING -from commontest import make_wallets, AsyncioTestCase +from commontest import make_wallets, TrialTestCase from test_coinjoin import make_wallets_to_list, sync_wallets from test_websocket import ClientTProtocol, test_tx_hex_1, test_tx_hex_txid @@ -41,6 +41,7 @@ testfilename = "testwrpc" jlog = get_log() + class JMWalletDaemonT(JMWalletDaemon): def check_cookie(self, request, *args, **kwargs): if self.auth_disabled: @@ -48,7 +49,7 @@ class JMWalletDaemonT(JMWalletDaemon): return super().check_cookie(request, *args, **kwargs) -class WalletRPCTestBase(AsyncioTestCase): +class WalletRPCTestBase(TrialTestCase): """ Base class for set up of tests of the Wallet RPC calls using the wallet_rpc.JMWalletDaemon service. """ @@ -65,7 +66,8 @@ class WalletRPCTestBase(AsyncioTestCase): # wallet type wallet_cls = SegwitWallet - async def asyncSetUp(self): + @defer.inlineCallbacks + def setUp(self): load_test_config() self.clean_out_wallet_files() jm_single().bc_interface.tick_forward_chain_interval = 5 @@ -97,12 +99,14 @@ class WalletRPCTestBase(AsyncioTestCase): self.listener_rpc = r self.listener_ws = s wallet_structures = [self.wallet_structure] * 2 - wallets = await make_wallets( + coro = make_wallets( 1, wallet_structures=[wallet_structures[0]], mean_amt=self.mean_amt, wallet_cls=self.wallet_cls) + wallets = yield defer.Deferred.fromCoroutine(coro) self.daemon.services["wallet"] = make_wallets_to_list(wallets)[0] jm_single().bc_interface.tickchain() - await sync_wallets([self.daemon.services["wallet"]]) + coro = sync_wallets([self.daemon.services["wallet"]]) + yield defer.Deferred.fromCoroutine(coro) # dummy tx example to force a notification event: self.test_tx = CTransaction.deserialize(hextobin(test_tx_hex_1)) # auth token is not set at the start @@ -172,7 +176,6 @@ class WalletRPCTestBase(AsyncioTestCase): def tearDown(self): self.clean_out_wallet_files() - reactor.disconnectAll() for dc in reactor.getDelayedCalls(): if not dc.cancelled: dc.cancel() @@ -183,11 +186,13 @@ class WalletRPCTestBase(AsyncioTestCase): # only fire if everything is finished: return defer.gatherResults([d1, d2]) + class WalletRPCTestBaseFB(WalletRPCTestBase): wallet_cls = SegwitWalletFidelityBonds # we are using fresh (empty) wallets for these tests wallet_structure = [0, 0, 0, 0, 0] + class ClientNotifTestProto(ClientTProtocol): def sendAuth(self): @@ -195,7 +200,9 @@ class ClientNotifTestProto(ClientTProtocol): self.factory.callbackfn) super().sendAuth() + class ClientNotifTestFactory(WebSocketClientFactory): + def __init__(self, *args, **kwargs): if "delay" in kwargs: self.delay = kwargs.pop("delay", None) @@ -203,10 +210,15 @@ class ClientNotifTestFactory(WebSocketClientFactory): self.callbackfn = kwargs.pop("callbackfn", None) super().__init__(*args, **kwargs) + class TrialTestWRPC_WS(WalletRPCTestBase): """ class for testing websocket subscriptions/events etc. """ + def tearDown(self): + reactor.disconnectAll() + super().tearDown() + @defer.inlineCallbacks def test_notif(self): # simulate the daemon already having created # an active session (which it usually does when @@ -222,8 +234,11 @@ class TrialTestWRPC_WS(WalletRPCTestBase): self.client_factory.protocol = ClientNotifTestProto self.client_factory.protocol.ACCESS_TOKEN = self.daemon.token.issue()["token"].encode("utf8") self.client_connector = connectWS(self.client_factory) + self.client_factory.on_message_deferred = defer.Deferred() self.attempt_receipt_counter = 0 - return task.deferLater(reactor, 0.0, self.wait_to_receive) + # wait on client to receive message + yield self.client_factory.on_message_deferred + yield task.deferLater(reactor, 0.0, self.wait_to_receive) def wait_to_receive(self): d = task.deferLater(reactor, 0.1, self.checkNotifs) @@ -242,10 +257,16 @@ class TrialTestWRPC_WS(WalletRPCTestBase): return d def fire_tx_notif(self): - self.daemon.wss_factory.sendTxNotification(self.test_tx, - test_tx_hex_txid) + self.daemon.wss_factory.sendTxNotification( + self.test_tx, test_tx_hex_txid) + class TrialTestWRPC_FB(WalletRPCTestBaseFB): + + def tearDown(self): + reactor.disconnectAll() + super().tearDown() + @defer.inlineCallbacks def test_gettimelockaddress(self): self.daemon.auth_disabled = True @@ -299,8 +320,13 @@ class TrialTestWRPC_FB(WalletRPCTestBaseFB): # be MAKER_RUNNING since no non-TL-type coin existed: assert self.daemon.coinjoin_state == CJ_NOT_RUNNING + class TrialTestWRPC_DisplayWallet(WalletRPCTestBase): + def tearDown(self): + reactor.disconnectAll() + super().tearDown() + @defer.inlineCallbacks def do_session_request(self, agent, addr, handler=None, token=None): """ A `None` value for handler is reserved for the case @@ -515,12 +541,14 @@ class TrialTestWRPC_DisplayWallet(WalletRPCTestBase): yield self.do_request(agent, b"POST", addr, body, self.process_direct_send_response) # before querying the wallet display, set a label to check: - labeladdr = self.daemon.services["wallet"].get_addr(0,0,0) + coro = self.daemon.services["wallet"].get_addr(0,0,0) + labeladdr = yield defer.Deferred.fromCoroutine(coro) self.daemon.services["wallet"].set_address_label(labeladdr, "test-wallet-rpc-label") # force the wallet service txmonitor to wake up, to see the new # tx before querying /display: - self.daemon.services["wallet"].transaction_monitor() + coro = self.daemon.services["wallet"].transaction_monitor() + yield defer.Deferred.fromCoroutine(coro) addr = self.get_route_root() addr += "/wallet/" addr += self.daemon.wallet_name @@ -755,6 +783,7 @@ class TrialTestWRPC_DisplayWallet(WalletRPCTestBase): class TrialTestWRPC_JWT(WalletRPCTestBase): + @defer.inlineCallbacks def do_request(self, agent, method, addr, body, handler, token): headers = Headers({"Authorization": ["Bearer " + token]})