Browse Source

Move callback manager out of Network class

master
ThomasV 6 years ago
parent
commit
9224404108
  1. 8
      electrum/address_synchronizer.py
  2. 6
      electrum/channel_db.py
  3. 3
      electrum/daemon.py
  4. 10
      electrum/exchange_rate.py
  5. 30
      electrum/gui/kivy/main_window.py
  6. 9
      electrum/gui/qt/channel_details.py
  7. 7
      electrum/gui/qt/lightning_dialog.py
  8. 9
      electrum/gui/qt/main_window.py
  9. 4
      electrum/gui/qt/network_dialog.py
  10. 3
      electrum/gui/stdio.py
  11. 4
      electrum/gui/text.py
  12. 6
      electrum/interface.py
  13. 8
      electrum/lnchannel.py
  14. 8
      electrum/lnpeer.py
  15. 8
      electrum/lnwatcher.py
  16. 60
      electrum/lnworker.py
  17. 47
      electrum/network.py
  18. 5
      electrum/synchronizer.py
  19. 40
      electrum/util.py

8
electrum/address_synchronizer.py

@ -28,7 +28,7 @@ import itertools
from collections import defaultdict from collections import defaultdict
from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List
from . import bitcoin from . import bitcoin, util
from .bitcoin import COINBASE_MATURITY from .bitcoin import COINBASE_MATURITY
from .util import profiler, bfh, TxMinedInfo from .util import profiler, bfh, TxMinedInfo
from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction
@ -161,7 +161,7 @@ class AddressSynchronizer(Logger):
if self.network is not None: if self.network is not None:
self.synchronizer = Synchronizer(self) self.synchronizer = Synchronizer(self)
self.verifier = SPV(self.network, self) self.verifier = SPV(self.network, self)
self.network.register_callback(self.on_blockchain_updated, ['blockchain_updated']) util.register_callback(self.on_blockchain_updated, ['blockchain_updated'])
def on_blockchain_updated(self, event, *args): def on_blockchain_updated(self, event, *args):
self._get_addr_balance_cache = {} # invalidate cache self._get_addr_balance_cache = {} # invalidate cache
@ -174,7 +174,7 @@ class AddressSynchronizer(Logger):
if self.verifier: if self.verifier:
asyncio.run_coroutine_threadsafe(self.verifier.stop(), self.network.asyncio_loop) asyncio.run_coroutine_threadsafe(self.verifier.stop(), self.network.asyncio_loop)
self.verifier = None self.verifier = None
self.network.unregister_callback(self.on_blockchain_updated) util.unregister_callback(self.on_blockchain_updated)
self.db.put('stored_height', self.get_local_height()) self.db.put('stored_height', self.get_local_height())
def add_address(self, address): def add_address(self, address):
@ -546,7 +546,7 @@ class AddressSynchronizer(Logger):
self.unverified_tx.pop(tx_hash, None) self.unverified_tx.pop(tx_hash, None)
self.db.add_verified_tx(tx_hash, info) self.db.add_verified_tx(tx_hash, info)
tx_mined_status = self.get_tx_height(tx_hash) tx_mined_status = self.get_tx_height(tx_hash)
self.network.trigger_callback('verified', self, tx_hash, tx_mined_status) util.trigger_callback('verified', self, tx_hash, tx_mined_status)
def get_unverified_txs(self): def get_unverified_txs(self):
'''Returns a map from tx hash to transaction height''' '''Returns a map from tx hash to transaction height'''

6
electrum/channel_db.py

@ -35,7 +35,7 @@ import threading
from .sql_db import SqlDB, sql from .sql_db import SqlDB, sql
from . import constants from . import constants, util
from .util import bh2u, profiler, get_headers_dir, bfh, is_ip_address, list_enabled_bits from .util import bh2u, profiler, get_headers_dir, bfh, is_ip_address, list_enabled_bits
from .logging import Logger from .logging import Logger
from .lnutil import (LNPeerAddr, format_short_channel_id, ShortChannelID, from .lnutil import (LNPeerAddr, format_short_channel_id, ShortChannelID,
@ -269,8 +269,8 @@ class ChannelDB(SqlDB):
self.num_nodes = len(self._nodes) self.num_nodes = len(self._nodes)
self.num_channels = len(self._channels) self.num_channels = len(self._channels)
self.num_policies = len(self._policies) self.num_policies = len(self._policies)
self.network.trigger_callback('channel_db', self.num_nodes, self.num_channels, self.num_policies) util.trigger_callback('channel_db', self.num_nodes, self.num_channels, self.num_policies)
self.network.trigger_callback('ln_gossip_sync_progress') util.trigger_callback('ln_gossip_sync_progress')
def get_channel_ids(self): def get_channel_ids(self):
with self.lock: with self.lock:

3
electrum/daemon.py

@ -41,6 +41,7 @@ from jsonrpcserver import response
from jsonrpcclient.clients.aiohttp_client import AiohttpClient from jsonrpcclient.clients.aiohttp_client import AiohttpClient
from aiorpcx import TaskGroup from aiorpcx import TaskGroup
from . import util
from .network import Network from .network import Network
from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare) from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare)
from .util import PR_PAID, PR_EXPIRED, get_request_status from .util import PR_PAID, PR_EXPIRED, get_request_status
@ -181,7 +182,7 @@ class PayServer(Logger):
self.daemon = daemon self.daemon = daemon
self.config = daemon.config self.config = daemon.config
self.pending = defaultdict(asyncio.Event) self.pending = defaultdict(asyncio.Event)
self.daemon.network.register_callback(self.on_payment, ['payment_received']) util.register_callback(self.on_payment, ['payment_received'])
async def on_payment(self, evt, wallet, key, status): async def on_payment(self, evt, wallet, key, status):
if status == PR_PAID: if status == PR_PAID:

10
electrum/exchange_rate.py

