You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

573 lines
24 KiB

import copy
import os
from typing import List, NamedTuple, Any, Dict, Optional
from electrum.logging import get_logger
from electrum.slip39 import EncryptedSeed
from electrum.storage import WalletStorage, StorageEncryptionVersion
from electrum.wallet_db import WalletDB
from electrum.bip32 import normalize_bip32_derivation, xpub_type
from electrum import keystore, mnemonic
from electrum import bitcoin
from electrum.mnemonic import is_any_2fa_seed_type
class WizardViewState(NamedTuple):
    """Immutable snapshot of one wizard step.

    Instances are what the wizard keeps as its current state and pushes
    onto its history stack.
    """
    # string id of the view in the navigation map; None when there is no view
    view: Optional[str]
    # (stacked) wizard data dict containing user input and choices
    wizard_data: Dict[str, Any]
    # transient per-view parameters, meant for extra configuration of a view
    params: Dict[str, Any]
class AbstractWizard:
    """UI-agnostic wizard state machine.

    The wizard is driven by ``self.navmap``, a dict mapping a view name to a
    dict with the optional keys:

    - ``'next'``: a view name (str) or a callable ``fn(wizard_data)`` that
      returns the next view (see ``resolve_next`` for accepted return shapes).
      A view without ``'next'`` finishes the wizard.
    - ``'accept'``: a callable ``fn(wizard_data)`` run before leaving the view;
      it may add to ``wizard_data`` in place.
    - ``'last'``: a bool or a callable ``fn(wizard_data)`` telling whether the
      view is the final one (see ``is_last_view``).
    """
    # serve as a base for all UIs, so no qt
    # encapsulate wizard state
    # encapsulate navigation decisions, UI agnostic
    # encapsulate stack, go backwards
    # allow extend/override flow in subclasses e.g.
    # - override: replace 'next' value to own fn
    # - extend: add new keys to navmap, wire up flow by override

    _logger = get_logger(__name__)

    def __init__(self):
        self.navmap = {}
        self._current = WizardViewState(None, {}, {})
        self._stack = []  # type: List[WizardViewState]

    def navmap_merge(self, additional_navmap):
        """Merge ``additional_navmap`` into ``self.navmap``.

        Entries for views that already exist are updated key by key; new views
        are added as-is (the passed-in dict object is stored, not copied).
        """
        # NOTE: only merges one level deep. Deeper dict levels will overwrite
        for k, v in additional_navmap.items():
            if k in self.navmap:
                self.navmap[k].update(v)
            else:
                self.navmap[k] = v

    # from current view and wizard_data, resolve the new view
    # returns WizardViewState tuple (view name, wizard_data, view params)
    # view name is the string id of the view in the nav map
    # wizard data is the (stacked) wizard data dict containing user input and choices
    # view params are transient, meant for extra configuration of a view (e.g. info
    # msg in a generic choice dialog)
    # exception: stay on this view
    def resolve_next(self, view, wizard_data) -> WizardViewState:
        """Advance from ``view`` to the next view and return the new state.

        Runs the view's ``'accept'`` handler (if any) on ``wizard_data``, deep
        copies the data for the next view, resolves ``'next'``, pushes a copy
        of the current state on the stack and makes the new state current.
        If the view has no ``'next'`` entry, ``finished()`` is called and the
        returned state has ``view`` set to None.

        A callable ``'next'`` handler may return a view name, a
        ``WizardViewState``, or a sequence ``(view,)``, ``(view, wizard_data)``
        or ``(view, wizard_data, params)``.

        Raises:
            AssertionError: ``view`` is falsy or not in the navmap.
            Exception: the ``'accept'`` handler is not callable, or ``'next'``
                is neither a string nor callable.
            TypeError: the ``'next'`` handler returned None or a sequence that
                does not fit ``WizardViewState``.
        """
        assert view
        self._logger.debug(f'view={view}')
        assert view in self.navmap

        nav = self.navmap[view]

        if 'accept' in nav:
            # allow python scope to append to wizard_data before
            # adding to stack or finishing
            view_accept = nav['accept']
            if callable(view_accept):
                view_accept(wizard_data)
            else:
                raise Exception(f'accept handler for view {view} is not callable')

        # make a clone for next view
        wizard_data = copy.deepcopy(wizard_data)

        is_finished = False
        if 'next' not in nav:
            # finished
            is_finished = True
            new_view = WizardViewState(None, wizard_data, {})
        else:
            view_next = nav['next']
            if isinstance(view_next, str):
                # string literal
                new_view = WizardViewState(view_next, wizard_data, {})
            elif callable(view_next):
                # handler fn based
                nv = view_next(wizard_data)
                # NOTE: nv is deliberately not logged verbatim; it may carry
                # wizard_data, which can hold seeds and passwords.
                if nv is None:
                    # previously surfaced as an opaque "NoneType has no len()"
                    raise TypeError(f'next handler for view {view} returned None')

                # append wizard_data and params if not returned
                if isinstance(nv, str):
                    new_view = WizardViewState(nv, wizard_data, {})
                elif isinstance(nv, WizardViewState):
                    new_view = nv
                elif len(nv) == 1:
                    new_view = WizardViewState(nv[0], wizard_data, {})
                elif len(nv) == 2:
                    new_view = WizardViewState(nv[0], nv[1], {})
                else:
                    # normalise plain tuples/lists so attribute access
                    # (.view, .wizard_data) works on the stored state
                    new_view = WizardViewState(*nv)
            else:
                raise Exception(f'next handler for view {view} is not callable nor a string literal')

        # only the view name is logged here; the (sanitized) data is logged by log_stack()
        self._logger.debug(f'resolve_next view is "{new_view.view}"')

        self._stack.append(copy.deepcopy(self._current))
        self._current = new_view

        self.log_stack()

        if is_finished:
            self.finished(wizard_data)

        return new_view

    def resolve_prev(self):
        """Go back one step: pop the stack and make that state current.

        Raises IndexError (from ``list.pop``) if the stack is empty.
        """
        self._current = self._stack.pop()

        self._logger.debug(f'resolve_prev view is "{self._current.view}"')
        self.log_stack()

        return self._current

    # check if this view is the final view
    def is_last_view(self, view, wizard_data):
        """Return the view's ``'last'`` value, or False if it has none.

        ``'last'`` may be a bool literal or a callable ``fn(wizard_data)``;
        anything else raises Exception.
        """
        assert view
        assert view in self.navmap

        nav = self.navmap[view]

        if 'last' not in nav:
            return False

        view_last = nav['last']
        if isinstance(view_last, bool):
            # bool literal
            self._logger.debug(f'view "{view}" last: {view_last}')
            return view_last
        elif callable(view_last):
            # handler fn based
            is_last = view_last(wizard_data)
            self._logger.debug(f'view "{view}" last: {is_last}')
            return is_last
        else:
            raise Exception(f'last handler for view {view} is not callable nor a bool literal')

    def finished(self, wizard_data):
        """Hook called by ``resolve_next`` when a view without ``'next'`` is left."""
        self._logger.debug('finished.')

    def reset(self):
        """Clear the history stack and reset the current state to an empty one."""
        self._stack = []
        self._current = WizardViewState(None, {}, {})

    def log_stack(self):
        """Write the sanitized wizard stack and current state to the debug log."""
        lines = ['wizard stack:']
        for i, item in enumerate(self._stack):
            ssi = self.sanitize_stack_item(item.wizard_data)
            lines.append(f'{i}: {hex(id(item.wizard_data))} - {repr(ssi)}')
        sci = self.sanitize_stack_item(self._current.wizard_data)
        lines.append(f'c: {hex(id(self._current.wizard_data))} - {repr(sci)}')
        self._logger.debug('\n'.join(lines))

    def sanitize_stack_item(self, _stack_item) -> dict:
        """Return a copy of ``_stack_item`` with sensitive values replaced.

        Nested dicts are processed recursively; any non-dict value stored
        under one of the sensitive keys is replaced by a placeholder string.
        """
        sensitive_keys = ['seed', 'seed_extra_words', 'master_key', 'private_key_list', 'password']

        def sanitize(_dict):
            result = {}
            for item in _dict:
                if isinstance(_dict[item], dict):
                    result[item] = sanitize(_dict[item])
                else:
                    if item in sensitive_keys:
                        result[item] = '<sensitive value removed>'
                    else:
                        result[item] = _dict[item]
            return result
        return sanitize(_stack_item)
