You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

2883 lines
110 KiB

from configparser import NoOptionError
import warnings
import functools
import collections
import numbers
import random
import copy
import base64
import json
from math import ceil
from binascii import hexlify, unhexlify
from datetime import datetime, timedelta
from calendar import timegm
from copy import deepcopy
from mnemonic import Mnemonic as MnemonicParent
from hashlib import sha256
from itertools import chain
from decimal import Decimal
from numbers import Integral
from math import exp
from typing import Any, Dict, Optional, Tuple
from .configure import jm_single
from .blockchaininterface import INF_HEIGHT
from .support import select_gradual, select_greedy, select_greediest, \
select, NotEnoughFundsException
from .cryptoengine import TYPE_P2PKH, TYPE_P2SH_P2WPKH, TYPE_P2WSH,\
TYPE_P2WPKH, TYPE_TIMELOCK_P2WSH, TYPE_SEGWIT_WALLET_FIDELITY_BONDS,\
TYPE_WATCHONLY_FIDELITY_BONDS, TYPE_WATCHONLY_TIMELOCK_P2WSH, \
TYPE_WATCHONLY_P2WPKH, TYPE_P2TR, ENGINES, detect_script_type, EngineError
from .support import get_random_bytes
from . import mn_encode, mn_decode
import jmbitcoin as btc
from jmbase import JM_WALLET_NAME_PREFIX, bintohex
def _int_to_bytestr(i):
return str(i).encode('ascii')
class WalletError(Exception):
    """Base class for the wallet-specific exceptions defined in this module."""
class WalletCannotGetPrivateKeyFromWatchOnly(WalletError):
    """WalletError with a fixed message about watch-only wallets."""

    def __init__(self):
        message = "Cannot get a private key from a watch-only wallet."
        super().__init__(message)
class WalletCacheValidationFailed(WalletError):
    """WalletError with a fixed message about a failed cache validation."""

    def __init__(self):
        message = "Wallet cache validation failed."
        super().__init__(message)
class WalletMixdepthOutOfRange(WalletError):
    """WalletError with a fixed message about an out-of-range mixdepth."""

    def __init__(self):
        message = "Mixdepth outside of wallet's range."
        super().__init__(message)
class WalletInvalidPath(WalletError):
    """WalletError whose message embeds the offending path."""

    def __init__(self, path: str):
        message = f"Invalid path, unknown root: {path}."
        super().__init__(message)
class UnknownAddressForLabel(Exception):
    """Exception whose message embeds an address unknown to the wallet."""

    def __init__(self, addr: str):
        message = f"Unknown address for this wallet: {addr}."
        super().__init__(message)
class Mnemonic(MnemonicParent):
    """MnemonicParent subclass whose language detection always says "english"."""

    @classmethod
    def detect_language(cls, code):
        """Return "english" regardless of `code`."""
        language = "english"
        return language
def estimate_tx_fee(ins, outs, txtype='p2pkh', outtype=None, extra_bytes=0):
    '''Estimate the fee, in satoshis, for a transaction with the given
    inputs and outputs, using the fee rate reported by the blockchain
    interface.
    Arguments:
    ins: int, number of inputs (only used when `txtype` is a string)
    outs: int, number of outputs (ignored when `outtype` is a list)
    txtype: either a single string, or a list of strings (one per input)
    outtype: None, a single string, or a list of strings (one per output)
    extra_bytes: an int, added to the non-witness size estimate
    A string type acts as a 'default' applied to every input/output (that
    of the wallet); lists allow heterogeneous input and output types.
    For supported input and output types, see the keys of the dicts
    `inmults` and `outmults` in jmbitcoin.secp256k1_transaction.estimate_tx_size`.
    NOTE(review): with `txtype` given as a list and `outtype` left as None,
    the output types become a list of lists - confirm no caller does this.
    Returns:
    a single integer number of satoshis as estimate.
    Raises:
    RuntimeError if there is no blockchain interface or no fee rate.
    ValueError if the fee rate exceeds POLICY/absurd_fee_per_kb.
    '''
    if jm_single().bc_interface is None:
        raise RuntimeError("Cannot estimate transaction fee " +
                           "without blockchain source.")
    fee_target = jm_single().config.getint("POLICY", "tx_fees")
    fee_per_kb = jm_single().bc_interface.estimate_fee_per_kb(fee_target)
    if fee_per_kb is None:
        raise RuntimeError("Cannot estimate fee per kB, possibly" +
                           " a failure of connection to the blockchain.")
    absurd_fee = jm_single().config.getint("POLICY", "absurd_fee_per_kb")
    if fee_per_kb > absurd_fee:
        # This error is considered critical; for safety reasons, shut down.
        raise ValueError("Estimated fee " +
                         btc.fee_per_kb_to_str(fee_per_kb) +
                         " greater than absurd value " +
                         btc.fee_per_kb_to_str(absurd_fee) + ", quitting.")

    # Expand the type arguments into per-input / per-output lists.
    if isinstance(txtype, str):
        in_types = [txtype] * ins
    else:
        assert isinstance(txtype, list)
        in_types = txtype
    if outtype is None:
        out_types = [txtype] * outs
    elif isinstance(outtype, str):
        out_types = [outtype] * outs
    else:
        assert isinstance(outtype, list)
        out_types = outtype

    # Note: `estimate_tx_size` can raise `NotImplementedError` if any of
    # the strings in (in_types, out_types) are not known script types.
    if btc.there_is_one_segwit_input(in_types):
        witness_size, base_size = btc.estimate_tx_size(in_types, out_types)
        base_size += extra_bytes
        # witness bytes are counted at a quarter of their size
        discounted_size = ceil(base_size + 0.25 * witness_size)
        return int(int(discounted_size * fee_per_kb) / Decimal(1000.0))
    total_size = btc.estimate_tx_size(in_types, out_types) + extra_bytes
    return int((total_size * fee_per_kb) / Decimal(1000.0))
def compute_tx_locktime():
    """Pick a transaction locktime based on the current block height.

    Returns the current height as reported by the blockchain interface;
    in one case out of ten (random) it instead returns that height minus
    a random 0..99 blocks, floored at 1.
    """
    # set locktime for best anonset (Core, Electrum)
    height = jm_single().bc_interface.get_current_block_height()
    if random.randint(0, 9) != 0:
        return height
    # P2EP requires locktime > 0
    return max(1, height - random.randint(0, 99))
#FIXME: move this to a utilities file?
def deprecated(func):
    """Decorator that emits a DeprecationWarning on every call to `func`.

    The wrapper keeps `func`'s metadata (via functools.wraps) and returns
    whatever `func` returns.
    """
    @functools.wraps(func)
    def _warn_then_call(*args, **kwargs):
        message = "Call to deprecated function {}.".format(func.__name__)
        # stacklevel=2 attributes the warning to the caller of the wrapper
        warnings.warn(message, category=DeprecationWarning, stacklevel=2)
        return func(*args, **kwargs)
    return _warn_then_call
class UTXOManager(object):
    """In-memory index of utxos per mixdepth, backed by a storage object.

    Utxos live under STORAGE_KEY in `storage.data`; per-utxo metadata
    (currently only the b'disabled' flag) lives under METADATA_KEY.
    Storage keys are the txid bytes followed by the decimal output index.
    """
    STORAGE_KEY = b'utxo'
    METADATA_KEY = b'meta'
    TXID_LEN = 32

    def __init__(self, storage, merge_func):
        """
        args:
            storage: object exposing a `data` mapping and `save()`
            merge_func: default selection function, called as
                merge_func(available, amount) by select_utxos
        """
        self.storage = storage
        self.selector = merge_func
        # {mixdepth: {(txid, index): (path, value, height)}}
        self._utxo = None
        # metadata kept as a separate key in the database
        # for backwards compat; value as dict for forward-compat.
        # format is {(txid, index): value-dict} with "disabled"
        # as the only currently used key in the dict.
        self._utxo_meta = None
        self._load_storage()
        assert self._utxo is not None

    @classmethod
    def initialize(cls, storage):
        """Create the (empty) utxo entry in a fresh storage."""
        storage.data[cls.STORAGE_KEY] = {}

    def _split_storage_key(self, key):
        """Split a storage key into its (txid, index) parts."""
        return key[:self.TXID_LEN], int(key[self.TXID_LEN:])

    def _load_storage(self):
        """Rebuild the in-memory utxo and metadata maps from storage."""
        assert isinstance(self.storage.data[self.STORAGE_KEY], dict)
        self._utxo = collections.defaultdict(dict)
        self._utxo_meta = collections.defaultdict(dict)
        for md, data in self.storage.data[self.STORAGE_KEY].items():
            md_data = self._utxo[int(md)]
            for key, value in data.items():
                md_data[self._split_storage_key(key)] = value
        # Wallets may not have any metadata
        if self.METADATA_KEY in self.storage.data:
            for key, value in self.storage.data[self.METADATA_KEY].items():
                self._utxo_meta[self._split_storage_key(key)] = value

    def save(self, write=True):
        """Copy utxos and metadata into `storage.data`.

        args:
            write: if True, also call `storage.save()`
        """
        new_data = {}
        self.storage.data[self.STORAGE_KEY] = new_data
        for md, data in self._utxo.items():
            md = _int_to_bytestr(md)
            new_data[md] = {}
            # storage keys must be bytes()
            for (txid, index), value in data.items():
                new_data[md][txid + _int_to_bytestr(index)] = value
        new_meta_data = {}
        self.storage.data[self.METADATA_KEY] = new_meta_data
        for (txid, index), value in self._utxo_meta.items():
            new_meta_data[txid + _int_to_bytestr(index)] = value
        if write:
            self.storage.save()

    def reset(self):
        """Forget all utxos (metadata is left untouched)."""
        self._utxo = collections.defaultdict(dict)

    def have_utxo(self, txid, index, include_disabled=True):
        """Return the mixdepth holding (txid, index), or False.

        Mixdepth 0 is a valid (falsy) result, so callers must compare the
        return value with `is False` rather than test its truthiness.
        """
        if not include_disabled and self.is_disabled(txid, index):
            return False
        for md in self._utxo:
            if (txid, index) in self._utxo[md]:
                return md
        return False

    def remove_utxo(self, txid, index, mixdepth):
        """Remove and return the (path, value, height) entry of a utxo.

        Raises KeyError if the utxo is not present at `mixdepth`.
        """
        # currently does not remove metadata associated
        # with this utxo
        assert isinstance(txid, bytes)
        assert len(txid) == self.TXID_LEN
        assert isinstance(index, numbers.Integral)
        assert isinstance(mixdepth, numbers.Integral)
        return self._utxo[mixdepth].pop((txid, index))

    def add_utxo(self, txid, index, path, value, mixdepth, height=None):
        """Record a utxo at `mixdepth`, overwriting any existing entry."""
        # Assumed: that we add a utxo only if we want it enabled,
        # so metadata is not currently added.
        # The height (blockheight) field will be "infinity" for unconfirmed.
        assert isinstance(txid, bytes)
        assert len(txid) == self.TXID_LEN
        assert isinstance(index, numbers.Integral)
        assert isinstance(value, numbers.Integral)
        assert isinstance(mixdepth, numbers.Integral)
        if height is None:
            height = INF_HEIGHT
        assert isinstance(height, numbers.Integral)
        self._utxo[mixdepth][(txid, index)] = (path, value, height)

    def is_disabled(self, txid, index):
        """Return True only if the utxo's metadata has a truthy b'disabled'."""
        if not self._utxo_meta:
            return False
        if (txid, index) not in self._utxo_meta:
            return False
        return bool(self._utxo_meta[(txid, index)].get(b'disabled'))

    def disable_utxo(self, txid, index, disable=True):
        """Set the b'disabled' metadata flag of a utxo to `disable`."""
        assert isinstance(txid, bytes)
        assert len(txid) == self.TXID_LEN
        assert isinstance(index, numbers.Integral)
        # Only touch the b'disabled' key: any other metadata stored for
        # this utxo is kept rather than being replaced by a fresh dict.
        self._utxo_meta.setdefault((txid, index), {})[b'disabled'] = disable

    def enable_utxo(self, txid, index):
        """Clear the b'disabled' metadata flag of a utxo."""
        self.disable_utxo(txid, index, disable=False)

    def select_utxos(self, mixdepth, amount, utxo_filter=(), select_fn=None,
                     maxheight=None):
        """Select utxos at `mixdepth` using the selection function.

        args:
            mixdepth: int
            amount: passed through to the selection function
            utxo_filter: container of (txid, index) that must not be selected
            select_fn: selection function, defaults to self.selector
            maxheight: if not None, only utxos with height <= maxheight
                are candidates
        returns:
            {(txid, index): {'path': ..., 'value': int}}
        """
        assert isinstance(mixdepth, numbers.Integral)
        utxos = self._utxo[mixdepth]
        available = []
        for utxo, (_path, val, height) in utxos.items():
            # do not select anything in the filter
            if utxo in utxo_filter:
                continue
            # do not select anything with insufficient confirmations;
            # this is applied on top of the filter, not instead of it
            if maxheight is not None and not height <= maxheight:
                continue
            # do not select anything disabled
            if self.is_disabled(*utxo):
                continue
            available.append({'utxo': utxo, 'value': val})
        selector = select_fn or self.selector
        selected = selector(available, amount)
        # note that we do not return height; for selection, we expect
        # the caller will not want this (after applying the height filter)
        return {s['utxo']: {'path': utxos[s['utxo']][0],
                            'value': utxos[s['utxo']][1]}
                for s in selected}

    def get_balance_at_mixdepth(self, mixdepth: int,
                                include_disabled: bool = True,
                                maxheight: Optional[int] = None) -> int:
        """ By default this returns aggregated bitcoin balance at mixdepth.
        To get only enabled balance, set include_disabled=False.
        To get balances only with a certain number of confs, use maxheight.
        """
        utxomap = self._utxo.get(mixdepth)
        if not utxomap:
            return 0
        if not include_disabled:
            utxomap = {k: v for k, v in utxomap.items(
                ) if not self.is_disabled(*k)}
        if maxheight is not None:
            utxomap = {k: v for k, v in utxomap.items(
                ) if v[2] <= maxheight}
        return sum(x[1] for x in utxomap.values())

    def get_utxos_at_mixdepth(self, mixdepth: int) -> \
            Dict[Tuple[bytes, int], Tuple[Tuple, int, int]]:
        """Return a deep copy of the utxo map at `mixdepth` ({} if none)."""
        utxomap = self._utxo.get(mixdepth)
        return deepcopy(utxomap) if utxomap else {}

    def __eq__(self, o):
        """Equal when utxo maps match and both use the same selector."""
        if not isinstance(o, UTXOManager):
            return NotImplemented
        return self._utxo == o._utxo and \
            self.selector is o.selector
class AddressLabelsManager(object):
    """Keeps address labels, stored under STORAGE_KEY in `storage.data`.

    Addresses are held as ASCII bytes and labels as UTF-8 bytes.
    """
    STORAGE_KEY = b'addr_labels'

    def __init__(self, storage):
        self.storage = storage
        self._addr_labels = None
        self._load_storage()
        assert self._addr_labels is not None

    @classmethod
    def initialize(cls, storage):
        """Create the (empty) label entry in a storage."""
        storage.data[cls.STORAGE_KEY] = {}

    def _load_storage(self):
        """Use the label dict found in storage, or start with an empty one."""
        data = self.storage.data
        if self.STORAGE_KEY not in data:
            self._addr_labels = {}
            return
        self._addr_labels = data[self.STORAGE_KEY]

    def save(self):
        """Put the label dict back into `storage.data`."""
        self.storage.data[self.STORAGE_KEY] = self._addr_labels

    def get_label(self, address):
        """Return the label of `address` (str) as str, or None if unset."""
        key = bytes(address, 'ascii')
        try:
            stored = self._addr_labels[key]
        except KeyError:
            return None
        return stored.decode()

    def set_label(self, address, label):
        """Set the label of `address`; a falsy label removes any existing one."""
        key = bytes(address, 'ascii')
        if not label:
            self._addr_labels.pop(key, None)
            return
        self._addr_labels[key] = bytes(label, 'utf-8')
