You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2623 lines
100 KiB
2623 lines
100 KiB
|
|
from configparser import NoOptionError |
|
import warnings |
|
import functools |
|
import collections |
|
import numbers |
|
import random |
|
import copy |
|
import base64 |
|
import json |
|
from math import ceil |
|
from binascii import hexlify, unhexlify |
|
from datetime import datetime, timedelta |
|
from calendar import timegm |
|
from copy import deepcopy |
|
from mnemonic import Mnemonic as MnemonicParent |
|
from hashlib import sha256 |
|
from itertools import chain |
|
from decimal import Decimal |
|
from numbers import Integral |
|
from math import exp |
|
|
|
|
|
from .configure import jm_single |
|
from .blockchaininterface import INF_HEIGHT |
|
from .support import select_gradual, select_greedy, select_greediest, \ |
|
select, NotEnoughFundsException |
|
from .cryptoengine import TYPE_P2PKH, TYPE_P2SH_P2WPKH, TYPE_P2WSH,\ |
|
TYPE_P2WPKH, TYPE_TIMELOCK_P2WSH, TYPE_SEGWIT_WALLET_FIDELITY_BONDS,\ |
|
TYPE_WATCHONLY_FIDELITY_BONDS, TYPE_WATCHONLY_TIMELOCK_P2WSH, \ |
|
TYPE_WATCHONLY_P2WPKH, TYPE_P2TR, ENGINES, detect_script_type, EngineError |
|
from .support import get_random_bytes |
|
from . import mn_encode, mn_decode |
|
import jmbitcoin as btc |
|
from jmbase import JM_WALLET_NAME_PREFIX, bintohex |
|
|
|
|
|
def _int_to_bytestr(i): |
|
return str(i).encode('ascii') |
|
|
|
|
|
class WalletError(Exception): |
|
pass |
|
|
|
|
|
class UnknownAddressForLabel(Exception): |
|
|
|
def __init__(self, addr): |
|
super().__init__('Unknown address for this wallet: ' + addr) |
|
|
|
|
|
class Mnemonic(MnemonicParent): |
|
@classmethod |
|
def detect_language(cls, code): |
|
return "english" |
|
|
|
def estimate_tx_fee(ins, outs, txtype='p2pkh', outtype=None, extra_bytes=0): |
|
'''Returns an estimate of the number of satoshis required |
|
for a transaction with the given number of inputs and outputs, |
|
based on information from the blockchain interface. |
|
|
|
Arguments: |
|
ins: int, number of inputs |
|
outs: int, number of outputs |
|
txtype: either a single string, or a list of strings |
|
outtype: either None or a list of strings |
|
extra_bytes: an int |
|
These arguments are intended to allow a kind of 'default', where |
|
all the inputs and outputs match a predefined type (that of the wallet), |
|
but also allow customization for heterogeneous input and output types. |
|
For supported input and output types, see the keys of the dicts |
|
`inmults` and `outmults` in jmbitcoin.secp256k1_transaction.estimate_tx_size`. |
|
|
|
Returns: |
|
a single integer number of satoshis as estimate. |
|
''' |
|
if jm_single().bc_interface is None: |
|
raise RuntimeError("Cannot estimate transaction fee " + |
|
"without blockchain source.") |
|
fee_per_kb = jm_single().bc_interface.estimate_fee_per_kb( |
|
jm_single().config.getint("POLICY","tx_fees")) |
|
if fee_per_kb is None: |
|
raise RuntimeError("Cannot estimate fee per kB, possibly" + |
|
" a failure of connection to the blockchain.") |
|
absurd_fee = jm_single().config.getint("POLICY", "absurd_fee_per_kb") |
|
if fee_per_kb > absurd_fee: |
|
#This error is considered critical; for safety reasons, shut down. |
|
raise ValueError("Estimated fee " + |
|
btc.fee_per_kb_to_str(fee_per_kb) + |
|
" greater than absurd value " + |
|
btc.fee_per_kb_to_str(absurd_fee) + ", quitting.") |
|
|
|
# See docstring for explanation: |
|
if isinstance(txtype, str): |
|
ins = [txtype] * ins |
|
else: |
|
assert isinstance(txtype, list) |
|
ins = txtype |
|
if outtype is None: |
|
outs = [txtype] * outs |
|
elif isinstance(outtype, str): |
|
outs = [outtype] * outs |
|
else: |
|
assert isinstance(outtype, list) |
|
outs = outtype |
|
|
|
# Note: the calls to `estimate_tx_size` in this code |
|
# block can raise `NotImplementedError` if any of the |
|
# strings in (ins, outs) are not known script types. |
|
if not btc.there_is_one_segwit_input(ins): |
|
tx_estimated_bytes = btc.estimate_tx_size(ins, outs) + extra_bytes |
|
return int((tx_estimated_bytes * fee_per_kb)/Decimal(1000.0)) |
|
else: |
|
witness_estimate, non_witness_estimate = btc.estimate_tx_size( |
|
ins, outs) |
|
non_witness_estimate += extra_bytes |
|
return int(int(ceil(non_witness_estimate + 0.25*witness_estimate)*fee_per_kb)/Decimal(1000.0)) |
|
|
|
def compute_tx_locktime(): |
|
# set locktime for best anonset (Core, Electrum) |
|
# most recent block or some time back in random cases |
|
locktime = jm_single().bc_interface.get_current_block_height() |
|
if random.randint(0, 9) == 0: |
|
# P2EP requires locktime > 0 |
|
locktime = max(1, locktime - random.randint(0, 99)) |
|
return locktime |
|
|
|
|
|
#FIXME: move this to a utilities file? |
|
def deprecated(func): |
|
@functools.wraps(func) |
|
def wrapped(*args, **kwargs): |
|
warnings.warn("Call to deprecated function {}.".format(func.__name__), |
|
category=DeprecationWarning, stacklevel=2) |
|
return func(*args, **kwargs) |
|
|
|
return wrapped |
|
|
|
|
|
class UTXOManager(object): |
|
STORAGE_KEY = b'utxo' |
|
METADATA_KEY = b'meta' |
|
TXID_LEN = 32 |
|
|
|
def __init__(self, storage, merge_func): |
|
self.storage = storage |
|
self.selector = merge_func |
|
# {mixdexpth: {(txid, index): (path, value, height)}} |
|
self._utxo = None |
|
# metadata kept as a separate key in the database |
|
# for backwards compat; value as dict for forward-compat. |
|
# format is {(txid, index): value-dict} with "disabled" |
|
# as the only currently used key in the dict. |
|
self._utxo_meta = None |
|
self._load_storage() |
|
assert self._utxo is not None |
|
|
|
@classmethod |
|
def initialize(cls, storage): |
|
storage.data[cls.STORAGE_KEY] = {} |
|
|
|
def _load_storage(self): |
|
assert isinstance(self.storage.data[self.STORAGE_KEY], dict) |
|
|
|
self._utxo = collections.defaultdict(dict) |
|
self._utxo_meta = collections.defaultdict(dict) |
|
for md, data in self.storage.data[self.STORAGE_KEY].items(): |
|
md = int(md) |
|
md_data = self._utxo[md] |
|
for utxo, value in data.items(): |
|
txid = utxo[:self.TXID_LEN] |
|
index = int(utxo[self.TXID_LEN:]) |
|
md_data[(txid, index)] = value |
|
|
|
# Wallets may not have any metadata |
|
if self.METADATA_KEY in self.storage.data: |
|
for utxo, value in self.storage.data[self.METADATA_KEY].items(): |
|
txid = utxo[:self.TXID_LEN] |
|
index = int(utxo[self.TXID_LEN:]) |
|
self._utxo_meta[(txid, index)] = value |
|
|
|
def save(self, write=True): |
|
new_data = {} |
|
self.storage.data[self.STORAGE_KEY] = new_data |
|
|
|
for md, data in self._utxo.items(): |
|
md = _int_to_bytestr(md) |
|
new_data[md] = {} |
|
# storage keys must be bytes() |
|
for (txid, index), value in data.items(): |
|
new_data[md][txid + _int_to_bytestr(index)] = value |
|
|
|
new_meta_data = {} |
|
self.storage.data[self.METADATA_KEY] = new_meta_data |
|
for (txid, index), value in self._utxo_meta.items(): |
|
new_meta_data[txid + _int_to_bytestr(index)] = value |
|
|
|
if write: |
|
self.storage.save() |
|
|
|
def reset(self): |
|
self._utxo = collections.defaultdict(dict) |
|
|
|
def have_utxo(self, txid, index, include_disabled=True): |
|
if not include_disabled and self.is_disabled(txid, index): |
|
return False |
|
for md in self._utxo: |
|
if (txid, index) in self._utxo[md]: |
|
return md |
|
return False |
|
|
|
def remove_utxo(self, txid, index, mixdepth): |
|
# currently does not remove metadata associated |
|
# with this utxo |
|
assert isinstance(txid, bytes) |
|
assert len(txid) == self.TXID_LEN |
|
assert isinstance(index, numbers.Integral) |
|
assert isinstance(mixdepth, numbers.Integral) |
|
|
|
x = self._utxo[mixdepth].pop((txid, index)) |
|
return x |
|
|
|
def add_utxo(self, txid, index, path, value, mixdepth, height=None): |
|
# Assumed: that we add a utxo only if we want it enabled, |
|
# so metadata is not currently added. |
|
# The height (blockheight) field will be "infinity" for unconfirmed. |
|
assert isinstance(txid, bytes) |
|
assert len(txid) == self.TXID_LEN |
|
assert isinstance(index, numbers.Integral) |
|
assert isinstance(value, numbers.Integral) |
|
assert isinstance(mixdepth, numbers.Integral) |
|
if height is None: |
|
height = INF_HEIGHT |
|
assert isinstance(height, numbers.Integral) |
|
|
|
self._utxo[mixdepth][(txid, index)] = (path, value, height) |
|
|
|
def is_disabled(self, txid, index): |
|
if not self._utxo_meta: |
|
return False |
|
if (txid, index) not in self._utxo_meta: |
|
return False |
|
if b'disabled' not in self._utxo_meta[(txid, index)]: |
|
return False |
|
if not self._utxo_meta[(txid, index)][b'disabled']: |
|
return False |
|
return True |
|
|
|
def disable_utxo(self, txid, index, disable=True): |
|
assert isinstance(txid, bytes) |
|
assert len(txid) == self.TXID_LEN |
|
assert isinstance(index, numbers.Integral) |
|
|
|
if b'disabled' not in self._utxo_meta[(txid, index)]: |
|
self._utxo_meta[(txid, index)] = {} |
|
self._utxo_meta[(txid, index)][b'disabled'] = disable |
|
|
|
def enable_utxo(self, txid, index): |
|
self.disable_utxo(txid, index, disable=False) |
|
|
|
def select_utxos(self, mixdepth, amount, utxo_filter=(), select_fn=None, |
|
maxheight=None): |
|
assert isinstance(mixdepth, numbers.Integral) |
|
utxos = self._utxo[mixdepth] |
|
# do not select anything in the filter |
|
available = [{'utxo': utxo, 'value': val} |
|
for utxo, (addr, val, height) in utxos.items() if utxo not in utxo_filter] |
|
# do not select anything with insufficient confirmations: |
|
if maxheight is not None: |
|
available = [{'utxo': utxo, 'value': val} |
|
for utxo, (addr, val, height) in utxos.items( |
|
) if height <= maxheight] |
|
# do not select anything disabled |
|
available = [u for u in available if not self.is_disabled(*u['utxo'])] |
|
selector = select_fn or self.selector |
|
selected = selector(available, amount) |
|
# note that we do not return height; for selection, we expect |
|
# the caller will not want this (after applying the height filter) |
|
return {s['utxo']: {'path': utxos[s['utxo']][0], |
|
'value': utxos[s['utxo']][1]} |
|
for s in selected} |
|
|
|
def get_balance_by_mixdepth(self, max_mixdepth=float('Inf'), |
|
include_disabled=True, maxheight=None): |
|
""" By default this returns a dict of aggregated bitcoin |
|
balance per mixdepth: {0: N sats, 1: M sats, ...} for all |
|
currently available mixdepths. |
|
If max_mixdepth is set it will return balances only up |
|
to that mixdepth. |
|
To get only enabled balance, set include_disabled=False. |
|
To get balances only with a certain number of confs, use maxheight. |
|
""" |
|
balance_dict = collections.defaultdict(int) |
|
for mixdepth, utxomap in self._utxo.items(): |
|
if mixdepth > max_mixdepth: |
|
continue |
|
if not include_disabled: |
|
utxomap = {k: v for k, v in utxomap.items( |
|
) if not self.is_disabled(*k)} |
|
if maxheight is not None: |
|
utxomap = {k: v for k, v in utxomap.items( |
|
) if v[2] <= maxheight} |
|
value = sum(x[1] for x in utxomap.values()) |
|
balance_dict[mixdepth] = value |
|
return balance_dict |
|
|
|
def get_utxos_by_mixdepth(self): |
|
return deepcopy(self._utxo) |
|
|
|
def __eq__(self, o): |
|
return self._utxo == o._utxo and \ |
|
self.selector is o.selector |
|
|
|
|
|
class AddressLabelsManager(object): |
|
STORAGE_KEY = b'addr_labels' |
|
|
|
def __init__(self, storage): |
|
self.storage = storage |
|
self._addr_labels = None |
|
self._load_storage() |
|
assert self._addr_labels is not None |
|
|
|
@classmethod |
|
def initialize(cls, storage): |
|
storage.data[cls.STORAGE_KEY] = {} |
|
|
|
def _load_storage(self): |
|
if self.STORAGE_KEY in self.storage.data: |
|
self._addr_labels = self.storage.data[self.STORAGE_KEY] |
|
else: |
|
self._addr_labels = {} |
|
|
|
def save(self): |
|
self.storage.data[self.STORAGE_KEY] = self._addr_labels |
|
|
|
def get_label(self, address): |
|
address = bytes(address, 'ascii') |
|
if address in self._addr_labels: |
|
return self._addr_labels[address].decode() |
|
else: |
|
return None |
|
|
|
def set_label(self, address, label): |
|
address = bytes(address, 'ascii') |
|
if label: |
|
self._addr_labels[address] = bytes(label, 'utf-8') |
|
elif address in self._addr_labels: |
|
del self._addr_labels[address] |
|
|
|
|
|
class BaseWallet(object): |
|
TYPE = None |
|
|
|
MERGE_ALGORITHMS = { |
|
'default': select, |
|
'gradual': select_gradual, |
|
'greedy': select_greedy, |
|
'greediest': select_greediest |
|
} |
|
|
|
_ENGINES = ENGINES |
|
|
|
_ENGINE = None |
|
|
|
ADDRESS_TYPE_EXTERNAL = 0 |
|
ADDRESS_TYPE_INTERNAL = 1 |
|
|
|
def __init__(self, storage, gap_limit=6, merge_algorithm_name=None, |
|
mixdepth=None): |
|
# to be defined by inheriting classes |
|
assert self.TYPE is not None |
|
assert self._ENGINE is not None |
|
|
|
self.merge_algorithm = self._get_merge_algorithm(merge_algorithm_name) |
|
self.gap_limit = gap_limit |
|
self._storage = storage |
|
self._utxos = None |
|
self._addr_labels = None |
|
# highest mixdepth ever used in wallet, important for synching |
|
self.max_mixdepth = None |
|
# effective maximum mixdepth to be used by joinmarket |
|
self.mixdepth = None |
|
self.network = None |
|
|
|
# {script: path}, should always hold mappings for all "known" keys |
|
self._script_map = {} |
|
|
|
self._load_storage() |
|
|
|
assert self._utxos is not None |
|
assert self.max_mixdepth is not None |
|
assert self.max_mixdepth >= 0 |
|
assert self.network in ('mainnet', 'testnet', 'signet') |
|
|
|
if mixdepth is not None: |
|
assert mixdepth >= 0 |
|
if self._storage.read_only and mixdepth > self.max_mixdepth: |
|
raise Exception("Effective max mixdepth must be at most {}!" |
|
.format(self.max_mixdepth)) |
|
self.max_mixdepth = max(self.max_mixdepth, mixdepth) |
|
self.mixdepth = mixdepth |
|
else: |
|
self.mixdepth = self.max_mixdepth |
|
|
|
assert self.mixdepth is not None |
|
|
|
@property |
|
@deprecated |
|
def max_mix_depth(self): |
|
return self.mixdepth |
|
|
|
@property |
|
@deprecated |
|
def gaplimit(self): |
|
return self.gap_limit |
|
|
|
def _load_storage(self): |
|
""" |
|
load data from storage |
|
""" |
|
if self._storage.data[b'wallet_type'] != self.TYPE: |
|
raise Exception("Wrong class to initialize wallet of type {}." |
|
.format(self.TYPE)) |
|
self.network = self._storage.data[b'network'].decode('ascii') |
|
self._utxos = UTXOManager(self._storage, self.merge_algorithm) |
|
self._addr_labels = AddressLabelsManager(self._storage) |
|
|
|
def get_storage_location(self): |
|
""" Return the location of the |
|
persistent storage, if it exists, or None. |
|
""" |
|
if not self._storage: |
|
return None |
|
return self._storage.get_location() |
|
|
|
def save(self): |
|
""" |
|
Write data to associated storage object and trigger persistent update. |
|
""" |
|
self._utxos.save() |
|
self._addr_labels.save() |
|
|
|
@classmethod |
|
def initialize(cls, storage, network, max_mixdepth=2, timestamp=None, |
|
write=True): |
|
""" |
|
Initialize wallet in an empty storage. Must be used on a storage object |
|
before creating a wallet object with it. |
|
|
|
args: |
|
storage: a Storage object |
|
network: str, network we are on, 'mainnet', 'testnet' or 'signet' |
|
max_mixdepth: int, number of the highest mixdepth |
|
timestamp: bytes or None, defaults to the current time |
|
write: execute storage.save() |
|
""" |
|
assert network in ('mainnet', 'testnet', 'signet') |
|
assert max_mixdepth >= 0 |
|
|
|
if storage.data != {}: |
|
# prevent accidentally overwriting existing wallet |
|
raise WalletError("Refusing to initialize wallet in non-empty " |
|
"storage.") |
|
|
|
if not timestamp: |
|
timestamp = datetime.now().strftime('%Y/%m/%d %H:%M:%S') |
|
|
|
storage.data[b'network'] = network.encode('ascii') |
|
storage.data[b'created'] = timestamp.encode('ascii') |
|
storage.data[b'wallet_type'] = cls.TYPE |
|
|
|
UTXOManager.initialize(storage) |
|
AddressLabelsManager.initialize(storage) |
|
|
|
if write: |
|
storage.save() |
|
|
|
def get_txtype(self): |
|
""" |
|
use TYPE constant instead if possible |
|
""" |
|
if self.TYPE == TYPE_P2PKH: |
|
return 'p2pkh' |
|
elif self.TYPE == TYPE_P2SH_P2WPKH: |
|
return 'p2sh-p2wpkh' |
|
elif self.TYPE in (TYPE_P2WPKH, |
|
TYPE_SEGWIT_WALLET_FIDELITY_BONDS): |
|
return 'p2wpkh' |
|
assert False |
|
|
|
def get_outtype(self, addr): |
|
try: |
|
script_type = detect_script_type( |
|
btc.CCoinAddress(addr).to_scriptPubKey()) |
|
except EngineError: |
|
# up to callers what to do with this; |
|
# it means we don't recognize this script type. |
|
return None |
|
if script_type == TYPE_P2PKH: |
|
return 'p2pkh' |
|
elif script_type == TYPE_P2WPKH: |
|
return 'p2wpkh' |
|
elif script_type == TYPE_P2SH_P2WPKH: |
|
return 'p2sh-p2wpkh' |
|
elif script_type == TYPE_P2WSH: |
|
return 'p2wsh' |
|
elif script_type == TYPE_P2TR: |
|
return 'p2tr' |
|
# should be unreachable; all possible returns |
|
# from detect_script_type are covered. |
|
assert False |
|
|
|
def sign_tx(self, tx, scripts, **kwargs): |
|
""" |
|
Add signatures to transaction for inputs referenced by scripts. |
|
|
|
args: |
|
tx: CMutableTransaction object |
|
scripts: {input_index: (output_script, amount)} |
|
kwargs: additional arguments for engine.sign_transaction |
|
returns: |
|
True, None if success. |
|
False, msg if signing failed, with error msg. |
|
""" |
|
for index, (script, amount) in scripts.items(): |
|
assert amount > 0 |
|
path = self.script_to_path(script) |
|
privkey, engine = self._get_key_from_path(path) |
|
sig, msg = engine.sign_transaction(tx, index, privkey, |
|
amount, **kwargs) |
|
if not sig: |
|
return False, msg |
|
return True, None |
|
|
|
@deprecated |
|
def get_key_from_addr(self, addr): |
|
""" |
|
There should be no reason for code outside the wallet to need a privkey. |
|
""" |
|
script = self._ENGINE.address_to_script(addr) |
|
path = self.script_to_path(script) |
|
privkey = self._get_key_from_path(path)[0] |
|
return privkey |
|
|
|
def get_external_addr(self, mixdepth): |
|
""" |
|
Return an address suitable for external distribution, including funding |
|
the wallet from other sources, or receiving payments or donations. |
|
JoinMarket will never generate these addresses for internal use. |
|
""" |
|
return self.get_new_addr(mixdepth, self.ADDRESS_TYPE_EXTERNAL) |
|
|
|
def get_internal_addr(self, mixdepth): |
|
""" |
|
Return an address for internal usage, as change addresses and when |
|
participating in transactions initiated by other parties. |
|
""" |
|
return self.get_new_addr(mixdepth, self.ADDRESS_TYPE_INTERNAL) |
|
|
|
def get_external_script(self, mixdepth): |
|
return self.get_new_script(mixdepth, self.ADDRESS_TYPE_EXTERNAL) |
|
|
|
def get_internal_script(self, mixdepth): |
|
return self.get_new_script(mixdepth, self.ADDRESS_TYPE_INTERNAL) |
|
|
|
@classmethod |
|
def addr_to_script(cls, addr): |
|
return cls._ENGINE.address_to_script(addr) |
|
|
|
@classmethod |
|
def pubkey_to_script(cls, pubkey): |
|
return cls._ENGINE.pubkey_to_script(pubkey) |
|
|
|
@classmethod |
|
def pubkey_to_addr(cls, pubkey): |
|
return cls._ENGINE.pubkey_to_address(pubkey) |
|
|
|
def script_to_addr(self, script): |
|
assert self.is_known_script(script) |
|
path = self.script_to_path(script) |
|
engine = self._get_key_from_path(path)[1] |
|
return engine.script_to_address(script) |
|
|
|
def get_script_code(self, script): |
|
""" |
|
For segwit wallets, gets the value of the scriptCode |
|
parameter required (see BIP143) for sighashing; this is |
|
required for protocols (like Joinmarket) where signature |
|
verification materials must be communicated between wallets. |
|
For non-segwit wallets, raises EngineError. |
|
""" |
|
path = self.script_to_path(script) |
|
priv, engine = self._get_key_from_path(path) |
|
pub = engine.privkey_to_pubkey(priv) |
|
return engine.pubkey_to_script_code(pub) |
|
|
|
@classmethod |
|
def pubkey_has_address(cls, pubkey, addr): |
|
return cls._ENGINE.pubkey_has_address(pubkey, addr) |
|
|
|
@classmethod |
|
def pubkey_has_script(cls, pubkey, script): |
|
return cls._ENGINE.pubkey_has_script(pubkey, script) |
|
|
|
@deprecated |
|
def get_key(self, mixdepth, address_type, index): |
|
raise NotImplementedError() |
|
|
|
def get_addr(self, mixdepth, address_type, index): |
|
script = self.get_script(mixdepth, address_type, index) |
|
return self.script_to_addr(script) |
|
|
|
def get_address_from_path(self, path): |
|
script = self.get_script_from_path(path) |
|
return self.script_to_addr(script) |
|
|
|
def get_new_addr(self, mixdepth, address_type): |
|
""" |
|
use get_external_addr/get_internal_addr |
|
""" |
|
script = self.get_new_script(mixdepth, address_type) |
|
return self.script_to_addr(script) |
|
|
|
def get_new_script(self, mixdepth, address_type): |
|
raise NotImplementedError() |
|
|
|
def get_wif(self, mixdepth, address_type, index): |
|
return self.get_wif_path(self.get_path(mixdepth, address_type, index)) |
|
|
|
def get_wif_path(self, path): |
|
priv, engine = self._get_key_from_path(path) |
|
return engine.privkey_to_wif(priv) |
|
|
|
def get_path(self, mixdepth=None, address_type=None, index=None): |
|
raise NotImplementedError() |
|
|
|
def get_details(self, path): |
|
""" |
|
Return mixdepth, address_type, index for a given path |
|
|
|
args: |
|
path: wallet path |
|
returns: |
|
tuple (mixdepth, type, index) |
|
|
|
type is one of 0, 1, 'imported', 2, 3 |
|
""" |
|
raise NotImplementedError() |
|
|
|
@deprecated |
|
def update_cache_index(self): |
|
""" |
|
Deprecated alias for save() |
|
""" |
|
self.save() |
|
|
|
def remove_old_utxos(self, tx): |
|
""" |
|
Remove all own inputs of tx from internal utxo list. |
|
|
|
args: |
|
tx: CMutableTransaction |
|
returns: |
|
{(txid, index): {'script': bytes, 'path': str, 'value': int} for all removed utxos |
|
""" |
|
removed_utxos = {} |
|
for inp in tx.vin: |
|
txid = inp.prevout.hash[::-1] |
|
index = inp.prevout.n |
|
md = self._utxos.have_utxo(txid, index) |
|
if md is False: |
|
continue |
|
path, value, height = self._utxos.remove_utxo(txid, index, md) |
|
script = self.get_script_from_path(path) |
|
removed_utxos[(txid, index)] = {'script': script, |
|
'path': path, |
|
'value': value} |
|
return removed_utxos |
|
|
|
def add_new_utxos(self, tx, height=None): |
|
""" |
|
Add all outputs of tx for this wallet to internal utxo list. |
|
They are also returned in standard dict form. |
|
args: |
|
tx: CMutableTransaction |
|
height: blockheight in which tx was included, or None |
|
if unconfirmed. |
|
returns: |
|
{(txid, index): {'script': bytes, 'path': tuple, 'value': int, |
|
'address': str} |
|
for all added utxos |
|
""" |
|
added_utxos = {} |
|
txid = tx.GetTxid()[::-1] |
|
for index, outs in enumerate(tx.vout): |
|
spk = outs.scriptPubKey |
|
val = outs.nValue |
|
try: |
|
self.add_utxo(txid, index, spk, val, height=height) |
|
except WalletError: |
|
continue |
|
|
|
path = self.script_to_path(spk) |
|
added_utxos[(txid, index)] = {'script': spk, 'path': path, 'value': val, |
|
'address': self._ENGINE.script_to_address(spk)} |
|
return added_utxos |
|
|
|
def add_utxo(self, txid, index, script, value, height=None): |
|
assert isinstance(txid, bytes) |
|
assert isinstance(index, Integral) |
|
assert isinstance(script, bytes) |
|
assert isinstance(value, Integral) |
|
|
|
if script not in self._script_map: |
|
raise WalletError("Tried to add UTXO for unknown key to wallet.") |
|
|
|
path = self.script_to_path(script) |
|
mixdepth = self._get_mixdepth_from_path(path) |
|
self._utxos.add_utxo(txid, index, path, value, mixdepth, height=height) |
|
|
|
def inputs_consumed_by_tx(self, tx): |
|
""" Given a transaction tx, checks |
|
which if any of the inputs belonged to this |
|
wallet, and returns [(index, CTxIn),..] for each. |
|
""" |
|
retval = [] |
|
for i, txin in enumerate(tx.vin): |
|
pub, msg = btc.extract_pubkey_from_witness(tx, i) |
|
if not pub: |
|
# this can certainly occur since other inputs |
|
# may not be a spending a script we recognize; |
|
# so we ignore the msg |
|
continue |
|
script = self.pubkey_to_script(pub) |
|
if script in self._script_map: |
|
retval.append((i, txin)) |
|
return retval |
|
|
|
def process_new_tx(self, txd, height=None): |
|
""" Given a newly seen transaction, deserialized as |
|
CMutableTransaction txd, |
|
process its inputs and outputs and update |
|
the utxo contents of this wallet accordingly. |
|
NOTE: this should correctly handle transactions that are not |
|
actually related to the wallet; it will not add (or remove, |
|
obviously) utxos that were not related since the underlying |
|
functions check this condition. |
|
""" |
|
removed_utxos = self.remove_old_utxos(txd) |
|
added_utxos = self.add_new_utxos(txd, height=height) |
|
return (removed_utxos, added_utxos) |
|
|
|
def select_utxos(self, mixdepth, amount, utxo_filter=None, |
|
select_fn=None, maxheight=None, includeaddr=False, |
|
require_auth_address=False): |
|
""" |
|
Select a subset of available UTXOS for a given mixdepth whose value is |
|
greater or equal to amount. If `includeaddr` is True, adds an `address` |
|
key to the returned dict. |
|
|
|
args: |
|
mixdepth: int, mixdepth to select utxos from, must be smaller or |
|
equal to wallet.max_mixdepth |
|
amount: int, total minimum amount of all selected utxos |
|
utxo_filter: list of (txid, index), utxos not to select |
|
maxheight: only select utxos with blockheight <= this. |
|
require_auth_address: if True, output utxos must include a |
|
standard wallet address. The first item of the output dict is |
|
guaranteed to be a suitable utxo. Result will be empty if no |
|
such utxo set could be found. |
|
|
|
returns: |
|
{(txid, index): {'script': bytes, 'path': tuple, 'value': int}} |
|
|
|
raises: |
|
NotEnoughFundsException: if mixdepth does not have utxos with |
|
enough value to satisfy amount |
|
""" |
|
assert isinstance(mixdepth, numbers.Integral) |
|
assert isinstance(amount, numbers.Integral) |
|
|
|
if not utxo_filter: |
|
utxo_filter = () |
|
for i in utxo_filter: |
|
assert len(i) == 2 |
|
assert isinstance(i[0], bytes) |
|
assert isinstance(i[1], numbers.Integral) |
|
utxos = self._utxos.select_utxos( |
|
mixdepth, amount, utxo_filter, select_fn, maxheight=maxheight) |
|
|
|
total_value = 0 |
|
standard_utxo = None |
|
for key, data in utxos.items(): |
|
if self.is_standard_wallet_script(data['path']): |
|
standard_utxo = key |
|
total_value += data['value'] |
|
data['script'] = self.get_script_from_path(data['path']) |
|
if includeaddr: |
|
data["address"] = self.get_address_from_path(data["path"]) |
|
|
|
if require_auth_address and not standard_utxo: |
|
# try to select more utxos, hoping for a standard one |
|
try: |
|
return self.select_utxos( |
|
mixdepth, total_value + 1, utxo_filter, select_fn, |
|
maxheight, includeaddr, require_auth_address) |
|
except NotEnoughFundsException: |
|
# recursive utxo selection was unsuccessful, give up |
|
return {} |
|
elif require_auth_address: |
|
utxos = collections.OrderedDict(utxos) |
|
utxos.move_to_end(standard_utxo, last=False) |
|
|
|
return utxos |
|
|
|
def disable_utxo(self, txid, index, disable=True): |
|
self._utxos.disable_utxo(txid, index, disable) |
|
# make sure the utxo database is persisted |
|
self.save() |
|
|
|
def toggle_disable_utxo(self, txid, index): |
|
is_disabled = self._utxos.is_disabled(txid, index) |
|
self.disable_utxo(txid, index, disable= not is_disabled) |
|
|
|
def reset_utxos(self): |
|
self._utxos.reset() |
|
|
|
def get_balance_by_mixdepth(self, verbose=True, |
|
include_disabled=False, |
|
maxheight=None): |
|
""" |
|
Get available funds in each active mixdepth. |
|
By default ignores disabled utxos in calculation. |
|
By default returns unconfirmed transactions, to filter |
|
confirmations, set maxheight to max acceptable blockheight. |
|
returns: {mixdepth: value} |
|
""" |
|
# TODO: verbose |
|
return self._utxos.get_balance_by_mixdepth(max_mixdepth=self.mixdepth, |
|
include_disabled=include_disabled, |
|
maxheight=maxheight) |
|
|
|
def get_utxos_by_mixdepth(self, include_disabled=False, includeheight=False): |
|
""" |
|
Get all UTXOs for active mixdepths. |
|
|
|
returns: |
|
{mixdepth: {(txid, index): |
|
{'script': bytes, 'path': tuple, 'value': int}}} |
|
(if `includeheight` is True, adds key 'height': int) |
|
""" |
|
mix_utxos = self._utxos.get_utxos_by_mixdepth() |
|
|
|
script_utxos = collections.defaultdict(dict) |
|
for md, data in mix_utxos.items(): |
|
if md > self.mixdepth: |
|
continue |
|
for utxo, (path, value, height) in data.items(): |
|
if not include_disabled and self._utxos.is_disabled(*utxo): |
|
continue |
|
script = self.get_script_from_path(path) |
|
addr = self.get_address_from_path(path) |
|
label = self.get_address_label(addr) |
|
script_utxos[md][utxo] = {'script': script, |
|
'path': path, |
|
'value': value, |
|
'address': addr, |
|
'label': label} |
|
if includeheight: |
|
script_utxos[md][utxo]['height'] = height |
|
return script_utxos |
|
|
|
|
|
def get_all_utxos(self, include_disabled=False): |
|
""" Get all utxos in the wallet, format of return |
|
is as for get_utxos_by_mixdepth for each mixdepth. |
|
""" |
|
mix_utxos = self.get_utxos_by_mixdepth( |
|
include_disabled=include_disabled) |
|
all_utxos = {} |
|
for d in mix_utxos.values(): |
|
all_utxos.update(d) |
|
return all_utxos |
|
|
|
@classmethod |
|
def _get_merge_algorithm(cls, algorithm_name=None): |
|
if not algorithm_name: |
|
try: |
|
algorithm_name = jm_single().config.get('POLICY', |
|
'merge_algorithm') |
|
except NoOptionError: |
|
algorithm_name = 'default' |
|
|
|
alg = cls.MERGE_ALGORITHMS.get(algorithm_name) |
|
if alg is None: |
|
raise Exception("Unknown merge algorithm: '{}'." |
|
"".format(algorithm_name)) |
|
return alg |
|
|
|
def _get_mixdepth_from_path(self, path): |
|
raise NotImplementedError() |
|
|
|
def get_script_from_path(self, path): |
|
""" |
|
internal note: This is the final sink for all operations that somehow |
|
need to derive a script. If anything goes wrong when deriving a |
|
script this is the place to look at. |
|
|
|
args: |
|
path: wallet path |
|
returns: |
|
script |
|
""" |
|
priv, engine = self._get_key_from_path(path) |
|
return engine.key_to_script(priv) |
|
|
|
def get_script(self, mixdepth, address_type, index): |
|
path = self.get_path(mixdepth, address_type, index) |
|
return self.get_script_from_path(path) |
|
|
|
def _get_key_from_path(self, path): |
|
raise NotImplementedError() |
|
|
|
def get_path_repr(self, path): |
|
""" |
|
Get a human-readable representation of the wallet path. |
|
|
|
args: |
|
path: path tuple |
|
returns: |
|
str |
|
""" |
|
raise NotImplementedError() |
|
|
|
def path_repr_to_path(self, pathstr): |
|
""" |
|
Convert a human-readable path representation to internal representation. |
|
|
|
args: |
|
pathstr: str |
|
returns: |
|
path tuple |
|
""" |
|
raise NotImplementedError() |
|
|
|
def get_next_unused_index(self, mixdepth, address_type): |
|
""" |
|
Get the next index for public scripts/addresses not yet handed out. |
|
|
|
returns: |
|
int >= 0 |
|
""" |
|
raise NotImplementedError() |
|
|
|
def get_mnemonic_words(self): |
|
""" |
|
Get mnemonic seed words for master key. |
|
|
|
returns: |
|
mnemonic_seed, seed_extension |
|
mnemonic_seed is a space-separated str of words |
|
seed_extension is a str or None |
|
""" |
|
raise NotImplementedError() |
|
|
|
def sign_message(self, message, path): |
|
""" |
|
Sign the message using the key referenced by path. |
|
|
|
args: |
|
message: bytes |
|
path: path tuple |
|
returns as tuple: |
|
address of key that signed |
|
signature as base64-encoded string |
|
""" |
|
priv, engine = self._get_key_from_path(path) |
|
addr = engine.privkey_to_address(priv) |
|
return addr, engine.sign_message(priv, message) |
|
|
|
def get_wallet_name(self): |
|
""" Returns the name used as a label for this |
|
specific Joinmarket wallet in Bitcoin Core. |
|
""" |
|
return JM_WALLET_NAME_PREFIX + self.get_wallet_id() |
|
|
|
def get_wallet_id(self): |
|
""" |
|
Get a human-readable identifier for the wallet. |
|
|
|
returns: |
|
str |
|
""" |
|
raise NotImplementedError() |
|
|
|
def check_wallet_passphrase(self, passphrase): |
|
return self._storage.check_password(passphrase) |
|
|
|
def change_wallet_passphrase(self, passphrase): |
|
self._storage.change_password(passphrase) |
|
|
|
def yield_imported_paths(self, mixdepth): |
|
""" |
|
Get an iterator for all imported keys in given mixdepth. |
|
|
|
params: |
|
mixdepth: int |
|
returns: |
|
iterator of wallet paths |
|
""" |
|
return iter([]) |
|
|
|
def is_standard_wallet_script(self, path): |
|
""" |
|
Check if the path's script is of the same type as the standard wallet |
|
key type. |
|
|
|
return: |
|
bool |
|
""" |
|
raise NotImplementedError() |
|
|
|
def is_known_addr(self, addr): |
|
""" |
|
Check if address is known to belong to this wallet. |
|
|
|
params: |
|
addr: str |
|
returns: |
|
bool |
|
""" |
|
script = self.addr_to_script(addr) |
|
return script in self._script_map |
|
|
|
def is_known_script(self, script): |
|
""" |
|
Check if script is known to belong to this wallet. |
|
|
|
params: |
|
script: bytes |
|
returns: |
|
bool |
|
""" |
|
assert isinstance(script, bytes) |
|
return script in self._script_map |
|
|
|
def get_addr_mixdepth(self, addr): |
|
script = self.addr_to_script(addr) |
|
return self.get_script_mixdepth(script) |
|
|
|
def get_script_mixdepth(self, script): |
|
path = self.script_to_path(script) |
|
return self._get_mixdepth_from_path(path) |
|
|
|
def yield_known_paths(self): |
|
""" |
|
Generator for all paths currently known to the wallet |
|
|
|
returns: |
|
path generator |
|
""" |
|
for s in self._script_map.values(): |
|
yield s |
|
|
|
def addr_to_path(self, addr): |
|
script = self.addr_to_script(addr) |
|
return self.script_to_path(script) |
|
|
|
def script_to_path(self, script): |
|
assert script in self._script_map |
|
return self._script_map[script] |
|
|
|
def set_next_index(self, mixdepth, address_type, index, force=False): |
|
""" |
|
Set the next index to use when generating a new key pair. |
|
|
|
params: |
|
mixdepth: int |
|
address_type: 0 (external) or 1 (internal) |
|
index: int |
|
force: True if you know the wallet already knows all scripts |
|
up to (excluding) the given index |
|
|
|
Warning: improper use of 'force' will cause undefined behavior! |
|
""" |
|
raise NotImplementedError() |
|
|
|
def rewind_wallet_indices(self, used_indices, saved_indices): |
|
for md in used_indices: |
|
for address_type in range(min(len(used_indices[md]), len(saved_indices[md]))): |
|
index = max(used_indices[md][address_type], |
|
saved_indices[md][address_type]) |
|
self.set_next_index(md, address_type, index, force=True) |
|
|
|
def _get_default_used_indices(self): |
|
return {x: [0, 0] for x in range(self.max_mixdepth + 1)} |
|
|
|
def get_used_indices(self, addr_gen): |
|
""" Returns a dict of max used indices for each branch in |
|
the wallet, from the given addresses addr_gen, assuming |
|
that they are known to the wallet. |
|
""" |
|
indices = self._get_default_used_indices() |
|
|
|
for addr in addr_gen: |
|
if not self.is_known_addr(addr): |
|
continue |
|
md, address_type, index = self.get_details( |
|
self.addr_to_path(addr)) |
|
if address_type not in (self.BIP32_EXT_ID, self.BIP32_INT_ID, |
|
FidelityBondMixin.BIP32_TIMELOCK_ID, FidelityBondMixin.BIP32_BURN_ID): |
|
assert address_type == 'imported' |
|
continue |
|
indices[md][address_type] = max(indices[md][address_type], index + 1) |
|
|
|
return indices |
|
|
|
def check_gap_indices(self, used_indices): |
|
""" Return False if any of the provided indices (which should be |
|
those seen from listtransactions as having been used, for |
|
this wallet/label) are higher than the ones recorded in the index |
|
cache.""" |
|
|
|
for md in used_indices: |
|
for address_type in (self.ADDRESS_TYPE_EXTERNAL, |
|
self.ADDRESS_TYPE_INTERNAL): |
|
if used_indices[md][address_type] >\ |
|
max(self.get_next_unused_index(md, address_type), 0): |
|
return False |
|
return True |
|
|
|
def set_address_label(self, addr, label): |
|
if self.is_known_addr(addr): |
|
self._addr_labels.set_label(addr, label) |
|
self.save() |
|
else: |
|
raise UnknownAddressForLabel(addr) |
|
|
|
def get_address_label(self, addr): |
|
if self.is_known_addr(addr): |
|
return self._addr_labels.get_label(addr) |
|
else: |
|
raise UnknownAddressForLabel(addr) |
|
|
|
def close(self): |
|
self._storage.close() |
|
|
|
def __del__(self): |
|
self.close() |
|
|
|
|
|
class PSBTWalletMixin(object): |
|
""" |
|
Mixin for BaseWallet to provide BIP174 |
|
functions. |
|
""" |
|
def __init__(self, storage, **kwargs): |
|
super().__init__(storage, **kwargs) |
|
|
|
def is_input_finalized(self, psbt_input): |
|
""" This should be a convenience method in python-bitcointx. |
|
However note: this is not a static method and tacitly |
|
assumes that the input under examination is of the wallet's |
|
type. |
|
""" |
|
assert isinstance(psbt_input, btc.PSBT_Input) |
|
if not psbt_input.utxo: |
|
return False |
|
if isinstance(self, (LegacyWallet, SegwitLegacyWallet)): |
|
if not psbt_input.final_script_sig: |
|
return False |
|
if isinstance(self, (SegwitLegacyWallet, SegwitWallet)): |
|
if not psbt_input.final_script_witness: |
|
return False |
|
return True |
|
|
|
@staticmethod |
|
def human_readable_psbt(in_psbt): |
|
""" Returns a jsonified indented string with all relevant |
|
information, in human readable form, contained in a PSBT. |
|
Warning: the output can be very verbose in certain cases. |
|
""" |
|
assert isinstance(in_psbt, btc.PartiallySignedTransaction) |
|
outdict = {} |
|
outdict["psbt-version"] = in_psbt.version |
|
|
|
# human readable serialization of these three global fields is for |
|
# now on a "best-effort" basis, i.e. just takes the representation |
|
# provided by the underlying classes in bitcointx, though this may |
|
# not be very readable. |
|
# TODO: Improve proprietary/unknown as needed. |
|
if in_psbt.xpubs: |
|
outdict["xpubs"] = {str(k): bintohex( |
|
v.serialize()) for k, v in in_psbt.xpubs.items()} |
|
if in_psbt.proprietary_fields: |
|
outdict["proprietary-fields"] = str(in_psbt.proprietary_fields) |
|
if in_psbt.unknown_fields: |
|
outdict["unknown-fields"] = str(in_psbt.unknown_fields) |
|
|
|
outdict["unsigned-tx"] = btc.human_readable_transaction( |
|
in_psbt.unsigned_tx, jsonified=False) |
|
outdict["psbt-inputs"] = [] |
|
for inp in in_psbt.inputs: |
|
outdict["psbt-inputs"].append( |
|
PSBTWalletMixin.human_readable_psbt_in(inp)) |
|
outdict["psbt-outputs"] = [] |
|
for out in in_psbt.outputs: |
|
outdict["psbt-outputs"].append( |
|
PSBTWalletMixin.human_readable_psbt_out(out)) |
|
return json.dumps(outdict, indent=4) |
|
|
|
@staticmethod |
|
def human_readable_psbt_in(psbt_input): |
|
""" Returns a dict containing human readable information |
|
about a bitcointx.core.psbt.PSBT_Input object. |
|
""" |
|
assert isinstance(psbt_input, btc.PSBT_Input) |
|
outdict = {} |
|
if psbt_input.index is not None: |
|
outdict["input-index"] = psbt_input.index |
|
if psbt_input.utxo: |
|
if isinstance(psbt_input.utxo, btc.CTxOut): |
|
outdict["utxo"] = btc.human_readable_output(psbt_input.utxo) |
|
elif isinstance(psbt_input.utxo, btc.CTransaction): |
|
# human readable full transaction is *too* verbose: |
|
outdict["utxo"] = bintohex(psbt_input.utxo.serialize()) |
|
if psbt_input.witness_utxo: |
|
outdict["witness_utxo"] = bintohex(psbt_input.witness_utxo.serialize()) |
|
else: |
|
assert False, "invalid PSBT Input utxo field." |
|
if psbt_input.sighash_type: |
|
outdict["sighash-type"] = psbt_input.sighash_type |
|
if psbt_input.redeem_script: |
|
outdict["redeem-script"] = bintohex(psbt_input.redeem_script) |
|
if psbt_input.witness_script: |
|
outdict["witness-script"] = bintohex(psbt_input.witness_script) |
|
if psbt_input.partial_sigs: |
|
# convert the dict entries to hex: |
|
outdict["partial-sigs"] = {bintohex(k): bintohex(v) for k,v in \ |
|
psbt_input.partial_sigs.items()} |
|
# Note we do not currently add derivation info to our own inputs, |
|
# but probably will in future ( TODO ), still this is shown for |
|
# externally generated PSBTs: |
|
if psbt_input.derivation_map: |
|
# TODO it would be more useful to print the indexes of the |
|
# derivation path as integers, than 4 byte hex: |
|
outdict["derivation-map"] = {bintohex(k): bintohex(v.serialize( |
|
)) for k, v in psbt_input.derivation_map.items()} |
|
|
|
# we show these fields on a best-effort basis; same comment as for |
|
# globals section as mentioned in hr_psbt() |
|
if psbt_input.proprietary_fields: |
|
outdict["proprietary-fields"] = str(psbt_input.proprietary_fields) |
|
if psbt_input.unknown_fields: |
|
outdict["unknown-fields"] = str(psbt_input.unknown_fields) |
|
if psbt_input.proof_of_reserves_commitment: |
|
outdict["proof-of-reserves-commitment"] = \ |
|
str(psbt_input.proof_of_reserves_commitment) |
|
|
|
outdict["final-scriptSig"] = bintohex(psbt_input.final_script_sig) |
|
outdict["final-scriptWitness"] = bintohex( |
|
psbt_input.final_script_witness.serialize()) |
|
|
|
return outdict |
|
|
|
@staticmethod |
|
def human_readable_psbt_out(psbt_output): |
|
""" Returns a dict containing human readable information |
|
about a PSBT_Output object. |
|
""" |
|
assert isinstance(psbt_output, btc.PSBT_Output) |
|
outdict = {} |
|
if psbt_output.index is not None: |
|
outdict["output-index"] = psbt_output.index |
|
|
|
if psbt_output.derivation_map: |
|
# See note to derivation map in hr_psbt_in() |
|
outdict["derivation-map"] = {bintohex(k): bintohex(v.serialize( |
|
)) for k, v in psbt_output.derivation_map.items()} |
|
|
|
if psbt_output.redeem_script: |
|
outdict["redeem-script"] = bintohex(psbt_output.redeem_script) |
|
if psbt_output.witness_script: |
|
outdict["witness-script"] = bintohex(psbt_output.witness_script) |
|
|
|
if psbt_output.proprietary_fields: |
|
outdict["proprietary-fields"] = str(psbt_output.proprietary_fields) |
|
if psbt_output.unknown_fields: |
|
outdict["unknown-fields"] = str(psbt_output.unknown_fields) |
|
|
|
return outdict |
|
|
|
@staticmethod |
|
def witness_utxos_to_psbt_utxos(utxos): |
|
""" Given a dict of utxos as returned from select_utxos, |
|
convert them to the format required to populate PSBT inputs, |
|
namely CTxOut. Note that the non-segwit case is different, there |
|
you should provide an entire CMutableTransaction object instead. |
|
""" |
|
return [btc.CMutableTxOut(v["value"], |
|
v["script"]) for _, v in utxos.items()] |
|
|
|
@staticmethod |
|
def check_finalized_input_type(psbt_input): |
|
""" Given an input of a PSBT which is already finalized, |
|
return its type as either "sw-legacy" or "sw" or False |
|
if not one of these two types. |
|
TODO: can be extented to other types, does not currently |
|
support any non-segwit input. |
|
""" |
|
assert isinstance(psbt_input, btc.PSBT_Input) |
|
if not psbt_input.final_script_witness: |
|
return False |
|
if psbt_input.witness_utxo.scriptPubKey.is_p2sh(): |
|
# Note: p2sh does not prove the redeemscript; |
|
# we check the finalscriptSig is p2wpkh: |
|
fss = btc.CScript(next(btc.CScript( |
|
psbt_input.final_script_sig).raw_iter())[1]) |
|
if fss.is_witness_v0_keyhash(): |
|
return "sw-legacy" |
|
elif psbt_input.witness_utxo.scriptPubKey.is_witness_v0_keyhash(): |
|
return "sw" |
|
else: |
|
return False |
|
|
|
def create_psbt_from_tx(self, tx, spent_outs=None, force_witness_utxo=True): |
|
""" Given a CMutableTransaction object, which should not currently |
|
contain signatures, we create and return a new PSBT object of type |
|
btc.PartiallySignedTransaction. |
|
Optionally the information about the spent outputs that is stored |
|
in PSBT_IN_NONWITNESS_UTXO, PSBT_IN_WITNESS_UTXO and PSBT_IN_REDEEM_SCRIPT |
|
can also be provided, one item per input, in the tuple (spent_outs). |
|
These objects should be either CMutableTransaction, CTxOut or None, |
|
Note that redeem script information cannot be provided for inputs which |
|
we don't own. |
|
""" |
|
# TODO: verify tx contains no signatures as a sanity check? |
|
new_psbt = btc.PartiallySignedTransaction(unsigned_tx=tx) |
|
if spent_outs is None: |
|
# user has not provided input script information; psbt |
|
# will not yet be usable for signing. |
|
return new_psbt |
|
for i, txinput in enumerate(new_psbt.inputs): |
|
if spent_outs[i] is None: |
|
# as above, will not be signable in this case |
|
continue |
|
if isinstance(spent_outs[i], (btc.CTransaction, btc.CTxOut)): |
|
# note: if spent_outs[i] is in fact CTransaction, then |
|
# the unsigned_tx must be provided (tx) in order to populate |
|
# the witness_utxo field of this PSBT_Input (and if it isn't, |
|
# this argument is ignored), but also, until the redeem script |
|
# is populated, then for the p2sh case, the lib can't know |
|
# whether it is witness, so we must force the witness_utxo here, |
|
# unless this is switched off in the kwargs of this function: |
|
txinput.set_utxo(spent_outs[i], tx, |
|
force_witness_utxo=force_witness_utxo) |
|
else: |
|
assert False, "invalid spent output type passed into PSBT creator" |
|
# we now insert redeemscripts where that is possible and necessary: |
|
for i, txinput in enumerate(new_psbt.inputs): |
|
if isinstance(txinput.witness_utxo, btc.CTxOut): |
|
# witness |
|
if txinput.witness_utxo.scriptPubKey.is_witness_scriptpubkey(): |
|
# nothing needs inserting; the scriptSig is empty. |
|
continue |
|
elif txinput.witness_utxo.scriptPubKey.is_p2sh(): |
|
try: |
|
path = self.script_to_path(txinput.witness_utxo.scriptPubKey) |
|
except AssertionError: |
|
# this happens when an input is provided but it's not in |
|
# this wallet; in this case, we cannot set the redeem script. |
|
continue |
|
privkey, _ = self._get_key_from_path(path) |
|
txinput.redeem_script = btc.pubkey_to_p2wpkh_script( |
|
btc.privkey_to_pubkey(privkey)) |
|
return new_psbt |
|
|
|
def sign_psbt(self, in_psbt, with_sign_result=False): |
|
""" Given a serialized PSBT in raw binary format, |
|
iterate over the inputs and sign all that we can sign with this wallet. |
|
NB IT IS UP TO CALLERS TO ENSURE THAT THEY ACTUALLY WANT TO SIGN |
|
THIS TRANSACTION! |
|
The above is important especially in coinjoin scenarios. |
|
Return: (psbt, msg) |
|
msg: error message or None |
|
if not `with_sign_result`: |
|
psbt: signed psbt in binary serialization, or None if error. |
|
if `with_sign_result` True: |
|
psbt: (PSBT_SignResult object, psbt (deserialized) object) |
|
""" |
|
try: |
|
new_psbt = btc.PartiallySignedTransaction.from_binary(in_psbt) |
|
except Exception as e: |
|
return None, "Unable to deserialize binary PSBT, error: " + repr(e) |
|
privkeys = [] |
|
for k, v in self._utxos._utxo.items(): |
|
for k2, v2 in v.items(): |
|
key = self._get_key_from_path(v2[0]) |
|
if FidelityBondMixin.is_timelocked_path(v2[0]) and len(key[0]) == 2: |
|
#key is ((privkey, locktime), engine) for timelocked addrs |
|
key = (key[0][0], key[1]) |
|
privkeys.append(key) |
|
|
|
# in the rare situation that we want to sign a psbt using private keys |
|
# to utxos that we've stopped tracking, let's also find inputs that |
|
# belong to us and add those private keys as well |
|
for vin in new_psbt.inputs: |
|
try: |
|
path = self.script_to_path(vin.utxo.scriptPubKey) |
|
key = self._get_key_from_path(path) |
|
if key not in privkeys: |
|
privkeys.append(key) |
|
except AssertionError: |
|
# we can safely assume that an exception means we do not |
|
# have the ability to sign for this input |
|
continue |
|
except AttributeError: |
|
# shouldn't happen for properly constructed psbts |
|
# however, psbts with no utxo information will raise |
|
# an AttributeError exception. we simply ignore it. |
|
continue |
|
|
|
jmckeys = list(btc.JMCKey(x[0][:-1]) for x in privkeys) |
|
new_keystore = btc.KeyStore.from_iterable(jmckeys) |
|
|
|
# for p2sh inputs that we want to sign, the redeem_script |
|
# field must be populated by us, as the counterparty did not |
|
# know it. If this was set in an earlier create-psbt role, |
|
# then overwriting it is harmless (preimage resistance). |
|
if isinstance(self, SegwitLegacyWallet): |
|
for i, txinput in enumerate(new_psbt.inputs): |
|
tu = txinput.witness_utxo |
|
if isinstance(tu, btc.CTxOut): |
|
# witness |
|
if tu.scriptPubKey.is_witness_scriptpubkey(): |
|
# native segwit; no insertion needed. |
|
continue |
|
elif tu.scriptPubKey.is_p2sh(): |
|
try: |
|
path = self.script_to_path(tu.scriptPubKey) |
|
except AssertionError: |
|
# this happens when an input is provided but it's not in |
|
# this wallet; in this case, we cannot set the redeem script. |
|
continue |
|
privkey, _ = self._get_key_from_path(path) |
|
txinput.redeem_script = btc.pubkey_to_p2wpkh_script( |
|
btc.privkey_to_pubkey(privkey)) |
|
# no else branch; any other form of scriptPubKey will just be |
|
# ignored. |
|
try: |
|
signresult = new_psbt.sign(new_keystore) |
|
except Exception as e: |
|
return None, repr(e) |
|
if not with_sign_result: |
|
return new_psbt.serialize(), None |
|
else: |
|
return (signresult, new_psbt), None |
|
|
|
class SNICKERWalletMixin(object): |
|
|
|
SUPPORTED_SNICKER_VERSIONS = bytes([0, 1]) |
|
|
|
def __init__(self, storage, **kwargs): |
|
super().__init__(storage, **kwargs) |
|
|
|
def check_tweak_matches_and_import(self, addr, tweak, tweaked_key, |
|
source_mixdepth): |
|
""" Given the address from our HD wallet, the tweak bytes and |
|
the tweaked public key, check the tweak correctly generates the |
|
claimed tweaked public key. If not, (False, errmsg is returned), |
|
if so, we import the private key to a mixdepth *distinct* from |
|
source_mixdepth, and (True, None) is returned, if it works. |
|
""" |
|
try: |
|
priv = self.get_key_from_addr(addr) |
|
except: |
|
return False, "Not our address" |
|
tweaked_privkey = btc.snicker_privkey_tweak(priv, tweak) |
|
if not btc.privkey_to_pubkey(tweaked_privkey) == tweaked_key: |
|
hextk = bintohex(tweaked_key) |
|
hexftpk = bintohex(btc.privkey_to_pubkey(tweaked_privkey)) |
|
return False, ("Was not able to recover tweaked pubkey " |
|
"from tweaked privkey - code error." |
|
"Expected: " + hextk + ", got: " + hexftpk) |
|
# note that the WIF is not preserving SPK type, it's implied |
|
# for the wallet. |
|
try: |
|
self.import_private_key((source_mixdepth + 1) % (self.mixdepth + 1), |
|
self._ENGINE.privkey_to_wif(tweaked_privkey)) |
|
self.save() |
|
except: |
|
return False, "Failed to import private key." |
|
return True, None |
|
|
|
def create_snicker_proposal(self, our_inputs, their_input, our_input_utxos, |
|
their_input_utxo, net_transfer, network_fee, |
|
our_priv, their_pub, our_spk, change_spk, |
|
encrypted=True, version_byte=1): |
|
""" Creates a SNICKER proposal from the given transaction data. |
|
This only applies to existing specification, i.e. SNICKER v 00 or 01. |
|
This is only to be used for Joinmarket and only segwit wallets. |
|
`our_inputs`, `their_input` - utxo format used in JM wallets (first is list), |
|
keyed by (tixd, n), as dicts (currently of single entry). |
|
`our_input_utxos`, `their..` - type CTxOut (contains value, scriptPubKey) |
|
net_transfer - amount, after bitcoin transaction fee, transferred from |
|
Proposer (our) to Receiver (their). May be negative. |
|
network_fee - total bitcoin network transaction fee to be paid (so estimates |
|
must occur before this function). |
|
`our_priv`, `their_pub` - these are the keys to be used in ECDH to derive |
|
the tweak as per the BIP. Note `their_pub` may or may not be associated with |
|
the input of the receiver, so is specified here separately. Note also that |
|
according to the BIP the privkey we use *must* be the one corresponding to |
|
the first input we provided, else (properly coded) Receivers will reject our |
|
proposal. |
|
`our_spk` - a scriptPubKey for the Proposer coinjoin output |
|
`change_spk` - a change scriptPubkey for the proposer as per BIP |
|
`encrypted` - whether or not to return the ECIES encrypted version of the |
|
proposal. |
|
`version_byte` - which of currently specified Snicker versions is being |
|
used, (0 for reused address, 1 for inferred key). |
|
returns: |
|
if encrypted is True: |
|
base 64 encoded encrypted transaction proposal as a string |
|
else: |
|
binary serialized plaintext SNICKER message. |
|
""" |
|
assert isinstance(self, PSBTWalletMixin) |
|
# before constructing the bitcoin transaction we must calculate the output |
|
# amounts |
|
# find our total input value: |
|
our_input_val = sum([x.nValue for x in our_input_utxos]) |
|
if our_input_val - their_input_utxo.nValue - network_fee <= 0: |
|
raise Exception( |
|
"Cannot create SNICKER proposal, Proposer input too small") |
|
# we must also use ecdh to calculate the output scriptpubkey for the |
|
# receiver |
|
# First, check that `our_priv` corresponds to scriptPubKey in |
|
# first entry in `our_input_utxos` to prevent callers from making |
|
# useless proposals. |
|
expected_pub = btc.privkey_to_pubkey(our_priv) |
|
expected_spk = self.pubkey_to_script(expected_pub) |
|
assert our_input_utxos[0].scriptPubKey == expected_spk |
|
# now we create the ecdh based tweak: |
|
tweak_bytes = btc.ecdh(our_priv[:-1], their_pub) |
|
tweaked_pub = btc.snicker_pubkey_tweak(their_pub, tweak_bytes) |
|
wtype = self.get_txtype() |
|
if wtype == "p2wpkh": |
|
tweaked_spk = btc.pubkey_to_p2wpkh_script(tweaked_pub) |
|
elif wtype == "p2sh-p2wpkh": |
|
tweaked_spk = btc.pubkey_to_p2sh_p2wpkh_script(tweaked_pub) |
|
else: |
|
raise NotImplementedError("SNICKER only supports " |
|
"p2sh-p2wpkh or p2wpkh outputs.") |
|
tweaked_addr, our_addr, change_addr = [str( |
|
btc.CCoinAddress.from_scriptPubKey(x)) for x in ( |
|
tweaked_spk, our_spk, change_spk)] |
|
outputs = btc.construct_snicker_outputs(our_input_val, |
|
their_input_utxo.nValue, |
|
tweaked_addr, |
|
our_addr, |
|
change_addr, |
|
network_fee, |
|
net_transfer) |
|
assert all([x["value"] > 0 for x in outputs]) |
|
|
|
# version and locktime as currently specified in the BIP |
|
# for 0/1 version SNICKER. (Note the locktime is partly because |
|
# of expected delays). |
|
# We do not use `make_shuffled_tx` because we need a specific order |
|
# on the input side: the receiver's is placed randomly, but the first |
|
# *of ours* is the one used for ECDH. |
|
all_inputs = copy.deepcopy(our_inputs) |
|
insertion_index = random.randrange(len(all_inputs)+1) |
|
all_inputs.insert(insertion_index, their_input) |
|
all_input_utxos = copy.deepcopy(our_input_utxos) |
|
all_input_utxos.insert(insertion_index, their_input_utxo) |
|
# for outputs, we must shuffle as normal: |
|
random.shuffle(outputs) |
|
tx = btc.mktx(all_inputs, outputs, |
|
version=2, locktime=0) |
|
# create the psbt and then sign our input. |
|
snicker_psbt = self.create_psbt_from_tx(tx, |
|
spent_outs=all_input_utxos) |
|
# having created the PSBT, sign our input |
|
signed_psbt_and_signresult, err = self.sign_psbt( |
|
snicker_psbt.serialize(), with_sign_result=True) |
|
assert err is None |
|
signresult, partially_signed_psbt = signed_psbt_and_signresult |
|
assert signresult.num_inputs_signed == len(our_inputs) |
|
assert signresult.num_inputs_final == len(our_inputs) |
|
assert not signresult.is_final |
|
snicker_serialized_message = btc.SNICKER_MAGIC_BYTES + bytes( |
|
[version_byte]) + btc.SNICKER_FLAG_NONE + tweak_bytes + \ |
|
partially_signed_psbt.serialize() |
|
|
|
if not encrypted: |
|
return snicker_serialized_message |
|
|
|
# encryption has been requested; |
|
# we apply ECIES in the form given by the BIP. |
|
return btc.ecies_encrypt(snicker_serialized_message, their_pub) |
|
|
|
def parse_proposal_to_signed_tx(self, addr, proposal, |
|
acceptance_callback): |
|
""" Given a candidate privkey (binary and compressed format), |
|
and a candidate encrypted SNICKER proposal, attempt to decrypt |
|
and validate it in all aspects. If validation fails the first |
|
return value is None and the second is the reason as a string. |
|
|
|
If all validation checks pass, the next step is checking |
|
acceptance according to financial rules: the acceptance |
|
callback must be a function that accepts four arguments: |
|
(our_ins, their_ins, our_outs, their_outs), where *ins values |
|
are lists of CTxIns and *outs are lists of CTxOuts, |
|
and must return only True/False where True means that the |
|
transaction should be signed. |
|
|
|
If True is returned from the callback, the following are returned |
|
from this function: |
|
(raw transaction for broadcasting (serialized), |
|
tweak value as bytes, derived output spk belonging to receiver) |
|
|
|
Note: flags is currently always None as version is only 0 or 1. |
|
""" |
|
assert isinstance(self, PSBTWalletMixin) |
|
|
|
try: |
|
privkey = self.get_key_from_addr(addr) |
|
except: |
|
return None, "Could not derive privkey from address" |
|
|
|
our_pub = btc.privkey_to_pubkey(privkey) |
|
|
|
if len(proposal) < 5: |
|
return None, "Invalid proposal, too short." |
|
|
|
if base64.b64decode(proposal)[:4] == btc.ECIES_MAGIC_BYTES: |
|
# attempt decryption and reject if fails: |
|
try: |
|
snicker_message = btc.ecies_decrypt(privkey, proposal) |
|
except Exception as e: |
|
return None, "Failed to decrypt." + repr(e) |
|
else: |
|
snicker_message = proposal |
|
|
|
# magic + version,flag + tweak + psbt: |
|
# TODO replace '20' with the minimum feasible PSBT. |
|
if len(snicker_message) < 7 + 2 + 32 + 20: |
|
return None, "Invalid proposal, too short." |
|
|
|
if snicker_message[:7] != btc.SNICKER_MAGIC_BYTES: |
|
return None, "Invalid SNICKER magic bytes." |
|
|
|
version_byte = bytes([snicker_message[7]]) |
|
flag_byte = bytes([snicker_message[8]]) |
|
if version_byte not in self.SUPPORTED_SNICKER_VERSIONS: |
|
return None, "Unrecognized SNICKER version: " + version_byte |
|
if flag_byte != btc.SNICKER_FLAG_NONE: |
|
return None, "Invalid flag byte for version 0,1: " + flag_byte |
|
|
|
tweak_bytes = snicker_message[9:41] |
|
candidate_psbt_serialized = snicker_message[41:] |
|
# attempt to validate the PSBT's format: |
|
try: |
|
cpsbt = btc.PartiallySignedTransaction.from_base64_or_binary( |
|
candidate_psbt_serialized) |
|
except: |
|
return None, "Invalid PSBT format." |
|
|
|
utx = cpsbt.unsigned_tx |
|
# validate that it contains one signature, and two inputs. |
|
# else the proposal is invalid. To achieve this, we call |
|
# PartiallySignedTransaction.sign() with an empty KeyStore, |
|
# which populates the 'is_signed' info fields for us. Note that |
|
# we do not use the PSBTWalletMixin.sign_psbt() which automatically |
|
# signs with our keys. |
|
if len(utx.vin) < 2: |
|
return None, "PSBT proposal does not contain at least 2 inputs." |
|
testsignresult = cpsbt.sign(btc.KeyStore(), finalize=False) |
|
|
|
# Note: "num_inputs_signed" refers to how many *we* signed, |
|
# which is obviously none here as we provided no keys. |
|
if not (testsignresult.num_inputs_signed == 0 and \ |
|
testsignresult.num_inputs_final == len(utx.vin)-1 and \ |
|
not testsignresult.is_final): |
|
return None, "PSBT proposal is not fully signed by proposer." |
|
|
|
# Validate that we own one SNICKER style output: |
|
spk = btc.verify_snicker_output(utx, our_pub, tweak_bytes) |
|
|
|
if spk[0] == -1: |
|
return None, "Tweaked destination not found exactly once." |
|
our_output_index = spk[0] |
|
our_output_amount = utx.vout[our_output_index].nValue |
|
|
|
# At least one other output must have an amount equal to that at |
|
# `our_output_index`, according to the spec. |
|
found = 0 |
|
for i, o in enumerate(utx.vout): |
|
if i == our_output_index: |
|
continue |
|
if o.nValue == our_output_amount: |
|
found += 1 |
|
if found != 1: |
|
return None, "Invalid SNICKER, there are not two equal outputs." |
|
|
|
# To allow the acceptance callback to assess validity, we must identify |
|
# which input is ours and which is(are) not. |
|
# See https://github.com/Simplexum/python-bitcointx/issues/39 |
|
# for background on why this is different per wallet type |
|
unsigned_index = -1 |
|
for i, psbtinputsigninfo in enumerate(testsignresult.inputs_info): |
|
if psbtinputsigninfo is None: |
|
unsigned_index = i |
|
break |
|
if psbtinputsigninfo.num_new_sigs == 0 and \ |
|
psbtinputsigninfo.is_final == False: |
|
unsigned_index = i |
|
break |
|
assert unsigned_index != -1 |
|
# All validation checks passed. We now check whether the |
|
#transaction is acceptable according to the caller: |
|
if not acceptance_callback([utx.vin[unsigned_index]], |
|
[x for i, x in enumerate(utx.vin) if i != unsigned_index], |
|
[utx.vout[our_output_index]], |
|
[x for i, x in enumerate(utx.vout) if i != our_output_index]): |
|
return None, "Caller rejected transaction for signing." |
|
|
|
# Acceptance passed, prepare the deserialized tx for signing by us: |
|
signresult_and_signedpsbt, err = self.sign_psbt(cpsbt.serialize(), |
|
with_sign_result=True) |
|
if err: |
|
return None, "Unable to sign proposed PSBT, reason: " + err |
|
signresult, signed_psbt = signresult_and_signedpsbt |
|
assert signresult.num_inputs_signed == 1 |
|
assert signresult.num_inputs_final == len(utx.vin) |
|
assert signresult.is_final |
|
# we now know the transaction is valid and fully signed; return to caller, |
|
# along with supporting data for this tx: |
|
return (signed_psbt.extract_transaction(), tweak_bytes, spk[1]) |
|
|
|
class ImportWalletMixin(object): |
|
""" |
|
Mixin for BaseWallet to support importing keys. |
|
""" |
|
_IMPORTED_STORAGE_KEY = b'imported_keys' |
|
_IMPORTED_ROOT_PATH = b'imported' |
|
|
|
def __init__(self, storage, **kwargs): |
|
# {mixdepth: [(privkey, type)]} |
|
self._imported = None |
|
# path is (_IMPORTED_ROOT_PATH, mixdepth, key_index) |
|
super().__init__(storage, **kwargs) |
|
|
|
def _load_storage(self): |
|
super()._load_storage() |
|
self._imported = collections.defaultdict(list) |
|
for md, keys in self._storage.data[self._IMPORTED_STORAGE_KEY].items(): |
|
md = int(md) |
|
self._imported[md] = keys |
|
for index, (key, key_type) in enumerate(keys): |
|
if not key: |
|
# imported key was removed |
|
continue |
|
assert key_type in self._ENGINES |
|
self._cache_imported_key(md, key, key_type, index) |
|
|
|
def save(self): |
|
import_data = {} |
|
for md in self._imported: |
|
import_data[_int_to_bytestr(md)] = self._imported[md] |
|
self._storage.data[self._IMPORTED_STORAGE_KEY] = import_data |
|
super().save() |
|
|
|
@classmethod |
|
def initialize(cls, storage, network, max_mixdepth=2, timestamp=None, |
|
write=True, **kwargs): |
|
super(ImportWalletMixin, cls).initialize( |
|
storage, network, max_mixdepth, timestamp, write=False, **kwargs) |
|
storage.data[cls._IMPORTED_STORAGE_KEY] = {} |
|
|
|
if write: |
|
storage.save() |
|
|
|
def import_private_key(self, mixdepth, wif): |
|
""" |
|
Import a private key in WIF format. |
|
|
|
args: |
|
mixdepth: int, mixdepth to import key into |
|
wif: str, private key in WIF format |
|
|
|
raises: |
|
WalletError: if key's network does not match wallet network |
|
WalletError: if key is not compressed and type is not P2PKH |
|
|
|
returns: |
|
path of imported key |
|
""" |
|
if not 0 <= mixdepth <= self.max_mixdepth: |
|
raise WalletError("Mixdepth must be positive and at most {}." |
|
"".format(self.max_mixdepth)) |
|
|
|
privkey, key_type_wif = self._ENGINE.wif_to_privkey(wif) |
|
# FIXME: there is no established standard for encoding key type in wif |
|
#if key_type is not None and key_type_wif is not None and \ |
|
# key_type != key_type_wif: |
|
# raise WalletError("Expected key type does not match WIF type.") |
|
|
|
# default to wallet key type |
|
key_type = self.TYPE |
|
engine = self._ENGINES[key_type] |
|
|
|
if engine.key_to_script(privkey) in self._script_map: |
|
raise WalletError("Cannot import key, already in wallet: {}" |
|
"".format(wif)) |
|
|
|
self._imported[mixdepth].append((privkey, key_type)) |
|
return self._cache_imported_key(mixdepth, privkey, key_type, |
|
len(self._imported[mixdepth]) - 1) |
|
|
|
def remove_imported_key(self, script=None, address=None, path=None): |
|
""" |
|
Remove an imported key. Arguments are exclusive. |
|
|
|
args: |
|
script: bytes |
|
address: str |
|
path: path |
|
""" |
|
if sum((bool(script), bool(address), bool(path))) != 1: |
|
raise Exception("Only one of script|address|path may be given.") |
|
|
|
if address: |
|
script = self.addr_to_script(address) |
|
if script: |
|
path = self.script_to_path(script) |
|
|
|
if not path: |
|
raise WalletError("Cannot find key in wallet.") |
|
|
|
if not self._is_imported_path(path): |
|
raise WalletError("Cannot remove non-imported key.") |
|
|
|
assert len(path) == 3 |
|
|
|
if not script: |
|
script = self.get_script_from_path(path) |
|
|
|
# we need to retain indices |
|
self._imported[path[1]][path[2]] = (b'', -1) |
|
|
|
del self._script_map[script] |
|
|
|
def _cache_imported_key(self, mixdepth, privkey, key_type, index): |
|
engine = self._ENGINES[key_type] |
|
path = (self._IMPORTED_ROOT_PATH, mixdepth, index) |
|
|
|
self._script_map[engine.key_to_script(privkey)] = path |
|
|
|
return path |
|
|
|
def _get_mixdepth_from_path(self, path): |
|
if not self._is_imported_path(path): |
|
return super()._get_mixdepth_from_path(path) |
|
|
|
assert len(path) == 3 |
|
return path[1] |
|
|
|
def _get_key_from_path(self, path): |
|
if not self._is_imported_path(path): |
|
return super()._get_key_from_path(path) |
|
|
|
assert len(path) == 3 |
|
md, i = path[1], path[2] |
|
assert 0 <= md <= self.max_mixdepth |
|
|
|
if len(self._imported[md]) <= i: |
|
raise WalletError("unknown imported key at {}" |
|
"".format(self.get_path_repr(path))) |
|
|
|
key, key_type = self._imported[md][i] |
|
|
|
if key_type == -1: |
|
raise WalletError("imported key was removed") |
|
|
|
return key, self._ENGINES[key_type] |
|
|
|
@classmethod |
|
def _is_imported_path(cls, path): |
|
return len(path) == 3 and path[0] == cls._IMPORTED_ROOT_PATH |
|
|
|
def is_standard_wallet_script(self, path): |
|
if self._is_imported_path(path): |
|
engine = self._get_key_from_path(path)[1] |
|
return engine == self._ENGINE |
|
return super().is_standard_wallet_script(path) |
|
|
|
def path_repr_to_path(self, pathstr): |
|
spath = pathstr.encode('ascii').split(b'/') |
|
if not self._is_imported_path(spath): |
|
return super().path_repr_to_path(pathstr) |
|
|
|
return self._IMPORTED_ROOT_PATH, int(spath[1]), int(spath[2]) |
|
|
|
def get_path_repr(self, path): |
|
if not self._is_imported_path(path): |
|
return super().get_path_repr(path) |
|
|
|
assert len(path) == 3 |
|
return 'imported/{}/{}'.format(*path[1:]) |
|
|
|
def yield_imported_paths(self, mixdepth): |
|
assert 0 <= mixdepth <= self.max_mixdepth |
|
|
|
for index in range(len(self._imported[mixdepth])): |
|
if self._imported[mixdepth][index][1] == -1: |
|
continue |
|
yield (self._IMPORTED_ROOT_PATH, mixdepth, index) |
|
|
|
def get_details(self, path): |
|
if not self._is_imported_path(path): |
|
return super().get_details(path) |
|
return path[1], 'imported', path[2] |
|
|
|
|
|
class BIP39WalletMixin(object): |
|
""" |
|
Mixin to use BIP-39 mnemonic seed with BIP32Wallet |
|
""" |
|
_BIP39_EXTENSION_KEY = b'seed_extension' |
|
MNEMONIC_LANG = 'english' |
|
|
|
def _load_storage(self): |
|
super()._load_storage() |
|
self._entropy_extension = self._storage.data.get(self._BIP39_EXTENSION_KEY) |
|
|
|
@classmethod |
|
def initialize(cls, storage, network, max_mixdepth=2, timestamp=None, |
|
entropy=None, entropy_extension=None, write=True, **kwargs): |
|
super(BIP39WalletMixin, cls).initialize( |
|
storage, network, max_mixdepth, timestamp, entropy, |
|
write=False, **kwargs) |
|
if entropy_extension: |
|
# Note: future reads from storage will retrieve this data |
|
# as binary, so set it as binary on initialization for consistency. |
|
# Note that this is in contrast to the mnemonic wordlist, which is |
|
# handled by the mnemonic package, which returns the words as a string. |
|
storage.data[cls._BIP39_EXTENSION_KEY] = entropy_extension.encode("utf-8") |
|
|
|
if write: |
|
storage.save() |
|
|
|
def _create_master_key(self): |
|
ent, ext = self.get_mnemonic_words() |
|
m = Mnemonic(self.MNEMONIC_LANG) |
|
return m.to_seed(ent, ext or b'') |
|
|
|
@classmethod |
|
def _verify_entropy(cls, ent): |
|
# every 4-bytestream is a valid entropy for BIP-39 |
|
return ent and len(ent) % 4 == 0 |
|
|
|
def get_mnemonic_words(self): |
|
entropy = super()._create_master_key() |
|
m = Mnemonic(self.MNEMONIC_LANG) |
|
return m.to_mnemonic(entropy), self._entropy_extension |
|
|
|
@classmethod |
|
def entropy_from_mnemonic(cls, seed): |
|
m = Mnemonic(cls.MNEMONIC_LANG) |
|
seed = seed.lower() |
|
# ensure only single whitespace characters: |
|
seed = " ".join(seed.split()) |
|
if not m.check(seed): |
|
raise WalletError("Invalid mnemonic seed.") |
|
|
|
ent = m.to_entropy(seed) |
|
|
|
if not cls._verify_entropy(ent): |
|
raise WalletError("Seed entropy is too low.") |
|
|
|
return bytes(ent) |
|
|
|
|
|
class BIP32Wallet(BaseWallet): |
|
_STORAGE_ENTROPY_KEY = b'entropy' |
|
_STORAGE_INDEX_CACHE = b'index_cache' |
|
BIP32_MAX_PATH_LEVEL = 2**31 |
|
BIP32_EXT_ID = BaseWallet.ADDRESS_TYPE_EXTERNAL |
|
BIP32_INT_ID = BaseWallet.ADDRESS_TYPE_INTERNAL |
|
ENTROPY_BYTES = 16 |
|
|
|
def __init__(self, storage, **kwargs): |
|
self._entropy = None |
|
# {mixdepth: {type: index}} with type being 0/1 corresponding |
|
# to external/internal addresses |
|
self._index_cache = None |
|
# path is a tuple of BIP32 levels, |
|
# m is the master key's fingerprint |
|
# other levels are ints |
|
super().__init__(storage, **kwargs) |
|
assert self._index_cache is not None |
|
assert self._verify_entropy(self._entropy) |
|
|
|
_master_entropy = self._create_master_key() |
|
assert _master_entropy |
|
assert isinstance(_master_entropy, bytes) |
|
self._master_key = self._derive_bip32_master_key(_master_entropy) |
|
|
|
# used to verify paths for sanity checking and for wallet id creation |
|
self._key_ident = b'' # otherwise get_bip32_* won't work |
|
self._key_ident = self._get_key_ident() |
|
self._populate_script_map() |
|
self.disable_new_scripts = False |
|
|
|
@classmethod |
|
def initialize(cls, storage, network, max_mixdepth=2, timestamp=None, |
|
entropy=None, write=True): |
|
""" |
|
args: |
|
entropy: ENTROPY_BYTES bytes or None to have wallet generate some |
|
""" |
|
if entropy and not cls._verify_entropy(entropy): |
|
raise WalletError("Invalid entropy.") |
|
|
|
super(BIP32Wallet, cls).initialize(storage, network, max_mixdepth, |
|
timestamp, write=False) |
|
|
|
if not entropy: |
|
entropy = get_random_bytes(cls.ENTROPY_BYTES, True) |
|
|
|
storage.data[cls._STORAGE_ENTROPY_KEY] = entropy |
|
storage.data[cls._STORAGE_INDEX_CACHE] = { |
|
_int_to_bytestr(i): {} for i in range(max_mixdepth + 1)} |
|
|
|
if write: |
|
storage.save() |
|
|
|
def _load_storage(self): |
|
super()._load_storage() |
|
self._entropy = self._storage.data[self._STORAGE_ENTROPY_KEY] |
|
|
|
self._index_cache = collections.defaultdict( |
|
lambda: collections.defaultdict(int)) |
|
|
|
for md, data in self._storage.data[self._STORAGE_INDEX_CACHE].items(): |
|
md = int(md) |
|
md_map = self._index_cache[md] |
|
for t, k in data.items(): |
|
md_map[int(t)] = k |
|
|
|
self.max_mixdepth = max(0, 0, *self._index_cache.keys()) |
|
|
|
def _get_key_ident(self): |
|
return sha256(sha256( |
|
self.get_bip32_priv_export(0, self.BIP32_EXT_ID).encode('ascii')).digest())\ |
|
.digest()[:3] |
|
|
|
def _populate_script_map(self): |
|
for md in self._index_cache: |
|
for address_type in (self.BIP32_EXT_ID, self.BIP32_INT_ID): |
|
for i in range(self._index_cache[md][address_type]): |
|
path = self.get_path(md, address_type, i) |
|
script = self.get_script_from_path(path) |
|
self._script_map[script] = path |
|
|
|
def save(self): |
|
for md, data in self._index_cache.items(): |
|
str_data = {} |
|
str_md = _int_to_bytestr(md) |
|
|
|
for t, k in data.items(): |
|
str_data[_int_to_bytestr(t)] = k |
|
|
|
self._storage.data[self._STORAGE_INDEX_CACHE][str_md] = str_data |
|
|
|
super().save() |
|
|
|
def _create_master_key(self): |
|
""" |
|
for base/legacy wallet type, this is a passthrough. |
|
for bip39 style wallets, this will convert from one to the other |
|
""" |
|
return self._entropy |
|
|
|
@classmethod |
|
def _verify_entropy(cls, ent): |
|
# This is not very useful but true for BIP32. Subclasses may have |
|
# stricter requirements. |
|
return bool(ent) |
|
|
|
@classmethod |
|
def _derive_bip32_master_key(cls, seed): |
|
return cls._ENGINE.derive_bip32_master_key(seed) |
|
|
|
@classmethod |
|
def _get_supported_address_types(cls): |
|
return (cls.BIP32_EXT_ID, cls.BIP32_INT_ID) |
|
|
|
def get_script_from_path(self, path): |
|
if not self._is_my_bip32_path(path): |
|
return super().get_script_from_path(path) |
|
|
|
md, address_type, index = self.get_details(path) |
|
|
|
if not 0 <= md <= self.max_mixdepth: |
|
raise WalletError("Mixdepth outside of wallet's range.") |
|
assert address_type in self._get_supported_address_types() |
|
|
|
current_index = self._index_cache[md][address_type] |
|
|
|
if index == current_index \ |
|
and address_type != FidelityBondMixin.BIP32_TIMELOCK_ID: |
|
#special case for timelocked addresses because for them the |
|
#concept of a "next address" cant be used |
|
return self.get_new_script_override_disable(md, address_type) |
|
|
|
return super().get_script_from_path(path) |
|
|
|
def get_path(self, mixdepth=None, address_type=None, index=None): |
|
if mixdepth is not None: |
|
assert isinstance(mixdepth, Integral) |
|
if not 0 <= mixdepth <= self.max_mixdepth: |
|
raise WalletError("Mixdepth outside of wallet's range.") |
|
|
|
if address_type is not None: |
|
if mixdepth is None: |
|
raise Exception("mixdepth must be set if address_type is set") |
|
|
|
if index is not None: |
|
assert isinstance(index, Integral) |
|
if address_type is None: |
|
raise Exception("address_type must be set if index is set") |
|
assert index <= self._index_cache[mixdepth][address_type] |
|
assert index < self.BIP32_MAX_PATH_LEVEL |
|
return tuple(chain(self._get_bip32_export_path(mixdepth, address_type), |
|
(index,))) |
|
|
|
return tuple(self._get_bip32_export_path(mixdepth, address_type)) |
|
|
|
def get_path_repr(self, path): |
|
path = list(path) |
|
assert self._is_my_bip32_path(path) |
|
path.pop(0) |
|
return 'm' + '/' + '/'.join(map(self._path_level_to_repr, path)) |
|
|
|
@classmethod |
|
def _harden_path_level(cls, lvl): |
|
assert isinstance(lvl, Integral) |
|
if not 0 <= lvl < cls.BIP32_MAX_PATH_LEVEL: |
|
raise WalletError("Unable to derive hardened path level from {}." |
|
"".format(lvl)) |
|
return lvl + cls.BIP32_MAX_PATH_LEVEL |
|
|
|
@classmethod |
|
def _path_level_to_repr(cls, lvl): |
|
assert isinstance(lvl, Integral) |
|
if not 0 <= lvl < cls.BIP32_MAX_PATH_LEVEL * 2: |
|
raise WalletError("Invalid path level {}.".format(lvl)) |
|
if lvl < cls.BIP32_MAX_PATH_LEVEL: |
|
return str(lvl) |
|
return str(lvl - cls.BIP32_MAX_PATH_LEVEL) + "'" |
|
|
|
def path_repr_to_path(self, pathstr): |
|
spath = pathstr.split('/') |
|
assert len(spath) > 0 |
|
if spath[0] != 'm': |
|
raise WalletError("Not a valid wallet path: {}".format(pathstr)) |
|
|
|
def conv_level(lvl): |
|
if lvl[-1] == "'": |
|
return self._harden_path_level(int(lvl[:-1])) |
|
return int(lvl) |
|
|
|
return tuple(chain((self._key_ident,), map(conv_level, spath[1:]))) |
|
|
|
def _get_mixdepth_from_path(self, path): |
|
if not self._is_my_bip32_path(path): |
|
raise WalletError("Invalid path, unknown root: {}".format(path)) |
|
|
|
return path[len(self._get_bip32_base_path())] |
|
|
|
def _get_key_from_path(self, path): |
|
if not self._is_my_bip32_path(path): |
|
raise WalletError("Invalid path, unknown root: {}".format(path)) |
|
|
|
return self._ENGINE.derive_bip32_privkey(self._master_key, path), \ |
|
self._ENGINE |
|
|
|
def _is_my_bip32_path(self, path): |
|
return len(path) > 0 and path[0] == self._key_ident |
|
|
|
def is_standard_wallet_script(self, path): |
|
return self._is_my_bip32_path(path) |
|
|
|
def get_new_script(self, mixdepth, address_type): |
|
if self.disable_new_scripts: |
|
raise RuntimeError("Obtaining new wallet addresses " |
|
+ "disabled, due to nohistory mode") |
|
return self.get_new_script_override_disable(mixdepth, address_type) |
|
|
|
def get_new_script_override_disable(self, mixdepth, address_type): |
|
# This is called by get_script_from_path and calls back there. We need to |
|
# ensure all conditions match to avoid endless recursion. |
|
index = self.get_index_cache_and_increment(mixdepth, address_type) |
|
return self.get_script_and_update_map(mixdepth, address_type, index) |
|
|
|
def _set_index_cache(self, mixdepth, address_type, index): |
|
""" Ensures that any update to index_cache dict only applies |
|
to valid address types. |
|
""" |
|
assert address_type in self._get_supported_address_types() |
|
self._index_cache[mixdepth][address_type] = index |
|
|
|
def get_index_cache_and_increment(self, mixdepth, address_type): |
|
cur_index = self._index_cache[mixdepth][address_type] |
|
self._set_index_cache(mixdepth, address_type, cur_index + 1) |
|
return cur_index |
|
|
|
def get_script_and_update_map(self, *args): |
|
path = self.get_path(*args) |
|
script = self.get_script_from_path(path) |
|
self._script_map[script] = path |
|
return script |
|
|
|
@deprecated |
|
def get_key(self, mixdepth, address_type, index): |
|
path = self.get_path(mixdepth, address_type, index) |
|
priv = self._ENGINE.derive_bip32_privkey(self._master_key, path) |
|
return hexlify(priv).decode('ascii') |
|
|
|
def get_bip32_priv_export(self, mixdepth=None, address_type=None): |
|
path = self._get_bip32_export_path(mixdepth, address_type) |
|
return self._ENGINE.derive_bip32_priv_export(self._master_key, path) |
|
|
|
def get_bip32_pub_export(self, mixdepth=None, address_type=None): |
|
path = self._get_bip32_export_path(mixdepth, address_type) |
|
return self._ENGINE.derive_bip32_pub_export(self._master_key, path) |
|
|
|
def _get_bip32_export_path(self, mixdepth=None, address_type=None): |
|
if mixdepth is None: |
|
assert address_type is None |
|
path = tuple() |
|
else: |
|
assert 0 <= mixdepth <= self.max_mixdepth |
|
if address_type is None: |
|
path = (self._get_bip32_mixdepth_path_level(mixdepth),) |
|
else: |
|
path = (self._get_bip32_mixdepth_path_level(mixdepth), address_type) |
|
|
|
return tuple(chain(self._get_bip32_base_path(), path)) |
|
|
|
def _get_bip32_base_path(self): |
|
return self._key_ident, |
|
|
|
@classmethod |
|
def _get_bip32_mixdepth_path_level(cls, mixdepth): |
|
return mixdepth |
|
|
|
def get_next_unused_index(self, mixdepth, address_type): |
|
assert 0 <= mixdepth <= self.max_mixdepth |
|
|
|
if self._index_cache[mixdepth][address_type] >= self.BIP32_MAX_PATH_LEVEL: |
|
# FIXME: theoretically this should work for up to |
|
# self.BIP32_MAX_PATH_LEVEL * 2, no? |
|
raise WalletError("All addresses used up, cannot generate new ones.") |
|
|
|
return self._index_cache[mixdepth][address_type] |
|
|
|
def get_mnemonic_words(self): |
|
return ' '.join(mn_encode(hexlify(self._entropy).decode('ascii'))), None |
|
|
|
@classmethod |
|
def entropy_from_mnemonic(cls, seed): |
|
words = seed.split() |
|
if len(words) != 12: |
|
raise WalletError("Seed phrase must consist of exactly 12 words.") |
|
|
|
return unhexlify(mn_decode(words)) |
|
|
|
def get_wallet_id(self): |
|
return hexlify(self._key_ident).decode('ascii') |
|
|
|
def set_next_index(self, mixdepth, address_type, index, force=False): |
|
if not (force or index <= self._index_cache[mixdepth][address_type]): |
|
raise Exception("cannot advance index without force=True") |
|
self._set_index_cache(mixdepth, address_type, index) |
|
|
|
def get_details(self, path): |
|
if not self._is_my_bip32_path(path): |
|
raise Exception("path does not belong to wallet") |
|
return self._get_mixdepth_from_path(path), path[-2], path[-1] |
|
|
|
|
|
class LegacyWallet(ImportWalletMixin, PSBTWalletMixin, BIP32Wallet): |
|
TYPE = TYPE_P2PKH |
|
_ENGINE = ENGINES[TYPE_P2PKH] |
|
|
|
def _create_master_key(self): |
|
return hexlify(self._entropy) |
|
|
|
def _get_bip32_base_path(self): |
|
return self._key_ident, 0 |
|
|
|
class BIP32PurposedWallet(BIP32Wallet): |
|
""" A class to encapsulate cases like |
|
BIP44, 49 and 84, all of which are derivatives |
|
of BIP32, and use specific purpose |
|
fields to flag different wallet types. |
|
""" |
|
|
|
def _get_bip32_base_path(self): |
|
return self._key_ident, self._PURPOSE,\ |
|
self._ENGINE.BIP44_COIN_TYPE |
|
|
|
@classmethod |
|
def _get_bip32_mixdepth_path_level(cls, mixdepth): |
|
assert 0 <= mixdepth < 2**31 |
|
return cls._harden_path_level(mixdepth) |
|
|
|
def _get_mixdepth_from_path(self, path): |
|
if not self._is_my_bip32_path(path): |
|
raise WalletError("Invalid path, unknown root: {}".format(path)) |
|
|
|
return path[len(self._get_bip32_base_path())] - 2**31 |
|
|
|
class FidelityBondMixin(object): |
|
BIP32_TIMELOCK_ID = 2 |
|
BIP32_BURN_ID = 3 |
|
|
|
""" |
|
Explaination of time number |
|
|
|
incrementing time numbers (0, 1, 2, 3, 4... |
|
will produce datetimes |
|
suitable for timelocking (1st january, 1st april, 1st july .... |
|
this greatly reduces the number of possible timelock values |
|
and is helpful for recovery of funds because the wallet can search |
|
the only addresses corresponding to timenumbers which are far fewer |
|
|
|
For example, if TIMENUMBER_UNIT = 2 (i.e. every time number is two months) |
|
then there are 6 timelocks per year so just 600 possible |
|
addresses per century. Easily searchable when recovering a |
|
wallet from seed phrase. Therefore the user doesn't need to store any |
|
dates, the seed phrase is sufficent for recovery. |
|
""" |
|
#should be a factor of 12, the number of months in a year |
|
TIMENUMBER_UNIT = 1 |
|
|
|
# all timelocks are 1st of the month at midnight |
|
TIMELOCK_DAY_AND_SHORTER = (1, 0, 0, 0, 0) |
|
TIMELOCK_EPOCH_YEAR = 2020 |
|
TIMELOCK_EPOCH_MONTH = 1 #january |
|
MONTHS_IN_YEAR = 12 |
|
|
|
TIMELOCK_ERA_YEARS = 80 |
|
TIMENUMBER_COUNT = TIMELOCK_ERA_YEARS * MONTHS_IN_YEAR // TIMENUMBER_UNIT |
|
|
|
_TIMELOCK_ENGINE = ENGINES[TYPE_TIMELOCK_P2WSH] |
|
|
|
#only one mixdepth will have fidelity bonds in it |
|
FIDELITY_BOND_MIXDEPTH = 0 |
|
|
|
MERKLE_BRANCH_UNAVAILABLE = b"mbu" |
|
|
|
_BURNER_OUTPUT_STORAGE_KEY = b"burner-out" |
|
|
|
_BIP32_PUBKEY_PREFIX = "fbonds-mpk-" |
|
|
|
@classmethod |
|
def _time_number_to_timestamp(cls, timenumber): |
|
""" |
|
converts a time number to a unix timestamp |
|
""" |
|
if not 0 <= timenumber < cls.TIMENUMBER_COUNT: |
|
raise ValueError() |
|
year = cls.TIMELOCK_EPOCH_YEAR + (timenumber*cls.TIMENUMBER_UNIT) // cls.MONTHS_IN_YEAR |
|
month = cls.TIMELOCK_EPOCH_MONTH + (timenumber*cls.TIMENUMBER_UNIT) % cls.MONTHS_IN_YEAR |
|
return timegm(datetime(year, month, *cls.TIMELOCK_DAY_AND_SHORTER).timetuple()) |
|
|
|
@classmethod |
|
def datetime_to_time_number(cls, dt): |
|
""" |
|
converts a datetime object to a time number |
|
""" |
|
if (dt.month - cls.TIMELOCK_EPOCH_MONTH) % cls.TIMENUMBER_UNIT != 0: |
|
raise ValueError() |
|
day_and_shorter_tuple = (dt.day, dt.hour, dt.minute, dt.second, dt.microsecond) |
|
if day_and_shorter_tuple != cls.TIMELOCK_DAY_AND_SHORTER: |
|
raise ValueError() |
|
timenumber = (dt.year - cls.TIMELOCK_EPOCH_YEAR)*(cls.MONTHS_IN_YEAR // |
|
cls.TIMENUMBER_UNIT) + ((dt.month - cls.TIMELOCK_EPOCH_MONTH) // cls.TIMENUMBER_UNIT) |
|
if timenumber < 0 or timenumber > cls.TIMENUMBER_COUNT: |
|
raise ValueError("datetime out of range") |
|
return timenumber |
|
|
|
@classmethod |
|
def timestamp_to_time_number(cls, timestamp): |
|
""" |
|
converts a unix timestamp to a time number |
|
""" |
|
#workaround for the year 2038 problem on 32 bit systems |
|
#see https://stackoverflow.com/questions/10588027/converting-timestamps-larger-than-maxint-into-datetime-objects |
|
dt = datetime.utcfromtimestamp(0) + timedelta(seconds=timestamp) |
|
return cls.datetime_to_time_number(dt) |
|
|
|
@classmethod |
|
def is_timelocked_path(cls, path): |
|
return len(path) > 4 and path[4] == cls.BIP32_TIMELOCK_ID |
|
|
|
def _get_key_ident(self): |
|
first_path = self.get_path(0, BIP32Wallet.BIP32_EXT_ID) |
|
priv, engine = self._get_key_from_path(first_path) |
|
pub = engine.privkey_to_pubkey(priv) |
|
return sha256(sha256(pub).digest()).digest()[:3] |
|
|
|
def is_standard_wallet_script(self, path): |
|
if self.is_timelocked_path(path): |
|
return False |
|
return super().is_standard_wallet_script(path) |
|
|
|
@classmethod |
|
def get_xpub_from_fidelity_bond_master_pub_key(cls, mpk): |
|
if mpk.startswith(cls._BIP32_PUBKEY_PREFIX): |
|
return mpk[len(cls._BIP32_PUBKEY_PREFIX):] |
|
else: |
|
return False |
|
|
|
def _populate_script_map(self): |
|
super()._populate_script_map() |
|
md = self.FIDELITY_BOND_MIXDEPTH |
|
address_type = self.BIP32_TIMELOCK_ID |
|
for timenumber in range(self.TIMENUMBER_COUNT): |
|
path = self.get_path(md, address_type, timenumber) |
|
script = self.get_script_from_path(path) |
|
self._script_map[script] = path |
|
|
|
def add_utxo(self, txid, index, script, value, height=None): |
|
super().add_utxo(txid, index, script, value, height) |
|
#dont use coin control freeze if wallet readonly |
|
if self._storage.read_only: |
|
return |
|
path = self.script_to_path(script) |
|
if not self.is_timelocked_path(path): |
|
return |
|
if (datetime.utcfromtimestamp(0) + timedelta(seconds=path[-1])) > datetime.now(): |
|
#freeze utxo if its timelock is in the future |
|
self.disable_utxo(txid, index, disable=True) |
|
|
|
def get_bip32_pub_export(self, mixdepth=None, address_type=None): |
|
bip32_pub = super().get_bip32_pub_export(mixdepth, address_type) |
|
if address_type == None and mixdepth == self.FIDELITY_BOND_MIXDEPTH: |
|
bip32_pub = self._BIP32_PUBKEY_PREFIX + bip32_pub |
|
return bip32_pub |
|
|
|
@classmethod |
|
def _get_supported_address_types(cls): |
|
return (cls.BIP32_EXT_ID, cls.BIP32_INT_ID, cls.BIP32_TIMELOCK_ID, cls.BIP32_BURN_ID) |
|
|
|
def _get_key_from_path(self, path): |
|
if self.is_timelocked_path(path): |
|
key_path = path[:-1] |
|
locktime = path[-1] |
|
engine = self._TIMELOCK_ENGINE |
|
privkey = engine.derive_bip32_privkey(self._master_key, key_path) |
|
return (privkey, locktime), engine |
|
else: |
|
return super()._get_key_from_path(path) |
|
|
|
def get_path(self, mixdepth=None, address_type=None, index=None): |
|
if address_type == None or address_type in (self.BIP32_EXT_ID, self.BIP32_INT_ID, |
|
self.BIP32_BURN_ID) or index == None: |
|
return super().get_path(mixdepth, address_type, index) |
|
elif address_type == self.BIP32_TIMELOCK_ID: |
|
# index is re-purposed as timenumber |
|
assert index is not None |
|
timestamp = self._time_number_to_timestamp(index) |
|
return tuple(chain(self._get_bip32_export_path(mixdepth, address_type), |
|
(index, timestamp))) |
|
else: |
|
assert 0 |
|
|
|
""" |
|
We define a new serialization of the bip32 path to include bip65 timelock addresses |
|
Previously it was m/44'/1'/0'/3/0 |
|
For a timelocked address it will be m/44'/1'/0'/3/0:13245432 |
|
The timelock will be in unix format and added to the end with a colon ":" character |
|
refering to the pubkey plus the timelock value which together are needed to create the address |
|
""" |
|
def get_path_repr(self, path): |
|
if self.is_timelocked_path(path) and len(path) == 7: |
|
return super().get_path_repr(path[:-1]) + ":" + str(path[-1]) |
|
else: |
|
return super().get_path_repr(path) |
|
|
|
def path_repr_to_path(self, pathstr): |
|
if pathstr.find(":") == -1: |
|
return super().path_repr_to_path(pathstr) |
|
else: |
|
colon_chunks = pathstr.split(":") |
|
if len(colon_chunks) != 2: |
|
raise WalletError("Not a valid wallet timelock path: {}".format(pathstr)) |
|
return tuple(chain( |
|
super().path_repr_to_path(colon_chunks[0]), (int(colon_chunks[1]),))) |
|
|
|
def get_details(self, path): |
|
if self.is_timelocked_path(path): |
|
return self._get_mixdepth_from_path(path), path[-3], path[-2] |
|
else: |
|
return super().get_details(path) |
|
|
|
def _get_default_used_indices(self): |
|
return {x: [0, 0, 0, 0] for x in range(self.max_mixdepth + 1)} |
|
|
|
def add_burner_output(self, path, txhex, block_height, merkle_branch, |
|
block_index, write=True): |
|
""" |
|
merkle_branch = None means it was unavailable because of pruning |
|
""" |
|
if self._BURNER_OUTPUT_STORAGE_KEY not in self._storage.data: |
|
self._storage.data[self._BURNER_OUTPUT_STORAGE_KEY] = {} |
|
path = path.encode() |
|
txhex = unhexlify(txhex) |
|
if not merkle_branch: |
|
merkle_branch = self.MERKLE_BRANCH_UNAVAILABLE |
|
self._storage.data[self._BURNER_OUTPUT_STORAGE_KEY][path] = [txhex, |
|
block_height, merkle_branch, block_index] |
|
if write: |
|
self._storage.save() |
|
|
|
def get_burner_outputs(self): |
|
""" |
|
Result is a dict {path: [txhex, blockheight, merkleproof, blockindex]} |
|
""" |
|
return self._storage.data.get(self._BURNER_OUTPUT_STORAGE_KEY, {}) |
|
|
|
def set_burner_output_merkle_branch(self, path, merkle_branch): |
|
path = path.encode() |
|
self._storage.data[self._BURNER_OUTPUT_STORAGE_KEY][path][2] = \ |
|
merkle_branch |
|
|
|
|
|
@classmethod |
|
def calculate_timelocked_fidelity_bond_value(cls, utxo_value, confirmation_time, locktime, |
|
current_time, interest_rate): |
|
""" |
|
utxo_value is in satoshi |
|
interest rate is per year |
|
all times are seconds |
|
""" |
|
YEAR = 60 * 60 * 24 * 365.2425 #gregorian calender year length |
|
|
|
r = interest_rate |
|
T = (locktime - confirmation_time) / YEAR |
|
L = locktime / YEAR |
|
t = current_time / YEAR |
|
|
|
a = max(0, min(1, exp(r*T) - 1) - min(1, exp(r*max(0, t-L)) - 1)) |
|
exponent = float(jm_single().config.get("POLICY", "bond_value_exponent")) |
|
return pow(utxo_value*a, exponent) |
|
|
|
@classmethod |
|
def get_validated_timelocked_fidelity_bond_utxo(cls, utxo, utxo_pubkey, locktime, |
|
cert_expiry, current_block_height): |
|
|
|
utxo_data = jm_single().bc_interface.query_utxo_set(utxo, includeconfs=True) |
|
if utxo_data[0] == None: |
|
return None |
|
if utxo_data[0]["confirms"] <= 0: |
|
return None |
|
RETARGET_INTERVAL = 2016 |
|
if current_block_height > cert_expiry*RETARGET_INTERVAL: |
|
return None |
|
implied_spk = btc.redeem_script_to_p2wsh_script(btc.mk_freeze_script(utxo_pubkey, locktime)) |
|
if utxo_data[0]["script"] != implied_spk: |
|
return None |
|
return utxo_data[0] |
|
|
|
class BIP49Wallet(BIP32PurposedWallet): |
|
_PURPOSE = 2**31 + 49 |
|
_ENGINE = ENGINES[TYPE_P2SH_P2WPKH] |
|
|
|
class BIP84Wallet(BIP32PurposedWallet): |
|
_PURPOSE = 2**31 + 84 |
|
_ENGINE = ENGINES[TYPE_P2WPKH] |
|
|
|
class SegwitLegacyWallet(ImportWalletMixin, BIP39WalletMixin, PSBTWalletMixin, SNICKERWalletMixin, BIP49Wallet): |
|
TYPE = TYPE_P2SH_P2WPKH |
|
|
|
class SegwitWallet(ImportWalletMixin, BIP39WalletMixin, PSBTWalletMixin, SNICKERWalletMixin, BIP84Wallet): |
|
TYPE = TYPE_P2WPKH |
|
|
|
class SegwitWalletFidelityBonds(FidelityBondMixin, SegwitWallet): |
|
TYPE = TYPE_SEGWIT_WALLET_FIDELITY_BONDS |
|
|
|
|
|
class FidelityBondWatchonlyWallet(FidelityBondMixin, BIP84Wallet): |
|
TYPE = TYPE_WATCHONLY_FIDELITY_BONDS |
|
_ENGINE = ENGINES[TYPE_WATCHONLY_P2WPKH] |
|
_TIMELOCK_ENGINE = ENGINES[TYPE_WATCHONLY_TIMELOCK_P2WSH] |
|
|
|
@classmethod |
|
def _verify_entropy(cls, ent): |
|
return ent[1:4] == b"pub" |
|
|
|
@classmethod |
|
def _derive_bip32_master_key(cls, master_entropy): |
|
return btc.bip32_deserialize(master_entropy.decode()) |
|
|
|
def _get_bip32_export_path(self, mixdepth=None, address_type=None): |
|
path = super()._get_bip32_export_path(mixdepth, address_type) |
|
return path |
|
|
|
|
|
WALLET_IMPLEMENTATIONS = { |
|
LegacyWallet.TYPE: LegacyWallet, |
|
SegwitLegacyWallet.TYPE: SegwitLegacyWallet, |
|
SegwitWallet.TYPE: SegwitWallet, |
|
SegwitWalletFidelityBonds.TYPE: SegwitWalletFidelityBonds, |
|
FidelityBondWatchonlyWallet.TYPE: FidelityBondWatchonlyWallet |
|
}
|
|
|