You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
474 lines
19 KiB
474 lines
19 KiB
import os |
|
import base64 |
|
import json |
|
from typing import Optional |
|
|
|
from electrum import bip32, constants |
|
from electrum.crypto import sha256 |
|
from electrum.i18n import _ |
|
from electrum.keystore import Hardware_KeyStore |
|
from electrum.transaction import Transaction |
|
from electrum.wallet import Multisig_Wallet |
|
from electrum.util import UserFacingException |
|
from electrum.base_wizard import ScriptTypeNotSupported |
|
from electrum.logging import get_logger |
|
from electrum.plugin import runs_in_hwd_thread, Device |
|
from electrum.network import Network |
|
|
|
from ..hw_wallet import HW_PluginBase, HardwareClientBase |
|
from ..hw_wallet.plugin import OutdatedHwFirmwareException |
|
|
|
|
|
_logger = get_logger(__name__) |
|
|
|
#import logging |
|
#LOGGING = logging.INFO |
|
#if LOGGING: |
|
# logger = logging.getLogger('jade') |
|
# logger.setLevel(LOGGING) |
|
# device_logger = logging.getLogger('jade-device') |
|
# device_logger.setLevel(LOGGING) |
|
|
|
try: |
|
# Do imports |
|
from .jadepy.jade import JadeAPI |
|
from serial.tools import list_ports |
|
except ImportError as e: |
|
_logger.exception('error importing Jade plugin deps') |
|
|
|
# Ignore -beta and -rc etc labels |
|
def _versiontuple(v): |
|
return tuple(map(int, (v.split('-')[0].split('.')))) |
|
|
|
def _is_multisig(wallet): |
|
return type(wallet) is Multisig_Wallet |
|
|
|
# Ensure a multisig wallet is registered on Jade hw. |
|
# Derives and returns the deterministic name for that multisig registration |
|
def _register_multisig_wallet(wallet, keystore, address): |
|
wallet_fingerprint_hash = sha256(wallet.get_fingerprint()) |
|
multisig_name = 'ele' + wallet_fingerprint_hash.hex()[:12] |
|
|
|
# Collect all the signer data in case we need to register the |
|
# multisig wallet on the Jade hw - NOTE: re-register is a no-op. |
|
signers = [] |
|
for kstore in wallet.get_keystores(): |
|
fingerprint = kstore.get_root_fingerprint() |
|
bip32_path_prefix = kstore.get_derivation_prefix() |
|
derivation_path = bip32.convert_bip32_path_to_list_of_uint32(bip32_path_prefix) |
|
|
|
# Jade only understands standard xtypes, so convert here |
|
node = bip32.BIP32Node.from_xkey(kstore.xpub) |
|
standard_xpub = node._replace(xtype='standard').to_xkey() |
|
|
|
signers.append({'fingerprint': bytes.fromhex(fingerprint), |
|
'derivation': derivation_path, |
|
'xpub': standard_xpub, |
|
'path': []}) |
|
|
|
# Check multisig is registered - re-registering is a no-op |
|
# NOTE: electrum multisigs appear to always be sorted-multisig |
|
txin_type = wallet.get_txin_type(address) |
|
keystore.register_multisig(multisig_name, txin_type, True, wallet.m, signers) |
|
|
|
# Return the name used to register the wallet |
|
return multisig_name |
|
|
|
# Helper to adapt Jade's http call/data to Network.send_http_on_proxy() |
|
def _http_request(params): |
|
# Use the first non-onion url |
|
url = [url for url in params['urls'] if not url.endswith('.onion')][0] |
|
method = params['method'].lower() |
|
json_payload = params.get('data') |
|
json_response = Network.send_http_on_proxy(method, url, json=json_payload) |
|
return {'body': json.loads(json_response)} |
|
|
|
class Jade_Client(HardwareClientBase): |
|
|
|
@staticmethod |
|
def _network() -> str: |
|
return 'localtest' if constants.net.NET_NAME == 'regtest' else constants.net.NET_NAME |
|
|
|
ADDRTYPES = {'standard': 'pkh(k)', |
|
'p2pkh': 'pkh(k)', |
|
'p2wpkh': 'wpkh(k)', |
|
'p2wpkh-p2sh': 'sh(wpkh(k))'} |
|
|
|
MULTI_ADDRTYPES = {'standard': 'sh(multi(k))', |
|
'p2sh': 'sh(multi(k))', |
|
'p2wsh': 'wsh(multi(k))', |
|
'p2wsh-p2sh': 'sh(wsh(multi(k)))'} |
|
|
|
@classmethod |
|
def _convertAddrType(cls, addrType: str, multisig: bool) -> str: |
|
return cls.MULTI_ADDRTYPES[addrType] if multisig else cls.ADDRTYPES[addrType] |
|
|
|
def __init__(self, device: str, plugin: HW_PluginBase): |
|
HardwareClientBase.__init__(self, plugin=plugin) |
|
|
|
# Connect with a small timeout to test connection |
|
self.jade = JadeAPI.create_serial(device, timeout=1) |
|
self.jade.connect() |
|
|
|
verinfo = self.jade.get_version_info() |
|
self.fwversion = _versiontuple(verinfo['JADE_VERSION']) |
|
self.efusemac = verinfo['EFUSEMAC'] |
|
self.jade.disconnect() |
|
|
|
# Reconnect with a the default timeout for all subsequent calls |
|
self.jade = JadeAPI.create_serial(device) |
|
self.jade.connect() |
|
|
|
# Push some host entropy into jade |
|
self.jade.add_entropy(os.urandom(32)) |
|
|
|
@runs_in_hwd_thread |
|
def authenticate(self): |
|
# Ensure Jade unlocked - always call hw unit at least once |
|
# If the hw is already unlocked, this call returns immediately/no-op |
|
# NOTE: uses provided http/networking which respects any user proxy |
|
authenticated = False |
|
while not authenticated: |
|
authenticated = self.jade.auth_user(self._network(), http_request_fn=_http_request) |
|
|
|
def is_pairable(self): |
|
return True |
|
|
|
@runs_in_hwd_thread |
|
def close(self): |
|
self.jade.disconnect() |
|
self.jade = None |
|
|
|
@runs_in_hwd_thread |
|
def is_initialized(self): |
|
verinfo = self.jade.get_version_info() |
|
return verinfo['JADE_STATE'] != 'UNINIT' |
|
|
|
def label(self) -> Optional[str]: |
|
return self.efusemac[-6:] |
|
|
|
def get_soft_device_id(self): |
|
return f'Jade {self.label()}' |
|
|
|
def device_model_name(self): |
|
return 'Blockstream Jade' |
|
|
|
@runs_in_hwd_thread |
|
def has_usable_connection_with_device(self): |
|
if self.efusemac is None: |
|
return False |
|
|
|
try: |
|
verinfo = self.jade.get_version_info() |
|
return verinfo['EFUSEMAC'] == self.efusemac |
|
except BaseException: |
|
return False |
|
|
|
@runs_in_hwd_thread |
|
def get_xpub(self, bip32_path, xtype): |
|
self.authenticate() |
|
|
|
# Jade only provides traditional xpubs ... |
|
path = bip32.convert_bip32_path_to_list_of_uint32(bip32_path) |
|
xpub = self.jade.get_xpub(self._network(), path) |
|
|
|
# ... so convert to relevant xtype locally |
|
node = bip32.BIP32Node.from_xkey(xpub) |
|
return node._replace(xtype=xtype).to_xkey() |
|
|
|
@runs_in_hwd_thread |
|
def sign_message(self, bip32_path_prefix, sequence, message): |
|
self.authenticate() |
|
|
|
path = bip32.convert_bip32_path_to_list_of_uint32(bip32_path_prefix) |
|
path.extend(sequence) |
|
|
|
if isinstance(message, bytes) or isinstance(message, bytearray): |
|
message = message.decode('utf-8') |
|
|
|
# Signature verification does not work with anti-exfil, so stick with default (rfc6979) |
|
sig = self.jade.sign_message(path, message) |
|
return base64.b64decode(sig) |
|
|
|
@runs_in_hwd_thread |
|
def sign_tx(self, txn_bytes, inputs, change): |
|
self.authenticate() |
|
|
|
# Add some host entropy for AE sigs (although we won't verify) |
|
for input in inputs: |
|
if input['path'] is not None: |
|
input['ae_host_entropy'] = os.urandom(32) |
|
input['ae_host_commitment'] = os.urandom(32) |
|
|
|
# Map change script type |
|
for output in change: |
|
if output and output.get('variant') is not None: |
|
output['variant'] = self._convertAddrType(output['variant'], False) |
|
|
|
# Pass to Jade to generate signatures |
|
sig_data = self.jade.sign_tx(self._network(), txn_bytes, inputs, change, use_ae_signatures=True) |
|
|
|
# Extract signatures from returned data (sig[0] is the AE signer-commitment) |
|
return [sig[1] for sig in sig_data] |
|
|
|
@runs_in_hwd_thread |
|
def show_address(self, bip32_path_prefix, sequence, txin_type): |
|
self.authenticate() |
|
path = bip32.convert_bip32_path_to_list_of_uint32(bip32_path_prefix) |
|
path.extend(sequence) |
|
script_variant = self._convertAddrType(txin_type, multisig=False) |
|
address = self.jade.get_receive_address(self._network(), path, variant=script_variant) |
|
return address |
|
|
|
@runs_in_hwd_thread |
|
def register_multisig(self, multisig_name, txin_type, sorted, threshold, signers): |
|
self.authenticate() |
|
variant = self._convertAddrType(txin_type, multisig=True) |
|
return self.jade.register_multisig(self._network(), multisig_name, variant, sorted, threshold, signers) |
|
|
|
@runs_in_hwd_thread |
|
def show_address_multi(self, multisig_name, paths): |
|
self.authenticate() |
|
return self.jade.get_receive_address(self._network(), paths, multisig_name=multisig_name) |
|
|
|
class Jade_KeyStore(Hardware_KeyStore): |
|
hw_type = 'jade' |
|
device = 'Jade' |
|
|
|
plugin: 'JadePlugin' |
|
|
|
def decrypt_message(self, sequence, message, password): |
|
raise UserFacingException(_('Encryption and decryption are not implemented by {}').format(self.device)) |
|
|
|
@runs_in_hwd_thread |
|
def sign_message(self, sequence, message, password, *, script_type=None): |
|
self.handler.show_message(_("Please confirm signing the message with your Jade device...")) |
|
try: |
|
client = self.get_client() |
|
bip32_path_prefix = self.get_derivation_prefix() |
|
return client.sign_message(bip32_path_prefix, sequence, message) |
|
finally: |
|
self.handler.finished() |
|
|
|
@runs_in_hwd_thread |
|
def sign_transaction(self, tx, password): |
|
if tx.is_complete(): |
|
return |
|
|
|
self.handler.show_message(_("Preparing to sign transaction ...")) |
|
try: |
|
wallet = self.handler.get_wallet() |
|
is_multisig = _is_multisig(wallet) |
|
|
|
# Fetch inputs of the transaction to sign |
|
jade_inputs = [] |
|
for txin in tx.inputs(): |
|
pubkey, path = self.find_my_pubkey_in_txinout(txin) |
|
witness_input = txin.is_segwit() |
|
redeem_script = Transaction.get_preimage_script(txin) |
|
redeem_script = bytes.fromhex(redeem_script) if redeem_script is not None else None |
|
input_tx = txin.utxo |
|
input_tx = bytes.fromhex(input_tx.serialize()) if input_tx is not None else None |
|
|
|
# Build the input and add to the list - include some host entropy for AE sigs (although we won't verify) |
|
jade_inputs.append({'is_witness': witness_input, |
|
'input_tx': input_tx, |
|
'script': redeem_script, |
|
'path': path}) |
|
|
|
# Change detection |
|
change = [None] * len(tx.outputs()) |
|
for index, txout in enumerate(tx.outputs()): |
|
if txout.is_mine and txout.is_change: |
|
assert (desc := txout.script_descriptor) |
|
if is_multisig: |
|
# Multisig - wallet details must be registered on Jade hw |
|
multisig_name = _register_multisig_wallet(wallet, self, txout.address) |
|
|
|
# Jade only needs the path suffix(es) and the multisig registration |
|
# name to generate the address, as the fixed derivation part is |
|
# embedded in the multisig wallet registration record |
|
# NOTE: all cosigners have same path suffix |
|
path_suffix = wallet.get_address_index(txout.address) |
|
paths = [path_suffix] * wallet.n |
|
change[index] = {'multisig_name': multisig_name, 'paths': paths} |
|
else: |
|
# Pass entire path |
|
pubkey, path = self.find_my_pubkey_in_txinout(txout) |
|
change[index] = {'path':path, 'variant': desc.to_legacy_electrum_script_type()} |
|
|
|
# The txn itself |
|
txn_bytes = bytes.fromhex(tx.serialize_to_network()) |
|
|
|
# Request Jade generate the signatures for our inputs. |
|
# Change details are passed to be validated on the hw (user does not confirm) |
|
self.handler.show_message(_("Please confirm the transaction details on your Jade device...")) |
|
client = self.get_client() |
|
signatures = client.sign_tx(txn_bytes, jade_inputs, change) |
|
assert len(signatures) == len(tx.inputs()) |
|
|
|
# Inject signatures into tx |
|
for index, (txin, signature) in enumerate(zip(tx.inputs(), signatures)): |
|
pubkey, path = self.find_my_pubkey_in_txinout(txin) |
|
if pubkey is not None and signature is not None: |
|
tx.add_signature_to_txin(txin_idx=index, |
|
signing_pubkey=pubkey.hex(), |
|
sig=signature.hex()) |
|
finally: |
|
self.handler.finished() |
|
|
|
@runs_in_hwd_thread |
|
def show_address(self, sequence, txin_type): |
|
self.handler.show_message(_("Showing address ...")) |
|
try: |
|
client = self.get_client() |
|
bip32_path_prefix = self.get_derivation_prefix() |
|
return client.show_address(bip32_path_prefix, sequence, txin_type) |
|
finally: |
|
self.handler.finished() |
|
|
|
@runs_in_hwd_thread |
|
def register_multisig(self, name, txin_type, sorted, threshold, signers): |
|
self.handler.show_message(_("Please confirm the multisig wallet details on your Jade device...")) |
|
try: |
|
client = self.get_client() |
|
return client.register_multisig(name, txin_type, sorted, threshold, signers) |
|
finally: |
|
self.handler.finished() |
|
|
|
@runs_in_hwd_thread |
|
def show_address_multi(self, multisig_name, paths): |
|
self.handler.show_message(_("Showing address ...")) |
|
try: |
|
client = self.get_client() |
|
return client.show_address_multi(multisig_name, paths) |
|
finally: |
|
self.handler.finished() |
|
|
|
|
|
class JadePlugin(HW_PluginBase): |
|
keystore_class = Jade_KeyStore |
|
minimum_library = (0, 0, 1) |
|
DEVICE_IDS = [(0x10c4, 0xea60), (0x1a86, 0x55d4)] |
|
SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh') |
|
MIN_SUPPORTED_FW_VERSION = (0, 1, 32) |
|
|
|
# For testing with qemu simulator (experimental) |
|
SIMULATOR_PATH = None # 'tcp:127.0.0.1:2222' |
|
SIMULATOR_TEST_SEED = None # bytes.fromhex('b90e532426d0dc20fffe01037048c018e940300038b165c211915c672e07762c') |
|
|
|
def enumerate_serial(self): |
|
# Jade is not really an HID device, it shows as a serial/com port device. |
|
# Scan com ports looking for the relevant vid and pid, and use 'path' to |
|
# hold the path to the serial port device, eg. /dev/ttyUSB0 |
|
devices = [] |
|
for devinfo in list_ports.comports(): |
|
device_product_key = (devinfo.vid, devinfo.pid) |
|
if device_product_key in self.DEVICE_IDS: |
|
device = Device(path=devinfo.device, |
|
interface_number=-1, |
|
id_=devinfo.serial_number, |
|
product_key=device_product_key, |
|
usage_page=-1, |
|
transport_ui_string=devinfo.device) |
|
devices.append(device) |
|
|
|
# Maybe look for Jade Qemu simulator if the vars are set (experimental) |
|
if self.SIMULATOR_PATH is not None and self.SIMULATOR_TEST_SEED is not None: |
|
try: |
|
# If we can connect to a simulator and poke a seed in, add that too |
|
client = Jade_Client(self.SIMULATOR_PATH, plugin=self) |
|
device = Device(path=self.SIMULATOR_PATH, |
|
interface_number=-1, |
|
id_='Jade Qemu Simulator', |
|
product_key=self.DEVICE_IDS[0], |
|
usage_page=-1, |
|
transport_ui_string='simulator') |
|
if client.jade.set_seed(self.SIMULATOR_TEST_SEED): |
|
devices.append(device) |
|
client.close() |
|
except Exception as e: |
|
# If we get any sort of error do not add the simulator |
|
_logger.debug("Failed to connect to Jade simulator at {}".format(self.SIMULATOR_PATH)) |
|
_logger.debug(e) |
|
|
|
return devices |
|
|
|
def __init__(self, parent, config, name): |
|
HW_PluginBase.__init__(self, parent, config, name) |
|
|
|
self.libraries_available = self.check_libraries_available() |
|
if not self.libraries_available: |
|
return |
|
|
|
# Register our own serial/com port scanning function |
|
self.device_manager().register_enumerate_func(self.enumerate_serial) |
|
|
|
def get_library_version(self): |
|
try: |
|
from . import jadepy |
|
version = jadepy.__version__ |
|
except ImportError: |
|
raise |
|
except: |
|
version = "unknown" |
|
return version |
|
|
|
@runs_in_hwd_thread |
|
def create_client(self, device, handler): |
|
client = Jade_Client(device.path, plugin=self) |
|
|
|
# Check minimum supported firmware version |
|
if self.MIN_SUPPORTED_FW_VERSION > client.fwversion: |
|
msg = (_('Outdated {} firmware for device labelled {}. Please ' |
|
'update using a Blockstream Green companion app') |
|
.format(self.device, client.label())) |
|
self.logger.info(msg) |
|
|
|
if handler: |
|
handler.show_error(msg) |
|
|
|
raise OutdatedHwFirmwareException(msg) |
|
|
|
return client |
|
|
|
def setup_device(self, device_info, wizard, purpose): |
|
device_id = device_info.device.id_ |
|
client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) |
|
|
|
# Call authenticate on hww to ensure unlocked and suitable for network |
|
# May involve user entering PIN on (or even setting up!) hardware device |
|
wizard.run_task_without_blocking_gui(task=lambda: client.authenticate()) |
|
return client |
|
|
|
def get_xpub(self, device_id, derivation, xtype, wizard): |
|
if xtype not in self.SUPPORTED_XTYPES: |
|
raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device)) |
|
client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) |
|
xpub = client.get_xpub(derivation, xtype) |
|
return xpub |
|
|
|
def show_address(self, wallet, address, keystore=None): |
|
if keystore is None: |
|
keystore = wallet.get_keystore() |
|
if not self.show_address_helper(wallet, address, keystore): |
|
return |
|
|
|
path_suffix = wallet.get_address_index(address) |
|
if _is_multisig(wallet): |
|
# Multisig - wallet details must be registered on Jade hw |
|
multisig_name = _register_multisig_wallet(wallet, keystore, address) |
|
|
|
# Jade only needs the path suffix(es) and the multisig registration |
|
# name to generate the address, as the fixed derivation part is |
|
# embedded in the multisig wallet registration record |
|
# NOTE: all cosigners have same path suffix |
|
paths = [path_suffix] * wallet.n |
|
hw_address = keystore.show_address_multi(multisig_name, paths) |
|
else: |
|
# Single-sig/standard |
|
txin_type = wallet.get_txin_type(address) |
|
hw_address = keystore.show_address(path_suffix, txin_type) |
|
|
|
if hw_address != address: |
|
keystore.handler.show_error(_('The address generated by {} does not match!').format(self.device))
|
|
|