# Base class for wallets. __init__ asserts that TYPE and _ENGINE are not None,
# so inheriting classes are expected to override both.
class BaseWallet(object):
# wallet type identifier; compared against storage.data[b'wallet_type']
TYPE = None
# merge algorithm name -> utxo selection function (see _get_merge_algorithm)
MERGE_ALGORITHMS = {
'default': select,
'gradual': select_gradual,
'greedy': select_greedy,
'greediest': select_greediest
}
_ENGINES = ENGINES
# engine used by the address/script/pubkey conversion classmethods
_ENGINE = None
# address_type values passed to get_new_addr / get_new_script
ADDRESS_TYPE_EXTERNAL = 0
ADDRESS_TYPE_INTERNAL = 1
def __init__(self, storage, gap_limit=6, merge_algorithm_name=None,
mixdepth=None, load_cache=True):
# to be defined by inheriting classes
assert self.TYPE is not None
assert self._ENGINE is not None
self.merge_algorithm = self._get_merge_algorithm(merge_algorithm_name)
self.gap_limit = gap_limit
self._storage = storage
self._utxos = None
self._addr_labels = None
self._cache = None
# highest mixdepth ever used in wallet, important for synching
self.max_mixdepth = None
# effective maximum mixdepth to be used by joinmarket
self.mixdepth = None
self.network = None
# {script: path}, should always hold mappings for all "known" keys
self._script_map = {}
# {address: path}, should always hold mappings for all "known" keys
self._addr_map = {}
self._load_storage(load_cache=load_cache)
assert self._utxos is not None
assert self._cache is not None
assert self.max_mixdepth is not None
assert self.max_mixdepth >= 0
assert self.network in ('mainnet', 'testnet', 'signet')
if mixdepth is not None:
assert mixdepth >= 0
if self._storage.read_only and mixdepth > self.max_mixdepth:
raise Exception("Effective max mixdepth must be at most {}!"
.format(self.max_mixdepth))
self.max_mixdepth = max(self.max_mixdepth, mixdepth)
self.mixdepth = mixdepth
else:
self.mixdepth = self.max_mixdepth
assert self.mixdepth is not None
@property
@deprecated
def max_mix_depth(self):
    """Deprecated accessor returning self.mixdepth."""
    effective_mixdepth = self.mixdepth
    return effective_mixdepth
@property
@deprecated
def gaplimit(self):
    """Deprecated accessor returning self.gap_limit."""
    limit = self.gap_limit
    return limit
def _load_storage(self, load_cache: bool = True) -> None:
    """
    load data from storage
    args:
        load_cache: if True, use (and create if missing) the b'cache'
            dict inside storage; otherwise start with a fresh dict that
            is not attached to storage
    raises:
        Exception if the stored wallet type differs from self.TYPE
    """
    stored = self._storage.data
    if stored[b'wallet_type'] != self.TYPE:
        raise Exception("Wrong class to initialize wallet of type {}."
                        .format(self.TYPE))
    self.network = stored[b'network'].decode('ascii')
    self._utxos = UTXOManager(self._storage, self.merge_algorithm)
    self._addr_labels = AddressLabelsManager(self._storage)
    self._cache = (self._storage.data.setdefault(b'cache', {})
                   if load_cache else {})
def get_storage_location(self):
""" Return the location of the
persistent storage, if it exists, or None.
"""
if not self._storage:
return None
return self._storage.get_location()
def save(self):
"""
Write data to associated storage object and trigger persistent update.
"""
self._utxos.save()
self._addr_labels.save()
@classmethod
def initialize(cls, storage, network, max_mixdepth=2, timestamp=None,
               write=True):
    """
    Initialize wallet in an empty storage. Must be used on a storage object
    before creating a wallet object with it.
    args:
        storage: a Storage object
        network: str, network we are on, 'mainnet', 'testnet' or 'signet'
        max_mixdepth: int, number of the highest mixdepth (only
            validated here, not stored)
        timestamp: str or None (it is ASCII-encoded before storing),
            defaults to the current local time as 'YYYY/MM/DD HH:MM:SS'
        write: execute storage.save()
    raises:
        WalletError if the storage already contains data
    """
    assert network in ('mainnet', 'testnet', 'signet')
    assert max_mixdepth >= 0
    if storage.data != {}:
        # prevent accidentally overwriting existing wallet
        raise WalletError("Refusing to initialize wallet in non-empty "
                          "storage.")
    created = timestamp or datetime.now().strftime('%Y/%m/%d %H:%M:%S')
    storage.data[b'network'] = network.encode('ascii')
    storage.data[b'created'] = created.encode('ascii')
    storage.data[b'wallet_type'] = cls.TYPE
    for manager_cls in (UTXOManager, AddressLabelsManager):
        manager_cls.initialize(storage)
    if write:
        storage.save()
def get_txtype(self):
    """
    Return the transaction type string matching this wallet's TYPE.
    use TYPE constant instead if possible
    """
    wallet_type = self.TYPE
    if wallet_type == TYPE_P2PKH:
        return 'p2pkh'
    if wallet_type == TYPE_P2SH_P2WPKH:
        return 'p2sh-p2wpkh'
    if wallet_type in (TYPE_P2WPKH, TYPE_SEGWIT_WALLET_FIDELITY_BONDS):
        return 'p2wpkh'
    # any other wallet type is not handled here
    assert False
def get_outtype(self, addr):
    """
    Return the output type string ('p2pkh', 'p2wpkh', 'p2sh-p2wpkh',
    'p2wsh' or 'p2tr') for an address, or None when an EngineError is
    raised while detecting its script type.
    """
    try:
        spk = btc.CCoinAddress(addr).to_scriptPubKey()
        script_type = detect_script_type(spk)
    except EngineError:
        # up to callers what to do with this;
        # it means we don't recognize this script type.
        return None
    type_names = (
        (TYPE_P2PKH, 'p2pkh'),
        (TYPE_P2WPKH, 'p2wpkh'),
        (TYPE_P2SH_P2WPKH, 'p2sh-p2wpkh'),
        (TYPE_P2WSH, 'p2wsh'),
        (TYPE_P2TR, 'p2tr'),
    )
    for known_type, name in type_names:
        if script_type == known_type:
            return name
    # should be unreachable; all possible returns
    # from detect_script_type are covered.
    assert False
def sign_tx(self, tx, scripts, **kwargs):
"""
Add signatures to transaction for inputs referenced by scripts.
args:
tx: CMutableTransaction object
scripts: {input_index: (output_script, amount)}
kwargs: additional arguments for engine.sign_transaction
returns:
True, None if success.
False, msg if signing failed, with error msg.
"""
for index, (script, amount) in scripts.items():
assert amount > 0
path = self.script_to_path(script)
privkey, engine = self._get_key_from_path(path)
sig, msg = engine.sign_transaction(tx, index, privkey,
amount, **kwargs)
if not sig:
return False, msg
return True, None
@deprecated
def get_key_from_addr(self, addr):
    """
    Return the private key for an address known to the wallet.
    There should be no reason for code outside the wallet to need a privkey.
    """
    privkey, _engine = self._get_key_from_path(self.addr_to_path(addr))
    return privkey
def get_external_addr(self, mixdepth):
"""
Return an address suitable for external distribution, including funding
the wallet from other sources, or receiving payments or donations.
JoinMarket will never generate these addresses for internal use.
"""
return self.get_new_addr(mixdepth, self.ADDRESS_TYPE_EXTERNAL)
def get_internal_addr(self, mixdepth):
"""
Return an address for internal usage, as change addresses and when
participating in transactions initiated by other parties.
"""
return self.get_new_addr(mixdepth, self.ADDRESS_TYPE_INTERNAL)
def get_external_script(self, mixdepth):
return self.get_new_script(mixdepth, self.ADDRESS_TYPE_EXTERNAL)
def get_internal_script(self, mixdepth):
return self.get_new_script(mixdepth, self.ADDRESS_TYPE_INTERNAL)
@classmethod
def addr_to_script(cls, addr):
"""
Try not to call this slow method. Instead, call addr_to_path,
followed by get_script_from_path, as those are cached.
"""
return cls._ENGINE.address_to_script(addr)
@classmethod
def pubkey_to_script(cls, pubkey):
"""
Try not to call this slow method. Instead, call
get_script_from_path if possible, as that is cached.
"""
return cls._ENGINE.pubkey_to_script(pubkey)
@classmethod
def pubkey_to_addr(cls, pubkey):
"""
Try not to call this slow method. Instead, call
get_address_from_path if possible, as that is cached.
"""
return cls._ENGINE.pubkey_to_address(pubkey)
def script_to_addr(self, script,
validate_cache: bool = False):
path = self.script_to_path(script)
return self.get_address_from_path(path,
validate_cache=validate_cache)
def get_script_code(self, script):
"""
For segwit wallets, gets the value of the scriptCode
parameter required (see BIP143) for sighashing; this is
required for protocols (like Joinmarket) where signature
verification materials must be communicated between wallets.
For non-segwit wallets, raises EngineError.
"""
path = self.script_to_path(script)
pub, engine = self._get_pubkey_from_path(path)
return engine.pubkey_to_script_code(pub)
@classmethod
def pubkey_has_address(cls, pubkey, addr):
return cls._ENGINE.pubkey_has_address(pubkey, addr)
@classmethod
def pubkey_has_script(cls, pubkey, script):
return cls._ENGINE.pubkey_has_script(pubkey, script)
@deprecated
def get_key(self, mixdepth, address_type, index):
    """Deprecated and not implemented in this class."""
    raise NotImplementedError
def get_addr(self, mixdepth, address_type, index,
validate_cache: bool = False):
path = self.get_path(mixdepth, address_type, index)
return self.get_address_from_path(path,
validate_cache=validate_cache)
def get_address_from_path(self, path,
validate_cache: bool = False):
cache = self._get_cache_for_path(path)
addr = cache.get(b'A')
if addr is not None:
addr = addr.decode('ascii')
if addr is None or validate_cache:
engine = self._get_pubkey_from_path(path)[1]
script = self.get_script_from_path(path,
validate_cache=validate_cache)
new_addr = engine.script_to_address(script)
if addr is None:
addr = new_addr
cache[b'A'] = addr.encode('ascii')
elif addr != new_addr:
raise WalletCacheValidationFailed()
return addr
def get_new_addr(self, mixdepth, address_type,
validate_cache: bool = True):
"""
use get_external_addr/get_internal_addr
"""
script = self.get_new_script(mixdepth, address_type,
validate_cache=validate_cache)
return self.script_to_addr(script,
validate_cache=validate_cache)
def get_new_script(self, mixdepth, address_type,
validate_cache: bool = True):
raise NotImplementedError()
def get_wif(self, mixdepth, address_type, index):
return self.get_wif_path(self.get_path(mixdepth, address_type, index))
def get_wif_path(self, path):
priv, engine = self._get_key_from_path(path)
return engine.privkey_to_wif(priv)
def get_path(self, mixdepth=None, address_type=None, index=None):
raise NotImplementedError()
def get_details(self, path):
"""
Return mixdepth, address_type, index for a given path
args:
path: wallet path
returns:
tuple (mixdepth, type, index)
type is one of 0, 1, 'imported', 2, 3
"""
raise NotImplementedError()
@deprecated
def update_cache_index(self):
    """
    Deprecated alias for save(); returns None.
    """
    self.save()
    return None
def remove_old_utxos(self, tx):
"""
Remove all own inputs of tx from internal utxo list.
args:
tx: CMutableTransaction
returns:
{(txid, index): {'script': bytes, 'path': str, 'value': int} for all removed utxos
"""
removed_utxos = {}
for inp in tx.vin:
txid = inp.prevout.hash[::-1]
index = inp.prevout.n
md = self._utxos.have_utxo(txid, index)
if md is False:
continue
path, value, height = self._utxos.remove_utxo(txid, index, md)
script = self.get_script_from_path(path)
removed_utxos[(txid, index)] = {'script': script,
'path': path,
'value': value}
return removed_utxos
def add_new_utxos(self, tx, height=None):
"""
Add all outputs of tx for this wallet to internal utxo list.
They are also returned in standard dict form.
args:
tx: CMutableTransaction
height: blockheight in which tx was included, or None
if unconfirmed.
returns:
{(txid, index): {'script': bytes, 'path': tuple, 'value': int,
'address': str}
for all added utxos
"""
added_utxos = {}
txid = tx.GetTxid()[::-1]
for index, outs in enumerate(tx.vout):
spk = outs.scriptPubKey
val = outs.nValue
try:
self.add_utxo(txid, index, spk, val, height=height)
except WalletError:
continue
path = self.script_to_path(spk)
added_utxos[(txid, index)] = {'script': spk, 'path': path, 'value': val,
'address': self._ENGINE.script_to_address(spk)}
return added_utxos
def add_utxo(self, txid, index, script, value, height=None):
assert isinstance(txid, bytes)
assert isinstance(index, Integral)
assert isinstance(script, bytes)
assert isinstance(value, Integral)
if script not in self._script_map:
raise WalletError("Tried to add UTXO for unknown key to wallet.")
path = self.script_to_path(script)
mixdepth = self._get_mixdepth_from_path(path)
self._utxos.add_utxo(txid, index, path, value, mixdepth, height=height)
def inputs_consumed_by_tx(self, tx):
    """ Given a transaction tx, checks
    which if any of the inputs belonged to this
    wallet, and returns [(index, CTxIn),..] for each.
    An input counts as ours when the script derived from the
    pubkey in its witness is in the wallet's script map.
    """
    ours = []
    for position, txin in enumerate(tx.vin):
        pubkey, _msg = btc.extract_pubkey_from_witness(tx, position)
        # a missing pubkey can certainly occur since other inputs
        # may not be a spending a script we recognize;
        # so we ignore the msg
        if pubkey and self.pubkey_to_script(pubkey) in self._script_map:
            ours.append((position, txin))
    return ours
