diff --git a/electrum/descriptor.py b/electrum/descriptor.py index 59abd49be..612ad6947 100644 --- a/electrum/descriptor.py +++ b/electrum/descriptor.py @@ -13,6 +13,9 @@ # TODO impl ADDR descriptors # TODO impl RAW descriptors # TODO disable descs we cannot solve: TRDescriptor +# TODO add checks to validate nestings +# https://github.com/bitcoin/bitcoin/blob/94070029fb6b783833973f9fe08a3a871994492f/doc/descriptors.md#reference +# e.g. sh is top-level only, wsh is top-level or directly inside sh # # TODO tests # - port https://github.com/bitcoin-core/HWI/blob/master/test/test_descriptor.py @@ -24,8 +27,9 @@ from .bip32 import convert_bip32_path_to_list_of_uint32, BIP32Node, KeyOriginInfo from . import bitcoin -from .bitcoin import construct_script, opcodes +from .bitcoin import construct_script, opcodes, construct_witness from .crypto import hash_160, sha256 +from .util import bfh from binascii import unhexlify from enum import Enum @@ -35,6 +39,7 @@ from typing import ( Optional, Tuple, Sequence, + Mapping, ) @@ -71,6 +76,18 @@ class ExpandedScripts: return bitcoin.script_to_address(spk.hex(), net=net) +class ScriptSolutionInner(NamedTuple): + witness_items: Optional[Sequence] = None + + +class ScriptSolutionTop(NamedTuple): + witness: Optional[bytes] = None + script_sig: Optional[bytes] = None + + +class MissingSolutionPiece(Exception): pass + + def PolyMod(c: int, val: int) -> int: """ :meta private: @@ -316,6 +333,38 @@ class Descriptor(object): """ raise NotImplementedError("The Descriptor base class does not implement this method") + def _satisfy_inner( + self, + *, + sigdata: Mapping[bytes, bytes] = None, # pubkey -> sig + allow_dummy: bool = False, + ) -> ScriptSolutionInner: + raise NotImplementedError("The Descriptor base class does not implement this method") + + def satisfy( + self, + *, + sigdata: Mapping[bytes, bytes] = None, # pubkey -> sig + allow_dummy: bool = False, + ) -> ScriptSolutionTop: + """Construct a witness and/or scriptSig to be used in a txin, to satisfy the bitcoin SCRIPT. + + Raises MissingSolutionPiece if satisfaction is not yet possible due to e.g. missing a signature, + unless `allow_dummy` is set to True, in which case dummy data is used where needed (e.g. for size estimation). + """ + assert not self.is_range() + sol = self._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy) + witness = None + script_sig = None + if self.is_segwit(): + witness = bfh(construct_witness(sol.witness_items)) + else: + script_sig = bfh(construct_script(sol.witness_items)) + return ScriptSolutionTop( + witness=witness, + script_sig=script_sig, + ) + def is_range(self) -> bool: for pubkey in self.pubkeys: if pubkey.is_range(): @@ -347,6 +396,20 @@ class PKDescriptor(Descriptor): script = construct_script([pubkey, opcodes.OP_CHECKSIG]) return ExpandedScripts(output_script=bytes.fromhex(script)) + def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: + if sigdata is None: sigdata = {} + assert not self.is_range() + assert not self.subdescriptors + pubkey = self.pubkeys[0].get_pubkey_bytes() + sig = sigdata.get(pubkey) + if sig is None and allow_dummy: + sig = 72 * b"\x00" + if sig is None: + raise MissingSolutionPiece(f"no sig for {pubkey.hex()}") + return ScriptSolutionInner( + witness_items=(sig,), + ) + class PKHDescriptor(Descriptor): """ @@ -367,6 +430,20 @@ class PKHDescriptor(Descriptor): script = bitcoin.pubkeyhash_to_p2pkh_script(pkh) return ExpandedScripts(output_script=bytes.fromhex(script)) + def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: + if sigdata is None: sigdata = {} + assert not self.is_range() + assert not self.subdescriptors + pubkey = self.pubkeys[0].get_pubkey_bytes() + sig = sigdata.get(pubkey) + if sig is None and allow_dummy: + sig = 72 * b"\x00" + if sig is None: + raise MissingSolutionPiece(f"no sig for {pubkey.hex()}") + return ScriptSolutionInner( + witness_items=(sig, pubkey), + ) + class WPKHDescriptor(Descriptor): """ @@ -390,6 +467,20 @@ class WPKHDescriptor(Descriptor): scriptcode_for_sighash=bytes.fromhex(scriptcode), ) + def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: + if sigdata is None: sigdata = {} + assert not self.is_range() + assert not self.subdescriptors + pubkey = self.pubkeys[0].get_pubkey_bytes() + sig = sigdata.get(pubkey) + if sig is None and allow_dummy: + sig = 72 * b"\x00" + if sig is None: + raise MissingSolutionPiece(f"no sig for {pubkey.hex()}") + return ScriptSolutionInner( + witness_items=(sig, pubkey), + ) + def is_segwit(self) -> bool: return True @@ -410,6 +501,8 @@ class MultisigDescriptor(Descriptor): :param is_sorted: Whether this is a ``sortedmulti()`` descriptor """ super().__init__(pubkeys, [], "sortedmulti" if is_sorted else "multi") + if not (1 <= thresh <= len(pubkeys) <= 15): + raise ValueError(f'{thresh=}, {len(pubkeys)=}') self.thresh = thresh self.is_sorted = is_sorted if self.is_sorted: @@ -419,21 +512,35 @@ class MultisigDescriptor(Descriptor): return "{}({},{})".format(self.name, self.thresh, ",".join([p.to_string() for p in self.pubkeys])) def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts": - if self.thresh > 16: - m = b"\x01" + self.thresh.to_bytes(1, "big") - else: - m = (self.thresh + 0x50).to_bytes(1, "big") if self.thresh > 0 else b"\x00" - n = (len(self.pubkeys) + 0x50).to_bytes(1, "big") if len(self.pubkeys) > 0 else b"\x00" - script: bytes = m der_pks = [p.get_pubkey_bytes(pos=pos) for p in self.pubkeys] if self.is_sorted: der_pks.sort() - for pk in der_pks: - script += len(pk).to_bytes(1, "big") + pk - script += n + b"\xae" - + script = bfh(construct_script([self.thresh, *der_pks, len(der_pks), opcodes.OP_CHECKMULTISIG])) return ExpandedScripts(output_script=script) + def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: + if sigdata is None: sigdata = {} + assert not self.is_range() + assert not self.subdescriptors + der_pks = [p.get_pubkey_bytes() for p in self.pubkeys] + if self.is_sorted: + der_pks.sort() + signatures = [] + for pubkey in der_pks: + if sig := sigdata.get(pubkey): + signatures.append(sig) + if len(signatures) >= self.thresh: + break + if allow_dummy: + dummy_sig = 72 * b"\x00" + signatures += (self.thresh - len(signatures)) * [dummy_sig] + if len(signatures) < self.thresh: + raise MissingSolutionPiece(f"not enough sigs") + assert len(signatures) == self.thresh, f"thresh={self.thresh}, but got {len(signatures)} sigs" + return ScriptSolutionInner( + witness_items=(0, *signatures), + ) + class SHDescriptor(Descriptor): """ @@ -453,7 +560,7 @@ class SHDescriptor(Descriptor): sub_scripts = self.subdescriptors[0].expand(pos=pos) redeem_script = sub_scripts.output_script witness_script = sub_scripts.witness_script - script = b"\xa9\x14" + hash_160(redeem_script) + b"\x87" + script = bfh(construct_script([opcodes.OP_HASH160, hash_160(redeem_script), opcodes.OP_EQUAL])) return ExpandedScripts( output_script=script, redeem_script=redeem_script, @@ -461,6 +568,26 @@ class SHDescriptor(Descriptor): scriptcode_for_sighash=sub_scripts.scriptcode_for_sighash, ) + def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: + raise Exception("does not make sense for sh()") + + def satisfy(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionTop: + assert not self.is_range() + assert len(self.subdescriptors) == 1 + subdesc = self.subdescriptors[0] + redeem_script = self.expand().redeem_script + witness = None + if isinstance(subdesc, (WSHDescriptor, WPKHDescriptor)): # witness_v0 nested in p2sh + witness = subdesc.satisfy(sigdata=sigdata, allow_dummy=allow_dummy).witness + script_sig = bfh(construct_script([redeem_script])) + else: # legacy p2sh + subsol = subdesc._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy) + script_sig = bfh(construct_script([*subsol.witness_items, redeem_script])) + return ScriptSolutionTop( + witness=witness, + script_sig=script_sig, + ) + class WSHDescriptor(Descriptor): """ @@ -479,12 +606,25 @@ class WSHDescriptor(Descriptor): assert len(self.subdescriptors) == 1 sub_scripts = self.subdescriptors[0].expand(pos=pos) witness_script = sub_scripts.output_script - script = b"\x00\x20" + sha256(witness_script) + output_script = bfh(construct_script([0, sha256(witness_script)])) return ExpandedScripts( - output_script=script, + output_script=output_script, witness_script=witness_script, ) + def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner: + raise Exception("does not make sense for wsh()") + + def satisfy(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionTop: + assert not self.is_range() + assert len(self.subdescriptors) == 1 + subsol = self.subdescriptors[0]._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy) + witness_script = self.expand().witness_script + witness = construct_witness([*subsol.witness_items, witness_script]) + return ScriptSolutionTop( + witness=bytes.fromhex(witness), + ) + def is_segwit(self) -> bool: return True diff --git a/electrum/transaction.py b/electrum/transaction.py index 6cb02cefa..d9047336d 100644 --- a/electrum/transaction.py +++ b/electrum/transaction.py @@ -52,7 +52,7 @@ from .bitcoin import (TYPE_ADDRESS, TYPE_SCRIPT, hash_160, from .crypto import sha256d from .logging import get_logger from .util import ShortID -from .descriptor import Descriptor +from .descriptor import Descriptor, MissingSolutionPiece if TYPE_CHECKING: from .wallet import Abstract_Wallet @@ -774,6 +774,14 @@ class Transaction: if estimate_size and txin.witness_sizehint is not None: return '00' * txin.witness_sizehint + + if desc := txin.script_descriptor: + sol = desc.satisfy(allow_dummy=estimate_size, sigdata=txin.part_sigs) + if sol.witness is not None: + return sol.witness.hex() + return construct_witness([]) + + assert estimate_size # TODO xxxxx if _type in ('address', 'unknown') and estimate_size: _type = cls.guess_txintype_from_address(txin.address) pubkeys, sig_list = cls.get_siglist(txin, estimate_size=estimate_size) @@ -827,7 +835,12 @@ class Transaction: if redeem_script := desc.expand().redeem_script: return construct_script([redeem_script]) return "" + sol = desc.satisfy(allow_dummy=estimate_size, sigdata=txin.part_sigs) + if sol.script_sig is not None: + return sol.script_sig.hex() + return "" + assert estimate_size # TODO xxxxx _type = txin.script_type pubkeys, sig_list = self.get_siglist(txin, estimate_size=estimate_size) if _type in ('address', 'unknown') and estimate_size: @@ -841,7 +854,6 @@ class Transaction: elif _type == 'p2pkh': return construct_script([sig_list[0], pubkeys[0]]) elif _type == 'p2wpkh-p2sh': - assert estimate_size # otherwise script_descriptor should handle it redeem_script = bitcoin.p2wpkh_nested_script(pubkeys[0]) return construct_script([redeem_script]) raise UnknownTxinType(f'cannot construct scriptSig for txin_type: {_type}') @@ -1486,17 +1498,13 @@ class PartialTxInput(TxInput, PSBTSection): return True if self.script_sig is not None and not self.is_segwit(): return True - signatures = list(self.part_sigs.values()) - s = len(signatures) - # note: The 'script_type' field is currently only set by the wallet, - # for its own addresses. This means we can only finalize inputs - # that are related to the wallet. - # The 'fix' would be adding extra logic that matches on templates, - # and figures out the script_type from available fields. - if self.script_type in ('p2pk', 'p2pkh', 'p2wpkh', 'p2wpkh-p2sh'): - return s >= 1 - if self.script_type in ('p2sh', 'p2wsh', 'p2wsh-p2sh'): - return s >= self.num_sig + if desc := self.script_descriptor: + try: + desc.satisfy(allow_dummy=False, sigdata=self.part_sigs) + except MissingSolutionPiece: + pass + else: + return True return False def finalize(self) -> None: @@ -1589,6 +1597,8 @@ class PartialTxInput(TxInput, PSBTSection): return False if self.witness_script: return True + if desc := self.script_descriptor: + return desc.is_segwit() _type = self.script_type if _type == 'address' and guess_for_address: _type = Transaction.guess_txintype_from_address(self.address)