Browse Source

Completed coverage of jmbitcoin, jmbase, jmclient

Includes moving coverage and pytest files to root directory.
master
Adam Gibson 9 years ago
parent
commit
0ea2dc5301
No known key found for this signature in database
GPG Key ID: B3AE09F1E9A3197A
  1. 12
      .coveragerc
  2. 24
      conftest.py
  3. 2
      jmbase/jmbase/support.py
  4. 26
      jmbase/test/test_base_support.py
  5. 35
      jmbase/test/test_commands.py
  6. 4
      jmbitcoin/jmbitcoin/bci.py
  7. 16
      jmbitcoin/jmbitcoin/secp256k1_main.py
  8. 17
      jmbitcoin/jmbitcoin/secp256k1_transaction.py
  9. 7
      jmbitcoin/test/test_addresses.py
  10. 21
      jmbitcoin/test/test_btc_formatting.py
  11. 5
      jmbitcoin/test/test_ecc_signing.py
  12. 7
      jmbitcoin/test/test_keys.py
  13. 63
      jmbitcoin/test/test_main.py
  14. 5
      jmbitcoin/test/test_tx_serialize.py
  15. 11
      jmclient/jmclient/client_protocol.py
  16. 6
      jmclient/test/.coveragerc
  17. 24
      jmclient/test/commontest.py
  18. 4
      jmclient/test/test_blockr.py
  19. 94
      jmclient/test/test_client_protocol.py
  20. 2
      jmclient/test/test_podle.py
  21. 227
      jmclient/test/test_tx_creation.py
  22. 7
      jmclient/test/test_valid_addresses.py
  23. 4
      jmclient/test/test_wallets.py
  24. 4
      setup.cfg
  25. 168
      test/commontest.py
  26. 76
      test/conftest.py

12
.coveragerc

