Browse Source

Convert jmbitcoin to py3 style

master
James Hilliard 7 years ago
parent
commit
3537fb48fb
  1. 3
      jmbitcoin/jmbitcoin/__init__.py
  2. 3
      jmbitcoin/jmbitcoin/bci.py
  3. 4
      jmbitcoin/jmbitcoin/bech32.py
  4. 3
      jmbitcoin/jmbitcoin/btscript.py
  5. 19
      jmbitcoin/jmbitcoin/secp256k1_deterministic.py
  6. 341
      jmbitcoin/jmbitcoin/secp256k1_main.py
  7. 398
      jmbitcoin/jmbitcoin/secp256k1_transaction.py
  8. 11
      jmbitcoin/test/test_addresses.py
  9. 17
      jmbitcoin/test/test_bech32.py
  10. 15
      jmbitcoin/test/test_bip32.py
  11. 13
      jmbitcoin/test/test_btc_formatting.py
  12. 11
      jmbitcoin/test/test_ecc_signing.py
  13. 28
      jmbitcoin/test/test_keys.py
  14. 14
      jmbitcoin/test/test_main.py
  15. 7
      jmbitcoin/test/test_tx_serialize.py
  16. 15
      jmclient/jmclient/btc.py
  17. 6
      jmclient/jmclient/client_protocol.py
  18. 9
      jmclient/jmclient/cryptoengine.py
  19. 3
      jmclient/jmclient/maker.py
  20. 1
      jmclient/test/commontest.py
  21. 2
      jmdaemon/test/test_message_channel.py
  22. 2
      scripts/obwatch/ob-watcher.py

3
jmbitcoin/jmbitcoin/__init__.py

@ -1,3 +1,6 @@
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import *
import coincurve as secp256k1
from jmbitcoin.secp256k1_main import *
from jmbitcoin.secp256k1_transaction import *

3
jmbitcoin/jmbitcoin/bci.py

@ -1,4 +1,7 @@
#!/usr/bin/python
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
import json, re
import random
import time

4
jmbitcoin/jmbitcoin/bech32.py

@ -19,7 +19,9 @@
# THE SOFTWARE.
"""Reference implementation for Bech32 and segwit addresses."""
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"

3
jmbitcoin/jmbitcoin/btscript.py

@ -1,3 +1,6 @@
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
#OP codes; disabled commented.
# push value

19
jmbitcoin/jmbitcoin/secp256k1_deterministic.py

@ -1,6 +1,10 @@
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import *
from jmbitcoin.secp256k1_main import *
import hmac
import hashlib
import struct
# Below code ASSUMES binary inputs and compressed pubkeys
MAINNET_PRIVATE = b'\x04\x88\xAD\xE4'
@ -32,29 +36,30 @@ def raw_bip32_ckd(rawtuple, i):
hashlib.sha512).digest()
if vbytes in PRIVATE:
newkey = add_privkeys(I[:32] + B'\x01', priv, False)
newkey = add_privkeys(I[:32] + b'\x01', priv, False)
fingerprint = bin_hash160(privtopub(key, False))[:4]
if vbytes in PUBLIC:
newkey = add_pubkeys([privtopub(I[:32] + '\x01', False), key], False)
newkey = add_pubkeys([privtopub(I[:32] + b'\x01', False), key], False)
fingerprint = bin_hash160(key)[:4]
return (vbytes, depth + 1, fingerprint, i, I[32:], newkey)
def bip32_serialize(rawtuple):
vbytes, depth, fingerprint, i, chaincode, key = rawtuple
i = encode(i, 256, 4)
chaincode = encode(hash_to_int(chaincode), 256, 32)
if isinstance(i, int):
i = struct.pack(b'>L', i)
chaincode = chaincode
keydata = b'\x00' + key[:-1] if vbytes in PRIVATE else key
bindata = vbytes + from_int_to_byte(
depth % 256) + fingerprint + i + chaincode + keydata
return changebase(bindata + bin_dbl_sha256(bindata)[:4], 256, 58)
return b58encode(bindata + bin_dbl_sha256(bindata)[:4])
def bip32_deserialize(data):
dbin = changebase(data, 58, 256)
dbin = b58decode(data)
if bin_dbl_sha256(dbin[:-4])[:4] != dbin[-4:]:
raise Exception("Invalid checksum")
vbytes = dbin[0:4]
depth = from_byte_to_int(dbin[4])
depth = dbin[4]
fingerprint = dbin[5:9]
i = decode(dbin[9:13], 256)
chaincode = dbin[13:45]

341
jmbitcoin/jmbitcoin/secp256k1_main.py