def process_new_tx(self, txd, height=None):
""" Given a newly seen transaction, deserialized as
CMutableTransaction txd,
process its inputs and outputs and update
the utxo contents of this wallet accordingly.
NOTE: this should correctly handle transactions that are not
actually related to the wallet; it will not add (or remove,
obviously) utxos that were not related since the underlying
functions check this condition.
"""
removed_utxos = self.remove_old_utxos(txd)
added_utxos = self.add_new_utxos(txd, height=height)
return (removed_utxos, added_utxos)
def select_utxos(self, mixdepth, amount, utxo_filter=None,
select_fn=None, maxheight=None, includeaddr=False,
require_auth_address=False):
"""
Select a subset of available UTXOS for a given mixdepth whose value is
greater or equal to amount. If `includeaddr` is True, adds an `address`
key to the returned dict.
args:
mixdepth: int, mixdepth to select utxos from, must be smaller or
equal to wallet.max_mixdepth
amount: int, total minimum amount of all selected utxos
utxo_filter: list of (txid, index), utxos not to select
maxheight: only select utxos with blockheight <= this.
require_auth_address: if True, output utxos must include a
standard wallet address. The first item of the output dict is
guaranteed to be a suitable utxo. Result will be empty if no
such utxo set could be found.
returns:
{(txid, index): {'script': bytes, 'path': tuple, 'value': int}}
raises:
NotEnoughFundsException: if mixdepth does not have utxos with
enough value to satisfy amount
"""
assert isinstance(mixdepth, numbers.Integral)
assert isinstance(amount, numbers.Integral)
if not utxo_filter:
utxo_filter = ()
for i in utxo_filter:
assert len(i) == 2
assert isinstance(i[0], bytes)
assert isinstance(i[1], numbers.Integral)
utxos = self._utxos.select_utxos(
mixdepth, amount, utxo_filter, select_fn, maxheight=maxheight)
total_value = 0
standard_utxo = None
for key, data in utxos.items():
if self.is_standard_wallet_script(data['path']):
standard_utxo = key
total_value += data['value']
data['script'] = self.get_script_from_path(data['path'])
if includeaddr:
data["address"] = self.get_address_from_path(data["path"])
if require_auth_address and not standard_utxo:
# try to select more utxos, hoping for a standard one
try:
return self.select_utxos(
mixdepth, total_value + 1, utxo_filter, select_fn,
maxheight, includeaddr, require_auth_address)
except NotEnoughFundsException:
# recursive utxo selection was unsuccessful, give up
return {}
elif require_auth_address:
utxos = collections.OrderedDict(utxos)
utxos.move_to_end(standard_utxo, last=False)
return utxos
def disable_utxo(self, txid, index, disable=True):
self._utxos.disable_utxo(txid, index, disable)
# make sure the utxo database is persisted
self.save()
def toggle_disable_utxo(self, txid, index):
is_disabled = self._utxos.is_disabled(txid, index)
self.disable_utxo(txid, index, disable= not is_disabled)
def reset_utxos(self):
self._utxos.reset()
def get_balance_by_mixdepth(self, verbose=True,
include_disabled=False,
maxheight=None):
"""
Get available funds in each active mixdepth.
By default ignores disabled utxos in calculation.
By default returns unconfirmed transactions, to filter
confirmations, set maxheight to max acceptable blockheight.
returns: {mixdepth: value}
"""
balances = collections.defaultdict(int)
for md in range(self.mixdepth + 1):
balances[md] = self.get_balance_at_mixdepth(md, verbose=verbose,
include_disabled=include_disabled, maxheight=maxheight)
return balances
def get_balance_at_mixdepth(self, mixdepth,
verbose: bool = True,
include_disabled: bool = False,
maxheight: Optional[int] = None) -> int:
# TODO: verbose
return self._utxos.get_balance_at_mixdepth(mixdepth,
include_disabled=include_disabled, maxheight=maxheight)
def get_utxos_by_mixdepth(self, include_disabled: bool = False,
includeheight: bool = False,
limit_mixdepth: Optional[int] = None
) -> collections.defaultdict:
"""
Get all UTXOs for active mixdepths or specified mixdepth.
returns:
{mixdepth: {(txid, index):
{'script': bytes, 'path': tuple, 'value': int}}}
(if `includeheight` is True, adds key 'height': int)
"""
script_utxos = collections.defaultdict(dict)
if limit_mixdepth:
script_utxos[limit_mixdepth] = self.get_utxos_at_mixdepth(
mixdepth=limit_mixdepth, include_disabled=include_disabled,
includeheight=includeheight)
else:
for md in range(self.mixdepth + 1):
script_utxos[md] = self.get_utxos_at_mixdepth(md,
include_disabled=include_disabled,
includeheight=includeheight)
return script_utxos
def get_utxos_at_mixdepth(self, mixdepth: int,
include_disabled: bool = False,
includeheight: bool = False) -> \
Dict[Tuple[bytes, int], Dict[str, Any]]:
script_utxos = {}
if 0 <= mixdepth <= self.mixdepth:
data = self._utxos.get_utxos_at_mixdepth(mixdepth)
for utxo, (path, value, height) in data.items():
if not include_disabled and self._utxos.is_disabled(*utxo):
continue
script = self.get_script_from_path(path)
addr = self.get_address_from_path(path)
label = self.get_address_label(addr)
script_utxo = {
'script': script,
'path': path,
'value': value,
'address': addr,
'label': label,
}
if includeheight:
script_utxo['height'] = height
script_utxos[utxo] = script_utxo
return script_utxos
def get_all_utxos(self, include_disabled=False):
""" Get all utxos in the wallet, format of return
is as for get_utxos_by_mixdepth for each mixdepth.
"""
mix_utxos = self.get_utxos_by_mixdepth(
include_disabled=include_disabled)
all_utxos = {}
for d in mix_utxos.values():
all_utxos.update(d)
return all_utxos
@classmethod
def _get_merge_algorithm(cls, algorithm_name=None):
if not algorithm_name:
try:
algorithm_name = jm_single().config.get('POLICY',
'merge_algorithm')
except NoOptionError:
algorithm_name = 'default'
alg = cls.MERGE_ALGORITHMS.get(algorithm_name)
if alg is None:
raise Exception("Unknown merge algorithm: '{}'."
"".format(algorithm_name))
return alg
def _get_mixdepth_from_path(self, path):
raise NotImplementedError()
def get_script_from_path(self, path,
validate_cache: bool = False):
"""
internal note: This is the final sink for all operations that somehow
need to derive a script. If anything goes wrong when deriving a
script this is the place to look at.
args:
path: wallet path
returns:
script
"""
cache = self._get_cache_for_path(path)
script = cache.get(b'S')
if script is None or validate_cache:
pubkey, engine = self._get_pubkey_from_path(path,
validate_cache=validate_cache)
new_script = engine.pubkey_to_script(pubkey)
if script is None:
cache[b'S'] = script = new_script
elif script != new_script:
raise WalletCacheValidationFailed()
return script
def get_script(self, mixdepth, address_type, index,
validate_cache: bool = False):
path = self.get_path(mixdepth, address_type, index)
return self.get_script_from_path(path, validate_cache=validate_cache)
def _get_key_from_path(self, path,
validate_cache: bool = False):
raise NotImplementedError()
def _get_keypair_from_path(self, path,
validate_cache: bool = False):
privkey, engine = self._get_key_from_path(path,
validate_cache=validate_cache)
cache = self._get_cache_for_path(path)
pubkey = cache.get(b'P')
if pubkey is None or validate_cache:
new_pubkey = engine.privkey_to_pubkey(privkey)
if pubkey is None:
cache[b'P'] = pubkey = new_pubkey
elif pubkey != new_pubkey:
raise WalletCacheValidationFailed()
return privkey, pubkey, engine
def _get_pubkey_from_path(self, path,
validate_cache: bool = False):
privkey, pubkey, engine = self._get_keypair_from_path(path,
validate_cache=validate_cache)
return pubkey, engine
def _get_cache_keys_for_path(self, path):
    """
    Return the cache keys for a path tuple: its first element as is,
    followed by the remaining elements converted with _int_to_bytestr.
    """
    root, rest = path[:1], path[1:]
    return root + tuple(_int_to_bytestr(part) for part in rest)
def _get_cache_for_path(self, path):
assert len(path) > 0
cache = self._cache
for k in self._get_cache_keys_for_path(path):
cache = cache.setdefault(k, {})
return cache
def _delete_cache_for_path(self, path) -> bool:
assert len(path) > 0
def recurse(cache, itr):
k = next(itr, None)
if k is None:
cache.clear()
else:
child = cache.get(k)
if not child or not recurse(child, itr):
return False
if not child:
del cache[k]
return True
return recurse(self._cache, iter(self._get_cache_keys_for_path(path)))
def get_path_repr(self, path):
"""
Get a human-readable representation of the wallet path.
args:
path: path tuple
returns:
str
"""
raise NotImplementedError()
def path_repr_to_path(self, pathstr):
"""
Convert a human-readable path representation to internal representation.
args:
pathstr: str
returns:
path tuple
"""
raise NotImplementedError()
def get_next_unused_index(self, mixdepth, address_type):
"""
Get the next index for public scripts/addresses not yet handed out.
returns:
int >= 0
"""
raise NotImplementedError()
def get_mnemonic_words(self):
"""
Get mnemonic seed words for master key.
returns:
mnemonic_seed, seed_extension
mnemonic_seed is a space-separated str of words
seed_extension is a str or None
"""
raise NotImplementedError()
def sign_message(self, message, path):
"""
Sign the message using the key referenced by path.
args:
message: bytes
path: path tuple
returns as tuple:
address of key that signed
signature as base64-encoded string
"""
priv, engine = self._get_key_from_path(path)
addr = self.get_address_from_path(path)
return addr, engine.sign_message(priv, message)
def get_wallet_name(self):
    """ Returns the name used as a label for this
    specific Joinmarket wallet in Bitcoin Core: the
    JM_WALLET_NAME_PREFIX followed by the wallet id.
    """
    wallet_id = self.get_wallet_id()
    return JM_WALLET_NAME_PREFIX + wallet_id
def get_wallet_id(self):
    """
    Return a human-readable identifier for this wallet.

    Not implemented here; always raises NotImplementedError.

    returns:
        str
    """
    raise NotImplementedError
def check_wallet_passphrase(self, passphrase):
    """
    Ask the underlying storage to check the given passphrase.

    returns:
        whatever self._storage.check_password returns
    """
    storage = self._storage
    return storage.check_password(passphrase)
def change_wallet_passphrase(self, passphrase):
    """
    Hand the new passphrase to the underlying storage via its
    change_password method. Nothing is returned.
    """
    storage = self._storage
    storage.change_password(passphrase)
def yield_imported_paths(self, mixdepth):
    """
    Return an iterator over all imported keys in the given mixdepth.

    This implementation knows no imported keys and therefore always
    returns an empty iterator.

    params:
        mixdepth: int
    returns:
        iterator of wallet paths
    """
    return iter(())
def is_standard_wallet_script(self, path):
    """
    Tell whether the script at the given path has the same type as
    the wallet's standard key type.

    Not implemented here; always raises NotImplementedError.

    return:
        bool
    """
    raise NotImplementedError
def is_known_addr(self, addr):
    """
    Tell whether the address is present in the wallet's address map.

    params:
        addr: str
    returns:
        bool
    """
    assert isinstance(addr, str)
    known_addresses = self._addr_map
    return addr in known_addresses
def is_known_script(self, script):
    """
    Tell whether the script is present in the wallet's script map.

    params:
        script: bytes
    returns:
        bool
    """
    assert isinstance(script, bytes)
    known_scripts = self._script_map
    return script in known_scripts
def get_addr_mixdepth(self, addr):
    """
    Return the mixdepth of the wallet path that the address maps to.

    params:
        addr: str
    returns:
        result of _get_mixdepth_from_path for the address' path
    """
    return self._get_mixdepth_from_path(self.addr_to_path(addr))
def get_script_mixdepth(self, script):
    """
    Return the mixdepth of the wallet path that the script maps to.

    params:
        script: bytes
    returns:
        result of _get_mixdepth_from_path for the script's path
    """
    return self._get_mixdepth_from_path(self.script_to_path(script))
def yield_known_paths(self):
    """
    Generator over all paths currently known to the wallet.

    This implementation walks mixdepths 0..max_mixdepth and yields the
    imported paths of each.

    returns:
        path generator
    """
    for mixdepth in range(self.max_mixdepth + 1):
        yield from self.yield_imported_paths(mixdepth)
def _populate_maps(self, paths):
    """
    Register every given path in the script map and the address map,
    keyed by the path's script and address respectively.

    args:
        paths: iterable of path tuples
    """
    for wallet_path in paths:
        script = self.get_script_from_path(wallet_path)
        self._script_map[script] = wallet_path
        address = self.get_address_from_path(wallet_path)
        self._addr_map[address] = wallet_path
def addr_to_path(self, addr):
    """
    Look up the wallet path of a known address.

    params:
        addr: str
    returns:
        path tuple stored in the address map
    raises:
        AssertionError: if the address is not in the address map
    """
    assert isinstance(addr, str)
    path = self._addr_map.get(addr)
    if path is None:
        # Raised explicitly instead of via `assert` so that the
        # "unknown address" signal is not stripped when running
        # under `python -O`; the exception type is unchanged.
        raise AssertionError("address is not known to this wallet")
    return path
def script_to_path(self, script):
    """
    Look up the wallet path of a known script.

    params:
        script: bytes
    returns:
        path tuple stored in the script map
    raises:
        AssertionError: if the script is not in the script map
    """
    assert isinstance(script, bytes)
    path = self._script_map.get(script)
    if path is None:
        # Raised explicitly instead of via `assert`: callers catch
        # AssertionError to detect scripts that are not ours, which
        # must keep working when asserts are stripped (`python -O`).
        raise AssertionError("script is not known to this wallet")
    return path
def set_next_index(self, mixdepth, address_type, index, force=False):
    """
    Set the index that will be used for the next generated key pair.

    Not implemented here; always raises NotImplementedError.

    params:
        mixdepth: int
        address_type: 0 (external) or 1 (internal)
        index: int
        force: True if you know the wallet already knows all scripts
            up to (excluding) the given index
            Warning: improper use of 'force' will cause undefined behavior!
    """
    raise NotImplementedError
def rewind_wallet_indices(self, used_indices, saved_indices):
    """
    For every mixdepth in used_indices and every address type present
    in both inputs, force the next index to the larger of the used
    and the saved value.

    args:
        used_indices: {mixdepth: [index per address type]}
        saved_indices: {mixdepth: [index per address type]}
    """
    for md in used_indices:
        used, saved = used_indices[md], saved_indices[md]
        shared_types = min(len(used), len(saved))
        for address_type in range(shared_types):
            next_index = max(used[address_type], saved[address_type])
            self.set_next_index(md, address_type, next_index, force=True)
def _get_default_used_indices(self):
    """
    Build the initial used-indices structure: one fresh [0, 0] list
    for every mixdepth from 0 to max_mixdepth inclusive.
    """
    defaults = {}
    for mixdepth in range(self.max_mixdepth + 1):
        defaults[mixdepth] = [0, 0]
    return defaults
def get_used_indices(self, addr_gen):
    """ Returns a dict of max used indices for each branch in
    the wallet, from the given addresses addr_gen. Addresses
    unknown to the wallet are skipped, as are imported keys.
    """
    indices = self._get_default_used_indices()
    for addr in filter(self.is_known_addr, addr_gen):
        md, address_type, index = self.get_details(
            self.addr_to_path(addr))
        if address_type in (self.BIP32_EXT_ID, self.BIP32_INT_ID,
                            FidelityBondMixin.BIP32_TIMELOCK_ID,
                            FidelityBondMixin.BIP32_BURN_ID):
            branch = indices[md]
            # record "one past the highest index seen"
            if index + 1 > branch[address_type]:
                branch[address_type] = index + 1
        else:
            # the only other address type expected here
            assert address_type == 'imported'
    return indices
def check_gap_indices(self, used_indices):
    """ Return False if any of the provided indices (which should be
    those seen from listtransactions as having been used, for
    this wallet/label) are higher than the ones recorded in the index
    cache; True otherwise. Only the external and internal address
    types are checked, and a negative recorded index counts as 0."""
    return not any(
        used_indices[md][address_type] >
        max(self.get_next_unused_index(md, address_type), 0)
        for md in used_indices
        for address_type in (self.ADDRESS_TYPE_EXTERNAL,
                             self.ADDRESS_TYPE_INTERNAL))
