# Copyright (c) 2017 Andrew Chow # Copyright (c) 2023 The Electrum developers # Distributed under the MIT software license, see the accompanying # file LICENCE or http://www.opensource.org/licenses/mit-license.php # # forked from https://github.com/bitcoin-core/HWI/blob/5f300d3dee7b317a6194680ad293eaa0962a3cc7/hwilib/descriptor.py # # Output Script Descriptors # See https://github.com/bitcoin/bitcoin/blob/master/doc/descriptors.md # # TODO allow xprv # TODO hardened derivation # TODO allow WIF privkeys # TODO impl ADDR descriptors # TODO impl RAW descriptors import enum from .bip32 import convert_bip32_strpath_to_intpath, BIP32Node, KeyOriginInfo, BIP32_PRIME from . import bitcoin from .bitcoin import construct_script, opcodes, construct_witness from . import constants from .crypto import hash_160, sha256 from . import ecc from . import segwit_addr from .util import bfh from binascii import unhexlify from enum import Enum from typing import ( List, NamedTuple, Optional, Tuple, Sequence, Mapping, Set, ) MAX_TAPROOT_NODES = 128 # we guess that signatures will be 72 bytes long # note: DER-encoded ECDSA signatures are 71 or 72 bytes in practice # See https://bitcoin.stackexchange.com/questions/77191/what-is-the-maximum-size-of-a-der-encoded-ecdsa-signature # We assume low S (as that is a bitcoin standardness rule). # We do not assume low R (even though the sigs we create conform), as external sigs, # e.g. from a hw signer cannot be expected to have a low R. DUMMY_DER_SIG = 72 * b"\x00" class ExpandedScripts: def __init__( self, *, output_script: bytes, # "scriptPubKey" redeem_script: Optional[bytes] = None, witness_script: Optional[bytes] = None, scriptcode_for_sighash: Optional[bytes] = None ): self.output_script = output_script self.redeem_script = redeem_script self.witness_script = witness_script self.scriptcode_for_sighash = scriptcode_for_sighash @property def scriptcode_for_sighash(self) -> Optional[bytes]: if self._scriptcode_for_sighash: return self._scriptcode_for_sighash return self.witness_script or self.redeem_script or self.output_script @scriptcode_for_sighash.setter def scriptcode_for_sighash(self, value: Optional[bytes]): self._scriptcode_for_sighash = value def address(self, *, net=None) -> Optional[str]: return bitcoin.script_to_address(self.output_script.hex(), net=net) class ScriptSolutionInner(NamedTuple): witness_items: Optional[Sequence] = None class ScriptSolutionTop(NamedTuple): witness: Optional[bytes] = None script_sig: Optional[bytes] = None class MissingSolutionPiece(Exception): pass def PolyMod(c: int, val: int) -> int: """ :meta private: Function to compute modulo over the polynomial used for descriptor checksums From: https://github.com/bitcoin/bitcoin/blob/master/src/script/descriptor.cpp """ c0 = c >> 35 c = ((c & 0x7ffffffff) << 5) ^ val if (c0 & 1): c ^= 0xf5dee51989 if (c0 & 2): c ^= 0xa9fdca3312 if (c0 & 4): c ^= 0x1bab10e32d if (c0 & 8): c ^= 0x3706b1677a if (c0 & 16): c ^= 0x644d626ffd return c _INPUT_CHARSET = "0123456789()[],'/*abcdefgh@:$%{}IJKLMNOPQRSTUVWXYZ&+-.;<=>?!^_|~ijklmnopqrstuvwxyzABCDEFGH`#\"\\ " _INPUT_CHARSET_INV = {c: i for (i, c) in enumerate(_INPUT_CHARSET)} _CHECKSUM_CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" def DescriptorChecksum(desc: str) -> str: """ Compute the checksum for a descriptor :param desc: The descriptor string to compute a checksum for :return: A checksum """ c = 1 cls = 0 clscount = 0 for ch in desc: try: pos = _INPUT_CHARSET_INV[ch] except KeyError: return "" c = PolyMod(c, pos & 31) cls = cls * 3 + (pos >> 5) clscount += 1 if clscount == 3: c = PolyMod(c, cls) cls = 0 clscount = 0 if clscount > 0: c = PolyMod(c, cls) for j in range(0, 8): c = PolyMod(c, 0) c ^= 1 ret = [''] * 8 for j in range(0, 8): ret[j] = _CHECKSUM_CHARSET[(c >> (5 * (7 - j))) & 31] return ''.join(ret) def AddChecksum(desc: str) -> str: """ Compute and attach the checksum for a descriptor :param desc: The descriptor string to add a checksum to :return: Descriptor with checksum """ return desc + "#" + DescriptorChecksum(desc) class PubkeyProvider(object): """ A public key expression in a descriptor. Can contain the key origin info, the pubkey itself, and subsequent derivation paths for derivation from the pubkey The pubkey can be a typical pubkey or an extended pubkey. """ def __init__( self, origin: Optional['KeyOriginInfo'], pubkey: str, deriv_path: Optional[str] ) -> None: """ :param origin: The key origin if one is available :param pubkey: The public key. Either a hex string or a serialized extended pubkey :param deriv_path: Additional derivation path (suffix) if the pubkey is an extended pubkey """ self.origin = origin self.pubkey = pubkey self.deriv_path = deriv_path if deriv_path: wildcard_count = deriv_path.count("*") if wildcard_count > 1: raise ValueError("only one wildcard(*) is allowed in a descriptor") if wildcard_count == 1: if deriv_path[-1] != "*": raise ValueError("wildcard in descriptor only allowed in last position") if deriv_path[0] != "/": raise ValueError(f"deriv_path suffix must start with a '/'. got {deriv_path!r}") # Make ExtendedKey from pubkey if it isn't hex self.extkey = None try: unhexlify(self.pubkey) # Is hex, normal pubkey except Exception: # Not hex, maybe xpub (but don't allow ypub/zpub) self.extkey = BIP32Node.from_xkey(pubkey, allow_custom_headers=False) if deriv_path and self.extkey is None: raise ValueError("deriv_path suffix present for simple pubkey") @classmethod def parse(cls, s: str) -> 'PubkeyProvider': """ Deserialize a key expression from the string into a ``PubkeyProvider``. :param s: String containing the key expression :return: A new ``PubkeyProvider`` containing the details given by ``s`` """ origin = None deriv_path = None if s[0] == "[": end = s.index("]") origin = KeyOriginInfo.from_string(s[1:end]) s = s[end + 1:] pubkey = s slash_idx = s.find("/") if slash_idx != -1: pubkey = s[:slash_idx] deriv_path = s[slash_idx:] return cls(origin, pubkey, deriv_path) def to_string(self) -> str: """ Serialize the pubkey expression to a string to be used in a descriptor :return: The pubkey expression as a string """ s = "" if self.origin: s += "[{}]".format(self.origin.to_string()) s += self.pubkey if self.deriv_path: s += self.deriv_path return s def get_pubkey_bytes(self, *, pos: Optional[int] = None) -> bytes: if self.is_range() and pos is None: raise ValueError("pos must be set for ranged descriptor") # note: if not ranged, we ignore pos. if self.extkey is not None: compressed = True # bip32 implies compressed pubkeys if self.deriv_path is None: assert not self.is_range() return self.extkey.eckey.get_public_key_bytes(compressed=compressed) else: path_str = self.deriv_path[1:] if self.is_range(): assert path_str[-1] == "*" path_str = path_str[:-1] + str(pos) path = convert_bip32_strpath_to_intpath(path_str) child_key = self.extkey.subkey_at_public_derivation(path) return child_key.eckey.get_public_key_bytes(compressed=compressed) else: assert not self.is_range() return unhexlify(self.pubkey) def get_full_derivation_path(self, *, pos: Optional[int] = None) -> str: """ Returns the full derivation path at the given position, including the origin """ if self.is_range() and pos is None: raise ValueError("pos must be set for ranged descriptor") path = self.origin.get_derivation_path() if self.origin is not None else "m" path += self.deriv_path if self.deriv_path is not None else "" if path[-1] == "*": path = path[:-1] + str(pos) return path def get_full_derivation_int_list(self, *, pos: Optional[int] = None) -> List[int]: """ Returns the full derivation path as an integer list at the given position. Includes the origin and master key fingerprint as an int """ if self.is_range() and pos is None: raise ValueError("pos must be set for ranged descriptor") path: List[int] = self.origin.get_full_int_list() if self.origin is not None else [] path.extend(self.get_der_suffix_int_list(pos=pos)) return path def get_der_suffix_int_list(self, *, pos: Optional[int] = None) -> List[int]: if not self.deriv_path: return [] der_suffix = self.deriv_path assert (wc_count := der_suffix.count("*")) <= 1, wc_count der_suffix = der_suffix.replace("*", str(pos)) return convert_bip32_strpath_to_intpath(der_suffix) def __lt__(self, other: 'PubkeyProvider') -> bool: return self.pubkey < other.pubkey def is_range(self) -> bool: if not self.deriv_path: return False if self.deriv_path[-1] == "*": # TODO hardened return True return False def has_uncompressed_pubkey(self) -> bool: if self.is_range(): # bip32 implies compressed return False return b"\x04" == self.get_pubkey_bytes()[:1] class Descriptor(object): r""" An abstract class for Descriptors themselves. Descriptors can contain multiple :class:`PubkeyProvider`\ s and multiple ``Descriptor`` as subdescriptors. Note: a significant portion of input validation logic is in parse_descriptor(), maybe these checks should be moved to (or also done in) this class? For example, sh() must be top-level, or segwit mandates compressed pubkeys, or bare-multisig cannot have >3 pubkeys. """ def __init__( self, pubkeys: List['PubkeyProvider'], subdescriptors: List['Descriptor'], name: str ) -> None: r""" :param pubkeys: The :class:`PubkeyProvider`\ s that are part of this descriptor :param subdescriptor: The ``Descriptor``\ s that are part of this descriptor :param name: The name of the function for this descriptor """ self.pubkeys = pubkeys self.subdescriptors = subdescriptors self.name = name def to_string_no_checksum(self) -> str: """ Serializes the descriptor as a string without the descriptor checksum :return: The descriptor string """ return "{}({}{})".format( self.name, ",".join([p.to_string() for p in self.pubkeys]), self.subdescriptors[0].to_string_no_checksum() if len(self.subdescriptors) > 0 else "" ) def to_string(self) -> str: """ Serializes the descriptor as a string with the checksum :return: The descriptor with a checksum """ return AddChecksum(self.to_string_no_checksum()) def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts": """ Returns the scripts for a descriptor at the given `pos` for ranged descriptors. """ raise NotImplementedError("The Descriptor base class does not implement this method") def _satisfy_inner( self, *, sigdata: Mapping[bytes, bytes] = None, # pubkey -> sig allow_dummy: bool = False, ) -> ScriptSolutionInner: raise NotImplementedError("The Descriptor base class does not implement this method") def satisfy( self, *, sigdata: Mapping[bytes, bytes] = None, # pubkey -> sig allow_dummy: bool = False, ) -> ScriptSolutionTop: """Construct a witness and/or scriptSig to be used in a txin, to satisfy the bitcoin SCRIPT. Raises MissingSolutionPiece if satisfaction is not yet possible due to e.g. missing a signature, unless `allow_dummy` is set to True, in which case dummy data is used where needed (e.g. for size estimation). """ assert not self.is_range() sol = self._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy) witness = None script_sig = None if self.is_segwit(): witness = bfh(construct_witness(sol.witness_items)) else: script_sig = bfh(construct_script(sol.witness_items)) return ScriptSolutionTop( witness=witness, script_sig=script_sig, ) def get_satisfaction_progress( self, *, sigdata: Mapping[bytes, bytes] = None, # pubkey -> sig ) -> Tuple[int, int]: """Returns (num_sigs_we_have, num_sigs_required) towards satisfying this script. Besides signatures, later this can also consider hash-preimages. """ assert not self.is_range() nhave, nreq = 0, 0 for desc in self.subdescriptors: a, b = desc.get_satisfaction_progress(sigdata=sigdata) nhave += a nreq += b return nhave, nreq def is_range(self) -> bool: for pubkey in self.pubkeys: if pubkey.is_range(): return True for desc in self.subdescriptors: if desc.is_range(): return True return False def is_segwit(self) -> bool: return any([desc.is_segwit() for desc in self.subdescriptors]) def get_all_pubkeys(self) -> Set[bytes]: """Returns set of pubkeys that appear at any level in this descriptor.""" assert not self.is_range() all_pubkeys = set([p.get_pubkey_bytes() for p in self.pubkeys]) for desc in self.subdescriptors: all_pubkeys |= desc.get_all_pubkeys() return all_pubkeys def get_simple_singlesig(self) -> Optional['Descriptor']: """Returns innermost pk/pkh/wpkh descriptor, or None if we are not a simple singlesig. note: besides pk,pkh,sh(wpkh),wpkh, overly complicated stuff such as sh(pk),wsh(sh(pkh),etc is also accepted """ if len(self.subdescriptors) == 1: return self.subdescriptors[0].get_simple_singlesig() return None def get_simple_multisig(self) -> Optional['MultisigDescriptor']: """Returns innermost multi descriptor, or None if we are not a simple multisig.""" if len(self.subdescriptors) == 1: return self.subdescriptors[0].get_simple_multisig() return None def to_legacy_electrum_script_type(self) -> str: if isinstance(self, PKDescriptor): return "p2pk" elif isinstance(self, PKHDescriptor): return "p2pkh" elif isinstance(self, WPKHDescriptor): return "p2wpkh" elif isinstance(self, SHDescriptor) and isinstance(self.subdescriptors[0], WPKHDescriptor): return "p2wpkh-p2sh" elif isinstance(self, SHDescriptor) and isinstance(self.subdescriptors[0], MultisigDescriptor): return "p2sh" elif isinstance(self, WSHDescriptor) and isinstance(self.subdescriptors[0], MultisigDescriptor): return "p2wsh" elif (isinstance(self, SHDescriptor) and isinstance(self.subdescriptors[0], WSHDescriptor) and isinstance(self.subdescriptors[0].subdescriptors[0], MultisigDescriptor)): return "p2wsh-p2sh" return "unknown" class PKDescriptor(Descriptor): """ A descriptor for ``pk()`` descriptors """ def __init__( self, pubkey: 'PubkeyProvider' ) -> None: """ :param pubkey: The :class:`PubkeyProvider` for this descriptor """ super().__init__([pubkey], [], "pk") def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts": pubkey = self.pubkeys[0].get_pubkey_bytes(pos=pos) script = construct_script([pubkey, opcodes.OP_CHECKSIG]) return ExpandedScripts(output_script=bytes.fromhex(script)) def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: if sigdata is None: sigdata = {} assert not self.is_range() assert not self.subdescriptors pubkey = self.pubkeys[0].get_pubkey_bytes() sig = sigdata.get(pubkey) if sig is None and allow_dummy: sig = DUMMY_DER_SIG if sig is None: raise MissingSolutionPiece(f"no sig for {pubkey.hex()}") return ScriptSolutionInner( witness_items=(sig,), ) def get_satisfaction_progress(self, *, sigdata=None) -> Tuple[int, int]: if sigdata is None: sigdata = {} signatures = list(sigdata.values()) return len(signatures), 1 def get_simple_singlesig(self) -> Optional['Descriptor']: return self class PKHDescriptor(Descriptor): """ A descriptor for ``pkh()`` descriptors """ def __init__( self, pubkey: 'PubkeyProvider' ) -> None: """ :param pubkey: The :class:`PubkeyProvider` for this descriptor """ super().__init__([pubkey], [], "pkh") def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts": pubkey = self.pubkeys[0].get_pubkey_bytes(pos=pos) pkh = hash_160(pubkey).hex() script = bitcoin.pubkeyhash_to_p2pkh_script(pkh) return ExpandedScripts(output_script=bytes.fromhex(script)) def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: if sigdata is None: sigdata = {} assert not self.is_range() assert not self.subdescriptors pubkey = self.pubkeys[0].get_pubkey_bytes() sig = sigdata.get(pubkey) if sig is None and allow_dummy: sig = DUMMY_DER_SIG if sig is None: raise MissingSolutionPiece(f"no sig for {pubkey.hex()}") return ScriptSolutionInner( witness_items=(sig, pubkey), ) def get_satisfaction_progress(self, *, sigdata=None) -> Tuple[int, int]: if sigdata is None: sigdata = {} signatures = list(sigdata.values()) return len(signatures), 1 def get_simple_singlesig(self) -> Optional['Descriptor']: return self class WPKHDescriptor(Descriptor): """ A descriptor for ``wpkh()`` descriptors """ def __init__( self, pubkey: 'PubkeyProvider' ) -> None: """ :param pubkey: The :class:`PubkeyProvider` for this descriptor """ super().__init__([pubkey], [], "wpkh") def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts": pkh = hash_160(self.pubkeys[0].get_pubkey_bytes(pos=pos)) output_script = construct_script([0, pkh]) scriptcode = bitcoin.pubkeyhash_to_p2pkh_script(pkh.hex()) return ExpandedScripts( output_script=bytes.fromhex(output_script), scriptcode_for_sighash=bytes.fromhex(scriptcode), ) def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: if sigdata is None: sigdata = {} assert not self.is_range() assert not self.subdescriptors pubkey = self.pubkeys[0].get_pubkey_bytes() sig = sigdata.get(pubkey) if sig is None and allow_dummy: sig = DUMMY_DER_SIG if sig is None: raise MissingSolutionPiece(f"no sig for {pubkey.hex()}") return ScriptSolutionInner( witness_items=(sig, pubkey), ) def get_satisfaction_progress(self, *, sigdata=None) -> Tuple[int, int]: if sigdata is None: sigdata = {} signatures = list(sigdata.values()) return len(signatures), 1 def is_segwit(self) -> bool: return True def get_simple_singlesig(self) -> Optional['Descriptor']: return self class MultisigDescriptor(Descriptor): """ A descriptor for ``multi()`` and ``sortedmulti()`` descriptors """ def __init__( self, pubkeys: List['PubkeyProvider'], thresh: int, is_sorted: bool ) -> None: r""" :param pubkeys: The :class:`PubkeyProvider`\ s for this descriptor :param thresh: The number of keys required to sign this multisig :param is_sorted: Whether this is a ``sortedmulti()`` descriptor """ super().__init__(pubkeys, [], "sortedmulti" if is_sorted else "multi") if not (1 <= thresh <= len(pubkeys) <= 15): raise ValueError(f'{thresh=}, {len(pubkeys)=}') self.thresh = thresh self.is_sorted = is_sorted if self.is_sorted: if not self.is_range(): # sort xpubs using the order of pubkeys der_pks = [p.get_pubkey_bytes() for p in self.pubkeys] self.pubkeys = [x[1] for x in sorted(zip(der_pks, self.pubkeys))] else: # not possible to sort according to final order in expanded scripts, # but for easier visual comparison, we do a lexicographical sort self.pubkeys.sort() def to_string_no_checksum(self) -> str: return "{}({},{})".format(self.name, self.thresh, ",".join([p.to_string() for p in self.pubkeys])) def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts": der_pks = [p.get_pubkey_bytes(pos=pos) for p in self.pubkeys] if self.is_sorted: der_pks.sort() script = bfh(construct_script([self.thresh, *der_pks, len(der_pks), opcodes.OP_CHECKMULTISIG])) return ExpandedScripts(output_script=script) def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: if sigdata is None: sigdata = {} assert not self.is_range() assert not self.subdescriptors der_pks = [p.get_pubkey_bytes() for p in self.pubkeys] if self.is_sorted: der_pks.sort() signatures = [] for pubkey in der_pks: if sig := sigdata.get(pubkey): signatures.append(sig) if len(signatures) >= self.thresh: break if allow_dummy: dummy_sig = DUMMY_DER_SIG signatures += (self.thresh - len(signatures)) * [dummy_sig] if len(signatures) < self.thresh: raise MissingSolutionPiece(f"not enough sigs") assert len(signatures) == self.thresh, f"thresh={self.thresh}, but got {len(signatures)} sigs" return ScriptSolutionInner( witness_items=(0, *signatures), ) def get_satisfaction_progress(self, *, sigdata=None) -> Tuple[int, int]: if sigdata is None: sigdata = {} signatures = list(sigdata.values()) return len(signatures), self.thresh def get_simple_multisig(self) -> Optional['MultisigDescriptor']: return self class SHDescriptor(Descriptor): """ A descriptor for ``sh()`` descriptors """ def __init__( self, subdescriptor: 'Descriptor' ) -> None: """ :param subdescriptor: The :class:`Descriptor` that is a sub-descriptor for this descriptor """ super().__init__([], [subdescriptor], "sh") def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts": assert len(self.subdescriptors) == 1 sub_scripts = self.subdescriptors[0].expand(pos=pos) redeem_script = sub_scripts.output_script witness_script = sub_scripts.witness_script script = bfh(construct_script([opcodes.OP_HASH160, hash_160(redeem_script), opcodes.OP_EQUAL])) return ExpandedScripts( output_script=script, redeem_script=redeem_script, witness_script=witness_script, scriptcode_for_sighash=sub_scripts.scriptcode_for_sighash, ) def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: raise Exception("does not make sense for sh()") def satisfy(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionTop: assert not self.is_range() assert len(self.subdescriptors) == 1 subdesc = self.subdescriptors[0] redeem_script = self.expand().redeem_script witness = None if isinstance(subdesc, (WSHDescriptor, WPKHDescriptor)): # witness_v0 nested in p2sh witness = subdesc.satisfy(sigdata=sigdata, allow_dummy=allow_dummy).witness script_sig = bfh(construct_script([redeem_script])) else: # legacy p2sh subsol = subdesc._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy) script_sig = bfh(construct_script([*subsol.witness_items, redeem_script])) return ScriptSolutionTop( witness=witness, script_sig=script_sig, ) class WSHDescriptor(Descriptor): """ A descriptor for ``wsh()`` descriptors """ def __init__( self, subdescriptor: 'Descriptor' ) -> None: """ :param subdescriptor: The :class:`Descriptor` that is a sub-descriptor for this descriptor """ super().__init__([], [subdescriptor], "wsh") def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts": assert len(self.subdescriptors) == 1 sub_scripts = self.subdescriptors[0].expand(pos=pos) witness_script = sub_scripts.output_script output_script = bfh(construct_script([0, sha256(witness_script)])) return ExpandedScripts( output_script=output_script, witness_script=witness_script, ) def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: raise Exception("does not make sense for wsh()") def satisfy(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionTop: assert not self.is_range() assert len(self.subdescriptors) == 1 subsol = self.subdescriptors[0]._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy) witness_script = self.expand().witness_script witness = construct_witness([*subsol.witness_items, witness_script]) return ScriptSolutionTop( witness=bytes.fromhex(witness), ) def is_segwit(self) -> bool: return True class TRDescriptor(Descriptor): """ A descriptor for ``tr()`` descriptors """ def __init__( self, internal_key: 'PubkeyProvider', subdescriptors: List['Descriptor'] = None, depths: List[int] = None, ) -> None: r""" :param internal_key: The :class:`PubkeyProvider` that is the internal key for this descriptor :param subdescriptors: The :class:`Descriptor`\ s that are the leaf scripts for this descriptor :param depths: The depths of the leaf scripts in the same order as `subdescriptors` """ if subdescriptors is None: subdescriptors = [] if depths is None: depths = [] super().__init__([internal_key], subdescriptors, "tr") self.depths = depths def to_string_no_checksum(self) -> str: r = f"{self.name}({self.pubkeys[0].to_string()}" path: List[bool] = [] # Track left or right for each depth for p, depth in enumerate(self.depths): r += "," while len(path) <= depth: if len(path) > 0: r += "{" path.append(False) r += self.subdescriptors[p].to_string_no_checksum() while len(path) > 0 and path[-1]: if len(path) > 0: r += "}" path.pop() if len(path) > 0: path[-1] = True r += ")" return r def is_segwit(self) -> bool: return True def _get_func_expr(s: str) -> Tuple[str, str]: """ Get the function name and then the expression inside :param s: The string that begins with a function name :return: The function name as the first element of the tuple, and the expression contained within the function as the second element :raises: ValueError: if a matching pair of parentheses cannot be found """ start = s.index("(") end = s.rindex(")") return s[0:start], s[start + 1:end] def _get_const(s: str, const: str) -> str: """ Get the first character of the string, make sure it is the expected character, and return the rest of the string :param s: The string that begins with a constant character :param const: The constant character :return: The remainder of the string without the constant character :raises: ValueError: if the first character is not the constant character """ if s[0] != const: raise ValueError(f"Expected '{const}' but got '{s[0]}'") return s[1:] def _get_expr(s: str) -> Tuple[str, str]: """ Extract the expression that ``s`` begins with. This will return the initial part of ``s``, up to the first comma or closing brace, skipping ones that are surrounded by braces. :param s: The string to extract the expression from :return: A pair with the first item being the extracted expression and the second the rest of the string """ level: int = 0 for i, c in enumerate(s): if c in ["(", "{"]: level += 1 elif level > 0 and c in [")", "}"]: level -= 1 elif level == 0 and c in [")", "}", ","]: break return s[0:i], s[i:] def parse_pubkey(expr: str, *, ctx: '_ParseDescriptorContext') -> Tuple['PubkeyProvider', str]: """ Parses an individual pubkey expression from a string that may contain more than one pubkey expression. :param expr: The expression to parse a pubkey expression from :return: The :class:`PubkeyProvider` that is parsed as the first item of a tuple, and the remainder of the expression as the second item. """ end = len(expr) comma_idx = expr.find(",") next_expr = "" if comma_idx != -1: end = comma_idx next_expr = expr[end + 1:] pubkey_provider = PubkeyProvider.parse(expr[:end]) permit_uncompressed = ctx in (_ParseDescriptorContext.TOP, _ParseDescriptorContext.P2SH) if not permit_uncompressed and pubkey_provider.has_uncompressed_pubkey(): raise ValueError("uncompressed pubkeys are not allowed") return pubkey_provider, next_expr class _ParseDescriptorContext(Enum): """ :meta private: Enum representing the level that we are in when parsing a descriptor. Some expressions aren't allowed at certain levels, this helps us track those. """ TOP = enum.auto() # The top level, not within any descriptor P2SH = enum.auto() # Within an sh() descriptor P2WPKH = enum.auto() # Within wpkh() descriptor P2WSH = enum.auto() # Within a wsh() descriptor P2TR = enum.auto() # Within a tr() descriptor def _parse_descriptor(desc: str, *, ctx: '_ParseDescriptorContext') -> 'Descriptor': """ :meta private: Parse a descriptor given the context level we are in. Used recursively to parse subdescriptors :param desc: The descriptor string to parse :param ctx: The :class:`_ParseDescriptorContext` indicating the level we are in :return: The parsed descriptor :raises: ValueError: if the descriptor is malformed """ func, expr = _get_func_expr(desc) if func == "pk": pubkey, expr = parse_pubkey(expr, ctx=ctx) if expr: raise ValueError("more than one pubkey in pk descriptor") return PKDescriptor(pubkey) if func == "pkh": if not (ctx == _ParseDescriptorContext.TOP or ctx == _ParseDescriptorContext.P2SH or ctx == _ParseDescriptorContext.P2WSH): raise ValueError("Can only have pkh at top level, in sh(), or in wsh()") pubkey, expr = parse_pubkey(expr, ctx=ctx) if expr: raise ValueError("More than one pubkey in pkh descriptor") return PKHDescriptor(pubkey) if func == "sortedmulti" or func == "multi": if not (ctx == _ParseDescriptorContext.TOP or ctx == _ParseDescriptorContext.P2SH or ctx == _ParseDescriptorContext.P2WSH): raise ValueError("Can only have multi/sortedmulti at top level, in sh(), or in wsh()") is_sorted = func == "sortedmulti" comma_idx = expr.index(",") thresh = int(expr[:comma_idx]) expr = expr[comma_idx + 1:] pubkeys = [] while expr: pubkey, expr = parse_pubkey(expr, ctx=ctx) pubkeys.append(pubkey) if len(pubkeys) == 0 or len(pubkeys) > 15: raise ValueError("Cannot have {} keys in a multisig; must have between 1 and 15 keys, inclusive".format(len(pubkeys))) elif thresh < 1: raise ValueError("Multisig threshold cannot be {}, must be at least 1".format(thresh)) elif thresh > len(pubkeys): raise ValueError("Multisig threshold cannot be larger than the number of keys; threshold is {} but only {} keys specified".format(thresh, len(pubkeys))) if ctx == _ParseDescriptorContext.TOP and len(pubkeys) > 3: raise ValueError("Cannot have {} pubkeys in bare multisig: only at most 3 pubkeys") return MultisigDescriptor(pubkeys, thresh, is_sorted) if func == "wpkh": if not (ctx == _ParseDescriptorContext.TOP or ctx == _ParseDescriptorContext.P2SH): raise ValueError("Can only have wpkh() at top level or inside sh()") pubkey, expr = parse_pubkey(expr, ctx=_ParseDescriptorContext.P2WPKH) if expr: raise ValueError("More than one pubkey in pkh descriptor") return WPKHDescriptor(pubkey) if func == "sh": if ctx != _ParseDescriptorContext.TOP: raise ValueError("Can only have sh() at top level") subdesc = _parse_descriptor(expr, ctx=_ParseDescriptorContext.P2SH) return SHDescriptor(subdesc) if func == "wsh": if not (ctx == _ParseDescriptorContext.TOP or ctx == _ParseDescriptorContext.P2SH): raise ValueError("Can only have wsh() at top level or inside sh()") subdesc = _parse_descriptor(expr, ctx=_ParseDescriptorContext.P2WSH) return WSHDescriptor(subdesc) if func == "tr": if ctx != _ParseDescriptorContext.TOP: raise ValueError("Can only have tr at top level") internal_key, expr = parse_pubkey(expr, ctx=ctx) subscripts = [] depths = [] if expr: # Path from top of the tree to what we're currently processing. # branches[i] == False: left branch in the i'th step from the top # branches[i] == true: right branch branches = [] while True: # Process open braces while True: try: expr = _get_const(expr, "{") branches.append(False) except ValueError: break if len(branches) > MAX_TAPROOT_NODES: raise ValueError(f"tr() supports at most {MAX_TAPROOT_NODES} nesting levels") # TODO xxxx fixed upstream bug here # Process script expression sarg, expr = _get_expr(expr) subscripts.append(_parse_descriptor(sarg, ctx=_ParseDescriptorContext.P2TR)) depths.append(len(branches)) # Process closing braces while len(branches) > 0 and branches[-1]: expr = _get_const(expr, "}") branches.pop() # If we're at the end of a left branch, expect a comma if len(branches) > 0 and not branches[-1]: expr = _get_const(expr, ",") branches[-1] = True if len(branches) == 0: break return TRDescriptor(internal_key, subscripts, depths) if ctx == _ParseDescriptorContext.P2SH: raise ValueError("A function is needed within P2SH") elif ctx == _ParseDescriptorContext.P2WSH: raise ValueError("A function is needed within P2WSH") raise ValueError("{} is not a valid descriptor function".format(func)) def parse_descriptor(desc: str) -> 'Descriptor': """ Parse a descriptor string into a :class:`Descriptor`. Validates the checksum if one is provided in the string :param desc: The descriptor string :return: The parsed :class:`Descriptor` :raises: ValueError: if the descriptor string is malformed """ i = desc.find("#") if i != -1: checksum = desc[i + 1:] desc = desc[:i] computed = DescriptorChecksum(desc) if computed != checksum: raise ValueError("The checksum does not match; Got {}, expected {}".format(checksum, computed)) return _parse_descriptor(desc, ctx=_ParseDescriptorContext.TOP) ##### def get_singlesig_descriptor_from_legacy_leaf(*, pubkey: str, script_type: str) -> Optional[Descriptor]: pubkey = PubkeyProvider.parse(pubkey) if script_type == 'p2pk': return PKDescriptor(pubkey=pubkey) elif script_type == 'p2pkh': return PKHDescriptor(pubkey=pubkey) elif script_type == 'p2wpkh': return WPKHDescriptor(pubkey=pubkey) elif script_type == 'p2wpkh-p2sh': wpkh = WPKHDescriptor(pubkey=pubkey) return SHDescriptor(subdescriptor=wpkh) else: raise NotImplementedError(f"unexpected {script_type=}") def create_dummy_descriptor_from_address(addr: Optional[str]) -> 'Descriptor': # It's not possible to tell the script type in general just from an address. # - "1" addresses are of course p2pkh # - "3" addresses are p2sh but we don't know the redeem script... # - "bc1" addresses (if they are 42-long) are p2wpkh # - "bc1" addresses that are 62-long are p2wsh but we don't know the script... # If we don't know the script, we _guess_ it is pubkeyhash. # As this method is used e.g. for tx size estimation, # the estimation will not be precise. def guess_script_type(addr: Optional[str]) -> str: if addr is None: return 'p2wpkh' # the default guess witver, witprog = segwit_addr.decode_segwit_address(constants.net.SEGWIT_HRP, addr) if witprog is not None: return 'p2wpkh' addrtype, hash_160_ = bitcoin.b58_address_to_hash160(addr) if addrtype == constants.net.ADDRTYPE_P2PKH: return 'p2pkh' elif addrtype == constants.net.ADDRTYPE_P2SH: return 'p2wpkh-p2sh' raise Exception(f'unrecognized address: {repr(addr)}') script_type = guess_script_type(addr) # guess pubkey-len to be 33-bytes: pubkey = ecc.GENERATOR.get_public_key_bytes(compressed=True).hex() desc = get_singlesig_descriptor_from_legacy_leaf(pubkey=pubkey, script_type=script_type) return desc