@ -12,6 +12,7 @@ from typing import Sequence, Optional
from aiorpcx.curio import timeout_after, TaskTimeout, TaskGroup from aiorpcx.curio import timeout_after, TaskTimeout, TaskGroup
from . import util
from .bitcoin import COIN from .bitcoin import COIN
from .i18n import _ from .i18n import _
from .util import (ThreadJob, make_dir, log_exceptions, from .util import (ThreadJob, make_dir, log_exceptions,
@ -456,8 +457,7 @@ class FxThread(ThreadJob):
ThreadJob.__init__(self) ThreadJob.__init__(self)
self.config = config self.config = config
self.network = network self.network = network
if self.network: util.register_callback(self.set_proxy, ['proxy_set'])
self.network.register_callback(self.set_proxy, ['proxy_set'])
self.ccy = self.get_currency() self.ccy = self.get_currency()
self.history_used_spot = False self.history_used_spot = False
self.ccy_combo = None self.ccy_combo = None
@ -567,12 +567,10 @@ class FxThread(ThreadJob):
self.exchange.read_historical_rates(self.ccy, self.cache_dir) self.exchange.read_historical_rates(self.ccy, self.cache_dir)
def on_quotes(self): def on_quotes(self):
if self.network: util.trigger_callback('on_quotes')
self.network.trigger_callback('on_quotes')
def on_history(self): def on_history(self):
if self.network: util.trigger_callback('on_history')
self.network.trigger_callback('on_history')
def exchange_rate(self) -> Decimal: def exchange_rate(self) -> Decimal:
"""Returns the exchange rate as a Decimal""" """Returns the exchange rate as a Decimal"""

30
electrum/gui/kivy/main_window.py

@ -13,6 +13,7 @@ from electrum.storage import WalletStorage, StorageReadWriteError
from electrum.wallet_db import WalletDB from electrum.wallet_db import WalletDB
from electrum.wallet import Wallet, InternalAddressCorruption, Abstract_Wallet from electrum.wallet import Wallet, InternalAddressCorruption, Abstract_Wallet
from electrum.plugin import run_hook from electrum.plugin import run_hook
from electrum import util
from electrum.util import (profiler, InvalidPassword, send_exception_to_crash_reporter, from electrum.util import (profiler, InvalidPassword, send_exception_to_crash_reporter,
format_satoshis, format_satoshis_plain, format_fee_satoshis, format_satoshis, format_satoshis_plain, format_fee_satoshis,
PR_PAID, PR_FAILED, maybe_extract_bolt11_invoice) PR_PAID, PR_FAILED, maybe_extract_bolt11_invoice)
@ -50,7 +51,6 @@ from .uix.dialogs.question import Question
# delayed imports: for startup speed on android # delayed imports: for startup speed on android
notification = app = ref = None notification = app = ref = None
util = False
# register widget cache for keeping memory down timeout to forever to cache # register widget cache for keeping memory down timeout to forever to cache
# the data # the data
@ -565,20 +565,20 @@ class ElectrumWindow(App):
if self.network: if self.network:
interests = ['wallet_updated', 'network_updated', 'blockchain_updated', interests = ['wallet_updated', 'network_updated', 'blockchain_updated',
'status', 'new_transaction', 'verified'] 'status', 'new_transaction', 'verified']
self.network.register_callback(self.on_network_event, interests) util.register_callback(self.on_network_event, interests)
self.network.register_callback(self.on_fee, ['fee']) util.register_callback(self.on_fee, ['fee'])
self.network.register_callback(self.on_fee_histogram, ['fee_histogram']) util.register_callback(self.on_fee_histogram, ['fee_histogram'])
self.network.register_callback(self.on_quotes, ['on_quotes']) util.register_callback(self.on_quotes, ['on_quotes'])
self.network.register_callback(self.on_history, ['on_history']) util.register_callback(self.on_history, ['on_history'])
self.network.register_callback(self.on_channels, ['channels_updated']) util.register_callback(self.on_channels, ['channels_updated'])
self.network.register_callback(self.on_channel, ['channel']) util.register_callback(self.on_channel, ['channel'])
self.network.register_callback(self.on_invoice_status, ['invoice_status']) util.register_callback(self.on_invoice_status, ['invoice_status'])
self.network.register_callback(self.on_request_status, ['request_status']) util.register_callback(self.on_request_status, ['request_status'])
self.network.register_callback(self.on_payment_failed, ['payment_failed']) util.register_callback(self.on_payment_failed, ['payment_failed'])
self.network.register_callback(self.on_payment_succeeded, ['payment_succeeded']) util.register_callback(self.on_payment_succeeded, ['payment_succeeded'])
self.network.register_callback(self.on_channel_db, ['channel_db']) util.register_callback(self.on_channel_db, ['channel_db'])
self.network.register_callback(self.set_num_peers, ['gossip_peers']) util.register_callback(self.set_num_peers, ['gossip_peers'])
self.network.register_callback(self.set_unknown_channels, ['unknown_channels']) util.register_callback(self.set_unknown_channels, ['unknown_channels'])
# load wallet # load wallet
self.load_wallet_by_name(self.electrum_config.get_wallet_path(use_gui_last_wallet=True)) self.load_wallet_by_name(self.electrum_config.get_wallet_path(use_gui_last_wallet=True))
# URI passed in config # URI passed in config

9
electrum/gui/qt/channel_details.py

@ -5,6 +5,7 @@ import PyQt5.QtWidgets as QtWidgets
import PyQt5.QtCore as QtCore import PyQt5.QtCore as QtCore
from PyQt5.QtWidgets import QLabel, QLineEdit from PyQt5.QtWidgets import QLabel, QLineEdit
from electrum import util
from electrum.i18n import _ from electrum.i18n import _
from electrum.util import bh2u, format_time from electrum.util import bh2u, format_time
from electrum.lnutil import format_short_channel_id, LOCAL, REMOTE, UpdateAddHtlc, Direction from electrum.lnutil import format_short_channel_id, LOCAL, REMOTE, UpdateAddHtlc, Direction
@ -132,10 +133,10 @@ class ChannelDetailsDialog(QtWidgets.QDialog):
self.htlc_added.connect(self.do_htlc_added) self.htlc_added.connect(self.do_htlc_added)
# register callbacks for updating # register callbacks for updating
window.network.register_callback(self.ln_payment_completed.emit, ['ln_payment_completed']) util.register_callback(self.ln_payment_completed.emit, ['ln_payment_completed'])
window.network.register_callback(self.ln_payment_failed.emit, ['ln_payment_failed']) util.register_callback(self.ln_payment_failed.emit, ['ln_payment_failed'])
window.network.register_callback(self.htlc_added.emit, ['htlc_added']) util.register_callback(self.htlc_added.emit, ['htlc_added'])
window.network.register_callback(self.state_changed.emit, ['channel']) util.register_callback(self.state_changed.emit, ['channel'])
# set attributes of QDialog # set attributes of QDialog
self.setWindowTitle(_('Channel Details')) self.setWindowTitle(_('Channel Details'))

7
electrum/gui/qt/lightning_dialog.py

@ -27,6 +27,7 @@ from typing import TYPE_CHECKING
from PyQt5.QtWidgets import (QDialog, QLabel, QVBoxLayout, QPushButton) from PyQt5.QtWidgets import (QDialog, QLabel, QVBoxLayout, QPushButton)
from electrum import util
from electrum.i18n import _ from electrum.i18n import _
from .util import Buttons from .util import Buttons
@ -58,9 +59,9 @@ class LightningDialog(QDialog):
b = QPushButton(_('Close')) b = QPushButton(_('Close'))
b.clicked.connect(self.close) b.clicked.connect(self.close)
vbox.addLayout(Buttons(b)) vbox.addLayout(Buttons(b))
self.network.register_callback(self.on_channel_db, ['channel_db']) util.register_callback(self.on_channel_db, ['channel_db'])
self.network.register_callback(self.set_num_peers, ['gossip_peers']) util.register_callback(self.set_num_peers, ['gossip_peers'])
self.network.register_callback(self.set_unknown_channels, ['unknown_channels']) util.register_callback(self.set_unknown_channels, ['unknown_channels'])
self.network.channel_db.update_counts() # trigger callback self.network.channel_db.update_counts() # trigger callback
self.set_num_peers('', self.network.lngossip.num_peers()) self.set_num_peers('', self.network.lngossip.num_peers())
self.set_unknown_channels('', len(self.network.lngossip.unknown_ids)) self.set_unknown_channels('', len(self.network.lngossip.unknown_ids))

9
electrum/gui/qt/main_window.py

@ -272,7 +272,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
# window from being GC-ed when closed, callbacks should be # window from being GC-ed when closed, callbacks should be
# methods of this class only, and specifically not be # methods of this class only, and specifically not be
# partials, lambdas or methods of subobjects. Hence... # partials, lambdas or methods of subobjects. Hence...
self.network.register_callback(self.on_network, interests) util.register_callback(self.on_network, interests)
# set initial message # set initial message
self.console.showMessage(self.network.banner) self.console.showMessage(self.network.banner)
@ -466,8 +466,8 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
def load_wallet(self, wallet): def load_wallet(self, wallet):
wallet.thread = TaskThread(self, self.on_error) wallet.thread = TaskThread(self, self.on_error)
self.update_recently_visited(wallet.storage.path) self.update_recently_visited(wallet.storage.path)
if wallet.lnworker and wallet.network: if wallet.lnworker:
wallet.network.trigger_callback('channels_updated', wallet) util.trigger_callback('channels_updated', wallet)
self.need_update.set() self.need_update.set()
# Once GUI has been initialized check if we want to announce something since the callback has been called before the GUI was initialized # Once GUI has been initialized check if we want to announce something since the callback has been called before the GUI was initialized
# update menus # update menus
@ -2889,8 +2889,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
def clean_up(self): def clean_up(self):
self.wallet.thread.stop() self.wallet.thread.stop()
if self.network: util.unregister_callback(self.on_network)
self.network.unregister_callback(self.on_network)
self.config.set_key("is_maximized", self.isMaximized()) self.config.set_key("is_maximized", self.isMaximized())
if not self.isMaximized(): if not self.isMaximized():
g = self.geometry() g = self.geometry()

4
electrum/gui/qt/network_dialog.py

@ -35,7 +35,7 @@ from PyQt5.QtWidgets import (QTreeWidget, QTreeWidgetItem, QMenu, QGridLayout, Q
from PyQt5.QtGui import QFontMetrics from PyQt5.QtGui import QFontMetrics
from electrum.i18n import _ from electrum.i18n import _
from electrum import constants, blockchain from electrum import constants, blockchain, util
from electrum.interface import serialize_server, deserialize_server from electrum.interface import serialize_server, deserialize_server
from electrum.network import Network from electrum.network import Network
from electrum.logging import get_logger from electrum.logging import get_logger
@ -61,7 +61,7 @@ class NetworkDialog(QDialog):
vbox.addLayout(Buttons(CloseButton(self))) vbox.addLayout(Buttons(CloseButton(self)))
self.network_updated_signal_obj.network_updated_signal.connect( self.network_updated_signal_obj.network_updated_signal.connect(
self.on_update) self.on_update)
network.register_callback(self.on_network, ['network_updated']) util.register_callback(self.on_network, ['network_updated'])
def on_network(self, event, *args): def on_network(self, event, *args):
self.network_updated_signal_obj.network_updated_signal.emit(event, args) self.network_updated_signal_obj.network_updated_signal.emit(event, args)

3
electrum/gui/stdio.py

@ -3,6 +3,7 @@ import getpass
import datetime import datetime
import logging import logging
from electrum import util
from electrum import WalletStorage, Wallet from electrum import WalletStorage, Wallet
from electrum.util import format_satoshis from electrum.util import format_satoshis
from electrum.bitcoin import is_address, COIN from electrum.bitcoin import is_address, COIN
@ -43,7 +44,7 @@ class ElectrumGui:
self.wallet.start_network(self.network) self.wallet.start_network(self.network)
self.contacts = self.wallet.contacts self.contacts = self.wallet.contacts
self.network.register_callback(self.on_network, ['wallet_updated', 'network_updated', 'banner']) util.register_callback(self.on_network, ['wallet_updated', 'network_updated', 'banner'])
self.commands = [_("[h] - displays this help text"), \ self.commands = [_("[h] - displays this help text"), \
_("[i] - display transaction history"), \ _("[i] - display transaction history"), \
_("[o] - enter payment order"), \ _("[o] - enter payment order"), \

4
electrum/gui/text.py

@ -8,6 +8,7 @@ import getpass
import logging import logging
import electrum import electrum
from electrum import util
from electrum.util import format_satoshis from electrum.util import format_satoshis
from electrum.bitcoin import is_address, COIN from electrum.bitcoin import is_address, COIN
from electrum.transaction import PartialTxOutput from electrum.transaction import PartialTxOutput
@ -65,8 +66,7 @@ class ElectrumGui:
self.str_fee = "" self.str_fee = ""
self.history = None self.history = None
if self.network: util.register_callback(self.update, ['wallet_updated', 'network_updated'])
self.network.register_callback(self.update, ['wallet_updated', 'network_updated'])
self.tab_names = [_("History"), _("Send"), _("Receive"), _("Addresses"), _("Contacts"), _("Banner")] self.tab_names = [_("History"), _("Send"), _("Receive"), _("Addresses"), _("Contacts"), _("Banner")]
self.num_tabs = len(self.tab_names) self.num_tabs = len(self.tab_names)

6
electrum/interface.py

@ -548,7 +548,7 @@ class Interface(Logger):
raise GracefulDisconnect('server tip below max checkpoint') raise GracefulDisconnect('server tip below max checkpoint')
self._mark_ready() self._mark_ready()
await self._process_header_at_tip() await self._process_header_at_tip()
self.network.trigger_callback('network_updated') util.trigger_callback('network_updated')
await self.network.switch_unwanted_fork_interface() await self.network.switch_unwanted_fork_interface()
await self.network.switch_lagging_interface() await self.network.switch_lagging_interface()
@ -563,7 +563,7 @@ class Interface(Logger):
# in the simple case, height == self.tip+1 # in the simple case, height == self.tip+1
if height <= self.tip: if height <= self.tip:
await self.sync_until(height) await self.sync_until(height)
self.network.trigger_callback('blockchain_updated') util.trigger_callback('blockchain_updated')
async def sync_until(self, height, next_height=None): async def sync_until(self, height, next_height=None):
if next_height is None: if next_height is None:
@ -578,7 +578,7 @@ class Interface(Logger):
raise GracefulDisconnect('server chain conflicts with checkpoints or genesis') raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
last, height = await self.step(height) last, height = await self.step(height)
continue continue
self.network.trigger_callback('network_updated') util.trigger_callback('network_updated')
height = (height // 2016 * 2016) + num_headers height = (height // 2016 * 2016) + num_headers
assert height <= next_height+1, (height, self.tip) assert height <= next_height+1, (height, self.tip)
last = 'catchup' last = 'catchup'

8
electrum/lnchannel.py

@ -33,7 +33,7 @@ from aiorpcx import NetAddress
import attr import attr
from . import ecc from . import ecc
from . import constants from . import constants, util
from .util import bfh, bh2u, chunks, TxMinedInfo from .util import bfh, bh2u, chunks, TxMinedInfo
from .bitcoin import redeem_script_to_address from .bitcoin import redeem_script_to_address
from .crypto import sha256, sha256d from .crypto import sha256, sha256d
@ -679,16 +679,14 @@ class Channel(AbstractChannel):
def set_frozen_for_sending(self, b: bool) -> None: def set_frozen_for_sending(self, b: bool) -> None:
self.storage['frozen_for_sending'] = bool(b) self.storage['frozen_for_sending'] = bool(b)
if self.lnworker: util.trigger_callback('channel', self)
self.lnworker.network.trigger_callback('channel', self)
def is_frozen_for_receiving(self) -> bool: def is_frozen_for_receiving(self) -> bool:
return self.storage.get('frozen_for_receiving', False) return self.storage.get('frozen_for_receiving', False)
def set_frozen_for_receiving(self, b: bool) -> None: def set_frozen_for_receiving(self, b: bool) -> None:
self.storage['frozen_for_receiving'] = bool(b) self.storage['frozen_for_receiving'] = bool(b)
if self.lnworker: util.trigger_callback('channel', self)
self.lnworker.network.trigger_callback('channel', self)
def _assert_can_add_htlc(self, *, htlc_proposer: HTLCOwner, amount_msat: int) -> None: def _assert_can_add_htlc(self, *, htlc_proposer: HTLCOwner, amount_msat: int) -> None:
"""Raises PaymentFailure if the htlc_proposer cannot add this new HTLC. """Raises PaymentFailure if the htlc_proposer cannot add this new HTLC.

8
electrum/lnpeer.py

@ -19,7 +19,7 @@ from datetime import datetime
import aiorpcx import aiorpcx
from .crypto import sha256, sha256d from .crypto import sha256, sha256d
from . import bitcoin from . import bitcoin, util
from . import ecc from . import ecc
from .ecc import sig_string_from_r_and_s, get_r_and_s_from_sig_string, der_sig_from_sig_string from .ecc import sig_string_from_r_and_s, get_r_and_s_from_sig_string, der_sig_from_sig_string
from . import constants from . import constants
@ -744,7 +744,7 @@ class Peer(Logger):
f'already in peer_state {chan.peer_state}') f'already in peer_state {chan.peer_state}')
return return
chan.peer_state = PeerState.REESTABLISHING chan.peer_state = PeerState.REESTABLISHING
self.network.trigger_callback('channel', chan) util.trigger_callback('channel', chan)
# BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side" # BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side"
chan.hm.discard_unsigned_remote_updates() chan.hm.discard_unsigned_remote_updates()
# ctns # ctns
@ -891,7 +891,7 @@ class Peer(Logger):
# checks done # checks done
if chan.is_funded() and chan.config[LOCAL].funding_locked_received: if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
self.mark_open(chan) self.mark_open(chan)
self.network.trigger_callback('channel', chan) util.trigger_callback('channel', chan)
if chan.get_state() == ChannelState.CLOSING: if chan.get_state() == ChannelState.CLOSING:
await self.send_shutdown(chan) await self.send_shutdown(chan)
@ -979,7 +979,7 @@ class Peer(Logger):
return return
assert chan.config[LOCAL].funding_locked_received assert chan.config[LOCAL].funding_locked_received
chan.set_state(ChannelState.OPEN) chan.set_state(ChannelState.OPEN)
self.network.trigger_callback('channel', chan) util.trigger_callback('channel', chan)
# peer may have sent us a channel update for the incoming direction previously # peer may have sent us a channel update for the incoming direction previously
pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id) pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id)
if pending_channel_update: if pending_channel_update:

8
electrum/lnwatcher.py

@ -8,6 +8,7 @@ import asyncio
from enum import IntEnum, auto from enum import IntEnum, auto
from typing import NamedTuple, Dict from typing import NamedTuple, Dict
from . import util
from .sql_db import SqlDB, sql from .sql_db import SqlDB, sql
from .wallet_db import WalletDB from .wallet_db import WalletDB
from .util import bh2u, bfh, log_exceptions, ignore_exceptions, TxMinedInfo from .util import bh2u, bfh, log_exceptions, ignore_exceptions, TxMinedInfo
@ -139,8 +140,9 @@ class LNWatcher(AddressSynchronizer):
self.config = network.config self.config = network.config
self.channels = {} self.channels = {}
self.network = network self.network = network
self.network.register_callback(self.on_network_update, util.register_callback(
['network_updated', 'blockchain_updated', 'verified', 'wallet_updated', 'fee']) self.on_network_update,
['network_updated', 'blockchain_updated', 'verified', 'wallet_updated', 'fee'])
# status gets populated when we run # status gets populated when we run
self.channel_status = {} self.channel_status = {}
@ -420,4 +422,4 @@ class LNWalletWatcher(LNWatcher):
tx_was_added = False tx_was_added = False
if tx_was_added: if tx_was_added:
self.logger.info(f'added future tx: {name}. prevout: {prevout}') self.logger.info(f'added future tx: {name}. prevout: {prevout}')
self.network.trigger_callback('wallet_updated', self.lnworker.wallet) util.trigger_callback('wallet_updated', self.lnworker.wallet)

60
electrum/lnworker.py

@ -21,7 +21,7 @@ import dns.resolver
import dns.exception import dns.exception
from aiorpcx import run_in_thread from aiorpcx import run_in_thread
from . import constants from . import constants, util
from . import keystore from . import keystore
from .util import profiler from .util import profiler
from .util import PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING from .util import PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING
@ -367,7 +367,7 @@ class LNGossip(LNWorker):
max_age = 14*24*3600 max_age = 14*24*3600
LOGGING_SHORTCUT = 'g' LOGGING_SHORTCUT = 'g'
def __init__(self, network): def __init__(self):
seed = os.urandom(32) seed = os.urandom(32)
node = BIP32Node.from_rootseed(seed, xtype='standard') node = BIP32Node.from_rootseed(seed, xtype='standard')
xprv = node.to_xprv() xprv = node.to_xprv()
@ -393,16 +393,16 @@ class LNGossip(LNWorker):
known = self.channel_db.get_channel_ids() known = self.channel_db.get_channel_ids()
new = set(ids) - set(known) new = set(ids) - set(known)
self.unknown_ids.update(new) self.unknown_ids.update(new)
self.network.trigger_callback('unknown_channels', len(self.unknown_ids)) util.trigger_callback('unknown_channels', len(self.unknown_ids))
self.network.trigger_callback('gossip_peers', self.num_peers()) util.trigger_callback('gossip_peers', self.num_peers())
self.network.trigger_callback('ln_gossip_sync_progress') util.trigger_callback('ln_gossip_sync_progress')
def get_ids_to_query(self): def get_ids_to_query(self):
N = 500 N = 500
l = list(self.unknown_ids) l = list(self.unknown_ids)
self.unknown_ids = set(l[N:]) self.unknown_ids = set(l[N:])
self.network.trigger_callback('unknown_channels', len(self.unknown_ids)) util.trigger_callback('unknown_channels', len(self.unknown_ids))
self.network.trigger_callback('ln_gossip_sync_progress') util.trigger_callback('ln_gossip_sync_progress')
return l[0:N] return l[0:N]
def get_sync_progress_estimate(self) -> Tuple[Optional[int], Optional[int]]: def get_sync_progress_estimate(self) -> Tuple[Optional[int], Optional[int]]:
@ -514,7 +514,7 @@ class LNWallet(LNWorker):
def peer_closed(self, peer): def peer_closed(self, peer):
for chan in self.channels_for_peer(peer.pubkey).values(): for chan in self.channels_for_peer(peer.pubkey).values():
chan.peer_state = PeerState.DISCONNECTED chan.peer_state = PeerState.DISCONNECTED
self.network.trigger_callback('channel', chan) util.trigger_callback('channel', chan)
super().peer_closed(peer) super().peer_closed(peer)
def get_settled_payments(self): def get_settled_payments(self):
@ -645,14 +645,14 @@ class LNWallet(LNWorker):
def channel_state_changed(self, chan): def channel_state_changed(self, chan):
self.save_channel(chan) self.save_channel(chan)
self.network.trigger_callback('channel', chan) util.trigger_callback('channel', chan)
def save_channel(self, chan): def save_channel(self, chan):
assert type(chan) is Channel assert type(chan) is Channel
if chan.config[REMOTE].next_per_commitment_point == chan.config[REMOTE].current_per_commitment_point: if chan.config[REMOTE].next_per_commitment_point == chan.config[REMOTE].current_per_commitment_point:
raise Exception("Tried to save channel with next_point == current_point, this should not happen") raise Exception("Tried to save channel with next_point == current_point, this should not happen")
self.wallet.save_db() self.wallet.save_db()
self.network.trigger_callback('channel', chan) util.trigger_callback('channel', chan)
def channel_by_txo(self, txo): def channel_by_txo(self, txo):
with self.lock: with self.lock:
@ -703,7 +703,7 @@ class LNWallet(LNWorker):
funding_sat=funding_sat, funding_sat=funding_sat,
push_msat=push_sat * 1000, push_msat=push_sat * 1000,
temp_channel_id=os.urandom(32)) temp_channel_id=os.urandom(32))
self.network.trigger_callback('channels_updated', self.wallet) util.trigger_callback('channels_updated', self.wallet)
self.wallet.add_transaction(funding_tx) # save tx as local into the wallet self.wallet.add_transaction(funding_tx) # save tx as local into the wallet
self.wallet.set_label(funding_tx.txid(), _('Open channel')) self.wallet.set_label(funding_tx.txid(), _('Open channel'))
if funding_tx.is_complete(): if funding_tx.is_complete():
@ -804,10 +804,10 @@ class LNWallet(LNWorker):
# note: path-finding runs in a separate thread so that we don't block the asyncio loop # note: path-finding runs in a separate thread so that we don't block the asyncio loop
# graph updates might occur during the computation # graph updates might occur during the computation
self.set_invoice_status(key, PR_ROUTING) self.set_invoice_status(key, PR_ROUTING)
self.network.trigger_callback('invoice_status', key) util.trigger_callback('invoice_status', key)
route = await run_in_thread(self._create_route_from_invoice, lnaddr) route = await run_in_thread(self._create_route_from_invoice, lnaddr)
self.set_invoice_status(key, PR_INFLIGHT) self.set_invoice_status(key, PR_INFLIGHT)
self.network.trigger_callback('invoice_status', key) util.trigger_callback('invoice_status', key)
payment_attempt_log = await self._pay_to_route(route, lnaddr) payment_attempt_log = await self._pay_to_route(route, lnaddr)
except Exception as e: except Exception as e:
log.append(PaymentAttemptLog(success=False, exception=e)) log.append(PaymentAttemptLog(success=False, exception=e))
@ -820,11 +820,11 @@ class LNWallet(LNWorker):
break break
else: else:
reason = _('Failed after {} attempts').format(attempts) reason = _('Failed after {} attempts').format(attempts)
self.network.trigger_callback('invoice_status', key) util.trigger_callback('invoice_status', key)
if success: if success:
self.network.trigger_callback('payment_succeeded', key) util.trigger_callback('payment_succeeded', key)
else: else:
self.network.trigger_callback('payment_failed', key, reason) util.trigger_callback('payment_failed', key, reason)
return success return success
async def _pay_to_route(self, route: LNPaymentRoute, lnaddr: LnAddr) -> PaymentAttemptLog: async def _pay_to_route(self, route: LNPaymentRoute, lnaddr: LnAddr) -> PaymentAttemptLog:
@ -840,7 +840,7 @@ class LNWallet(LNWorker):
payment_hash=lnaddr.paymenthash, payment_hash=lnaddr.paymenthash,
min_final_cltv_expiry=lnaddr.get_min_final_cltv_expiry(), min_final_cltv_expiry=lnaddr.get_min_final_cltv_expiry(),
payment_secret=lnaddr.payment_secret) payment_secret=lnaddr.payment_secret)
self.network.trigger_callback('htlc_added', htlc, lnaddr, SENT) util.trigger_callback('htlc_added', htlc, lnaddr, SENT)
payment_attempt = await self.await_payment(lnaddr.paymenthash) payment_attempt = await self.await_payment(lnaddr.paymenthash)
if payment_attempt.success: if payment_attempt.success:
failure_log = None failure_log = None
@ -1139,9 +1139,9 @@ class LNWallet(LNWorker):
f.set_result(payment_attempt) f.set_result(payment_attempt)
else: else:
chan.logger.info('received unexpected payment_failed, probably from previous session') chan.logger.info('received unexpected payment_failed, probably from previous session')
self.network.trigger_callback('invoice_status', key) util.trigger_callback('invoice_status', key)
self.network.trigger_callback('payment_failed', key, '') util.trigger_callback('payment_failed', key, '')
self.network.trigger_callback('ln_payment_failed', payment_hash, chan.channel_id) util.trigger_callback('ln_payment_failed', payment_hash, chan.channel_id)
def payment_sent(self, chan, payment_hash: bytes): def payment_sent(self, chan, payment_hash: bytes):
self.set_payment_status(payment_hash, PR_PAID) self.set_payment_status(payment_hash, PR_PAID)
@ -1155,14 +1155,14 @@ class LNWallet(LNWorker):
f.set_result(payment_attempt) f.set_result(payment_attempt)
else: else:
chan.logger.info('received unexpected payment_sent, probably from previous session') chan.logger.info('received unexpected payment_sent, probably from previous session')
self.network.trigger_callback('invoice_status', key) util.trigger_callback('invoice_status', key)
self.network.trigger_callback('payment_succeeded', key) util.trigger_callback('payment_succeeded', key)
self.network.trigger_callback('ln_payment_completed', payment_hash, chan.channel_id) util.trigger_callback('ln_payment_completed', payment_hash, chan.channel_id)
def payment_received(self, chan, payment_hash: bytes): def payment_received(self, chan, payment_hash: bytes):
self.set_payment_status(payment_hash, PR_PAID) self.set_payment_status(payment_hash, PR_PAID)
self.network.trigger_callback('request_status', payment_hash.hex(), PR_PAID) util.trigger_callback('request_status', payment_hash.hex(), PR_PAID)
self.network.trigger_callback('ln_payment_completed', payment_hash, chan.channel_id) util.trigger_callback('ln_payment_completed', payment_hash, chan.channel_id)
async def _calc_routing_hints_for_invoice(self, amount_sat): async def _calc_routing_hints_for_invoice(self, amount_sat):
"""calculate routing hints (BOLT-11 'r' field)""" """calculate routing hints (BOLT-11 'r' field)"""
@ -1251,8 +1251,8 @@ class LNWallet(LNWorker):
self.channels.pop(chan_id) self.channels.pop(chan_id)
self.db.get('channels').pop(chan_id.hex()) self.db.get('channels').pop(chan_id.hex())
self.network.trigger_callback('channels_updated', self.wallet) util.trigger_callback('channels_updated', self.wallet)
self.network.trigger_callback('wallet_updated', self.wallet) util.trigger_callback('wallet_updated', self.wallet)
@ignore_exceptions @ignore_exceptions
@log_exceptions @log_exceptions
@ -1355,7 +1355,7 @@ class LNBackups(Logger):
self.channel_backups[bfh(channel_id)] = ChannelBackup(cb, sweep_address=self.sweep_address, lnworker=self) self.channel_backups[bfh(channel_id)] = ChannelBackup(cb, sweep_address=self.sweep_address, lnworker=self)
def channel_state_changed(self, chan): def channel_state_changed(self, chan):
self.network.trigger_callback('channel', chan) util.trigger_callback('channel', chan)
def peer_closed(self, chan): def peer_closed(self, chan):
pass pass
@ -1389,7 +1389,7 @@ class LNBackups(Logger):
d[channel_id] = cb_storage d[channel_id] = cb_storage
self.channel_backups[bfh(channel_id)] = cb = ChannelBackup(cb_storage, sweep_address=self.sweep_address, lnworker=self) self.channel_backups[bfh(channel_id)] = cb = ChannelBackup(cb_storage, sweep_address=self.sweep_address, lnworker=self)
self.wallet.save_db() self.wallet.save_db()
self.network.trigger_callback('channels_updated', self.wallet) util.trigger_callback('channels_updated', self.wallet)
self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address()) self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address())
def remove_channel_backup(self, channel_id): def remove_channel_backup(self, channel_id):
@ -1399,7 +1399,7 @@ class LNBackups(Logger):
d.pop(channel_id.hex()) d.pop(channel_id.hex())
self.channel_backups.pop(channel_id) self.channel_backups.pop(channel_id)
self.wallet.save_db() self.wallet.save_db()
self.network.trigger_callback('channels_updated', self.wallet) util.trigger_callback('channels_updated', self.wallet)
@log_exceptions @log_exceptions
async def request_force_close(self, channel_id): async def request_force_close(self, channel_id):