def set_address_label(self, addr, label):
    """
    Attach a label to a known address and save the wallet.

    raises:
        UnknownAddressForLabel: if the address is not known to
            this wallet
    """
    if not self.is_known_addr(addr):
        raise UnknownAddressForLabel(addr)
    self._addr_labels.set_label(addr, label)
    self.save()
def get_address_label(self, addr):
    """
    Return the label stored for a known address.

    raises:
        UnknownAddressForLabel: if the address is not known to
            this wallet
    """
    if not self.is_known_addr(addr):
        raise UnknownAddressForLabel(addr)
    return self._addr_labels.get_label(addr)
def close(self):
    """
    Close the underlying storage object.
    """
    storage = self._storage
    storage.close()
def __del__(self):
    """
    Finalizer: closes the wallet via close() when the object is
    being destroyed.
    """
    self.close()
class PSBTWalletMixin(object):
    """
    Mixin for BaseWallet to provide BIP174
    functions.
    """
    def __init__(self, storage, **kwargs):
        super().__init__(storage, **kwargs)

    def is_input_finalized(self, psbt_input):
        """ This should be a convenience method in python-bitcointx.
        However note: this is not a static method and tacitly
        assumes that the input under examination is of the wallet's
        type.

        args:
            psbt_input: btc.PSBT_Input
        returns:
            False if the input has no utxo set, or lacks the final
            scriptSig / final witness that this wallet type requires;
            True otherwise.
        """
        assert isinstance(psbt_input, btc.PSBT_Input)
        if not psbt_input.utxo:
            return False
        if isinstance(self, (LegacyWallet, SegwitLegacyWallet)):
            if not psbt_input.final_script_sig:
                return False
        if isinstance(self, (SegwitLegacyWallet, SegwitWallet)):
            if not psbt_input.final_script_witness:
                return False
        return True

    @staticmethod
    def human_readable_psbt(in_psbt):
        """ Returns a jsonified indented string with all relevant
        information, in human readable form, contained in a PSBT.
        Warning: the output can be very verbose in certain cases.
        """
        assert isinstance(in_psbt, btc.PartiallySignedTransaction)
        outdict = {}
        outdict["psbt-version"] = in_psbt.version
        # human readable serialization of these three global fields is for
        # now on a "best-effort" basis, i.e. just takes the representation
        # provided by the underlying classes in bitcointx, though this may
        # not be very readable.
        # TODO: Improve proprietary/unknown as needed.
        if in_psbt.xpubs:
            outdict["xpubs"] = {str(k): bintohex(
                v.serialize()) for k, v in in_psbt.xpubs.items()}
        if in_psbt.proprietary_fields:
            outdict["proprietary-fields"] = str(in_psbt.proprietary_fields)
        if in_psbt.unknown_fields:
            outdict["unknown-fields"] = str(in_psbt.unknown_fields)

        outdict["unsigned-tx"] = btc.human_readable_transaction(
            in_psbt.unsigned_tx, jsonified=False)
        outdict["psbt-inputs"] = [
            PSBTWalletMixin.human_readable_psbt_in(inp)
            for inp in in_psbt.inputs]
        outdict["psbt-outputs"] = [
            PSBTWalletMixin.human_readable_psbt_out(out)
            for out in in_psbt.outputs]
        return json.dumps(outdict, indent=4)

    @staticmethod
    def human_readable_psbt_in(psbt_input):
        """ Returns a dict containing human readable information
        about a bitcointx.core.psbt.PSBT_Input object.

        raises:
            AssertionError: if the utxo field is set but is neither a
                CTxOut nor a CTransaction.
        """
        assert isinstance(psbt_input, btc.PSBT_Input)
        outdict = {}
        if psbt_input.index is not None:
            outdict["input-index"] = psbt_input.index
        if psbt_input.utxo:
            if isinstance(psbt_input.utxo, btc.CTxOut):
                outdict["utxo"] = btc.human_readable_output(psbt_input.utxo)
            elif isinstance(psbt_input.utxo, btc.CTransaction):
                # human readable full transaction is *too* verbose:
                outdict["utxo"] = bintohex(psbt_input.utxo.serialize())
                if psbt_input.witness_utxo:
                    outdict["witness_utxo"] = bintohex(
                        psbt_input.witness_utxo.serialize())
            else:
                # explicit raise so the check survives `python -O`
                raise AssertionError("invalid PSBT Input utxo field.")
        if psbt_input.sighash_type:
            outdict["sighash-type"] = psbt_input.sighash_type
        if psbt_input.redeem_script:
            outdict["redeem-script"] = bintohex(psbt_input.redeem_script)
        if psbt_input.witness_script:
            outdict["witness-script"] = bintohex(psbt_input.witness_script)
        if psbt_input.partial_sigs:
            # convert the dict entries to hex:
            outdict["partial-sigs"] = {
                bintohex(k): bintohex(v)
                for k, v in psbt_input.partial_sigs.items()}
        # Note we do not currently add derivation info to our own inputs,
        # but probably will in future ( TODO ), still this is shown for
        # externally generated PSBTs:
        if psbt_input.derivation_map:
            # TODO it would be more useful to print the indexes of the
            # derivation path as integers, than 4 byte hex:
            outdict["derivation-map"] = {bintohex(k): bintohex(v.serialize(
                )) for k, v in psbt_input.derivation_map.items()}

        # we show these fields on a best-effort basis; same comment as for
        # globals section as mentioned in hr_psbt()
        if psbt_input.proprietary_fields:
            outdict["proprietary-fields"] = str(psbt_input.proprietary_fields)
        if psbt_input.unknown_fields:
            outdict["unknown-fields"] = str(psbt_input.unknown_fields)
        if psbt_input.proof_of_reserves_commitment:
            outdict["proof-of-reserves-commitment"] = \
                str(psbt_input.proof_of_reserves_commitment)

        outdict["final-scriptSig"] = bintohex(psbt_input.final_script_sig)
        outdict["final-scriptWitness"] = bintohex(
            psbt_input.final_script_witness.serialize())
        return outdict

    @staticmethod
    def human_readable_psbt_out(psbt_output):
        """ Returns a dict containing human readable information
        about a PSBT_Output object.
        """
        assert isinstance(psbt_output, btc.PSBT_Output)
        outdict = {}
        if psbt_output.index is not None:
            outdict["output-index"] = psbt_output.index
        if psbt_output.derivation_map:
            # See note to derivation map in hr_psbt_in()
            outdict["derivation-map"] = {bintohex(k): bintohex(v.serialize(
                )) for k, v in psbt_output.derivation_map.items()}
        if psbt_output.redeem_script:
            outdict["redeem-script"] = bintohex(psbt_output.redeem_script)
        if psbt_output.witness_script:
            outdict["witness-script"] = bintohex(psbt_output.witness_script)
        if psbt_output.proprietary_fields:
            outdict["proprietary-fields"] = str(psbt_output.proprietary_fields)
        if psbt_output.unknown_fields:
            outdict["unknown-fields"] = str(psbt_output.unknown_fields)
        return outdict

    @staticmethod
    def witness_utxos_to_psbt_utxos(utxos):
        """ Given a dict of utxos as returned from select_utxos,
        convert them to the format required to populate PSBT inputs,
        namely CTxOut. Note that the non-segwit case is different, there
        you should provide an entire CMutableTransaction object instead.

        returns:
            list of btc.CMutableTxOut, one per entry of `utxos`, built
            from each entry's "value" and "script" items.
        """
        return [btc.CMutableTxOut(v["value"], v["script"])
                for v in utxos.values()]

    @staticmethod
    def check_finalized_input_type(psbt_input):
        """ Given an input of a PSBT which is already finalized,
        return its type as either "sw-legacy" or "sw" or False
        if not one of these two types.
        TODO: can be extented to other types, does not currently
        support any non-segwit input.
        """
        assert isinstance(psbt_input, btc.PSBT_Input)
        if not psbt_input.final_script_witness:
            return False
        spk = psbt_input.witness_utxo.scriptPubKey
        if spk.is_p2sh():
            # Note: p2sh does not prove the redeemscript;
            # we check the finalscriptSig is p2wpkh:
            fss = btc.CScript(next(btc.CScript(
                psbt_input.final_script_sig).raw_iter())[1])
            if fss.is_witness_v0_keyhash():
                return "sw-legacy"
            # p2sh wrapping something other than p2wpkh: previously this
            # fell through and returned None; return False as documented.
            return False
        if spk.is_witness_v0_keyhash():
            return "sw"
        return False

    def _insert_p2sh_p2wpkh_redeem_scripts(self, psbt):
        """ For every input of `psbt` whose witness_utxo is a CTxOut with
        a p2sh scriptPubKey known to this wallet, set the redeem_script
        field to the p2wpkh script of the corresponding wallet pubkey.
        Native witness scriptPubKeys, other scriptPubKey forms and
        scripts not in this wallet are left untouched.
        """
        for txinput in psbt.inputs:
            tu = txinput.witness_utxo
            if not isinstance(tu, btc.CTxOut):
                continue
            if tu.scriptPubKey.is_witness_scriptpubkey():
                # native segwit; the scriptSig is empty, nothing to insert.
                continue
            if not tu.scriptPubKey.is_p2sh():
                # any other form of scriptPubKey is just ignored.
                continue
            try:
                path = self.script_to_path(tu.scriptPubKey)
            except AssertionError:
                # this happens when an input is provided but it's not in
                # this wallet; in this case, we cannot set the redeem script.
                continue
            pubkey = self._get_pubkey_from_path(path)[0]
            txinput.redeem_script = btc.pubkey_to_p2wpkh_script(pubkey)

    def _get_signing_key_from_path(self, path):
        """ Return (privkey, engine) for `path`, unwrapping the
        ((privkey, locktime), engine) form that is returned for
        timelocked addresses.
        """
        key = self._get_key_from_path(path)
        if FidelityBondMixin.is_timelocked_path(path) and len(key[0]) == 2:
            # key is ((privkey, locktime), engine) for timelocked addrs
            key = (key[0][0], key[1])
        return key

    def create_psbt_from_tx(self, tx, spent_outs=None, force_witness_utxo=True):
        """ Given a CMutableTransaction object, which should not currently
        contain signatures, we create and return a new PSBT object of type
        btc.PartiallySignedTransaction.
        Optionally the information about the spent outputs that is stored
        in PSBT_IN_NONWITNESS_UTXO, PSBT_IN_WITNESS_UTXO and PSBT_IN_REDEEM_SCRIPT
        can also be provided, one item per input, in the tuple (spent_outs).
        These objects should be either CMutableTransaction, CTxOut or None,
        Note that redeem script information cannot be provided for inputs which
        we don't own.

        raises:
            AssertionError: if an entry of `spent_outs` is neither None,
                a CTransaction nor a CTxOut.
        """
        # TODO: verify tx contains no signatures as a sanity check?
        new_psbt = btc.PartiallySignedTransaction(unsigned_tx=tx)
        if spent_outs is None:
            # user has not provided input script information; psbt
            # will not yet be usable for signing.
            return new_psbt
        for i, txinput in enumerate(new_psbt.inputs):
            spent_out = spent_outs[i]
            if spent_out is None:
                # as above, will not be signable in this case
                continue
            if not isinstance(spent_out, (btc.CTransaction, btc.CTxOut)):
                # explicit raise so the check survives `python -O`
                raise AssertionError(
                    "invalid spent output type passed into PSBT creator")
            # note: if spent_outs[i] is in fact CTransaction, then
            # the unsigned_tx must be provided (tx) in order to populate
            # the witness_utxo field of this PSBT_Input (and if it isn't,
            # this argument is ignored), but also, until the redeem script
            # is populated, then for the p2sh case, the lib can't know
            # whether it is witness, so we must force the witness_utxo here,
            # unless this is switched off in the kwargs of this function:
            txinput.set_utxo(spent_out, tx,
                             force_witness_utxo=force_witness_utxo)
        # we now insert redeemscripts where that is possible and necessary:
        self._insert_p2sh_p2wpkh_redeem_scripts(new_psbt)
        return new_psbt

    def sign_psbt(self, in_psbt, with_sign_result=False):
        """ Given a serialized PSBT in raw binary format,
        iterate over the inputs and sign all that we can sign with this wallet.
        NB IT IS UP TO CALLERS TO ENSURE THAT THEY ACTUALLY WANT TO SIGN
        THIS TRANSACTION!
        The above is important especially in coinjoin scenarios.
        Return: (psbt, msg)
        msg: error message or None
        if not `with_sign_result`:
        psbt: signed psbt in binary serialization, or None if error.
        if `with_sign_result` True:
        psbt: (PSBT_SignResult object, psbt (deserialized) object)
        """
        try:
            new_psbt = btc.PartiallySignedTransaction.from_binary(in_psbt)
        except Exception as e:
            return None, "Unable to deserialize binary PSBT, error: " + repr(e)
        # keys of every utxo currently tracked; the first item of each
        # utxo entry is its wallet path.
        privkeys = [self._get_signing_key_from_path(utxo_data[0])
                    for md_utxos in self._utxos._utxo.values()
                    for utxo_data in md_utxos.values()]
        # in the rare situation that we want to sign a psbt using private keys
        # to utxos that we've stopped tracking, let's also find inputs that
        # belong to us and add those private keys as well
        for vin in new_psbt.inputs:
            try:
                path = self.script_to_path(vin.utxo.scriptPubKey)
                # same unwrapping of timelocked keys as for tracked
                # utxos, so that duplicates are recognised below and
                # no (privkey, locktime) tuple reaches the keystore.
                key = self._get_signing_key_from_path(path)
                if key not in privkeys:
                    privkeys.append(key)
            except AssertionError:
                # we can safely assume that an exception means we do not
                # have the ability to sign for this input
                continue
            except AttributeError:
                # shouldn't happen for properly constructed psbts
                # however, psbts with no utxo information will raise
                # an AttributeError exception. we simply ignore it.
                continue
        # the last byte of each stored private key is dropped before it
        # is handed to JMCKey.
        jmckeys = [btc.JMCKey(x[0][:-1]) for x in privkeys]
        new_keystore = btc.KeyStore.from_iterable(jmckeys)

        # for p2sh inputs that we want to sign, the redeem_script
        # field must be populated by us, as the counterparty did not
        # know it. If this was set in an earlier create-psbt role,
        # then overwriting it is harmless (preimage resistance).
        if isinstance(self, SegwitLegacyWallet):
            self._insert_p2sh_p2wpkh_redeem_scripts(new_psbt)
        try:
            signresult = new_psbt.sign(new_keystore)
        except Exception as e:
            return None, repr(e)
        if not with_sign_result:
            return new_psbt.serialize(), None
        else:
            return (signresult, new_psbt), None
