Browse Source

descriptors: wallet/transaction: construct intermediate osd

master
SomberNight 3 years ago
parent
commit
f1f39f0e82
No known key found for this signature in database
GPG Key ID: B33B5F232C6271E9
  1. 12
      electrum/bitcoin.py
  2. 7
      electrum/commands.py
  3. 142
      electrum/descriptor.py
  4. 3
      electrum/lnsweep.py
  5. 6
      electrum/lnutil.py
  6. 11
      electrum/tests/test_transaction.py
  7. 37
      electrum/transaction.py
  8. 108
      electrum/wallet.py

12
electrum/bitcoin.py

@ -421,15 +421,9 @@ def p2wsh_nested_script(witness_script: str) -> str:
return construct_script([0, wsh])
def pubkey_to_address(txin_type: str, pubkey: str, *, net=None) -> str:
if txin_type == 'p2pkh':
return public_key_to_p2pkh(bfh(pubkey), net=net)
elif txin_type == 'p2wpkh':
return public_key_to_p2wpkh(bfh(pubkey), net=net)
elif txin_type == 'p2wpkh-p2sh':
scriptSig = p2wpkh_nested_script(pubkey)
return hash160_to_p2sh(hash_160(bfh(scriptSig)), net=net)
else:
raise NotImplementedError(txin_type)
from . import descriptor
desc = descriptor.get_singlesig_descriptor_from_legacy_leaf(pubkey=pubkey, script_type=txin_type)
return desc.expand().address(net=net)
# TODO this method is confusingly named

7
electrum/commands.py

@ -66,6 +66,7 @@ from . import submarine_swaps
from . import GuiImportError
from . import crypto
from . import constants
from . import descriptor
if TYPE_CHECKING:
from .network import Network
@ -394,6 +395,8 @@ class Commands:
txin_type, privkey, compressed = bitcoin.deserialize_privkey(sec)
pubkey = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
keypairs[pubkey] = privkey, compressed
desc = descriptor.get_singlesig_descriptor_from_legacy_leaf(pubkey=pubkey, script_type=txin_type)
txin.script_descriptor = desc
txin.script_type = txin_type
txin.pubkeys = [bfh(pubkey)]
txin.num_sig = 1
@ -420,9 +423,11 @@ class Commands:
for priv in privkey:
txin_type, priv2, compressed = bitcoin.deserialize_privkey(priv)
pubkey = ecc.ECPrivkey(priv2).get_public_key_bytes(compressed=compressed)
address = bitcoin.pubkey_to_address(txin_type, pubkey.hex())
desc = descriptor.get_singlesig_descriptor_from_legacy_leaf(pubkey=pubkey.hex(), script_type=txin_type)
address = desc.expand().address()
if address in txins_dict.keys():
for txin in txins_dict[address]:
txin.script_descriptor = desc
txin.pubkeys = [pubkey]
txin.script_type = txin_type
tx.sign({pubkey.hex(): (priv2, compressed)})

142
electrum/descriptor.py