47
electrum/network.py

@ -278,7 +278,6 @@ class Network(Logger):
# locks # locks
self.restart_lock = asyncio.Lock() self.restart_lock = asyncio.Lock()
self.bhi_lock = asyncio.Lock() self.bhi_lock = asyncio.Lock()
self.callback_lock = threading.Lock()
self.recent_servers_lock = threading.RLock() # <- re-entrant self.recent_servers_lock = threading.RLock() # <- re-entrant
self.interfaces_lock = threading.Lock() # for mutating/iterating self.interfaces self.interfaces_lock = threading.Lock() # for mutating/iterating self.interfaces
@ -288,8 +287,6 @@ class Network(Logger):
self.banner = '' self.banner = ''
self.donation_address = '' self.donation_address = ''
self.relay_fee = None # type: Optional[int] self.relay_fee = None # type: Optional[int]
# callbacks set by the GUI
self.callbacks = defaultdict(list) # note: needs self.callback_lock
dir_path = os.path.join(self.config.path, 'certs') dir_path = os.path.join(self.config.path, 'certs')
util.make_dir(dir_path) util.make_dir(dir_path)
@ -332,7 +329,7 @@ class Network(Logger):
from . import channel_db from . import channel_db
self.channel_db = channel_db.ChannelDB(self) self.channel_db = channel_db.ChannelDB(self)
self.path_finder = lnrouter.LNPathFinder(self.channel_db) self.path_finder = lnrouter.LNPathFinder(self.channel_db)
self.lngossip = lnworker.LNGossip(self) self.lngossip = lnworker.LNGossip()
self.lngossip.start_network(self) self.lngossip.start_network(self)
def run_from_another_thread(self, coro, *, timeout=None): def run_from_another_thread(self, coro, *, timeout=None):
@ -350,27 +347,6 @@ class Network(Logger):
return func(self, *args, **kwargs) return func(self, *args, **kwargs)
return func_wrapper return func_wrapper
def register_callback(self, callback, events):
with self.callback_lock:
for event in events:
self.callbacks[event].append(callback)
def unregister_callback(self, callback):
with self.callback_lock:
for callbacks in self.callbacks.values():
if callback in callbacks:
callbacks.remove(callback)
def trigger_callback(self, event, *args):
with self.callback_lock:
callbacks = self.callbacks[event][:]
for callback in callbacks:
# FIXME: if callback throws, we will lose the traceback
if asyncio.iscoroutinefunction(callback):
asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop)
else:
self.asyncio_loop.call_soon_threadsafe(callback, event, *args)
def _read_recent_servers(self): def _read_recent_servers(self):
if not self.config.path: if not self.config.path:
return [] return []
@ -481,9 +457,9 @@ class Network(Logger):
def notify(self, key): def notify(self, key):
if key in ['status', 'updated']: if key in ['status', 'updated']:
self.trigger_callback(key) util.trigger_callback(key)
else: else:
self.trigger_callback(key, self.get_status_value(key)) util.trigger_callback(key, self.get_status_value(key))
def get_parameters(self) -> NetworkParameters: def get_parameters(self) -> NetworkParameters:
host, port, protocol = deserialize_server(self.default_server) host, port, protocol = deserialize_server(self.default_server)
@ -574,7 +550,7 @@ class Network(Logger):
self.proxy = proxy self.proxy = proxy
dns_hacks.configure_dns_depending_on_proxy(bool(proxy)) dns_hacks.configure_dns_depending_on_proxy(bool(proxy))
self.logger.info(f'setting proxy {proxy}') self.logger.info(f'setting proxy {proxy}')
self.trigger_callback('proxy_set', self.proxy) util.trigger_callback('proxy_set', self.proxy)
@log_exceptions @log_exceptions
async def set_parameters(self, net_params: NetworkParameters): async def set_parameters(self, net_params: NetworkParameters):
@ -700,12 +676,13 @@ class Network(Logger):
blockchain_updated = i.blockchain != self.blockchain() blockchain_updated = i.blockchain != self.blockchain()
self.interface = i self.interface = i
await i.taskgroup.spawn(self._request_server_info(i)) await i.taskgroup.spawn(self._request_server_info(i))
self.trigger_callback('default_server_changed') util.trigger_callback('default_server_changed')
self.default_server_changed_event.set() self.default_server_changed_event.set()
self.default_server_changed_event.clear() self.default_server_changed_event.clear()
self._set_status('connected') self._set_status('connected')
self.trigger_callback('network_updated') util.trigger_callback('network_updated')
if blockchain_updated: self.trigger_callback('blockchain_updated') if blockchain_updated:
util.trigger_callback('blockchain_updated')
async def _close_interface(self, interface: Interface): async def _close_interface(self, interface: Interface):
if interface: if interface:
@ -734,7 +711,7 @@ class Network(Logger):
if server == self.default_server: if server == self.default_server:
self._set_status('disconnected') self._set_status('disconnected')
await self._close_interface(interface) await self._close_interface(interface)
self.trigger_callback('network_updated') util.trigger_callback('network_updated')
def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int: def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
if self.oneserver and not self.auto_connect: if self.oneserver and not self.auto_connect:
@ -767,7 +744,7 @@ class Network(Logger):
await self.switch_to_interface(server) await self.switch_to_interface(server)
self._add_recent_server(server) self._add_recent_server(server)
self.trigger_callback('network_updated') util.trigger_callback('network_updated')
def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check) -> bool: def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check) -> bool:
# main interface is exempt. this makes switching servers easier # main interface is exempt. this makes switching servers easier
@ -1152,7 +1129,7 @@ class Network(Logger):
self.logger.info("taskgroup stopped.") self.logger.info("taskgroup stopped.")
asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop) asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)
self.trigger_callback('network_updated') util.trigger_callback('network_updated')
def start(self, jobs: Iterable = None): def start(self, jobs: Iterable = None):
"""Schedule starting the network, along with the given job co-routines. """Schedule starting the network, along with the given job co-routines.
@ -1176,7 +1153,7 @@ class Network(Logger):
self.connecting.clear() self.connecting.clear()
self.server_queue = None self.server_queue = None
if not full_shutdown: if not full_shutdown:
self.trigger_callback('network_updated') util.trigger_callback('network_updated')
def stop(self): def stop(self):
assert self._loop_thread != threading.current_thread(), 'must not be called from network thread' assert self._loop_thread != threading.current_thread(), 'must not be called from network thread'