class SNICKERWalletMixin(object):
    """
    Mixin for BaseWallet providing creation and parsing of SNICKER
    (version 0 and 1) proposals. The proposal methods require the
    wallet to also be a PSBTWalletMixin.
    """
    SUPPORTED_SNICKER_VERSIONS = bytes([0, 1])

    def __init__(self, storage, **kwargs):
        super().__init__(storage, **kwargs)

    def check_tweak_matches_and_import(self, addr, tweak, tweaked_key,
                                       source_mixdepth):
        """ Given the address from our HD wallet, the tweak bytes and
        the tweaked public key, check the tweak correctly generates the
        claimed tweaked public key. If not, (False, errmsg is returned),
        if so, we import the private key to a mixdepth *distinct* from
        source_mixdepth, and (True, None) is returned, if it works.
        """
        try:
            priv = self.get_key_from_addr(addr)
        except Exception:
            return False, "Not our address"
        tweaked_privkey = btc.snicker_privkey_tweak(priv, tweak)
        derived_pubkey = btc.privkey_to_pubkey(tweaked_privkey)
        if not derived_pubkey == tweaked_key:
            hextk = bintohex(tweaked_key)
            hexftpk = bintohex(derived_pubkey)
            return False, ("Was not able to recover tweaked pubkey "
                           "from tweaked privkey - code error."
                           "Expected: " + hextk + ", got: " + hexftpk)
        # note that the WIF is not preserving SPK type, it's implied
        # for the wallet.
        try:
            self.import_private_key((source_mixdepth + 1) % (self.mixdepth + 1),
                                    self._ENGINE.privkey_to_wif(tweaked_privkey))
            self.save()
        except Exception:
            return False, "Failed to import private key."
        return True, None

    def create_snicker_proposal(self, our_inputs, their_input, our_input_utxos,
                                their_input_utxo, net_transfer, network_fee,
                                our_priv, their_pub, our_spk, change_spk,
                                encrypted=True, version_byte=1):
        """ Creates a SNICKER proposal from the given transaction data.
        This only applies to existing specification, i.e. SNICKER v 00 or 01.
        This is only to be used for Joinmarket and only segwit wallets.
        `our_inputs`, `their_input` - utxo format used in JM wallets (first is list),
        keyed by (tixd, n), as dicts (currently of single entry).
        `our_input_utxos`, `their..` - type CTxOut (contains value, scriptPubKey)
        net_transfer - amount, after bitcoin transaction fee, transferred from
        Proposer (our) to Receiver (their). May be negative.
        network_fee - total bitcoin network transaction fee to be paid (so estimates
        must occur before this function).
        `our_priv`, `their_pub` - these are the keys to be used in ECDH to derive
        the tweak as per the BIP. Note `their_pub` may or may not be associated with
        the input of the receiver, so is specified here separately. Note also that
        according to the BIP the privkey we use *must* be the one corresponding to
        the first input we provided, else (properly coded) Receivers will reject our
        proposal.
        `our_spk` - a scriptPubKey for the Proposer coinjoin output
        `change_spk` - a change scriptPubkey for the proposer as per BIP
        `encrypted` - whether or not to return the ECIES encrypted version of the
        proposal.
        `version_byte` - which of currently specified Snicker versions is being
        used, (0 for reused address, 1 for inferred key).
        returns:
        if encrypted is True:
        base 64 encoded encrypted transaction proposal as a string
        else:
        binary serialized plaintext SNICKER message.
        """
        assert isinstance(self, PSBTWalletMixin)
        # before constructing the bitcoin transaction we must calculate the output
        # amounts
        # find our total input value:
        our_input_val = sum(x.nValue for x in our_input_utxos)
        if our_input_val - their_input_utxo.nValue - network_fee <= 0:
            raise Exception(
                "Cannot create SNICKER proposal, Proposer input too small")
        # we must also use ecdh to calculate the output scriptpubkey for the
        # receiver
        # First, check that `our_priv` corresponds to scriptPubKey in
        # first entry in `our_input_utxos` to prevent callers from making
        # useless proposals.
        expected_pub = btc.privkey_to_pubkey(our_priv)
        expected_spk = self.pubkey_to_script(expected_pub)
        assert our_input_utxos[0].scriptPubKey == expected_spk
        # now we create the ecdh based tweak:
        tweak_bytes = btc.ecdh(our_priv[:-1], their_pub)
        tweaked_pub = btc.snicker_pubkey_tweak(their_pub, tweak_bytes)
        wtype = self.get_txtype()
        if wtype == "p2wpkh":
            tweaked_spk = btc.pubkey_to_p2wpkh_script(tweaked_pub)
        elif wtype == "p2sh-p2wpkh":
            tweaked_spk = btc.pubkey_to_p2sh_p2wpkh_script(tweaked_pub)
        else:
            raise NotImplementedError("SNICKER only supports "
                                      "p2sh-p2wpkh or p2wpkh outputs.")
        tweaked_addr, our_addr, change_addr = [str(
            btc.CCoinAddress.from_scriptPubKey(x)) for x in (
                tweaked_spk, our_spk, change_spk)]
        outputs = btc.construct_snicker_outputs(our_input_val,
                                                their_input_utxo.nValue,
                                                tweaked_addr,
                                                our_addr,
                                                change_addr,
                                                network_fee,
                                                net_transfer)
        assert all(x["value"] > 0 for x in outputs)
        # version and locktime as currently specified in the BIP
        # for 0/1 version SNICKER. (Note the locktime is partly because
        # of expected delays).
        # We do not use `make_shuffled_tx` because we need a specific order
        # on the input side: the receiver's is placed randomly, but the first
        # *of ours* is the one used for ECDH.
        all_inputs = copy.deepcopy(our_inputs)
        insertion_index = random.randrange(len(all_inputs)+1)
        all_inputs.insert(insertion_index, their_input)
        all_input_utxos = copy.deepcopy(our_input_utxos)
        all_input_utxos.insert(insertion_index, their_input_utxo)
        # for outputs, we must shuffle as normal:
        random.shuffle(outputs)
        tx = btc.mktx(all_inputs, outputs,
                      version=2, locktime=0)
        # create the psbt and then sign our input.
        snicker_psbt = self.create_psbt_from_tx(tx,
                                                spent_outs=all_input_utxos)
        # having created the PSBT, sign our input
        signed_psbt_and_signresult, err = self.sign_psbt(
            snicker_psbt.serialize(), with_sign_result=True)
        assert err is None
        signresult, partially_signed_psbt = signed_psbt_and_signresult
        assert signresult.num_inputs_signed == len(our_inputs)
        assert signresult.num_inputs_final == len(our_inputs)
        assert not signresult.is_final
        snicker_serialized_message = btc.SNICKER_MAGIC_BYTES + bytes(
            [version_byte]) + btc.SNICKER_FLAG_NONE + tweak_bytes + \
            partially_signed_psbt.serialize()
        if not encrypted:
            return snicker_serialized_message
        # encryption has been requested;
        # we apply ECIES in the form given by the BIP.
        return btc.ecies_encrypt(snicker_serialized_message, their_pub)

    def parse_proposal_to_signed_tx(self, addr, proposal,
                                    acceptance_callback):
        """ Given an address of ours (from which the candidate privkey is
        derived), and a candidate SNICKER proposal (ECIES encrypted, or
        plaintext binary), attempt to decrypt
        and validate it in all aspects. If validation fails the first
        return value is None and the second is the reason as a string.

        If all validation checks pass, the next step is checking
        acceptance according to financial rules: the acceptance
        callback must be a function that accepts four arguments:
        (our_ins, their_ins, our_outs, their_outs), where *ins values
        are lists of CTxIns and *outs are lists of CTxOuts,
        and must return only True/False where True means that the
        transaction should be signed.

        If True is returned from the callback, the following are returned
        from this function:
        (raw transaction for broadcasting (serialized),
        tweak value as bytes, derived output spk belonging to receiver)

        Note: flags is currently always None as version is only 0 or 1.
        """
        assert isinstance(self, PSBTWalletMixin)
        try:
            privkey = self.get_key_from_addr(addr)
        except Exception:
            return None, "Could not derive privkey from address"
        our_pub = btc.privkey_to_pubkey(privkey)

        if len(proposal) < 5:
            return None, "Invalid proposal, too short."
        try:
            is_encrypted = \
                base64.b64decode(proposal)[:4] == btc.ECIES_MAGIC_BYTES
        except ValueError:
            # binascii.Error (a ValueError) is raised for input that is
            # not decodable base64, e.g. a plaintext binary proposal;
            # such input is by definition not an ECIES message.
            is_encrypted = False
        if is_encrypted:
            # attempt decryption and reject if fails:
            try:
                snicker_message = btc.ecies_decrypt(privkey, proposal)
            except Exception as e:
                return None, "Failed to decrypt." + repr(e)
        else:
            snicker_message = proposal

        # magic + version,flag + tweak + psbt:
        # TODO replace '20' with the minimum feasible PSBT.
        if len(snicker_message) < 7 + 2 + 32 + 20:
            return None, "Invalid proposal, too short."

        if snicker_message[:7] != btc.SNICKER_MAGIC_BYTES:
            return None, "Invalid SNICKER magic bytes."

        version_byte = bytes([snicker_message[7]])
        flag_byte = bytes([snicker_message[8]])
        # the offending byte is reported as hex; concatenating the raw
        # bytes object to a str would raise TypeError.
        if version_byte not in self.SUPPORTED_SNICKER_VERSIONS:
            return None, "Unrecognized SNICKER version: " + version_byte.hex()
        if flag_byte != btc.SNICKER_FLAG_NONE:
            return None, "Invalid flag byte for version 0,1: " + flag_byte.hex()

        tweak_bytes = snicker_message[9:41]
        candidate_psbt_serialized = snicker_message[41:]
        # attempt to validate the PSBT's format:
        try:
            cpsbt = btc.PartiallySignedTransaction.from_base64_or_binary(
                candidate_psbt_serialized)
        except Exception:
            return None, "Invalid PSBT format."

        utx = cpsbt.unsigned_tx
        # validate that it contains one signature, and two inputs.
        # else the proposal is invalid. To achieve this, we call
        # PartiallySignedTransaction.sign() with an empty KeyStore,
        # which populates the 'is_signed' info fields for us. Note that
        # we do not use the PSBTWalletMixin.sign_psbt() which automatically
        # signs with our keys.
        if len(utx.vin) < 2:
            return None, "PSBT proposal does not contain at least 2 inputs."
        testsignresult = cpsbt.sign(btc.KeyStore(), finalize=False)
        # Note: "num_inputs_signed" refers to how many *we* signed,
        # which is obviously none here as we provided no keys.
        if not (testsignresult.num_inputs_signed == 0 and
                testsignresult.num_inputs_final == len(utx.vin)-1 and
                not testsignresult.is_final):
            return None, "PSBT proposal is not fully signed by proposer."

        # Validate that we own one SNICKER style output:
        spk = btc.verify_snicker_output(utx, our_pub, tweak_bytes)

        if spk[0] == -1:
            return None, "Tweaked destination not found exactly once."
        our_output_index = spk[0]
        our_output_amount = utx.vout[our_output_index].nValue

        # Exactly one other output must have an amount equal to that at
        # `our_output_index`.
        found = 0
        for i, o in enumerate(utx.vout):
            if i == our_output_index:
                continue
            if o.nValue == our_output_amount:
                found += 1
        if found != 1:
            return None, "Invalid SNICKER, there are not two equal outputs."

        # To allow the acceptance callback to assess validity, we must identify
        # which input is ours and which is(are) not.
        # See https://github.com/Simplexum/python-bitcointx/issues/39
        # for background on why this is different per wallet type
        unsigned_index = -1
        for i, psbtinputsigninfo in enumerate(testsignresult.inputs_info):
            if psbtinputsigninfo is None:
                unsigned_index = i
                break
            if psbtinputsigninfo.num_new_sigs == 0 and \
               psbtinputsigninfo.is_final == False:
                unsigned_index = i
                break
        assert unsigned_index != -1
        # All validation checks passed. We now check whether the
        # transaction is acceptable according to the caller:
        if not acceptance_callback(
                [utx.vin[unsigned_index]],
                [x for i, x in enumerate(utx.vin) if i != unsigned_index],
                [utx.vout[our_output_index]],
                [x for i, x in enumerate(utx.vout) if i != our_output_index]):
            return None, "Caller rejected transaction for signing."

        # Acceptance passed, prepare the deserialized tx for signing by us:
        signresult_and_signedpsbt, err = self.sign_psbt(cpsbt.serialize(),
                                                        with_sign_result=True)
        if err:
            return None, "Unable to sign proposed PSBT, reason: " + err
        signresult, signed_psbt = signresult_and_signedpsbt
        assert signresult.num_inputs_signed == 1
        assert signresult.num_inputs_final == len(utx.vin)
        assert signresult.is_final
        # we now know the transaction is valid and fully signed; return to caller,
        # along with supporting data for this tx:
        return (signed_psbt.extract_transaction(), tweak_bytes, spk[1])
