|
|
|
|
@ -13,6 +13,9 @@
|
|
|
|
|
# TODO impl ADDR descriptors |
|
|
|
|
# TODO impl RAW descriptors |
|
|
|
|
# TODO disable descs we cannot solve: TRDescriptor |
|
|
|
|
# TODO add checks to validate nestings |
|
|
|
|
# https://github.com/bitcoin/bitcoin/blob/94070029fb6b783833973f9fe08a3a871994492f/doc/descriptors.md#reference |
|
|
|
|
# e.g. sh is top-level only, wsh is top-level or directly inside sh |
|
|
|
|
# |
|
|
|
|
# TODO tests |
|
|
|
|
# - port https://github.com/bitcoin-core/HWI/blob/master/test/test_descriptor.py |
|
|
|
|
@ -24,8 +27,9 @@
|
|
|
|
|
|
|
|
|
|
from .bip32 import convert_bip32_path_to_list_of_uint32, BIP32Node, KeyOriginInfo |
|
|
|
|
from . import bitcoin |
|
|
|
|
from .bitcoin import construct_script, opcodes |
|
|
|
|
from .bitcoin import construct_script, opcodes, construct_witness |
|
|
|
|
from .crypto import hash_160, sha256 |
|
|
|
|
from .util import bfh |
|
|
|
|
|
|
|
|
|
from binascii import unhexlify |
|
|
|
|
from enum import Enum |
|
|
|
|
@ -35,6 +39,7 @@ from typing import (
|
|
|
|
|
Optional, |
|
|
|
|
Tuple, |
|
|
|
|
Sequence, |
|
|
|
|
Mapping, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -71,6 +76,18 @@ class ExpandedScripts:
|
|
|
|
|
return bitcoin.script_to_address(spk.hex(), net=net) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ScriptSolutionInner(NamedTuple): |
|
|
|
|
witness_items: Optional[Sequence] = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ScriptSolutionTop(NamedTuple): |
|
|
|
|
witness: Optional[bytes] = None |
|
|
|
|
script_sig: Optional[bytes] = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MissingSolutionPiece(Exception): pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def PolyMod(c: int, val: int) -> int: |
|
|
|
|
""" |
|
|
|
|
:meta private: |
|
|
|
|
@ -316,6 +333,38 @@ class Descriptor(object):
|
|
|
|
|
""" |
|
|
|
|
raise NotImplementedError("The Descriptor base class does not implement this method") |
|
|
|
|
|
|
|
|
|
def _satisfy_inner( |
|
|
|
|
self, |
|
|
|
|
*, |
|
|
|
|
sigdata: Mapping[bytes, bytes] = None, # pubkey -> sig |
|
|
|
|
allow_dummy: bool = False, |
|
|
|
|
) -> ScriptSolutionInner: |
|
|
|
|
raise NotImplementedError("The Descriptor base class does not implement this method") |
|
|
|
|
|
|
|
|
|
def satisfy( |
|
|
|
|
self, |
|
|
|
|
*, |
|
|
|
|
sigdata: Mapping[bytes, bytes] = None, # pubkey -> sig |
|
|
|
|
allow_dummy: bool = False, |
|
|
|
|
) -> ScriptSolutionTop: |
|
|
|
|
"""Construct a witness and/or scriptSig to be used in a txin, to satisfy the bitcoin SCRIPT. |
|
|
|
|
|
|
|
|
|
Raises MissingSolutionPiece if satisfaction is not yet possible due to e.g. missing a signature, |
|
|
|
|
unless `allow_dummy` is set to True, in which case dummy data is used where needed (e.g. for size estimation). |
|
|
|
|
""" |
|
|
|
|
assert not self.is_range() |
|
|
|
|
sol = self._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy) |
|
|
|
|
witness = None |
|
|
|
|
script_sig = None |
|
|
|
|
if self.is_segwit(): |
|
|
|
|
witness = bfh(construct_witness(sol.witness_items)) |
|
|
|
|
else: |
|
|
|
|
script_sig = bfh(construct_script(sol.witness_items)) |
|
|
|
|
return ScriptSolutionTop( |
|
|
|
|
witness=witness, |
|
|
|
|
script_sig=script_sig, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def is_range(self) -> bool: |
|
|
|
|
for pubkey in self.pubkeys: |
|
|
|
|
if pubkey.is_range(): |
|
|
|
|
@ -347,6 +396,20 @@ class PKDescriptor(Descriptor):
|
|
|
|
|
script = construct_script([pubkey, opcodes.OP_CHECKSIG]) |
|
|
|
|
return ExpandedScripts(output_script=bytes.fromhex(script)) |
|
|
|
|
|
|
|
|
|
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: |
|
|
|
|
if sigdata is None: sigdata = {} |
|
|
|
|
assert not self.is_range() |
|
|
|
|
assert not self.subdescriptors |
|
|
|
|
pubkey = self.pubkeys[0].get_pubkey_bytes() |
|
|
|
|
sig = sigdata.get(pubkey) |
|
|
|
|
if sig is None and allow_dummy: |
|
|
|
|
sig = 72 * b"\x00" |
|
|
|
|
if sig is None: |
|
|
|
|
raise MissingSolutionPiece(f"no sig for {pubkey.hex()}") |
|
|
|
|
return ScriptSolutionInner( |
|
|
|
|
witness_items=(sig,), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PKHDescriptor(Descriptor): |
|
|
|
|
""" |
|
|
|
|
@ -367,6 +430,20 @@ class PKHDescriptor(Descriptor):
|
|
|
|
|
script = bitcoin.pubkeyhash_to_p2pkh_script(pkh) |
|
|
|
|
return ExpandedScripts(output_script=bytes.fromhex(script)) |
|
|
|
|
|
|
|
|
|
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: |
|
|
|
|
if sigdata is None: sigdata = {} |
|
|
|
|
assert not self.is_range() |
|
|
|
|
assert not self.subdescriptors |
|
|
|
|
pubkey = self.pubkeys[0].get_pubkey_bytes() |
|
|
|
|
sig = sigdata.get(pubkey) |
|
|
|
|
if sig is None and allow_dummy: |
|
|
|
|
sig = 72 * b"\x00" |
|
|
|
|
if sig is None: |
|
|
|
|
raise MissingSolutionPiece(f"no sig for {pubkey.hex()}") |
|
|
|
|
return ScriptSolutionInner( |
|
|
|
|
witness_items=(sig, pubkey), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WPKHDescriptor(Descriptor): |
|
|
|
|
""" |
|
|
|
|
@ -390,6 +467,20 @@ class WPKHDescriptor(Descriptor):
|
|
|
|
|
scriptcode_for_sighash=bytes.fromhex(scriptcode), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: |
|
|
|
|
if sigdata is None: sigdata = {} |
|
|
|
|
assert not self.is_range() |
|
|
|
|
assert not self.subdescriptors |
|
|
|
|
pubkey = self.pubkeys[0].get_pubkey_bytes() |
|
|
|
|
sig = sigdata.get(pubkey) |
|
|
|
|
if sig is None and allow_dummy: |
|
|
|
|
sig = 72 * b"\x00" |
|
|
|
|
if sig is None: |
|
|
|
|
raise MissingSolutionPiece(f"no sig for {pubkey.hex()}") |
|
|
|
|
return ScriptSolutionInner( |
|
|
|
|
witness_items=(sig, pubkey), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def is_segwit(self) -> bool: |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
@ -410,6 +501,8 @@ class MultisigDescriptor(Descriptor):
|
|
|
|
|
:param is_sorted: Whether this is a ``sortedmulti()`` descriptor |
|
|
|
|
""" |
|
|
|
|
super().__init__(pubkeys, [], "sortedmulti" if is_sorted else "multi") |
|
|
|
|
if not (1 <= thresh <= len(pubkeys) <= 15): |
|
|
|
|
raise ValueError(f'{thresh=}, {len(pubkeys)=}') |
|
|
|
|
self.thresh = thresh |
|
|
|
|
self.is_sorted = is_sorted |
|
|
|
|
if self.is_sorted: |
|
|
|
|
@ -419,21 +512,35 @@ class MultisigDescriptor(Descriptor):
|
|
|
|
|
return "{}({},{})".format(self.name, self.thresh, ",".join([p.to_string() for p in self.pubkeys])) |
|
|
|
|
|
|
|
|
|
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts": |
|
|
|
|
if self.thresh > 16: |
|
|
|
|
m = b"\x01" + self.thresh.to_bytes(1, "big") |
|
|
|
|
else: |
|
|
|
|
m = (self.thresh + 0x50).to_bytes(1, "big") if self.thresh > 0 else b"\x00" |
|
|
|
|
n = (len(self.pubkeys) + 0x50).to_bytes(1, "big") if len(self.pubkeys) > 0 else b"\x00" |
|
|
|
|
script: bytes = m |
|
|
|
|
der_pks = [p.get_pubkey_bytes(pos=pos) for p in self.pubkeys] |
|
|
|
|
if self.is_sorted: |
|
|
|
|
der_pks.sort() |
|
|
|
|
for pk in der_pks: |
|
|
|
|
script += len(pk).to_bytes(1, "big") + pk |
|
|
|
|
script += n + b"\xae" |
|
|
|
|
|
|
|
|
|
script = bfh(construct_script([self.thresh, *der_pks, len(der_pks), opcodes.OP_CHECKMULTISIG])) |
|
|
|
|
return ExpandedScripts(output_script=script) |
|
|
|
|
|
|
|
|
|
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: |
|
|
|
|
if sigdata is None: sigdata = {} |
|
|
|
|
assert not self.is_range() |
|
|
|
|
assert not self.subdescriptors |
|
|
|
|
der_pks = [p.get_pubkey_bytes() for p in self.pubkeys] |
|
|
|
|
if self.is_sorted: |
|
|
|
|
der_pks.sort() |
|
|
|
|
signatures = [] |
|
|
|
|
for pubkey in der_pks: |
|
|
|
|
if sig := sigdata.get(pubkey): |
|
|
|
|
signatures.append(sig) |
|
|
|
|
if len(signatures) >= self.thresh: |
|
|
|
|
break |
|
|
|
|
if allow_dummy: |
|
|
|
|
dummy_sig = 72 * b"\x00" |
|
|
|
|
signatures += (self.thresh - len(signatures)) * [dummy_sig] |
|
|
|
|
if len(signatures) < self.thresh: |
|
|
|
|
raise MissingSolutionPiece(f"not enough sigs") |
|
|
|
|
assert len(signatures) == self.thresh, f"thresh={self.thresh}, but got {len(signatures)} sigs" |
|
|
|
|
return ScriptSolutionInner( |
|
|
|
|
witness_items=(0, *signatures), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SHDescriptor(Descriptor): |
|
|
|
|
""" |
|
|
|
|
@ -453,7 +560,7 @@ class SHDescriptor(Descriptor):
|
|
|
|
|
sub_scripts = self.subdescriptors[0].expand(pos=pos) |
|
|
|
|
redeem_script = sub_scripts.output_script |
|
|
|
|
witness_script = sub_scripts.witness_script |
|
|
|
|
script = b"\xa9\x14" + hash_160(redeem_script) + b"\x87" |
|
|
|
|
script = bfh(construct_script([opcodes.OP_HASH160, hash_160(redeem_script), opcodes.OP_EQUAL])) |
|
|
|
|
return ExpandedScripts( |
|
|
|
|
output_script=script, |
|
|
|
|
redeem_script=redeem_script, |
|
|
|
|
@ -461,6 +568,26 @@ class SHDescriptor(Descriptor):
|
|
|
|
|
scriptcode_for_sighash=sub_scripts.scriptcode_for_sighash, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: |
|
|
|
|
raise Exception("does not make sense for sh()") |
|
|
|
|
|
|
|
|
|
def satisfy(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionTop: |
|
|
|
|
assert not self.is_range() |
|
|
|
|
assert len(self.subdescriptors) == 1 |
|
|
|
|
subdesc = self.subdescriptors[0] |
|
|
|
|
redeem_script = self.expand().redeem_script |
|
|
|
|
witness = None |
|
|
|
|
if isinstance(subdesc, (WSHDescriptor, WPKHDescriptor)): # witness_v0 nested in p2sh |
|
|
|
|
witness = subdesc.satisfy(sigdata=sigdata, allow_dummy=allow_dummy).witness |
|
|
|
|
script_sig = bfh(construct_script([redeem_script])) |
|
|
|
|
else: # legacy p2sh |
|
|
|
|
subsol = subdesc._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy) |
|
|
|
|
script_sig = bfh(construct_script([*subsol.witness_items, redeem_script])) |
|
|
|
|
return ScriptSolutionTop( |
|
|
|
|
witness=witness, |
|
|
|
|
script_sig=script_sig, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WSHDescriptor(Descriptor): |
|
|
|
|
""" |
|
|
|
|
@ -479,12 +606,25 @@ class WSHDescriptor(Descriptor):
|
|
|
|
|
assert len(self.subdescriptors) == 1 |
|
|
|
|
sub_scripts = self.subdescriptors[0].expand(pos=pos) |
|
|
|
|
witness_script = sub_scripts.output_script |
|
|
|
|
script = b"\x00\x20" + sha256(witness_script) |
|
|
|
|
output_script = bfh(construct_script([0, sha256(witness_script)])) |
|
|
|
|
return ExpandedScripts( |
|
|
|
|
output_script=script, |
|
|
|
|
output_script=output_script, |
|
|
|
|
witness_script=witness_script, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: |
|
|
|
|
raise Exception("does not make sense for wsh()") |
|
|
|
|
|
|
|
|
|
def satisfy(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionTop: |
|
|
|
|
assert not self.is_range() |
|
|
|
|
assert len(self.subdescriptors) == 1 |
|
|
|
|
subsol = self.subdescriptors[0]._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy) |
|
|
|
|
witness_script = self.expand().witness_script |
|
|
|
|
witness = construct_witness([*subsol.witness_items, witness_script]) |
|
|
|
|
return ScriptSolutionTop( |
|
|
|
|
witness=bytes.fromhex(witness), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def is_segwit(self) -> bool: |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|