5
electrum/synchronizer.py

@ -30,6 +30,7 @@ import logging
from aiorpcx import TaskGroup, run_in_thread, RPCError from aiorpcx import TaskGroup, run_in_thread, RPCError
from . import util
from .transaction import Transaction, PartialTransaction from .transaction import Transaction, PartialTransaction
from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer
from .bitcoin import address_to_scripthash, is_address from .bitcoin import address_to_scripthash, is_address
@ -227,7 +228,7 @@ class Synchronizer(SynchronizerBase):
self.wallet.receive_tx_callback(tx_hash, tx, tx_height) self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
self.logger.info(f"received tx {tx_hash} height: {tx_height} bytes: {len(raw_tx)}") self.logger.info(f"received tx {tx_hash} height: {tx_height} bytes: {len(raw_tx)}")
# callbacks # callbacks
self.wallet.network.trigger_callback('new_transaction', self.wallet, tx) util.trigger_callback('new_transaction', self.wallet, tx)
async def main(self): async def main(self):
self.wallet.set_up_to_date(False) self.wallet.set_up_to_date(False)
@ -252,7 +253,7 @@ class Synchronizer(SynchronizerBase):
if up_to_date: if up_to_date:
self._reset_request_counters() self._reset_request_counters()
self.wallet.set_up_to_date(up_to_date) self.wallet.set_up_to_date(up_to_date)
self.wallet.network.trigger_callback('wallet_updated', self.wallet) util.trigger_callback('wallet_updated', self.wallet)
class Notifier(SynchronizerBase): class Notifier(SynchronizerBase):