class ImportWalletMixin(object):
    """
    Mixin for BaseWallet adding support for imported private keys.

    Imported keys live under paths of the form
    (_IMPORTED_ROOT_PATH, mixdepth, key_index).
    """
    _IMPORTED_STORAGE_KEY = b'imported_keys'
    _IMPORTED_ROOT_PATH = b'imported'

    def __init__(self, storage, **kwargs):
        # {mixdepth: [(privkey, type)]}; populated by _load_storage
        self._imported = None
        super().__init__(storage, **kwargs)

    def _load_storage(self, load_cache: bool = True) -> None:
        """
        Load the parent's data, then read the imported keys from
        storage and register their paths in the script/address maps.
        """
        super()._load_storage(load_cache=load_cache)
        self._imported = collections.defaultdict(list)
        stored_keys = self._storage.data[self._IMPORTED_STORAGE_KEY]
        for md, keys in stored_keys.items():
            md = int(md)
            self._imported[md] = keys
            self._populate_maps(self.yield_imported_paths(md))

    def save(self):
        """
        Write the imported keys into the storage data (mixdepths
        converted with _int_to_bytestr), then save via the parent.
        """
        self._storage.data[self._IMPORTED_STORAGE_KEY] = {
            _int_to_bytestr(md): keys
            for md, keys in self._imported.items()}
        super().save()

    @classmethod
    def initialize(cls, storage, network, max_mixdepth=2, timestamp=None,
                   write=True, **kwargs):
        """
        Initialize the storage via the parent class (without writing),
        add an empty imported-keys entry and save if `write` is set.
        """
        parent = super(ImportWalletMixin, cls)
        parent.initialize(storage, network, max_mixdepth, timestamp,
                          write=False, **kwargs)
        storage.data[cls._IMPORTED_STORAGE_KEY] = {}
        if write:
            storage.save()

    def import_private_key(self, mixdepth, wif):
        """
        Import a private key in WIF format.

        args:
            mixdepth: int, mixdepth to import key into
            wif: str, private key in WIF format

        raises:
            WalletError: if mixdepth is outside 0..max_mixdepth
            WalletError: if the key's script is already in the wallet

        returns:
            path of imported key
        """
        if mixdepth < 0 or mixdepth > self.max_mixdepth:
            raise WalletError("Mixdepth must be positive and at most {}."
                              "".format(self.max_mixdepth))
        # FIXME: there is no established standard for encoding key type
        # in wif, so the type reported by the WIF decoder is not checked
        # and the wallet key type is used by default.
        privkey, _wif_key_type = self._ENGINE.wif_to_privkey(wif)
        key_type = self.TYPE
        key_engine = self._ENGINES[key_type]
        if key_engine.key_to_script(privkey) in self._script_map:
            raise WalletError("Cannot import key, already in wallet: {}"
                              "".format(wif))

        md_keys = self._imported[mixdepth]
        md_keys.append((privkey, key_type))
        new_index = len(md_keys) - 1
        return self._cache_imported_key(mixdepth, privkey, key_type,
                                        new_index)

    def remove_imported_key(self, script=None, address=None, path=None):
        """
        Remove an imported key. Arguments are exclusive.

        args:
            script: bytes
            address: str
            path: path
        """
        given = [bool(arg) for arg in (script, address, path)]
        if given.count(True) != 1:
            raise Exception("Only one of script|address|path may be given.")

        if address:
            path = self.addr_to_path(address)
        elif script:
            path = self.script_to_path(script)

        if not path:
            raise WalletError("Cannot find key in wallet.")
        if not self._is_imported_path(path):
            raise WalletError("Cannot remove non-imported key.")
        assert len(path) == 3

        script = script or self.get_script_from_path(path)
        address = address or self.get_address_from_path(path)

        # the slot is blanked rather than deleted, so that the indices
        # of the remaining keys are retained
        md, index = path[1], path[2]
        self._imported[md][index] = (b'', -1)

        del self._script_map[script]
        del self._addr_map[address]
        self._delete_cache_for_path(path)

    def _cache_imported_key(self, mixdepth, privkey, key_type, index):
        """
        Build the path of an imported key, register it in the
        script/address maps and return it.
        """
        imported_path = (self._IMPORTED_ROOT_PATH, mixdepth, index)
        self._populate_maps((imported_path,))
        return imported_path

    def _get_mixdepth_from_path(self, path):
        """
        Return the mixdepth for a path; non-imported paths are handed
        to the parent class.
        """
        if self._is_imported_path(path):
            assert len(path) == 3
            return path[1]
        return super()._get_mixdepth_from_path(path)

    def _get_key_from_path(self, path,
                           validate_cache: bool = False):
        """
        Return (privkey, engine) for a path; non-imported paths are
        handed to the parent class.

        raises:
            WalletError: if there is no imported key at the path, or
                the key at the path has been removed
        """
        if not self._is_imported_path(path):
            return super()._get_key_from_path(
                path, validate_cache=validate_cache)

        assert len(path) == 3
        md, i = path[1], path[2]
        assert 0 <= md <= self.max_mixdepth

        md_keys = self._imported[md]
        if i >= len(md_keys):
            raise WalletError("unknown imported key at {}"
                              "".format(self.get_path_repr(path)))

        key, key_type = md_keys[i]
        if key_type == -1:
            raise WalletError("imported key was removed")
        return key, self._ENGINES[key_type]

    @classmethod
    def _is_imported_path(cls, path):
        """
        True if the path has three elements and starts with the
        imported root.
        """
        if len(path) != 3:
            return False
        return path[0] == cls._IMPORTED_ROOT_PATH

    def is_standard_wallet_script(self, path):
        """
        For imported paths, tell whether the key's engine equals the
        wallet's engine; other paths are handed to the parent class.
        """
        if not self._is_imported_path(path):
            return super().is_standard_wallet_script(path)
        pubkey_and_engine = self._get_pubkey_from_path(path)
        return pubkey_and_engine[1] == self._ENGINE

    def path_repr_to_path(self, pathstr):
        """
        Parse 'imported/<mixdepth>/<index>' into an imported path;
        any other string is handed to the parent class.
        """
        parts = pathstr.encode('ascii').split(b'/')
        if self._is_imported_path(parts):
            return self._IMPORTED_ROOT_PATH, int(parts[1]), int(parts[2])
        return super().path_repr_to_path(pathstr)

    def get_path_repr(self, path):
        """
        Render an imported path as 'imported/<mixdepth>/<index>';
        other paths are handed to the parent class.
        """
        if self._is_imported_path(path):
            assert len(path) == 3
            return 'imported/{}/{}'.format(path[1], path[2])
        return super().get_path_repr(path)

    def yield_imported_paths(self, mixdepth):
        """
        Yield the paths of all imported keys in the given mixdepth,
        skipping slots of removed keys.
        """
        assert 0 <= mixdepth <= self.max_mixdepth
        for index, entry in enumerate(self._imported[mixdepth]):
            if entry[1] != -1:
                yield (self._IMPORTED_ROOT_PATH, mixdepth, index)

    def get_details(self, path):
        """
        Return (mixdepth, 'imported', index) for imported paths; other
        paths are handed to the parent class.
        """
        if self._is_imported_path(path):
            return path[1], 'imported', path[2]
        return super().get_details(path)
class BIP39WalletMixin(object):
    """
    Mixin to use BIP-39 mnemonic seed with BIP32Wallet
    """
    _BIP39_EXTENSION_KEY = b'seed_extension'
    MNEMONIC_LANG = 'english'

    def _load_storage(self, load_cache: bool = True) -> None:
        """
        Load the parent's data, then read the optional seed extension
        from storage (None if it was never stored).
        """
        super()._load_storage(load_cache=load_cache)
        self._entropy_extension = self._storage.data.get(self._BIP39_EXTENSION_KEY)

    @classmethod
    def initialize(cls, storage, network, max_mixdepth=2, timestamp=None,
                   entropy=None, entropy_extension=None, write=True, **kwargs):
        """
        Initialize the storage via the parent class (without writing),
        store the seed extension if one is given, and save if `write`
        is set.

        args:
            entropy_extension: str or None
        """
        super(BIP39WalletMixin, cls).initialize(
            storage, network, max_mixdepth, timestamp, entropy,
            write=False, **kwargs)
        if entropy_extension:
            # Note: future reads from storage will retrieve this data
            # as binary, so set it as binary on initialization for consistency.
            # Note that this is in contrast to the mnemonic wordlist, which is
            # handled by the mnemonic package, which returns the words as a string.
            storage.data[cls._BIP39_EXTENSION_KEY] = entropy_extension.encode("utf-8")

        if write:
            storage.save()

    def _create_master_key(self):
        """
        Derive the master key seed from the mnemonic words and the
        seed extension (an empty extension if none is set).
        """
        ent, ext = self.get_mnemonic_words()
        m = Mnemonic(self.MNEMONIC_LANG)
        return m.to_seed(ent, ext or b'')

    @classmethod
    def _verify_entropy(cls, ent):
        """
        Tell whether `ent` is acceptable entropy: non-empty and a
        multiple of 4 bytes long.

        returns:
            bool
        """
        # every 4-bytestream is a valid entropy for BIP-39
        # bool() so that empty/None input yields False rather than the
        # falsy input object itself.
        return bool(ent) and len(ent) % 4 == 0

    def get_mnemonic_words(self):
        """
        Return the mnemonic seed words for the parent's master key
        entropy, together with the seed extension.

        returns:
            mnemonic_seed, seed_extension
        """
        entropy = super()._create_master_key()
        m = Mnemonic(self.MNEMONIC_LANG)
        return m.to_mnemonic(entropy), self._entropy_extension

    @classmethod
    def entropy_from_mnemonic(cls, seed):
        """
        Convert mnemonic seed words into entropy bytes.

        The words are lower-cased and whitespace-normalised first.

        raises:
            WalletError: if the mnemonic check fails or the resulting
                entropy is rejected by _verify_entropy
        returns:
            bytes
        """
        m = Mnemonic(cls.MNEMONIC_LANG)
        seed = seed.lower()
        # ensure only single whitespace characters:
        seed = " ".join(seed.split())
        if not m.check(seed):
            raise WalletError("Invalid mnemonic seed.")

        ent = m.to_entropy(seed)

        if not cls._verify_entropy(ent):
            raise WalletError("Seed entropy is too low.")

        return bytes(ent)
