You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
436 lines
14 KiB
436 lines
14 KiB
#!/usr/bin/python |
|
import binascii, re, json, copy, sys |
|
from jmbitcoin.secp256k1_main import * |
|
from _functools import reduce |
|
import os |
|
|
|
# True when the running interpreter is Python 2; the functions in this
# module use it to choose between str-based and bytes-based handling.
is_python2 = sys.version_info[0] == 2
|
|
|
### Hex to bin converter and vice versa for objects |
|
def json_is_base(obj, base):
    """Report whether every string inside *obj* only uses digits of *base*.

    *obj* may be a string, an integer, ``None``, a list, or a mapping.
    Lists are checked element by element and mappings value by value,
    recursively.  Integers and ``None`` always pass.  Under Python 3 a
    ``bytes`` object never passes.  The digit alphabet is whatever
    ``get_code_string(base)`` returns.
    """
    if isinstance(obj, bytes) and not is_python2:
        return False

    alphabet = get_code_string(base)
    if isinstance(obj, string_types):
        return all(alphabet.find(char) != -1 for char in obj)
    if obj is None or isinstance(obj, int_types):
        return True
    if isinstance(obj, list):
        return all(json_is_base(item, base) for item in obj)
    # Anything else is treated as a mapping: only its values are inspected.
    return all(json_is_base(obj[key], base) for key in obj)
|
|
|
|
|
def json_changebase(obj, changer):
    """Return a copy of *obj* with ``changer`` applied to every string leaf.

    String/bytes values are replaced by ``changer(value)``; integers and
    ``None`` are returned unchanged; lists and mappings are rebuilt
    recursively (mapping keys are kept as they are).
    """
    if isinstance(obj, string_or_bytes_types):
        return changer(obj)
    if obj is None or isinstance(obj, int_types):
        return obj
    if isinstance(obj, list):
        converted = []
        for item in obj:
            converted.append(json_changebase(item, changer))
        return converted
    return {key: json_changebase(obj[key], changer) for key in obj}
|
|
|
# Transaction serialization and deserialization |
|
|
|
|
|
def deserialize(tx):
    """Parse a serialized transaction into a nested dict.

    A ``str`` made up solely of hex digits is unhexlified and parsed, and
    every string/bytes field of the result is then passed through
    ``safe_hexlify``.  Any other input is parsed directly as a raw byte
    sequence and its fields are returned as slices of that input.

    The result has the keys ``"version"``, ``"ins"``, ``"outs"`` and
    ``"locktime"``.  Each input is
    ``{"outpoint": {"hash", "index"}, "script", "sequence"}`` and each
    output is ``{"value", "script"}``.
    """
    if isinstance(tx, str) and re.match('^[0-9a-fA-F]*$', tx):
        parsed = deserialize(binascii.unhexlify(tx))
        return json_changebase(parsed, lambda x: safe_hexlify(x))

    # A one-element list lets the nested readers advance a shared offset;
    # a plain local could not be rebound from inside the closures.
    cursor = [0]

    def read_bytes(count):
        start = cursor[0]
        cursor[0] = start + count
        return tx[start:cursor[0]]

    def read_as_int(count):
        # The field's bytes are reversed before base-256 decoding.
        return decode(read_bytes(count)[::-1], 256)

    def read_var_int():
        cursor[0] += 1
        prefix = from_byte_to_int(tx[cursor[0] - 1])
        if prefix >= 253:
            # Prefixes 253/254/255 announce a 2/4/8-byte integer.
            return read_as_int(2 ** (prefix - 252))
        return prefix

    def read_var_string():
        return read_bytes(read_var_int())

    obj = {"ins": [], "outs": []}
    obj["version"] = read_as_int(4)

    for _ in range(read_var_int()):
        # Reads must happen in exactly this order: it is the wire order.
        prev_hash = read_bytes(32)[::-1]
        prev_index = read_as_int(4)
        script = read_var_string()
        sequence = read_as_int(4)
        obj["ins"].append({
            "outpoint": {"hash": prev_hash, "index": prev_index},
            "script": script,
            "sequence": sequence,
        })

    for _ in range(read_var_int()):
        value = read_as_int(8)
        script = read_var_string()
        obj["outs"].append({"value": value, "script": script})

    obj["locktime"] = read_as_int(4)
    return obj
