You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

751 lines
29 KiB

import asyncio
import queue
import threading
import time
from typing import TYPE_CHECKING, Optional, Tuple
from functools import partial
from PyQt5.QtCore import pyqtProperty, pyqtSignal, pyqtSlot, QObject, QTimer, QMetaObject, Qt
from electrum import bitcoin
from electrum.i18n import _
from electrum.invoices import InvoiceError, PR_DEFAULT_EXPIRATION_WHEN_CREATING, PR_PAID
from electrum.logging import get_logger
from electrum.network import TxBroadcastError, BestEffortRequestFailed
from electrum.transaction import PartialTxOutput
from electrum.util import (parse_max_spend, InvalidPassword, event_listener)
from electrum.plugin import run_hook
from electrum.wallet import Multisig_Wallet
from electrum.crypto import pw_decode_with_version_and_mac
from .auth import AuthMixin, auth_protect
from .qeaddresslistmodel import QEAddressListModel
from .qechannellistmodel import QEChannelListModel
from .qeinvoicelistmodel import QEInvoiceListModel, QERequestListModel
from .qetransactionlistmodel import QETransactionListModel
from .qetypes import QEAmount
from .util import QtEventListener, qt_event_listener
if TYPE_CHECKING:
from electrum.wallet import Abstract_Wallet
class QEWallet(AuthMixin, QObject, QtEventListener):
__instances = []
# this factory method should be used to instantiate QEWallet
# so we have only one QEWallet for each electrum.wallet
@classmethod
def getInstanceFor(cls, wallet):
for i in cls.__instances:
if i.wallet == wallet:
return i
i = QEWallet(wallet)
cls.__instances.append(i)
return i
_logger = get_logger(__name__)
# emitted when wallet wants to display a user notification
# actual presentation should be handled on app or window level
userNotify = pyqtSignal(object, object)
# shared signal for many static wallet properties
dataChanged = pyqtSignal()
requestStatusChanged = pyqtSignal([str,int], arguments=['key','status'])
requestCreateSuccess = pyqtSignal([str], arguments=['key'])
requestCreateError = pyqtSignal([str,str], arguments=['code','error'])
invoiceStatusChanged = pyqtSignal([str,int], arguments=['key','status'])
invoiceCreateSuccess = pyqtSignal()
invoiceCreateError = pyqtSignal([str,str], arguments=['code','error'])
paymentSucceeded = pyqtSignal([str], arguments=['key'])
paymentFailed = pyqtSignal([str,str], arguments=['key','reason'])
requestNewPassword = pyqtSignal()
transactionSigned = pyqtSignal([str], arguments=['txid'])
broadcastSucceeded = pyqtSignal([str], arguments=['txid'])
broadcastFailed = pyqtSignal([str,str,str], arguments=['txid','code','reason'])
importChannelBackupFailed = pyqtSignal([str], arguments=['message'])
labelsUpdated = pyqtSignal()
otpRequested = pyqtSignal()
otpSuccess = pyqtSignal()
otpFailed = pyqtSignal([str,str], arguments=['code','message'])
peersUpdated = pyqtSignal()
seedRetrieved = pyqtSignal()
_network_signal = pyqtSignal(str, object)
def __init__(self, wallet: 'Abstract_Wallet', parent=None):
super().__init__(parent)
self.wallet = wallet
self._logger = get_logger(f'{__name__}.[{wallet}]')
self._synchronizing = False
self._synchronizing_progress = ''
self._historyModel = None
self._addressModel = None
self._requestModel = None
self._invoiceModel = None
self._channelModel = None
self._lightningbalance = QEAmount()
self._confirmedbalance = QEAmount()
self._unconfirmedbalance = QEAmount()
self._frozenbalance = QEAmount()
self._totalbalance = QEAmount()
self._lightningcanreceive = QEAmount()
self._lightningcansend = QEAmount()
self._seed = ''
self.tx_notification_queue = queue.Queue()
self.tx_notification_last_time = 0
self.notification_timer = QTimer(self)
self.notification_timer.setSingleShot(False)
self.notification_timer.setInterval(500) # msec
self.notification_timer.timeout.connect(self.notify_transactions)
self.sync_progress_timer = QTimer(self)
self.sync_progress_timer.setSingleShot(False)
self.sync_progress_timer.setInterval(2000)
self.sync_progress_timer.timeout.connect(self.update_sync_progress)
# post-construction init in GUI thread
# QMetaObject.invokeMethod(self, 'qt_init', Qt.QueuedConnection)
# To avoid leaking references to "self" that prevent the
# window from being GC-ed when closed, callbacks should be
# methods of this class only, and specifically not be
# partials, lambdas or methods of subobjects. Hence...
self.register_callbacks()
self.destroyed.connect(lambda: self.on_destroy())
self.synchronizing = not wallet.is_up_to_date()
synchronizingChanged = pyqtSignal()
@pyqtProperty(bool, notify=synchronizingChanged)
def synchronizing(self):
return self._synchronizing
@synchronizing.setter
def synchronizing(self, synchronizing):
if self._synchronizing != synchronizing:
self._logger.debug(f'SYNC {self._synchronizing} -> {synchronizing}')
self._synchronizing = synchronizing
self.synchronizingChanged.emit()
if synchronizing:
if not self.sync_progress_timer.isActive():
self.update_sync_progress()
self.sync_progress_timer.start()
else:
self.sync_progress_timer.stop()
synchronizingProgressChanged = pyqtSignal()
@pyqtProperty(str, notify=synchronizingProgressChanged)
def synchronizingProgress(self):
return self._synchronizing_progress
@synchronizingProgress.setter
def synchronizingProgress(self, progress):
if self._synchronizing_progress != progress:
self._synchronizing_progress = progress
self._logger.info(progress)
self.synchronizingProgressChanged.emit()
@qt_event_listener
def on_event_request_status(self, wallet, key, status):
if wallet == self.wallet:
self._logger.debug('request status %d for key %s' % (status, key))
self.requestStatusChanged.emit(key, status)
if status == PR_PAID:
# might be new incoming LN payment, update history
# TODO: only update if it was paid over lightning,
# and even then, we can probably just add the payment instead
# of recreating the whole history (expensive)
self.historyModel.init_model(True)
@event_listener
def on_event_invoice_status(self, wallet, key, status):
if wallet == self.wallet:
self._logger.debug(f'invoice status update for key {key} to {status}')
self.invoiceStatusChanged.emit(key, status)
@qt_event_listener
def on_event_new_transaction(self, wallet, tx):
if wallet == self.wallet:
self._logger.info(f'new transaction {tx.txid()}')
self.add_tx_notification(tx)
self.addressModel.setDirty()
self.historyModel.setDirty() # assuming wallet.is_up_to_date triggers after
@qt_event_listener
def on_event_wallet_updated(self, wallet):
if wallet == self.wallet:
self._logger.debug('wallet_updated')
self.balanceChanged.emit()
self.synchronizing = not wallet.is_up_to_date()
if not self.synchronizing:
self.historyModel.init_model() # refresh if dirty
@event_listener
def on_event_channel(self, wallet, channel):
if wallet == self.wallet:
self.balanceChanged.emit()
self.peersUpdated.emit()
@event_listener
def on_event_channels_updated(self, wallet):
if wallet == self.wallet:
self.balanceChanged.emit()
self.peersUpdated.emit()
@qt_event_listener
def on_event_payment_succeeded(self, wallet, key):
if wallet == self.wallet:
self.paymentSucceeded.emit(key)
self.historyModel.init_model(True) # TODO: be less dramatic
@event_listener
def on_event_payment_failed(self, wallet, key, reason):
if wallet == self.wallet:
self.paymentFailed.emit(key, reason)
def on_destroy(self):
self.unregister_callbacks()
def add_tx_notification(self, tx):
self._logger.debug('new transaction event')
self.tx_notification_queue.put(tx)
if not self.notification_timer.isActive():
self._logger.debug('starting wallet notification timer')
self.notification_timer.start()
def notify_transactions(self):
if self.tx_notification_queue.qsize() == 0:
self._logger.debug('queue empty, stopping wallet notification timer')
self.notification_timer.stop()
return
if not self.wallet.is_up_to_date():
return # no notifications while syncing
now = time.time()
rate_limit = 20 # seconds
if self.tx_notification_last_time + rate_limit > now:
return
self.tx_notification_last_time = now
self._logger.info("Notifying app about new transactions")
txns = []
while True:
try:
txns.append(self.tx_notification_queue.get_nowait())
except queue.Empty:
break
config = self.wallet.config
# Combine the transactions if there are at least three
if len(txns) >= 3:
total_amount = 0
for tx in txns:
tx_wallet_delta = self.wallet.get_wallet_delta(tx)
if not tx_wallet_delta.is_relevant:
continue
total_amount += tx_wallet_delta.delta
self.userNotify.emit(self.wallet, _("{} new transactions: Total amount received in the new transactions {}").format(len(txns), config.format_amount_and_units(total_amount)))
else:
for tx in txns:
tx_wallet_delta = self.wallet.get_wallet_delta(tx)
if not tx_wallet_delta.is_relevant:
continue
self.userNotify.emit(self.wallet,
_("New transaction: {}").format(config.format_amount_and_units(tx_wallet_delta.delta)))
def update_sync_progress(self):
if self.wallet.network.is_connected():
num_sent, num_answered = self.wallet.adb.get_history_sync_state_details()
self.synchronizingProgress = \
("{} ({}/{})".format(_("Synchronizing..."), num_answered, num_sent))
historyModelChanged = pyqtSignal()
@pyqtProperty(QETransactionListModel, notify=historyModelChanged)
def historyModel(self):
if self._historyModel is None:
self._historyModel = QETransactionListModel(self.wallet)
return self._historyModel
addressModelChanged = pyqtSignal()
@pyqtProperty(QEAddressListModel, notify=addressModelChanged)
def addressModel(self):
if self._addressModel is None:
self._addressModel = QEAddressListModel(self.wallet)
return self._addressModel
requestModelChanged = pyqtSignal()
@pyqtProperty(QERequestListModel, notify=requestModelChanged)
def requestModel(self):
if self._requestModel is None:
self._requestModel = QERequestListModel(self.wallet)
return self._requestModel
invoiceModelChanged = pyqtSignal()
@pyqtProperty(QEInvoiceListModel, notify=invoiceModelChanged)
def invoiceModel(self):
if self._invoiceModel is None:
self._invoiceModel = QEInvoiceListModel(self.wallet)
return self._invoiceModel
channelModelChanged = pyqtSignal()
@pyqtProperty(QEChannelListModel, notify=channelModelChanged)
def channelModel(self):
if self._channelModel is None:
self._channelModel = QEChannelListModel(self.wallet)
return self._channelModel
nameChanged = pyqtSignal()
@pyqtProperty(str, notify=nameChanged)
def name(self):
return self.wallet.basename()
isLightningChanged = pyqtSignal()
@pyqtProperty(bool, notify=isLightningChanged)
def isLightning(self):
return bool(self.wallet.lnworker)
billingInfoChanged = pyqtSignal()
@pyqtProperty('QVariantMap', notify=billingInfoChanged)
def billingInfo(self):
return {} if self.wallet.wallet_type != '2fa' else self.wallet.billing_info
@pyqtProperty(bool, notify=dataChanged)
def canHaveLightning(self):
return self.wallet.can_have_lightning()
@pyqtProperty(str, notify=dataChanged)
def walletType(self):
return self.wallet.wallet_type
@pyqtProperty(bool, notify=dataChanged)
def hasSeed(self):
return self.wallet.has_seed()
@pyqtProperty(str, notify=dataChanged)
def seed(self):
return self._seed
@pyqtProperty(str, notify=dataChanged)
def txinType(self):
if self.wallet.wallet_type == 'imported':
return self.wallet.txin_type
return self.wallet.get_txin_type(self.wallet.dummy_address())
@pyqtProperty(bool, notify=dataChanged)
def isWatchOnly(self):
return self.wallet.is_watching_only()
@pyqtProperty(bool, notify=dataChanged)
def isDeterministic(self):
return self.wallet.is_deterministic()
@pyqtProperty(bool, notify=dataChanged)
def isEncrypted(self):
return self.wallet.storage.is_encrypted()
@pyqtProperty(bool, notify=dataChanged)
def isHardware(self):
return self.wallet.storage.is_encrypted_with_hw_device()
@pyqtProperty('QVariantList', notify=dataChanged)
def keystores(self):
result = []
for k in self.wallet.get_keystores():
result.append({
'keystore_type': k.type,
'watch_only': k.is_watching_only(),
'derivation_prefix': (k.get_derivation_prefix() if k.is_deterministic() else '') or '',
'master_pubkey': (k.get_master_public_key() if k.is_deterministic() else '') or '',
'fingerprint': (k.get_root_fingerprint() if k.is_deterministic() else '') or '',
'num_imported': len(k.keypairs) if k.can_import() else 0,
})
return result
@pyqtProperty(str, notify=dataChanged)
def lightningNodePubkey(self):
return self.wallet.lnworker.node_keypair.pubkey.hex() if self.wallet.lnworker else ''
@pyqtProperty(str, notify=dataChanged)
def derivationPrefix(self):
keystores = self.wallet.get_keystores()
if len(keystores) > 1:
self._logger.debug('multiple keystores not supported yet')
if len(keystores) == 0:
self._logger.debug('no keystore')
return ''
if not self.isDeterministic:
return ''
return keystores[0].get_derivation_prefix()
@pyqtProperty(str, notify=dataChanged)
def masterPubkey(self):
return self.wallet.get_master_public_key()
@pyqtProperty(bool, notify=dataChanged)
def canSignWithoutServer(self):
return self.wallet.can_sign_without_server() if self.wallet.wallet_type == '2fa' else True
@pyqtProperty(bool, notify=dataChanged)
def canSignWithoutCosigner(self):
if isinstance(self.wallet, Multisig_Wallet):
if self.wallet.wallet_type == '2fa': # 2fa is multisig, but it handles cosigning itself
return True
return self.wallet.m == 1
return True
balanceChanged = pyqtSignal()
@pyqtProperty(QEAmount, notify=balanceChanged)
def frozenBalance(self):
c, u, x = self.wallet.get_frozen_balance()
self._frozenbalance.satsInt = c+x
return self._frozenbalance
@pyqtProperty(QEAmount, notify=balanceChanged)
def unconfirmedBalance(self):
self._unconfirmedbalance.satsInt = self.wallet.get_balance()[1]
return self._unconfirmedbalance
@pyqtProperty(QEAmount, notify=balanceChanged)
def confirmedBalance(self):
c, u, x = self.wallet.get_balance()
self._confirmedbalance.satsInt = c+x
return self._confirmedbalance
@pyqtProperty(QEAmount, notify=balanceChanged)
def lightningBalance(self):
if self.isLightning:
self._lightningbalance.satsInt = int(self.wallet.lnworker.get_balance())
return self._lightningbalance
@pyqtProperty(QEAmount, notify=balanceChanged)
def totalBalance(self):
total = self.confirmedBalance.satsInt + self.lightningBalance.satsInt
self._totalbalance.satsInt = total
return self._totalbalance
@pyqtProperty(QEAmount, notify=balanceChanged)
def lightningCanSend(self):
if self.isLightning:
self._lightningcansend.satsInt = int(self.wallet.lnworker.num_sats_can_send())
return self._lightningcansend
@pyqtProperty(QEAmount, notify=balanceChanged)
def lightningCanReceive(self):
if self.isLightning:
self._lightningcanreceive.satsInt = int(self.wallet.lnworker.num_sats_can_receive())
return self._lightningcanreceive
@pyqtProperty(int, notify=peersUpdated)
def lightningNumPeers(self):
if self.isLightning:
return self.wallet.lnworker.num_peers()
return 0
@pyqtSlot()
def enableLightning(self):
self.wallet.init_lightning(password=None) # TODO pass password if needed
self.isLightningChanged.emit()
@pyqtSlot(str, int, int, bool)
def send_onchain(self, address, amount, fee=None, rbf=False):
self._logger.info('send_onchain: %s %d' % (address,amount))
coins = self.wallet.get_spendable_coins(None)
if not bitcoin.is_address(address):
self._logger.warning('Invalid Bitcoin Address: ' + address)
return False
outputs = [PartialTxOutput.from_address_and_value(address, amount)]
self._logger.info(str(outputs))
output_values = [x.value for x in outputs]
if any(parse_max_spend(outval) for outval in output_values):
output_value = '!'
else:
output_value = sum(output_values)
self._logger.info(str(output_value))
# see qt/confirm_tx_dialog qt/main_window
tx = self.wallet.make_unsigned_transaction(coins=coins,outputs=outputs, fee=None)
self._logger.info(str(tx.to_json()))
tx.set_rbf(True)
self.sign(tx, broadcast=True)
@auth_protect
def sign(self, tx, *, broadcast: bool = False):
sign_hook = run_hook('tc_sign_wrapper', self.wallet, tx, partial(self.on_sign_complete, broadcast),
self.on_sign_failed)
if sign_hook:
self.do_sign(tx, False)
self._logger.debug('plugin needs to sign tx too')
sign_hook(tx)
return
self.do_sign(tx, broadcast)
def do_sign(self, tx, broadcast):
tx = self.wallet.sign_transaction(tx, self.password)
if tx is None:
self._logger.info('did not sign')
return
txid = tx.txid()
self._logger.debug(f'do_sign(), txid={txid}')
self.transactionSigned.emit(txid)
if not tx.is_complete():
self._logger.debug('tx not complete')
broadcast = False
if broadcast:
self.broadcast(tx)
else:
# not broadcasted, so add to history now
self.historyModel.init_model(True)
# this assumes a 2fa wallet, but there are no other tc_sign_wrapper hooks, so that's ok
def on_sign_complete(self, broadcast, tx):
self.otpSuccess.emit()
if broadcast:
self.broadcast(tx)
def on_sign_failed(self, error):
self.otpFailed.emit('error', error)
def request_otp(self, on_submit):
self._otp_on_submit = on_submit
self.otpRequested.emit()
@pyqtSlot(str)
def submitOtp(self, otp):
def submit_otp_task():
self._otp_on_submit(otp)
threading.Thread(target=submit_otp_task, daemon=True).start()
def broadcast(self, tx):
assert tx.is_complete()
def broadcast_thread():
try:
self._logger.info('running broadcast in thread')
self.wallet.network.run_from_another_thread(self.wallet.network.broadcast_transaction(tx))
except TxBroadcastError as e:
self._logger.error(repr(e))
self.broadcastFailed.emit(tx.txid(),'',str(e))
except BestEffortRequestFailed as e:
self._logger.error(repr(e))
self.broadcastFailed.emit(tx.txid(),'',str(e))
else:
self._logger.info('broadcast success')
self.broadcastSucceeded.emit(tx.txid())
self.historyModel.requestRefresh.emit() # via qt thread
threading.Thread(target=broadcast_thread, daemon=True).start()
#TODO: properly catch server side errors, e.g. bad-txns-inputs-missingorspent
paymentAuthRejected = pyqtSignal()
def ln_auth_rejected(self):
self.paymentAuthRejected.emit()
@pyqtSlot(str)
@auth_protect(reject='ln_auth_rejected')
def pay_lightning_invoice(self, invoice_key):
self._logger.debug('about to pay LN')
invoice = self.wallet.get_invoice(invoice_key)
assert(invoice)
assert(invoice.lightning_invoice)
amount_msat = invoice.get_amount_msat()
def pay_thread():
try:
coro = self.wallet.lnworker.pay_invoice(invoice.lightning_invoice, amount_msat=amount_msat)
fut = asyncio.run_coroutine_threadsafe(coro, self.wallet.network.asyncio_loop)
fut.result()
except Exception as e:
self.paymentFailed.emit(invoice.get_id(), repr(e))
threading.Thread(target=pay_thread, daemon=True).start()
def create_bitcoin_request(self, amount: int, message: str, expiration: int, *, ignore_gap: bool = False, reuse_address: bool = False) -> Optional[Tuple]:
addr = self.wallet.get_unused_address()
if addr is None:
if not self.wallet.is_deterministic(): # imported wallet
if not reuse_address:
msg = [
_('No more addresses in your wallet.'), ' ',
_('You are using a non-deterministic wallet, which cannot create new addresses.'), ' ',
_('If you want to create new addresses, use a deterministic wallet instead.'), '\n\n',
_('Creating a new payment request will reuse one of your addresses and overwrite an existing request. Continue anyway?'),
]
self.requestCreateError.emit('non-deterministic',''.join(msg))
return
addr = self.wallet.get_receiving_address()
else: # deterministic wallet
if not ignore_gap:
self.requestCreateError.emit('gaplimit',_("Warning: The next address will not be recovered automatically if you restore your wallet from seed; you may need to add it manually.\n\nThis occurs because you have too many unused addresses in your wallet. To avoid this situation, use the existing addresses first.\n\nCreate anyway?"))
return
addr = self.wallet.create_new_address(False)
req_key = self.wallet.create_request(amount, message, expiration, addr)
self._logger.debug(f'created request with key {req_key}')
#try:
#self.wallet.add_payment_request(req)
#except Exception as e:
#self.logger.exception('Error adding payment request')
#self.requestCreateError.emit('fatal',_('Error adding payment request') + ':\n' + repr(e))
#else:
## TODO: check this flow. Only if alias is defined in config. OpenAlias?
#pass
##self.sign_payment_request(addr)
return req_key, addr
@pyqtSlot(QEAmount, str, int)
@pyqtSlot(QEAmount, str, int, bool)
@pyqtSlot(QEAmount, str, int, bool, bool)
@pyqtSlot(QEAmount, str, int, bool, bool, bool)
def createRequest(self, amount: QEAmount, message: str, expiration: int, ignore_gap: bool = False, reuse_address: bool = False):
try:
if self.wallet.lnworker and self.wallet.lnworker.channels:
# TODO maybe show a warning if amount exceeds lnworker.num_sats_can_receive (as in kivy)
# TODO fallback address robustness
addr = self.wallet.get_unused_address()
key = self.wallet.create_request(amount.satsInt, message, expiration, addr)
else:
key, addr = self.create_bitcoin_request(amount.satsInt, message, expiration, ignore_gap=ignore_gap, reuse_address=reuse_address)
if not key:
return
self.addressModel.setDirty()
except InvoiceError as e:
self.requestCreateError.emit('fatal',_('Error creating payment request') + ':\n' + str(e))
return
assert key is not None
self.requestModel.add_invoice(self.wallet.get_request(key))
self.requestCreateSuccess.emit(key)
@pyqtSlot()
@pyqtSlot(bool)
@pyqtSlot(bool, bool)
def createDefaultRequest(self, ignore_gap: bool = False, reuse_address: bool = False):
try:
default_expiry = self.wallet.config.get('request_expiry', PR_DEFAULT_EXPIRATION_WHEN_CREATING)
if self.wallet.lnworker and self.wallet.lnworker.channels:
addr = self.wallet.get_unused_address()
# if addr is None, we ran out of addresses
if addr is None:
# TODO: remove oldest unpaid request having a fallback address and try again
pass
key = self.wallet.create_request(None, None, default_expiry, addr)
else:
req = self.create_bitcoin_request(None, None, default_expiry, ignore_gap=ignore_gap, reuse_address=reuse_address)
if not req:
return
key, addr = req
except InvoiceError as e:
self.requestCreateError.emit('fatal',_('Error creating payment request') + ':\n' + str(e))
return
assert key is not None
self.requestModel.add_invoice(self.wallet.get_request(key))
self.requestCreateSuccess.emit(key)
@pyqtSlot(str)
def delete_request(self, key: str):
self._logger.debug('delete req %s' % key)
self.wallet.delete_request(key)
self.requestModel.delete_invoice(key)
@pyqtSlot(str, result='QVariant')
def get_request(self, key: str):
return self.requestModel.get_model_invoice(key)
@pyqtSlot(str)
def delete_invoice(self, key: str):
self._logger.debug('delete inv %s' % key)
self.wallet.delete_invoice(key)
self.invoiceModel.delete_invoice(key)
@pyqtSlot(str, result='QVariant')
def get_invoice(self, key: str):
return self.invoiceModel.get_model_invoice(key)
@pyqtSlot(str, result=bool)
def verify_password(self, password):
try:
self.wallet.storage.check_password(password)
return True
except InvalidPassword as e:
return False
@pyqtSlot(str)
def set_password(self, password):
if password == '':
password = None
storage = self.wallet.storage
# HW wallet not supported yet
if storage.is_encrypted_with_hw_device():
return
current_password = self.password if self.password != '' else None
try:
self._logger.info(f'PW change from {current_password} to {password}')
self.wallet.update_password(current_password, password, encrypt_storage=True)
self.password = password
except InvalidPassword as e:
self._logger.exception(repr(e))
@pyqtSlot(str)
def importAddresses(self, addresslist):
self.wallet.import_addresses(addresslist.split())
@pyqtSlot(str)
def importPrivateKeys(self, keyslist):
self.wallet.import_private_keys(keyslist.split(), self.password)
@pyqtSlot(str)
def importChannelBackup(self, backup_str):
try:
self.wallet.lnworker.import_channel_backup(backup_str)
except Exception as e:
self._logger.debug(f'could not import channel backup: {repr(e)}')
self.importChannelBackupFailed.emit(f'Failed to import backup:\n\n{str(e)}')
@pyqtSlot(str, result=bool)
def isValidChannelBackup(self, backup_str):
try:
assert backup_str.startswith('channel_backup:')
encrypted = backup_str[15:]
xpub = self.wallet.get_fingerprint()
decrypted = pw_decode_with_version_and_mac(encrypted, xpub)
return True
except Exception as e:
return False
@pyqtSlot()
def requestShowSeed(self):
self.retrieve_seed()
@auth_protect
def retrieve_seed(self):
try:
self._seed = self.wallet.get_seed(self.password)
self.seedRetrieved.emit()
except:
self._seed = ''
self.dataChanged.emit()