You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

123 lines
3.8 KiB

import asyncio
import concurrent
from PyQt5.QtCore import pyqtProperty, pyqtSignal, pyqtSlot
from PyQt5.QtCore import Qt, QAbstractListModel, QModelIndex, Q_ENUMS
from electrum import Network, keystore
from electrum.bip32 import BIP32Node
from electrum.bip39_recovery import account_discovery
from electrum.logging import get_logger
from .util import TaskThread
class QEBip39RecoveryListModel(QAbstractListModel):
_logger = get_logger(__name__)
class State:
Idle = -1
Scanning = 0
Success = 1
Failed = 2
Cancelled = 3
Q_ENUMS(State)
recoveryFailed = pyqtSignal()
stateChanged = pyqtSignal()
# userinfoChanged = pyqtSignal()
# define listmodel rolemap
_ROLE_NAMES=('description', 'derivation_path', 'script_type')
_ROLE_KEYS = range(Qt.UserRole, Qt.UserRole + len(_ROLE_NAMES))
_ROLE_MAP = dict(zip(_ROLE_KEYS, [bytearray(x.encode()) for x in _ROLE_NAMES]))
def __init__(self, config, parent=None):
super().__init__(parent)
self._accounts = []
self._thread = None
self._root_seed = None
self._state = QEBip39RecoveryListModel.State.Idle
def rowCount(self, index):
return len(self._accounts)
def roleNames(self):
return self._ROLE_MAP
def data(self, index, role):
account = self._accounts[index.row()]
role_index = role - Qt.UserRole
value = account[self._ROLE_NAMES[role_index]]
if isinstance(value, (bool, list, int, str)) or value is None:
return value
return str(value)
def clear(self):
self.beginResetModel()
self._accounts = []
self.endResetModel()
@pyqtProperty(int, notify=stateChanged)
def state(self):
return self._state
@state.setter
def state(self, state: State):
if state != self._state:
self._state = state
self.stateChanged.emit()
@pyqtSlot(str, str)
@pyqtSlot(str, str, str)
def startScan(self, wallet_type: str, seed: str, seed_extra_words: str = None):
if not seed or not wallet_type:
return
assert wallet_type == 'standard'
self._root_seed = keystore.bip39_to_seed(seed, seed_extra_words)
self.clear()
self._thread = TaskThread(self)
network = Network.get_instance()
coro = account_discovery(network, self.get_account_xpub)
self.state = QEBip39RecoveryListModel.State.Scanning
fut = asyncio.run_coroutine_threadsafe(coro, network.asyncio_loop)
self._thread.add(
fut.result,
on_success=self.on_recovery_success,
on_error=self.on_recovery_error,
cancel=fut.cancel,
)
def addAccount(self, account):
self._logger.debug(f'addAccount {account!r}')
self.beginInsertRows(QModelIndex(), len(self._accounts), len(self._accounts))
self._accounts.append(account)
self.endInsertRows()
def on_recovery_success(self, accounts):
self.state = QEBip39RecoveryListModel.State.Success
for account in accounts:
self.addAccount(account)
self._thread.stop()
def on_recovery_error(self, exc_info):
e = exc_info[1]
if isinstance(e, concurrent.futures.CancelledError):
self.state = QEBip39RecoveryListModel.State.Cancelled
return
self._logger.error(f"recovery error", exc_info=exc_info)
self.state = QEBip39RecoveryListModel.State.Failed
self._thread.stop()
def get_account_xpub(self, account_path):
root_node = BIP32Node.from_rootseed(self._root_seed, xtype='standard')
account_node = root_node.subkey_at_private_derivation(account_path)
account_xpub = account_node.to_xpub()
return account_xpub