Browse Source

Add type hints

Co-authored-by: roshii <roshii@riseup.net>
master
Kristaps Kaupe 2 years ago
parent
commit
7ebbacf1f4
No known key found for this signature in database
GPG Key ID: 33E472FE870C7E5D
  1. 45
      src/jmbitcoin/secp256k1_main.py
  2. 77
      src/jmbitcoin/secp256k1_transaction.py

45
src/jmbitcoin/secp256k1_main.py

@ -1,5 +1,6 @@
import base64 import base64
import struct import struct
from typing import List, Tuple, Union
from jmbase import bintohex from jmbase import bintohex
from bitcointx import base58 from bitcointx import base58
@ -26,7 +27,7 @@ BTC_P2SH_VBYTE = {"mainnet": b'\x05', "testnet": b'\xc4', "signet": b'\xc4'}
"""PoDLE related primitives """PoDLE related primitives
""" """
def getG(compressed=True): def getG(compressed: bool = True) -> CPubKey:
"""Returns the public key binary """Returns the public key binary
representation of secp256k1 G; representation of secp256k1 G;
note that CPubKey is of type bytes. note that CPubKey is of type bytes.
@ -39,17 +40,17 @@ def getG(compressed=True):
podle_PublicKey_class = CPubKey podle_PublicKey_class = CPubKey
podle_PrivateKey_class = CKey podle_PrivateKey_class = CKey
def podle_PublicKey(P): def podle_PublicKey(P: bytes) -> CPubKey:
"""Returns a PublicKey object from a binary string """Returns a PublicKey object from a binary string
""" """
return CPubKey(P) return CPubKey(P)
def podle_PrivateKey(priv): def podle_PrivateKey(priv: bytes) -> CKey:
"""Returns a PrivateKey object from a binary string """Returns a PrivateKey object from a binary string
""" """
return CKey(priv) return CKey(priv)
def read_privkey(priv): def read_privkey(priv: bytes) -> Tuple[bool, bytes]:
if len(priv) == 33: if len(priv) == 33:
if priv[-1:] == b'\x01': if priv[-1:] == b'\x01':
compressed = True compressed = True
@ -61,7 +62,7 @@ def read_privkey(priv):
raise Exception("Invalid private key") raise Exception("Invalid private key")
return (compressed, priv[:32]) return (compressed, priv[:32])
def privkey_to_pubkey(priv): def privkey_to_pubkey(priv: bytes) -> CPubKey:
'''Take 32/33 byte raw private key as input. '''Take 32/33 byte raw private key as input.
If 32 bytes, return as uncompressed raw public key. If 32 bytes, return as uncompressed raw public key.
If 33 bytes and the final byte is 01, return If 33 bytes and the final byte is 01, return
@ -75,7 +76,8 @@ def privkey_to_pubkey(priv):
# b58check wrapper functions around bitcointx.base58 functions: # b58check wrapper functions around bitcointx.base58 functions:
# (avoids complexity of key management structure) # (avoids complexity of key management structure)
def bin_to_b58check(inp, magicbyte=b'\x00'): def bin_to_b58check(inp: bytes,
magicbyte: Union[bytes, int] = b'\x00') -> str:
""" The magic byte (prefix byte) should be passed either """ The magic byte (prefix byte) should be passed either
as a single byte or an integer. What is returned is a string as a single byte or an integer. What is returned is a string
in base58 encoding, with the prefix and the checksum. in base58 encoding, with the prefix and the checksum.
@ -91,25 +93,25 @@ def bin_to_b58check(inp, magicbyte=b'\x00'):
checksum = Hash(inp_fmtd)[:4] checksum = Hash(inp_fmtd)[:4]
return base58.encode(inp_fmtd + checksum) return base58.encode(inp_fmtd + checksum)
def b58check_to_bin(s): def b58check_to_bin(s: str) -> bytes:
data = base58.decode(s) data = base58.decode(s)
assert Hash(data[:-4])[:4] == data[-4:] assert Hash(data[:-4])[:4] == data[-4:]
return struct.pack(b"B", data[0]), data[1:-4] return struct.pack(b"B", data[0]), data[1:-4]
def get_version_byte(s): def get_version_byte(s: str) -> bytes:
return b58check_to_bin(s)[0] return b58check_to_bin(s)[0]
def ecdsa_sign(msg, priv): def ecdsa_sign(msg: str, priv: bytes) -> str:
hashed_msg = BitcoinMessage(msg).GetHash() hashed_msg = BitcoinMessage(msg).GetHash()
sig = ecdsa_raw_sign(hashed_msg, priv, rawmsg=True) sig = ecdsa_raw_sign(hashed_msg, priv, rawmsg=True)
return base64.b64encode(sig).decode('ascii') return base64.b64encode(sig).decode('ascii')
def ecdsa_verify(msg, sig, pub): def ecdsa_verify(msg: str, sig: str, pub: bytes) -> bool:
hashed_msg = BitcoinMessage(msg).GetHash() hashed_msg = BitcoinMessage(msg).GetHash()
sig = base64.b64decode(sig) sig = base64.b64decode(sig)
return ecdsa_raw_verify(hashed_msg, pub, sig, rawmsg=True) return ecdsa_raw_verify(hashed_msg, pub, sig, rawmsg=True)
def is_valid_pubkey(pubkey, require_compressed=False): def is_valid_pubkey(pubkey: bytes, require_compressed: bool = False) -> bool:
""" Returns True if the serialized pubkey is a valid secp256k1 """ Returns True if the serialized pubkey is a valid secp256k1
pubkey serialization or False if not; returns False for an pubkey serialization or False if not; returns False for an
uncompressed encoding if require_compressed is True. uncompressed encoding if require_compressed is True.
@ -135,7 +137,7 @@ def is_valid_pubkey(pubkey, require_compressed=False):
return True return True
def multiply(s, pub, return_serialized=True): def multiply(s: bytes, pub: bytes, return_serialized: bool = True) -> bytes:
'''Input binary compressed pubkey P(33 bytes) '''Input binary compressed pubkey P(33 bytes)
and scalar s(32 bytes), return s*P. and scalar s(32 bytes), return s*P.
The return value is a binary compressed public key, The return value is a binary compressed public key,
@ -166,7 +168,7 @@ def multiply(s, pub, return_serialized=True):
return CPubKey._from_ctypes_char_array(pubkey_buf) return CPubKey._from_ctypes_char_array(pubkey_buf)
return bytes(CPubKey._from_ctypes_char_array(pubkey_buf)) return bytes(CPubKey._from_ctypes_char_array(pubkey_buf))
def add_pubkeys(pubkeys): def add_pubkeys(pubkeys: List[bytes]) -> CPubKey:
'''Input a list of binary compressed pubkeys '''Input a list of binary compressed pubkeys
and return their sum as a binary compressed pubkey.''' and return their sum as a binary compressed pubkey.'''
pubkey_list = [CPubKey(x) for x in pubkeys] pubkey_list = [CPubKey(x) for x in pubkeys]
@ -176,7 +178,7 @@ def add_pubkeys(pubkeys):
raise ValueError("Invalid pubkey format.") raise ValueError("Invalid pubkey format.")
return CPubKey.combine(*pubkey_list) return CPubKey.combine(*pubkey_list)
def add_privkeys(priv1, priv2): def add_privkeys(priv1: bytes, priv2: bytes) -> bytes:
'''Add privkey 1 to privkey 2. '''Add privkey 1 to privkey 2.
Input keys must be in binary either compressed or not. Input keys must be in binary either compressed or not.
Returned key will have the same compression state. Returned key will have the same compression state.
@ -192,7 +194,7 @@ def add_privkeys(priv1, priv2):
res += b'\x01' res += b'\x01'
return res return res
def ecdh(privkey, pubkey): def ecdh(privkey: bytes, pubkey: bytes) -> bytes:
""" Take a privkey in raw byte serialization, """ Take a privkey in raw byte serialization,
and a pubkey serialized in compressed, binary format (33 bytes), and a pubkey serialized in compressed, binary format (33 bytes),
and output the shared secret as a 32 byte hash digest output. and output the shared secret as a 32 byte hash digest output.
@ -205,9 +207,9 @@ def ecdh(privkey, pubkey):
_, priv = read_privkey(privkey) _, priv = read_privkey(privkey)
return CKey(priv).ECDH(CPubKey(pubkey)) return CKey(priv).ECDH(CPubKey(pubkey))
def ecdsa_raw_sign(msg, def ecdsa_raw_sign(msg: Union[bytes, bytearray],
priv, priv: bytes,
rawmsg=False): rawmsg: bool = False) -> bytes:
'''Take the binary message msg and sign it with the private key '''Take the binary message msg and sign it with the private key
priv. priv.
If rawmsg is True, no sha256 hash is applied to msg before signing. If rawmsg is True, no sha256 hash is applied to msg before signing.
@ -225,7 +227,10 @@ def ecdsa_raw_sign(msg,
sig = newpriv.sign(Hash(msg), _ecdsa_sig_grind_low_r=False) sig = newpriv.sign(Hash(msg), _ecdsa_sig_grind_low_r=False)
return sig return sig
def ecdsa_raw_verify(msg, pub, sig, rawmsg=False): def ecdsa_raw_verify(msg: bytes,
pub: bytes,
sig: bytes,
rawmsg: bool = False) -> bool:
'''Take the binary message msg and binary signature sig, '''Take the binary message msg and binary signature sig,
and verify it against the pubkey pub. and verify it against the pubkey pub.
If rawmsg is True, no sha256 hash is applied to msg before verifying. If rawmsg is True, no sha256 hash is applied to msg before verifying.
@ -265,7 +270,7 @@ class JMCKey(bytes, CKeyBase):
def __init__(self, b): def __init__(self, b):
CKeyBase.__init__(self, b, compressed=True) CKeyBase.__init__(self, b, compressed=True)
def sign(self, hash): def sign(self, hash: Union[bytes, bytearray]) -> bytes:
assert isinstance(hash, (bytes, bytearray)) assert isinstance(hash, (bytes, bytearray))
if len(hash) != 32: if len(hash) != 32:
raise ValueError('Hash must be exactly 32 bytes long') raise ValueError('Hash must be exactly 32 bytes long')

77
src/jmbitcoin/secp256k1_transaction.py

@ -1,17 +1,16 @@
# note, only used for non-cryptographic randomness:
import random
import json
from typing import List, Union, Tuple
# needed for single sha256 evaluation, which is used # needed for single sha256 evaluation, which is used
# in bitcoin (p2wsh) but not exposed in python-bitcointx: # in bitcoin (p2wsh) but not exposed in python-bitcointx:
import hashlib import hashlib
import json
# note, only used for non-cryptographic randomness:
import random
from math import ceil from math import ceil
from typing import List, Optional, Tuple, Union
from jmbitcoin.secp256k1_main import *
from jmbase import bintohex, utxo_to_utxostr
from bitcointx.core import (CMutableTransaction, CTxInWitness, from bitcointx.core import (CMutableTransaction, CTxInWitness,
CMutableOutPoint, CMutableTxIn, CTransaction, CMutableOutPoint, CMutableTxIn, CTransaction,
CMutableTxOut, CTxIn, CTxOut, ValidationError) CMutableTxOut, CTxIn, CTxOut, ValidationError,
CBitcoinTransaction)
from bitcointx.core.script import * from bitcointx.core.script import *
from bitcointx.wallet import (P2WPKHCoinAddress, CCoinAddress, P2PKHCoinAddress, from bitcointx.wallet import (P2WPKHCoinAddress, CCoinAddress, P2PKHCoinAddress,
CCoinAddressError) CCoinAddressError)
@ -20,7 +19,11 @@ from bitcointx.core.scripteval import (VerifyScript, SCRIPT_VERIFY_WITNESS,
SCRIPT_VERIFY_STRICTENC, SCRIPT_VERIFY_STRICTENC,
SIGVERSION_WITNESS_V0) SIGVERSION_WITNESS_V0)
def human_readable_transaction(tx, jsonified=True): from jmbase import bintohex, utxo_to_utxostr
from jmbitcoin.secp256k1_main import *
def human_readable_transaction(tx: CTransaction, jsonified: bool = True) -> str:
""" Given a CTransaction object, output a human """ Given a CTransaction object, output a human
readable json-formatted string (suitable for terminal readable json-formatted string (suitable for terminal
output or large GUI textbox display) containing output or large GUI textbox display) containing
@ -49,7 +52,8 @@ def human_readable_transaction(tx, jsonified=True):
return outdict return outdict
return json.dumps(outdict, indent=4) return json.dumps(outdict, indent=4)
def human_readable_input(txinput, txinput_witness): def human_readable_input(txinput: CTxIn,
txinput_witness: Optional[CTxInWitness]) -> dict:
""" Pass objects of type CTxIn and CTxInWitness (or None) """ Pass objects of type CTxIn and CTxInWitness (or None)
and a dict of human-readable entries for this input and a dict of human-readable entries for this input
is returned. is returned.
@ -68,7 +72,7 @@ def human_readable_input(txinput, txinput_witness):
txinput_witness.scriptWitness.serialize()) txinput_witness.scriptWitness.serialize())
return outdict return outdict
def human_readable_output(txoutput): def human_readable_output(txoutput: CTxOut) -> dict:
""" Returns a dict of human-readable entries """ Returns a dict of human-readable entries
for this output. for this output.
""" """
@ -175,7 +179,7 @@ def estimate_tx_size(ins: List[str], outs: List[str]) -> Union[int, Tuple[int]]:
return nwsize return nwsize
return (wsize, nwsize) return (wsize, nwsize)
def tx_vsize(tx): def tx_vsize(tx: CTransaction) -> int:
""" """
Computes the virtual size (in vbytes) of a transaction Computes the virtual size (in vbytes) of a transaction
""" """
@ -184,7 +188,8 @@ def tx_vsize(tx):
non_witness_size = raw_tx_size - witness_size non_witness_size = raw_tx_size - witness_size
return ceil(non_witness_size + .25 * witness_size) return ceil(non_witness_size + .25 * witness_size)
def pubkey_to_p2pkh_script(pub, require_compressed=False): def pubkey_to_p2pkh_script(pub: bytes,
require_compressed: bool = False) -> CScript:
""" """
Given a pubkey in bytes, return a CScript Given a pubkey in bytes, return a CScript
representing the corresponding pay-to-pubkey-hash representing the corresponding pay-to-pubkey-hash
@ -192,7 +197,7 @@ def pubkey_to_p2pkh_script(pub, require_compressed=False):
""" """
return P2PKHCoinAddress.from_pubkey(pub).to_scriptPubKey() return P2PKHCoinAddress.from_pubkey(pub).to_scriptPubKey()
def pubkey_to_p2wpkh_script(pub): def pubkey_to_p2wpkh_script(pub: bytes) -> CScript:
""" """
Given a pubkey in bytes (compressed), return a CScript Given a pubkey in bytes (compressed), return a CScript
representing the corresponding pay-to-witness-pubkey-hash representing the corresponding pay-to-witness-pubkey-hash
@ -200,7 +205,7 @@ def pubkey_to_p2wpkh_script(pub):
""" """
return P2WPKHCoinAddress.from_pubkey(pub).to_scriptPubKey() return P2WPKHCoinAddress.from_pubkey(pub).to_scriptPubKey()
def pubkey_to_p2sh_p2wpkh_script(pub): def pubkey_to_p2sh_p2wpkh_script(pub: bytes) -> CScript:
""" """
Given a pubkey in bytes, return a CScript representing Given a pubkey in bytes, return a CScript representing
the corresponding nested pay to witness keyhash the corresponding nested pay to witness keyhash
@ -210,7 +215,7 @@ def pubkey_to_p2sh_p2wpkh_script(pub):
raise Exception("Invalid pubkey") raise Exception("Invalid pubkey")
return pubkey_to_p2wpkh_script(pub).to_p2sh_scriptPubKey() return pubkey_to_p2wpkh_script(pub).to_p2sh_scriptPubKey()
def redeem_script_to_p2wsh_script(redeem_script): def redeem_script_to_p2wsh_script(redeem_script: Union[bytes, CScript]) -> CScript:
""" Given redeem script of type CScript (or bytes) """ Given redeem script of type CScript (or bytes)
returns the corresponding segwit v0 scriptPubKey as returns the corresponding segwit v0 scriptPubKey as
for the case pay-to-witness-scripthash. for the case pay-to-witness-scripthash.
@ -218,7 +223,7 @@ def redeem_script_to_p2wsh_script(redeem_script):
return standard_witness_v0_scriptpubkey( return standard_witness_v0_scriptpubkey(
hashlib.sha256(redeem_script).digest()) hashlib.sha256(redeem_script).digest())
def mk_freeze_script(pub, locktime): def mk_freeze_script(pub: bytes, locktime: int) -> CScript:
""" """
Given a pubkey and locktime, create a script which can only be spent Given a pubkey and locktime, create a script which can only be spent
after the locktime has passed using OP_CHECKLOCKTIMEVERIFY after the locktime has passed using OP_CHECKLOCKTIMEVERIFY
@ -232,7 +237,7 @@ def mk_freeze_script(pub, locktime):
return CScript([locktime, OP_CHECKLOCKTIMEVERIFY, OP_DROP, pub, return CScript([locktime, OP_CHECKLOCKTIMEVERIFY, OP_DROP, pub,
OP_CHECKSIG]) OP_CHECKSIG])
def mk_burn_script(data): def mk_burn_script(data: bytes) -> CScript:
""" For a given bytestring (data), """ For a given bytestring (data),
returns a scriptPubKey which is an OP_RETURN returns a scriptPubKey which is an OP_RETURN
of that data. of that data.
@ -241,7 +246,12 @@ def mk_burn_script(data):
raise TypeError("data must be in bytes") raise TypeError("data must be in bytes")
return CScript([OP_RETURN, data]) return CScript([OP_RETURN, data])
def sign(tx, i, priv, hashcode=SIGHASH_ALL, amount=None, native=False): def sign(tx: CMutableTransaction,
i: int,
priv: bytes,
hashcode: SIGHASH_Type = SIGHASH_ALL,
amount: Optional[int] = None,
native: bool = False) -> Tuple[Optional[bytes], str]:
""" """
Given a transaction tx of type CMutableTransaction, an input index i, Given a transaction tx of type CMutableTransaction, an input index i,
and a raw privkey in bytes, updates the CMutableTransaction to contain and a raw privkey in bytes, updates the CMutableTransaction to contain
@ -325,7 +335,10 @@ def sign(tx, i, priv, hashcode=SIGHASH_ALL, amount=None, native=False):
return sig, "signing succeeded" return sig, "signing succeeded"
def mktx(ins, outs, version=1, locktime=0): def mktx(ins: List[Tuple[bytes, int]],
outs: List[dict],
version: int = 1,
locktime: int = 0) -> CMutableTransaction:
""" Given a list of input tuples (txid(bytes), n(int)), """ Given a list of input tuples (txid(bytes), n(int)),
and a list of outputs which are dicts with and a list of outputs which are dicts with
keys "address" (value should be *str* not CCoinAddress) ( keys "address" (value should be *str* not CCoinAddress) (
@ -361,7 +374,10 @@ def mktx(ins, outs, version=1, locktime=0):
vout.append(out) vout.append(out)
return CMutableTransaction(vin, vout, nLockTime=locktime, nVersion=version) return CMutableTransaction(vin, vout, nLockTime=locktime, nVersion=version)
def make_shuffled_tx(ins, outs, version=1, locktime=0): def make_shuffled_tx(ins: List[Tuple[bytes, int]],
outs: List[dict],
version: int = 1,
locktime: int = 0) -> CMutableTransaction:
""" Simple wrapper to ensure transaction """ Simple wrapper to ensure transaction
inputs and outputs are randomly ordered. inputs and outputs are randomly ordered.
NB: This mutates ordering of `ins` and `outs`. NB: This mutates ordering of `ins` and `outs`.
@ -370,7 +386,12 @@ def make_shuffled_tx(ins, outs, version=1, locktime=0):
random.shuffle(outs) random.shuffle(outs)
return mktx(ins, outs, version=version, locktime=locktime) return mktx(ins, outs, version=version, locktime=locktime)
def verify_tx_input(tx, i, scriptSig, scriptPubKey, amount=None, witness=None): def verify_tx_input(tx: CTransaction,
i: int,
scriptSig: CScript,
scriptPubKey: CScript,
amount: Optional[int] = None,
witness: Optional[CScriptWitness] = None) -> bool:
flags = set([SCRIPT_VERIFY_STRICTENC]) flags = set([SCRIPT_VERIFY_STRICTENC])
if witness: if witness:
# https://github.com/Simplexum/python-bitcointx/blob/648ad8f45ff853bf9923c6498bfa0648b3d7bcbd/bitcointx/core/scripteval.py#L1250-L1252 # https://github.com/Simplexum/python-bitcointx/blob/648ad8f45ff853bf9923c6498bfa0648b3d7bcbd/bitcointx/core/scripteval.py#L1250-L1252
@ -383,7 +404,8 @@ def verify_tx_input(tx, i, scriptSig, scriptPubKey, amount=None, witness=None):
return False return False
return True return True
def extract_witness(tx, i): def extract_witness(tx: CTransaction,
i: int) -> Tuple[Optional[Tuple[CTxInWitness, ...]], str]:
"""Given `tx` of type CTransaction, extract, """Given `tx` of type CTransaction, extract,
as a list of objects of type CScript, which constitute the as a list of objects of type CScript, which constitute the
witness at the index i, followed by "success". witness at the index i, followed by "success".
@ -401,7 +423,8 @@ def extract_witness(tx, i):
witness = tx.wit.vtxinwit[i] witness = tx.wit.vtxinwit[i]
return (witness, "success") return (witness, "success")
def extract_pubkey_from_witness(tx, i): def extract_pubkey_from_witness(tx: CTransaction,
i: int) -> Tuple[Optional[CScriptWitness], str]:
""" Extract the pubkey used to sign at index i, """ Extract the pubkey used to sign at index i,
in CTransaction tx, assuming it is of type p2wpkh in CTransaction tx, assuming it is of type p2wpkh
(including wrapped segwit version). (including wrapped segwit version).
@ -418,7 +441,7 @@ def extract_pubkey_from_witness(tx, i):
return None, "invalid pubkey in witness" return None, "invalid pubkey in witness"
return sWitness[1], "success" return sWitness[1], "success"
def get_equal_outs(tx): def get_equal_outs(tx: CTransaction) -> Optional[List[CTxOut]]:
""" If 2 or more transaction outputs have the same """ If 2 or more transaction outputs have the same
bitcoin value, return then as a list of CTxOuts. bitcoin value, return then as a list of CTxOuts.
If there is not exactly one equal output size, return False. If there is not exactly one equal output size, return False.
@ -429,14 +452,16 @@ def get_equal_outs(tx):
if len(eos) > 0: if len(eos) > 0:
eos = set(eos) eos = set(eos)
if len(eos) > 1: if len(eos) > 1:
return False return None
for i, vout in enumerate(tx.vout): for i, vout in enumerate(tx.vout):
if vout.nValue == list(eos)[0]: if vout.nValue == list(eos)[0]:
retval.append((i, vout)) retval.append((i, vout))
assert len(retval) > 1 assert len(retval) > 1
return retval return retval
def is_jm_tx(tx, min_cj_amount=75000, min_participants=3): def is_jm_tx(tx: CBitcoinTransaction,
min_cj_amount: int = 75000,
min_participants: int = 3) -> Union[Tuple[bool, None], Tuple[int, int]]:
""" Identify Joinmarket-patterned transactions. """ Identify Joinmarket-patterned transactions.
TODO: this should be in another module. TODO: this should be in another module.
Given a CBitcoinTransaction tx, check: Given a CBitcoinTransaction tx, check:

Loading…
Cancel
Save