diff --git a/electrum/lnworker.py b/electrum/lnworker.py index b08f8b3df..e411be0b6 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -693,6 +693,7 @@ class PaySession(Logger): self._amount_inflight = 0 # what we sent in htlcs (that receiver gets, without fees) self._nhtlcs_inflight = 0 + self.is_active = True def diagnostic_name(self): pkey = sha256(self.payment_key) @@ -779,6 +780,14 @@ class PaySession(Logger): def get_outstanding_amount_to_send(self) -> int: return self.amount_to_pay - self._amount_inflight + def can_be_deleted(self) -> bool: + if self.is_active: + return False + # note: no one is consuming from sent_htlcs_q anymore + nhtlcs_resolved = self.sent_htlcs_q.qsize() + assert nhtlcs_resolved <= self._nhtlcs_inflight + return nhtlcs_resolved == self._nhtlcs_inflight + class LNWallet(LNWorker): @@ -1412,7 +1421,7 @@ class LNWallet(LNWorker): raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON, data=b'') payment_key = payment_hash + payment_secret - #assert payment_key not in self._paysessions # FIXME + assert payment_key not in self._paysessions self._paysessions[payment_key] = paysession = PaySession( payment_hash=payment_hash, payment_secret=payment_secret, @@ -1429,65 +1438,70 @@ class LNWallet(LNWorker): # when encountering trampoline forwarding difficulties in the legacy case, we # sometimes need to fall back to a single trampoline forwarder, at the expense # of privacy - while True: - if (amount_to_send := paysession.get_outstanding_amount_to_send()) > 0: - # 1. create a set of routes for remaining amount. - # note: path-finding runs in a separate thread so that we don't block the asyncio loop - # graph updates might occur during the computation - routes = self.create_routes_for_payment( - paysession=paysession, - amount_msat=amount_to_send, - full_path=full_path, - fwd_trampoline_onion=fwd_trampoline_onion, - channels=channels, - ) - # 2. send htlcs - async for sent_htlc_info, cltv_delta, trampoline_onion in routes: - await self.pay_to_route( + try: + while True: + if (amount_to_send := paysession.get_outstanding_amount_to_send()) > 0: + # 1. create a set of routes for remaining amount. + # note: path-finding runs in a separate thread so that we don't block the asyncio loop + # graph updates might occur during the computation + routes = self.create_routes_for_payment( paysession=paysession, - sent_htlc_info=sent_htlc_info, - min_cltv_expiry=cltv_delta, - trampoline_onion=trampoline_onion, + amount_msat=amount_to_send, + full_path=full_path, + fwd_trampoline_onion=fwd_trampoline_onion, + channels=channels, ) - # invoice_status is triggered in self.set_invoice_status when it actually changes. - # It is also triggered here to update progress for a lightning payment in the GUI - # (e.g. attempt counter) - util.trigger_callback('invoice_status', self.wallet, payment_hash.hex(), PR_INFLIGHT) - # 3. await a queue - htlc_log = await paysession.wait_for_one_htlc_to_resolve() # TODO maybe wait a bit, more failures might come - log.append(htlc_log) - if htlc_log.success: - if self.network.path_finder: - # TODO: report every route to liquidity hints for mpp - # in the case of success, we report channels of the - # route as being able to send the same amount in the future, - # as we assume to not know the capacity - self.network.path_finder.update_liquidity_hints(htlc_log.route, htlc_log.amount_msat) - # remove inflight htlcs from liquidity hints - self.network.path_finder.update_inflight_htlcs(htlc_log.route, add_htlcs=False) - return - # htlc failed - if (attempts is not None and len(log) >= attempts) or (attempts is None and time.time() - paysession.start_time > self.PAYMENT_TIMEOUT): - raise PaymentFailure('Giving up after %d attempts'%len(log)) - # if we get a tmp channel failure, it might work to split the amount and try more routes - # if we get a channel update, we might retry the same route and amount - route = htlc_log.route - sender_idx = htlc_log.sender_idx - erring_node_id = route[sender_idx].node_id - failure_msg = htlc_log.failure_msg - code, data = failure_msg.code, failure_msg.data - self.logger.info(f"UPDATE_FAIL_HTLC. code={repr(code)}. " - f"decoded_data={failure_msg.decode_data()}. data={data.hex()!r}") - self.logger.info(f"error reported by {erring_node_id.hex()}") - if code == OnionFailureCode.MPP_TIMEOUT: - raise PaymentFailure(failure_msg.code_name()) - # trampoline - if self.uses_trampoline(): - paysession.handle_failed_trampoline_htlc( - htlc_log=htlc_log, failure_msg=failure_msg) - else: - self.handle_error_code_from_failed_htlc( - route=route, sender_idx=sender_idx, failure_msg=failure_msg, amount=htlc_log.amount_msat) + # 2. send htlcs + async for sent_htlc_info, cltv_delta, trampoline_onion in routes: + await self.pay_to_route( + paysession=paysession, + sent_htlc_info=sent_htlc_info, + min_cltv_expiry=cltv_delta, + trampoline_onion=trampoline_onion, + ) + # invoice_status is triggered in self.set_invoice_status when it actually changes. + # It is also triggered here to update progress for a lightning payment in the GUI + # (e.g. attempt counter) + util.trigger_callback('invoice_status', self.wallet, payment_hash.hex(), PR_INFLIGHT) + # 3. await a queue + htlc_log = await paysession.wait_for_one_htlc_to_resolve() # TODO maybe wait a bit, more failures might come + log.append(htlc_log) + if htlc_log.success: + if self.network.path_finder: + # TODO: report every route to liquidity hints for mpp + # in the case of success, we report channels of the + # route as being able to send the same amount in the future, + # as we assume to not know the capacity + self.network.path_finder.update_liquidity_hints(htlc_log.route, htlc_log.amount_msat) + # remove inflight htlcs from liquidity hints + self.network.path_finder.update_inflight_htlcs(htlc_log.route, add_htlcs=False) + return + # htlc failed + if (attempts is not None and len(log) >= attempts) or (attempts is None and time.time() - paysession.start_time > self.PAYMENT_TIMEOUT): + raise PaymentFailure('Giving up after %d attempts'%len(log)) + # if we get a tmp channel failure, it might work to split the amount and try more routes + # if we get a channel update, we might retry the same route and amount + route = htlc_log.route + sender_idx = htlc_log.sender_idx + erring_node_id = route[sender_idx].node_id + failure_msg = htlc_log.failure_msg + code, data = failure_msg.code, failure_msg.data + self.logger.info(f"UPDATE_FAIL_HTLC. code={repr(code)}. " + f"decoded_data={failure_msg.decode_data()}. data={data.hex()!r}") + self.logger.info(f"error reported by {erring_node_id.hex()}") + if code == OnionFailureCode.MPP_TIMEOUT: + raise PaymentFailure(failure_msg.code_name()) + # trampoline + if self.uses_trampoline(): + paysession.handle_failed_trampoline_htlc( + htlc_log=htlc_log, failure_msg=failure_msg) + else: + self.handle_error_code_from_failed_htlc( + route=route, sender_idx=sender_idx, failure_msg=failure_msg, amount=htlc_log.amount_msat) + finally: + paysession.is_active = False + if paysession.can_be_deleted(): + self._paysessions.pop(payment_key) async def pay_to_route( self, *, @@ -2225,6 +2239,8 @@ class LNWallet(LNWorker): amount_msat=shi.amount_receiver_msat, trampoline_fee_level=shi.trampoline_fee_level) q.put_nowait(htlc_log) + if paysession.can_be_deleted(): + self._paysessions.pop(payment_key) else: key = payment_hash.hex() self.set_invoice_status(key, PR_PAID) @@ -2278,6 +2294,8 @@ class LNWallet(LNWorker): sender_idx=sender_idx, trampoline_fee_level=shi.trampoline_fee_level) q.put_nowait(htlc_log) + if paysession.can_be_deleted(): + self._paysessions.pop(payment_okey) else: self.logger.info(f"received unknown htlc_failed, probably from previous session") key = payment_hash.hex()