40
electrum/util.py

@ -1130,7 +1130,7 @@ class NetworkJobOnDefaultServer(Logger):
self._restart_lock = asyncio.Lock() self._restart_lock = asyncio.Lock()
self._reset() self._reset()
asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop) asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)
network.register_callback(self._restart, ['default_server_changed']) register_callback(self._restart, ['default_server_changed'])
def _reset(self): def _reset(self):
"""Initialise fields. Called every time the underlying """Initialise fields. Called every time the underlying
@ -1304,3 +1304,41 @@ def randrange(bound: int) -> int:
"""Return a random integer k such that 1 <= k < bound, uniformly """Return a random integer k such that 1 <= k < bound, uniformly
distributed across that range.""" distributed across that range."""
return ecdsa.util.randrange(bound) return ecdsa.util.randrange(bound)
class CallbackManager:
# callbacks set by the GUI
def __init__(self):
self.callback_lock = threading.Lock()
self.callbacks = defaultdict(list) # note: needs self.callback_lock
self.asyncio_loop = None
def register_callback(self, callback, events):
with self.callback_lock:
for event in events:
self.callbacks[event].append(callback)
def unregister_callback(self, callback):
with self.callback_lock:
for callbacks in self.callbacks.values():
if callback in callbacks:
callbacks.remove(callback)
def trigger_callback(self, event, *args):
if self.asyncio_loop is None:
self.asyncio_loop = asyncio.get_event_loop()
assert self.asyncio_loop.is_running(), "event loop not running"
with self.callback_lock:
callbacks = self.callbacks[event][:]
for callback in callbacks:
# FIXME: if callback throws, we will lose the traceback
if asyncio.iscoroutinefunction(callback):
asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop)
else:
self.asyncio_loop.call_soon_threadsafe(callback, event, *args)
callback_mgr = CallbackManager()
trigger_callback = callback_mgr.trigger_callback
register_callback = callback_mgr.register_callback
unregister_callback = callback_mgr.unregister_callback

Loading…
Cancel
Save