@ -1,4 +1,5 @@
# Copyright (c) 2017 Andrew Chow
# Copyright (c) 2023 The Electrum developers
# Distributed under the MIT software license, see the accompanying
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
#
@ -22,6 +23,8 @@
from .bip32 import convert_bip32_path_to_list_of_uint32, BIP32Node, KeyOriginInfo
from . import bitcoin
from .bitcoin import construct_script, opcodes
from .crypto import hash_160, sha256
from binascii import unhexlify
@ -31,16 +34,41 @@ from typing import (
NamedTuple,
Optional,
Tuple,
Sequence,
)
MAX_TAPROOT_NODES = 128
class ExpandedScripts(NamedTuple):
output_script: Optional[bytes] = None
redeem_script: Optional[bytes] = None
witness_script: Optional[bytes] = None
class ExpandedScripts:
def __init__(
self,
*,
output_script: Optional[bytes] = None,
redeem_script: Optional[bytes] = None,
witness_script: Optional[bytes] = None,
scriptcode_for_sighash: Optional[bytes] = None
):
self.output_script = output_script
self.redeem_script = redeem_script
self.witness_script = witness_script
self.scriptcode_for_sighash = scriptcode_for_sighash
@property
def scriptcode_for_sighash(self) -> Optional[bytes]:
if self._scriptcode_for_sighash:
return self._scriptcode_for_sighash
return self.witness_script or self.redeem_script or self.output_script
@scriptcode_for_sighash.setter
def scriptcode_for_sighash(self, value: Optional[bytes]):
self._scriptcode_for_sighash = value
def address(self, *, net=None) -> Optional[str]:
if spk := self.output_script:
return bitcoin.script_to_address(spk.hex(), net=net)
def PolyMod(c: int, val: int) -> int:
@ -180,17 +208,24 @@ class PubkeyProvider(object):
s += self.deriv_path
return s
def get_pubkey_bytes(self, pos: int) -> bytes:
def get_pubkey_bytes(self, *, pos: Optional[int] = None) -> bytes:
if self.is_range() and pos is None:
raise ValueError("pos must be set for ranged descriptor")
if self.extkey is not None:
if self.deriv_path is not None:
compressed = True # bip32 implies compressed pubkeys
if self.deriv_path is None:
assert not self.is_range()
return self.extkey.eckey.get_public_key_bytes(compressed=compressed)
else:
path_str = self.deriv_path[1:]
if path_str[-1] == "*":
if self.is_range():
assert path_str[-1] == "*"
path_str = path_str[:-1] + str(pos)
path = convert_bip32_path_to_list_of_uint32(path_str)
child_key = self.extkey.subkey_at_public_derivation(path)
return child_key.eckey.get_public_key_bytes()
return child_key.eckey.get_public_key_bytes(compressed=compressed)
else:
return self.extkey.eckey.get_public_key_bytes()
assert not self.is_range()
return unhexlify(self.pubkey)
def get_full_derivation_path(self, pos: int) -> str:
@ -227,6 +262,13 @@ class PubkeyProvider(object):
def __lt__(self, other: 'PubkeyProvider') -> bool:
return self.pubkey < other.pubkey
def is_range(self) -> bool:
if not self.deriv_path:
return False
if self.deriv_path[-1] == "*": # TODO hardened
return True
return False
class Descriptor(object):
r"""
@ -268,12 +310,24 @@ class Descriptor(object):
"""
return AddChecksum(self.to_string_no_checksum())
def expand(self, pos: int) -> "ExpandedScripts":
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
"""
Returns the scripts for a descriptor at the given `pos` for ranged descriptors.
"""
raise NotImplementedError("The Descriptor base class does not implement this method")
def is_range(self) -> bool:
for pubkey in self.pubkeys:
if pubkey.is_range():
return True
for desc in self.subdescriptors:
if desc.is_range():
return True
return False
def is_segwit(self) -> bool:
return any([desc.is_segwit() for desc in self.subdescriptors])
class PKDescriptor(Descriptor):
"""
@ -288,8 +342,10 @@ class PKDescriptor(Descriptor):
"""
super().__init__([pubkey], [], "pk")
# TODO
# def expand(self, pos: int) -> "ExpandedScripts":
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
pubkey = self.pubkeys[0].get_pubkey_bytes(pos=pos)
script = construct_script([pubkey, opcodes.OP_CHECKSIG])
return ExpandedScripts(output_script=bytes.fromhex(script))
class PKHDescriptor(Descriptor):
@ -305,9 +361,11 @@ class PKHDescriptor(Descriptor):
"""
super().__init__([pubkey], [], "pkh")
def expand(self, pos: int) -> "ExpandedScripts":
script = b"\x76\xa9\x14" + hash_160(self.pubkeys[0].get_pubkey_bytes(pos)) + b"\x88\xac"
return ExpandedScripts(output_script=script)
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
pubkey = self.pubkeys[0].get_pubkey_bytes(pos=pos)
pkh = hash_160(pubkey).hex()
script = bitcoin.pubkeyhash_to_p2pkh_script(pkh)
return ExpandedScripts(output_script=bytes.fromhex(script))
class WPKHDescriptor(Descriptor):
@ -323,9 +381,17 @@ class WPKHDescriptor(Descriptor):
"""
super().__init__([pubkey], [], "wpkh")
def expand(self, pos: int) -> "ExpandedScripts":
script = b"\x00\x14" + hash_160(self.pubkeys[0].get_pubkey_bytes(pos))
return ExpandedScripts(output_script=script)
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
pkh = hash_160(self.pubkeys[0].get_pubkey_bytes(pos=pos))
output_script = construct_script([0, pkh])
scriptcode = bitcoin.pubkeyhash_to_p2pkh_script(pkh.hex())
return ExpandedScripts(
output_script=bytes.fromhex(output_script),
scriptcode_for_sighash=bytes.fromhex(scriptcode),
)
def is_segwit(self) -> bool:
return True
class MultisigDescriptor(Descriptor):
@ -352,14 +418,14 @@ class MultisigDescriptor(Descriptor):
def to_string_no_checksum(self) -> str:
return "{}({},{})".format(self.name, self.thresh, ",".join([p.to_string() for p in self.pubkeys]))
def expand(self, pos: int) -> "ExpandedScripts":
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
if self.thresh > 16:
m = b"\x01" + self.thresh.to_bytes(1, "big")
else:
m = (self.thresh + 0x50).to_bytes(1, "big") if self.thresh > 0 else b"\x00"
n = (len(self.pubkeys) + 0x50).to_bytes(1, "big") if len(self.pubkeys) > 0 else b"\x00"
script: bytes = m
der_pks = [p.get_pubkey_bytes(pos) for p in self.pubkeys]
der_pks = [p.get_pubkey_bytes(pos=pos) for p in self.pubkeys]
if self.is_sorted:
der_pks.sort()
for pk in der_pks:
@ -382,14 +448,17 @@ class SHDescriptor(Descriptor):
"""
super().__init__([], [subdescriptor], "sh")
def expand(self, pos: int) -> "ExpandedScripts":
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
assert len(self.subdescriptors) == 1
redeem_script, _, witness_script = self.subdescriptors[0].expand(pos)
sub_scripts = self.subdescriptors[0].expand(pos=pos)
redeem_script = sub_scripts.output_script
witness_script = sub_scripts.witness_script
script = b"\xa9\x14" + hash_160(redeem_script) + b"\x87"
return ExpandedScripts(
output_script=script,
redeem_script=redeem_script,
witness_script=witness_script,
scriptcode_for_sighash=sub_scripts.scriptcode_for_sighash,
)
@ -406,15 +475,19 @@ class WSHDescriptor(Descriptor):
"""
super().__init__([], [subdescriptor], "wsh")
def expand(self, pos: int) -> "ExpandedScripts":
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
assert len(self.subdescriptors) == 1
witness_script, _, _ = self.subdescriptors[0].expand(pos)
sub_scripts = self.subdescriptors[0].expand(pos=pos)
witness_script = sub_scripts.output_script
script = b"\x00\x20" + sha256(witness_script)
return ExpandedScripts(
output_script=script,
witness_script=witness_script,
)
def is_segwit(self) -> bool:
return True
class TRDescriptor(Descriptor):
"""
@ -457,6 +530,10 @@ class TRDescriptor(Descriptor):
r += ")"
return r
def is_segwit(self) -> bool:
return True
def _get_func_expr(s: str) -> Tuple[str, str]:
"""
Get the function name and then the expression inside
@ -666,3 +743,20 @@ def parse_descriptor(desc: str) -> 'Descriptor':
raise ValueError("The checksum does not match; Got {}, expected {}".format(checksum, computed))
return _parse_descriptor(desc, _ParseDescriptorContext.TOP)
#####
def get_singlesig_descriptor_from_legacy_leaf(*, pubkey: str, script_type: str) -> Optional[Descriptor]:
pubkey = PubkeyProvider.parse(pubkey)
if script_type == 'p2pk':
return PKDescriptor(pubkey=pubkey)
elif script_type == 'p2pkh':
return PKHDescriptor(pubkey=pubkey)
elif script_type == 'p2wpkh':
return WPKHDescriptor(pubkey=pubkey)
elif script_type == 'p2wpkh-p2sh':
wpkh = WPKHDescriptor(pubkey=pubkey)
return SHDescriptor(subdescriptor=wpkh)
else:
raise NotImplementedError(f"unexpected {script_type=}")