@ -0,0 +1,12 @@
# .coveragerc to control coverage.py
[run]
omit =
jmclient/jmclient/jsonrpc.py
jmclient/jmclient/slowaes.py
jmclient/jmclient/btc.py
jmclient/test/*
jmclient/setup.py
jmbitcoin/test/*
jmbitcoin/setup.py
jmbase/test/*
jmbase/setup.py

24
jmclient/test/conftest.py → conftest.py

@ -2,14 +2,34 @@ import pytest
import os
import time
import subprocess
from commontest import local_command
from jmclient import load_program_config
bitcoin_path = None
bitcoin_conf = None
bitcoin_rpcpassword = None
bitcoin_rpcusername = None
def local_command(command, bg=False, redirect=''):
if redirect == 'NULL':
if OS == 'Windows':
command.append(' > NUL 2>&1')
elif OS == 'Linux':
command.extend(['>', '/dev/null', '2>&1'])
else:
print "OS not recognised, quitting."
elif redirect:
command.extend(['>', redirect])
if bg:
#using subprocess.PIPE seems to cause problems
FNULL = open(os.devnull, 'w')
return subprocess.Popen(command,
stdout=FNULL,
stderr=subprocess.STDOUT,
close_fds=True)
else:
#in case of foreground execution, we can use the output; if not
#it doesn't matter
return subprocess.check_output(command)
def pytest_addoption(parser):
parser.addoption("--btcroot", action="store", default='',

2
jmbase/jmbase/support.py

@ -51,7 +51,7 @@ def get_log():
def chunks(d, n):
return [d[x:x + n] for x in xrange(0, len(d), n)]
def get_password(msg):
def get_password(msg): #pragma: no cover
return getpass(msg)
def debug_dump_object(obj, skip_fields=None):

26
jmbase/test/test_base_support.py

@ -0,0 +1,26 @@
#! /usr/bin/env python
from __future__ import print_function
from jmbase.support import debug_dump_object, get_password, get_log, joinmarket_alert
import pytest
def test_debug_dump_object():
joinmarket_alert[0] = "dummy jm alert"
class TestObj(object):
def __init__(self):
self.x = "foo"
self.password = "bar"
self.y = "baz"
to = TestObj()
debug_dump_object(to)
to.given_password = "baa"
debug_dump_object(to)
to.extradict = {1:2, 3:4}
debug_dump_object(to)
to.extralist = ["dummy", "list"]
debug_dump_object(to)
to.extradata = 100
debug_dump_object(to, skip_fields="y")

35
jmbase/test/test_commands.py

@ -8,6 +8,9 @@ from twisted.internet.error import (ConnectionLost, ConnectionAborted,
from twisted.protocols.amp import UnknownRemoteError
from twisted.python import failure
from twisted.protocols import amp
from twisted.trial import unittest
from twisted.internet import reactor, task
from jmbase.commands import *
import json
@ -36,12 +39,12 @@ class JMBaseProtocol(amp.AMP):
d.addErrback(self.defaultErrback)
def show_receipt(name, *args):
tmsg("Received msgtype: " + name + ", args: " + ",".join([str(x) for x in args]))
print("Received msgtype: " + name + ", args: " + ",".join([str(x) for x in args]))
def end_test():
global test_completed
test_completed = True
reactor.stop()
class JMTestServerProtocol(JMBaseProtocol):
@ -131,7 +134,6 @@ class JMTestServerProtocol(JMBaseProtocol):
class JMTestClientProtocol(JMBaseProtocol):
def connectionMade(self):
print("Connection made")
self.clientStart()
def clientStart(self):
@ -193,7 +195,7 @@ class JMTestClientProtocol(JMBaseProtocol):
def on_JM_SIG_RECEIVED(self, nick, sig):
show_receipt("JMSIGRECEIVED", nick, sig)
#end of test
reactor.callLater(2, end_test)
reactor.callLater(1, end_test)
return {'accepted': True}
@JMRequestMsgSig.responder
@ -226,10 +228,21 @@ class JMTestClientProtocolFactory(protocol.ClientFactory):
class JMTestServerProtocolFactory(protocol.ServerFactory):
protocol = JMTestServerProtocol
def test_jm_protocol():
reactor.connectTCP("localhost", 27184, JMTestClientProtocolFactory())
reactor.listenTCP(27184, JMTestServerProtocolFactory())
reactor.run()
print("Got here")
if not test_completed:
raise Exception("Failed test")
class TrialTestJMProto(unittest.TestCase):
def setUp(self):
print("setUp()")
self.port = reactor.listenTCP(27184, JMTestServerProtocolFactory())
self.addCleanup(self.port.stopListening)
def cb(client):
self.client = client
self.addCleanup(self.client.transport.loseConnection)
creator = protocol.ClientCreator(reactor, JMTestClientProtocol)
creator.connectTCP("localhost", 27184).addCallback(cb)
def test_waiter(self):
print("test_main()")
return task.deferLater(reactor, 3, self._called_by_deffered)
def _called_by_deffered(self):
pass

4
jmbitcoin/jmbitcoin/bci.py

@ -5,7 +5,7 @@ import sys
import time
import platform
from jmbase.support import get_log
if platform.system() == "Windows":
if platform.system() == "Windows": #pragma: no cover
import ssl
import urllib2
else:
@ -18,7 +18,7 @@ log = get_log()
# Makes a request to a given URL (first arg) and optional params (second arg)
def make_request(*args):
if platform.system() == "Windows":
if platform.system() == "Windows": #pragma: no cover
sctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
sh = urllib2.HTTPSHandler(debuglevel=0, context=sctx)
opener = urllib2.build_opener(sh)

16
jmbitcoin/jmbitcoin/secp256k1_main.py

@ -131,15 +131,8 @@ if sys.version_info.major == 2:
return result
else:
raise NotImplementedError("Only Python2 currently supported by btc interface")
raise NotImplementedError("Only Python2 currently supported by btc interface") #pragma: no cover
def tweak_mul(point, scalar):
"""Temporary hack because Windows binding had a bug in tweak_mul.
Can be removed when Windows binding is updated.
"""
return secp256k1._tweak_public(point,
secp256k1.lib.secp256k1_ec_pubkey_tweak_mul,
scalar)
"""PoDLE related primitives
"""
def getG(compressed=True):
@ -405,7 +398,7 @@ def ecdsa_raw_sign(msg,
'''Take the binary message msg and sign it with the private key
priv.
By default priv is just a 32 byte string, if rawpriv is false
it is assumed to be DER encoded.
it is assumed to be hex encoded (note only works if usehex=False).
If rawmsg is True, no sha256 hash is applied to msg before signing.
In this case, msg must be a precalculated hash (256 bit).
If rawmsg is False, the secp256k1 lib will hash the message as part
@ -422,14 +415,15 @@ def ecdsa_raw_sign(msg,
newpriv = secp256k1.PrivateKey(p, raw=True, ctx=ctx)
else:
newpriv = secp256k1.PrivateKey(priv, raw=False, ctx=ctx)
if usenonce:
#Donations, thus custom nonce, currently disabled, hence not covered.
if usenonce: #pragma: no cover
if len(usenonce) != 32:
raise ValueError("Invalid nonce passed to ecdsa_sign: " + str(
usenonce))
nf = ffi.addressof(_noncefunc.lib, "nonce_function_rand")
ndata = ffi.new("char [32]", usenonce)
usenonce = (nf, ndata)
if usenonce:
if usenonce: #pragma: no cover
sig = newpriv.ecdsa_sign(msg, raw=rawmsg, custom_nonce=usenonce)
else:
#partial fix for secp256k1-transient not including customnonce;

17
jmbitcoin/jmbitcoin/secp256k1_transaction.py

@ -8,7 +8,7 @@ is_python2 = sys.version_info.major == 2
### Hex to bin converter and vice versa for objects
def json_is_base(obj, base):
if not is_python2 and isinstance(obj, bytes):
if not is_python2 and isinstance(obj, bytes): #pragma: no cover
return False
alpha = get_code_string(base)
@ -270,7 +270,7 @@ def serialize_script_unit(unit):
if unit < 16:
return from_int_to_byte(unit + 80)
else:
return bytes([unit])
return from_int_to_byte(unit)
elif unit is None:
return b'\x00'
else:
@ -287,11 +287,22 @@ def serialize_script_unit(unit):
if is_python2:
def serialize_script(script):
#bugfix: if *every* item in the script is of type int,
#for example a script of OP_TRUE, or None,
#then the previous version would always report json_is_base as True,
#resulting in an infinite loop (look who's demented now).
#There is no easy solution without being less flexible;
#here we default to returning a hex serialization in cases where
#there are no strings to use as flags.
if all([(isinstance(x, int) or x is None) for x in script]):
#no indication given whether output should be hex or binary, so..?
return binascii.hexlify(''.join(map(serialize_script_unit, script)))
if json_is_base(script, 16):
return binascii.hexlify(serialize_script(json_changebase(
script, lambda x: binascii.unhexlify(x))))
return ''.join(map(serialize_script_unit, script))
else:
else: #pragma: no cover
def serialize_script(script):
if json_is_base(script, 16):

7
jmbitcoin/test/test_addresses.py

@ -1,7 +1,8 @@
import jmbitcoin as btc
import json
import pytest
import os
testdir = os.path.dirname(os.path.realpath(__file__))
def validate_address(addr, nettype):
"""A mock of jmclient.validate_address
@ -31,7 +32,7 @@ def validate_address(addr, nettype):
])
def test_b58_invalid_addresses(net):
#none of these are valid as any kind of key or address
with open("base58_keys_invalid.json", "r") as f:
with open(os.path.join(testdir,"base58_keys_invalid.json"), "r") as f:
json_data = f.read()
invalid_key_list = json.loads(json_data)
for k in invalid_key_list:
@ -40,7 +41,7 @@ def test_b58_invalid_addresses(net):
assert res == False, "Incorrectly validated address: " + bad_key + " with message: " + message
def test_b58_valid_addresses():
with open("base58_keys_valid.json", "r") as f:
with open(os.path.join(testdir,"base58_keys_valid.json"), "r") as f:
json_data = f.read()
valid_keys_list = json.loads(json_data)
for a in valid_keys_list:

21
jmbitcoin/test/test_btc_formatting.py

@ -45,3 +45,24 @@ def test_changebase(st, frm, to, minlen, res):
])
def test_compact_size(num, compactsize):
assert btc.num_to_var_int(num) == binascii.unhexlify(compactsize)
@pytest.mark.parametrize("frm, to", [
(("16001405481b7f1d90c5a167a15b00e8af76eb6984ea59"),
["001405481b7f1d90c5a167a15b00e8af76eb6984ea59"]),
(("483045022100ad0dda327945e581a5effd83d75d76a9f07c3128f4dc6d25a54"
"e5ad5dd629bd00220487a992959bd540dbc335c655e6485ebfb394129eb48038f"
"0a2d319782f7cb690121039319452b6abafb5fcf06096196d0c141b8bd18a3de7"
"e9b9352da800d671ccd84"),
["3045022100ad0dda327945e581a5effd83d75d76a9f07c3128f4dc6d25a54e5ad5dd629bd00220487a992959bd540dbc335c655e6485ebfb394129eb48038f0a2d319782f7cb6901",
"039319452b6abafb5fcf06096196d0c141b8bd18a3de7e9b9352da800d671ccd84"]),
(("51"), [1]),
(("00"), [None]),
(("510000"), [1, None, None]),
(("636505aaaaaaaaaa53"), [99, 101, "aaaaaaaaaa", 3]),
(("51" + "4d0101" + "aa"*257), [1, "aa"*257]),
(("4e" + "03000100" + "aa"*65539), ["aa"*65539]),
])
def test_deserialize_script(frm, to):
#print(len(btc.deserialize_script(frm)[0]))
assert btc.deserialize_script(frm) == to
assert btc.serialize_script(to) == frm

5
jmbitcoin/test/test_ecc_signing.py

@ -7,7 +7,8 @@ import jmbitcoin as btc
import binascii
import json
import pytest
import os
testdir = os.path.dirname(os.path.realpath(__file__))
vectors = None
def test_valid_sigs(setup_ecc):
@ -38,6 +39,6 @@ def test_valid_sigs(setup_ecc):
@pytest.fixture(scope='module')
def setup_ecc():
global vectors
with open("ecc_sigs_rfc6979_valid.json", "r") as f:
with open(os.path.join(testdir, "ecc_sigs_rfc6979_valid.json"), "r") as f:
json_data = f.read()
vectors = json.loads(json_data)

7
jmbitcoin/test/test_keys.py

@ -6,7 +6,8 @@ import jmbitcoin as btc
import binascii
import json
import pytest
import os
testdir = os.path.dirname(os.path.realpath(__file__))
def test_read_raw_privkeys():
badkeys = ['', '\x07'*31,'\x07'*34, '\x07'*33]
@ -39,7 +40,7 @@ def test_wif_privkeys_invalid():
#Some invalid b58 from bitcoin repo;
#none of these are valid as any kind of key or address
with open("base58_keys_invalid.json", "r") as f:
with open(os.path.join(testdir,"base58_keys_invalid.json"), "r") as f:
json_data = f.read()
invalid_key_list = json.loads(json_data)
for k in invalid_key_list:
@ -58,7 +59,7 @@ def test_wif_privkeys_invalid():
raise Exception("Invalid version byte")
def test_wif_privkeys_valid():
with open("base58_keys_valid.json", "r") as f:
with open(os.path.join(testdir,"base58_keys_valid.json"), "r") as f:
json_data = f.read()
valid_keys_list = json.loads(json_data)
for a in valid_keys_list:

63
jmbitcoin/test/test_main.py

@ -0,0 +1,63 @@
#! /usr/bin/env python
from __future__ import absolute_import
'''Testing mostly exceptional cases in secp256k1_main.
Some of these may represent code that should be removed, TODO.'''
import jmbitcoin as btc
import binascii
import json
import pytest
import os
testdir = os.path.dirname(os.path.realpath(__file__))
def test_hex2b58check():
assert btc.hex_to_b58check("aa"*32) == "12JAT9y2EcnV6DPUGikLJYjWwk5UmUEFXRiQVmTbfSLbL3njFzp"
def test_bindblsha():
assert btc.bin_dbl_sha256("abc") == binascii.unhexlify(
"4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358")
def test_lpad():
assert btc.lpad("aaaa", "b", 5) == "baaaa"
assert btc.lpad("aaaa", "b", 4) == "aaaa"
assert btc.lpad("aaaa", "b", 3) == "aaaa"
def test_safe_from_hex():
assert btc.safe_from_hex('ff0100') == '\xff\x01\x00'
def test_hash2int():
assert btc.hash_to_int("aa"*32) == \
77194726158210796949047323339125271902179989777093709359638389338608753093290
@btc.hexbin
def dummyforwrap(a, b, c, d="foo", e="bar"):
newa = a+"\x01"
x, y = b
newb = [x+"\x02", y+"\x03"]
if d == "foo":
return newb[1]
else:
return newb[0]
def test_hexbin():
assert dummyforwrap("aa", ["bb", "cc"], True) == "cc03"
assert dummyforwrap("aa", ["bb", "cc"], True, d="baz") == "bb02"
assert dummyforwrap("\xaa", ["\xbb", "\xcc"], False) == "\xcc\x03"
def test_add_privkeys():
with pytest.raises(Exception) as e_info:
btc.add_privkeys("aa"*32, "bb"*32+"01", True)
def test_ecdsa_raw_sign():
msg = "aa"*31
with pytest.raises(Exception) as e_info:
btc.ecdsa_raw_sign(msg, None, None, rawmsg=True)
assert e_info.match("Invalid hash input")
#build non-raw priv object as input
privraw = "aa"*32
msghash = "\xbb"*32
sig = binascii.hexlify(btc.ecdsa_raw_sign(msghash, privraw, False, rawpriv=False, rawmsg=True))
assert sig == "3045022100b81960b4969b423199dea555f562a66b7f49dea5836a0168361f1a5f8a3c8298022003eea7d7ee4462e3e9d6d59220f950564caeb77f7b1cdb42af3c83b013ff3b2f"

5
jmbitcoin/test/test_tx_serialize.py

@ -2,7 +2,8 @@ import sys
import jmbitcoin as btc
import pytest
import json
import os
testdir = os.path.dirname(os.path.realpath(__file__))
#TODO: fold these examples into the tx_valid.json file
@pytest.mark.parametrize(
@ -221,7 +222,7 @@ def test_serialization_roundtrip2():
#still valid.
#Note that of course this is only a serialization, not validity test, so
#only currently of very limited significance
with open("tx_valid.json", "r") as f:
with open(os.path.join(testdir,"tx_valid.json"), "r") as f:
json_data = f.read()
valid_txs = json.loads(json_data)
for j in valid_txs:

11
jmclient/jmclient/client_protocol.py

@ -44,14 +44,6 @@ class JMTakerClientProtocol(amp.AMP):
self.nick_priv = nick_priv
self.shutdown_requested = False
lc = LoopingCall(self.checkForShutdown)
lc.start(0.2)
def checkForShutdown(self):
if self.shutdown_requested:
jlog.info("Client shutdown was requested, complying.")
self.shutdown_requested = False
reactor.stop()
def checkClientResponse(self, response):
"""A generic check of client acceptance; any failure
@ -254,6 +246,7 @@ class JMTakerClientProtocolFactory(protocol.ClientFactory):
return JMTakerClientProtocol(self, self.taker)
def start_reactor(host, port, factory, ish=True):
def start_reactor(host, port, factory, ish=True): #pragma: no cover
#(Cannot start the reactor in tests)
reactor.connectTCP(host, port, factory)
reactor.run(installSignalHandlers=ish)

6
jmclient/test/.coveragerc

@ -1,6 +0,0 @@
# .coveragerc to control coverage.py
[run]
omit =
../jmclient/jsonrpc.py
../jmclient/slowaes.py
../jmclient/btc.py

24
jmclient/test/commontest.py

@ -160,30 +160,6 @@ def make_sign_and_push(ins_full,
else:
return False
def local_command(command, bg=False, redirect=''):
if redirect == 'NULL':
if OS == 'Windows':
command.append(' > NUL 2>&1')
elif OS == 'Linux':
command.extend(['>', '/dev/null', '2>&1'])
else:
print "OS not recognised, quitting."
elif redirect:
command.extend(['>', redirect])
if bg:
#using subprocess.PIPE seems to cause problems
FNULL = open(os.devnull, 'w')
return subprocess.Popen(command,
stdout=FNULL,
stderr=subprocess.STDOUT,
close_fds=True)
else:
#in case of foreground execution, we can use the output; if not
#it doesn't matter
return subprocess.check_output(command)
def make_wallets(n,
wallet_structures=None,
mean_amt=1,

4
jmclient/test/test_blockr.py

@ -18,6 +18,10 @@ log = get_log()
#TODO: some kind of mainnet testing, harder.
blockr_root_url = "https://tbtc.blockr.io/api/v1/"
def test_bci_bad_req():
with pytest.raises(Exception) as e_info:
btc.make_request("1")
def test_blockr_bad_request():
with pytest.raises(Exception) as e_info:
btc.make_request_blockr(blockr_root_url+"address/txs/", "0000")

94
jmclient/test/test_client_protocol.py

@ -9,18 +9,22 @@ from jmclient.client_protocol import JMProtocolError, JMTakerClientProtocol
import os
from twisted.python.log import startLogging, err
from twisted.python.log import msg as tmsg
from twisted.internet import protocol, reactor
from twisted.internet import protocol, reactor, task
from twisted.internet.error import (ConnectionLost, ConnectionAborted,
ConnectionClosed, ConnectionDone)
from twisted.protocols.amp import UnknownRemoteError
from twisted.python import failure
from twisted.protocols import amp
from twisted.trial import unittest
from jmbase.commands import *
from taker_test_data import t_raw_signed_tx
import json
import time
import jmbitcoin as bitcoin
import twisted
twisted.internet.base.DelayedCall.debug = True
test_completed = False
clientfactory = None
@ -62,6 +66,9 @@ class DummyTaker(Taker):
def on_sig(self, nick, sigb64):
"""For test, we exit 'early' on first message, since this marks the end
of client-server communication with the daemon.
"""
jlog.debug("We got a sig: " + sigb64)
end_test()
return True
@ -88,30 +95,13 @@ def show_receipt(name, *args):
tmsg("Received msgtype: " + name + ", args: " + ",".join([str(x) for x in args]))
def end_client(client):
client.shutdown_requested = True
pass
def end_test():
#global runno
global test_completed
#runno += 1
#jlog.info("Updated runno to: " + str(runno))
#taker = DummyTaker(None, None)
#if runno == 1:
# jlog.info("Run number was less than 2")
# taker.set_fail_init(True)
# taker.set_fail_utxos(False)
# cfactory = JMTakerClientProtocolFactory(taker)
# reactor.connectTCP("localhost", 27184, cfactory)
# return
#elif runno == 2:
# taker.set_fail_init(False)
# taker.set_fail_utxos(True)
# cfactory = JMTakerClientProtocolFactory(taker)
# reactor.connectTCP("localhost", 27184, cfactory)
# return
test_completed = True
client = clientfactory.getClient()
reactor.callLater(2, end_client, client)
reactor.callLater(1, end_client, client)
class JMTestServerProtocol(JMBaseProtocol):
@ -215,32 +205,38 @@ class DummyClientProtocolFactory(JMTakerClientProtocolFactory):
def buildProtocol(self, addr):
return JMTakerClientProtocol(self, self.taker, nick_priv="aa"*32)
def test_jm_protocol():
"""We cannot use parametrize for different options as
we can't run in sequence; hence, parameters hardcoded as lists here
"""
params = [[False, False], [True, False], [False, True], [-1, False]]
global clientfactory
load_program_config()
jm_single().maker_timeout_sec = 1
reactor.listenTCP(27184, JMTestServerProtocolFactory())
clientfactories = []
takers = [DummyTaker(None, None) for _ in range(len(params))]
for i, p in enumerate(params):
takers[i].set_fail_init(p[0])
takers[i].set_fail_utxos(p[1])
if i != 0:
clientfactories.append(JMTakerClientProtocolFactory(takers[i]))
reactor.connectTCP("localhost", 27184, clientfactories[i])
else:
clientfactories.append(DummyClientProtocolFactory(takers[i]))
clientfactory = clientfactories[0]
start_reactor("localhost", 27184, clientfactories[0])
print("Got here")
if not test_completed:
raise Exception("Failed test")
class TrialTestJMClientProto(unittest.TestCase):
def setUp(self):
global clientfactory
print("setUp()")
params = [[False, False], [True, False], [False, True], [-1, False]]
load_program_config()
jm_single().maker_timeout_sec = 1
self.port = reactor.listenTCP(27184, JMTestServerProtocolFactory())
self.addCleanup(self.port.stopListening)
def cb(client):
self.client = client
self.addCleanup(self.client.transport.loseConnection)
clientfactories = []
takers = [DummyTaker(None, None) for _ in range(len(params))]
for i, p in enumerate(params):
takers[i].set_fail_init(p[0])
takers[i].set_fail_utxos(p[1])
if i != 0:
clientfactories.append(JMTakerClientProtocolFactory(takers[i]))
clientconn = reactor.connectTCP("localhost", 27184, clientfactories[i])
self.addCleanup(clientconn.disconnect)
else:
clientfactories.append(DummyClientProtocolFactory(takers[i]))
clientfactory = clientfactories[0]
clientconn = reactor.connectTCP("localhost", 27184, clientfactories[0])
self.addCleanup(clientconn.disconnect)
print("Got here")
def test_waiter(self):
print("test_main()")
return task.deferLater(reactor, 3, self._called_by_deffered)
def _called_by_deffered(self):
pass

2
jmclient/test/test_podle.py

@ -9,7 +9,7 @@ import pytest
import copy
import subprocess
import signal
from commontest import local_command, make_wallets
from commontest import make_wallets
import time
from pprint import pformat
from jmclient import (load_program_config, get_log, jm_single, generate_podle,

227
jmclient/test/test_tx_creation.py

@ -0,0 +1,227 @@
#! /usr/bin/env python
from __future__ import absolute_import
'''Test of unusual transaction types creation and push to
network to check validity.'''
import sys
import os
import time
import binascii
import random
from commontest import make_wallets, make_sign_and_push
import jmbitcoin as bitcoin
import pytest
from jmclient import (load_program_config, jm_single, sync_wallet,
get_p2pk_vbyte, get_log, Wallet, select_gradual,
select, select_greedy, select_greediest, estimate_tx_fee)
log = get_log()
#just a random selection of pubkeys for receiving multisigs;
#if we ever need the privkeys, they are in a json file somewhere
vpubs = ["03e9a06e539d6bf5cf1ca5c41b59121fa3df07a338322405a312c67b6349a707e9",
"0280125e42c1984923106e281615dfada44d38c4125c005963b322427110d709d6",
"02726fa5b19e9406aaa46ee22fd9e81a09dd5eb7c87505b93a11efcf4b945e778c",
"03600a739be32a14938680b3b3d61b51f217a60df118160d0decab22c9e1329862",
"028a2f126e3999ff66d01dcb101ab526d3aa1bf5cbdc4bde14950a4cead95f6fcb",
"02bea84d70e74f7603746b62d79bf035e16d982b56e6a1ee07dfd3b9130e8a2ad9"]
@pytest.mark.parametrize(
"nw, wallet_structures, mean_amt, sdev_amt, amount, pubs, k", [
(1, [[2, 1, 4, 0, 0]], 4, 1.4, 600000000, vpubs[1:4], 2),
(1, [[3, 3, 0, 0, 3]], 4, 1.4, 100000000, vpubs[:4], 3),
])
def test_create_p2sh_output_tx(setup_tx_creation, nw, wallet_structures,
mean_amt, sdev_amt, amount, pubs, k):
wallets = make_wallets(nw, wallet_structures, mean_amt, sdev_amt)
for w in wallets.values():
sync_wallet(w['wallet'])
for k, w in enumerate(wallets.values()):
wallet = w['wallet']
ins_full = wallet.select_utxos(0, amount)
script = bitcoin.mk_multisig_script(pubs, k)
#try the alternative argument passing
pubs.append(k)
script2 = bitcoin.mk_multisig_script(*pubs)
assert script2 == script
output_addr = bitcoin.scriptaddr(script, magicbyte=196)
txid = make_sign_and_push(ins_full,
wallet,
amount,
output_addr=output_addr)
assert txid
def test_script_to_address(setup_tx_creation):
sample_script = "a914307f099a3bfedec9a09682238db491bade1b467f87"
assert bitcoin.script_to_address(
sample_script) == "367SYUMqo1Fi4tQsycnmCtB6Ces1Z7EZLH"
assert bitcoin.script_to_address(
sample_script, vbyte=196) == "2MwfecDHsQTm4Gg3RekQdpqAMR15BJrjfRF"
def test_mktx(setup_tx_creation):
"""Testing exceptional conditions; not guaranteed
to create valid tx objects"""
#outpoint structure must be {"outpoint":{"hash":hash, "index": num}}
ins = [{'outpoint': {"hash":x*32, "index":0},
"script": "", "sequence": 4294967295} for x in ["a", "b", "c"]]
pub = vpubs[0]
addr = bitcoin.pubkey_to_address(pub, magicbyte=get_p2pk_vbyte())
script = bitcoin.address_to_script(addr)
outs = [script + ":1000", addr+":2000",{"script":script, "value":3000}]
tx = bitcoin.mktx(ins, outs)
print(tx)
#rewrite with invalid output
outs.append({"foo": "bar"})
with pytest.raises(Exception) as e_info:
tx = bitcoin.mktx(ins, outs)
def test_bintxhash(setup_tx_creation):
tx = "abcd"
x = bitcoin.bin_txhash(tx)
assert binascii.hexlify(x) == "943cbb9637d9b16b92529b182ed0257ec729afc897ac0522b2ed2a86f6809917"
def test_all_same_priv(setup_tx_creation):
#recipient
priv = "aa"*32 + "01"
addr = bitcoin.privkey_to_address(priv, magicbyte=get_p2pk_vbyte())
wallet = make_wallets(1, [[1,0,0,0,0]], 1)[0]['wallet']
#make another utxo on the same address
addrinwallet = wallet.get_addr(0,0,0)
jm_single().bc_interface.grab_coins(addrinwallet, 1)
sync_wallet(wallet)
insfull = wallet.select_utxos(0, 110000000)
outs = [{"address": addr, "value": 1000000}]
ins = insfull.keys()
tx = bitcoin.mktx(ins, outs)
tx = bitcoin.signall(tx, wallet.get_key_from_addr(addrinwallet))
@pytest.mark.parametrize(
"signall, mktxlist",
[
(True, False),
(False, True),
])
def test_verify_tx_input(setup_tx_creation, signall, mktxlist):
priv = "aa"*32 + "01"
addr = bitcoin.privkey_to_address(priv, magicbyte=get_p2pk_vbyte())
wallet = make_wallets(1, [[2,0,0,0,0]], 1)[0]['wallet']
sync_wallet(wallet)
insfull = wallet.select_utxos(0, 110000000)
print(insfull)
if not mktxlist:
outs = [{"address": addr, "value": 1000000}]
ins = insfull.keys()
tx = bitcoin.mktx(ins, outs)
else:
out1 = addr+":1000000"
ins0, ins1 = insfull.keys()
print("INS0 is: " + str(ins0))
print("INS1 is: " + str(ins1))
tx = bitcoin.mktx(ins0, ins1, out1)
desertx = bitcoin.deserialize(tx)
print(desertx)
if signall:
privdict = {}
for index, ins in enumerate(desertx['ins']):
utxo = ins['outpoint']['hash'] + ':' + str(ins['outpoint']['index'])
ad = insfull[utxo]['address']
priv = wallet.get_key_from_addr(ad)
privdict[utxo] = priv
tx = bitcoin.signall(tx, privdict)
else:
for index, ins in enumerate(desertx['ins']):
utxo = ins['outpoint']['hash'] + ':' + str(ins['outpoint']['index'])
ad = insfull[utxo]['address']
priv = wallet.get_key_from_addr(ad)
if index % 2:
tx = binascii.unhexlify(tx)
tx = bitcoin.sign(tx, index, priv)
if index % 2:
tx = binascii.hexlify(tx)
desertx2 = bitcoin.deserialize(tx)
print(desertx2)
sig, pub = bitcoin.deserialize_script(desertx2['ins'][0]['script'])
print(sig, pub)
pubscript = bitcoin.address_to_script(bitcoin.pubkey_to_address(
pub, magicbyte=get_p2pk_vbyte()))
sig = binascii.unhexlify(sig)
pub = binascii.unhexlify(pub)
sig_good = bitcoin.verify_tx_input(tx, 0, pubscript,
sig, pub)
assert sig_good
def test_absurd_fees(setup_tx_creation):
"""Test triggering of ValueError exception
if the transaction fees calculated from the blockchain
interface exceed the limit set in the config.
"""
jm_single().bc_interface.absurd_fees = True
#pay into it
wallet = make_wallets(1, [[2, 0, 0, 0, 1]], 3)[0]['wallet']
sync_wallet(wallet)
amount = 350000000
ins_full = wallet.select_utxos(0, amount)
with pytest.raises(ValueError) as e_info:
txid = make_sign_and_push(ins_full, wallet, amount, estimate_fee=True)
def test_create_sighash_txs(setup_tx_creation):
#non-standard hash codes:
for sighash in [bitcoin.SIGHASH_ANYONECANPAY + bitcoin.SIGHASH_SINGLE,
bitcoin.SIGHASH_NONE, bitcoin.SIGHASH_SINGLE]:
wallet = make_wallets(1, [[2, 0, 0, 0, 1]], 3)[0]['wallet']
sync_wallet(wallet)
amount = 350000000
ins_full = wallet.select_utxos(0, amount)
print "using hashcode: " + str(sighash)
txid = make_sign_and_push(ins_full, wallet, amount, hashcode=sighash)
assert txid
#Create an invalid sighash single (too many inputs)
extra = wallet.select_utxos(4, 100000000) #just a few more inputs
ins_full.update(extra)
with pytest.raises(Exception) as e_info:
txid = make_sign_and_push(ins_full,
wallet,
amount,
hashcode=bitcoin.SIGHASH_SINGLE)
#trigger insufficient funds
with pytest.raises(Exception) as e_info:
fake_utxos = wallet.select_utxos(4, 1000000000)
def test_spend_p2sh_utxos(setup_tx_creation):
#make a multisig address from 3 privs
privs = [chr(x) * 32 + '\x01' for x in range(1, 4)]
pubs = [bitcoin.privkey_to_pubkey(binascii.hexlify(priv)) for priv in privs]
script = bitcoin.mk_multisig_script(pubs, 2)
msig_addr = bitcoin.scriptaddr(script, magicbyte=196)
#pay into it
wallet = make_wallets(1, [[2, 0, 0, 0, 1]], 3)[0]['wallet']
sync_wallet(wallet)
amount = 350000000
ins_full = wallet.select_utxos(0, amount)
txid = make_sign_and_push(ins_full, wallet, amount, output_addr=msig_addr)
assert txid
#wait for mining
time.sleep(4)
#spend out; the input can be constructed from the txid of previous
msig_in = txid + ":0"
ins = [msig_in]
#random output address and change addr
output_addr = wallet.get_new_addr(1, 1)
amount2 = amount - 50000
outs = [{'value': amount2, 'address': output_addr}]
tx = bitcoin.mktx(ins, outs)
sigs = []
for priv in privs[:2]:
sigs.append(bitcoin.multisign(tx, 0, script, binascii.hexlify(priv)))
tx = bitcoin.apply_multisignatures(tx, 0, script, sigs)
txid = jm_single().bc_interface.pushtx(tx)
assert txid
@pytest.fixture(scope="module")
def setup_tx_creation():
load_program_config()

7
jmclient/test/test_addresses.py → jmclient/test/test_valid_addresses.py

@ -2,7 +2,8 @@ from jmclient.configure import validate_address, load_program_config
from jmclient import jm_single
import json
import pytest
import os
testdir = os.path.dirname(os.path.realpath(__file__))
def test_non_addresses(setup_addresses):
#could flesh this out with other examples
@ -11,7 +12,7 @@ def test_non_addresses(setup_addresses):
def test_b58_invalid_addresses(setup_addresses):
#none of these are valid as any kind of key or address
with open("base58_keys_invalid.json", "r") as f:
with open(os.path.join(testdir,"base58_keys_invalid.json"), "r") as f:
json_data = f.read()
invalid_key_list = json.loads(json_data)
for k in invalid_key_list:
@ -21,7 +22,7 @@ def test_b58_invalid_addresses(setup_addresses):
def test_b58_valid_addresses():
with open("base58_keys_valid.json", "r") as f:
with open(os.path.join(testdir,"base58_keys_valid.json"), "r") as f:
json_data = f.read()
valid_keys_list = json.loads(json_data)
for a in valid_keys_list:

4
jmclient/test/test_wallets.py

@ -13,7 +13,7 @@ import datetime
import unittest
from ConfigParser import SafeConfigParser, NoSectionError
from decimal import Decimal
from commontest import (local_command, interact, make_wallets,
from commontest import (interact, make_wallets,
make_sign_and_push, DummyBlockchainInterface)
import json
@ -26,7 +26,7 @@ from jmclient import (load_program_config, jm_single, sync_wallet,
BitcoinCoreWallet, BitcoinCoreInterface)
from jmbase.support import chunks
from taker_test_data import t_obtained_tx, t_raw_signed_tx
testdir = os.path.dirname(os.path.realpath(__file__))
log = get_log()

4
setup.cfg

@ -0,0 +1,4 @@
# content of pytest.ini
[tool:pytest]
norecursedirs = test/*
testpaths = jmbitcoin jmclient jmbase

168
test/commontest.py

@ -1,168 +0,0 @@
#! /usr/bin/env python
from __future__ import absolute_import
'''Some helper functions for testing'''
import sys
import os
import time
import binascii
import pexpect
import random
import subprocess
import platform
from decimal import Decimal
data_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, os.path.join(data_dir))
from joinmarket import jm_single, Wallet, get_log
from joinmarket.support import chunks
from joinmarket.wallet import estimate_tx_fee
import bitcoin as btc
log = get_log()
'''This code is intended to provide
subprocess startup cross-platform with
some useful options; it could do with
some simplification/improvement.'''
import platform
OS = platform.system()
PINL = '\r\n' if OS == 'Windows' else '\n'
class TestWallet(Wallet):
"""Implementation of wallet
that allows passing in a password
for removal of command line interrupt.
"""
def __init__(self,
seedarg,
max_mix_depth=2,
gaplimit=6,
extend_mixdepth=False,
storepassword=False,
pwd=None):
self.given_pwd = pwd
super(TestWallet, self).__init__(seedarg,
max_mix_depth,
gaplimit,
extend_mixdepth,
storepassword)
def read_wallet_file_data(self, filename):
return super(TestWallet, self).read_wallet_file_data(
filename, self.given_pwd)
def make_sign_and_push(ins_full,
wallet,
amount,
output_addr=None,
change_addr=None,
hashcode=btc.SIGHASH_ALL,
estimate_fee = False):
"""Utility function for easily building transactions
from wallets
"""
total = sum(x['value'] for x in ins_full.values())
ins = ins_full.keys()
#random output address and change addr
output_addr = wallet.get_new_addr(1, 1) if not output_addr else output_addr
change_addr = wallet.get_new_addr(1, 0) if not change_addr else change_addr
fee_est = estimate_tx_fee(len(ins), 2) if estimate_fee else 10000
outs = [{'value': amount,
'address': output_addr}, {'value': total - amount - fee_est,
'address': change_addr}]
tx = btc.mktx(ins, outs)
de_tx = btc.deserialize(tx)
for index, ins in enumerate(de_tx['ins']):
utxo = ins['outpoint']['hash'] + ':' + str(ins['outpoint']['index'])
addr = ins_full[utxo]['address']
priv = wallet.get_key_from_addr(addr)
if index % 2:
priv = binascii.unhexlify(priv)
tx = btc.sign(tx, index, priv, hashcode=hashcode)
#pushtx returns False on any error
print btc.deserialize(tx)
push_succeed = jm_single().bc_interface.pushtx(tx)
if push_succeed:
return btc.txhash(tx)
else:
return False
def local_command(command, bg=False, redirect=''):
if redirect == 'NULL':
if OS == 'Windows':
command.append(' > NUL 2>&1')
elif OS == 'Linux':
command.extend(['>', '/dev/null', '2>&1'])
else:
print "OS not recognised, quitting."
elif redirect:
command.extend(['>', redirect])
if bg:
#using subprocess.PIPE seems to cause problems
FNULL = open(os.devnull, 'w')
return subprocess.Popen(command,
stdout=FNULL,
stderr=subprocess.STDOUT,
close_fds=True)
else:
#in case of foreground execution, we can use the output; if not
#it doesn't matter
return subprocess.check_output(command)
def make_wallets(n,
wallet_structures=None,
mean_amt=1,
sdev_amt=0,
start_index=0,
fixed_seeds=None,
test_wallet=False,
passwords=None):
'''n: number of wallets to be created
wallet_structure: array of n arrays , each subarray
specifying the number of addresses to be populated with coins
at each depth (for now, this will only populate coins into 'receive' addresses)
mean_amt: the number of coins (in btc units) in each address as above
sdev_amt: if randomness in amouts is desired, specify here.
Returns: a dict of dicts of form {0:{'seed':seed,'wallet':Wallet object},1:..,}
Default Wallet constructor is joinmarket.Wallet, else use TestWallet,
which takes a password parameter as in the list passwords.
'''
if len(wallet_structures) != n:
raise Exception("Number of wallets doesn't match wallet structures")
if not fixed_seeds:
seeds = chunks(binascii.hexlify(os.urandom(15 * n)), 15 * 2)
else:
seeds = fixed_seeds
wallets = {}
for i in range(n):
if test_wallet:
w = TestWallet(seeds[i], max_mix_depth=5, pwd=passwords[i])
else:
w = Wallet(seeds[i], max_mix_depth=5)
wallets[i + start_index] = {'seed': seeds[i],
'wallet': w}
for j in range(5):
for k in range(wallet_structures[i][j]):
deviation = sdev_amt * random.random()
amt = mean_amt - sdev_amt / 2.0 + deviation
if amt < 0: amt = 0.001
amt = float(Decimal(amt).quantize(Decimal(10)**-8))
jm_single().bc_interface.grab_coins(
wallets[i + start_index]['wallet'].get_external_addr(j),
amt)
#reset the index so the coins can be seen if running in same script
wallets[i + start_index]['wallet'].index[j][0] -= wallet_structures[i][j]
return wallets
def interact(process, inputs, expected):
if len(inputs) != len(expected):
raise Exception("Invalid inputs to interact()")
for i, inp in enumerate(inputs):
process.expect(expected[i])
process.sendline(inp)

76
test/conftest.py

@ -1,76 +0,0 @@
import pytest
import os
import time
import subprocess
from commontest import local_command
from joinmarket import load_program_config
bitcoin_path = None
bitcoin_conf = None
bitcoin_rpcpassword = None
bitcoin_rpcusername = None
miniircd_procs = []
def pytest_addoption(parser):
parser.addoption("--btcroot", action="store", default='',
help="the fully qualified path to the directory containing "+\
"the bitcoin binaries, e.g. /home/user/bitcoin/bin/")
parser.addoption("--btcconf", action="store",
help="the fully qualified path to the location of the "+\
"bitcoin configuration file you use for testing, e.g. "+\
"/home/user/.bitcoin/bitcoin.conf")
parser.addoption("--btcpwd",
action="store",
help="the RPC password for your test bitcoin instance")
parser.addoption("--btcuser",
action="store",
default='bitcoinrpc',
help="the RPC username for your test bitcoin instance (default=bitcoinrpc)")
parser.addoption("--nirc",
type="int",
action="store",
default=1,
help="the number of local miniircd instances")
def teardown():
#didn't find a stop command in miniircd, so just kill
global miniircd_procs
for m in miniircd_procs:
m.kill()
#shut down bitcoin and remove the regtest dir
local_command([bitcoin_path + "bitcoin-cli", "-regtest", "-rpcuser=" + bitcoin_rpcusername,
"-rpcpassword=" + bitcoin_rpcpassword, "stop"])
#note, it is better to clean out ~/.bitcoin/regtest but too
#dangerous to automate it here perhaps
@pytest.fixture(scope="session", autouse=True)
def setup(request):
request.addfinalizer(teardown)
global bitcoin_conf, bitcoin_path, bitcoin_rpcpassword, bitcoin_rpcusername
bitcoin_path = request.config.getoption("--btcroot")
bitcoin_conf = request.config.getoption("--btcconf")
bitcoin_rpcpassword = request.config.getoption("--btcpwd")
bitcoin_rpcusername = request.config.getoption("--btcuser")
#start up miniircd
#minor bug in miniircd (seems); need *full* unqualified path for motd file
cwd = os.getcwd()
n_irc = request.config.getoption("--nirc")
global miniircd_procs
for i in range(n_irc):
miniircd_proc = local_command(
["./miniircd/miniircd", "--ports=" + str(6667+i),
"--motd=" + cwd + "/miniircd/testmotd"],
bg=True)
miniircd_procs.append(miniircd_proc)
#start up regtest blockchain
btc_proc = subprocess.call([bitcoin_path + "bitcoind", "-regtest",
"-daemon", "-conf=" + bitcoin_conf])
time.sleep(3)
#generate blocks
local_command([bitcoin_path + "bitcoin-cli", "-regtest", "-rpcuser=" + bitcoin_rpcusername,
"-rpcpassword=" + bitcoin_rpcpassword, "generate", "101"])
Loading…
Cancel
Save