|
|
|
|
@ -33,7 +33,7 @@ import sys
|
|
|
|
|
import io |
|
|
|
|
import base64 |
|
|
|
|
from typing import (Sequence, Union, NamedTuple, Tuple, Optional, Iterable, |
|
|
|
|
Callable, List, Dict, Set, TYPE_CHECKING) |
|
|
|
|
Callable, List, Dict, Set, TYPE_CHECKING, Mapping) |
|
|
|
|
from collections import defaultdict |
|
|
|
|
from enum import IntEnum |
|
|
|
|
import itertools |
|
|
|
|
@ -47,8 +47,7 @@ from .util import profiler, to_bytes, bfh, chunks, is_hex_str, parse_max_spend
|
|
|
|
|
from .bitcoin import (TYPE_ADDRESS, TYPE_SCRIPT, hash_160, |
|
|
|
|
hash160_to_p2sh, hash160_to_p2pkh, hash_to_segwit_addr, |
|
|
|
|
var_int, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC, COIN, |
|
|
|
|
int_to_hex, push_script, b58_address_to_hash160, |
|
|
|
|
opcodes, add_number_to_script, base_decode, |
|
|
|
|
opcodes, base_decode, |
|
|
|
|
base_encode, construct_witness, construct_script) |
|
|
|
|
from .crypto import sha256d |
|
|
|
|
from .logging import get_logger |
|
|
|
|
@ -138,13 +137,13 @@ class TxOutput:
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def from_address_and_value(cls, address: str, value: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']: |
|
|
|
|
return cls(scriptpubkey=bfh(bitcoin.address_to_script(address)), |
|
|
|
|
return cls(scriptpubkey=bitcoin.address_to_script(address), |
|
|
|
|
value=value) |
|
|
|
|
|
|
|
|
|
def serialize_to_network(self) -> bytes: |
|
|
|
|
buf = int.to_bytes(self.value, 8, byteorder="little", signed=False) |
|
|
|
|
script = self.scriptpubkey |
|
|
|
|
buf += bfh(var_int(len(script.hex()) // 2)) |
|
|
|
|
buf += var_int(len(script)) |
|
|
|
|
buf += script |
|
|
|
|
return buf |
|
|
|
|
|
|
|
|
|
@ -212,9 +211,9 @@ class TxOutput:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BIP143SharedTxDigestFields(NamedTuple): |
|
|
|
|
hashPrevouts: str |
|
|
|
|
hashSequence: str |
|
|
|
|
hashOutputs: str |
|
|
|
|
hashPrevouts: bytes |
|
|
|
|
hashSequence: bytes |
|
|
|
|
hashOutputs: bytes |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TxOutpoint(NamedTuple): |
|
|
|
|
@ -241,7 +240,7 @@ class TxOutpoint(NamedTuple):
|
|
|
|
|
return [self.txid.hex(), self.out_idx] |
|
|
|
|
|
|
|
|
|
def serialize_to_network(self) -> bytes: |
|
|
|
|
return self.txid[::-1] + bfh(int_to_hex(self.out_idx, 4)) |
|
|
|
|
return self.txid[::-1] + int.to_bytes(self.out_idx, length=4, byteorder="little", signed=False) |
|
|
|
|
|
|
|
|
|
def is_coinbase(self) -> bool: |
|
|
|
|
return self.txid == bytes(32) |
|
|
|
|
@ -356,9 +355,9 @@ class TxInput:
|
|
|
|
|
# Prev hash and index |
|
|
|
|
s = self.prevout.serialize_to_network() |
|
|
|
|
# Script length, script, sequence |
|
|
|
|
s += bytes.fromhex(var_int(len(script_sig))) |
|
|
|
|
s += var_int(len(script_sig)) |
|
|
|
|
s += script_sig |
|
|
|
|
s += bytes.fromhex(int_to_hex(self.nsequence, 4)) |
|
|
|
|
s += int.to_bytes(self.nsequence, length=4, byteorder="little", signed=False) |
|
|
|
|
return s |
|
|
|
|
|
|
|
|
|
def witness_elements(self) -> Sequence[bytes]: |
|
|
|
|
@ -714,7 +713,7 @@ def parse_input(vds: BCDataStream) -> TxInput:
|
|
|
|
|
def parse_witness(vds: BCDataStream, txin: TxInput) -> None: |
|
|
|
|
n = vds.read_compact_size() |
|
|
|
|
witness_elements = list(vds.read_bytes(vds.read_compact_size()) for i in range(n)) |
|
|
|
|
txin.witness = bfh(construct_witness(witness_elements)) |
|
|
|
|
txin.witness = construct_witness(witness_elements) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_output(vds: BCDataStream) -> TxOutput: |
|
|
|
|
@ -729,7 +728,7 @@ def parse_output(vds: BCDataStream) -> TxOutput:
|
|
|
|
|
|
|
|
|
|
# pay & redeem scripts |
|
|
|
|
|
|
|
|
|
def multisig_script(public_keys: Sequence[str], m: int) -> str: |
|
|
|
|
def multisig_script(public_keys: Sequence[str], m: int) -> bytes: |
|
|
|
|
n = len(public_keys) |
|
|
|
|
assert 1 <= m <= n <= 15, f'm {m}, n {n}' |
|
|
|
|
return construct_script([m, *public_keys, n, opcodes.OP_CHECKMULTISIG]) |
|
|
|
|
@ -833,18 +832,18 @@ class Transaction:
|
|
|
|
|
raise SerializationError('extra junk at the end') |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def serialize_witness(cls, txin: TxInput, *, estimate_size=False) -> str: |
|
|
|
|
def serialize_witness(cls, txin: TxInput, *, estimate_size=False) -> bytes: |
|
|
|
|
if txin.witness is not None: |
|
|
|
|
return txin.witness.hex() |
|
|
|
|
return txin.witness |
|
|
|
|
if txin.is_coinbase_input(): |
|
|
|
|
return '' |
|
|
|
|
return b"" |
|
|
|
|
assert isinstance(txin, PartialTxInput) |
|
|
|
|
|
|
|
|
|
if not txin.is_segwit(): |
|
|
|
|
return construct_witness([]) |
|
|
|
|
|
|
|
|
|
if estimate_size and txin.witness_sizehint is not None: |
|
|
|
|
return '00' * txin.witness_sizehint |
|
|
|
|
return bytes(txin.witness_sizehint) |
|
|
|
|
|
|
|
|
|
dummy_desc = None |
|
|
|
|
if estimate_size: |
|
|
|
|
@ -852,22 +851,22 @@ class Transaction:
|
|
|
|
|
if desc := (txin.script_descriptor or dummy_desc): |
|
|
|
|
sol = desc.satisfy(allow_dummy=estimate_size, sigdata=txin.part_sigs) |
|
|
|
|
if sol.witness is not None: |
|
|
|
|
return sol.witness.hex() |
|
|
|
|
return sol.witness |
|
|
|
|
return construct_witness([]) |
|
|
|
|
raise UnknownTxinType("cannot construct witness") |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def input_script(self, txin: TxInput, *, estimate_size=False) -> str: |
|
|
|
|
def input_script(self, txin: TxInput, *, estimate_size=False) -> bytes: |
|
|
|
|
if txin.script_sig is not None: |
|
|
|
|
return txin.script_sig.hex() |
|
|
|
|
return txin.script_sig |
|
|
|
|
if txin.is_coinbase_input(): |
|
|
|
|
return '' |
|
|
|
|
return b"" |
|
|
|
|
assert isinstance(txin, PartialTxInput) |
|
|
|
|
|
|
|
|
|
if txin.is_p2sh_segwit() and txin.redeem_script: |
|
|
|
|
return construct_script([txin.redeem_script]) |
|
|
|
|
if txin.is_native_segwit(): |
|
|
|
|
return '' |
|
|
|
|
return b"" |
|
|
|
|
|
|
|
|
|
dummy_desc = None |
|
|
|
|
if estimate_size: |
|
|
|
|
@ -876,37 +875,39 @@ class Transaction:
|
|
|
|
|
if desc.is_segwit(): |
|
|
|
|
if redeem_script := desc.expand().redeem_script: |
|
|
|
|
return construct_script([redeem_script]) |
|
|
|
|
return "" |
|
|
|
|
return b"" |
|
|
|
|
sol = desc.satisfy(allow_dummy=estimate_size, sigdata=txin.part_sigs) |
|
|
|
|
if sol.script_sig is not None: |
|
|
|
|
return sol.script_sig.hex() |
|
|
|
|
return "" |
|
|
|
|
return sol.script_sig |
|
|
|
|
return b"" |
|
|
|
|
raise UnknownTxinType("cannot construct scriptSig") |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def get_preimage_script(cls, txin: 'PartialTxInput') -> str: |
|
|
|
|
def get_preimage_script(cls, txin: 'PartialTxInput') -> bytes: |
|
|
|
|
if txin.witness_script: |
|
|
|
|
if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(txin.witness_script)]: |
|
|
|
|
raise Exception('OP_CODESEPARATOR black magic is not supported') |
|
|
|
|
return txin.witness_script.hex() |
|
|
|
|
return txin.witness_script |
|
|
|
|
if not txin.is_segwit() and txin.redeem_script: |
|
|
|
|
if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(txin.redeem_script)]: |
|
|
|
|
raise Exception('OP_CODESEPARATOR black magic is not supported') |
|
|
|
|
return txin.redeem_script.hex() |
|
|
|
|
return txin.redeem_script |
|
|
|
|
|
|
|
|
|
if desc := txin.script_descriptor: |
|
|
|
|
sc = desc.expand() |
|
|
|
|
if script := sc.scriptcode_for_sighash: |
|
|
|
|
return script.hex() |
|
|
|
|
return script |
|
|
|
|
raise Exception(f"don't know scriptcode for descriptor: {desc.to_string()}") |
|
|
|
|
raise UnknownTxinType(f'cannot construct preimage_script') |
|
|
|
|
|
|
|
|
|
def _calc_bip143_shared_txdigest_fields(self) -> BIP143SharedTxDigestFields: |
|
|
|
|
inputs = self.inputs() |
|
|
|
|
outputs = self.outputs() |
|
|
|
|
hashPrevouts = sha256d(b''.join(txin.prevout.serialize_to_network() for txin in inputs)).hex() |
|
|
|
|
hashSequence = sha256d(bfh(''.join(int_to_hex(txin.nsequence, 4) for txin in inputs))).hex() |
|
|
|
|
hashOutputs = sha256d(bfh(''.join(o.serialize_to_network().hex() for o in outputs))).hex() |
|
|
|
|
hashPrevouts = sha256d(b''.join(txin.prevout.serialize_to_network() for txin in inputs)) |
|
|
|
|
hashSequence = sha256d(b''.join( |
|
|
|
|
int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False) |
|
|
|
|
for txin in inputs)) |
|
|
|
|
hashOutputs = sha256d(b''.join(o.serialize_to_network() for o in outputs)) |
|
|
|
|
return BIP143SharedTxDigestFields(hashPrevouts=hashPrevouts, |
|
|
|
|
hashSequence=hashSequence, |
|
|
|
|
hashOutputs=hashOutputs) |
|
|
|
|
@ -934,19 +935,20 @@ class Transaction:
|
|
|
|
|
note: (not include_sigs) implies force_legacy |
|
|
|
|
""" |
|
|
|
|
self.deserialize() |
|
|
|
|
nVersion = int_to_hex(self.version, 4) |
|
|
|
|
nLocktime = int_to_hex(self.locktime, 4) |
|
|
|
|
nVersion = int.to_bytes(self.version, length=4, byteorder="little", signed=True).hex() |
|
|
|
|
nLocktime = int.to_bytes(self.locktime, length=4, byteorder="little", signed=False).hex() |
|
|
|
|
inputs = self.inputs() |
|
|
|
|
outputs = self.outputs() |
|
|
|
|
|
|
|
|
|
def create_script_sig(txin: TxInput) -> bytes: |
|
|
|
|
if include_sigs: |
|
|
|
|
script_sig = self.input_script(txin, estimate_size=estimate_size) |
|
|
|
|
return bytes.fromhex(script_sig) |
|
|
|
|
return script_sig |
|
|
|
|
return b"" |
|
|
|
|
txins = var_int(len(inputs)) + ''.join(txin.serialize_to_network(script_sig=create_script_sig(txin)).hex() |
|
|
|
|
for txin in inputs) |
|
|
|
|
txouts = var_int(len(outputs)) + ''.join(o.serialize_to_network().hex() for o in outputs) |
|
|
|
|
txins = var_int(len(inputs)).hex() + ''.join( |
|
|
|
|
txin.serialize_to_network(script_sig=create_script_sig(txin)).hex() |
|
|
|
|
for txin in inputs) |
|
|
|
|
txouts = var_int(len(outputs)).hex() + ''.join(o.serialize_to_network().hex() for o in outputs) |
|
|
|
|
|
|
|
|
|
use_segwit_ser_for_estimate_size = estimate_size and self.is_segwit(guess_for_address=True) |
|
|
|
|
use_segwit_ser_for_actual_use = not estimate_size and self.is_segwit() |
|
|
|
|
@ -954,7 +956,7 @@ class Transaction:
|
|
|
|
|
if include_sigs and not force_legacy and use_segwit_ser: |
|
|
|
|
marker = '00' |
|
|
|
|
flag = '01' |
|
|
|
|
witness = ''.join(self.serialize_witness(x, estimate_size=estimate_size) for x in inputs) |
|
|
|
|
witness = ''.join(self.serialize_witness(x, estimate_size=estimate_size).hex() for x in inputs) |
|
|
|
|
return nVersion + marker + flag + txins + txouts + witness + nLocktime |
|
|
|
|
else: |
|
|
|
|
return nVersion + txins + txouts + nLocktime |
|
|
|
|
@ -1073,7 +1075,7 @@ class Transaction:
|
|
|
|
|
"""Whether the tx explicitly signals BIP-0125 replace-by-fee.""" |
|
|
|
|
return any([txin.nsequence < 0xffffffff - 1 for txin in self.inputs()]) |
|
|
|
|
|
|
|
|
|
def estimated_size(self): |
|
|
|
|
def estimated_size(self) -> int: |
|
|
|
|
"""Return an estimated virtual tx size in vbytes. |
|
|
|
|
BIP-0141 defines 'Virtual transaction size' to be weight/4 rounded up. |
|
|
|
|
This definition is only for humans, and has little meaning otherwise. |
|
|
|
|
@ -1084,13 +1086,13 @@ class Transaction:
|
|
|
|
|
return self.virtual_size_from_weight(weight) |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def estimated_input_weight(cls, txin: TxInput, is_segwit_tx: bool): |
|
|
|
|
def estimated_input_weight(cls, txin: TxInput, is_segwit_tx: bool) -> int: |
|
|
|
|
'''Return an estimate of serialized input weight in weight units.''' |
|
|
|
|
script_sig = cls.input_script(txin, estimate_size=True) |
|
|
|
|
input_size = len(txin.serialize_to_network(script_sig=bytes.fromhex(script_sig))) |
|
|
|
|
input_size = len(txin.serialize_to_network(script_sig=script_sig)) |
|
|
|
|
|
|
|
|
|
if txin.is_segwit(guess_for_address=True): |
|
|
|
|
witness_size = len(cls.serialize_witness(txin, estimate_size=True)) // 2 |
|
|
|
|
witness_size = len(cls.serialize_witness(txin, estimate_size=True)) |
|
|
|
|
else: |
|
|
|
|
witness_size = 1 if is_segwit_tx else 0 |
|
|
|
|
|
|
|
|
|
@ -1103,15 +1105,15 @@ class Transaction:
|
|
|
|
|
return cls.estimated_output_size_for_script(script) |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def estimated_output_size_for_script(cls, script: str) -> int: |
|
|
|
|
def estimated_output_size_for_script(cls, script: bytes) -> int: |
|
|
|
|
"""Return an estimate of serialized output size in bytes.""" |
|
|
|
|
# 8 byte value + varint script len + script |
|
|
|
|
script_len = len(script) // 2 |
|
|
|
|
var_int_len = len(var_int(script_len)) // 2 |
|
|
|
|
script_len = len(script) |
|
|
|
|
var_int_len = len(var_int(script_len)) |
|
|
|
|
return 8 + var_int_len + script_len |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def virtual_size_from_weight(cls, weight): |
|
|
|
|
def virtual_size_from_weight(cls, weight: int) -> int: |
|
|
|
|
return weight // 4 + (weight % 4 > 0) |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
@ -1132,8 +1134,8 @@ class Transaction:
|
|
|
|
|
if not self.is_segwit(guess_for_address=estimate): |
|
|
|
|
return 0 |
|
|
|
|
inputs = self.inputs() |
|
|
|
|
witness = ''.join(self.serialize_witness(x, estimate_size=estimate) for x in inputs) |
|
|
|
|
witness_size = len(witness) // 2 + 2 # include marker and flag |
|
|
|
|
witness = b"".join(self.serialize_witness(x, estimate_size=estimate) for x in inputs) |
|
|
|
|
witness_size = len(witness) + 2 # include marker and flag |
|
|
|
|
return witness_size |
|
|
|
|
|
|
|
|
|
def estimated_base_size(self): |
|
|
|
|
@ -1149,17 +1151,16 @@ class Transaction:
|
|
|
|
|
def is_complete(self) -> bool: |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
def get_output_idxs_from_scriptpubkey(self, script: str) -> Set[int]: |
|
|
|
|
def get_output_idxs_from_scriptpubkey(self, script: bytes) -> Set[int]: |
|
|
|
|
"""Returns the set indices of outputs with given script.""" |
|
|
|
|
assert isinstance(script, str) # hex |
|
|
|
|
assert isinstance(script, bytes) |
|
|
|
|
# build cache if there isn't one yet |
|
|
|
|
# note: can become stale and return incorrect data |
|
|
|
|
# if the tx is modified later; that's out of scope. |
|
|
|
|
if not hasattr(self, '_script_to_output_idx'): |
|
|
|
|
d = defaultdict(set) |
|
|
|
|
for output_idx, o in enumerate(self.outputs()): |
|
|
|
|
o_script = o.scriptpubkey.hex() |
|
|
|
|
assert isinstance(o_script, str) |
|
|
|
|
o_script = o.scriptpubkey |
|
|
|
|
d[o_script].add(output_idx) |
|
|
|
|
self._script_to_output_idx = d |
|
|
|
|
return set(self._script_to_output_idx[script]) # copy |
|
|
|
|
@ -1347,9 +1348,9 @@ class PSBTSection:
|
|
|
|
|
def create_psbt_writer(cls, fd): |
|
|
|
|
def wr(key_type: int, val: bytes, key: bytes = b''): |
|
|
|
|
full_key = cls.get_fullkey_from_keytype_and_key(key_type, key) |
|
|
|
|
fd.write(bytes.fromhex(var_int(len(full_key)))) # key_size |
|
|
|
|
fd.write(var_int(len(full_key))) # key_size |
|
|
|
|
fd.write(full_key) # key |
|
|
|
|
fd.write(bytes.fromhex(var_int(len(val)))) # val_size |
|
|
|
|
fd.write(var_int(len(val))) # val_size |
|
|
|
|
fd.write(val) # val |
|
|
|
|
return wr |
|
|
|
|
|
|
|
|
|
@ -1363,7 +1364,7 @@ class PSBTSection:
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def get_fullkey_from_keytype_and_key(cls, key_type: int, key: bytes) -> bytes: |
|
|
|
|
key_type_bytes = bytes.fromhex(var_int(key_type)) |
|
|
|
|
key_type_bytes = var_int(key_type) |
|
|
|
|
return key_type_bytes + key |
|
|
|
|
|
|
|
|
|
def _serialize_psbt_section(self, fd): |
|
|
|
|
@ -1492,11 +1493,11 @@ class PartialTxInput(TxInput, PSBTSection):
|
|
|
|
|
f"If a redeemScript is provided, the scriptPubKey must be for that redeemScript") |
|
|
|
|
if self.witness_script: |
|
|
|
|
if self.redeem_script: |
|
|
|
|
if self.redeem_script != bfh(bitcoin.p2wsh_nested_script(self.witness_script.hex())): |
|
|
|
|
if self.redeem_script != bitcoin.p2wsh_nested_script(self.witness_script): |
|
|
|
|
raise PSBTInputConsistencyFailure(f"PSBT input validation: " |
|
|
|
|
f"If a witnessScript is provided, the redeemScript must be for that witnessScript") |
|
|
|
|
elif self.address: |
|
|
|
|
if self.address != bitcoin.script_to_p2wsh(self.witness_script.hex()): |
|
|
|
|
if self.address != bitcoin.script_to_p2wsh(self.witness_script): |
|
|
|
|
raise PSBTInputConsistencyFailure(f"PSBT input validation: " |
|
|
|
|
f"If a witnessScript is provided, the scriptPubKey must be for that witnessScript") |
|
|
|
|
|
|
|
|
|
@ -1617,7 +1618,7 @@ class PartialTxInput(TxInput, PSBTSection):
|
|
|
|
|
if (spk := super().scriptpubkey) is not None: |
|
|
|
|
return spk |
|
|
|
|
if self._trusted_address is not None: |
|
|
|
|
return bfh(bitcoin.address_to_script(self._trusted_address)) |
|
|
|
|
return bitcoin.address_to_script(self._trusted_address) |
|
|
|
|
if self.witness_utxo: |
|
|
|
|
return self.witness_utxo.scriptpubkey |
|
|
|
|
return None |
|
|
|
|
@ -1657,8 +1658,8 @@ class PartialTxInput(TxInput, PSBTSection):
|
|
|
|
|
clear_fields_when_finalized() |
|
|
|
|
return # already finalized |
|
|
|
|
if self.is_complete(): |
|
|
|
|
self.script_sig = bfh(Transaction.input_script(self)) |
|
|
|
|
self.witness = bfh(Transaction.serialize_witness(self)) |
|
|
|
|
self.script_sig = Transaction.input_script(self) |
|
|
|
|
self.witness = Transaction.serialize_witness(self) |
|
|
|
|
clear_fields_when_finalized() |
|
|
|
|
|
|
|
|
|
def combine_with_other_txin(self, other_txin: 'TxInput') -> None: |
|
|
|
|
@ -2086,16 +2087,16 @@ class PartialTransaction(Transaction):
|
|
|
|
|
self.invalidate_ser_cache() |
|
|
|
|
|
|
|
|
|
def serialize_preimage(self, txin_index: int, *, |
|
|
|
|
bip143_shared_txdigest_fields: BIP143SharedTxDigestFields = None) -> str: |
|
|
|
|
nVersion = int_to_hex(self.version, 4) |
|
|
|
|
nLocktime = int_to_hex(self.locktime, 4) |
|
|
|
|
bip143_shared_txdigest_fields: BIP143SharedTxDigestFields = None) -> bytes: |
|
|
|
|
nVersion = int.to_bytes(self.version, length=4, byteorder="little", signed=True) |
|
|
|
|
nLocktime = int.to_bytes(self.locktime, length=4, byteorder="little", signed=False) |
|
|
|
|
inputs = self.inputs() |
|
|
|
|
outputs = self.outputs() |
|
|
|
|
txin = inputs[txin_index] |
|
|
|
|
sighash = txin.sighash if txin.sighash is not None else Sighash.ALL |
|
|
|
|
if not Sighash.is_valid(sighash): |
|
|
|
|
raise Exception(f"SIGHASH_FLAG ({sighash}) not supported!") |
|
|
|
|
nHashType = int_to_hex(sighash, 4) |
|
|
|
|
nHashType = int.to_bytes(sighash, length=4, byteorder="little", signed=False) |
|
|
|
|
preimage_script = self.get_preimage_script(txin) |
|
|
|
|
if txin.is_segwit(): |
|
|
|
|
if bip143_shared_txdigest_fields is None: |
|
|
|
|
@ -2103,58 +2104,64 @@ class PartialTransaction(Transaction):
|
|
|
|
|
if not (sighash & Sighash.ANYONECANPAY): |
|
|
|
|
hashPrevouts = bip143_shared_txdigest_fields.hashPrevouts |
|
|
|
|
else: |
|
|
|
|
hashPrevouts = '00' * 32 |
|
|
|
|
hashPrevouts = bytes(32) |
|
|
|
|
if not (sighash & Sighash.ANYONECANPAY) and (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE: |
|
|
|
|
hashSequence = bip143_shared_txdigest_fields.hashSequence |
|
|
|
|
else: |
|
|
|
|
hashSequence = '00' * 32 |
|
|
|
|
hashSequence = bytes(32) |
|
|
|
|
if (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE: |
|
|
|
|
hashOutputs = bip143_shared_txdigest_fields.hashOutputs |
|
|
|
|
elif (sighash & 0x1f) == Sighash.SINGLE and txin_index < len(outputs): |
|
|
|
|
hashOutputs = sha256d(outputs[txin_index].serialize_to_network()).hex() |
|
|
|
|
hashOutputs = sha256d(outputs[txin_index].serialize_to_network()) |
|
|
|
|
else: |
|
|
|
|
hashOutputs = '00' * 32 |
|
|
|
|
outpoint = txin.prevout.serialize_to_network().hex() |
|
|
|
|
scriptCode = var_int(len(preimage_script) // 2) + preimage_script |
|
|
|
|
amount = int_to_hex(txin.value_sats(), 8) |
|
|
|
|
nSequence = int_to_hex(txin.nsequence, 4) |
|
|
|
|
hashOutputs = bytes(32) |
|
|
|
|
outpoint = txin.prevout.serialize_to_network() |
|
|
|
|
scriptCode = var_int(len(preimage_script)) + preimage_script |
|
|
|
|
amount = int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False) |
|
|
|
|
nSequence = int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False) |
|
|
|
|
preimage = nVersion + hashPrevouts + hashSequence + outpoint + scriptCode + amount + nSequence + hashOutputs + nLocktime + nHashType |
|
|
|
|
else: |
|
|
|
|
if sighash != Sighash.ALL: |
|
|
|
|
raise Exception(f"SIGHASH_FLAG ({sighash}) not supported! (for legacy sighash)") |
|
|
|
|
txins = var_int(len(inputs)) + ''.join(txin.serialize_to_network(script_sig=bfh(preimage_script) if txin_index==k else b"").hex() |
|
|
|
|
for k, txin in enumerate(inputs)) |
|
|
|
|
txouts = var_int(len(outputs)) + ''.join(o.serialize_to_network().hex() for o in outputs) |
|
|
|
|
txins = var_int(len(inputs)) + b"".join( |
|
|
|
|
txin.serialize_to_network(script_sig=preimage_script if txin_index==k else b"") |
|
|
|
|
for k, txin in enumerate(inputs)) |
|
|
|
|
txouts = var_int(len(outputs)) + b"".join(o.serialize_to_network() for o in outputs) |
|
|
|
|
preimage = nVersion + txins + txouts + nLocktime + nHashType |
|
|
|
|
return preimage |
|
|
|
|
|
|
|
|
|
def sign(self, keypairs) -> None: |
|
|
|
|
# keypairs: pubkey_hex -> (secret_bytes, is_compressed) |
|
|
|
|
def sign(self, keypairs: Mapping[bytes, bytes]) -> None: |
|
|
|
|
# keypairs: pubkey_bytes -> secret_bytes |
|
|
|
|
bip143_shared_txdigest_fields = self._calc_bip143_shared_txdigest_fields() |
|
|
|
|
for i, txin in enumerate(self.inputs()): |
|
|
|
|
pubkeys = [pk.hex() for pk in txin.pubkeys] |
|
|
|
|
for pubkey in pubkeys: |
|
|
|
|
for pubkey in txin.pubkeys: |
|
|
|
|
if txin.is_complete(): |
|
|
|
|
break |
|
|
|
|
if pubkey not in keypairs: |
|
|
|
|
continue |
|
|
|
|
_logger.info(f"adding signature for {pubkey}. spending utxo {txin.prevout.to_str()}") |
|
|
|
|
sec, compressed = keypairs[pubkey] |
|
|
|
|
sec = keypairs[pubkey] |
|
|
|
|
sig = self.sign_txin(i, sec, bip143_shared_txdigest_fields=bip143_shared_txdigest_fields) |
|
|
|
|
self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey, sig=sig) |
|
|
|
|
|
|
|
|
|
_logger.debug(f"is_complete {self.is_complete()}") |
|
|
|
|
_logger.debug(f"tx.sign() finished. is_complete={self.is_complete()}") |
|
|
|
|
self.invalidate_ser_cache() |
|
|
|
|
|
|
|
|
|
def sign_txin(self, txin_index, privkey_bytes, *, bip143_shared_txdigest_fields=None) -> str: |
|
|
|
|
def sign_txin( |
|
|
|
|
self, |
|
|
|
|
txin_index: int, |
|
|
|
|
privkey_bytes: bytes, |
|
|
|
|
*, |
|
|
|
|
bip143_shared_txdigest_fields=None, |
|
|
|
|
) -> bytes: |
|
|
|
|
txin = self.inputs()[txin_index] |
|
|
|
|
txin.validate_data(for_signing=True) |
|
|
|
|
sighash = txin.sighash if txin.sighash is not None else Sighash.ALL |
|
|
|
|
pre_hash = sha256d(bfh(self.serialize_preimage(txin_index, |
|
|
|
|
bip143_shared_txdigest_fields=bip143_shared_txdigest_fields))) |
|
|
|
|
pre_hash = self.serialize_preimage(txin_index, bip143_shared_txdigest_fields=bip143_shared_txdigest_fields) |
|
|
|
|
msg_hash = sha256d(pre_hash) |
|
|
|
|
privkey = ecc.ECPrivkey(privkey_bytes) |
|
|
|
|
sig = privkey.ecdsa_sign(pre_hash, sigencode=ecc.ecdsa_der_sig_from_r_and_s) |
|
|
|
|
sig = sig.hex() + Sighash.to_sigbytes(sighash).hex() |
|
|
|
|
sig = privkey.ecdsa_sign(msg_hash, sigencode=ecc.ecdsa_der_sig_from_r_and_s) |
|
|
|
|
sig = sig + Sighash.to_sigbytes(sighash) |
|
|
|
|
return sig |
|
|
|
|
|
|
|
|
|
def is_complete(self) -> bool: |
|
|
|
|
@ -2189,7 +2196,7 @@ class PartialTransaction(Transaction):
|
|
|
|
|
raw_bytes = self.serialize_as_bytes() |
|
|
|
|
return base64.b64encode(raw_bytes).decode('ascii') |
|
|
|
|
|
|
|
|
|
def update_signatures(self, signatures: Sequence[Union[str, None]]): |
|
|
|
|
def update_signatures(self, signatures: Sequence[Union[bytes, None]]) -> None: |
|
|
|
|
"""Add new signatures to a transaction |
|
|
|
|
|
|
|
|
|
`signatures` is expected to be a list of sigs with signatures[i] |
|
|
|
|
@ -2201,33 +2208,32 @@ class PartialTransaction(Transaction):
|
|
|
|
|
if len(self.inputs()) != len(signatures): |
|
|
|
|
raise Exception('expected {} signatures; got {}'.format(len(self.inputs()), len(signatures))) |
|
|
|
|
for i, txin in enumerate(self.inputs()): |
|
|
|
|
pubkeys = [pk.hex() for pk in txin.pubkeys] |
|
|
|
|
sig = signatures[i] |
|
|
|
|
if sig is None: |
|
|
|
|
continue |
|
|
|
|
if bfh(sig) in list(txin.part_sigs.values()): |
|
|
|
|
if sig in list(txin.part_sigs.values()): |
|
|
|
|
continue |
|
|
|
|
pre_hash = sha256d(bfh(self.serialize_preimage(i))) |
|
|
|
|
sig_string = ecc.ecdsa_sig64_from_der_sig(bfh(sig[:-2])) |
|
|
|
|
msg_hash = sha256d(self.serialize_preimage(i)) |
|
|
|
|
sig64 = ecc.ecdsa_sig64_from_der_sig(sig[:-1]) |
|
|
|
|
for recid in range(4): |
|
|
|
|
try: |
|
|
|
|
public_key = ecc.ECPubkey.from_ecdsa_sig64(sig_string, recid, pre_hash) |
|
|
|
|
public_key = ecc.ECPubkey.from_ecdsa_sig64(sig64, recid, msg_hash) |
|
|
|
|
except ecc.InvalidECPointException: |
|
|
|
|
# the point might not be on the curve for some recid values |
|
|
|
|
continue |
|
|
|
|
pubkey_hex = public_key.get_public_key_hex(compressed=True) |
|
|
|
|
if pubkey_hex in pubkeys: |
|
|
|
|
if not public_key.ecdsa_verify(sig_string, pre_hash): |
|
|
|
|
pubkey_bytes = public_key.get_public_key_bytes(compressed=True) |
|
|
|
|
if pubkey_bytes in txin.pubkeys: |
|
|
|
|
if not public_key.ecdsa_verify(sig64, msg_hash): |
|
|
|
|
continue |
|
|
|
|
_logger.info(f"adding sig: txin_idx={i}, signing_pubkey={pubkey_hex}, sig={sig}") |
|
|
|
|
self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey_hex, sig=sig) |
|
|
|
|
_logger.info(f"adding sig: txin_idx={i}, signing_pubkey={pubkey_bytes.hex()}, sig={sig.hex()}") |
|
|
|
|
self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey_bytes, sig=sig) |
|
|
|
|
break |
|
|
|
|
# redo raw |
|
|
|
|
self.invalidate_ser_cache() |
|
|
|
|
|
|
|
|
|
def add_signature_to_txin(self, *, txin_idx: int, signing_pubkey: str, sig: str): |
|
|
|
|
def add_signature_to_txin(self, *, txin_idx: int, signing_pubkey: bytes, sig: bytes) -> None: |
|
|
|
|
txin = self._inputs[txin_idx] |
|
|
|
|
txin.part_sigs[bfh(signing_pubkey)] = bfh(sig) |
|
|
|
|
txin.part_sigs[signing_pubkey] = sig |
|
|
|
|
# force re-serialization |
|
|
|
|
txin.script_sig = None |
|
|
|
|
txin.witness = None |
|
|
|
|
|