class BIP32Wallet(BaseWallet):
    """
    Wallet whose keys are derived along BIP32 paths from one master key.

    A path is a tuple: its first element is the wallet's key identifier
    (rendered as 'm' in textual form), all further elements are integer
    path levels.
    """
    _STORAGE_ENTROPY_KEY = b'entropy'
    _STORAGE_INDEX_CACHE = b'index_cache'

    # levels at or above this value are rendered as hardened (n')
    BIP32_MAX_PATH_LEVEL = 2**31
    BIP32_EXT_ID = BaseWallet.ADDRESS_TYPE_EXTERNAL
    BIP32_INT_ID = BaseWallet.ADDRESS_TYPE_INTERNAL
    ENTROPY_BYTES = 16

    def __init__(self, storage, **kwargs):
        """Set up the master key, key identifier and known-path maps.

        NOTE(review): the asserts after ``super().__init__`` rely on the
        parent constructor having filled ``_entropy`` and ``_index_cache``
        (presumably through ``_load_storage``) -- confirm in BaseWallet.
        """
        self._entropy = None
        self._key_ident = None
        # {mixdepth: {type: index}} with type being 0/1 corresponding
        # to external/internal addresses
        self._index_cache = None

        # a path is a tuple of BIP32 levels: the first element is the
        # master key's fingerprint, all other levels are ints
        super().__init__(storage, **kwargs)
        assert self._index_cache is not None
        assert self._verify_entropy(self._entropy)

        seed_material = self._create_master_key()
        assert seed_material
        assert isinstance(seed_material, bytes)
        self._master_key = self._derive_bip32_master_key(seed_material)

        # The identifier is used to sanity-check paths and to build the
        # wallet id. It must not be None while it is being computed,
        # otherwise get_bip32_* won't work.
        self._key_ident = b''
        self._key_ident = self._get_key_ident()
        self._populate_maps(self.yield_known_bip32_paths())
        self.disable_new_scripts = False

    @classmethod
    def initialize(cls, storage, network, max_mixdepth=2, timestamp=None,
                   entropy=None, write=True):
        """
        args:
            entropy: ENTROPY_BYTES bytes or None to have wallet generate some
        """
        if entropy and not cls._verify_entropy(entropy):
            raise WalletError("Invalid entropy.")

        super(BIP32Wallet, cls).initialize(storage, network, max_mixdepth,
                                           timestamp, write=False)

        if not entropy:
            entropy = get_random_bytes(cls.ENTROPY_BYTES, True)
        storage.data[cls._STORAGE_ENTROPY_KEY] = entropy

        # one empty index map per mixdepth
        empty_index_cache = {}
        for md in range(max_mixdepth + 1):
            empty_index_cache[_int_to_bytestr(md)] = {}
        storage.data[cls._STORAGE_INDEX_CACHE] = empty_index_cache

        if write:
            storage.save()

    def _load_storage(self, load_cache: bool = True) -> None:
        """Read entropy and the index cache from storage.

        Stored keys are converted to int; ``max_mixdepth`` becomes the
        largest mixdepth present in the cache (at least 0).
        """
        super()._load_storage(load_cache=load_cache)
        self._entropy = self._storage.data[self._STORAGE_ENTROPY_KEY]
        self._index_cache = collections.defaultdict(
            lambda: collections.defaultdict(int))

        stored_cache = self._storage.data[self._STORAGE_INDEX_CACHE]
        for raw_md, raw_indices in stored_cache.items():
            per_type = self._index_cache[int(raw_md)]
            for raw_type, next_index in raw_indices.items():
                per_type[int(raw_type)] = next_index

        self.max_mixdepth = max(0, 0, *self._index_cache)

    def _get_key_ident(self):
        """Return the first 3 bytes of the double SHA-256 of the private
        export for mixdepth 0, external branch."""
        exported = self.get_bip32_priv_export(0, self.BIP32_EXT_ID)
        first_round = sha256(exported.encode('ascii')).digest()
        return sha256(first_round).digest()[:3]

    def yield_known_paths(self):
        """Chain the parent's known paths with this wallet's BIP32 paths."""
        inherited = super().yield_known_paths()
        own = self.yield_known_bip32_paths()
        return chain(inherited, own)

    def yield_known_bip32_paths(self):
        """Yield every external/internal path below the cached next index."""
        branches = (self.BIP32_EXT_ID, self.BIP32_INT_ID)
        for md in self._index_cache:
            for address_type in branches:
                used = self._index_cache[md][address_type]
                for position in range(used):
                    yield self.get_path(md, address_type, position)

    def save(self):
        """Write the index cache (with byte-string keys) to storage and
        let the parent save."""
        for md, per_type in self._index_cache.items():
            serialized = {_int_to_bytestr(address_type): next_index
                          for address_type, next_index in per_type.items()}
            self._storage.data[self._STORAGE_INDEX_CACHE][
                _int_to_bytestr(md)] = serialized
        super().save()

    def _create_master_key(self):
        """
        For the base/legacy wallet type this is a passthrough of the
        stored entropy; BIP39-style wallets convert from one to the other.
        """
        return self._entropy

    @classmethod
    def _verify_entropy(cls, ent):
        """Accept any non-empty entropy."""
        # This is not very useful but true for BIP32. Subclasses may have
        # stricter requirements.
        return True if ent else False

    @classmethod
    def _derive_bip32_master_key(cls, seed):
        """Derive the master key from ``seed`` using the class engine."""
        engine = cls._ENGINE
        return engine.derive_bip32_master_key(seed)

    @classmethod
    def _get_supported_address_types(cls):
        """Return the address types this wallet class accepts."""
        return cls.BIP32_EXT_ID, cls.BIP32_INT_ID

    def _check_path(self, path):
        """Validate ``path`` and advance the index cache if it is the next
        unused address of its branch.

        Raises WalletMixdepthOutOfRange for a mixdepth outside the wallet.
        """
        md, address_type, index = self.get_details(path)

        if md < 0 or md > self.max_mixdepth:
            raise WalletMixdepthOutOfRange()
        assert address_type in self._get_supported_address_types()

        next_index = self._index_cache[md][address_type]
        # special case for timelocked addresses because for them the
        # concept of a "next address" cant be used
        is_timelock = address_type == FidelityBondMixin.BIP32_TIMELOCK_ID
        if index == next_index and not is_timelock:
            self._set_index_cache(md, address_type, next_index + 1)
            self._populate_maps((path,))

    def get_script_from_path(self, path,
                             validate_cache: bool = False):
        """Return the script for ``path``, checking own BIP32 paths first."""
        owned = self._is_my_bip32_path(path)
        if owned:
            self._check_path(path)
        return super().get_script_from_path(path,
                                            validate_cache=validate_cache)

    def get_address_from_path(self, path,
                              validate_cache: bool = False):
        """Return the address for ``path``, checking own BIP32 paths first."""
        owned = self._is_my_bip32_path(path)
        if owned:
            self._check_path(path)
        return super().get_address_from_path(path,
                                             validate_cache=validate_cache)

    def get_path(self, mixdepth=None, address_type=None, index=None):
        """Build a path tuple from the given components.

        Each component requires the previous one: ``address_type`` needs
        ``mixdepth`` and ``index`` needs ``address_type``.

        Raises WalletMixdepthOutOfRange for an invalid mixdepth and
        Exception for an inconsistent combination of arguments.
        """
        if mixdepth is not None:
            assert isinstance(mixdepth, Integral)
            if mixdepth < 0 or mixdepth > self.max_mixdepth:
                raise WalletMixdepthOutOfRange()

        if address_type is not None and mixdepth is None:
            raise Exception("mixdepth must be set if address_type is set")

        if index is None:
            return tuple(self._get_bip32_export_path(mixdepth, address_type))

        assert isinstance(index, Integral)
        if address_type is None:
            raise Exception("address_type must be set if index is set")
        assert index < self.BIP32_MAX_PATH_LEVEL
        branch_path = self._get_bip32_export_path(mixdepth, address_type)
        return tuple(chain(branch_path, (index,)))

    def get_path_repr(self, path):
        """Render a path of this wallet as ``m/<level>/<level>...``."""
        levels = list(path)
        assert self._is_my_bip32_path(levels)
        # drop the key identifier; it is represented by the leading 'm'
        rendered = [self._path_level_to_repr(lvl) for lvl in levels[1:]]
        return 'm/' + '/'.join(rendered)

    @classmethod
    def _harden_path_level(cls, lvl):
        """Return the hardened form of ``lvl``.

        Raises WalletError if ``lvl`` is not in ``[0, BIP32_MAX_PATH_LEVEL)``.
        """
        assert isinstance(lvl, Integral)
        if lvl < 0 or lvl >= cls.BIP32_MAX_PATH_LEVEL:
            raise WalletError("Unable to derive hardened path level from {}."
                              "".format(lvl))
        return cls.BIP32_MAX_PATH_LEVEL + lvl

    @classmethod
    def _path_level_to_repr(cls, lvl):
        """Render one path level; hardened levels get a trailing quote.

        Raises WalletError if ``lvl`` is outside
        ``[0, 2 * BIP32_MAX_PATH_LEVEL)``.
        """
        assert isinstance(lvl, Integral)
        hardened_start = cls.BIP32_MAX_PATH_LEVEL
        if lvl < 0 or lvl >= hardened_start * 2:
            raise WalletError("Invalid path level {}.".format(lvl))
        if lvl >= hardened_start:
            return str(lvl - hardened_start) + "'"
        return str(lvl)

    def path_repr_to_path(self, pathstr):
        """Parse ``m/...`` text into a path tuple rooted at the key ident.

        Raises WalletError if the text does not start with 'm'.
        """
        root, *levels = pathstr.split('/')
        if root != 'm':
            raise WalletError("Not a valid wallet path: {}".format(pathstr))

        converted = [self._key_ident]
        for lvl in levels:
            if lvl[-1] == "'":
                converted.append(self._harden_path_level(int(lvl[:-1])))
            else:
                converted.append(int(lvl))
        return tuple(converted)

    def _get_mixdepth_from_path(self, path):
        """Return the path element directly after the base path.

        Raises WalletInvalidPath if ``path`` is not one of this wallet's.
        """
        if self._is_my_bip32_path(path):
            base_length = len(self._get_bip32_base_path())
            return path[base_length]
        raise WalletInvalidPath(path)

    def _get_key_from_path(self, path,
                           validate_cache: bool = False):
        """Return ``(privkey, engine)`` for ``path``, using the cache.

        The key is derived when it is not cached or when ``validate_cache``
        is set; a cached key differing from the derived one raises
        WalletCacheValidationFailed. Raises WalletInvalidPath for foreign
        paths.
        """
        if not self._is_my_bip32_path(path):
            raise WalletInvalidPath(path)

        cache = self._get_cache_for_path(path)
        privkey = cache.get(b'p')
        if privkey is not None and not validate_cache:
            return privkey, self._ENGINE

        derived = self._ENGINE.derive_bip32_privkey(self._master_key, path)
        if privkey is None:
            cache[b'p'] = privkey = derived
        elif privkey != derived:
            raise WalletCacheValidationFailed()
        return privkey, self._ENGINE

    def _get_keypair_from_path(self, path,
                               validate_cache: bool = False):
        """Return ``(privkey, pubkey, engine)`` for ``path``, using the cache.

        Foreign paths go to the parent class. Cached values are
        re-derived and compared when ``validate_cache`` is set; a mismatch
        raises WalletCacheValidationFailed.
        """
        if not self._is_my_bip32_path(path):
            return super()._get_keypair_from_path(
                path, validate_cache=validate_cache)

        engine = self._ENGINE
        cache = self._get_cache_for_path(path)

        privkey = cache.get(b'p')
        if validate_cache or privkey is None:
            derived_priv = engine.derive_bip32_privkey(self._master_key, path)
            if privkey is None:
                cache[b'p'] = privkey = derived_priv
            elif privkey != derived_priv:
                raise WalletCacheValidationFailed()

        pubkey = cache.get(b'P')
        if validate_cache or pubkey is None:
            derived_pub = engine.privkey_to_pubkey(privkey)
            if pubkey is None:
                cache[b'P'] = pubkey = derived_pub
            elif pubkey != derived_pub:
                raise WalletCacheValidationFailed()

        return privkey, pubkey, engine

    def _get_cache_keys_for_path(self, path):
        """Return the cache keys for ``path``: the root element followed by
        the ASCII-encoded textual form of every further level."""
        if not self._is_my_bip32_path(path):
            return super()._get_cache_keys_for_path(path)
        encoded_levels = tuple(
            self._path_level_to_repr(lvl).encode('ascii')
            for lvl in path[1:])
        return path[:1] + encoded_levels

    def _is_my_bip32_path(self, path):
        """Tell whether ``path`` is non-empty and rooted at this wallet's
        key identifier."""
        if len(path) == 0:
            return False
        return path[0] == self._key_ident

    def is_standard_wallet_script(self, path):
        """Every BIP32 path of this wallet counts as a standard script."""
        return self._is_my_bip32_path(path)

    def get_new_script(self, mixdepth, address_type,
                       validate_cache: bool = True):
        """Return the script at the next unused index of a branch.

        Raises RuntimeError while ``disable_new_scripts`` is set.
        """
        if self.disable_new_scripts:
            raise RuntimeError("Obtaining new wallet addresses "
                               + "disabled, due to nohistory mode")
        next_index = self._index_cache[mixdepth][address_type]
        return self.get_script(mixdepth, address_type, next_index,
                               validate_cache=validate_cache)

    def _set_index_cache(self, mixdepth, address_type, index):
        """ Ensures that any update to index_cache dict only applies
        to valid address types.
        """
        supported = self._get_supported_address_types()
        assert address_type in supported
        self._index_cache[mixdepth][address_type] = index

    @deprecated
    def get_key(self, mixdepth, address_type, index):
        """Return the hex-encoded private key at the given position."""
        path = self.get_path(mixdepth, address_type, index)
        raw_key = self._ENGINE.derive_bip32_privkey(self._master_key, path)
        return hexlify(raw_key).decode('ascii')

    def get_bip32_priv_export(self, mixdepth=None, address_type=None):
        """Return the engine's private export for the given branch."""
        export_path = self._get_bip32_export_path(mixdepth, address_type)
        return self._ENGINE.derive_bip32_priv_export(self._master_key,
                                                     export_path)

    def get_bip32_pub_export(self, mixdepth=None, address_type=None):
        """Return the engine's public export for the given branch."""
        export_path = self._get_bip32_export_path(mixdepth, address_type)
        return self._ENGINE.derive_bip32_pub_export(self._master_key,
                                                    export_path)

    def _get_bip32_export_path(self, mixdepth=None, address_type=None):
        """Return the base path extended by the mixdepth level and,
        if given, the address type."""
        if mixdepth is None:
            assert address_type is None
            suffix = ()
        else:
            assert 0 <= mixdepth <= self.max_mixdepth
            md_level = self._get_bip32_mixdepth_path_level(mixdepth)
            if address_type is None:
                suffix = (md_level,)
            else:
                suffix = (md_level, address_type)
        return tuple(chain(self._get_bip32_base_path(), suffix))

    def _get_bip32_base_path(self):
        """Return the path prefix shared by all keys of this wallet."""
        return (self._key_ident,)

    @classmethod
    def _get_bip32_mixdepth_path_level(cls, mixdepth):
        """Return the path level used for ``mixdepth`` (here: unchanged)."""
        return mixdepth

    def get_next_unused_index(self, mixdepth, address_type):
        """Return the cached next index of a branch.

        Raises WalletError once the index reaches BIP32_MAX_PATH_LEVEL.
        """
        assert 0 <= mixdepth <= self.max_mixdepth
        if self._index_cache[mixdepth][address_type] \
                >= self.BIP32_MAX_PATH_LEVEL:
            # FIXME: theoretically this should work for up to
            # self.BIP32_MAX_PATH_LEVEL * 2, no?
            raise WalletError("All addresses used up, cannot generate new ones.")
        return self._index_cache[mixdepth][address_type]

    def get_mnemonic_words(self):
        """Return ``(words, None)``: the entropy encoded as a word phrase;
        this wallet type has no seed extension."""
        hex_entropy = hexlify(self._entropy).decode('ascii')
        words = ' '.join(mn_encode(hex_entropy))
        return words, None

    @classmethod
    def entropy_from_mnemonic(cls, seed):
        """Decode a 12-word phrase back into entropy bytes.

        Raises WalletError if the phrase does not have exactly 12 words.
        """
        word_list = seed.split()
        if len(word_list) != 12:
            raise WalletError("Seed phrase must consist of exactly 12 words.")
        return unhexlify(mn_decode(word_list))

    def get_wallet_id(self):
        """Return the key identifier as a hex string."""
        return hexlify(self._key_ident).decode('ascii')

    def set_next_index(self, mixdepth, address_type, index, force=False):
        """Set the next index of a branch.

        Moving the index forward requires ``force=True``; otherwise an
        Exception is raised.
        """
        if not force and index > self._index_cache[mixdepth][address_type]:
            raise Exception("cannot advance index without force=True")
        self._set_index_cache(mixdepth, address_type, index)

    def get_details(self, path):
        """Return ``(mixdepth, address_type, index)`` for an own path.

        Raises Exception if the path does not belong to this wallet.
        """
        if self._is_my_bip32_path(path):
            mixdepth = self._get_mixdepth_from_path(path)
            return mixdepth, path[-2], path[-1]
        raise Exception("path does not belong to wallet")
class LegacyWallet(ImportWalletMixin, PSBTWalletMixin, BIP32Wallet):
    """P2PKH wallet whose master key is derived from hex-encoded entropy."""
    TYPE = TYPE_P2PKH
    _ENGINE = ENGINES[TYPE_P2PKH]

    def _create_master_key(self):
        """Return the stored entropy as hex-encoded bytes."""
        hex_entropy = hexlify(self._entropy)
        return hex_entropy

    def _get_bip32_base_path(self):
        """Return the base path: key identifier followed by level 0."""
        return (self._key_ident, 0)
class BIP32PurposedWallet(BIP32Wallet):
    """ A class to encapsulate cases like
    BIP44, 49 and 84, all of which are derivatives
    of BIP32, and use specific purpose
    fields to flag different wallet types.
    """

    def _get_bip32_base_path(self):
        """Return (key identifier, purpose, coin type) as the base path."""
        coin_type = self._ENGINE.BIP44_COIN_TYPE
        return (self._key_ident, self._PURPOSE, coin_type)

    @classmethod
    def _get_bip32_mixdepth_path_level(cls, mixdepth):
        """Return the hardened path level for ``mixdepth``."""
        assert 0 <= mixdepth < 2**31
        hardened = cls._harden_path_level(mixdepth)
        return hardened

    def _get_mixdepth_from_path(self, path):
        """Return the mixdepth by un-hardening the level after the base path.

        Raises WalletInvalidPath if ``path`` is not one of this wallet's.
        """
        if self._is_my_bip32_path(path):
            base_length = len(self._get_bip32_base_path())
            hardened_level = path[base_length]
            return hardened_level - 2**31
        raise WalletInvalidPath(path)