|
|
|
|
|
def serialize(txobj):
    """Serialize a transaction dict of the shape returned by ``deserialize``.

    If ``json_is_base(txobj, 16)`` holds (every string is hex text), the
    fields are unhexlified, serialized, and the result is returned through
    ``safe_hexlify``.  Otherwise the fields are treated as raw byte strings
    and the concatenated raw serialization is returned.

    Expected keys: ``"version"``, ``"ins"`` (each with ``"outpoint"``
    ``{"hash", "index"}``, ``"script"``, ``"sequence"``), ``"outs"`` (each
    with ``"value"``, ``"script"``) and ``"locktime"``.
    """
    if json_is_base(txobj, 16):
        raw_txobj = json_changebase(txobj, lambda x: binascii.unhexlify(x))
        return safe_hexlify(serialize(raw_txobj))

    def script_bytes(script):
        # signature_form blanks scripts with the text string "".  Under
        # Python 3 that cannot be concatenated with bytes, so an empty
        # script is replaced by an empty bytes object.  This is applied to
        # inputs AND outputs (SIGHASH_SINGLE blanks output scripts too).
        return script if script or is_python2 else bytes()

    o = []
    o.append(encode(txobj["version"], 256, 4)[::-1])
    o.append(num_to_var_int(len(txobj["ins"])))
    for inp in txobj["ins"]:
        o.append(inp["outpoint"]["hash"][::-1])
        o.append(encode(inp["outpoint"]["index"], 256, 4)[::-1])
        o.append(num_to_var_int(len(inp["script"])) +
                 script_bytes(inp["script"]))
        o.append(encode(inp["sequence"], 256, 4)[::-1])
    o.append(num_to_var_int(len(txobj["outs"])))
    for out in txobj["outs"]:
        o.append(encode(out["value"], 256, 8)[::-1])
        o.append(num_to_var_int(len(out["script"])) +
                 script_bytes(out["script"]))
    o.append(encode(txobj["locktime"], 256, 4)[::-1])

    # join is linear in the output size; repeated "+" is quadratic.
    return ''.join(o) if is_python2 else b''.join(o)
|
|
|
# Hashing transactions for signing |
|
|
|
# Base signature-hash modes.  signature_form compares these against
# ``hashcode & 0x1f``.
SIGHASH_ALL = 0x01
SIGHASH_NONE = 0x02
SIGHASH_SINGLE = 0x03
# Modifier bit, tested with a bitwise AND and combinable with a base mode.
SIGHASH_ANYONECANPAY = 0x80
|
|
|
def signature_form(tx, i, script, hashcode=SIGHASH_ALL):
    """Build the variant of *tx* that is hashed when signing input *i*.

    A serialized *tx* (string or bytes) is deserialized, transformed and
    re-serialized.  A dict *tx* is deep-copied and the modified copy is
    returned; the argument itself is not mutated.

    Every input script is blanked and input *i* receives *script*.
    ``hashcode & 0x1f`` then selects the mode:

    * ``SIGHASH_NONE``: all outputs are dropped and the sequence of every
      other input is set to 0.
    * ``SIGHASH_SINGLE``: outputs after index *i* are dropped, outputs
      before it get value ``2**64 - 1`` and an empty script, and the
      sequence of every other input is set to 0.  Raises ``Exception``
      when there are more inputs than outputs.

    If the ``SIGHASH_ANYONECANPAY`` bit is set, only input *i* is kept.
    """
    i = int(i)
    hashcode = int(hashcode)
    if isinstance(tx, string_or_bytes_types):
        parsed = deserialize(tx)
        return serialize(signature_form(parsed, i, script, hashcode))

    newtx = copy.deepcopy(tx)
    for txin in newtx["ins"]:
        txin["script"] = ""
    newtx["ins"][i]["script"] = script

    def zero_other_sequences():
        for position, txin in enumerate(newtx["ins"]):
            if position != i:
                txin["sequence"] = 0

    base_mode = hashcode & 0x1f
    if base_mode == SIGHASH_NONE:
        newtx["outs"] = []
        zero_other_sequences()
    elif base_mode == SIGHASH_SINGLE:
        if len(newtx["ins"]) > len(newtx["outs"]):
            raise Exception(
                "Transactions with sighash single should have len in <= len out")
        newtx["outs"] = newtx["outs"][:i+1]
        for earlier_out in newtx["outs"][:i]:
            earlier_out['value'] = 2**64 - 1
            earlier_out['script'] = ""
        zero_other_sequences()

    if hashcode & SIGHASH_ANYONECANPAY:
        newtx["ins"] = [newtx["ins"][i]]
    return newtx
