Browse Source

lnworker: clear paysessions dict

master
SomberNight 2 years ago
parent
commit
13864f7abe
No known key found for this signature in database
GPG Key ID: B33B5F232C6271E9
  1. 134
      electrum/lnworker.py

134
electrum/lnworker.py

@ -693,6 +693,7 @@ class PaySession(Logger):
self._amount_inflight = 0 # what we sent in htlcs (that receiver gets, without fees)
self._nhtlcs_inflight = 0
self.is_active = True
def diagnostic_name(self):
pkey = sha256(self.payment_key)
@ -779,6 +780,14 @@ class PaySession(Logger):
def get_outstanding_amount_to_send(self) -> int:
return self.amount_to_pay - self._amount_inflight
def can_be_deleted(self) -> bool:
if self.is_active:
return False
# note: no one is consuming from sent_htlcs_q anymore
nhtlcs_resolved = self.sent_htlcs_q.qsize()
assert nhtlcs_resolved <= self._nhtlcs_inflight
return nhtlcs_resolved == self._nhtlcs_inflight
class LNWallet(LNWorker):
@ -1412,7 +1421,7 @@ class LNWallet(LNWorker):
raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON, data=b'')
payment_key = payment_hash + payment_secret
#assert payment_key not in self._paysessions # FIXME
assert payment_key not in self._paysessions
self._paysessions[payment_key] = paysession = PaySession(
payment_hash=payment_hash,
payment_secret=payment_secret,
@ -1429,65 +1438,70 @@ class LNWallet(LNWorker):
# when encountering trampoline forwarding difficulties in the legacy case, we
# sometimes need to fall back to a single trampoline forwarder, at the expense
# of privacy
while True:
if (amount_to_send := paysession.get_outstanding_amount_to_send()) > 0:
# 1. create a set of routes for remaining amount.
# note: path-finding runs in a separate thread so that we don't block the asyncio loop
# graph updates might occur during the computation
routes = self.create_routes_for_payment(
paysession=paysession,
amount_msat=amount_to_send,
full_path=full_path,
fwd_trampoline_onion=fwd_trampoline_onion,
channels=channels,
)
# 2. send htlcs
async for sent_htlc_info, cltv_delta, trampoline_onion in routes:
await self.pay_to_route(
try:
while True:
if (amount_to_send := paysession.get_outstanding_amount_to_send()) > 0:
# 1. create a set of routes for remaining amount.
# note: path-finding runs in a separate thread so that we don't block the asyncio loop
# graph updates might occur during the computation
routes = self.create_routes_for_payment(
paysession=paysession,
sent_htlc_info=sent_htlc_info,
min_cltv_expiry=cltv_delta,
trampoline_onion=trampoline_onion,
amount_msat=amount_to_send,
full_path=full_path,
fwd_trampoline_onion=fwd_trampoline_onion,
channels=channels,
)
# invoice_status is triggered in self.set_invoice_status when it actually changes.
# It is also triggered here to update progress for a lightning payment in the GUI
# (e.g. attempt counter)
util.trigger_callback('invoice_status', self.wallet, payment_hash.hex(), PR_INFLIGHT)
# 3. await a queue
htlc_log = await paysession.wait_for_one_htlc_to_resolve() # TODO maybe wait a bit, more failures might come
log.append(htlc_log)
if htlc_log.success:
if self.network.path_finder:
# TODO: report every route to liquidity hints for mpp
# in the case of success, we report channels of the
# route as being able to send the same amount in the future,
# as we assume to not know the capacity
self.network.path_finder.update_liquidity_hints(htlc_log.route, htlc_log.amount_msat)
# remove inflight htlcs from liquidity hints
self.network.path_finder.update_inflight_htlcs(htlc_log.route, add_htlcs=False)
return
# htlc failed
if (attempts is not None and len(log) >= attempts) or (attempts is None and time.time() - paysession.start_time > self.PAYMENT_TIMEOUT):
raise PaymentFailure('Giving up after %d attempts'%len(log))
# if we get a tmp channel failure, it might work to split the amount and try more routes
# if we get a channel update, we might retry the same route and amount
route = htlc_log.route
sender_idx = htlc_log.sender_idx
erring_node_id = route[sender_idx].node_id
failure_msg = htlc_log.failure_msg
code, data = failure_msg.code, failure_msg.data
self.logger.info(f"UPDATE_FAIL_HTLC. code={repr(code)}. "
f"decoded_data={failure_msg.decode_data()}. data={data.hex()!r}")
self.logger.info(f"error reported by {erring_node_id.hex()}")
if code == OnionFailureCode.MPP_TIMEOUT:
raise PaymentFailure(failure_msg.code_name())
# trampoline
if self.uses_trampoline():
paysession.handle_failed_trampoline_htlc(
htlc_log=htlc_log, failure_msg=failure_msg)
else:
self.handle_error_code_from_failed_htlc(
route=route, sender_idx=sender_idx, failure_msg=failure_msg, amount=htlc_log.amount_msat)
# 2. send htlcs
async for sent_htlc_info, cltv_delta, trampoline_onion in routes:
await self.pay_to_route(
paysession=paysession,
sent_htlc_info=sent_htlc_info,
min_cltv_expiry=cltv_delta,
trampoline_onion=trampoline_onion,
)
# invoice_status is triggered in self.set_invoice_status when it actually changes.
# It is also triggered here to update progress for a lightning payment in the GUI
# (e.g. attempt counter)
util.trigger_callback('invoice_status', self.wallet, payment_hash.hex(), PR_INFLIGHT)
# 3. await a queue
htlc_log = await paysession.wait_for_one_htlc_to_resolve() # TODO maybe wait a bit, more failures might come
log.append(htlc_log)
if htlc_log.success:
if self.network.path_finder:
# TODO: report every route to liquidity hints for mpp
# in the case of success, we report channels of the
# route as being able to send the same amount in the future,
# as we assume to not know the capacity
self.network.path_finder.update_liquidity_hints(htlc_log.route, htlc_log.amount_msat)
# remove inflight htlcs from liquidity hints
self.network.path_finder.update_inflight_htlcs(htlc_log.route, add_htlcs=False)
return
# htlc failed
if (attempts is not None and len(log) >= attempts) or (attempts is None and time.time() - paysession.start_time > self.PAYMENT_TIMEOUT):
raise PaymentFailure('Giving up after %d attempts'%len(log))
# if we get a tmp channel failure, it might work to split the amount and try more routes
# if we get a channel update, we might retry the same route and amount
route = htlc_log.route
sender_idx = htlc_log.sender_idx
erring_node_id = route[sender_idx].node_id
failure_msg = htlc_log.failure_msg
code, data = failure_msg.code, failure_msg.data
self.logger.info(f"UPDATE_FAIL_HTLC. code={repr(code)}. "
f"decoded_data={failure_msg.decode_data()}. data={data.hex()!r}")
self.logger.info(f"error reported by {erring_node_id.hex()}")
if code == OnionFailureCode.MPP_TIMEOUT:
raise PaymentFailure(failure_msg.code_name())
# trampoline
if self.uses_trampoline():
paysession.handle_failed_trampoline_htlc(
htlc_log=htlc_log, failure_msg=failure_msg)
else:
self.handle_error_code_from_failed_htlc(
route=route, sender_idx=sender_idx, failure_msg=failure_msg, amount=htlc_log.amount_msat)
finally:
paysession.is_active = False
if paysession.can_be_deleted():
self._paysessions.pop(payment_key)
async def pay_to_route(
self, *,
@ -2225,6 +2239,8 @@ class LNWallet(LNWorker):
amount_msat=shi.amount_receiver_msat,
trampoline_fee_level=shi.trampoline_fee_level)
q.put_nowait(htlc_log)
if paysession.can_be_deleted():
self._paysessions.pop(payment_key)
else:
key = payment_hash.hex()
self.set_invoice_status(key, PR_PAID)
@ -2278,6 +2294,8 @@ class LNWallet(LNWorker):
sender_idx=sender_idx,
trampoline_fee_level=shi.trampoline_fee_level)
q.put_nowait(htlc_log)
if paysession.can_be_deleted():
self._paysessions.pop(payment_okey)
else:
self.logger.info(f"received unknown htlc_failed, probably from previous session")
key = payment_hash.hex()

Loading…
Cancel
Save