class FidelityBondMixin(object):
    """
    Mixin adding a timelocked (fidelity bond) branch and a burner branch
    to a BIP32 wallet.

    A timelocked path has one more element than a regular path: the last
    element is the unix locktime, the element before it is the time number.
    """
    BIP32_TIMELOCK_ID = 2
    BIP32_BURN_ID = 3

    # Explanation of time numbers
    #
    # Incrementing time numbers (0, 1, 2, 3, 4...) produce datetimes
    # suitable for timelocking (1st january, 1st april, 1st july ...).
    # This greatly reduces the number of possible timelock values and is
    # helpful for recovery of funds, because the wallet only has to search
    # the addresses corresponding to time numbers, which are far fewer.
    #
    # For example, if TIMENUMBER_UNIT = 2 (i.e. every time number is two
    # months) then there are 6 timelocks per year, so just 600 possible
    # addresses per century. Easily searchable when recovering a wallet
    # from seed phrase. Therefore the user doesn't need to store any
    # dates, the seed phrase is sufficient for recovery.

    #should be a factor of 12, the number of months in a year
    TIMENUMBER_UNIT = 1

    # all timelocks are 1st of the month at midnight
    TIMELOCK_DAY_AND_SHORTER = (1, 0, 0, 0, 0)
    TIMELOCK_EPOCH_YEAR = 2020
    TIMELOCK_EPOCH_MONTH = 1 #january
    MONTHS_IN_YEAR = 12

    TIMELOCK_ERA_YEARS = 80
    TIMENUMBER_COUNT = TIMELOCK_ERA_YEARS * MONTHS_IN_YEAR // TIMENUMBER_UNIT

    _TIMELOCK_ENGINE = ENGINES[TYPE_TIMELOCK_P2WSH]

    #only one mixdepth will have fidelity bonds in it
    FIDELITY_BOND_MIXDEPTH = 0

    MERKLE_BRANCH_UNAVAILABLE = b"mbu"

    _BURNER_OUTPUT_STORAGE_KEY = b"burner-out"

    _BIP32_PUBKEY_PREFIX = "fbonds-mpk-"

    def __init__(self, storage, **kwargs):
        """Initialize the parent wallet, then register every timelocked
        path in the wallet's lookup maps."""
        super().__init__(storage, **kwargs)
        self._populate_maps(self.yield_fidelity_bond_paths())

    @classmethod
    def _time_number_to_timestamp(cls, timenumber):
        """
        converts a time number to a unix timestamp

        Raises ValueError if timenumber is not in [0, TIMENUMBER_COUNT).
        """
        if not 0 <= timenumber < cls.TIMENUMBER_COUNT:
            raise ValueError("timenumber out of range")
        months = timenumber * cls.TIMENUMBER_UNIT
        year = cls.TIMELOCK_EPOCH_YEAR + months // cls.MONTHS_IN_YEAR
        month = cls.TIMELOCK_EPOCH_MONTH + months % cls.MONTHS_IN_YEAR
        return timegm(datetime(year, month,
                               *cls.TIMELOCK_DAY_AND_SHORTER).timetuple())

    @classmethod
    def datetime_to_time_number(cls, dt):
        """
        converts a datetime object to a time number

        Raises ValueError if dt is not exactly on a timelock boundary
        (month aligned to TIMENUMBER_UNIT, 1st of the month at midnight)
        or if the resulting time number is not in [0, TIMENUMBER_COUNT).
        """
        if (dt.month - cls.TIMELOCK_EPOCH_MONTH) % cls.TIMENUMBER_UNIT != 0:
            raise ValueError("month is not aligned to the timenumber unit")
        day_and_shorter_tuple = (dt.day, dt.hour, dt.minute, dt.second,
                                 dt.microsecond)
        if day_and_shorter_tuple != cls.TIMELOCK_DAY_AND_SHORTER:
            raise ValueError("datetime is not the 1st of the month at midnight")
        timenumber = (dt.year - cls.TIMELOCK_EPOCH_YEAR)*(cls.MONTHS_IN_YEAR //
            cls.TIMENUMBER_UNIT) + ((dt.month - cls.TIMELOCK_EPOCH_MONTH) // cls.TIMENUMBER_UNIT)
        # The upper bound is exclusive, matching _time_number_to_timestamp():
        # TIMENUMBER_COUNT itself cannot be converted back to a timestamp.
        if timenumber < 0 or timenumber >= cls.TIMENUMBER_COUNT:
            raise ValueError("datetime out of range")
        return timenumber

    @classmethod
    def timestamp_to_time_number(cls, timestamp):
        """
        converts a unix timestamp to a time number
        """
        #workaround for the year 2038 problem on 32 bit systems
        #see https://stackoverflow.com/questions/10588027/converting-timestamps-larger-than-maxint-into-datetime-objects
        dt = datetime.utcfromtimestamp(0) + timedelta(seconds=timestamp)
        return cls.datetime_to_time_number(dt)

    @classmethod
    def is_timelocked_path(cls, path):
        """Tell whether the fifth element of ``path`` is the timelock
        address type."""
        return len(path) > 4 and path[4] == cls.BIP32_TIMELOCK_ID

    def _get_key_ident(self):
        """Return the first 3 bytes of the double SHA-256 of the public key
        at the external branch path of mixdepth 0."""
        first_path = self.get_path(0, BIP32Wallet.BIP32_EXT_ID)
        pub = self._get_pubkey_from_path(first_path)[0]
        return sha256(sha256(pub).digest()).digest()[:3]

    def is_standard_wallet_script(self, path):
        """Timelocked paths are never standard; others are decided by the
        parent class."""
        if self.is_timelocked_path(path):
            return False
        return super().is_standard_wallet_script(path)

    @classmethod
    def get_xpub_from_fidelity_bond_master_pub_key(cls, mpk):
        """Strip the fidelity bond prefix from ``mpk``.

        Returns the remainder of the string, or False if ``mpk`` does not
        start with the prefix.
        """
        if mpk.startswith(cls._BIP32_PUBKEY_PREFIX):
            return mpk[len(cls._BIP32_PUBKEY_PREFIX):]
        else:
            return False

    def yield_known_paths(self):
        """Chain the parent's known paths with all timelocked paths."""
        return chain(super().yield_known_paths(), self.yield_fidelity_bond_paths())

    def yield_fidelity_bond_paths(self):
        """Yield the timelocked path of every time number in the fidelity
        bond mixdepth."""
        md = self.FIDELITY_BOND_MIXDEPTH
        address_type = self.BIP32_TIMELOCK_ID
        for timenumber in range(self.TIMENUMBER_COUNT):
            yield self.get_path(md, address_type, timenumber)

    def add_utxo(self, txid, index, script, value, height=None):
        """Add a utxo and freeze it if it sits on a timelocked path whose
        locktime has not passed yet.

        Nothing is frozen when the storage is read-only.
        """
        super().add_utxo(txid, index, script, value, height)
        #dont use coin control freeze if wallet readonly
        if self._storage.read_only:
            return
        path = self.script_to_path(script)
        if not self.is_timelocked_path(path):
            return
        # The last path element is the locktime as a unix timestamp, so it
        # is compared with the current unix time. Comparing naive datetimes
        # instead would mix UTC with local wall-clock time.
        locktime = path[-1]
        if locktime > datetime.now().timestamp():
            #freeze utxo if its timelock is in the future
            self.disable_utxo(txid, index, disable=True)

    def get_bip32_pub_export(self, mixdepth=None, address_type=None):
        """Return the public export; the mixdepth-level export of the
        fidelity bond mixdepth gets the fidelity bond prefix."""
        bip32_pub = super().get_bip32_pub_export(mixdepth, address_type)
        if address_type is None and mixdepth == self.FIDELITY_BOND_MIXDEPTH:
            bip32_pub = self._BIP32_PUBKEY_PREFIX + bip32_pub
        return bip32_pub

    @classmethod
    def _get_supported_address_types(cls):
        """Return the address types accepted by this wallet, including the
        timelock and burn branches."""
        return (cls.BIP32_EXT_ID, cls.BIP32_INT_ID, cls.BIP32_TIMELOCK_ID, cls.BIP32_BURN_ID)

    def _get_key_from_path(self, path,
                           validate_cache: bool = False):
        """Return ``(key, engine)`` for ``path``.

        For timelocked paths the key is ``(privkey, locktime)`` and the
        private key is derived from the path without its locktime element.
        A cached key differing from a freshly derived one raises
        WalletCacheValidationFailed.
        """
        if self.is_timelocked_path(path):
            key_path = path[:-1]
            locktime = path[-1]
            engine = self._TIMELOCK_ENGINE
            cache = super()._get_cache_for_path(key_path)
            privkey = cache.get(b'p')
            if privkey is None or validate_cache:
                new_privkey = engine.derive_bip32_privkey(self._master_key, key_path)
                if privkey is None:
                    cache[b'p'] = privkey = new_privkey
                elif privkey != new_privkey:
                    raise WalletCacheValidationFailed()
            return (privkey, locktime), engine
        else:
            # forward validate_cache so validation also covers regular paths
            return super()._get_key_from_path(path,
                                              validate_cache=validate_cache)

    def _get_keypair_from_path(self, path,
                               validate_cache: bool = False):
        """Return ``(privkey, pubkey, engine)`` for ``path``.

        For timelocked paths both keys are paired with the locktime. A
        cached value differing from a freshly derived one raises
        WalletCacheValidationFailed.
        """
        if not self.is_timelocked_path(path):
            return super()._get_keypair_from_path(path,
                validate_cache=validate_cache)
        key_path = path[:-1]
        locktime = path[-1]
        engine = self._TIMELOCK_ENGINE
        cache = super()._get_cache_for_path(key_path)
        privkey = cache.get(b'p')
        if privkey is None or validate_cache:
            new_privkey = engine.derive_bip32_privkey(self._master_key, key_path)
            if privkey is None:
                cache[b'p'] = privkey = new_privkey
            elif privkey != new_privkey:
                raise WalletCacheValidationFailed()
        pubkey = cache.get(b'P')
        if pubkey is None or validate_cache:
            new_pubkey = engine.privkey_to_pubkey(privkey)
            if pubkey is None:
                cache[b'P'] = pubkey = new_pubkey
            elif pubkey != new_pubkey:
                raise WalletCacheValidationFailed()
        return (privkey, locktime), (pubkey, locktime), engine

    def _get_cache_for_path(self, path):
        """Return the cache for ``path``; timelocked paths share the cache
        of the path without its locktime element."""
        if self.is_timelocked_path(path):
            path = path[:-1]
        return super()._get_cache_for_path(path)

    def get_path(self, mixdepth=None, address_type=None, index=None):
        """Build a path tuple.

        For the timelock address type ``index`` is a time number and the
        matching unix timestamp is appended as an extra last element. All
        other cases are handled by the parent class.
        """
        if address_type is None or address_type in (self.BIP32_EXT_ID, self.BIP32_INT_ID,
                self.BIP32_BURN_ID) or index is None:
            return super().get_path(mixdepth, address_type, index)
        elif address_type == self.BIP32_TIMELOCK_ID:
            # index is re-purposed as timenumber
            assert index is not None
            timestamp = self._time_number_to_timestamp(index)
            return tuple(chain(self._get_bip32_export_path(mixdepth, address_type),
                (index, timestamp)))
        else:
            assert 0

    # We define a new serialization of the bip32 path to include bip65
    # timelock addresses.
    # Previously it was m/44'/1'/0'/3/0
    # For a timelocked address it will be m/44'/1'/0'/3/0:13245432
    # The timelock is in unix format and is appended after a colon ":"
    # character, referring to the pubkey plus the timelock value, which
    # together are needed to create the address.
    def get_path_repr(self, path):
        """Render ``path`` as text, appending ``:<locktime>`` for a
        7-element timelocked path."""
        if self.is_timelocked_path(path) and len(path) == 7:
            return super().get_path_repr(path[:-1]) + ":" + str(path[-1])
        else:
            return super().get_path_repr(path)

    def path_repr_to_path(self, pathstr):
        """Parse a textual path; text containing a colon is treated as a
        timelocked path with the locktime after the colon.

        Raises WalletError if there is more than one colon.
        """
        if pathstr.find(":") == -1:
            return super().path_repr_to_path(pathstr)
        else:
            colon_chunks = pathstr.split(":")
            if len(colon_chunks) != 2:
                raise WalletError("Not a valid wallet timelock path: {}".format(pathstr))
            return tuple(chain(
                super().path_repr_to_path(colon_chunks[0]), (int(colon_chunks[1]),)))

    def get_details(self, path):
        """Return ``(mixdepth, address_type, index)``; for timelocked paths
        the trailing locktime element is skipped."""
        if self.is_timelocked_path(path):
            return self._get_mixdepth_from_path(path), path[-3], path[-2]
        else:
            return super().get_details(path)

    def _get_default_used_indices(self):
        """Return a zeroed four-slot index list for every mixdepth."""
        return {x: [0, 0, 0, 0] for x in range(self.max_mixdepth + 1)}

    def add_burner_output(self, path, txhex, block_height, merkle_branch,
            block_index, write=True):
        """
        Store a burner output under its (encoded) path.

        merkle_branch = None means it was unavailable because of pruning
        """
        if self._BURNER_OUTPUT_STORAGE_KEY not in self._storage.data:
            self._storage.data[self._BURNER_OUTPUT_STORAGE_KEY] = {}
        path = path.encode()
        txhex = unhexlify(txhex)
        if not merkle_branch:
            merkle_branch = self.MERKLE_BRANCH_UNAVAILABLE
        self._storage.data[self._BURNER_OUTPUT_STORAGE_KEY][path] = [txhex,
            block_height, merkle_branch, block_index]
        if write:
            self._storage.save()

    def get_burner_outputs(self):
        """
        Result is a dict {path: [txhex, blockheight, merkleproof, blockindex]}
        """
        return self._storage.data.get(self._BURNER_OUTPUT_STORAGE_KEY, {})

    def set_burner_output_merkle_branch(self, path, merkle_branch):
        """Replace the stored merkle branch of the burner output at ``path``.

        The storage is not saved by this method.
        """
        path = path.encode()
        self._storage.data[self._BURNER_OUTPUT_STORAGE_KEY][path][2] = \
            merkle_branch

    @classmethod
    def calculate_timelocked_fidelity_bond_value(cls, utxo_value, confirmation_time, locktime,
            current_time, interest_rate):
        """
        utxo_value is in satoshi
        interest rate is per year
        all times are seconds

        The exponent applied to the result is read from the POLICY
        bond_value_exponent config option.
        """
        YEAR = 60 * 60 * 24 * 365.2425 #gregorian calendar year length
        r = interest_rate
        T = (locktime - confirmation_time) / YEAR
        L = locktime / YEAR
        t = current_time / YEAR
        a = max(0, min(1, exp(r*T) - 1) - min(1, exp(r*max(0, t-L)) - 1))
        exponent = float(jm_single().config.get("POLICY", "bond_value_exponent"))
        return pow(utxo_value*a, exponent)

    @classmethod
    def get_validated_timelocked_fidelity_bond_utxo(cls, utxo, utxo_pubkey, locktime,
            cert_expiry, current_block_height):
        """Return the utxo data if it is a valid timelocked bond, else None.

        The utxo must exist and be confirmed, the certificate must not be
        expired at ``current_block_height`` and the utxo's script must
        equal the P2WSH script implied by ``utxo_pubkey`` and ``locktime``.
        """
        utxo_data = jm_single().bc_interface.query_utxo_set(utxo, includeconfs=True)
        if utxo_data[0] is None:
            return None
        if utxo_data[0]["confirms"] <= 0:
            return None
        # cert_expiry is multiplied by this interval to get a block height
        RETARGET_INTERVAL = 2016
        if current_block_height > cert_expiry*RETARGET_INTERVAL:
            return None
        implied_spk = btc.redeem_script_to_p2wsh_script(btc.mk_freeze_script(utxo_pubkey, locktime))
        if utxo_data[0]["script"] != implied_spk:
            return None
        return utxo_data[0]
class BIP49Wallet(BIP32PurposedWallet):
    """Purposed wallet using purpose 49' with the P2SH-P2WPKH engine."""
    _ENGINE = ENGINES[TYPE_P2SH_P2WPKH]
    # hardened purpose level: 49'
    _PURPOSE = 2**31 + 49
class BIP84Wallet(BIP32PurposedWallet):
    """Purposed wallet using purpose 84' with the P2WPKH engine."""
    _ENGINE = ENGINES[TYPE_P2WPKH]
    # hardened purpose level: 84'
    _PURPOSE = 2**31 + 84
class SegwitLegacyWallet(
        ImportWalletMixin,
        BIP39WalletMixin,
        PSBTWalletMixin,
        SNICKERWalletMixin,
        BIP49Wallet):
    """BIP49 wallet combined with the import, BIP39, PSBT and SNICKER
    mixins."""
    TYPE = TYPE_P2SH_P2WPKH
class SegwitWallet(
        ImportWalletMixin,
        BIP39WalletMixin,
        PSBTWalletMixin,
        SNICKERWalletMixin,
        BIP84Wallet):
    """BIP84 wallet combined with the import, BIP39, PSBT and SNICKER
    mixins."""
    TYPE = TYPE_P2WPKH
class SegwitWalletFidelityBonds(
        FidelityBondMixin,
        SegwitWallet):
    """SegwitWallet extended with the fidelity bond mixin."""
    TYPE = TYPE_SEGWIT_WALLET_FIDELITY_BONDS
class FidelityBondWatchonlyWallet(FidelityBondMixin, BIP84Wallet):
    """Watch-only fidelity bond wallet: it can produce public keys but
    refuses every request for a private key."""
    TYPE = TYPE_WATCHONLY_FIDELITY_BONDS
    _ENGINE = ENGINES[TYPE_WATCHONLY_P2WPKH]
    _TIMELOCK_ENGINE = ENGINES[TYPE_WATCHONLY_TIMELOCK_P2WSH]

    @classmethod
    def _verify_entropy(cls, ent):
        """Accept entropy whose bytes 1..3 spell ``pub``."""
        marker = ent[1:4]
        return marker == b"pub"

    @classmethod
    def _derive_bip32_master_key(cls, master_entropy):
        """Deserialize the decoded entropy into the master key."""
        serialized = master_entropy.decode()
        return btc.bip32_deserialize(serialized)

    def _get_bip32_export_path(self, mixdepth=None, address_type=None):
        """Return the parent's export path unchanged."""
        return super()._get_bip32_export_path(mixdepth, address_type)

    def _get_key_from_path(self, path,
                           validate_cache: bool = False):
        """Always raises WalletCannotGetPrivateKeyFromWatchOnly."""
        raise WalletCannotGetPrivateKeyFromWatchOnly()

    def _get_keypair_from_path(self, path,
                               validate_cache: bool = False):
        """Always raises WalletCannotGetPrivateKeyFromWatchOnly."""
        raise WalletCannotGetPrivateKeyFromWatchOnly()

    def _get_pubkey_from_path(self, path,
                              validate_cache: bool = False):
        """Return ``(pubkey, engine)`` for ``path``, using the cache.

        Foreign paths go to the parent class. For timelocked paths the
        key is derived from the path without its locktime element and is
        returned as ``(pubkey, locktime)`` with the timelock engine. A
        cached key differing from a freshly derived one raises
        WalletCacheValidationFailed.
        """
        if not self._is_my_bip32_path(path):
            return super()._get_pubkey_from_path(
                path, validate_cache=validate_cache)

        timelocked = self.is_timelocked_path(path)
        if timelocked:
            key_path, locktime = path[:-1], path[-1]
            engine = self._TIMELOCK_ENGINE
        else:
            key_path, locktime = path, None
            engine = self._ENGINE

        cache = self._get_cache_for_path(key_path)
        pubkey = cache.get(b'P')
        if validate_cache or pubkey is None:
            # the result of the engine's derive_bip32_privkey is what
            # gets cached and returned as the public key here
            derived = engine.derive_bip32_privkey(self._master_key, key_path)
            if pubkey is None:
                cache[b'P'] = pubkey = derived
            elif pubkey != derived:
                raise WalletCacheValidationFailed()

        if timelocked:
            return (pubkey, locktime), engine
        return pubkey, engine
# Registry mapping each wallet TYPE constant to the class implementing it.
WALLET_IMPLEMENTATIONS = {
    wallet_cls.TYPE: wallet_cls
    for wallet_cls in (
        LegacyWallet,
        SegwitLegacyWallet,
        SegwitWallet,
        SegwitWalletFidelityBonds,
        FidelityBondWatchonlyWallet,
    )
}