|
|
|
def txhash(tx, hashcode=None):
    """Hash a serialized transaction.

    A ``str`` of hex digits is first converted with
    ``changebase(tx, 16, 256)``.  With a truthy *hashcode* the result is
    ``dbl_sha256`` over the transaction bytes followed by the hashcode
    encoded in 4 base-256 digits, reversed.  Without one, the result is
    ``safe_hexlify`` of the reversed ``bin_dbl_sha256`` digest.
    """
    if isinstance(tx, str) and re.match('^[0-9a-fA-F]*$', tx):
        tx = changebase(tx, 16, 256)

    if not hashcode:
        digest = bin_dbl_sha256(tx)
        return safe_hexlify(digest[::-1])

    hashcode_suffix = encode(int(hashcode), 256, 4)[::-1]
    return dbl_sha256(from_string_to_bytes(tx) + hashcode_suffix)
|
|
|
|
|
def bin_txhash(tx, hashcode=None):
    """Return ``txhash(tx, hashcode)`` unhexlified into raw bytes."""
    hex_digest = txhash(tx, hashcode)
    return binascii.unhexlify(hex_digest)
|
|
|
|
|
def ecdsa_tx_sign(tx, priv, hashcode=SIGHASH_ALL, usenonce=None):
    """Sign the hash of *tx* with *priv* and append the hashcode.

    The message is ``txhash(tx, hashcode)``, passed to ``ecdsa_raw_sign``
    with ``rawmsg=True`` and the optional *usenonce*.  The return value is
    that signature followed by ``encode(hashcode, 16, 2)`` (presumably
    the hashcode as two hex digits; confirm against ``encode``).
    """
    digest = txhash(tx, hashcode)
    signature = ecdsa_raw_sign(digest, priv, True, rawmsg=True,
                               usenonce=usenonce)
    hashcode_suffix = encode(hashcode, 16, 2)
    return signature + hashcode_suffix
|
|
|
|
|
def ecdsa_tx_verify(tx, sig, pub, hashcode=SIGHASH_ALL):
    """Check *sig* over the hash of *tx* against public key *pub*.

    The last two characters of *sig* (the hashcode suffix added by
    ``ecdsa_tx_sign``) are stripped before the signature is handed to
    ``ecdsa_raw_verify`` together with ``txhash(tx, hashcode)``.
    """
    digest = txhash(tx, hashcode)
    bare_signature = sig[:-2]
    return ecdsa_raw_verify(digest, pub, bare_signature, True, rawmsg=True)
|
|
|
# Scripts |
|
|
|
|
|
def mk_pubkey_script(addr):
    """Return the hex script ``76a914 <b58check_to_hex(addr)> 88ac``.

    Kept as a separate helper for the sake of altcoins.
    """
    payload_hex = b58check_to_hex(addr)
    return '76a914' + payload_hex + '88ac'
|
|
|
|
|
def mk_scripthash_script(addr):
    """Return the hex script ``a914 <b58check_to_hex(addr)> 87``."""
    payload_hex = b58check_to_hex(addr)
    return 'a914' + payload_hex + '87'