@ -1,96 +1,221 @@
#!/usr/bin/python
from __future__ import print_function
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
from future.utils import bytes_to_native_str
import binascii
import hashlib
import re
import sys
import base64
import struct
import coincurve as secp256k1
#Required only for PoDLE calculation:
N = 115792089237316195423570985008687907852837564279074904382605163141518161494337
#Standard prefix for Bitcoin message signing.
BITCOIN_MESSAGE_MAGIC = '\x18' + 'Bitcoin Signed Message:\n'
if sys.version_info.major == 2:
string_types = (str, unicode)
string_or_bytes_types = string_types
int_types = (int, float, long)
# Base switching
code_strings = {
2: '01',
10: '0123456789',
16: '0123456789abcdef',
32: 'abcdefghijklmnopqrstuvwxyz234567',
58: '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz',
256: ''.join([chr(x) for x in range(256)])
}
def lpad(msg, symbol, length):
if len(msg) >= length:
return msg
return symbol * (length - len(msg)) + msg
def get_code_string(base):
if base in code_strings:
return code_strings[base]
BITCOIN_MESSAGE_MAGIC = b'\x18' + b'Bitcoin Signed Message:\n'
string_types = (str)
string_or_bytes_types = (str, bytes)
int_types = (int, float)
# Base switching
code_strings = {
2: '01',
10: '0123456789',
16: '0123456789abcdef',
32: 'abcdefghijklmnopqrstuvwxyz234567',
58: '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz',
256: ''.join([chr(x) for x in range(256)])
}
def lpad(msg, symbol, length):
if len(msg) >= length:
return msg
return symbol * (length - len(msg)) + msg
def get_code_string(base):
if base in code_strings:
return code_strings[base]
else:
raise ValueError("Invalid base!")
def bin_to_b58check(inp, magicbyte=b'\x00'):
if not isinstance(magicbyte, int):
magicbyte = struct.unpack(b'B', magicbyte)[0]
assert(0 <= magicbyte <= 0xff)
if magicbyte == 0:
inp_fmtd = struct.pack(b'B', magicbyte) + inp
while magicbyte > 0:
inp_fmtd = struct.pack(b'B', magicbyte % 256) + inp
magicbyte //= 256
checksum = bin_dbl_sha256(inp_fmtd)[:4]
return b58encode(inp_fmtd + checksum)
def bytes_to_hex_string(b):
return b.encode('hex')
def safe_from_hex(s):
return s.decode('hex')
def from_int_to_byte(a):
return struct.pack(b'B', a)
def from_byte_to_int(a):
return struct.unpack(b'B', a)[0]
def from_string_to_bytes(a):
return a if isinstance(a, bytes) else bytes(a, 'utf-8')
def safe_hexlify(a):
return str(binascii.hexlify(a), 'utf-8')
class SerializationError(Exception):
"""Base class for serialization errors"""
class SerializationTruncationError(SerializationError):
"""Serialized data was truncated
Thrown by deserialize() and stream_deserialize()
"""
def ser_read(f, n):
"""Read from a stream safely
Raises SerializationError and SerializationTruncationError appropriately.
Use this instead of f.read() in your classes stream_(de)serialization()
functions.
"""
MAX_SIZE = 0x02000000
if n > MAX_SIZE:
raise SerializationError('Asked to read 0x%x bytes; MAX_SIZE exceeded' % n)
r = f.read(n)
if len(r) < n:
raise SerializationTruncationError('Asked to read %i bytes, but only got %i' % (n, len(r)))
return r
B58_DIGITS = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
class Base58Error(Exception):
pass
class InvalidBase58Error(Base58Error):
"""Raised on generic invalid base58 data, such as bad characters.
Checksum failures raise Base58ChecksumError specifically.
"""
pass
def b58encode(b):
"""Encode bytes to a base58-encoded string"""
# Convert big-endian bytes to integer
n = int('0x0' + binascii.hexlify(b).decode('utf8'), 16)
# Divide that integer into bas58
res = []
while n > 0:
n, r = divmod(n, 58)
res.append(B58_DIGITS[r])
res = ''.join(res[::-1])
# Encode leading zeros as base58 zeros
czero = b'\x00'
if sys.version > '3':
# In Python3 indexing a bytes returns numbers, not characters.
czero = 0
pad = 0
for c in b:
if c == czero:
pad += 1
else:
raise ValueError("Invalid base!")
def changebase(string, frm, to, minlen=0):
if frm == to:
return lpad(string, get_code_string(frm)[0], minlen)
return encode(decode(string, frm), to, minlen)
def bin_to_b58check(inp, magicbyte=0):
inp_fmtd = chr(int(magicbyte)) + inp
leadingzbytes = len(re.match('^\x00*', inp_fmtd).group(0))
checksum = bin_dbl_sha256(inp_fmtd)[:4]
return '1' * leadingzbytes + changebase(inp_fmtd + checksum, 256, 58)
def bytes_to_hex_string(b):
return b.encode('hex')
def safe_from_hex(s):
return s.decode('hex')
def from_int_to_byte(a):
return chr(a)
def from_byte_to_int(a):
return ord(a)
def from_string_to_bytes(a):
return a
def safe_hexlify(a):
return binascii.hexlify(a)
def encode(val, base, minlen=0):
base, minlen = int(base), int(minlen)
code_string = get_code_string(base)
result = ""
while val > 0:
result = code_string[val % base] + result
val //= base
return code_string[0] * max(minlen - len(result), 0) + result
def decode(string, base):
base = int(base)
code_string = get_code_string(base)
result = 0
if base == 16:
string = string.lower()
while len(string) > 0:
result *= base
result += code_string.find(string[0])
string = string[1:]
return result
else:
raise NotImplementedError("Only Python2 currently supported by btc interface") #pragma: no cover
break
return B58_DIGITS[0] * pad + res
def b58decode(s):
"""Decode a base58-encoding string, returning bytes"""
if not s:
return b''
# Convert the string to an integer
n = 0
for c in s:
n *= 58
if c not in B58_DIGITS:
raise InvalidBase58Error('Character %r is not a valid base58 character' % c)
digit = B58_DIGITS.index(c)
n += digit
# Convert the integer to bytes
h = '%x' % n
if len(h) % 2:
h = '0' + h
res = bytes(binascii.unhexlify(h.encode('utf8')))
# Add padding back.
pad = 0
for c in s[:-1]:
if c == B58_DIGITS[0]: pad += 1
else: break
return b'\x00' * pad + res
def uint256encode(s):
"""Convert bytes to uint256"""
r = 0
t = struct.unpack(b"<IIIIIIII", s[:32])
for i in range(8):
r += t[i] << (i * 32)
return r
def uint256decode(u):
r = b""
for i in range(8):
r += struct.pack(b'<I', u >> (i * 32) & 0xffffffff)
return r
def encode(val, base, minlen=0):
base, minlen = int(base), int(minlen)
code_string = get_code_string(base)
result_bytes = bytes()
while val > 0:
curcode = code_string[val % base]
result_bytes = bytes([ord(curcode)]) + result_bytes
val //= base
pad_size = minlen - len(result_bytes)
padding_element = b'\x00' if base == 256 else b'1' \
if base == 58 else b'0'
if (pad_size > 0):
result_bytes = padding_element*pad_size + result_bytes
result_string = ''.join([chr(y) for y in result_bytes])
result = result_bytes if base == 256 else result_string
return result
def decode(string, base):
if base == 256 and isinstance(string, str):
string = bytes(bytearray.fromhex(string))
base = int(base)
code_string = get_code_string(base)
result = 0
if base == 256:
def extract(d, cs):
if isinstance(d, int):
return d
else:
return struct.unpack(b'B', d)[0]
else:
def extract(d, cs):
return cs.find(d if isinstance(d, str) else chr(d))
if base == 16:
string = string.lower()
while len(string) > 0:
result *= base
result += extract(string[0], code_string)
string = string[1:]
return result
"""PoDLE related primitives
"""
@ -98,7 +223,7 @@ def getG(compressed=True):
"""Returns the public key binary
representation of secp256k1 G
"""
priv = "\x00"*31 + "\x01"
priv = b"\x00"*31 + b"\x01"
G = secp256k1.PrivateKey(priv).public_key.format(compressed)
return G
@ -137,12 +262,11 @@ def bin_sha256(string):
def sha256(string):
return bytes_to_hex_string(bin_sha256(string))
def bin_dbl_sha256(s):
bytes_to_hash = from_string_to_bytes(s)
def bin_dbl_sha256(bytes_to_hash):
return hashlib.sha256(hashlib.sha256(bytes_to_hash).digest()).digest()
def dbl_sha256(string):
return safe_hexlify(bin_dbl_sha256(string))
return hashlib.sha256(hashlib.sha256(string).digest()).hexdigest()
def hash_to_int(x):
if len(x) in [40, 64]:
@ -150,11 +274,14 @@ def hash_to_int(x):
return decode(x, 256)
def num_to_var_int(x):
x = int(x)
if not isinstance(x, int):
if len(x) == 0:
return b'\x00'
x = struct.unpack(b'B', x)[0]
if x < 253: return from_int_to_byte(x)
elif x < 65536: return from_int_to_byte(253) + encode(x, 256, 2)[::-1]
elif x < 4294967296: return from_int_to_byte(254) + encode(x, 256, 4)[::-1]
else: return from_int_to_byte(255) + encode(x, 256, 8)[::-1]
elif x < 65536: return from_int_to_byte(253) + struct.pack(b'<H', x)
elif x < 4294967296: return from_int_to_byte(254) + struct.pack(b'<I', x)
else: return from_int_to_byte(255) + struct.pack(b'<Q', x)
def message_sig_hash(message):
"""Used for construction of signatures of
@ -166,22 +293,20 @@ def message_sig_hash(message):
# Encodings
def b58check_to_bin(inp):
leadingzbytes = len(re.match('^1*', inp).group(0))
data = b'\x00' * leadingzbytes + changebase(inp, 58, 256)
data = b58decode(inp)
assert bin_dbl_sha256(data[:-4])[:4] == data[-4:]
return data[1:-4]
def get_version_byte(inp):
leadingzbytes = len(re.match('^1*', inp).group(0))
data = b'\x00' * leadingzbytes + changebase(inp, 58, 256)
data = b58decode(inp)
assert bin_dbl_sha256(data[:-4])[:4] == data[-4:]
return ord(data[0])
return data[:1]
def hex_to_b58check(inp, magicbyte=0):
def hex_to_b58check(inp, magicbyte=b'\x00'):
return bin_to_b58check(binascii.unhexlify(inp), magicbyte)
def b58check_to_hex(inp):
return safe_hexlify(b58check_to_bin(inp))
return binascii.hexlify(b58check_to_bin(inp))
def pubkey_to_address(pubkey, magicbyte=0):
if len(pubkey) in [66, 130]:
@ -191,9 +316,11 @@ def pubkey_to_address(pubkey, magicbyte=0):
pubtoaddr = pubkey_to_address
def wif_compressed_privkey(priv, vbyte=0):
def wif_compressed_privkey(priv, vbyte=b'\x00'):
"""Convert privkey in hex compressed to WIF compressed
"""
if not isinstance(vbyte, int):
vbyte = struct.unpack(b'B', vbyte)[0]
if len(priv) != 66:
raise Exception("Wrong length of compressed private key")
if priv[-2:] != '01':
@ -201,20 +328,22 @@ def wif_compressed_privkey(priv, vbyte=0):
return bin_to_b58check(binascii.unhexlify(priv), 128 + int(vbyte))
def from_wif_privkey(wif_priv, compressed=True, vbyte=0):
def from_wif_privkey(wif_priv, compressed=True, vbyte=b'\x00'):
"""Convert WIF compressed privkey to hex compressed.
Caller specifies the network version byte (0 for mainnet, 0x6f
for testnet) that the key should correspond to; if there is
a mismatch an error is thrown. WIF encoding uses 128+ this number.
"""
if isinstance(vbyte, int):
vbyte = struct.pack(b'B', vbyte)
bin_key = b58check_to_bin(wif_priv)
claimed_version_byte = get_version_byte(wif_priv)
if not 128+vbyte == claimed_version_byte:
if not from_int_to_byte(128+from_byte_to_int(vbyte)) == claimed_version_byte:
raise Exception(
"WIF key version byte is wrong network (mainnet/testnet?)")
if compressed and not len(bin_key) == 33:
raise Exception("Compressed private key is not 33 bytes")
if compressed and not bin_key[-1] == '\x01':
if compressed and not bin_key[-1:] == b'\x01':
raise Exception("Private key has incorrect compression byte")
return safe_hexlify(bin_key)
@ -252,9 +381,9 @@ def hexbin(func):
newargs = []
for arg in args[:-1]:
if isinstance(arg, (list, tuple)):
newargs += [[x.decode('hex') for x in arg]]
newargs += [[binascii.unhexlify(x) for x in arg]]
else:
newargs += [arg.decode('hex')]
newargs += [binascii.unhexlify(arg)]
newargs += [False]
returnval = func(*newargs, **kwargs)
if isinstance(returnval, bool):
@ -268,7 +397,7 @@ def hexbin(func):
def read_privkey(priv):
if len(priv) == 33:
if priv[-1] == '\x01':
if priv[-1:] == b'\x01':
compressed = True
else:
raise Exception("Invalid private key")
@ -286,7 +415,7 @@ def privkey_to_pubkey_inner(priv, usehex):
and return compressed/uncompressed public key as appropriate.'''
compressed, priv = read_privkey(priv)
#secp256k1 checks for validity of key value.
newpriv = secp256k1.PrivateKey(secret=priv)
newpriv = secp256k1.PrivateKey(secret=bytes_to_native_str(priv))
return newpriv.public_key.format(compressed)
def privkey_to_pubkey(priv, usehex=True):
@ -311,7 +440,7 @@ def multiply(s, pub, usehex, rawpub=True, return_serialized=True):
'''
newpub = secp256k1.PublicKey(pub)
#see note to "tweak_mul" function in podle.py
res = newpub.multiply(s)
res = newpub.multiply(bytes_to_native_str(s))
if not return_serialized:
return res
return res.format()
@ -339,7 +468,7 @@ def add_privkeys(priv1, priv2, usehex):
p1 = secp256k1.PrivateKey(newpriv1)
res = p1.add(newpriv2).secret
if compressed:
res += '\x01'
res += b'\x01'
return res
@hexbin

398
jmbitcoin/jmbitcoin/secp256k1_transaction.py

@ -1,87 +1,61 @@
#!/usr/bin/python
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import *
from past.builtins import basestring
from io import BytesIO
import binascii
import copy
import re
import sys
import os
from jmbitcoin.secp256k1_main import *
from jmbitcoin.bech32 import *
from _functools import reduce
is_python2 = sys.version_info.major == 2
### Hex to bin converter and vice versa for objects
def json_is_base(obj, base):
if not is_python2 and isinstance(obj, bytes): #pragma: no cover
return False
# Transaction serialization and deserialization
alpha = get_code_string(base)
if isinstance(obj, string_types):
for i in range(len(obj)):
if alpha.find(obj[i]) == -1:
return False
return True
elif isinstance(obj, int_types) or obj is None:
return True
elif isinstance(obj, list):
for i in range(len(obj)):
if not json_is_base(obj[i], base):
return False
return True
def deserialize(txinp):
if isinstance(txinp, basestring) and re.match('^[0-9a-fA-F]*$', txinp):
tx = BytesIO(binascii.unhexlify(txinp))
hexout = True
else:
for x in obj:
if not json_is_base(obj[x], base):
return False
return True
tx = BytesIO(txinp)
hexout = False
def json_changebase(obj, changer):
if isinstance(obj, string_or_bytes_types):
return changer(obj)
elif isinstance(obj, int_types) or obj is None:
return obj
elif isinstance(obj, list):
return [json_changebase(x, changer) for x in obj]
return dict((x, json_changebase(obj[x], changer)) for x in obj)
# Transaction serialization and deserialization
def hex_string(scriptbytes, hexout):
if hexout:
return binascii.hexlify(scriptbytes)
else:
return scriptbytes
def deserialize(tx):
if isinstance(tx, (str, unicode)) and re.match('^[0-9a-fA-F]*$', tx):
#tx = bytes(bytearray.fromhex(tx))
return json_changebase(
deserialize(binascii.unhexlify(tx)), lambda x: safe_hexlify(x))
# http://stackoverflow.com/questions/4851463/python-closure-write-to-variable-in-parent-scope
# Python's scoping rules are demented, requiring me to make pos an object
# so that it is call-by-reference
pos = [0]
#TODO: these are disgustingly ignorant of overrun errors!
def read_as_int(bytez):
pos[0] += bytez
return decode(tx[pos[0] - bytez:pos[0]][::-1], 256)
if bytez == 2:
return struct.unpack(b'<H', ser_read(tx, 2))[0]
elif bytez == 4:
return struct.unpack(b'<I', ser_read(tx, 4))[0]
elif bytez == 1:
return struct.unpack(b'B', ser_read(tx, 1))[0]
elif bytez == 8:
return struct.unpack(b'<Q', ser_read(tx, 8))[0]
else:
raise SerializationError('Asked to read unsupported %x bytes; bytez can only be 1, 2, 4 or 8' % bytez)
def read_var_int():
pos[0] += 1
val = from_byte_to_int(tx[pos[0] - 1])
val = from_byte_to_int(ser_read(tx, 1))
if val < 253:
return val
return read_as_int(pow(2, val - 252))
def read_bytes(bytez):
pos[0] += bytez
return tx[pos[0] - bytez:pos[0]]
def read_var_string():
size = read_var_int()
return read_bytes(size)
return ser_read(tx, size)
def read_flag_byte(val):
flag = read_bytes(1)
last = tx.tell()
flag = ser_read(tx, 1)
if from_byte_to_int(flag)==val:
return True
else:
pos[0] -= 1
tx.seek(last)
return False
obj = {"ins": [], "outs": []}
@ -98,18 +72,18 @@ def deserialize(tx):
for i in range(ins):
obj["ins"].append({
"outpoint": {
"hash": read_bytes(32)[::-1],
"hash": hex_string(ser_read(tx, 32)[::-1], hexout),
"index": read_as_int(4)
},
#TODO this will probably crap out on null for segwit
"script": read_var_string(),
"script": hex_string(read_var_string(), hexout),
"sequence": read_as_int(4)
})
outs = read_var_int()
for i in range(outs):
obj["outs"].append({
"value": read_as_int(8),
"script": read_var_string()
"script": hex_string(read_var_string(), hexout)
})
#segwit flag is only set if at least one txinwitness exists,
#in other words it would have to be at least partially signed;
@ -127,52 +101,98 @@ def deserialize(tx):
num_items = read_var_int()
items = []
for ni in range(num_items):
items.append(read_var_string())
items.append(hex_string(read_var_string(), hexout))
obj["ins"][i]["txinwitness"] = items
obj["locktime"] = read_as_int(4)
return obj
def serialize(txobj):
o = []
if json_is_base(txobj, 16):
json_changedbase = json_changebase(txobj,
lambda x: binascii.unhexlify(x))
hexlified = safe_hexlify(serialize(json_changedbase))
return hexlified
o.append(encode(txobj["version"], 256, 4)[::-1])
def serialize(tx):
"""Assumes a deserialized transaction in which all
dictionary values are decoded hex strings or numbers.
Rationale: mixing raw bytes/hex/strings in dict objects causes
complexity; since the dict tx object has to be inspected
in this function, avoid complicated logic elsewhere by making
all conversions to raw byte strings happen here.
Table of dictionary keys and type for the value:
================================================
version: int
ins: list
ins[0]["outpoint"]["hash"]: hex encoded string
ins[0]["outpoint"]["index"]: int
ins[0]["script"]: hex encoded string
ins[0]["sequence"]: int
ins[0]["txinwitness"]: list (optional, may not exist)
ins[0]["txinwitness"][0]: hex encoded string
outs: list
outs[0]["script"]: hex encoded string
outs[0]["value"]: int
locktime: int
=================================================
Returned serialized transaction is a byte string.
"""
#Because we are manipulating the dict in-place, need
#to work on a copy
txobj = copy.deepcopy(tx)
o = BytesIO()
hexout = False
o.write(struct.pack(b'<I', txobj["version"]))
segwit = False
if any("txinwitness" in x.keys() for x in txobj["ins"]):
if any("txinwitness" in x for x in txobj["ins"]):
segwit = True
if segwit:
#append marker and flag
o.append('\x00')
o.append('\x01')
o.append(num_to_var_int(len(txobj["ins"])))
o.write(b'\x00')
o.write(b'\x01')
o.write(num_to_var_int(len(txobj["ins"])))
for inp in txobj["ins"]:
o.append(inp["outpoint"]["hash"][::-1])
o.append(encode(inp["outpoint"]["index"], 256, 4)[::-1])
o.append(num_to_var_int(len(inp["script"])) + (inp["script"] if inp[
"script"] or is_python2 else bytes()))
o.append(encode(inp["sequence"], 256, 4)[::-1])
o.append(num_to_var_int(len(txobj["outs"])))
if len(inp["outpoint"]["hash"]) == 64:
o.write(binascii.unhexlify(inp["outpoint"]["hash"])[::-1])
hexout = True
elif len(inp["outpoint"]["hash"]) == 32:
o.write(inp["outpoint"]["hash"][::-1])
else:
raise SerializationError('Hash has unsupported length: %x bytes; must be 64 or 32 bytes' % len(inp["outpoint"]["hash"]))
o.write(struct.pack(b'<I', inp["outpoint"]["index"]))
if len(inp["script"]) == 0:
o.write(b'\x00')
elif isinstance(inp["script"], basestring) and re.match('^[0-9a-fA-F]*$', inp["script"]):
o.write(num_to_var_int(len(binascii.unhexlify(inp["script"]))))
o.write(binascii.unhexlify(inp["script"]))
else:
o.write(num_to_var_int(len(inp["script"])))
o.write(inp["script"])
o.write(struct.pack(b'<I', inp["sequence"]))
o.write(num_to_var_int(len(txobj["outs"])))
for out in txobj["outs"]:
o.append(encode(out["value"], 256, 8)[::-1])
o.append(num_to_var_int(len(out["script"])) + out["script"])
o.write(struct.pack(b'<Q', out["value"]))
if len(out["script"]) == 0:
o.write(b'\x00')
elif isinstance(out["script"], basestring) and re.match('^[0-9a-fA-F]*$', out["script"]):
o.write(num_to_var_int(len(binascii.unhexlify(out["script"]))))
o.write(binascii.unhexlify(out["script"]))
else:
o.write(num_to_var_int(len(out["script"])))
o.write(out["script"])
if segwit:
#number of witnesses is not explicitly encoded;
#it's implied by txin length
for inp in txobj["ins"]:
if "txinwitness" not in inp.keys():
o.append('\x00')
if "txinwitness" not in inp:
o.write(b'\x00')
continue
items = inp["txinwitness"]
o.append(num_to_var_int(len(items)))
o.write(num_to_var_int(len(items)))
for item in items:
o.append(num_to_var_int(len(item)) + item)
o.append(encode(txobj["locktime"], 256, 4)[::-1])
if isinstance(item, basestring) and re.match('^[0-9a-fA-F]*$', item):
item = binascii.unhexlify(item)
o.write(num_to_var_int(len(item)) + item)
o.write(struct.pack(b'<I', txobj["locktime"]))
return ''.join(o) if is_python2 else reduce(lambda x, y: x + y, o, bytes())
if hexout:
return binascii.hexlify(o.getvalue())
else:
return o.getvalue()
# Hashing transactions for signing
@ -192,54 +212,61 @@ def segwit_signature_form(txobj, i, script, amount, hashcode=SIGHASH_ALL,
# return serialize(segwit_signature_form(deserialize(txobj), i, script,
# amount, hashcode))
script = decoder_func(script)
nVersion = encode(txobj["version"], 256, 4)[::-1]
nVersion = struct.pack(b'<I', txobj["version"])
if not isinstance(hashcode, int):
hashcode = struct.unpack(b'B', hashcode)[0]
#create hashPrevouts
if hashcode & SIGHASH_ANYONECANPAY:
hashPrevouts = "\x00"*32
hashPrevouts = b"\x00"*32
else:
pi = ""
pi = b""
for inp in txobj["ins"]:
pi += decoder_func(inp["outpoint"]["hash"])[::-1]
pi += encode(inp["outpoint"]["index"], 256, 4)[::-1]
pi += struct.pack(b'<I', inp["outpoint"]["index"])
hashPrevouts = bin_dbl_sha256(pi)
#create hashSequence
if not hashcode & SIGHASH_ANYONECANPAY and not (
hashcode & 0x1f == SIGHASH_SINGLE) and not (hashcode & 0x1f == SIGHASH_NONE):
pi = ""
pi = b""
for inp in txobj["ins"]:
pi += encode(inp["sequence"], 256, 4)[::-1]
pi += struct.pack(b'<I', inp["sequence"])
hashSequence = bin_dbl_sha256(pi)
else:
hashSequence = "\x00"*32
hashSequence = b"\x00"*32
#add this input's outpoint
thisOut = decoder_func(txobj["ins"][i]["outpoint"]["hash"])[::-1]
thisOut += encode(txobj["ins"][i]["outpoint"]["index"], 256, 4)[::-1]
thisOut += struct.pack(b'<I', txobj["ins"][i]["outpoint"]["index"])
scriptCode = num_to_var_int(len(script)) + script
amt = encode(amount, 256, 8)[::-1]
thisSeq = encode(txobj["ins"][i]["sequence"], 256, 4)[::-1]
amt = struct.pack(b'<Q', amount)
thisSeq = struct.pack(b'<I', txobj["ins"][i]["sequence"])
#create hashOutputs
if not (hashcode & 0x1f == SIGHASH_SINGLE) and not (hashcode & 0x1f == SIGHASH_NONE):
pi = ""
pi = b""
for out in txobj["outs"]:
pi += encode(out["value"], 256, 8)[::-1]
pi += struct.pack(b'<Q', out["value"])
pi += (num_to_var_int(len(decoder_func(out["script"]))) + \
decoder_func(out["script"]))
hashOutputs = bin_dbl_sha256(pi)
elif hashcode & 0x1f == SIGHASH_SINGLE and i < len(txobj['outs']):
pi = encode(txobj["outs"][i]["value"], 256, 8)[::-1]
pi = struct.pack(b'<Q', txobj["outs"][i]["value"])
pi += (num_to_var_int(len(decoder_func(txobj["outs"][i]["script"]))) +
decoder_func(txobj["outs"][i]["script"]))
hashOutputs = bin_dbl_sha256(pi)
else:
hashOutputs = "\x00"*32
nLockTime = encode(txobj["locktime"], 256, 4)[::-1]
hashOutputs = b"\x00"*32
nLockTime = struct.pack(b'<I', txobj["locktime"])
return nVersion + hashPrevouts + hashSequence + thisOut + scriptCode + amt + \
thisSeq + hashOutputs + nLockTime
def signature_form(tx, i, script, hashcode=SIGHASH_ALL):
i, hashcode = int(i), int(hashcode)
if isinstance(tx, string_or_bytes_types):
return serialize(signature_form(deserialize(tx), i, script, hashcode))
if not isinstance(hashcode, int):
hashcode = struct.unpack(b'B', hashcode)[0]
if isinstance(tx, basestring) and re.match('^[0-9a-fA-F]*$', tx):
tx = deserialize(tx)
elif isinstance(tx, basestring):
tx = deserialize(binascii.hexlify(tx))
if isinstance(script, basestring) and not re.match('^[0-9a-fA-F]*$', script):
script = binascii.hexlify(script)
newtx = copy.deepcopy(tx)
for inp in newtx["ins"]:
#If tx is passed in in segwit form, it must be switched to non-segwit.
@ -288,16 +315,19 @@ def txhash(tx, hashcode=None, check_sw=True):
If check_sw is True it checks the serialized format for
segwit flag bytes, and produces the correct form for txid (not wtxid).
"""
if isinstance(tx, str) and re.match('^[0-9a-fA-F]*$', tx):
tx = changebase(tx, 16, 256)
if not isinstance(tx, basestring):
tx = serialize(tx)
if isinstance(tx, basestring) and re.match('^[0-9a-fA-F]*$', tx):
tx = binascii.unhexlify(tx)
if check_sw and from_byte_to_int(tx[4]) == 0:
if not from_byte_to_int(tx[5]) == 1:
#This invalid, but a raise is a DOS vector in some contexts.
return None
return segwit_txid(tx, hashcode)
if hashcode:
return dbl_sha256(from_string_to_bytes(tx) + encode(
int(hashcode), 256, 4)[::-1])
if not isinstance(hashcode, int):
hashcode = struct.unpack(b'B', hashcode)[0]
return dbl_sha256(from_string_to_bytes(tx) + struct.pack(b'<I', hashcode))
else:
return safe_hexlify(bin_dbl_sha256(tx)[::-1])
@ -337,12 +367,7 @@ def mk_scripthash_script(addr):
def segwit_scriptpubkey(witver, witprog):
"""Construct a Segwit scriptPubKey for a given witness program."""
if sys.version_info >= (3, 0):
x = bytes([witver + 0x50 if witver else 0, len(witprog)] + witprog)
else:
x = chr(witver + 0x50) if witver else '\x00'
x += chr(len(witprog))
x += bytearray(witprog)
x = bytes([witver + 0x50 if witver else 0, len(witprog)] + witprog)
return x
def mk_native_segwit_script(addr):
@ -412,31 +437,53 @@ def p2sh_scriptaddr(script, magicbyte=5):
scriptaddr = p2sh_scriptaddr
def deserialize_script(script):
if isinstance(script, str) and re.match('^[0-9a-fA-F]*$', script):
return json_changebase(
deserialize_script(binascii.unhexlify(script)),
lambda x: safe_hexlify(x))
out, pos = [], 0
while pos < len(script):
code = from_byte_to_int(script[pos])
def deserialize_script(scriptinp):
"""Note that this is not used internally, in
the jmbitcoin package, to deserialize() transactions;
its function is only to allow parsing of scripts by
external callers. Thus, it returns in the format used
outside the package: a deserialized script is a list of
entries which can be any of:
None
integer
hex string
"""
def hex_string(scriptbytes, hexout):
if hexout:
return binascii.hexlify(scriptbytes)
else:
return scriptbytes
if isinstance(scriptinp, basestring) and re.match('^[0-9a-fA-F]*$', scriptinp):
script = BytesIO(binascii.unhexlify(scriptinp))
hexout = True
else:
script = BytesIO(scriptinp)
hexout = False
script.seek(0, os.SEEK_END)
length = script.tell()
script.seek(0, os.SEEK_SET)
out = []
while script.tell() < length:
code = from_byte_to_int(ser_read(script, 1))
if code == 0:
out.append(None)
pos += 1
elif code <= 75:
out.append(script[pos + 1:pos + 1 + code])
pos += 1 + code
out.append(hex_string(ser_read(script, code), hexout))
elif code <= 78:
szsz = pow(2, code - 76)
sz = decode(script[pos + szsz:pos:-1], 256)
out.append(script[pos + 1 + szsz:pos + 1 + szsz + sz])
pos += 1 + szsz + sz
if code == 78:
sz = struct.unpack(b'<I', ser_read(script, 4))[0]
elif code == 77:
sz = struct.unpack(b'<H', ser_read(script, 2))[0]
elif code == 76:
sz = struct.unpack(b'<B', ser_read(script, 1))[0]
else:
sz = code
out.append(hex_string(ser_read(script, sz), hexout))
elif code <= 96:
out.append(code - 80)
pos += 1
else:
out.append(code)
pos += 1
return out
@ -449,45 +496,28 @@ def serialize_script_unit(unit):
elif unit is None:
return b'\x00'
else:
if isinstance(unit, basestring) and re.match('^[0-9a-fA-F]*$', unit):
unit = binascii.unhexlify(unit)
if len(unit) <= 75:
return from_int_to_byte(len(unit)) + unit
elif len(unit) < 256:
return from_int_to_byte(76) + from_int_to_byte(len(unit)) + unit
return from_int_to_byte(76) + struct.pack(b'<B', len(unit)) + unit
elif len(unit) < 65536:
return from_int_to_byte(77) + encode(len(unit), 256, 2)[::-1] + unit
return from_int_to_byte(77) + struct.pack(b'<H', len(unit)) + unit
else:
return from_int_to_byte(78) + encode(len(unit), 256, 4)[::-1] + unit
if is_python2:
def serialize_script(script):
#bugfix: if *every* item in the script is of type int,
#for example a script of OP_TRUE, or None,
#then the previous version would always report json_is_base as True,
#resulting in an infinite loop (look who's demented now).
#There is no easy solution without being less flexible;
#here we default to returning a hex serialization in cases where
#there are no strings to use as flags.
if all([(isinstance(x, int) or x is None) for x in script]):
#no indication given whether output should be hex or binary, so..?
return binascii.hexlify(''.join(map(serialize_script_unit, script)))
if json_is_base(script, 16):
return binascii.hexlify(serialize_script(json_changebase(
script, lambda x: binascii.unhexlify(x))))
return ''.join(map(serialize_script_unit, script))
else: #pragma: no cover
def serialize_script(script):
#TODO Python 3 bugfix as above needed
if json_is_base(script, 16):
return safe_hexlify(serialize_script(json_changebase(
script, lambda x: binascii.unhexlify(x))))
result = bytes()
for b in map(serialize_script_unit, script):
result += b if isinstance(b, bytes) else bytes(b, 'utf-8')
return from_int_to_byte(78) + struct.pack(b'<I', len(unit)) + unit
def serialize_script(script):
result = b''
hexout = True
for b in script:
if isinstance(b, basestring) and not re.match('^[0-9a-fA-F]*$', b):
hexout = False
result += serialize_script_unit(b)
if hexout:
return binascii.hexlify(result)
else:
return result
@ -508,18 +538,18 @@ def verify_tx_input(tx, i, script, sig, pub, witness=None, amount=None):
if re.match('^[0-9a-fA-F]*$', script):
script = binascii.unhexlify(script)
if not re.match('^[0-9a-fA-F]*$', sig):
sig = safe_hexlify(sig)
sig = binascii.hexlify(sig)
if not re.match('^[0-9a-fA-F]*$', pub):
pub = safe_hexlify(pub)
pub = binascii.hexlify(pub)
if witness:
if not re.match('^[0-9a-fA-F]*$', witness):
witness = safe_hexlify(witness)
hashcode = decode(sig[-2:], 16)
hashcode = binascii.unhexlify(sig[-2:])
if witness and amount:
#TODO assumes p2sh wrapped segwit input; OK for JM wallets
scriptCode = "76a914"+hash160(binascii.unhexlify(pub))+"88ac"
modtx = segwit_signature_form(deserialize(binascii.hexlify(tx)), int(i),
scriptCode, amount, hashcode)
scriptCode = binascii.unhexlify("76a914"+hash160(binascii.unhexlify(pub))+"88ac")
modtx = segwit_signature_form(deserialize(tx), int(i),
scriptCode, amount, hashcode, decoder_func=lambda x: x)
else:
modtx = signature_form(tx, int(i), script, hashcode)
return ecdsa_tx_verify(modtx, sig, pub, hashcode)
@ -527,9 +557,11 @@ def verify_tx_input(tx, i, script, sig, pub, witness=None, amount=None):
def sign(tx, i, priv, hashcode=SIGHASH_ALL, usenonce=None, amount=None):
i = int(i)
if (not is_python2 and isinstance(re, bytes)) or not re.match(
'^[0-9a-fA-F]*$', tx):
return binascii.unhexlify(sign(safe_hexlify(tx), i, priv))
if isinstance(tx, basestring) and re.match('^[0-9a-fA-F]*$', tx):
tx = binascii.unhexlify(tx)
hexout = True
else:
hexout = False
if len(priv) <= 33:
priv = safe_hexlify(priv)
if amount:
@ -541,7 +573,13 @@ def sign(tx, i, priv, hashcode=SIGHASH_ALL, usenonce=None, amount=None):
sig = ecdsa_tx_sign(signing_tx, priv, hashcode, usenonce=usenonce)
txobj = deserialize(tx)
txobj["ins"][i]["script"] = serialize_script([sig, pub])
return serialize(txobj)
serobj = serialize(txobj)
if hexout and isinstance(serobj, basestring) and not re.match('^[0-9a-fA-F]*$', serobj):
return binascii.hexlify(serobj)
elif not hexout and re.match('^[0-9a-fA-F]*$', serobj):
return binascii.unhexlify(serobj)
else:
return serobj
def p2sh_p2wpkh_sign(tx, i, priv, amount, hashcode=SIGHASH_ALL, usenonce=None):
"""Given a serialized transaction, index, private key in hex,
@ -612,7 +650,6 @@ def mktx(*args):
(ins if is_inp(a) else outs).append(a)
else:
(ins if is_inp(arg) else outs).append(arg)
txobj = {"locktime": 0, "version": 1, "ins": [], "outs": []}
for i in ins:
if isinstance(i, dict) and "outpoint" in i:
@ -627,7 +664,7 @@ def mktx(*args):
"sequence": 4294967295
})
for o in outs:
if isinstance(o, string_or_bytes_types):
if isinstance(o, basestring):
addr = o[:o.find(':')]
val = int(o[o.find(':') + 1:])
o = {}
@ -646,6 +683,5 @@ def mktx(*args):
raise Exception("Could not find 'address' or 'script' in output.")
outobj["value"] = o["value"]
txobj["outs"].append(outobj)
return serialize(txobj)

11
jmbitcoin/test/test_addresses.py

@ -1,3 +1,6 @@
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
import jmbitcoin as btc
import json
import pytest
@ -7,14 +10,13 @@ testdir = os.path.dirname(os.path.realpath(__file__))
def validate_address(addr, nettype):
"""A mock of jmclient.validate_address
"""
BTC_P2PK_VBYTE = {"mainnet": 0x00, "testnet": 0x6f}
BTC_P2SH_VBYTE = {"mainnet": 0x05, "testnet": 0xc4}
BTC_P2PK_VBYTE = {"mainnet": b'\x00', "testnet": b'\x6f'}
BTC_P2SH_VBYTE = {"mainnet": b'\x05', "testnet": b'\xc4'}
try:
ver = btc.get_version_byte(addr)
except AssertionError:
except AssertionError as e:
return False, 'Checksum wrong. Typo in address?'
except Exception as e:
print repr(e)
return False, "Invalid bitcoin address"
if ver not in [BTC_P2PK_VBYTE[nettype], BTC_P2SH_VBYTE[nettype]]:
return False, 'Wrong address version. Testnet/mainnet confused?'
@ -52,7 +54,6 @@ def test_b58_valid_addresses():
else:
net = "mainnet"
#if using pytest -s ; sanity check to see what's actually being tested
print 'testing this address: ' + addr
res, message = validate_address(addr, net)
assert res == True, "Incorrectly failed to validate address: " + addr + " with message: " + message

17
jmbitcoin/test/test_bech32.py

@ -1,5 +1,7 @@
#!/usr/bin/python
from __future__ import print_function
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
# Copyright (c) 2017 Pieter Wuille
#
@ -24,21 +26,10 @@ from __future__ import print_function
"""Reference tests for segwit adresses"""
import sys
import binascii
import unittest
import jmbitcoin as btc
def segwit_scriptpubkey(witver, witprog):
"""Construct a Segwit scriptPubKey for a given witness program."""
if sys.version_info >= (3, 0):
x = bytes([witver + 0x50 if witver else 0, len(witprog)] + witprog)
else:
x = chr(witver + 0x50) if witver else '\x00'
x += chr(len(witprog))
x += bytearray(witprog)
return x
VALID_CHECKSUM = [
"A12UEL5L",
"an83characterlonghumanreadablepartthatcontainsthenumber1andtheexcludedcharactersbio1tt5tgs",
@ -120,7 +111,7 @@ class TestSegwitAddress(unittest.TestCase):
hrp = "tb"
witver, witprog = btc.bech32addr_decode(hrp, address)
self.assertIsNotNone(witver)
scriptpubkey = segwit_scriptpubkey(witver, witprog)
scriptpubkey = btc.segwit_scriptpubkey(witver, witprog)
self.assertEqual(scriptpubkey, binascii.unhexlify(hexscript))
addr = btc.bech32addr_encode(hrp, witver, witprog)
self.assertEqual(address.lower(), addr)

15
jmbitcoin/test/test_bip32.py

@ -1,3 +1,6 @@
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
import pytest
import jmbitcoin as btc
import binascii
@ -71,13 +74,13 @@ def test_bip32_vector(vector):
currentkey = master
for i in range(1, len(vector['depths'])):
currentkey = btc.bip32_ckd(currentkey, vector['depths'][i])
print currentkey
print vector['keys'][i][0]
print(currentkey)
print(vector['keys'][i][0])
assert currentkey == vector['keys'][i][
0], 'failed: child priv key, should be: ' + vector['keys'][i][0]
pub = btc.bip32_privtopub(currentkey)
print pub
print vector['keys'][i][1]
print(pub)
print(vector['keys'][i][1])
assert pub == vector['keys'][i][
1], 'failed: child pub key, should be: ' + vector['keys'][i][1]
@ -95,13 +98,13 @@ def test_ckd_pubkeys():
pub = 'xpub68Gmy5EdvgibQVfPdqkBBCHxA5htiqg55crXYuXoQRKfDBFA1WEjWgP6LHhwBZeNK1VTsfTFUHCdrfp1bgwQ9xv5ski8PX9rL2dZXvgGDnw'
new_pub = btc.bip32_ckd(pub, 4)
#how to check it's right?
print new_pub
print(new_pub)
#try to do on hardened, should fail, that's the idea:
with pytest.raises(Exception) as e_info:
new_pub = btc.bip32_ckd(pub, 2**31+1)
def test_bip32_descend():
master = btc.bip32_master_key('\x07'*32)
master = btc.bip32_master_key(b'\x07'*32)
end_key = btc.bip32_descend(master, [2, 3, 10000])
assert end_key=="6856ef965940a1a7b1311dc041050ac0013e326c7ff4e2c677a7694b4f0405c901"
end_key = btc.bip32_descend(master, 2, 5, 4, 5)

13
jmbitcoin/test/test_btc_formatting.py

@ -1,5 +1,7 @@
#! /usr/bin/env python
from __future__ import absolute_import
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
'''Test bitcoin module data handling'''
import jmbitcoin as btc
@ -16,15 +18,6 @@ def test_bad_code_string():
with pytest.raises(ValueError) as e_info:
btc.get_code_string(i)
@pytest.mark.parametrize(
"st, frm, to, minlen, res",
[
("0101aa", 16, 16, 12, "0000000101aa"),
])
def test_changebase(st, frm, to, minlen, res):
assert btc.changebase(st, frm, to, minlen) == res
#Tests of compactsize encoding, see:
#https://bitcoin.org/en/developer-reference#compactsize-unsigned-integers
#note that little endian is used.

11
jmbitcoin/test/test_ecc_signing.py

@ -1,5 +1,7 @@
#! /usr/bin/env python
from __future__ import absolute_import
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
'''Test ECDSA signing and other key operations, including legacy message
signature conversion.'''
@ -16,7 +18,7 @@ def test_valid_sigs(setup_ecc):
msg = v['msg']
sig = v['sig']
priv = v['privkey']
assert sig == btc.ecdsa_raw_sign(msg, priv, True, rawmsg=True)+'01'
assert btc.from_string_to_bytes(sig) == btc.ecdsa_raw_sign(msg, priv, True, rawmsg=True)+b'01'
#check that the signature verifies against the key(pair)
pubkey = btc.privtopub(priv)
assert btc.ecdsa_raw_verify(msg, pubkey, sig[:-2], True, rawmsg=True)
@ -24,8 +26,8 @@ def test_valid_sigs(setup_ecc):
for i in [0,1,2,4,7,25,55]:
#corrupt one byte
binsig = binascii.unhexlify(sig)
checksig = binascii.hexlify(binsig[:i] + chr(
(ord(binsig[i])+1) %256) + binsig[i+1:-1])
checksig = binascii.hexlify(binsig[:i] + btc.from_string_to_bytes(chr(
(ord(binsig[i:i+1])+1) %256)) + binsig[i+1:-1])
#this kind of corruption will sometimes lead to an assert
#failure (if the DER format is corrupted) and sometimes lead
@ -36,6 +38,7 @@ def test_valid_sigs(setup_ecc):
continue
assert res==False
@pytest.fixture(scope='module')
def setup_ecc():
global vectors

28
jmbitcoin/test/test_keys.py

@ -1,5 +1,7 @@
#! /usr/bin/env python
from __future__ import absolute_import
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
'''Public and private key validity and formatting tests.'''
import jmbitcoin as btc
@ -10,30 +12,30 @@ import os
testdir = os.path.dirname(os.path.realpath(__file__))
def test_read_raw_privkeys():
badkeys = ['', '\x07'*31,'\x07'*34, '\x07'*33]
badkeys = [b'', b'\x07'*31,b'\x07'*34, b'\x07'*33]
for b in badkeys:
with pytest.raises(Exception) as e_info:
c, k = btc.read_privkey(b)
goodkeys = [('\x07'*32, False), ('\x07'*32 + '\x01', True)]
goodkeys = [(b'\x07'*32, False), (b'\x07'*32 + b'\x01', True)]
for g in goodkeys:
c, k = btc.read_privkey(g[0])
assert c == g[1]
def test_wif_privkeys_invalid():
#first try to create wif privkey from key of wrong length
bad_privs = ['\x01\x02'*17] #some silly private key but > 33 bytes
bad_privs = [b'\x01\x02'*17] #some silly private key but > 33 bytes
#next try to create wif with correct length but wrong compression byte
bad_privs.append('\x07'*32 + '\x02')
bad_privs.append(b'\x07'*32 + b'\x02')
for priv in bad_privs:
with pytest.raises(Exception) as e_info:
fake_wif = btc.wif_compressed_privkey(binascii.hexlify(priv))
#Create a wif with wrong length
bad_wif1 = btc.bin_to_b58check('\x01\x02'*34, 128)
bad_wif1 = btc.bin_to_b58check(b'\x01\x02'*34, b'\x80')
#Create a wif with wrong compression byte
bad_wif2 = btc.bin_to_b58check('\x07'*33, 128)
bad_wif2 = btc.bin_to_b58check(b'\x07'*33, b'\x80')
for bw in [bad_wif1, bad_wif2]:
with pytest.raises(Exception) as e_info:
fake_priv = btc.from_wif_privkey(bw)
@ -47,7 +49,7 @@ def test_wif_privkeys_invalid():
bad_key = k[0]
for netval in ["mainnet", "testnet"]:
#if using pytest -s ; sanity check to see what's actually being tested
print 'testing this key: ' + bad_key
print('testing this key: ' + bad_key)
#should throw exception
with pytest.raises(Exception) as e_info:
from_wif_key = btc.from_wif_privkey(bad_key,
@ -55,7 +57,7 @@ def test_wif_privkeys_invalid():
#in case the b58 check encoding is valid, we should
#also check if the leading version byte is in the
#expected set, and throw an error if not.
if chr(btc.get_version_byte(bad_key)) not in '\x80\xef':
if chr(btc.get_version_byte(bad_key)) not in b'\x80\xef':
raise Exception("Invalid version byte")
def test_wif_privkeys_valid():
@ -66,14 +68,14 @@ def test_wif_privkeys_valid():
key, hex_key, prop_dict = a
if prop_dict["isPrivkey"]:
netval = "testnet" if prop_dict["isTestnet"] else "mainnet"
print 'testing this key: ' + key
assert chr(btc.get_version_byte(
key)) in '\x80\xef', "not valid network byte"
print('testing this key: ' + key)
assert btc.get_version_byte(
key) in b'\x80\xef', "not valid network byte"
comp = prop_dict["isCompressed"]
from_wif_key = btc.from_wif_privkey(
key,
compressed=comp,
vbyte=btc.get_version_byte(key)-128)
vbyte=btc.from_int_to_byte(btc.from_byte_to_int(btc.get_version_byte(key))-128))
expected_key = hex_key
if comp: expected_key += '01'
assert from_wif_key == expected_key, "Incorrect key decoding: " + \

14
jmbitcoin/test/test_main.py

@ -1,5 +1,7 @@
#! /usr/bin/env python
from __future__ import absolute_import
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
'''Testing mostly exceptional cases in secp256k1_main.
Some of these may represent code that should be removed, TODO.'''
@ -22,7 +24,7 @@ def test_lpad():
assert btc.lpad("aaaa", "b", 3) == "aaaa"
def test_safe_from_hex():
assert btc.safe_from_hex('ff0100') == '\xff\x01\x00'
assert btc.safe_from_hex('ff0100') == b'\xff\x01\x00'
def test_hash2int():
assert btc.hash_to_int("aa"*32) == \
@ -30,9 +32,9 @@ def test_hash2int():
@btc.hexbin
def dummyforwrap(a, b, c, d="foo", e="bar"):
newa = a+"\x01"
newa = a+b"\x01"
x, y = b
newb = [x+"\x02", y+"\x03"]
newb = [x+b"\x02", y+b"\x03"]
if d == "foo":
return newb[1]
else:
@ -41,7 +43,7 @@ def dummyforwrap(a, b, c, d="foo", e="bar"):
def test_hexbin():
assert dummyforwrap("aa", ["bb", "cc"], True) == "cc03"
assert dummyforwrap("aa", ["bb", "cc"], True, d="baz") == "bb02"
assert dummyforwrap("\xaa", ["\xbb", "\xcc"], False) == "\xcc\x03"
assert dummyforwrap(b"\xaa", [b"\xbb", b"\xcc"], False) == b"\xcc\x03"
def test_add_privkeys():
with pytest.raises(Exception) as e_info:
@ -54,7 +56,7 @@ def test_ecdsa_raw_sign():
assert e_info.match("Invalid hash input")
#build non-raw priv object as input
privraw = "aa"*32
msghash = "\xbb"*32
msghash = b"\xbb"*32
sig = binascii.hexlify(btc.ecdsa_raw_sign(msghash, privraw, False, rawpriv=False, rawmsg=True))
assert sig == "3045022100b81960b4969b423199dea555f562a66b7f49dea5836a0168361f1a5f8a3c8298022003eea7d7ee4462e3e9d6d59220f950564caeb77f7b1cdb42af3c83b013ff3b2f"

7
jmbitcoin/test/test_tx_serialize.py

@ -1,3 +1,6 @@
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import * # noqa: F401
import jmbitcoin as btc
import pytest
import json
@ -228,7 +231,7 @@ def test_serialization_roundtrip2():
#ignore comment entries
if len(j) < 2:
continue
print j
print(j)
deserialized = btc.deserialize(str(j[0]))
print deserialized
print(deserialized)
assert j[0] == btc.serialize(deserialized)

15
jmclient/jmclient/btc.py

@ -2,8 +2,8 @@
different codebase than joinmarket's own.
"""
#Protocol constants
BTC_P2PK_VBYTE = {"mainnet": 0x00, "testnet": 0x6f}
BTC_P2SH_VBYTE = {"mainnet": 0x05, "testnet": 0xc4}
BTC_P2PK_VBYTE = {"mainnet": b'\x00', "testnet": b'\x6f'}
BTC_P2SH_VBYTE = {"mainnet": b'\x05', "testnet": b'\xc4'}
PODLE_COMMIT_FILE = None
from jmbase.support import get_log
@ -138,22 +138,11 @@ except ImportError:
def b58check_to_bin(addr, version=False):
"""optionally include the version byte for get_version_byte.
Note that jmbitcoin does not need this flag, because it
uses the changebase() function, which here has been restricted.
"""
if not version:
return ebt.DecodeBase58Check(addr)[1:]
else:
return ebt.DecodeBase58Check(addr)
def changebase(inp, frm=256, to=58):
"""Implementation of base58 (*not* b58check) conversion
only. Used in message channel verifiable nick construction.
Explicitly disabling any other conversion for now.
"""
if not (frm==256 and to==58):
raise NotImplementedError
return ebt.base_encode(inp, 58)
def address_to_script(addr):
return etr.Transaction.pay_script(ebt.TYPE_ADDRESS, addr)