3
electrum/lnsweep.py

@ -8,6 +8,7 @@ from enum import Enum, auto
from .util import bfh
from .bitcoin import redeem_script_to_address, dust_threshold, construct_witness
from .invoices import PR_PAID
from . import descriptor
from . import ecc
from .lnutil import (make_commitment_output_to_remote_address, make_commitment_output_to_local_witness_script,
derive_privkey, derive_pubkey, derive_blinded_pubkey, derive_blinded_privkey,
@ -513,6 +514,8 @@ def create_sweeptx_their_ctx_to_remote(
prevout = TxOutpoint(txid=bfh(ctx.txid()), out_idx=output_idx)
txin = PartialTxInput(prevout=prevout)
txin._trusted_value_sats = val
desc = descriptor.get_singlesig_descriptor_from_legacy_leaf(pubkey=our_payment_pubkey, script_type='p2wpkh')
txin.script_descriptor = desc
txin.script_type = 'p2wpkh'
txin.pubkeys = [bfh(our_payment_pubkey)]
txin.num_sig = 1

6
electrum/lnutil.py

@ -21,6 +21,7 @@ from .transaction import (Transaction, PartialTransaction, PartialTxInput, TxOut
PartialTxOutput, opcodes, TxOutput)
from .ecc import CURVE_ORDER, sig_string_from_der_sig, ECPubkey, string_to_number
from . import ecc, bitcoin, crypto, transaction
from . import descriptor
from .bitcoin import (push_script, redeem_script_to_address, address_to_script,
construct_witness, construct_script)
from . import segwit_addr
@ -818,6 +819,11 @@ def make_funding_input(local_funding_pubkey: bytes, remote_funding_pubkey: bytes
# commitment tx input
prevout = TxOutpoint(txid=bfh(funding_txid), out_idx=funding_pos)
c_input = PartialTxInput(prevout=prevout)
ppubkeys = [descriptor.PubkeyProvider.parse(pk) for pk in pubkeys]
multi = descriptor.MultisigDescriptor(pubkeys=ppubkeys, thresh=2, is_sorted=True)
c_input.script_descriptor = descriptor.WSHDescriptor(subdescriptor=multi)
c_input.script_type = 'p2wsh'
c_input.pubkeys = [bfh(pk) for pk in pubkeys]
c_input.num_sig = 2

11
electrum/tests/test_transaction.py

@ -9,8 +9,9 @@ from electrum.util import bfh
from electrum.bitcoin import (deserialize_privkey, opcodes,
construct_script, construct_witness)
from electrum.ecc import ECPrivkey
from .test_bitcoin import disable_ecdsa_r_value_grinding
from electrum import descriptor
from .test_bitcoin import disable_ecdsa_r_value_grinding
from . import ElectrumTestCase
signed_blob = '01000000012a5c9a94fcde98f5581cd00162c60a13936ceb75389ea65bf38633b424eb4031000000006c493046022100a82bbc57a0136751e5433f41cf000b3f1a99c6744775e76ec764fb78c54ee100022100f9e80b7de89de861dc6fb0c1429d5da72c2b6b2ee2406bc9bfb1beedd729d985012102e61d176da16edd1d258a200ad9759ef63adf8e14cd97f53227bae35cdb84d2f6ffffffff0140420f00000000001976a914230ac37834073a42146f11ef8414ae929feaafc388ac00000000'
@ -89,8 +90,12 @@ class TestTransaction(ElectrumTestCase):
def test_tx_update_signatures(self):
tx = tx_from_any("cHNidP8BAFUBAAAAASpcmpT83pj1WBzQAWLGChOTbOt1OJ6mW/OGM7Qk60AxAAAAAAD/////AUBCDwAAAAAAGXapFCMKw3g0BzpCFG8R74QUrpKf6q/DiKwAAAAAAAAA")
tx.inputs()[0].script_type = 'p2pkh'
tx.inputs()[0].pubkeys = [bfh('02e61d176da16edd1d258a200ad9759ef63adf8e14cd97f53227bae35cdb84d2f6')]
pubkey = bfh('02e61d176da16edd1d258a200ad9759ef63adf8e14cd97f53227bae35cdb84d2f6')
script_type = 'p2pkh'
desc = descriptor.get_singlesig_descriptor_from_legacy_leaf(pubkey=pubkey.hex(), script_type=script_type)
tx.inputs()[0].script_descriptor = desc
tx.inputs()[0].script_type = script_type
tx.inputs()[0].pubkeys = [pubkey]
tx.inputs()[0].num_sig = 1
tx.update_signatures(signed_blob_signatures)
self.assertEqual(tx.serialize(), signed_blob)

37
electrum/transaction.py

@ -52,6 +52,7 @@ from .bitcoin import (TYPE_ADDRESS, TYPE_SCRIPT, hash_160,
from .crypto import sha256d
from .logging import get_logger
from .util import ShortID
from .descriptor import Descriptor
if TYPE_CHECKING:
from .wallet import Abstract_Wallet
@ -821,6 +822,12 @@ class Transaction:
if txin.is_native_segwit():
return ''
if desc := txin.script_descriptor:
if desc.is_segwit():
if redeem_script := desc.expand().redeem_script:
return construct_script([redeem_script])
return ""
_type = txin.script_type
pubkeys, sig_list = self.get_siglist(txin, estimate_size=estimate_size)
if _type in ('address', 'unknown') and estimate_size:
@ -833,18 +840,10 @@ class Transaction:
return construct_script([0, *sig_list, redeem_script])
elif _type == 'p2pkh':
return construct_script([sig_list[0], pubkeys[0]])
elif _type in ['p2wpkh', 'p2wsh']:
return ''
elif _type == 'p2wpkh-p2sh':
assert estimate_size # otherwise script_descriptor should handle it
redeem_script = bitcoin.p2wpkh_nested_script(pubkeys[0])
return construct_script([redeem_script])
elif _type == 'p2wsh-p2sh':
if estimate_size:
witness_script = ''
else:
witness_script = self.get_preimage_script(txin)
redeem_script = bitcoin.p2wsh_nested_script(witness_script)
return construct_script([redeem_script])
raise UnknownTxinType(f'cannot construct scriptSig for txin_type: {_type}')
@classmethod
@ -858,17 +857,11 @@ class Transaction:
raise Exception('OP_CODESEPARATOR black magic is not supported')
return txin.redeem_script.hex()
pubkeys = [pk.hex() for pk in txin.pubkeys]
if txin.script_type in ['p2sh', 'p2wsh', 'p2wsh-p2sh']:
return multisig_script(pubkeys, txin.num_sig)
elif txin.script_type in ['p2pkh', 'p2wpkh', 'p2wpkh-p2sh']:
pubkey = pubkeys[0]
pkh = hash_160(bfh(pubkey)).hex()
return bitcoin.pubkeyhash_to_p2pkh_script(pkh)
elif txin.script_type == 'p2pk':
pubkey = pubkeys[0]
return bitcoin.public_key_to_p2pk_script(pubkey)
else:
if desc := txin.script_descriptor:
sc = desc.expand()
if script := sc.scriptcode_for_sighash:
return script.hex()
raise Exception(f"don't know scriptcode for descriptor: {desc.to_string()}")
raise UnknownTxinType(f'cannot construct preimage_script for txin_type: {txin.script_type}')
def _calc_bip143_shared_txdigest_fields(self) -> BIP143SharedTxDigestFields:
@ -1255,6 +1248,7 @@ class PartialTxInput(TxInput, PSBTSection):
self.witness_script = None # type: Optional[bytes]
self._unknown = {} # type: Dict[bytes, bytes]
self.script_descriptor = None # type: Optional[Descriptor]
self.script_type = 'unknown'
self.num_sig = 0 # type: int # num req sigs for multisig
self.pubkeys = [] # type: List[bytes] # note: order matters
@ -1296,6 +1290,7 @@ class PartialTxInput(TxInput, PSBTSection):
'height': self.block_height,
'value_sats': self.value_sats(),
'address': self.address,
'desc': self.script_descriptor.to_string() if self.script_descriptor else None,
'utxo': str(self.utxo) if self.utxo else None,
'witness_utxo': self.witness_utxo.serialize_to_network().hex() if self.witness_utxo else None,
'sighash': self.sighash,
@ -1614,6 +1609,7 @@ class PartialTxOutput(TxOutput, PSBTSection):
self.bip32_paths = {} # type: Dict[bytes, Tuple[bytes, Sequence[int]]] # pubkey -> (xpub_fingerprint, path)
self._unknown = {} # type: Dict[bytes, bytes]
self.script_descriptor = None # type: Optional[Descriptor]
self.script_type = 'unknown'
self.num_sig = 0 # num req sigs for multisig
self.pubkeys = [] # type: List[bytes] # note: order matters
@ -1623,6 +1619,7 @@ class PartialTxOutput(TxOutput, PSBTSection):
def to_json(self):
d = super().to_json()
d.update({
'desc': self.script_descriptor.to_string() if self.script_descriptor else None,
'redeem_script': self.redeem_script.hex() if self.redeem_script else None,
'witness_script': self.witness_script.hex() if self.witness_script else None,
'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))

108
electrum/wallet.py

@ -84,6 +84,8 @@ from .lnworker import LNWallet
from .paymentrequest import PaymentRequest
from .util import read_json_file, write_json_file, UserFacingException, FileImportFailed
from .util import EventListener, event_listener
from . import descriptor
from .descriptor import Descriptor
if TYPE_CHECKING:
from .network import Network
@ -100,16 +102,17 @@ TX_STATUS = [
]
async def _append_utxos_to_inputs(*, inputs: List[PartialTxInput], network: 'Network',
pubkey: str, txin_type: str, imax: int) -> None:
if txin_type in ('p2pkh', 'p2wpkh', 'p2wpkh-p2sh'):
address = bitcoin.pubkey_to_address(txin_type, pubkey)
scripthash = bitcoin.address_to_scripthash(address)
elif txin_type == 'p2pk':
script = bitcoin.public_key_to_p2pk_script(pubkey)
async def _append_utxos_to_inputs(
*,
inputs: List[PartialTxInput],
network: 'Network',
script_descriptor: 'descriptor.Descriptor',
pubkey: str,
txin_type: str,
imax: int,
) -> None:
script = script_descriptor.expand().output_script.hex()
scripthash = bitcoin.script_to_scripthash(script)
else:
raise Exception(f'unexpected txin_type to sweep: {txin_type}')
async def append_single_utxo(item):
prev_tx_raw = await network.get_transaction(item['tx_hash'])
@ -122,6 +125,9 @@ async def _append_utxos_to_inputs(*, inputs: List[PartialTxInput], network: 'Net
txin = PartialTxInput(prevout=prevout)
txin.utxo = prev_tx
txin.block_height = int(item['height'])
txin.script_descriptor = script_descriptor
# TODO rm as much of below (.num_sig / .pubkeys) as possible
# TODO need unit tests for other scripts (only have p2pk atm)
txin.script_type = txin_type
txin.pubkeys = [bfh(pubkey)]
txin.num_sig = 1
@ -141,9 +147,11 @@ async def sweep_preparations(privkeys, network: 'Network', imax=100):
async def find_utxos_for_privkey(txin_type, privkey, compressed):
pubkey = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
desc = descriptor.get_singlesig_descriptor_from_legacy_leaf(pubkey=pubkey, script_type=txin_type)
await _append_utxos_to_inputs(
inputs=inputs,
network=network,
script_descriptor=desc,
pubkey=pubkey,
txin_type=txin_type,
imax=imax)
@ -684,13 +692,19 @@ class Abstract_Wallet(ABC, Logger, EventListener):
"""
pass
@abstractmethod
def get_redeem_script(self, address: str) -> Optional[str]:
pass
desc = self._get_script_descriptor_for_address(address)
if desc is None: return None
redeem_script = desc.expand().redeem_script
if redeem_script:
return redeem_script.hex()
@abstractmethod
def get_witness_script(self, address: str) -> Optional[str]:
pass
desc = self._get_script_descriptor_for_address(address)
if desc is None: return None
witness_script = desc.expand().witness_script
if witness_script:
return witness_script.hex()
@abstractmethod
def get_txin_type(self, address: str) -> str:
@ -2193,7 +2207,8 @@ class Abstract_Wallet(ABC, Logger, EventListener):
if self.lnworker:
self.lnworker.swap_manager.add_txin_info(txin)
return
# set script_type first, as later checks might rely on it:
txin.script_descriptor = self._get_script_descriptor_for_address(address)
# set script_type first, as later checks might rely on it: # TODO rm most of below in favour of osd
txin.script_type = self.get_txin_type(address)
txin.num_sig = self.m if isinstance(self, Multisig_Wallet) else 1
if txin.redeem_script is None:
@ -2211,6 +2226,34 @@ class Abstract_Wallet(ABC, Logger, EventListener):
self._add_input_sig_info(txin, address, only_der_suffix=only_der_suffix)
txin.block_height = self.adb.get_tx_height(txin.prevout.txid.hex()).height
def _get_script_descriptor_for_address(self, address: str) -> Optional[Descriptor]:
if not self.is_mine(address):
return None
script_type = self.get_txin_type(address)
if script_type in ('address', 'unknown'):
return None
if script_type in ('p2pk', 'p2pkh', 'p2wpkh-p2sh', 'p2wpkh'):
pubkey = self.get_public_keys(address)[0]
return descriptor.get_singlesig_descriptor_from_legacy_leaf(pubkey=pubkey, script_type=script_type)
elif script_type == 'p2sh':
pubkeys = self.get_public_keys(address)
pubkeys = [descriptor.PubkeyProvider.parse(pk) for pk in pubkeys]
multi = descriptor.MultisigDescriptor(pubkeys=pubkeys, thresh=self.m, is_sorted=True)
return descriptor.SHDescriptor(subdescriptor=multi)
elif script_type == 'p2wsh':
pubkeys = self.get_public_keys(address)
pubkeys = [descriptor.PubkeyProvider.parse(pk) for pk in pubkeys]
multi = descriptor.MultisigDescriptor(pubkeys=pubkeys, thresh=self.m, is_sorted=True)
return descriptor.WSHDescriptor(subdescriptor=multi)
elif script_type == 'p2wsh-p2sh':
pubkeys = self.get_public_keys(address)
pubkeys = [descriptor.PubkeyProvider.parse(pk) for pk in pubkeys]
multi = descriptor.MultisigDescriptor(pubkeys=pubkeys, thresh=self.m, is_sorted=True)
wsh = descriptor.WSHDescriptor(subdescriptor=multi)
return descriptor.SHDescriptor(subdescriptor=wsh)
else:
raise NotImplementedError(f"unexpected {script_type=}")
def can_sign(self, tx: Transaction) -> bool:
if not isinstance(tx, PartialTransaction):
return False
@ -2262,6 +2305,7 @@ class Abstract_Wallet(ABC, Logger, EventListener):
is_mine = self._learn_derivation_path_for_address_from_txinout(txout, address)
if not is_mine:
return
txout.script_descriptor = self._get_script_descriptor_for_address(address)
txout.script_type = self.get_txin_type(address)
txout.is_mine = True
txout.is_change = self.is_change(address)
@ -2938,20 +2982,6 @@ class Simple_Wallet(Abstract_Wallet):
def get_public_keys(self, address: str) -> Sequence[str]:
return [self.get_public_key(address)]
def get_redeem_script(self, address: str) -> Optional[str]:
txin_type = self.get_txin_type(address)
if txin_type in ('p2pkh', 'p2wpkh', 'p2pk'):
return None
if txin_type == 'p2wpkh-p2sh':
pubkey = self.get_public_key(address)
return bitcoin.p2wpkh_nested_script(pubkey)
if txin_type == 'address':
return None
raise UnknownTxinType(f'unexpected txin_type {txin_type}')
def get_witness_script(self, address: str) -> Optional[str]:
return None
class Imported_Wallet(Simple_Wallet):
# wallet made of imported addresses
@ -3463,28 +3493,6 @@ class Multisig_Wallet(Deterministic_Wallet):
def pubkeys_to_scriptcode(self, pubkeys: Sequence[str]) -> str:
return transaction.multisig_script(sorted(pubkeys), self.m)
def get_redeem_script(self, address):
txin_type = self.get_txin_type(address)
pubkeys = self.get_public_keys(address)
scriptcode = self.pubkeys_to_scriptcode(pubkeys)
if txin_type == 'p2sh':
return scriptcode
elif txin_type == 'p2wsh-p2sh':
return bitcoin.p2wsh_nested_script(scriptcode)
elif txin_type == 'p2wsh':
return None
raise UnknownTxinType(f'unexpected txin_type {txin_type}')
def get_witness_script(self, address):
txin_type = self.get_txin_type(address)
pubkeys = self.get_public_keys(address)
scriptcode = self.pubkeys_to_scriptcode(pubkeys)
if txin_type == 'p2sh':
return None
elif txin_type in ('p2wsh-p2sh', 'p2wsh'):
return scriptcode
raise UnknownTxinType(f'unexpected txin_type {txin_type}')
def derive_pubkeys(self, c, i):
return [k.derive_pubkey(c, i).hex() for k in self.get_keystores()]

Loading…
Cancel
Save