|
|
|
# Address representation to output script |
|
|
|
|
|
def address_to_script(addr):
    """Convert an address into its hex output script.

    Addresses whose first character is ``'3'`` or ``'2'`` go through
    ``mk_scripthash_script``; all others through ``mk_pubkey_script``.
    """
    if addr[0] in ('3', '2'):
        return mk_scripthash_script(addr)
    return mk_pubkey_script(addr)
|
|
|
# Output script to address representation |
|
|
|
|
|
def script_to_address(script, vbyte=0):
    """Convert an output script into an address.

    *script* may be hex text or a raw byte string.  A 25-byte script that
    starts with ``76 a9 14`` and ends with ``88 ac`` is encoded with
    ``bin_to_b58check(script[3:-2], vbyte)`` (pubkey-hash address).
    Anything else is treated as a BIP0016 script-hash script and
    ``script[2:-1]`` is encoded with version byte 196 when *vbyte* is 111
    or 196 (testnet), otherwise 5.
    """
    # Python 3 ``bytes`` are already raw; matching the text pattern against
    # them would raise TypeError, so the hex check is skipped for them.
    is_raw_bytes = not is_python2 and isinstance(script, bytes)
    if not is_raw_bytes and re.match('^[0-9a-fA-F]*$', script):
        script = binascii.unhexlify(script)

    is_pubkey_hash = (len(script) == 25 and
                      script[:3] == b'\x76\xa9\x14' and
                      script[-2:] == b'\x88\xac')
    if is_pubkey_hash:
        return bin_to_b58check(script[3:-2], vbyte)

    # BIP0016 scripthash addresses; 111/196 are the testnet version bytes.
    scripthash_byte = 196 if vbyte in [111, 196] else 5
    return bin_to_b58check(script[2:-1], scripthash_byte)
|
|
|
|
|
def p2sh_scriptaddr(script, magicbyte=5):
    """Return the address for ``hash160(script)`` under *magicbyte*.

    *script* may be hex text (which is unhexlified first) or a raw byte
    string.  The hash is encoded with ``hex_to_b58check``.
    """
    # Python 3 ``bytes`` are already raw; matching the text pattern against
    # them would raise TypeError, so the hex check is skipped for them.
    is_raw_bytes = not is_python2 and isinstance(script, bytes)
    if not is_raw_bytes and re.match('^[0-9a-fA-F]*$', script):
        script = binascii.unhexlify(script)
    return hex_to_b58check(hash160(script), magicbyte)


# Alias for p2sh_scriptaddr.
scriptaddr = p2sh_scriptaddr
|
|
|
|
|
def deserialize_script(script):
    """Split a script into a list of units.

    A ``str`` of hex digits is unhexlified and parsed, and the string/bytes
    units of the result are passed through ``safe_hexlify``.  Any other
    input is parsed as raw bytes.

    Unit encoding, by leading byte value:

    * 0: ``None``
    * 1..75: the following that-many bytes
    * 76..78: data preceded by a 1-, 2- or 4-byte length field
    * 79..96: the integer ``value - 80``
    * above 96: the integer byte value itself
    """
    if isinstance(script, str) and re.match('^[0-9a-fA-F]*$', script):
        raw_units = deserialize_script(binascii.unhexlify(script))
        return json_changebase(raw_units, lambda x: safe_hexlify(x))

    units = []
    offset = 0
    total = len(script)
    while offset < total:
        opcode = from_byte_to_int(script[offset])
        consumed = 1
        if opcode == 0:
            units.append(None)
        elif opcode <= 75:
            units.append(script[offset + 1:offset + 1 + opcode])
            consumed += opcode
        elif opcode <= 78:
            length_width = 2 ** (opcode - 76)
            # Backwards slice: the length field's bytes in reverse order.
            data_length = decode(
                script[offset + length_width:offset:-1], 256)
            data_start = offset + 1 + length_width
            units.append(script[data_start:data_start + data_length])
            consumed += length_width + data_length
        elif opcode <= 96:
            units.append(opcode - 80)
        else:
            units.append(opcode)
        offset += consumed
    return units