class NewWalletWizard(AbstractWizard):
    """Navigation flow and wallet-file creation for the new-wallet wizard.

    The navmap built in ``__init__`` wires the views for standard, imported
    and multisig wallets. The '2fa' wallet type routes to a
    'trustedcoin_start' view that is not defined here (presumably merged in
    elsewhere via ``navmap_merge`` -- TODO confirm).

    Subclasses must implement ``is_single_password()``.
    """

    _logger = get_logger(__name__)

    def __init__(self, daemon):
        AbstractWizard.__init__(self)
        self.navmap = {
            'wallet_name': {
                'next': 'wallet_type'
            },
            'wallet_type': {
                'next': self.on_wallet_type
            },
            'keystore_type': {
                'next': self.on_keystore_type
            },
            'create_seed': {
                'next': 'confirm_seed'
            },
            'confirm_seed': {
                'next': self.on_have_or_confirm_seed,
                'accept': self.maybe_master_pubkey,
                'last': lambda d: self.is_single_password() and not self.is_multisig(d)
            },
            'have_seed': {
                'next': self.on_have_or_confirm_seed,
                'accept': self.maybe_master_pubkey,
                'last': lambda d: self.is_single_password() and not
                                  (self.needs_derivation_path(d) or self.is_multisig(d))
            },
            'script_and_derivation': {
                'next': lambda d: 'wallet_password' if not self.is_multisig(d) else 'multisig_cosigner_keystore',
                'accept': self.maybe_master_pubkey,
                'last': lambda d: self.is_single_password() and not self.is_multisig(d)
            },
            'have_master_key': {
                'next': lambda d: 'wallet_password' if not self.is_multisig(d) else 'multisig_cosigner_keystore',
                'accept': self.maybe_master_pubkey,
                'last': lambda d: self.is_single_password() and not self.is_multisig(d)
            },
            'multisig': {
                'next': 'keystore_type'
            },
            'multisig_cosigner_keystore': {  # this view should set 'multisig_current_cosigner'
                'next': self.on_cosigner_keystore_type
            },
            'multisig_cosigner_key': {
                'next': lambda d: 'wallet_password' if self.last_cosigner(d) else 'multisig_cosigner_keystore',
                'last': lambda d: self.is_single_password() and self.last_cosigner(d)
            },
            'multisig_cosigner_seed': {
                'next': self.on_have_cosigner_seed,
                'last': lambda d: self.is_single_password() and self.last_cosigner(d) and not self.needs_derivation_path(d)
            },
            'multisig_cosigner_script_and_derivation': {
                'next': lambda d: 'wallet_password' if self.last_cosigner(d) else 'multisig_cosigner_keystore',
                'last': lambda d: self.is_single_password() and self.last_cosigner(d)
            },
            'imported': {
                'next': 'wallet_password',
                'last': lambda d: self.is_single_password()
            },
            'wallet_password': {
                'last': True
            }
        }
        self._daemon = daemon

    def start(self, initial_data=None):
        """Reset the wizard and return the initial 'wallet_name' state.

        ``initial_data`` (default: a new empty dict) becomes the wizard data
        of the first view.
        """
        if initial_data is None:
            initial_data = {}
        self.reset()
        self._current = WizardViewState('wallet_name', initial_data, {})
        return self._current

    def is_single_password(self):
        """Must be implemented by subclasses; used by the 'last' handlers."""
        raise NotImplementedError()

    # returns (sub)dict of current cosigner (or root if first)
    def _current_cosigner(self, wizard_data):
        wdata = wizard_data
        if wizard_data['wallet_type'] == 'multisig' and 'multisig_current_cosigner' in wizard_data:
            cosigner = wizard_data['multisig_current_cosigner']
            wdata = wizard_data['multisig_cosigner_data'][str(cosigner)]
        return wdata

    def needs_derivation_path(self, wizard_data):
        """True if the current (co)signer's seed variant is bip39 or slip39."""
        wdata = self._current_cosigner(wizard_data)
        return 'seed_variant' in wdata and wdata['seed_variant'] in ['bip39', 'slip39']

    def wants_ext(self, wizard_data):
        """Return the current (co)signer's 'seed_extend' value, if seed data is present.

        Returns False when there is no 'seed_variant' or no 'seed_extend' key.
        """
        wdata = self._current_cosigner(wizard_data)
        # .get(): a missing 'seed_extend' means "no extension" instead of a KeyError
        return 'seed_variant' in wdata and wdata.get('seed_extend', False)

    def is_multisig(self, wizard_data):
        return wizard_data['wallet_type'] == 'multisig'

    def on_wallet_type(self, wizard_data):
        """Map the chosen wallet type to the next view (None if unknown)."""
        t = wizard_data['wallet_type']
        return {
            'standard': 'keystore_type',
            '2fa': 'trustedcoin_start',
            'multisig': 'multisig',
            'imported': 'imported'
        }.get(t)

    def on_keystore_type(self, wizard_data):
        """Map the chosen keystore type to the next view (None if unknown)."""
        t = wizard_data['keystore_type']
        return {
            'createseed': 'create_seed',
            'haveseed': 'have_seed',
            'masterkey': 'have_master_key'
        }.get(t)

    def on_have_or_confirm_seed(self, wizard_data):
        """Next view after a seed was entered or confirmed."""
        if self.needs_derivation_path(wizard_data):
            return 'script_and_derivation'
        elif self.is_multisig(wizard_data):
            return 'multisig_cosigner_keystore'
        else:
            return 'wallet_password'

    def maybe_master_pubkey(self, wizard_data):
        """'accept' handler: store 'multisig_master_pubkey' in ``wizard_data``.

        Skipped while a derivation path is required but not yet present.
        """
        self._logger.debug('maybe_master_pubkey')
        if self.needs_derivation_path(wizard_data) and 'derivation_path' not in wizard_data:
            self._logger.debug('deferred, missing derivation_path')
            return

        wizard_data['multisig_master_pubkey'] = self.keystore_from_data(wizard_data['wallet_type'], wizard_data).get_master_public_key()

    def on_cosigner_keystore_type(self, wizard_data):
        """Map the chosen cosigner keystore type to the next view (None if unknown)."""
        t = wizard_data['cosigner_keystore_type']
        return {
            'key': 'multisig_cosigner_key',
            'seed': 'multisig_cosigner_seed'
        }.get(t)

    def on_have_cosigner_seed(self, wizard_data):
        """Next view after a cosigner seed was entered."""
        current_cosigner_data = wizard_data['multisig_cosigner_data'][str(wizard_data['multisig_current_cosigner'])]

        if self.needs_derivation_path(wizard_data) and 'derivation_path' not in current_cosigner_data:
            return 'multisig_cosigner_script_and_derivation'
        elif self.last_cosigner(wizard_data):
            return 'wallet_password'
        else:
            return 'multisig_cosigner_keystore'

    def last_cosigner(self, wizard_data):
        # check if we have the final number of cosigners. Doesn't check if cosigner data itself is complete
        # (should be validated by wizardcomponents)
        return len(wizard_data['multisig_cosigner_data']) >= (wizard_data['multisig_participants'] - 1)

    def _all_master_pubkeys(self, wizard_data) -> list:
        """Master public keys of the main keystore followed by those of all cosigners."""
        wallet_type = wizard_data['wallet_type']
        xpubs = [self.keystore_from_data(wallet_type, wizard_data).get_master_public_key()]
        for cosigner_data in wizard_data['multisig_cosigner_data'].values():
            xpubs.append(self.keystore_from_data(wallet_type, cosigner_data).get_master_public_key())
        return xpubs

    def has_duplicate_masterkeys(self, wizard_data) -> bool:
        """Multisig wallets need distinct master keys. If True, need to prevent wallet-creation."""
        xpubs = self._all_master_pubkeys(wizard_data)
        return len(xpubs) != len(set(xpubs))

    def has_heterogeneous_masterkeys(self, wizard_data) -> bool:
        """Multisig wallets need homogeneous master keys.
        All master keys need to be bip32, and e.g. Ypub cannot be mixed with Zpub.
        If True, need to prevent wallet-creation.
        """
        xpubs = self._all_master_pubkeys(wizard_data)
        try:
            xpub_types = {xpub_type(xpub) for xpub in xpubs}
        except Exception:
            return True  # maybe old_mpk?
        return len(xpub_types) > 1

    @staticmethod
    def _xtype_for_script(wallet_type, script_type):
        """Translate a script type to the xtype passed to the keystore.

        The base script of each wallet kind ('p2sh' for multisig, 'p2pkh'
        otherwise) maps to 'standard'; every other script type is passed through.
        """
        base_script = 'p2sh' if wallet_type == 'multisig' else 'p2pkh'
        return 'standard' if script_type == base_script else script_type

    def keystore_from_data(self, wallet_type, data):
        """Build a keystore from a (co)signer data dict.

        Uses ``data['seed']`` (with 'seed_variant', 'seed_extra_words' and, for
        bip39/slip39, 'derivation_path' and 'script_type') if present,
        otherwise ``data['master_key']``.

        Raises Exception for an unsupported seed variant or when neither
        'seed' nor 'master_key' is present.
        """
        if 'seed' in data:
            variant = data['seed_variant']
            if variant == 'electrum':
                return keystore.from_seed(data['seed'], data['seed_extra_words'], True)
            elif variant in ('bip39', 'slip39'):
                if variant == 'bip39':
                    root_seed = keystore.bip39_to_seed(data['seed'], data['seed_extra_words'])
                else:
                    root_seed = data['seed'].decrypt(data['seed_extra_words'])
                derivation = normalize_bip32_derivation(data['derivation_path'])
                script = self._xtype_for_script(wallet_type, data['script_type'])
                return keystore.from_bip43_rootseed(root_seed, derivation, xtype=script)
            else:
                raise Exception('Unsupported seed variant %s' % variant)
        elif 'master_key' in data:
            return keystore.from_master_key(data['master_key'])
        else:
            raise Exception('no seed or master_key in data')

    def validate_seed(self, seed, seed_variant, wallet_type):
        """Validate ``seed`` for the given variant and wallet type.

        Returns a tuple ``(seed_valid, seed_type, validation_message)``.
        ``validation_message`` is only filled for bip39.
        Raises Exception for an unknown ``seed_variant``.
        """
        seed_type = ''
        seed_valid = False
        validation_message = ''

        if seed_variant == 'electrum':
            seed_type = mnemonic.seed_type(seed)
            if seed_type != '':
                seed_valid = True
        elif seed_variant == 'bip39':
            is_checksum, is_wordlist = keystore.bip39_is_checksum_valid(seed)
            status = ('checksum: ' + ('ok' if is_checksum else 'failed')) if is_wordlist else 'unknown wordlist'
            validation_message = 'BIP39 (%s)' % status

            if is_checksum:
                seed_type = 'bip39'
                seed_valid = True
        elif seed_variant == 'slip39':
            # seed shares should be already validated by wizard page, we have a combined encrypted seed
            if seed and isinstance(seed, EncryptedSeed):
                seed_valid = True
                seed_type = 'slip39'
            else:
                seed_valid = False
        else:
            raise Exception(f'unknown seed variant {seed_variant}')

        # check if seed matches wallet type
        if wallet_type == '2fa' and not is_any_2fa_seed_type(seed_type):
            seed_valid = False
        elif wallet_type == 'standard' and seed_type not in ['old', 'standard', 'segwit', 'bip39', 'slip39']:
            seed_valid = False
        elif wallet_type == 'multisig' and seed_type not in ['standard', 'segwit', 'bip39']:
            seed_valid = False

        self._logger.debug(f'seed verified: {seed_valid}, type={seed_type}, validation_message={validation_message}')

        return seed_valid, seed_type, validation_message

    def create_storage(self, path, data):
        """Create and write a new wallet file at ``path`` from wizard ``data``.

        ``data['wallet_type']`` must be one of 'standard', '2fa', 'imported'
        or 'multisig'. Depending on the type, the keystore is built from
        imported private keys / addresses, a seed, or a master key; when
        ``data['encrypt']`` is set, keystores and storage are protected with
        ``data['password']``.

        Raises:
            AssertionError: unexpected wallet type, or an imported key that
                is not a private key.
            Exception: a file already exists at ``path``, or the seed type,
                keystore type or key type is unsupported for the wallet type.
        """
        assert data['wallet_type'] in ['standard', '2fa', 'imported', 'multisig']

        if os.path.exists(path):
            raise Exception('file already exists at path')
        storage = WalletStorage(path)

        # TODO: refactor using self.keystore_from_data
        k = None
        if 'keystore_type' not in data:
            assert data['wallet_type'] == 'imported'
            addresses = {}
            if 'private_key_list' in data:
                k = keystore.Imported_KeyStore({})
                keys = keystore.get_private_keys(data['private_key_list'])
                for pk in keys:
                    assert bitcoin.is_private_key(pk)
                    txin_type, pubkey = k.import_privkey(pk, None)
                    addr = bitcoin.pubkey_to_address(txin_type, pubkey)
                    addresses[addr] = {'type': txin_type, 'pubkey': pubkey}
            elif 'address_list' in data:
                for addr in data['address_list'].split():
                    addresses[addr] = {}
        elif data['keystore_type'] in ['createseed', 'haveseed']:
            if data['seed_type'] in ['old', 'standard', 'segwit']:
                self._logger.debug('creating keystore from electrum seed')
                k = keystore.from_seed(data['seed'], data['seed_extra_words'], data['wallet_type'] == 'multisig')
            elif data['seed_type'] in ['bip39', 'slip39']:
                self._logger.debug('creating keystore from %s seed' % data['seed_type'])
                if data['seed_type'] == 'bip39':
                    root_seed = keystore.bip39_to_seed(data['seed'], data['seed_extra_words'])
                else:
                    root_seed = data['seed'].decrypt(data['seed_extra_words'])
                derivation = normalize_bip32_derivation(data['derivation_path'])
                script = self._xtype_for_script(data['wallet_type'], data['script_type'])
                k = keystore.from_bip43_rootseed(root_seed, derivation, xtype=script)
            elif is_any_2fa_seed_type(data['seed_type']):
                self._logger.debug('creating keystore from 2fa seed')
                k = keystore.from_xprv(data['x1/']['xprv'])
            else:
                raise Exception('unsupported/unknown seed_type %s' % data['seed_type'])
        elif data['keystore_type'] == 'masterkey':
            k = keystore.from_master_key(data['master_key'])
            if isinstance(k, keystore.Xpub):  # has xpub
                t1 = xpub_type(k.xpub)
                if data['wallet_type'] == 'multisig':
                    if t1 not in ['standard', 'p2wsh', 'p2wsh-p2sh']:
                        raise Exception('wrong key type %s' % t1)
                else:
                    if t1 not in ['standard', 'p2wpkh', 'p2wpkh-p2sh']:
                        raise Exception('wrong key type %s' % t1)
            elif isinstance(k, keystore.Old_KeyStore):
                pass
            else:
                # report the type of the offending object, not of the keystore module
                raise Exception(f"unexpected keystore type: {type(k)}")
        else:
            raise Exception('unsupported/unknown keystore_type %s' % data['keystore_type'])

        if data['encrypt']:
            if k and k.may_have_password():
                k.update_password(None, data['password'])
            storage.set_password(data['password'], enc_version=StorageEncryptionVersion.USER_PASSWORD)

        db = WalletDB('', storage=storage, manual_upgrades=False)
        db.set_keystore_encryption(bool(data['password']) and data['encrypt'])

        db.put('wallet_type', data['wallet_type'])
        if 'seed_type' in data:
            db.put('seed_type', data['seed_type'])

        if data['wallet_type'] == 'standard':
            db.put('keystore', k.dump())
        elif data['wallet_type'] == '2fa':
            db.put('x1/', k.dump())
            if data['trustedcoin_keepordisable'] == 'disable':
                k2 = keystore.from_xprv(data['x2/']['xprv'])
                if data['encrypt'] and k2.may_have_password():
                    k2.update_password(None, data['password'])
                db.put('x2/', k2.dump())
            else:
                db.put('x2/', data['x2/'])
            db.put('x3/', data['x3/'])
            db.put('use_trustedcoin', True)
        elif data['wallet_type'] == 'multisig':
            if not isinstance(k, keystore.Xpub):
                raise Exception(f"unexpected keystore(main) type={type(k)} in multisig. not bip32.")
            k_xpub_type = xpub_type(k.xpub)
            # overrides the generic 'multisig' wallet_type written above with the 'MofN' form
            db.put('wallet_type', '%dof%d' % (data['multisig_signatures'], data['multisig_participants']))
            db.put('x1/', k.dump())
            for cosigner, cosigner_data in data['multisig_cosigner_data'].items():
                cosigner_keystore = self.keystore_from_data('multisig', cosigner_data)
                if not isinstance(cosigner_keystore, keystore.Xpub):
                    raise Exception(f"unexpected keystore(cosigner) type={type(cosigner_keystore)} in multisig. not bip32.")
                if k_xpub_type != xpub_type(cosigner_keystore.xpub):
                    raise Exception("multisig wallet needs to have homogeneous xpub types")
                if data['encrypt'] and cosigner_keystore.may_have_password():
                    cosigner_keystore.update_password(None, data['password'])
                db.put(f'x{cosigner}/', cosigner_keystore.dump())
        elif data['wallet_type'] == 'imported':
            if k:
                db.put('keystore', k.dump())
            db.put('addresses', addresses)

        if k and k.can_have_deterministic_lightning_xprv():
            db.put('lightning_xprv', k.get_lightning_xprv(data['password'] if data['encrypt'] else None))

        db.load_plugins()
        db.write()
