Browse Source

simplify network callbacks in lnworker

master
ThomasV 6 years ago
parent
commit
8e08ca7cb1
  1. 7
      electrum/lnwatcher.py
  2. 80
      electrum/lnworker.py

7
electrum/lnwatcher.py

@ -145,7 +145,8 @@ class LNWatcher(AddressSynchronizer):
self.channels = {} self.channels = {}
self.network = network self.network = network
self.network.register_callback(self.on_network_update, self.network.register_callback(self.on_network_update,
['network_updated', 'blockchain_updated', 'verified', 'wallet_updated']) ['network_updated', 'blockchain_updated', 'verified', 'wallet_updated', 'fee'])
# status gets populated when we run # status gets populated when we run
self.channel_status = {} self.channel_status = {}
@ -180,14 +181,14 @@ class LNWatcher(AddressSynchronizer):
funding_height = self.get_tx_height(funding_txid) funding_height = self.get_tx_height(funding_txid)
closing_txid = spenders.get(funding_outpoint) closing_txid = spenders.get(funding_outpoint)
if closing_txid is None: if closing_txid is None:
self.network.trigger_callback('channel_open', funding_outpoint, funding_txid, funding_height) self.network.trigger_callback('update_open_channel', funding_outpoint, funding_txid, funding_height)
else: else:
closing_height = self.get_tx_height(closing_txid) closing_height = self.get_tx_height(closing_txid)
closing_tx = self.db.get_transaction(closing_txid) closing_tx = self.db.get_transaction(closing_txid)
if not closing_tx: if not closing_tx:
self.logger.info(f"channel {funding_outpoint} closed by {closing_txid}. still waiting for tx itself...") self.logger.info(f"channel {funding_outpoint} closed by {closing_txid}. still waiting for tx itself...")
return return
self.network.trigger_callback('channel_closed', funding_outpoint, spenders, self.network.trigger_callback('update_closed_channel', funding_outpoint, spenders,
funding_txid, funding_height, closing_txid, funding_txid, funding_height, closing_txid,
closing_height, closing_tx) # FIXME sooo many args.. closing_height, closing_tx) # FIXME sooo many args..
# TODO: add tests for local_watchtower # TODO: add tests for local_watchtower

80
electrum/lnworker.py

@ -402,17 +402,15 @@ class LNWallet(LNWorker):
self.lnwatcher = LNWatcher(network) self.lnwatcher = LNWatcher(network)
self.lnwatcher.start_network(network) self.lnwatcher.start_network(network)
self.network = network self.network = network
self.network.register_callback(self.on_network_update, ['wallet_updated', 'network_updated', 'verified', 'fee']) # thread safe self.network.register_callback(self.on_update_open_channel, ['update_open_channel'])
self.network.register_callback(self.on_channel_open, ['channel_open']) self.network.register_callback(self.on_update_closed_channel, ['update_closed_channel'])
self.network.register_callback(self.on_channel_closed, ['channel_closed'])
for chan_id, chan in self.channels.items(): for chan_id, chan in self.channels.items():
self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address()) self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
super().start_network(network) super().start_network(network)
for coro in [ for coro in [
self.maybe_listen(), self.maybe_listen(),
self.on_network_update('network_updated'), # shortcut (don't block) if funding tx locked and verified self.lnwatcher.on_network_update('network_updated'), # shortcut (don't block) if funding tx locked and verified
self.lnwatcher.on_network_update('network_updated'), # ping watcher to check our channels
self.reestablish_peers_and_channels(), self.reestablish_peers_and_channels(),
self.sync_with_local_watchtower(), self.sync_with_local_watchtower(),
self.sync_with_remote_watchtower(), self.sync_with_remote_watchtower(),
@ -640,7 +638,9 @@ class LNWallet(LNWorker):
if chan.funding_outpoint.to_str() == txo: if chan.funding_outpoint.to_str() == txo:
return chan return chan
def on_channel_open(self, event, funding_outpoint, funding_txid, funding_height): @ignore_exceptions
@log_exceptions
async def on_update_open_channel(self, event, funding_outpoint, funding_txid, funding_height):
chan = self.channel_by_txo(funding_outpoint) chan = self.channel_by_txo(funding_outpoint)
if not chan: if not chan:
return return
@ -651,8 +651,34 @@ class LNWallet(LNWorker):
# send event to GUI # send event to GUI
self.network.trigger_callback('channel', chan) self.network.trigger_callback('channel', chan)
if self.should_channel_be_closed_due_to_expiring_htlcs(chan):
self.logger.info(f"force-closing due to expiring htlcs")
await self.force_close_channel(chan.channel_id)
return
if chan.short_channel_id is None:
self.save_short_chan_id(chan)
if chan.get_state() == "OPENING" and chan.short_channel_id:
peer = self.peers[chan.node_id]
peer.send_funding_locked(chan)
elif chan.get_state() == "OPEN":
peer = self.peers.get(chan.node_id)
if peer is None:
self.logger.info("peer not found for {}".format(bh2u(chan.node_id)))
return
if event == 'fee':
await peer.bitcoin_fee_update(chan)
conf = self.lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf
peer.on_network_update(chan, conf)
elif chan.force_closed and chan.get_state() != 'CLOSED':
txid = chan.force_close_tx().txid()
height = self.lnwatcher.get_tx_height(txid).height
self.logger.info(f"force closing tx {txid}, height {height}")
if height == TX_HEIGHT_LOCAL:
self.logger.info('REBROADCASTING CLOSING TX')
await self.force_close_channel(chan.channel_id)
@log_exceptions @log_exceptions
async def on_channel_closed(self, event, funding_outpoint, spenders, funding_txid, funding_height, closing_txid, closing_height, closing_tx): async def on_update_closed_channel(self, event, funding_outpoint, spenders, funding_txid, funding_height, closing_txid, closing_height, closing_tx):
chan = self.channel_by_txo(funding_outpoint) chan = self.channel_by_txo(funding_outpoint)
if not chan: if not chan:
return return
@ -761,46 +787,6 @@ class LNWallet(LNWorker):
500_000) 500_000)
return total_value_sat > min_value_worth_closing_channel_over_sat return total_value_sat > min_value_worth_closing_channel_over_sat
@ignore_exceptions
@log_exceptions
async def on_network_update(self, event, *args):
# TODO
# Race discovered in save_channel (assertion failing):
# since short_channel_id could be changed while saving.
with self.lock:
channels = list(self.channels.values())
if event in ('verified', 'wallet_updated'):
if args[0] != self.lnwatcher:
return
for chan in channels:
if chan.is_closed():
continue
if chan.get_state() != 'CLOSED' and self.should_channel_be_closed_due_to_expiring_htlcs(chan):
self.logger.info(f"force-closing due to expiring htlcs")
await self.force_close_channel(chan.channel_id)
continue
if chan.short_channel_id is None:
self.save_short_chan_id(chan)
if chan.get_state() == "OPENING" and chan.short_channel_id:
peer = self.peers[chan.node_id]
peer.send_funding_locked(chan)
elif chan.get_state() == "OPEN":
peer = self.peers.get(chan.node_id)
if peer is None:
self.logger.info("peer not found for {}".format(bh2u(chan.node_id)))
return
if event == 'fee':
await peer.bitcoin_fee_update(chan)
conf = self.lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf
peer.on_network_update(chan, conf)
elif chan.force_closed and chan.get_state() != 'CLOSED':
txid = chan.force_close_tx().txid()
height = self.lnwatcher.get_tx_height(txid).height
self.logger.info(f"force closing tx {txid}, height {height}")
if height == TX_HEIGHT_LOCAL:
self.logger.info('REBROADCASTING CLOSING TX')
await self.force_close_channel(chan.channel_id)
@log_exceptions @log_exceptions
async def _open_channel_coroutine(self, connect_str, funding_tx, funding_sat, push_sat, password): async def _open_channel_coroutine(self, connect_str, funding_tx, funding_sat, push_sat, password):
peer = await self.add_peer(connect_str) peer = await self.add_peer(connect_str)

Loading…
Cancel
Save