|
|
|
|
|
def serialize_script_unit(unit):
    """Serialize one script unit as produced by ``deserialize_script``.

    * ``None`` becomes the single byte ``0x00``.
    * An integer up to and including 16 becomes the byte ``unit + 80``;
      this is the inverse of the ``code - 80`` mapping that
      ``deserialize_script`` applies to byte values 79..96.
    * Any larger integer becomes that single byte value.
    * Anything else is treated as data and prefixed with its length: a
      bare length byte up to 75 bytes, otherwise marker byte 76, 77 or 78
      followed by a 1-, 2- or 4-byte reversed base-256 length.
    """
    if isinstance(unit, int):
        # "<= 16", not "< 16": deserialize_script yields 16 for byte 96,
        # so 16 must map back to 96 rather than to the raw byte 0x10
        # (which would be read back as a 16-byte data push).
        if unit <= 16:
            return from_int_to_byte(unit + 80)
        # from_int_to_byte is used for both branches; bytes([unit]) is
        # only a single byte under Python 3.
        return from_int_to_byte(unit)
    if unit is None:
        return b'\x00'

    size = len(unit)
    if size <= 75:
        return from_int_to_byte(size) + unit
    if size < 256:
        return from_int_to_byte(76) + from_int_to_byte(size) + unit
    if size < 65536:
        return from_int_to_byte(77) + encode(size, 256, 2)[::-1] + unit
    return from_int_to_byte(78) + encode(size, 256, 4)[::-1] + unit
|
|
|
|
|
if is_python2:

    def serialize_script(script):
        """Serialize a list of script units (Python 2 variant).

        If every string in *script* is hex, the units are unhexlified,
        serialized, and the result is returned hexlified.  Otherwise the
        serialized units are concatenated and returned as-is.
        """
        if json_is_base(script, 16):
            raw_units = json_changebase(
                script, lambda x: binascii.unhexlify(x))
            return binascii.hexlify(serialize_script(raw_units))
        return ''.join(serialize_script_unit(unit) for unit in script)

else:

    def serialize_script(script):
        """Serialize a list of script units (Python 3 variant).

        If every string in *script* is hex, the units are unhexlified,
        serialized, and the result is returned through ``safe_hexlify``.
        Otherwise the serialized units are concatenated into ``bytes``;
        a unit that did not serialize to ``bytes`` is encoded as UTF-8.
        """
        if json_is_base(script, 16):
            raw_units = json_changebase(
                script, lambda x: binascii.unhexlify(x))
            return safe_hexlify(serialize_script(raw_units))

        return b''.join(
            piece if isinstance(piece, bytes) else bytes(piece, 'utf-8')
            for piece in map(serialize_script_unit, script))
|
|
|
|
|
def mk_multisig_script(*args):
    """Build a k-of-n multisig script.

    Call as ``mk_multisig_script([pubs], k)`` or as
    ``mk_multisig_script(pub1, pub2, ..., pubn, k)``.  In the flat form,
    every argument whose ``str()`` is at least 32 characters long counts
    as a public key and *k* is the argument that follows them.

    Returns ``serialize_script([k] + pubs + [n])`` followed by ``'ae'``.
    """
    first = args[0]
    if isinstance(first, list):
        pubs = first
        k = int(args[1])
    else:
        pubs = [candidate for candidate in args
                if len(str(candidate)) >= 32]
        k = int(args[len(pubs)])
    units = [k] + pubs + [len(pubs)]
    return serialize_script(units) + 'ae'