class ServerConnectWizard(AbstractWizard):
    """Wizard flow for proxy / server connection setup.

    Starts at 'proxy_ask', optionally visits 'proxy_config', then
    'autoconnect' and finally 'server_config'.
    """

    _logger = get_logger(__name__)

    def __init__(self, daemon):
        AbstractWizard.__init__(self)

        # 'autoconnect' is the final view when the user opted for autoconnect,
        # otherwise the flow continues to the manual 'server_config' view
        autoconnect_nav = {
            'next': 'server_config',
            'last': lambda data: data['autoconnect'],
        }
        # only show the proxy configuration view when a proxy is wanted
        proxy_ask_nav = {
            'next': lambda data: 'proxy_config' if data['want_proxy'] else 'autoconnect',
        }
        proxy_config_nav = {'next': 'autoconnect'}
        server_config_nav = {'last': True}

        self.navmap = {
            'autoconnect': autoconnect_nav,
            'proxy_ask': proxy_ask_nav,
            'proxy_config': proxy_config_nav,
            'server_config': server_config_nav,
        }
        self._daemon = daemon

    def start(self, initial_data=None):
        """Reset the wizard and return the initial 'proxy_ask' state.

        ``initial_data`` (default: a new empty dict) becomes the wizard data
        of the first view.
        """
        self.reset()
        first_data = {} if initial_data is None else initial_data
        self._current = WizardViewState('proxy_ask', first_data, {})
        return self._current