6
jmclient/jmclient/client_protocol.py

@ -62,7 +62,7 @@ class JMClientProtocol(amp.AMP):
self.nick_pubkey = btc.privtopub(self.nick_priv)
self.nick_pkh_raw = hashlib.sha256(self.nick_pubkey).digest()[
:self.nick_hashlen]
self.nick_pkh = btc.changebase(self.nick_pkh_raw, 256, 58)
self.nick_pkh = btc.b58encode(self.nick_pkh_raw)
#right pad to maximum possible; b58 is not fixed length.
#Use 'O' as one of the 4 not included chars in base58.
self.nick_pkh += 'O' * (self.nick_maxencoded - len(self.nick_pkh))
@ -111,9 +111,9 @@ class JMClientProtocol(amp.AMP):
nick_stripped = nick[2:2 + max_encoded]
#strip right padding
nick_unpadded = ''.join([x for x in nick_stripped if x != 'O'])
if not nick_unpadded == btc.changebase(nick_pkh_raw, 256, 58):
if not nick_unpadded == btc.b58encode(nick_pkh_raw):
jlog.debug("Nick hash check failed, expected: " + str(nick_unpadded)
+ ", got: " + str(btc.changebase(nick_pkh_raw, 256, 58)))
+ ", got: " + str(btc.b58encode(nick_pkh_raw)))
verif_result = False
d = self.callRemote(commands.JMMsgSignatureVerify,
verif_result=verif_result,

9
jmclient/jmclient/cryptoengine.py

@ -3,6 +3,7 @@ from __future__ import print_function, absolute_import, division, unicode_litera
from binascii import hexlify, unhexlify
from collections import OrderedDict
import struct
from . import btc
@ -12,7 +13,7 @@ from .configure import get_network
TYPE_P2PKH, TYPE_P2SH_P2WPKH, TYPE_P2WPKH = range(3)
NET_MAINNET, NET_TESTNET = range(2)
NET_MAP = {'mainnet': NET_MAINNET, 'testnet': NET_TESTNET}
WIF_PREFIX_MAP = {'mainnet': 0x80, 'testnet': 0xef}
WIF_PREFIX_MAP = {'mainnet': b'\x80', 'testnet': b'\xef'}
BIP44_COIN_MAP = {'mainnet': 2**31, 'testnet': 2**31 + 1}
@ -123,11 +124,11 @@ class BTCEngine(object):
@classmethod
def wif_to_privkey(cls, wif):
raw = btc.b58check_to_bin(wif)
vbyte = btc.get_version_byte(wif)
vbyte = struct.unpack('B', btc.get_version_byte(wif))[0]
if (btc.BTC_P2PK_VBYTE[get_network()] + cls.WIF_PREFIX) & 0xff == vbyte:
if (struct.unpack('B', btc.BTC_P2PK_VBYTE[get_network()])[0] + struct.unpack('B', cls.WIF_PREFIX)[0]) & 0xff == vbyte:
key_type = TYPE_P2PKH
elif (btc.BTC_P2SH_VBYTE[get_network()] + cls.WIF_PREFIX) & 0xff == vbyte:
elif (struct.unpack('B', btc.BTC_P2SH_VBYTE[get_network()])[0] + struct.unpack('B', cls.WIF_PREFIX)[0]) & 0xff == vbyte:
key_type = TYPE_P2SH_P2WPKH
else:
key_type = None

3
jmclient/jmclient/maker.py

@ -7,6 +7,7 @@ import sys
from binascii import unhexlify
import btc
from btc import SerializationError, SerializationTruncationError
from jmclient.configure import jm_single
from jmbase.support import get_log
from jmclient.support import (calc_cj_fee)
@ -110,7 +111,7 @@ class Maker(object):
"""
try:
tx = btc.deserialize(txhex)
except IndexError as e:
except (IndexError, SerializationError, SerializationTruncationError) as e:
return (False, 'malformed txhex. ' + repr(e))
jlog.info('obtained tx\n' + pprint.pformat(tx))
goodtx, errmsg = self.verify_unsigned_tx(tx, offerinfo)

1
jmclient/test/commontest.py

@ -149,7 +149,6 @@ def make_sign_and_push(ins_full,
binarize_tx(de_tx)
de_tx = wallet.sign_tx(de_tx, scripts, hashcode=hashcode)
#pushtx returns False on any error
print(de_tx)
tx = binascii.hexlify(btc.serialize(de_tx))
push_succeed = jm_single().bc_interface.pushtx(tx)
if push_succeed:

2
jmdaemon/test/test_message_channel.py

@ -27,7 +27,7 @@ def make_valid_nick(i=0):
nick_priv = hashlib.sha256(chr(i)*16).hexdigest() + '01'
nick_pubkey = bitcoin.privtopub(nick_priv)
nick_pkh_raw = hashlib.sha256(nick_pubkey).digest()[:NICK_HASH_LENGTH]
nick_pkh = bitcoin.changebase(nick_pkh_raw, 256, 58)
nick_pkh = bitcoin.b58encode(nick_pkh_raw)
#right pad to maximum possible; b58 is not fixed length.
#Use 'O' as one of the 4 not included chars in base58.
nick_pkh += 'O' * (NICK_MAX_ENCODED - len(nick_pkh))

2
scripts/obwatch/ob-watcher.py

@ -412,7 +412,7 @@ def get_dummy_nick():
privacy, a conformant nick is created based on a random
pseudo-pubkey."""
nick_pkh_raw = hashlib.sha256(os.urandom(10)).digest()[:NICK_HASH_LENGTH]
nick_pkh = btc.changebase(nick_pkh_raw, 256, 58)
nick_pkh = btc.b58encode(nick_pkh_raw)
#right pad to maximum possible; b58 is not fixed length.
#Use 'O' as one of the 4 not included chars in base58.
nick_pkh += 'O' * (NICK_MAX_ENCODED - len(nick_pkh))

Loading…
Cancel
Save