|
|
|
# Signing and verifying |
|
|
|
|
|
def verify_tx_input(tx, i, script, sig, pub):
    """Verify the signature *sig* by *pub* on input *i* of *tx*.

    *tx* and *script* may be hex text or raw bytes and are converted to
    raw bytes; *sig* and *pub* may be hex text or raw bytes and are
    converted to hex.  The hashcode is decoded from the last two hex
    characters of *sig*, the signature form of the transaction is built
    for that hashcode, and the result of ``ecdsa_tx_verify`` is returned.
    """
    def is_hex_text(value):
        # Python 3 ``bytes`` are always raw data; matching the text
        # pattern against them would raise TypeError.
        if not is_python2 and isinstance(value, bytes):
            return False
        return re.match('^[0-9a-fA-F]*$', value)

    if is_hex_text(tx):
        tx = binascii.unhexlify(tx)
    if is_hex_text(script):
        script = binascii.unhexlify(script)
    if not is_hex_text(sig):
        sig = safe_hexlify(sig)
    if not is_hex_text(pub):
        pub = safe_hexlify(pub)

    hashcode = decode(sig[-2:], 16)
    modtx = signature_form(tx, int(i), script, hashcode)
    return ecdsa_tx_verify(modtx, sig, pub, hashcode)
|
|
|
|
|
def sign(tx, i, priv, hashcode=SIGHASH_ALL, usenonce=None):
    """Sign input *i* of *tx* with *priv* and return the updated transaction.

    *tx* may be hex text or a raw byte string; the result is returned in
    the same form.  A *priv* of at most 33 items is taken to be raw and is
    hexlified.  The input is signed against the pubkey script of the
    address derived from *priv*, and the input script is replaced by
    ``serialize_script([sig, pub])``.

    *hashcode* and *usenonce* are forwarded to the signing step for both
    hex and raw transactions.
    """
    i = int(i)
    # Check the type of ``tx`` itself: Python 3 bytes are raw data and
    # cannot be matched against the text pattern (TypeError).
    is_raw_bytes = not is_python2 and isinstance(tx, bytes)
    if is_raw_bytes or not re.match('^[0-9a-fA-F]*$', tx):
        signed_hex = sign(safe_hexlify(tx), i, priv, hashcode,
                          usenonce=usenonce)
        return binascii.unhexlify(signed_hex)

    if len(priv) <= 33:
        priv = safe_hexlify(priv)
    pub = privkey_to_pubkey(priv, True)
    address = pubkey_to_address(pub)
    signing_tx = signature_form(tx, i, mk_pubkey_script(address), hashcode)
    sig = ecdsa_tx_sign(signing_tx, priv, hashcode, usenonce=usenonce)

    txobj = deserialize(tx)
    txobj["ins"][i]["script"] = serialize_script([sig, pub])
    return serialize(txobj)
|
|
|
|
|
def signall(tx, priv):
    """Sign every input of *tx* and return the signed transaction.

    *priv* is either a single private key used for all inputs, or a dict
    mapping ``'txinhash:txinidx'`` to the private key for that input.
    """
    tx_inputs = deserialize(tx)["ins"]
    if isinstance(priv, dict):
        for position, txin in enumerate(tx_inputs):
            outpoint = txin["outpoint"]
            key = priv["%s:%d" % (outpoint["hash"], outpoint["index"])]
            tx = sign(tx, position, key)
        return tx

    for position in range(len(tx_inputs)):
        tx = sign(tx, position, priv)
    return tx
|
|
|
|
|
def multisign(tx, i, script, pk, hashcode=SIGHASH_ALL):
    """Return one signature by *pk* for input *i* of *tx*.

    *tx* and *script* may be hex text or raw bytes; hex text is
    unhexlified.  The signature form for *script* and *hashcode* is built
    and signed with ``ecdsa_tx_sign``.
    """
    def is_hex_text(value):
        # Python 3 ``bytes`` are always raw data; matching the text
        # pattern against them would raise TypeError.
        if not is_python2 and isinstance(value, bytes):
            return False
        return re.match('^[0-9a-fA-F]*$', value)

    if is_hex_text(tx):
        tx = binascii.unhexlify(tx)
    if is_hex_text(script):
        script = binascii.unhexlify(script)
    modtx = signature_form(tx, i, script, hashcode)
    return ecdsa_tx_sign(modtx, pk, hashcode)
