Browse Source

a lot more coverage of jmclient; only client_protocol left

master
Adam Gibson 9 years ago
parent
commit
bb07a09e25
No known key found for this signature in database
GPG Key ID: B3AE09F1E9A3197A
  1. 2
      jmbase/jmbase/__init__.py
  2. 4
      jmbase/jmbase/support.py
  3. 4
      jmclient/jmclient/__init__.py
  4. 29
      jmclient/jmclient/blockchaininterface.py
  5. 6
      jmclient/jmclient/commitment_utils.py
  6. 6
      jmclient/jmclient/configure.py
  7. 13
      jmclient/jmclient/podle.py
  8. 8
      jmclient/jmclient/wallet.py
  9. 152
      jmclient/test/base58_keys_invalid.json
  10. 452
      jmclient/test/base58_keys_valid.json
  11. 4
      jmclient/test/commontest.py
  12. 42
      jmclient/test/test_addresses.py
  13. 2
      jmclient/test/test_blockr.py
  14. 61
      jmclient/test/test_configure.py
  15. 237
      jmclient/test/test_podle.py
  16. 55
      jmclient/test/test_schedule.py
  17. 120
      jmclient/test/test_tx_notify.py
  18. 524
      jmclient/test/test_wallets.py
  19. 20
      scripts/add-utxo.py
  20. 15
      scripts/sendpayment.py
  21. 16
      scripts/wallet-tool.py

2
jmbase/jmbase/__init__.py

@ -1,6 +1,6 @@
from __future__ import print_function
from .support import (get_log, chunks, debug_silence, debug_dump_object,
joinmarket_alert, core_alert)
joinmarket_alert, core_alert, get_password)
from commands import *

4
jmbase/jmbase/support.py

