From 7ebbacf1f450877b27db89d9a405bdd86e370086 Mon Sep 17 00:00:00 2001 From: Kristaps Kaupe Date: Wed, 8 Nov 2023 11:58:16 +0200 Subject: [PATCH] Add type hints Co-authored-by: roshii --- src/jmbitcoin/secp256k1_main.py | 45 ++++++++------- src/jmbitcoin/secp256k1_transaction.py | 77 +++++++++++++++++--------- 2 files changed, 76 insertions(+), 46 deletions(-) diff --git a/src/jmbitcoin/secp256k1_main.py b/src/jmbitcoin/secp256k1_main.py index b94f584..652e35d 100644 --- a/src/jmbitcoin/secp256k1_main.py +++ b/src/jmbitcoin/secp256k1_main.py @@ -1,5 +1,6 @@ import base64 import struct +from typing import List, Tuple, Union from jmbase import bintohex from bitcointx import base58 @@ -26,7 +27,7 @@ BTC_P2SH_VBYTE = {"mainnet": b'\x05', "testnet": b'\xc4', "signet": b'\xc4'} """PoDLE related primitives """ -def getG(compressed=True): +def getG(compressed: bool = True) -> CPubKey: """Returns the public key binary representation of secp256k1 G; note that CPubKey is of type bytes. @@ -39,17 +40,17 @@ def getG(compressed=True): podle_PublicKey_class = CPubKey podle_PrivateKey_class = CKey -def podle_PublicKey(P): +def podle_PublicKey(P: bytes) -> CPubKey: """Returns a PublicKey object from a binary string """ return CPubKey(P) -def podle_PrivateKey(priv): +def podle_PrivateKey(priv: bytes) -> CKey: """Returns a PrivateKey object from a binary string """ return CKey(priv) -def read_privkey(priv): +def read_privkey(priv: bytes) -> Tuple[bool, bytes]: if len(priv) == 33: if priv[-1:] == b'\x01': compressed = True @@ -61,7 +62,7 @@ def read_privkey(priv): raise Exception("Invalid private key") return (compressed, priv[:32]) -def privkey_to_pubkey(priv): +def privkey_to_pubkey(priv: bytes) -> CPubKey: '''Take 32/33 byte raw private key as input. If 32 bytes, return as uncompressed raw public key. If 33 bytes and the final byte is 01, return @@ -75,7 +76,8 @@ def privkey_to_pubkey(priv): # b58check wrapper functions around bitcointx.base58 functions: # (avoids complexity of key management structure) -def bin_to_b58check(inp, magicbyte=b'\x00'): +def bin_to_b58check(inp: bytes, + magicbyte: Union[bytes, int] = b'\x00') -> str: """ The magic byte (prefix byte) should be passed either as a single byte or an integer. What is returned is a string in base58 encoding, with the prefix and the checksum. @@ -91,25 +93,25 @@ def bin_to_b58check(inp, magicbyte=b'\x00'): checksum = Hash(inp_fmtd)[:4] return base58.encode(inp_fmtd + checksum) -def b58check_to_bin(s): +def b58check_to_bin(s: str) -> bytes: data = base58.decode(s) assert Hash(data[:-4])[:4] == data[-4:] return struct.pack(b"B", data[0]), data[1:-4] -def get_version_byte(s): +def get_version_byte(s: str) -> bytes: return b58check_to_bin(s)[0] -def ecdsa_sign(msg, priv): +def ecdsa_sign(msg: str, priv: bytes) -> str: hashed_msg = BitcoinMessage(msg).GetHash() sig = ecdsa_raw_sign(hashed_msg, priv, rawmsg=True) return base64.b64encode(sig).decode('ascii') -def ecdsa_verify(msg, sig, pub): +def ecdsa_verify(msg: str, sig: str, pub: bytes) -> bool: hashed_msg = BitcoinMessage(msg).GetHash() sig = base64.b64decode(sig) return ecdsa_raw_verify(hashed_msg, pub, sig, rawmsg=True) -def is_valid_pubkey(pubkey, require_compressed=False): +def is_valid_pubkey(pubkey: bytes, require_compressed: bool = False) -> bool: """ Returns True if the serialized pubkey is a valid secp256k1 pubkey serialization or False if not; returns False for an uncompressed encoding if require_compressed is True. @@ -135,7 +137,7 @@ def is_valid_pubkey(pubkey, require_compressed=False): return True -def multiply(s, pub, return_serialized=True): +def multiply(s: bytes, pub: bytes, return_serialized: bool = True) -> bytes: '''Input binary compressed pubkey P(33 bytes) and scalar s(32 bytes), return s*P. The return value is a binary compressed public key, @@ -166,7 +168,7 @@ def multiply(s, pub, return_serialized=True): return CPubKey._from_ctypes_char_array(pubkey_buf) return bytes(CPubKey._from_ctypes_char_array(pubkey_buf)) -def add_pubkeys(pubkeys): +def add_pubkeys(pubkeys: List[bytes]) -> CPubKey: '''Input a list of binary compressed pubkeys and return their sum as a binary compressed pubkey.''' pubkey_list = [CPubKey(x) for x in pubkeys] @@ -176,7 +178,7 @@ def add_pubkeys(pubkeys): raise ValueError("Invalid pubkey format.") return CPubKey.combine(*pubkey_list) -def add_privkeys(priv1, priv2): +def add_privkeys(priv1: bytes, priv2: bytes) -> bytes: '''Add privkey 1 to privkey 2. Input keys must be in binary either compressed or not. Returned key will have the same compression state. @@ -192,7 +194,7 @@ def add_privkeys(priv1, priv2): res += b'\x01' return res -def ecdh(privkey, pubkey): +def ecdh(privkey: bytes, pubkey: bytes) -> bytes: """ Take a privkey in raw byte serialization, and a pubkey serialized in compressed, binary format (33 bytes), and output the shared secret as a 32 byte hash digest output. @@ -205,9 +207,9 @@ def ecdh(privkey, pubkey): _, priv = read_privkey(privkey) return CKey(priv).ECDH(CPubKey(pubkey)) -def ecdsa_raw_sign(msg, - priv, - rawmsg=False): +def ecdsa_raw_sign(msg: Union[bytes, bytearray], + priv: bytes, + rawmsg: bool = False) -> bytes: '''Take the binary message msg and sign it with the private key priv. If rawmsg is True, no sha256 hash is applied to msg before signing. @@ -225,7 +227,10 @@ def ecdsa_raw_sign(msg, sig = newpriv.sign(Hash(msg), _ecdsa_sig_grind_low_r=False) return sig -def ecdsa_raw_verify(msg, pub, sig, rawmsg=False): +def ecdsa_raw_verify(msg: bytes, + pub: bytes, + sig: bytes, + rawmsg: bool = False) -> bool: '''Take the binary message msg and binary signature sig, and verify it against the pubkey pub. If rawmsg is True, no sha256 hash is applied to msg before verifying. @@ -265,7 +270,7 @@ class JMCKey(bytes, CKeyBase): def __init__(self, b): CKeyBase.__init__(self, b, compressed=True) - def sign(self, hash): + def sign(self, hash: Union[bytes, bytearray]) -> bytes: assert isinstance(hash, (bytes, bytearray)) if len(hash) != 32: raise ValueError('Hash must be exactly 32 bytes long') diff --git a/src/jmbitcoin/secp256k1_transaction.py b/src/jmbitcoin/secp256k1_transaction.py index 4be9fe6..ff91e74 100644 --- a/src/jmbitcoin/secp256k1_transaction.py +++ b/src/jmbitcoin/secp256k1_transaction.py @@ -1,17 +1,16 @@ -# note, only used for non-cryptographic randomness: -import random -import json -from typing import List, Union, Tuple # needed for single sha256 evaluation, which is used # in bitcoin (p2wsh) but not exposed in python-bitcointx: import hashlib +import json +# note, only used for non-cryptographic randomness: +import random from math import ceil +from typing import List, Optional, Tuple, Union -from jmbitcoin.secp256k1_main import * -from jmbase import bintohex, utxo_to_utxostr from bitcointx.core import (CMutableTransaction, CTxInWitness, CMutableOutPoint, CMutableTxIn, CTransaction, - CMutableTxOut, CTxIn, CTxOut, ValidationError) + CMutableTxOut, CTxIn, CTxOut, ValidationError, + CBitcoinTransaction) from bitcointx.core.script import * from bitcointx.wallet import (P2WPKHCoinAddress, CCoinAddress, P2PKHCoinAddress, CCoinAddressError) @@ -20,7 +19,11 @@ from bitcointx.core.scripteval import (VerifyScript, SCRIPT_VERIFY_WITNESS, SCRIPT_VERIFY_STRICTENC, SIGVERSION_WITNESS_V0) -def human_readable_transaction(tx, jsonified=True): +from jmbase import bintohex, utxo_to_utxostr +from jmbitcoin.secp256k1_main import * + + +def human_readable_transaction(tx: CTransaction, jsonified: bool = True) -> str: """ Given a CTransaction object, output a human readable json-formatted string (suitable for terminal output or large GUI textbox display) containing @@ -49,7 +52,8 @@ def human_readable_transaction(tx, jsonified=True): return outdict return json.dumps(outdict, indent=4) -def human_readable_input(txinput, txinput_witness): +def human_readable_input(txinput: CTxIn, + txinput_witness: Optional[CTxInWitness]) -> dict: """ Pass objects of type CTxIn and CTxInWitness (or None) and a dict of human-readable entries for this input is returned. @@ -68,7 +72,7 @@ def human_readable_input(txinput, txinput_witness): txinput_witness.scriptWitness.serialize()) return outdict -def human_readable_output(txoutput): +def human_readable_output(txoutput: CTxOut) -> dict: """ Returns a dict of human-readable entries for this output. """ @@ -175,7 +179,7 @@ def estimate_tx_size(ins: List[str], outs: List[str]) -> Union[int, Tuple[int]]: return nwsize return (wsize, nwsize) -def tx_vsize(tx): +def tx_vsize(tx: CTransaction) -> int: """ Computes the virtual size (in vbytes) of a transaction """ @@ -184,7 +188,8 @@ def tx_vsize(tx): non_witness_size = raw_tx_size - witness_size return ceil(non_witness_size + .25 * witness_size) -def pubkey_to_p2pkh_script(pub, require_compressed=False): +def pubkey_to_p2pkh_script(pub: bytes, + require_compressed: bool = False) -> CScript: """ Given a pubkey in bytes, return a CScript representing the corresponding pay-to-pubkey-hash @@ -192,7 +197,7 @@ def pubkey_to_p2pkh_script(pub, require_compressed=False): """ return P2PKHCoinAddress.from_pubkey(pub).to_scriptPubKey() -def pubkey_to_p2wpkh_script(pub): +def pubkey_to_p2wpkh_script(pub: bytes) -> CScript: """ Given a pubkey in bytes (compressed), return a CScript representing the corresponding pay-to-witness-pubkey-hash @@ -200,7 +205,7 @@ def pubkey_to_p2wpkh_script(pub): """ return P2WPKHCoinAddress.from_pubkey(pub).to_scriptPubKey() -def pubkey_to_p2sh_p2wpkh_script(pub): +def pubkey_to_p2sh_p2wpkh_script(pub: bytes) -> CScript: """ Given a pubkey in bytes, return a CScript representing the corresponding nested pay to witness keyhash @@ -210,7 +215,7 @@ def pubkey_to_p2sh_p2wpkh_script(pub): raise Exception("Invalid pubkey") return pubkey_to_p2wpkh_script(pub).to_p2sh_scriptPubKey() -def redeem_script_to_p2wsh_script(redeem_script): +def redeem_script_to_p2wsh_script(redeem_script: Union[bytes, CScript]) -> CScript: """ Given redeem script of type CScript (or bytes) returns the corresponding segwit v0 scriptPubKey as for the case pay-to-witness-scripthash. @@ -218,7 +223,7 @@ def redeem_script_to_p2wsh_script(redeem_script): return standard_witness_v0_scriptpubkey( hashlib.sha256(redeem_script).digest()) -def mk_freeze_script(pub, locktime): +def mk_freeze_script(pub: bytes, locktime: int) -> CScript: """ Given a pubkey and locktime, create a script which can only be spent after the locktime has passed using OP_CHECKLOCKTIMEVERIFY @@ -232,7 +237,7 @@ def mk_freeze_script(pub, locktime): return CScript([locktime, OP_CHECKLOCKTIMEVERIFY, OP_DROP, pub, OP_CHECKSIG]) -def mk_burn_script(data): +def mk_burn_script(data: bytes) -> CScript: """ For a given bytestring (data), returns a scriptPubKey which is an OP_RETURN of that data. @@ -241,7 +246,12 @@ def mk_burn_script(data): raise TypeError("data must be in bytes") return CScript([OP_RETURN, data]) -def sign(tx, i, priv, hashcode=SIGHASH_ALL, amount=None, native=False): +def sign(tx: CMutableTransaction, + i: int, + priv: bytes, + hashcode: SIGHASH_Type = SIGHASH_ALL, + amount: Optional[int] = None, + native: bool = False) -> Tuple[Optional[bytes], str]: """ Given a transaction tx of type CMutableTransaction, an input index i, and a raw privkey in bytes, updates the CMutableTransaction to contain @@ -325,7 +335,10 @@ def sign(tx, i, priv, hashcode=SIGHASH_ALL, amount=None, native=False): return sig, "signing succeeded" -def mktx(ins, outs, version=1, locktime=0): +def mktx(ins: List[Tuple[bytes, int]], + outs: List[dict], + version: int = 1, + locktime: int = 0) -> CMutableTransaction: """ Given a list of input tuples (txid(bytes), n(int)), and a list of outputs which are dicts with keys "address" (value should be *str* not CCoinAddress) ( @@ -361,7 +374,10 @@ def mktx(ins, outs, version=1, locktime=0): vout.append(out) return CMutableTransaction(vin, vout, nLockTime=locktime, nVersion=version) -def make_shuffled_tx(ins, outs, version=1, locktime=0): +def make_shuffled_tx(ins: List[Tuple[bytes, int]], + outs: List[dict], + version: int = 1, + locktime: int = 0) -> CMutableTransaction: """ Simple wrapper to ensure transaction inputs and outputs are randomly ordered. NB: This mutates ordering of `ins` and `outs`. @@ -370,7 +386,12 @@ def make_shuffled_tx(ins, outs, version=1, locktime=0): random.shuffle(outs) return mktx(ins, outs, version=version, locktime=locktime) -def verify_tx_input(tx, i, scriptSig, scriptPubKey, amount=None, witness=None): +def verify_tx_input(tx: CTransaction, + i: int, + scriptSig: CScript, + scriptPubKey: CScript, + amount: Optional[int] = None, + witness: Optional[CScriptWitness] = None) -> bool: flags = set([SCRIPT_VERIFY_STRICTENC]) if witness: # https://github.com/Simplexum/python-bitcointx/blob/648ad8f45ff853bf9923c6498bfa0648b3d7bcbd/bitcointx/core/scripteval.py#L1250-L1252 @@ -383,7 +404,8 @@ def verify_tx_input(tx, i, scriptSig, scriptPubKey, amount=None, witness=None): return False return True -def extract_witness(tx, i): +def extract_witness(tx: CTransaction, + i: int) -> Tuple[Optional[Tuple[CTxInWitness, ...]], str]: """Given `tx` of type CTransaction, extract, as a list of objects of type CScript, which constitute the witness at the index i, followed by "success". @@ -401,7 +423,8 @@ def extract_witness(tx, i): witness = tx.wit.vtxinwit[i] return (witness, "success") -def extract_pubkey_from_witness(tx, i): +def extract_pubkey_from_witness(tx: CTransaction, + i: int) -> Tuple[Optional[CScriptWitness], str]: """ Extract the pubkey used to sign at index i, in CTransaction tx, assuming it is of type p2wpkh (including wrapped segwit version). @@ -418,7 +441,7 @@ def extract_pubkey_from_witness(tx, i): return None, "invalid pubkey in witness" return sWitness[1], "success" -def get_equal_outs(tx): +def get_equal_outs(tx: CTransaction) -> Optional[List[CTxOut]]: """ If 2 or more transaction outputs have the same bitcoin value, return then as a list of CTxOuts. If there is not exactly one equal output size, return False. @@ -429,14 +452,16 @@ def get_equal_outs(tx): if len(eos) > 0: eos = set(eos) if len(eos) > 1: - return False + return None for i, vout in enumerate(tx.vout): if vout.nValue == list(eos)[0]: retval.append((i, vout)) assert len(retval) > 1 return retval -def is_jm_tx(tx, min_cj_amount=75000, min_participants=3): +def is_jm_tx(tx: CBitcoinTransaction, + min_cj_amount: int = 75000, + min_participants: int = 3) -> Union[Tuple[bool, None], Tuple[int, int]]: """ Identify Joinmarket-patterned transactions. TODO: this should be in another module. Given a CBitcoinTransaction tx, check: