Browse Source

test_lnpeer: factorize code into TestPeer._activate_trampoline

master
ThomasV 2 years ago
parent
commit
6c231e1d07
  1. 20
      electrum/tests/test_lnpeer.py

20
electrum/tests/test_lnpeer.py

@ -721,15 +721,16 @@ class TestPeer(ElectrumTestCase):
with self.assertRaises(SuccessfulTest):
await f()
async def _activate_trampoline(self, w):
if w.network.channel_db:
w.network.channel_db.stop()
await w.network.channel_db.stopped_event.wait()
w.network.channel_db = None
async def _test_simple_payment(self, trampoline: bool, test_hold_invoice=False, test_timeout=False):
"""Alice pays Bob a single HTLC via direct channel."""
alice_channel, bob_channel = create_test_channels()
p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)
async def turn_on_trampoline_alice():
if w1.network.channel_db:
w1.network.channel_db.stop()
await w1.network.channel_db.stopped_event.wait()
w1.network.channel_db = None
async def pay(lnaddr, pay_req):
self.assertEqual(PR_UNPAID, w2.get_payment_status(lnaddr.paymenthash))
result, log = await w1.pay_invoice(pay_req)
@ -750,7 +751,7 @@ class TestPeer(ElectrumTestCase):
async def f():
if trampoline:
await turn_on_trampoline_alice()
await self._activate_trampoline(w1)
async with OldTaskGroup() as group:
await group.spawn(p1._message_loop())
await group.spawn(p1.htlc_switch())
@ -1143,11 +1144,6 @@ class TestPeer(ElectrumTestCase):
async def _run_trampoline_payment(self, is_legacy, direct, drop_dave=None):
if drop_dave is None: drop_dave = []
async def turn_on_trampoline_alice():
if graph.workers['alice'].network.channel_db:
graph.workers['alice'].network.channel_db.stop()
await graph.workers['alice'].network.channel_db.stopped_event.wait()
graph.workers['alice'].network.channel_db = None
async def pay(lnaddr, pay_req):
self.assertEqual(PR_UNPAID, graph.workers['dave'].get_payment_status(lnaddr.paymenthash))
@ -1164,7 +1160,7 @@ class TestPeer(ElectrumTestCase):
graph.workers[t].peers.pop(dave_node_id)
async def f():
await turn_on_trampoline_alice()
await self._activate_trampoline(graph.workers['alice'])
async with OldTaskGroup() as group:
for peer in peers:
await group.spawn(peer._message_loop())

Loading…
Cancel
Save