|
|
|
|
|
def apply_multisignatures(*args):
    """Install multisig signatures into input *i* of a transaction.

    Call as ``(tx, i, script, sigs)`` with a list of signatures, or as
    ``(tx, i, script, sig1, sig2, ..., sign)``.  A hex *script* is
    unhexlified, and so is every signature whose first two items equal
    ``'30'``.  The input script becomes
    ``serialize_script([None] + sigs + [script])``.  A hex ``str``
    transaction is processed in raw form and returned through
    ``safe_hexlify``.
    """
    hex_pattern = '^[0-9a-fA-F]*$'
    tx = args[0]
    i = int(args[1])
    script = args[2]
    if isinstance(args[3], list):
        sigs = args[3]
    else:
        sigs = list(args[3:])

    if isinstance(script, str) and re.match(hex_pattern, script):
        script = binascii.unhexlify(script)

    raw_sigs = []
    for signature in sigs:
        if signature[:2] == '30':
            signature = binascii.unhexlify(signature)
        raw_sigs.append(signature)
    sigs = raw_sigs

    if isinstance(tx, str) and re.match(hex_pattern, tx):
        raw_result = apply_multisignatures(
            binascii.unhexlify(tx), i, script, sigs)
        return safe_hexlify(raw_result)

    txobj = deserialize(tx)
    txobj["ins"][i]["script"] = serialize_script([None] + sigs + [script])
    return serialize(txobj)
|
|
|
|
|
def is_inp(arg):
    """Tell whether *arg* describes a transaction input.

    True when *arg* is longer than 64 items, or when it contains
    ``"output"`` or ``"outpoint"`` (a key for dicts, a substring for
    strings).
    """
    if len(arg) > 64:
        return True
    return any(marker in arg for marker in ("output", "outpoint"))
|
|
|
|
|
def mktx(*args):
    """Build and serialize an unsigned transaction.

    Call as ``mktx([in0, in1, ...], [out0, out1, ...])`` or with the
    inputs and outputs as flat arguments; ``is_inp`` decides which is
    which.

    Inputs may be a dict with an ``"outpoint"`` key (used as-is), a dict
    with an ``"output"`` key, or a string.  For the latter two, the first
    64 characters are the hash and everything from position 65 on is the
    index.  Outputs may be a ``"target:value"`` string, where an all-hex
    target is a script and anything else an address, or a dict with
    ``"value"`` plus ``"address"`` or ``"script"``.

    Raises ``Exception`` when an output has neither an address nor a
    script.
    """
    # Flatten one level so list arguments and bare arguments are handled
    # by the same classification step.
    flat_args = []
    for arg in args:
        if isinstance(arg, list):
            flat_args.extend(arg)
        else:
            flat_args.append(arg)

    ins, outs = [], []
    for item in flat_args:
        if is_inp(item):
            ins.append(item)
        else:
            outs.append(item)

    txobj = {"locktime": 0, "version": 1, "ins": [], "outs": []}

    for spec in ins:
        if isinstance(spec, dict) and "outpoint" in spec:
            txobj["ins"].append(spec)
            continue
        if isinstance(spec, dict) and "output" in spec:
            spec = spec["output"]
        txobj["ins"].append({
            "outpoint": {"hash": spec[:64],
                         "index": int(spec[65:])},
            "script": "",
            "sequence": 4294967295
        })

    for spec in outs:
        if isinstance(spec, string_or_bytes_types):
            target = spec[:spec.find(':')]
            amount = int(spec[spec.find(':') + 1:])
            spec = {}
            if re.match('^[0-9a-fA-F]*$', target):
                spec["script"] = target
            else:
                spec["address"] = target
            spec["value"] = amount

        outobj = {}
        if "address" in spec:
            outobj["script"] = address_to_script(spec["address"])
        elif "script" in spec:
            outobj["script"] = spec["script"]
        else:
            raise Exception("Could not find 'address' or 'script' in output.")
        outobj["value"] = spec["value"]
        txobj["outs"].append(outobj)

    return serialize(txobj)
|
|
|
|