@ -5,6 +5,7 @@ import sys
import logging
import pprint
import random
from getpass import getpass
logFormatter = logging.Formatter(
"%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
@ -50,6 +51,9 @@ def get_log():
def chunks(d, n):
return [d[x:x + n] for x in xrange(0, len(d), n)]
def get_password(msg):
return getpass(msg)
def debug_dump_object(obj, skip_fields=None):
if skip_fields is None:
skip_fields = []

4
jmclient/jmclient/__init__.py

@ -16,8 +16,8 @@ from .jsonrpc import JsonRpcError, JsonRpcConnectionError, JsonRpc
from .old_mnemonic import mn_decode, mn_encode
from .slowaes import decryptData, encryptData
from .taker import Taker
from .wallet import AbstractWallet, BitcoinCoreInterface, Wallet, \
BitcoinCoreWallet, estimate_tx_fee
from .wallet import (AbstractWallet, BitcoinCoreInterface, Wallet,
BitcoinCoreWallet, estimate_tx_fee, WalletError)
from .configure import load_program_config, jm_single, get_p2pk_vbyte, \
get_network, jm_single, get_network, validate_address, get_irc_mchannels, \
check_utxo_blacklist

29
jmclient/jmclient/blockchaininterface.py

@ -74,12 +74,10 @@ class BlockchainInterface(object):
whether this is the timeout for unconfirmed or confirmed
timeout for uncontirmed = False
"""
pass
@abc.abstractmethod
def pushtx(self, txhex):
"""pushes tx to the network, returns False if failed"""
pass
@abc.abstractmethod
def query_utxo_set(self, txouts, includeconf=False):
@ -194,7 +192,7 @@ class ElectrumWalletInterface(BlockchainInterface): #pragma: no cover
return fee_per_kb_sat
class BlockrInterface(BlockchainInterface):
class BlockrInterface(BlockchainInterface): #pragma: no cover
BLOCKR_MAX_ADDR_REQ_COUNT = 20
def __init__(self, testnet=False):
@ -510,7 +508,9 @@ class NotifyRequestHeader(BaseHTTPServer.BaseHTTPRequestHandler):
except (JsonRpcError, JsonRpcConnectionError) as e:
log.debug('transaction not found, probably a conflict')
return
if not re.match('^[0-9a-fA-F]*$', tx):
#the following condition shouldn't be possible I believe;
#the rpc server wil return an error as above if the tx is not found.
if not re.match('^[0-9a-fA-F]*$', tx): #pragma: no cover
log.debug('not a txhex')
return
txd = btc.deserialize(tx)
@ -651,7 +651,8 @@ class BitcoinCoreInterface(BlockchainInterface):
for addr in addr_list:
self.rpc('importaddress', [addr, wallet_name, False])
if jm_single().config.get("BLOCKCHAIN",
"blockchain_source") != 'regtest':
"blockchain_source") != 'regtest': #pragma: no cover
#Exit conditions cannot be included in tests
print('restart Bitcoin Core with -rescan if you\'re '
'recovering an existing wallet from backup seed')
print(' otherwise just restart this joinmarket script')
@ -955,7 +956,7 @@ class BitcoinCoreInterface(BlockchainInterface):
# running on local daemon. Only
# to be instantiated after network is up
# with > 100 blocks.
class RegtestBitcoinCoreInterface(BitcoinCoreInterface):
class RegtestBitcoinCoreInterface(BitcoinCoreInterface): #pragma: no cover
def __init__(self, jsonRpc):
super(RegtestBitcoinCoreInterface, self).__init__(jsonRpc, 'regtest')
@ -1049,19 +1050,3 @@ class RegtestBitcoinCoreInterface(BitcoinCoreInterface):
'balance': int(round(Decimal(1e8) * Decimal(self.rpc(
'getreceivedbyaddress', [address]))))})
return {'data': res}
# todo: won't run anyways
# def main():
# #TODO some useful quick testing here, so people know if they've set it up right
# myBCI = RegtestBitcoinCoreInterface()
# #myBCI.send_tx('stuff')
# print myBCI.get_utxos_from_addr(["n4EjHhGVS4Rod8ociyviR3FH442XYMWweD"])
# print myBCI.get_balance_at_addr(["n4EjHhGVS4Rod8ociyviR3FH442XYMWweD"])
# txid = myBCI.grab_coins('mygp9fsgEJ5U7jkPpDjX9nxRj8b5nC3Hnd', 23)
# print txid
# print myBCI.get_balance_at_addr(['mygp9fsgEJ5U7jkPpDjX9nxRj8b5nC3Hnd'])
# print myBCI.get_utxos_from_addr(['mygp9fsgEJ5U7jkPpDjX9nxRj8b5nC3Hnd'])
#
#
# if __name__ == '__main__':
# main()

6
jmclient/jmclient/commitment_utils.py

@ -4,7 +4,7 @@ import sys, os
import jmclient.btc as btc
from jmclient import jm_single, get_p2pk_vbyte
def quit(parser, errmsg):
def quit(parser, errmsg): #pragma: no cover
parser.error(errmsg)
sys.exit(0)
@ -24,10 +24,12 @@ def get_utxo_info(upriv):
except:
#not sending data to stdout in case privkey info
print("Failed to parse utxo information for utxo")
raise
try:
hexpriv = btc.from_wif_privkey(priv, vbyte=get_p2pk_vbyte())
except:
print("failed to parse privkey, make sure it's WIF compressed format.")
raise
return u, priv
def validate_utxo_data(utxo_datas, retrieve=False):
@ -45,7 +47,7 @@ def validate_utxo_data(utxo_datas, retrieve=False):
print('claimed address: ' + addr)
res = jm_single().bc_interface.query_utxo_set([u])
print('blockchain shows this data: ' + str(res))
if len(res) != 1:
if len(res) != 1 or None in res:
print("utxo not found on blockchain: " + str(u))
return False
if res[0]['address'] != addr:

6
jmclient/jmclient/configure.py

@ -244,7 +244,8 @@ def validate_address(addr):
return True, 'address validated'
def donation_address(reusable_donation_pubkey=None):
def donation_address(reusable_donation_pubkey=None): #pragma: no cover
#Donation code currently disabled, so not tested.
if not reusable_donation_pubkey:
reusable_donation_pubkey = ('02be838257fbfddabaea03afbb9f16e852'
'9dfe2de921260a5c46036d97b5eacf2a')
@ -347,7 +348,8 @@ def get_blockchain_interface_instance(_config):
source = _config.get("BLOCKCHAIN", "blockchain_source")
network = get_network()
testnet = network == 'testnet'
if source == 'bitcoin-rpc':
if source == 'bitcoin-rpc': #pragma: no cover
#This cannot be tested without mainnet or testnet blockchain (not regtest)
rpc_host = _config.get("BLOCKCHAIN", "rpc_host")
rpc_port = _config.get("BLOCKCHAIN", "rpc_port")
rpc_user = _config.get("BLOCKCHAIN", "rpc_user")

13
jmclient/jmclient/podle.py

@ -107,6 +107,8 @@ class PoDLE(object):
else:
self.P2 = None
#These sig values should be passed in hex.
self.s = None
self.e = None
if s:
self.s = binascii.unhexlify(s)
if e:
@ -116,12 +118,6 @@ class PoDLE(object):
#the H(P2) value
self.commitment = None
def mark_used(self):
self.used = True
def mark_unused(self):
self.used = False
def get_commitment(self):
"""Set the commitment to sha256(serialization of public key P2)
Return in hex to calling function
@ -222,7 +218,7 @@ class PoDLE(object):
one NUMS point as defined by the range in index_range
"""
if not all([self.P, self.P2, self.s, self.e]):
raise PoDLE("Verify called without sufficient data")
raise PoDLEError("Verify called without sufficient data")
if not self.get_commitment() == commitment:
return False
for J in [getNUMS(i) for i in index_range]:
@ -354,7 +350,8 @@ def update_commitments(commitment=None,
with open(PODLE_COMMIT_FILE, "rb") as f:
try:
c = json.loads(f.read())
except ValueError:
except ValueError: #pragma: no cover
#Exit conditions cannot be included in tests.
print("the file: " + PODLE_COMMIT_FILE + " is not valid json.")
sys.exit(0)

8
jmclient/jmclient/wallet.py

@ -17,6 +17,9 @@ from jmclient.support import select_gradual, select_greedy,select_greediest, sel
log = get_log()
class WalletError(Exception):
pass
def estimate_tx_fee(ins, outs, txtype='p2pkh'):
'''Returns an estimate of the number of satoshis required
for a transaction with the given number of inputs and outputs,
@ -112,6 +115,7 @@ class AbstractWallet(object):
class Wallet(AbstractWallet):
def __init__(self,
seedarg,
pwd,
max_mix_depth=2,
gaplimit=6,
extend_mixdepth=False,
@ -125,9 +129,9 @@ class Wallet(AbstractWallet):
self.unspent = {}
self.spent_utxos = []
self.imported_privkeys = {}
self.seed = self.read_wallet_file_data(seedarg)
self.seed = self.read_wallet_file_data(seedarg, pwd)
if not self.seed:
raise ValueError("Failed to decrypt wallet")
raise WalletError("Failed to decrypt wallet")
if extend_mixdepth and len(self.index_cache) > max_mix_depth:
self.max_mix_depth = len(self.index_cache)
self.gaplimit = gaplimit

152
jmclient/test/base58_keys_invalid.json

@ -0,0 +1,152 @@
[
[
""
],
[
"x"
],
[
"37qgekLpCCHrQuSjvX3fs496FWTGsHFHizjJAs6NPcR47aefnnCWECAhHV6E3g4YN7u7Yuwod5Y"
],
[
"dzb7VV1Ui55BARxv7ATxAtCUeJsANKovDGWFVgpTbhq9gvPqP3yv"
],
[
"MuNu7ZAEDFiHthiunm7dPjwKqrVNCM3mAz6rP9zFveQu14YA8CxExSJTHcVP9DErn6u84E6Ej7S"
],
[
"rPpQpYknyNQ5AEHuY6H8ijJJrYc2nDKKk9jjmKEXsWzyAQcFGpDLU2Zvsmoi8JLR7hAwoy3RQWf"
],
[
"4Uc3FmN6NQ6zLBK5QQBXRBUREaaHwCZYsGCueHauuDmJpZKn6jkEskMB2Zi2CNgtb5r6epWEFfUJq"
],
[
"7aQgR5DFQ25vyXmqZAWmnVCjL3PkBcdVkBUpjrjMTcghHx3E8wb"
],
[
"17QpPprjeg69fW1DV8DcYYCKvWjYhXvWkov6MJ1iTTvMFj6weAqW7wybZeH57WTNxXVCRH4veVs"
],
[
"KxuACDviz8Xvpn1xAh9MfopySZNuyajYMZWz16Dv2mHHryznWUp3"
],
[
"7nK3GSmqdXJQtdohvGfJ7KsSmn3TmGqExug49583bDAL91pVSGq5xS9SHoAYL3Wv3ijKTit65th"
],
[
"cTivdBmq7bay3RFGEBBuNfMh2P1pDCgRYN2Wbxmgwr4ki3jNUL2va"
],
[
"gjMV4vjNjyMrna4fsAr8bWxAbwtmMUBXJS3zL4NJt5qjozpbQLmAfK1uA3CquSqsZQMpoD1g2nk"
],
[
"emXm1naBMoVzPjbk7xpeTVMFy4oDEe25UmoyGgKEB1gGWsK8kRGs"
],
[
"7VThQnNRj1o3Zyvc7XHPRrjDf8j2oivPTeDXnRPYWeYGE4pXeRJDZgf28ppti5hsHWXS2GSobdqyo"
],
[
"1G9u6oCVCPh2o8m3t55ACiYvG1y5BHewUkDSdiQarDcYXXhFHYdzMdYfUAhfxn5vNZBwpgUNpso"
],
[
"31QQ7ZMLkScDiB4VyZjuptr7AEc9j1SjstF7pRoLhHTGkW4Q2y9XELobQmhhWxeRvqcukGd1XCq"
],
[
"DHqKSnpxa8ZdQyH8keAhvLTrfkyBMQxqngcQA5N8LQ9KVt25kmGN"
],
[
"2LUHcJPbwLCy9GLH1qXmfmAwvadWw4bp4PCpDfduLqV17s6iDcy1imUwhQJhAoNoN1XNmweiJP4i"
],
[
"7USRzBXAnmck8fX9HmW7RAb4qt92VFX6soCnts9s74wxm4gguVhtG5of8fZGbNPJA83irHVY6bCos"
],
[
"1DGezo7BfVebZxAbNT3XGujdeHyNNBF3vnficYoTSp4PfK2QaML9bHzAMxke3wdKdHYWmsMTJVu"
],
[
"2D12DqDZKwCxxkzs1ZATJWvgJGhQ4cFi3WrizQ5zLAyhN5HxuAJ1yMYaJp8GuYsTLLxTAz6otCfb"
],
[
"8AFJzuTujXjw1Z6M3fWhQ1ujDW7zsV4ePeVjVo7D1egERqSW9nZ"
],
[
"163Q17qLbTCue8YY3AvjpUhotuaodLm2uqMhpYirsKjVqnxJRWTEoywMVY3NbBAHuhAJ2cF9GAZ"
],
[
"2MnmgiRH4eGLyLc9eAqStzk7dFgBjFtUCtu"
],
[
"461QQ2sYWxU7H2PV4oBwJGNch8XVTYYbZxU"
],
[
"2UCtv53VttmQYkVU4VMtXB31REvQg4ABzs41AEKZ8UcB7DAfVzdkV9JDErwGwyj5AUHLkmgZeobs"
],
[
"cSNjAsnhgtiFMi6MtfvgscMB2Cbhn2v1FUYfviJ1CdjfidvmeW6mn"
],
[
"gmsow2Y6EWAFDFE1CE4Hd3Tpu2BvfmBfG1SXsuRARbnt1WjkZnFh1qGTiptWWbjsq2Q6qvpgJVj"
],
[
"nksUKSkzS76v8EsSgozXGMoQFiCoCHzCVajFKAXqzK5on9ZJYVHMD5CKwgmX3S3c7M1U3xabUny"
],
[
"L3favK1UzFGgdzYBF2oBT5tbayCo4vtVBLJhg2iYuMeePxWG8SQc"
],
[
"7VxLxGGtYT6N99GdEfi6xz56xdQ8nP2dG1CavuXx7Rf2PrvNMTBNevjkfgs9JmkcGm6EXpj8ipyPZ"
],
[
"2mbZwFXF6cxShaCo2czTRB62WTx9LxhTtpP"
],
[
"dB7cwYdcPSgiyAwKWL3JwCVwSk6epU2txw"
],
[
"HPhFUhUAh8ZQQisH8QQWafAxtQYju3SFTX"
],
[
"4ctAH6AkHzq5ioiM1m9T3E2hiYEev5mTsB"
],
[
"Hn1uFi4dNexWrqARpjMqgT6cX1UsNPuV3cHdGg9ExyXw8HTKadbktRDtdeVmY3M1BxJStiL4vjJ"
],
[
"Sq3fDbvutABmnAHHExJDgPLQn44KnNC7UsXuT7KZecpaYDMU9Txs"
],
[
"6TqWyrqdgUEYDQU1aChMuFMMEimHX44qHFzCUgGfqxGgZNMUVWJ"
],
[
"giqJo7oWqFxNKWyrgcBxAVHXnjJ1t6cGoEffce5Y1y7u649Noj5wJ4mmiUAKEVVrYAGg2KPB3Y4"
],
[
"cNzHY5e8vcmM3QVJUcjCyiKMYfeYvyueq5qCMV3kqcySoLyGLYUK"
],
[
"37uTe568EYc9WLoHEd9jXEvUiWbq5LFLscNyqvAzLU5vBArUJA6eydkLmnMwJDjkL5kXc2VK7ig"
],
[
"EsYbG4tWWWY45G31nox838qNdzksbPySWc"
],
[
"nbuzhfwMoNzA3PaFnyLcRxE9bTJPDkjZ6Rf6Y6o2ckXZfzZzXBT"
],
[
"cQN9PoxZeCWK1x56xnz6QYAsvR11XAce3Ehp3gMUdfSQ53Y2mPzx"
],
[
"1Gm3N3rkef6iMbx4voBzaxtXcmmiMTqZPhcuAepRzYUJQW4qRpEnHvMojzof42hjFRf8PE2jPde"
],
[
"2TAq2tuN6x6m233bpT7yqdYQPELdTDJn1eU"
],
[
"ntEtnnGhqPii4joABvBtSEJG6BxjT2tUZqE8PcVYgk3RHpgxgHDCQxNbLJf7ardf1dDk2oCQ7Cf"
],
[
"Ky1YjoZNgQ196HJV3HpdkecfhRBmRZdMJk89Hi5KGfpfPwS2bUbfd"
],
[
"2A1q1YsMZowabbvta7kTy2Fd6qN4r5ZCeG3qLpvZBMzCixMUdkN2Y4dHB1wPsZAeVXUGD83MfRED"
]
]

452
jmclient/test/base58_keys_valid.json

@ -0,0 +1,452 @@
[
[
"1AGNa15ZQXAZUgFiqJ2i7Z2DPU2J6hW62i",
"65a16059864a2fdbc7c99a4723a8395bc6f188eb",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": false
}
],
[
"3CMNFxN1oHBc4R1EpboAL5yzHGgE611Xou",
"74f209f6ea907e2ea48f74fae05782ae8a665257",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": false
}
],
[
"mo9ncXisMeAoXwqcV5EWuyncbmCcQN4rVs",
"53c0307d6851aa0ce7825ba883c6bd9ad242b486",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": true
}
],
[
"2N2JD6wb56AfK4tfmM6PwdVmoYk2dCKf4Br",
"6349a418fc4578d10a372b54b45c280cc8c4382f",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": true
}
],
[
"5Kd3NBUAdUnhyzenEwVLy9pBKxSwXvE9FMPyR4UKZvpe6E3AgLr",
"eddbdc1168f1daeadbd3e44c1e3f8f5a284c2029f78ad26af98583a499de5b19",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": false
}
],
[
"Kz6UJmQACJmLtaQj5A3JAge4kVTNQ8gbvXuwbmCj7bsaabudb3RD",
"55c9bccb9ed68446d1b75273bbce89d7fe013a8acd1625514420fb2aca1a21c4",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": false
}
],
[
"9213qJab2HNEpMpYNBa7wHGFKKbkDn24jpANDs2huN3yi4J11ko",
"36cb93b9ab1bdabf7fb9f2c04f1b9cc879933530ae7842398eef5a63a56800c2",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": true
}
],
[
"cTpB4YiyKiBcPxnefsDpbnDxFDffjqJob8wGCEDXxgQ7zQoMXJdH",
"b9f4892c9e8282028fea1d2667c4dc5213564d41fc5783896a0d843fc15089f3",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": true
}
],
[
"1Ax4gZtb7gAit2TivwejZHYtNNLT18PUXJ",
"6d23156cbbdcc82a5a47eee4c2c7c583c18b6bf4",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": false
}
],
[
"3QjYXhTkvuj8qPaXHTTWb5wjXhdsLAAWVy",
"fcc5460dd6e2487c7d75b1963625da0e8f4c5975",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": false
}
],
[
"n3ZddxzLvAY9o7184TB4c6FJasAybsw4HZ",
"f1d470f9b02370fdec2e6b708b08ac431bf7a5f7",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": true
}
],
[
"2NBFNJTktNa7GZusGbDbGKRZTxdK9VVez3n",
"c579342c2c4c9220205e2cdc285617040c924a0a",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": true
}
],
[
"5K494XZwps2bGyeL71pWid4noiSNA2cfCibrvRWqcHSptoFn7rc",
"a326b95ebae30164217d7a7f57d72ab2b54e3be64928a19da0210b9568d4015e",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": false
}
],
[
"L1RrrnXkcKut5DEMwtDthjwRcTTwED36thyL1DebVrKuwvohjMNi",
"7d998b45c219a1e38e99e7cbd312ef67f77a455a9b50c730c27f02c6f730dfb4",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": false
}
],
[
"93DVKyFYwSN6wEo3E2fCrFPUp17FtrtNi2Lf7n4G3garFb16CRj",
"d6bca256b5abc5602ec2e1c121a08b0da2556587430bcf7e1898af2224885203",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": true
}
],
[
"cTDVKtMGVYWTHCb1AFjmVbEbWjvKpKqKgMaR3QJxToMSQAhmCeTN",
"a81ca4e8f90181ec4b61b6a7eb998af17b2cb04de8a03b504b9e34c4c61db7d9",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": true
}
],
[
"1C5bSj1iEGUgSTbziymG7Cn18ENQuT36vv",
"7987ccaa53d02c8873487ef919677cd3db7a6912",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": false
}
],
[
"3AnNxabYGoTxYiTEZwFEnerUoeFXK2Zoks",
"63bcc565f9e68ee0189dd5cc67f1b0e5f02f45cb",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": false
}
],
[
"n3LnJXCqbPjghuVs8ph9CYsAe4Sh4j97wk",
"ef66444b5b17f14e8fae6e7e19b045a78c54fd79",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": true
}
],
[
"2NB72XtkjpnATMggui83aEtPawyyKvnbX2o",
"c3e55fceceaa4391ed2a9677f4a4d34eacd021a0",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": true
}
],
[
"5KaBW9vNtWNhc3ZEDyNCiXLPdVPHCikRxSBWwV9NrpLLa4LsXi9",
"e75d936d56377f432f404aabb406601f892fd49da90eb6ac558a733c93b47252",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": false
}
],
[
"L1axzbSyynNYA8mCAhzxkipKkfHtAXYF4YQnhSKcLV8YXA874fgT",
"8248bd0375f2f75d7e274ae544fb920f51784480866b102384190b1addfbaa5c",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": false
}
],
[
"927CnUkUbasYtDwYwVn2j8GdTuACNnKkjZ1rpZd2yBB1CLcnXpo",
"44c4f6a096eac5238291a94cc24c01e3b19b8d8cef72874a079e00a242237a52",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": true
}
],
[
"cUcfCMRjiQf85YMzzQEk9d1s5A4K7xL5SmBCLrezqXFuTVefyhY7",
"d1de707020a9059d6d3abaf85e17967c6555151143db13dbb06db78df0f15c69",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": true
}
],
[
"1Gqk4Tv79P91Cc1STQtU3s1W6277M2CVWu",
"adc1cc2081a27206fae25792f28bbc55b831549d",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": false
}
],
[
"33vt8ViH5jsr115AGkW6cEmEz9MpvJSwDk",
"188f91a931947eddd7432d6e614387e32b244709",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": false
}
],
[
"mhaMcBxNh5cqXm4aTQ6EcVbKtfL6LGyK2H",
"1694f5bc1a7295b600f40018a618a6ea48eeb498",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": true
}
],
[
"2MxgPqX1iThW3oZVk9KoFcE5M4JpiETssVN",
"3b9b3fd7a50d4f08d1a5b0f62f644fa7115ae2f3",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": true
}
],
[
"5HtH6GdcwCJA4ggWEL1B3jzBBUB8HPiBi9SBc5h9i4Wk4PSeApR",
"091035445ef105fa1bb125eccfb1882f3fe69592265956ade751fd095033d8d0",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": false
}
],
[
"L2xSYmMeVo3Zek3ZTsv9xUrXVAmrWxJ8Ua4cw8pkfbQhcEFhkXT8",
"ab2b4bcdfc91d34dee0ae2a8c6b6668dadaeb3a88b9859743156f462325187af",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": false
}
],
[
"92xFEve1Z9N8Z641KQQS7ByCSb8kGjsDzw6fAmjHN1LZGKQXyMq",
"b4204389cef18bbe2b353623cbf93e8678fbc92a475b664ae98ed594e6cf0856",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": true
}
],
[
"cVM65tdYu1YK37tNoAyGoJTR13VBYFva1vg9FLuPAsJijGvG6NEA",
"e7b230133f1b5489843260236b06edca25f66adb1be455fbd38d4010d48faeef",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": true
}
],
[
"1JwMWBVLtiqtscbaRHai4pqHokhFCbtoB4",
"c4c1b72491ede1eedaca00618407ee0b772cad0d",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": false
}
],
[
"3QCzvfL4ZRvmJFiWWBVwxfdaNBT8EtxB5y",
"f6fe69bcb548a829cce4c57bf6fff8af3a5981f9",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": false
}
],
[
"mizXiucXRCsEriQCHUkCqef9ph9qtPbZZ6",
"261f83568a098a8638844bd7aeca039d5f2352c0",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": true
}
],
[
"2NEWDzHWwY5ZZp8CQWbB7ouNMLqCia6YRda",
"e930e1834a4d234702773951d627cce82fbb5d2e",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": true
}
],
[
"5KQmDryMNDcisTzRp3zEq9e4awRmJrEVU1j5vFRTKpRNYPqYrMg",
"d1fab7ab7385ad26872237f1eb9789aa25cc986bacc695e07ac571d6cdac8bc0",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": false
}
],
[
"L39Fy7AC2Hhj95gh3Yb2AU5YHh1mQSAHgpNixvm27poizcJyLtUi",
"b0bbede33ef254e8376aceb1510253fc3550efd0fcf84dcd0c9998b288f166b3",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": false
}
],
[
"91cTVUcgydqyZLgaANpf1fvL55FH53QMm4BsnCADVNYuWuqdVys",
"037f4192c630f399d9271e26c575269b1d15be553ea1a7217f0cb8513cef41cb",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": true
}
],
[
"cQspfSzsgLeiJGB2u8vrAiWpCU4MxUT6JseWo2SjXy4Qbzn2fwDw",
"6251e205e8ad508bab5596bee086ef16cd4b239e0cc0c5d7c4e6035441e7d5de",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": true
}
],
[
"19dcawoKcZdQz365WpXWMhX6QCUpR9SY4r",
"5eadaf9bb7121f0f192561a5a62f5e5f54210292",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": false
}
],
[
"37Sp6Rv3y4kVd1nQ1JV5pfqXccHNyZm1x3",
"3f210e7277c899c3a155cc1c90f4106cbddeec6e",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": false
}
],
[
"myoqcgYiehufrsnnkqdqbp69dddVDMopJu",
"c8a3c2a09a298592c3e180f02487cd91ba3400b5",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": true
}
],
[
"2N7FuwuUuoTBrDFdrAZ9KxBmtqMLxce9i1C",
"99b31df7c9068d1481b596578ddbb4d3bd90baeb",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": true
}
],
[
"5KL6zEaMtPRXZKo1bbMq7JDjjo1bJuQcsgL33je3oY8uSJCR5b4",
"c7666842503db6dc6ea061f092cfb9c388448629a6fe868d068c42a488b478ae",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": false
}
],
[
"KwV9KAfwbwt51veZWNscRTeZs9CKpojyu1MsPnaKTF5kz69H1UN2",
"07f0803fc5399e773555ab1e8939907e9badacc17ca129e67a2f5f2ff84351dd",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": false
}
],
[
"93N87D6uxSBzwXvpokpzg8FFmfQPmvX4xHoWQe3pLdYpbiwT5YV",
"ea577acfb5d1d14d3b7b195c321566f12f87d2b77ea3a53f68df7ebf8604a801",
{
"isCompressed": false,
"isPrivkey": true,
"isTestnet": true
}
],
[
"cMxXusSihaX58wpJ3tNuuUcZEQGt6DKJ1wEpxys88FFaQCYjku9h",
"0b3b34f0958d8a268193a9814da92c3e8b58b4a4378a542863e34ac289cd830c",
{
"isCompressed": true,
"isPrivkey": true,
"isTestnet": true
}
],
[
"13p1ijLwsnrcuyqcTvJXkq2ASdXqcnEBLE",
"1ed467017f043e91ed4c44b4e8dd674db211c4e6",
{
"addrType": "pubkey",
"isPrivkey": false,
"isTestnet": false
}
],
[
"3ALJH9Y951VCGcVZYAdpA3KchoP9McEj1G",
"5ece0cadddc415b1980f001785947120acdb36fc",
{
"addrType": "script",
"isPrivkey": false,
"isTestnet": false
}
]
]

4
jmclient/test/commontest.py

@ -211,9 +211,9 @@ def make_wallets(n,
wallets = {}
for i in range(n):
if test_wallet:
w = TestWallet(seeds[i], max_mix_depth=5, pwd=passwords[i])
w = Wallet(seeds[i], passwords[i], max_mix_depth=5)
else:
w = Wallet(seeds[i], max_mix_depth=5)
w = Wallet(seeds[i], None, max_mix_depth=5)
wallets[i + start_index] = {'seed': seeds[i],
'wallet': w}
for j in range(5):

42
jmclient/test/test_addresses.py

@ -0,0 +1,42 @@
from jmclient.configure import validate_address, load_program_config
from jmclient import jm_single
import json
import pytest
def test_non_addresses(setup_addresses):
#could flesh this out with other examples
res, msg = validate_address(2)
assert res == False, "Incorrectly accepted number"
def test_b58_invalid_addresses(setup_addresses):
#none of these are valid as any kind of key or address
with open("base58_keys_invalid.json", "r") as f:
json_data = f.read()
invalid_key_list = json.loads(json_data)
for k in invalid_key_list:
bad_key = k[0]
res, message = validate_address(bad_key)
assert res == False, "Incorrectly validated address: " + bad_key + " with message: " + message
def test_b58_valid_addresses():
with open("base58_keys_valid.json", "r") as f:
json_data = f.read()
valid_keys_list = json.loads(json_data)
for a in valid_keys_list:
addr, pubkey, prop_dict = a
if not prop_dict["isPrivkey"]:
if prop_dict["isTestnet"]:
jm_single().config.set("BLOCKCHAIN", "network", "testnet")
else:
jm_single().config.set("BLOCKCHAIN", "network", "mainnet")
#if using py.test -s ; sanity check to see what's actually being tested
print 'testing this address: ' + addr
res, message = validate_address(addr)
assert res == True, "Incorrectly failed to validate address: " + addr + " with message: " + message
@pytest.fixture(scope="module")
def setup_addresses():
load_program_config()

2
jmclient/test/test_blockr.py

@ -59,7 +59,7 @@ def test_blockr_estimate_fee(setup_blockr):
])
def test_blockr_sync(setup_blockr, net, seed, gaplimit, showprivkey, method):
jm_single().config.set("BLOCKCHAIN", "network", net)
wallet = Wallet(seed, max_mix_depth = 5)
wallet = Wallet(seed, None, max_mix_depth = 5)
sync_wallet(wallet)
#copy pasted from wallet-tool; some boiled down form of

61
jmclient/test/test_configure.py

@ -0,0 +1,61 @@
#! /usr/bin/env python
from __future__ import absolute_import
'''test schedule module.'''
import pytest
from jmclient import (load_program_config, jm_single, get_irc_mchannels,
BTC_P2PK_VBYTE, BTC_P2SH_VBYTE, check_utxo_blacklist,
validate_address)
from jmclient.configure import (get_config_irc_channel, get_p2sh_vbyte,
get_p2pk_vbyte, get_blockchain_interface_instance)
import jmbitcoin as bitcoin
import copy
import os
def test_config_get_irc_channel():
load_program_config()
channel = "dummy"
assert get_config_irc_channel(channel) == "#dummy-test"
jm_single().config.set("BLOCKCHAIN", "network", "mainnet")
assert get_config_irc_channel(channel) == "#dummy"
get_irc_mchannels()
load_program_config()
def test_net_byte():
load_program_config()
assert get_p2pk_vbyte() == 0x6f
assert get_p2sh_vbyte() == 196
def test_check_blacklist():
load_program_config()
jm_single().nickname = "fortestnick"
fn = "blacklist" + "_" + jm_single().nickname
if os.path.exists(fn):
os.remove(fn)
assert check_utxo_blacklist("aa"*32, False)
with open(fn, "wb") as f:
f.write("aa"*32 + "\n")
assert not check_utxo_blacklist("aa"*32, False)
assert check_utxo_blacklist("bb"*32, False)
assert check_utxo_blacklist("bb"*32, True)
assert not check_utxo_blacklist("bb"*32, False)
assert not check_utxo_blacklist("bb"*32, True)
def test_blockchain_sources():
load_program_config()
for src in ["blockr", "electrum", "dummy"]:
jm_single().config.set("BLOCKCHAIN", "blockchain_source", src)
if src=="electrum":
jm_single().config.set("BLOCKCHAIN", "network", "mainnet")
if src == "dummy":
with pytest.raises(ValueError) as e_info:
get_blockchain_interface_instance(jm_single().config)
else:
get_blockchain_interface_instance(jm_single().config)
load_program_config()

237
jmclient/test/test_podle.py

@ -0,0 +1,237 @@
#! /usr/bin/env python
from __future__ import print_function
'''Tests of Proof of discrete log equivalence commitments.'''
import os
import jmbitcoin as bitcoin
import binascii
import json
import pytest
import copy
import subprocess
import signal
from commontest import local_command, make_wallets
import time
from pprint import pformat
from jmclient import (load_program_config, get_log, jm_single, generate_podle,
generate_podle_error_string, set_commitment_file,
get_commitment_file, PoDLE, get_podle_commitments,
add_external_commitments, update_commitments)
from jmclient.podle import verify_all_NUMS, verify_podle, PoDLEError
log = get_log()
def test_commitments_empty(setup_podle):
"""Ensure that empty commitments file
results in {}
"""
assert get_podle_commitments() == ([], {})
def test_commitment_retries(setup_podle):
"""Assumes no external commitments available.
Generate pretend priv/utxo pairs and check that they can be used
taker_utxo_retries times.
"""
allowed = jm_single().config.getint("POLICY", "taker_utxo_retries")
#make some pretend commitments
dummy_priv_utxo_pairs = [(bitcoin.sha256(os.urandom(10)),
bitcoin.sha256(os.urandom(10))+":0") for _ in range(10)]
#test a single commitment request of all 10
for x in dummy_priv_utxo_pairs:
p = generate_podle([x], allowed)
assert p
#At this point slot 0 has been taken by all 10.
for i in range(allowed-1):
p = generate_podle(dummy_priv_utxo_pairs[:1], allowed)
assert p
p = generate_podle(dummy_priv_utxo_pairs[:1], allowed)
assert p is None
def generate_single_podle_sig(priv, i):
"""Make a podle entry for key priv at index i, using a dummy utxo value.
This calls the underlying 'raw' code based on the class PoDLE, not the
library 'generate_podle' which intelligently searches and updates commitments.
"""
dummy_utxo = bitcoin.sha256(priv) + ":3"
podle = PoDLE(dummy_utxo, binascii.hexlify(priv))
r = podle.generate_podle(i)
return (r['P'], r['P2'], r['sig'],
r['e'], r['commit'])
def test_rand_commitments(setup_podle):
for i in range(20):
priv = os.urandom(32)
Pser, P2ser, s, e, commitment = generate_single_podle_sig(priv, 1 + i%5)
assert verify_podle(Pser, P2ser, s, e, commitment)
#tweak commitments to verify failure
tweaked = [x[::-1] for x in [Pser, P2ser, s, e, commitment]]
for i in range(5):
#Check failure on garbling of each parameter
y = [Pser, P2ser, s, e, commitment]
y[i] = tweaked[i]
fail = False
try:
fail = verify_podle(*y)
except:
pass
finally:
assert not fail
def test_nums_verify(setup_podle):
"""Check that the NUMS precomputed values are
valid according to the code; assertion check
implicit.
"""
verify_all_NUMS(True)
def test_external_commitments(setup_podle):
"""Add this generated commitment to the external list
{txid:N:{'P':pubkey, 'reveal':{1:{'P2':P2,'s':s,'e':e}, 2:{..},..}}}
Note we do this *after* the sendpayment test so that the external
commitments will not erroneously used (they are fake).
"""
#ensure the file exists even if empty
update_commitments()
ecs = {}
tries = jm_single().config.getint("POLICY","taker_utxo_retries")
for i in range(10):
priv = os.urandom(32)
dummy_utxo = bitcoin.sha256(priv)+":2"
ecs[dummy_utxo] = {}
ecs[dummy_utxo]['reveal']={}
for j in range(tries):
P, P2, s, e, commit = generate_single_podle_sig(priv, j)
if 'P' not in ecs[dummy_utxo]:
ecs[dummy_utxo]['P']=P
ecs[dummy_utxo]['reveal'][j] = {'P2':P2, 's':s, 'e':e}
add_external_commitments(ecs)
used, external = get_podle_commitments()
for u in external:
assert external[u]['P'] == ecs[u]['P']
for i in range(tries):
for x in ['P2', 's', 'e']:
assert external[u]['reveal'][str(i)][x] == ecs[u]['reveal'][i][x]
#add a dummy used commitment, then try again
update_commitments(commitment="ab"*32)
ecs = {}
known_commits = []
known_utxos = []
tries = 3
for i in range(1, 6):
u = binascii.hexlify(chr(i)*32)
known_utxos.append(u)
priv = chr(i)*32+"\x01"
ecs[u] = {}
ecs[u]['reveal']={}
for j in range(tries):
P, P2, s, e, commit = generate_single_podle_sig(priv, j)
known_commits.append(commit)
if 'P' not in ecs[u]:
ecs[u]['P'] = P
ecs[u]['reveal'][j] = {'P2':P2, 's':s, 'e':e}
add_external_commitments(ecs)
#simulate most of those external being already used
for c in known_commits[:-1]:
update_commitments(commitment=c)
#this should find the remaining one utxo and return from it
assert generate_podle([], tries=tries, allow_external=known_utxos)
#test commitment removal
to_remove = ecs[binascii.hexlify(chr(3)*32)]
update_commitments(external_to_remove={binascii.hexlify(chr(3)*32):to_remove})
#test that an incorrectly formatted file raises
with open(get_commitment_file(), "rb") as f:
validjson = json.loads(f.read())
corruptjson = copy.deepcopy(validjson)
del corruptjson['used']
with open(get_commitment_file(), "wb") as f:
f.write(json.dumps(corruptjson, indent=4))
with pytest.raises(PoDLEError) as e_info:
get_podle_commitments()
#clean up
with open(get_commitment_file(), "wb") as f:
f.write(json.dumps(validjson, indent=4))
def test_podle_constructor(setup_podle):
"""Tests rules about construction of PoDLE object
are conformed to.
"""
priv = "aa"*32
#pub and priv together not allowed
with pytest.raises(PoDLEError) as e_info:
p = PoDLE(priv=priv, P="dummypub")
#no pub or priv is allowed, i forget if this is useful for something
p = PoDLE()
#create from priv
p = PoDLE(priv=priv+"01", u="dummyutxo")
pdict = p.generate_podle(2)
assert all([k in pdict for k in ['used', 'utxo', 'P', 'P2', 'commit', 'sig', 'e']])
#using the valid data, serialize/deserialize test
deser = p.deserialize_revelation(p.serialize_revelation())
assert all([deser[x] == pdict[x] for x in ['utxo', 'P', 'P2', 'sig', 'e']])
#deserialization must fail for wrong number of items
with pytest.raises(PoDLEError) as e_info:
p.deserialize_revelation(':'.join([str(x) for x in range(4)]), separator=':')
#reveal() must work without pre-generated commitment
p.commitment = None
pdict2 = p.reveal()
assert pdict2 == pdict
#corrupt P2, cannot commit:
p.P2 = "blah"
with pytest.raises(PoDLEError) as e_info:
p.get_commitment()
#generation fails without a utxo
p = PoDLE(priv=priv)
with pytest.raises(PoDLEError) as e_info:
p.generate_podle(0)
#Test construction from pubkey
pub = bitcoin.privkey_to_pubkey(priv+"01")
p = PoDLE(P=pub)
with pytest.raises(PoDLEError) as e_info:
p.get_commitment()
with pytest.raises(PoDLEError) as e_info:
p.verify("dummycommitment", range(3))
def test_podle_error_string(setup_podle):
priv_utxo_pairs = [('fakepriv1', 'fakeutxo1'),
('fakepriv2', 'fakeutxo2')]
to = ['tooold1', 'tooold2']
ts = ['toosmall1', 'toosmall2']
unspent = "dummyunspent"
cjamt = 100
tua = "3"
tuamtper = "20"
errmgsheader, errmsg = generate_podle_error_string(priv_utxo_pairs,
to,
ts,
unspent,
cjamt,
tua,
tuamtper)
assert errmgsheader == ("Failed to source a commitment; this debugging information"
" may help:\n\n")
y = [x[1] for x in priv_utxo_pairs]
assert all([errmsg.find(x) != -1 for x in to + ts + y])
#ensure OK with nothing
errmgsheader, errmsg = generate_podle_error_string([], [], [], unspent,
cjamt, tua, tuamtper)
@pytest.fixture(scope="module")
def setup_podle(request):
load_program_config()
if not os.path.exists("cmtdata"):
os.mkdir("cmtdata")
prev_commits = False
#back up any existing commitments
pcf = get_commitment_file()
log.debug("Podle file: " + pcf)
if os.path.exists(pcf):
os.rename(pcf, pcf + ".bak")
prev_commits = True
def teardown():
if prev_commits:
os.rename(pcf + ".bak", pcf)
else:
if os.path.exists(pcf):
os.remove(pcf)
request.addfinalizer(teardown)

55
jmclient/test/test_schedule.py

@ -0,0 +1,55 @@
#! /usr/bin/env python
from __future__ import absolute_import
'''test schedule module.'''
import pytest
from jmclient import (get_schedule, load_program_config)
import os
valids = """#sample for testing
1, 110000000, 3, INTERNAL
0, 20000000, 2, mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw
"""
invalids1 = """#sample for testing
1, 110000000, 3, 5, INTERNAL
#pointless comment here; following line has trailing spaces
0, 20000000, 2, mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw
"""
invalids2 = """#sample for testing
1, 110000000, notinteger, INTERNAL
0, 20000000, 2, mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw
"""
invalids3 = """#sample for testing
1, 110000000, 3, INTERNAL
0, notinteger, 2, mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw
"""
#invalid address
invalids4 = """#sample for testing
1, 110000000, 3, INTERNAL
0, 20000000, 2, mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qq
"""
def test_get_schedule():
load_program_config()
tsf = "schedulefortesting"
for s in [valids, invalids1, invalids2, invalids3, invalids4]:
if os.path.exists(tsf):
os.remove(tsf)
with open(tsf, "wb") as f:
f.write(s)
result = get_schedule(tsf)
if s== valids:
assert result[0]
assert len(result[1])==2
else:
assert not result[0]

120
jmclient/test/test_tx_notify.py

@ -0,0 +1,120 @@
#! /usr/bin/env python
from __future__ import absolute_import
'''test of the add_tx_notify() timeouts and related callbacks'''
import sys
import time
from commontest import make_wallets
import subprocess
import jmbitcoin as bitcoin
import pytest
from jmclient import (load_program_config, jm_single, get_log, Wallet, sync_wallet)
log = get_log()
unconfirm_called = [False]
confirm_called = [False]
timeout_unconfirm_called = [False]
timeout_confirm_called = [False]
def unconfirm_callback(txd, txid):
unconfirm_called[0] = True
log.debug('unconfirm callback()')
def confirm_callback(txd, txid, confirmations):
confirm_called[0] = True
log.debug('confirm callback()')
def timeout_callback(confirmed):
if not confirmed:
timeout_unconfirm_called[0] = True
log.debug('timeout unconfirm callback()')
else:
timeout_confirm_called[0] = True
log.debug('timeout confirm callback()')
def test_notify_handler_errors(setup_tx_notify):
#TODO this has hardcoded 62612 which is from regtest_joinmarket.cfg
#default testing config
txhex = make_tx_add_notify()
with pytest.raises(subprocess.CalledProcessError) as e_info:
res = subprocess.check_output(["curl", "-I", "--connect-timeout", "1",
"http://localhost:62612/walletnotify?abcd"])
with pytest.raises(subprocess.CalledProcessError) as e_info:
res = subprocess.check_output(["curl", "-I", "--connect-timeout", "1",
"http://localhost:62612/walletnotify?xxXX"])
#unrecognised calls are returned an error message, not a curl failure
res = subprocess.check_output(["curl", "-I", "--connect-timeout", "1",
"http://localhost:62612/dummycommand"])
#alertnotifys with an alert message should be accepted (todo, deprecate)
res = subprocess.check_output(["curl", "-I", "--connect-timeout", "1",
"http://localhost:62612/alertnotify?dummyalertmessage"])
def test_no_timeout(setup_tx_notify):
txhex = make_tx_add_notify()
jm_single().bc_interface.pushtx(txhex)
time.sleep(6)
assert unconfirm_called[0]
assert confirm_called[0]
assert not timeout_unconfirm_called[0]
assert not timeout_confirm_called[0]
return True
def test_unconfirm_timeout(setup_tx_notify):
txhex = make_tx_add_notify()
#dont pushtx
time.sleep(6)
assert not unconfirm_called[0]
assert not confirm_called[0]
assert timeout_unconfirm_called[0]
assert not timeout_confirm_called[0]
return True
def test_confirm_timeout(setup_tx_notify):
txhex = make_tx_add_notify()
jm_single().bc_interface.tick_forward_chain_interval = -1
jm_single().bc_interface.pushtx(txhex)
time.sleep(10)
jm_single().bc_interface.tick_forward_chain_interval = 2
assert unconfirm_called[0]
assert not confirm_called[0]
assert not timeout_unconfirm_called[0]
assert timeout_confirm_called[0]
return True
def make_tx_add_notify():
wallet_dict = make_wallets(1, [[1, 0, 0, 0, 0]], mean_amt=4, sdev_amt=0)[0]
amount = 250000000
txfee = 10000
wallet = wallet_dict['wallet']
sync_wallet(wallet)
inputs = wallet.select_utxos(0, amount)
ins = inputs.keys()
input_value = sum([i['value'] for i in inputs.values()])
output_addr = wallet.get_new_addr(1, 0)
change_addr = wallet.get_new_addr(0, 1)
outs = [{'value': amount, 'address': output_addr},
{'value': input_value - amount - txfee, 'address': change_addr}]
tx = bitcoin.mktx(ins, outs)
de_tx = bitcoin.deserialize(tx)
for index, ins in enumerate(de_tx['ins']):
utxo = ins['outpoint']['hash'] + ':' + str(ins['outpoint']['index'])
addr = inputs[utxo]['address']
priv = wallet.get_key_from_addr(addr)
tx = bitcoin.sign(tx, index, priv)
unconfirm_called[0] = confirm_called[0] = False
timeout_unconfirm_called[0] = timeout_confirm_called[0] = False
jm_single().bc_interface.add_tx_notify(
bitcoin.deserialize(tx), unconfirm_callback,
confirm_callback, output_addr, timeout_callback)
return tx
@pytest.fixture(scope="module")
def setup_tx_notify():
load_program_config()
jm_single().config.set('TIMEOUT', 'unconfirm_timeout_sec', '3')
jm_single().config.set('TIMEOUT', 'confirm_timeout_hours', str(6.0 / 60 / 60))
jm_single().bc_interface.tick_forward_chain_interval = 2

524
jmclient/test/test_wallets.py

@ -13,43 +13,303 @@ import datetime
import unittest
from ConfigParser import SafeConfigParser, NoSectionError
from decimal import Decimal
from commontest import (local_command, interact, make_wallets, make_sign_and_push,
DummyBlockchainInterface, TestWallet)
from commontest import (local_command, interact, make_wallets,
make_sign_and_push, DummyBlockchainInterface)
import json
import jmbitcoin as bitcoin
import pytest
from jmclient import (load_program_config, jm_single, sync_wallet, AbstractWallet,
get_p2pk_vbyte, get_log, Wallet, select, select_gradual,
select_greedy, select_greediest, estimate_tx_fee, encryptData,
get_network)
from jmclient import (load_program_config, jm_single, sync_wallet,
AbstractWallet, get_p2pk_vbyte, get_log, Wallet, select,
select_gradual, select_greedy, select_greediest,
estimate_tx_fee, encryptData, get_network, WalletError,
BitcoinCoreWallet, BitcoinCoreInterface)
from jmbase.support import chunks
from taker_test_data import t_obtained_tx
from taker_test_data import t_obtained_tx, t_raw_signed_tx
log = get_log()
def do_tx(wallet, amount):
ins_full = wallet.select_utxos(0, amount)
cj_addr = wallet.get_internal_addr(1)
change_addr = wallet.get_internal_addr(0)
wallet.update_cache_index()
txid = make_sign_and_push(ins_full, wallet, amount,
txid = make_sign_and_push(ins_full,
wallet,
amount,
output_addr=cj_addr,
change_addr=change_addr,
estimate_fee=True)
assert txid
time.sleep(2) #blocks
time.sleep(2) #blocks
jm_single().bc_interface.sync_unspent(wallet)
return txid
def test_query_utxo_set(setup_wallets):
load_program_config()
wallet = create_wallet_for_sync("wallet4utxo.json", "4utxo",
[2, 3, 0, 0, 0],
["wallet4utxo.json", "4utxo", [2, 3]])
sync_wallet(wallet)
txid = do_tx(wallet, 90000000)
time.sleep(5)
txid2 = do_tx(wallet, 20000000)
print("Got txs: ", txid, txid2)
res1 = jm_single().bc_interface.query_utxo_set(txid + ":0")
res2 = jm_single().bc_interface.query_utxo_set(
[txid + ":0", txid2 + ":1"],
includeconf=True)
assert len(res1) == 1
assert len(res2) == 2
assert all([x in res1[0] for x in ['script', 'address', 'value']])
assert not 'confirms' in res1[0]
assert 'confirms' in res2[0]
assert 'confirms' in res2[1]
res3 = jm_single().bc_interface.query_utxo_set("ee" * 32 + ":25")
assert res3 == [None]
def create_wallet_for_sync(wallet_file, password, wallet_structure, a):
#Prepare a testnet wallet file for this wallet
password_key = bitcoin.bin_dbl_sha256(password)
#We need a distinct seed for each run so as not to step over each other;
#make it through a deterministic hash
seedh = bitcoin.sha256("".join([str(x) for x in a]))[:32]
encrypted_seed = encryptData(password_key, seedh.decode('hex'))
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
walletfilejson = {'creator': 'joinmarket project',
'creation_time': timestamp,
'encrypted_seed': encrypted_seed.encode('hex'),
'network': get_network()}
walletfile = json.dumps(walletfilejson)
if not os.path.exists('wallets'):
os.makedirs('wallets')
with open(os.path.join('wallets', wallet_file), "wb") as f:
f.write(walletfile)
#The call to Wallet() in make_wallets should now find the file
#and read from it:
return make_wallets(1,
[wallet_structure],
fixed_seeds=[wallet_file],
test_wallet=True,
passwords=[password])[0]['wallet']
@pytest.mark.parametrize(
"num_txs, fake_count, wallet_structure, amount, wallet_file, password",
[
(3, 13, [11, 3, 4, 5, 6], 150000000, 'test_import_wallet.json',
'import-pwd'),
#Uncomment all these for thorough tests. Passing currently.
#Lots of used addresses
(7, 1, [51, 3, 4, 5, 6], 150000000, 'test_import_wallet.json',
'import-pwd'),
(3, 1, [3, 1, 4, 5, 6], 50000000, 'test_import_wallet.json',
'import-pwd'),
#No spams/fakes
(2, 0, [5, 20, 1, 1, 1], 50000000, 'test_import_wallet.json',
'import-pwd'),
#Lots of transactions and fakes
(25, 30, [30, 20, 1, 1, 1], 50000000, 'test_import_wallet.json',
'import-pwd'),
])
def test_wallet_sync_with_fast(setup_wallets, num_txs, fake_count,
wallet_structure, amount, wallet_file, password):
wallet = create_wallet_for_sync(wallet_file, password, wallet_structure,
[num_txs, fake_count, wallet_structure,
amount, wallet_file, password])
sync_count = 0
jm_single().bc_interface.wallet_synced = False
while not jm_single().bc_interface.wallet_synced:
sync_wallet(wallet)
sync_count += 1
#avoid infinite loop
assert sync_count < 10
log.debug("Tried " + str(sync_count) + " times")
assert jm_single().bc_interface.wallet_synced
assert not jm_single().bc_interface.fast_sync_called
#do some transactions with the wallet, then close, then resync
for i in range(num_txs):
do_tx(wallet, amount)
log.debug("After doing a tx, index is now: " + str(wallet.index))
#simulate a spammer requesting a bunch of transactions. This
#mimics what happens in CoinJoinOrder.__init__()
for j in range(fake_count):
#Note that as in a real script run,
#the initial call to sync_wallet will
#have set wallet_synced to True, so these will
#trigger actual imports.
cj_addr = wallet.get_internal_addr(0)
change_addr = wallet.get_internal_addr(0)
wallet.update_cache_index()
log.debug("After doing a spam, index is now: " + str(wallet.index))
assert wallet.index[0][1] == num_txs + fake_count * 2 * num_txs
#Attempt re-sync, simulating a script restart.
jm_single().bc_interface.wallet_synced = False
sync_count = 0
#Probably should be fixed in main code:
#wallet.index_cache is only assigned in Wallet.__init__(),
#meaning a second sync in the same script, after some transactions,
#will not know about the latest index_cache value (see is_index_ahead_of_cache),
#whereas a real re-sync will involve reading the cache from disk.
#Hence, simulation of the fact that the cache index will
#be read from the file on restart:
wallet.index_cache = wallet.index
while not jm_single().bc_interface.wallet_synced:
#Wallet.__init__() resets index to zero.
wallet.index = []
for i in range(5):
wallet.index.append([0, 0])
#Wallet.__init__() also updates the cache index
#from file, but we can reuse from the above pre-loop setting,
#since nothing else in sync will overwrite the cache.
#for regtest add_watchonly_addresses does not exit(), so can
#just repeat as many times as possible. This might
#be usable for non-test code (i.e. no need to restart the
#script over and over again)?
sync_count += 1
log.debug("TRYING SYNC NUMBER: " + str(sync_count))
sync_wallet(wallet, fast=True)
assert jm_single().bc_interface.fast_sync_called
#avoid infinite loop on failure.
assert sync_count < 10
#Wallet should recognize index_cache on fast sync, so should not need to
#run sync process more than once.
assert sync_count == 1
#validate the wallet index values after sync
for i, ws in enumerate(wallet_structure):
assert wallet.index[i][0] == ws #spends into external only
#Same number as above; note it includes the spammer's extras.
assert wallet.index[0][1] == num_txs + fake_count * 2 * num_txs
assert wallet.index[1][1] == num_txs #one change per transaction
for i in range(2, 5):
assert wallet.index[i][1] == 0 #unused
#Now try to do more transactions as sanity check.
do_tx(wallet, 50000000)
@pytest.mark.parametrize(
"wallet_structure, wallet_file, password, ic",
[
#As usual, more test cases are preferable but time
#of build test is too long, so only one activated.
#([11,3,4,5,6], 'test_import_wallet.json', 'import-pwd',
# [(12,3),(100,99),(7, 40), (200, 201), (10,0)]
# ),
([1, 3, 0, 2, 9], 'test_import_wallet.json', 'import-pwd',
[(1, 7), (100, 99), (0, 0), (200, 201), (21, 41)]),
])
def test_wallet_sync_from_scratch(setup_wallets, wallet_structure, wallet_file,
password, ic):
"""Simulate a scenario in which we use a new bitcoind, thusly:
generate a new wallet and simply pretend that it has an existing
index_cache. This will force import of all addresses up to
the index_cache values.
"""
wallet = create_wallet_for_sync(wallet_file, password, wallet_structure,
[wallet_structure, wallet_file, password,
ic])
sync_count = 0
jm_single().bc_interface.wallet_synced = False
wallet.index_cache = ic
while not jm_single().bc_interface.wallet_synced:
wallet.index = []
for i in range(5):
wallet.index.append([0, 0])
#will call with fast=False but index_cache exists; should use slow-sync
sync_wallet(wallet)
sync_count += 1
#avoid infinite loop
assert sync_count < 10
log.debug("Tried " + str(sync_count) + " times")
#after #586 we expect to ALWAYS succeed within 2 rounds
assert sync_count <= 2
#for each external branch, the new index may be higher than
#the original index_cache if there was a higher used address
expected_wallet_index = []
for i, val in enumerate(wallet_structure):
if val > wallet.index_cache[i][0]:
expected_wallet_index.append([val, wallet.index_cache[i][1]])
else:
expected_wallet_index.append([wallet.index_cache[i][0],
wallet.index_cache[i][1]])
assert wallet.index == expected_wallet_index
log.debug("This is wallet unspent: ")
log.debug(json.dumps(wallet.unspent, indent=4))
"""Purely blockchaininterface related error condition tests"""
def test_index_ahead_cache(setup_wallets):
"""Artificial test; look into finding a sync mode that triggers this
"""
class NonWallet(object):
pass
wallet = NonWallet()
wallet.index_cache = [[0, 0], [0, 2]]
from jmclient.blockchaininterface import is_index_ahead_of_cache
assert is_index_ahead_of_cache(wallet, 3, 1)
def test_core_wallet_no_sync(setup_wallets):
"""Ensure BitcoinCoreWallet sync attempt does nothing
"""
wallet = BitcoinCoreWallet("")
#this will not trigger sync due to absence of non-zero index_cache, usually.
wallet.index_cache = [[1, 1]]
jm_single().bc_interface.wallet_synced = False
jm_single().bc_interface.sync_wallet(wallet, fast=True)
assert not jm_single().bc_interface.wallet_synced
jm_single().bc_interface.sync_wallet(wallet)
assert not jm_single().bc_interface.wallet_synced
def test_wrong_network_bci(setup_wallets):
rpc = jm_single().bc_interface.jsonRpc
with pytest.raises(Exception) as e_info:
x = BitcoinCoreInterface(rpc, 'mainnet')
def test_pushtx_errors(setup_wallets):
"""Ensure pushtx fails return False
"""
badtxhex = "aaaa"
assert not jm_single().bc_interface.pushtx(badtxhex)
#Break the authenticated jsonrpc and try again
jm_single().bc_interface.jsonRpc.port = 18333
assert not jm_single().bc_interface.pushtx(t_raw_signed_tx)
#rebuild a valid jsonrpc inside the bci
load_program_config()
"""Tests mainly for wallet.py"""
def test_absurd_fee(setup_wallets):
jm_single().config.set("POLICY", "absurd_fee_per_kb", "1000")
with pytest.raises(ValueError) as e_info:
estimate_tx_fee(10,2)
estimate_tx_fee(10, 2)
load_program_config()
def test_abstract_wallet(setup_wallets):
class DoNothingWallet(AbstractWallet):
pass
for algo in ["default", "gradual", "greedy", "greediest", "none"]:
jm_single().config.set("POLICY", "merge_algorithm", algo)
if algo == "none":
@ -70,6 +330,7 @@ def test_abstract_wallet(setup_wallets):
dnw.add_new_utxos("b", "c")
load_program_config()
def create_default_testnet_wallet():
walletdir = "wallets"
testwalletname = "testwallet.json"
@ -77,49 +338,53 @@ def create_default_testnet_wallet():
if os.path.exists(pathtowallet):
os.remove(pathtowallet)
seed = "hello"
return (walletdir, pathtowallet, testwalletname, Wallet(seed,
5,
6,
extend_mixdepth=False,
storepassword=False))
return (walletdir, pathtowallet, testwalletname,
Wallet(seed,
None,
5,
6,
extend_mixdepth=False,
storepassword=False))
@pytest.mark.parametrize(
"includecache, wrongnet, storepwd, extendmd, pwdnumtries",
[
(False, False, False, False, 1000),
(True, False, False, True, 1),
(False, True, False, False, 1),
(False, False, True, False, 1)
"includecache, wrongnet, storepwd, extendmd, pwdnumtries", [
(False, False, False, False, 1000), (True, False, False, True, 1),
(False, True, False, False, 1), (False, False, True, False, 1)
])
def test_wallet_create(setup_wallets, includecache, wrongnet, storepwd, extendmd,
pwdnumtries):
walletdir, pathtowallet, testwalletname, wallet = create_default_testnet_wallet()
assert wallet.get_key(4,1,17) == "1289ca322f96673acef83f396a9735840e3ab69f0459cf9bfa8d9985a876534401"
assert wallet.get_addr(2,0,5) == "myWPu9QJWHGE79XAmuKkwKgNk8vsr5evpk"
def test_wallet_create(setup_wallets, includecache, wrongnet, storepwd,
extendmd, pwdnumtries):
walletdir, pathtowallet, testwalletname, wallet = create_default_testnet_wallet(
)
assert wallet.get_key(
4, 1,
17) == "1289ca322f96673acef83f396a9735840e3ab69f0459cf9bfa8d9985a876534401"
assert wallet.get_addr(2, 0, 5) == "myWPu9QJWHGE79XAmuKkwKgNk8vsr5evpk"
jm_single().bc_interface.wallet_synced = True
assert wallet.get_new_addr(1, 0) == "mi88ZgDGPmarzcsU6S437h9CY9BLmgH5M6"
assert wallet.get_external_addr(3) == "mvChQuChnXVhqvH67wfMxrodPQ7xccdVJU"
addr3internal = wallet.get_internal_addr(3)
assert addr3internal == "mv26o79Bauf2miJMoxoSu1vXmfXnk85YPQ"
assert wallet.get_key_from_addr(addr3internal) == "2a283c9a2168a25509e2fb944939637228c50c8b4fecd9024650316c4584246501"
assert wallet.get_key_from_addr(
addr3internal) == "2a283c9a2168a25509e2fb944939637228c50c8b4fecd9024650316c4584246501"
dummyaddr = "mvw1NazKDRbeNufFANqpYNAANafsMC2zVU"
assert not wallet.get_key_from_addr(dummyaddr)
#Make a new Wallet(), and prepare a testnet wallet file for this wallet
password = "dummypassword"
password_key = bitcoin.bin_dbl_sha256(password)
seed = bitcoin.sha256("\xaa"*64)[:32]
seed = bitcoin.sha256("\xaa" * 64)[:32]
encrypted_seed = encryptData(password_key, seed.decode('hex'))
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
net = get_network() if not wrongnet else 'mainnnet'
walletfilejson = {'creator': 'joinmarket project',
'creation_time': timestamp,
'encrypted_seed': encrypted_seed.encode('hex'),
'network': net}
'creation_time': timestamp,
'encrypted_seed': encrypted_seed.encode('hex'),
'network': net}
if includecache:
mmd = wallet.max_mix_depth if not extendmd else wallet.max_mix_depth + 5
print("using mmd: " + str(mmd))
walletfilejson.update({'index_cache': [[0,0]]*mmd})
walletfilejson.update({'index_cache': [[0, 0]] * mmd})
walletfile = json.dumps(walletfilejson)
if not os.path.exists(walletdir):
os.makedirs(walletdir)
@ -127,21 +392,39 @@ def test_wallet_create(setup_wallets, includecache, wrongnet, storepwd, extendmd
f.write(walletfile)
if wrongnet:
with pytest.raises(ValueError) as e_info:
TestWallet(testwalletname, 5, 6, extend_mixdepth=extendmd,
storepassword=storepwd, pwd=password)
Wallet(testwalletname,
password,
5,
6,
extend_mixdepth=extendmd,
storepassword=storepwd)
return
from string import ascii_letters
for i in range(pwdnumtries): #multiple tries to ensure pkcs7 error is triggered
with pytest.raises(ValueError) as e_info:
wrongpwd = "".join([random.choice(ascii_letters) for _ in range(20)])
TestWallet(testwalletname, 5, 6, extend_mixdepth=extendmd,
storepassword=storepwd, pwd=wrongpwd)
for i in range(
pwdnumtries): #multiple tries to ensure pkcs7 error is triggered
with pytest.raises(WalletError) as e_info:
wrongpwd = "".join([random.choice(ascii_letters) for _ in range(20)
])
Wallet(testwalletname,
wrongpwd,
5,
6,
extend_mixdepth=extendmd,
storepassword=storepwd)
with pytest.raises(ValueError) as e_info:
TestWallet(testwalletname, 5, 6, extend_mixdepth=extendmd,
storepassword=storepwd, pwd=None)
newwallet = TestWallet(testwalletname, 5, 6, extend_mixdepth=extendmd,
storepassword=storepwd, pwd=password)
with pytest.raises(WalletError) as e_info:
Wallet(testwalletname,
None,
5,
6,
extend_mixdepth=extendmd,
storepassword=storepwd)
newwallet = Wallet(testwalletname,
password,
5,
6,
extend_mixdepth=extendmd,
storepassword=storepwd)
assert newwallet.seed == seed
#now we have a functional wallet + file, update the cache; first try
#with failed paths
@ -152,9 +435,9 @@ def test_wallet_create(setup_wallets, includecache, wrongnet, storepwd, extendmd
newwallet.update_cache_index()
#with real path
newwallet.path = oldpath
newwallet.index = [[1,1]]*5
newwallet.index = [[1, 1]] * 5
newwallet.update_cache_index()
#ensure we cannot find a mainnet wallet from seed
seed = "goodbye"
jm_single().config.set("BLOCKCHAIN", "network", "mainnet")
@ -162,51 +445,66 @@ def test_wallet_create(setup_wallets, includecache, wrongnet, storepwd, extendmd
Wallet(seed, 5, 6, False, False)
load_program_config()
def test_imported_privkey(setup_wallets):
jm_single().config.set("BLOCKCHAIN", "network", "mainnet")
password = "dummypassword"
password_key = bitcoin.bin_dbl_sha256(password)
privkey = "L1RrrnXkcKut5DEMwtDthjwRcTTwED36thyL1DebVrKuwvohjMNi"
#to verify use from_wif_privkey and privkey_to_address
iaddr = "1LDsjB43N2NAQ1Vbc2xyHca4iBBciN8iwC"
privkey_bin = bitcoin.from_wif_privkey(privkey,
vbyte=get_p2pk_vbyte()).decode('hex')[:-1]
encrypted_privkey = encryptData(password_key, privkey_bin)
encrypted_privkey_bad = encryptData(password_key, privkey_bin[:6])
walletdir = "wallets"
testwalletname = "testreal"
pathtowallet = os.path.join(walletdir, testwalletname)
seed = bitcoin.sha256("\xaa"*64)[:32]
encrypted_seed = encryptData(password_key, seed.decode('hex'))
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
for ep in [encrypted_privkey, encrypted_privkey_bad]:
walletfilejson = {'creator': 'joinmarket project',
'creation_time': timestamp,
'encrypted_seed': encrypted_seed.encode('hex'),
'network': get_network(),
'index_cache': [[0,0]]*5,
'imported_keys': [
{'encrypted_privkey': ep.encode('hex'),
'mixdepth': 0}]}
walletfile = json.dumps(walletfilejson)
if not os.path.exists(walletdir):
os.makedirs(walletdir)
with open(pathtowallet, "wb") as f:
f.write(walletfile)
if ep == encrypted_privkey_bad:
with pytest.raises(Exception) as e_info:
TestWallet(testwalletname, 5, 6, False, False, pwd=password)
continue
newwallet = TestWallet(testwalletname, 5, 6, False, False, pwd=password)
assert newwallet.seed == seed
#test accessing the key from the addr
assert newwallet.get_key_from_addr(iaddr) == bitcoin.from_wif_privkey(privkey)
for n in ["mainnet", "testnet"]:
privkey = "7d998b45c219a1e38e99e7cbd312ef67f77a455a9b50c730c27f02c6f730dfb401"
jm_single().config.set("BLOCKCHAIN", "network", n)
password = "dummypassword"
password_key = bitcoin.bin_dbl_sha256(password)
wifprivkey = bitcoin.wif_compressed_privkey(privkey, get_p2pk_vbyte())
#mainnet is "L1RrrnXkcKut5DEMwtDthjwRcTTwED36thyL1DebVrKuwvohjMNi"
#to verify use from_wif_privkey and privkey_to_address
if n == "mainnet":
iaddr = "1LDsjB43N2NAQ1Vbc2xyHca4iBBciN8iwC"
else:
iaddr = "mzjq2E92B3oRB7yDKbwM7XnPaAnKfRERw2"
privkey_bin = bitcoin.from_wif_privkey(
wifprivkey,
vbyte=get_p2pk_vbyte()).decode('hex')[:-1]
encrypted_privkey = encryptData(password_key, privkey_bin)
encrypted_privkey_bad = encryptData(password_key, privkey_bin[:6])
walletdir = "wallets"
testwalletname = "test" + n
pathtowallet = os.path.join(walletdir, testwalletname)
seed = bitcoin.sha256("\xaa" * 64)[:32]
encrypted_seed = encryptData(password_key, seed.decode('hex'))
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
for ep in [encrypted_privkey, encrypted_privkey_bad]:
walletfilejson = {'creator': 'joinmarket project',
'creation_time': timestamp,
'encrypted_seed': encrypted_seed.encode('hex'),
'network': n,
'index_cache': [[0, 0]] * 5,
'imported_keys': [
{'encrypted_privkey': ep.encode('hex'),
'mixdepth': 0}
]}
walletfile = json.dumps(walletfilejson)
if not os.path.exists(walletdir):
os.makedirs(walletdir)
with open(pathtowallet, "wb") as f:
f.write(walletfile)
if ep == encrypted_privkey_bad:
with pytest.raises(Exception) as e_info:
Wallet(testwalletname, password, 5, 6, False, False)
continue
newwallet = Wallet(testwalletname, password, 5, 6, False, False)
assert newwallet.seed == seed
#test accessing the key from the addr
assert newwallet.get_key_from_addr(
iaddr) == bitcoin.from_wif_privkey(wifprivkey,
vbyte=get_p2pk_vbyte())
if n == "testnet":
jm_single().bc_interface.sync_wallet(newwallet)
load_program_config()
def test_add_remove_utxos(setup_wallets):
#Make a fake wallet and inject and then remove fake utxos
walletdir, pathtowallet, testwalletname, wallet = create_default_testnet_wallet()
assert wallet.get_addr(2,0,5) == "myWPu9QJWHGE79XAmuKkwKgNk8vsr5evpk"
walletdir, pathtowallet, testwalletname, wallet = create_default_testnet_wallet(
)
assert wallet.get_addr(2, 0, 5) == "myWPu9QJWHGE79XAmuKkwKgNk8vsr5evpk"
wallet.addr_cache["myWPu9QJWHGE79XAmuKkwKgNk8vsr5evpk"] = (2, 0, 5)
#'76a914c55738deaa9861b6022e53a129968cbf354898b488ac'
#these calls automatically update the addr_cache:
@ -215,28 +513,33 @@ def test_add_remove_utxos(setup_wallets):
assert wallet.get_external_addr(3) == "mvChQuChnXVhqvH67wfMxrodPQ7xccdVJU"
#76a914a115fa0394ce881437a96d443e236b39e07db1f988ac
#using the above pubkey scripts:
faketxforwallet = {'outs':
[{'script': '76a914c55738deaa9861b6022e53a129968cbf354898b488ac',
'value': 110000000},
{'script': '76a9141c9761f5fef73bef6aca378c930c59e7e795088488ac',
'value': 89910900},
{'script': '76a914a115fa0394ce881437a96d443e236b39e07db1f988ac',
'value': 90021000},
{'script': '76a9145ece2dac945c8ff5b2b6635360ca0478ade305d488ac', #not ours
'value': 110000000}],
'version': 1}
wallet.add_new_utxos(faketxforwallet, "aa"*32)
faketxforspending = {'ins':
[{'outpoint': {'hash': 'aa'*32,
'index': 0}},
{'outpoint': {'hash': 'aa'*32,
'index': 1}},
{'outpoint': {'hash': 'aa'*32,
'index': 2}},
{'outpoint': {'hash': '3f3ea820d706e08ad8dc1d2c392c98facb1b067ae4c671043ae9461057bd2a3c',
'index': 1},
'script': '',
'sequence': 4294967295}]}
faketxforwallet = {'outs': [
{'script': '76a914c55738deaa9861b6022e53a129968cbf354898b488ac',
'value': 110000000},
{'script': '76a9141c9761f5fef73bef6aca378c930c59e7e795088488ac',
'value': 89910900},
{'script': '76a914a115fa0394ce881437a96d443e236b39e07db1f988ac',
'value': 90021000},
{'script':
'76a9145ece2dac945c8ff5b2b6635360ca0478ade305d488ac', #not ours
'value': 110000000}
],
'version': 1}
wallet.add_new_utxos(faketxforwallet, "aa" * 32)
faketxforspending = {'ins': [
{'outpoint': {'hash': 'aa' * 32,
'index': 0}}, {'outpoint': {'hash': 'aa' * 32,
'index': 1}}, {'outpoint':
{'hash':
'aa' * 32,
'index': 2}},
{'outpoint':
{'hash':
'3f3ea820d706e08ad8dc1d2c392c98facb1b067ae4c671043ae9461057bd2a3c',
'index': 1},
'script': '',
'sequence': 4294967295}
]}
wallet.select_utxos(1, 100000)
with pytest.raises(Exception) as e_info:
wallet.select_utxos(0, 100000)
@ -245,7 +548,8 @@ def test_add_remove_utxos(setup_wallets):
mul = wallet.get_utxos_by_mixdepth()
assert mul[3] != {}
wallet.remove_old_utxos(faketxforspending)
@pytest.fixture(scope="module")
def setup_wallets():
load_program_config()

20
scripts/add-utxo.py

@ -14,8 +14,9 @@ from pprint import pformat
from optparse import OptionParser
import jmclient.btc as btc
from jmbase import get_password
from jmclient import (load_program_config, jm_single, get_p2pk_vbyte,
Wallet, sync_wallet, add_external_commitments,
Wallet, WalletError, sync_wallet, add_external_commitments,
generate_podle, update_commitments, PoDLE,
set_commitment_file, get_podle_commitments,
get_utxo_info, validate_utxo_data, quit)
@ -173,9 +174,20 @@ def main():
#Three options (-w, -r, -R) for loading utxo and privkey pairs from a wallet,
#csv file or json file.
if options.loadwallet:
wallet = Wallet(options.loadwallet,
options.maxmixdepth,
options.gaplimit)
while True:
pwd = get_password("Enter wallet decryption passphrase: ")
try:
wallet = Wallet(options.loadwallet,
pwd,
options.maxmixdepth,
options.gaplimit)
except WalletError:
print("Wrong password, try again.")
continue
except Exception as e:
print("Failed to load wallet, error message: " + repr(e))
sys.exit(0)
break
sync_wallet(wallet, fast=options.fastsync)
unsp = {}
for u, av in wallet.unspent.iteritems():

15
scripts/sendpayment.py

@ -51,13 +51,13 @@ import time
from jmclient import (Taker, load_program_config, get_schedule,
JMTakerClientProtocolFactory, start_reactor,
validate_address, jm_single,
validate_address, jm_single, WalletError,
choose_orders, choose_sweep_orders,
cheapest_order_choose, weighted_order_choose,
Wallet, BitcoinCoreWallet, sync_wallet,
RegtestBitcoinCoreInterface, estimate_tx_fee)
from jmbase.support import get_log, debug_dump_object
from jmbase.support import get_log, debug_dump_object, get_password
log = get_log()
@ -252,7 +252,16 @@ def main():
if not options.userpcwallet:
max_mix_depth = max([mixdepth, options.amtmixdepths])
wallet = Wallet(wallet_name, max_mix_depth, options.gaplimit)
try:
pwd = get_password("Enter wallet decryption passphrase: ")
wallet = Wallet(wallet_name, pwd, max_mix_depth, options.gaplimit)
except WalletError:
print("Wrong password, try again.")
continue
except Exception as e:
print("Failed to load wallet, error message: " + repr(e))
sys.exit(0)
break
else:
wallet = BitcoinCoreWallet(fromaccount=wallet_name)
sync_wallet(wallet, fast=options.fastsync)

16
scripts/wallet-tool.py

@ -11,7 +11,9 @@ from optparse import OptionParser
from jmclient import (load_program_config, get_network, Wallet,
encryptData, get_p2pk_vbyte, jm_single,
mn_decode, mn_encode, BitcoinCoreInterface,
JsonRpcError, sync_wallet)
JsonRpcError, sync_wallet, WalletError)
from jmbase.support import get_password
import jmclient.btc as btc
@ -103,11 +105,21 @@ if args[0] in noseed_methods:
else:
seed = args[0]
method = ('display' if len(args) == 1 else args[1].lower())
wallet = Wallet(seed,
while True:
try:
pwd = get_password("Enter wallet decryption passphrase: ")
wallet = Wallet(seed, pwd,
options.maxmixdepth,
options.gaplimit,
extend_mixdepth=not maxmixdepth_configured,
storepassword=(method == 'importprivkey'))
except WalletError:
print("Wrong password, try again.")
continue
except Exception as e:
print("Failed to load wallet, error message: " + repr(e))
sys.exit(0)
break
if method == 'history' and not isinstance(jm_single().bc_interface,
BitcoinCoreInterface):
print('showing history only available when using the Bitcoin Core ' +

Loading…
Cancel
Save