Browse Source
Some modifications to Wallet to remove user interaction from module. Removal of ElectrumWrapWallet, moved to the plugin files. Add tests for aes, mnemonics. Add harness for tests in jmclient, conftest.py and commontest Add .coveragerc for these tests locally.master
11 changed files with 660 additions and 214 deletions
@ -0,0 +1,5 @@
|
||||
# .coveragerc to control coverage.py |
||||
[run] |
||||
omit = |
||||
../jmclient/jsonrpc.py |
||||
../jmclient/btc.py |
||||
@ -0,0 +1,238 @@
|
||||
#! /usr/bin/env python |
||||
from __future__ import absolute_import |
||||
'''Some helper functions for testing''' |
||||
|
||||
import sys |
||||
import os |
||||
import time |
||||
import binascii |
||||
import pexpect |
||||
import random |
||||
import subprocess |
||||
import platform |
||||
from decimal import Decimal |
||||
|
||||
from jmclient import (jm_single, Wallet, get_log, estimate_tx_fee, |
||||
BlockchainInterface) |
||||
from jmbase.support import chunks |
||||
import jmbitcoin as btc |
||||
|
||||
log = get_log() |
||||
'''This code is intended to provide |
||||
subprocess startup cross-platform with |
||||
some useful options; it could do with |
||||
some simplification/improvement.''' |
||||
import platform |
||||
OS = platform.system() |
||||
PINL = '\r\n' if OS == 'Windows' else '\n' |
||||
|
||||
class DummyBlockchainInterface(BlockchainInterface): |
||||
def __init__(self): |
||||
self.fake_query_results = None |
||||
self.qusfail = False |
||||
|
||||
def sync_addresses(self, wallet): |
||||
pass |
||||
def sync_unspent(self, wallet): |
||||
pass |
||||
def add_tx_notify(self, |
||||
txd, |
||||
unconfirmfun, |
||||
confirmfun, |
||||
notifyaddr, |
||||
timeoutfun=None): |
||||
pass |
||||
|
||||
def pushtx(self, txhex): |
||||
print("pushing: " + str(txhex)) |
||||
return True |
||||
|
||||
def insert_fake_query_results(self, fqr): |
||||
self.fake_query_results = fqr |
||||
|
||||
def setQUSFail(self, state): |
||||
self.qusfail = state |
||||
|
||||
def query_utxo_set(self, txouts,includeconf=False): |
||||
if self.qusfail: |
||||
#simulate failure to find the utxo |
||||
return [None] |
||||
if self.fake_query_results: |
||||
result = [] |
||||
for x in self.fake_query_results: |
||||
for y in txouts: |
||||
if y == x['utxo']: |
||||
result.append(x) |
||||
return result |
||||
result = [] |
||||
#external maker utxos |
||||
known_outs = {"03243f4a659e278a1333f8308f6aaf32db4692ee7df0340202750fd6c09150f6:1": "03a2d1cbe977b1feaf8d0d5cc28c686859563d1520b28018be0c2661cf1ebe4857", |
||||
"498faa8b22534f3b443c6b0ce202f31e12f21668b4f0c7a005146808f250d4c3:0": "02b4b749d54e96b04066b0803e372a43d6ffa16e75a001ae0ed4b235674ab286be", |
||||
"3f3ea820d706e08ad8dc1d2c392c98facb1b067ae4c671043ae9461057bd2a3c:1": "023bcbafb4f68455e0d1d117c178b0e82a84e66414f0987453d78da034b299c3a9"} |
||||
#our wallet utxos, faked, for podle tests: utxos are doctored (leading 'f'), |
||||
#and the lists are (amt, age) |
||||
wallet_outs = {'f34b635ed8891f16c4ec5b8236ae86164783903e8e8bb47fa9ef2ca31f3c2d7a:0': [10000000, 2], |
||||
'f780d6e5e381bff01a3519997bb4fcba002493103a198fde334fd264f9835d75:1': [20000000, 6], |
||||
'fe574db96a4d43a99786b3ea653cda9e4388f377848f489332577e018380cff1:0': [50000000, 3], |
||||
'fd9711a2ef340750db21efb761f5f7d665d94b312332dc354e252c77e9c48349:0': [50000000, 6]} |
||||
|
||||
if includeconf and set(txouts).issubset(set(wallet_outs)): |
||||
#includeconf used as a trigger for a podle check; |
||||
#here we simulate a variety of amount/age returns |
||||
results = [] |
||||
for to in txouts: |
||||
results.append({'value': wallet_outs[to][0], |
||||
'confirms': wallet_outs[to][1]}) |
||||
return results |
||||
if txouts[0] in known_outs: |
||||
return [{'value': 200000000, |
||||
'address': btc.pubkey_to_address(known_outs[txouts[0]], magicbyte=0x6f), |
||||
'confirms': 20}] |
||||
for t in txouts: |
||||
result_dict = {'value': 10000000000, |
||||
'address': "mrcNu71ztWjAQA6ww9kHiW3zBWSQidHXTQ"} |
||||
if includeconf: |
||||
result_dict['confirms'] = 20 |
||||
result.append(result_dict) |
||||
return result |
||||
|
||||
def estimate_fee_per_kb(self, N): |
||||
return 30000 |
||||
|
||||
class TestWallet(Wallet): |
||||
"""Implementation of wallet |
||||
that allows passing in a password |
||||
for removal of command line interrupt. |
||||
""" |
||||
|
||||
def __init__(self, |
||||
seedarg, |
||||
max_mix_depth=2, |
||||
gaplimit=6, |
||||
extend_mixdepth=False, |
||||
storepassword=False, |
||||
pwd=None): |
||||
self.given_pwd = pwd |
||||
super(TestWallet, self).__init__(seedarg, |
||||
max_mix_depth, |
||||
gaplimit, |
||||
extend_mixdepth, |
||||
storepassword) |
||||
|
||||
def read_wallet_file_data(self, filename): |
||||
return super(TestWallet, self).read_wallet_file_data( |
||||
filename, self.given_pwd) |
||||
|
||||
def make_sign_and_push(ins_full, |
||||
wallet, |
||||
amount, |
||||
output_addr=None, |
||||
change_addr=None, |
||||
hashcode=btc.SIGHASH_ALL, |
||||
estimate_fee = False): |
||||
"""Utility function for easily building transactions |
||||
from wallets |
||||
""" |
||||
total = sum(x['value'] for x in ins_full.values()) |
||||
ins = ins_full.keys() |
||||
#random output address and change addr |
||||
output_addr = wallet.get_new_addr(1, 1) if not output_addr else output_addr |
||||
change_addr = wallet.get_new_addr(1, 0) if not change_addr else change_addr |
||||
fee_est = estimate_tx_fee(len(ins), 2) if estimate_fee else 10000 |
||||
outs = [{'value': amount, |
||||
'address': output_addr}, {'value': total - amount - fee_est, |
||||
'address': change_addr}] |
||||
|
||||
tx = btc.mktx(ins, outs) |
||||
de_tx = btc.deserialize(tx) |
||||
for index, ins in enumerate(de_tx['ins']): |
||||
utxo = ins['outpoint']['hash'] + ':' + str(ins['outpoint']['index']) |
||||
addr = ins_full[utxo]['address'] |
||||
priv = wallet.get_key_from_addr(addr) |
||||
if index % 2: |
||||
priv = binascii.unhexlify(priv) |
||||
tx = btc.sign(tx, index, priv, hashcode=hashcode) |
||||
#pushtx returns False on any error |
||||
print btc.deserialize(tx) |
||||
push_succeed = jm_single().bc_interface.pushtx(tx) |
||||
if push_succeed: |
||||
return btc.txhash(tx) |
||||
else: |
||||
return False |
||||
|
||||
def local_command(command, bg=False, redirect=''): |
||||
if redirect == 'NULL': |
||||
if OS == 'Windows': |
||||
command.append(' > NUL 2>&1') |
||||
elif OS == 'Linux': |
||||
command.extend(['>', '/dev/null', '2>&1']) |
||||
else: |
||||
print "OS not recognised, quitting." |
||||
elif redirect: |
||||
command.extend(['>', redirect]) |
||||
|
||||
if bg: |
||||
#using subprocess.PIPE seems to cause problems |
||||
FNULL = open(os.devnull, 'w') |
||||
return subprocess.Popen(command, |
||||
stdout=FNULL, |
||||
stderr=subprocess.STDOUT, |
||||
close_fds=True) |
||||
else: |
||||
#in case of foreground execution, we can use the output; if not |
||||
#it doesn't matter |
||||
return subprocess.check_output(command) |
||||
|
||||
|
||||
def make_wallets(n, |
||||
wallet_structures=None, |
||||
mean_amt=1, |
||||
sdev_amt=0, |
||||
start_index=0, |
||||
fixed_seeds=None, |
||||
test_wallet=False, |
||||
passwords=None): |
||||
'''n: number of wallets to be created |
||||
wallet_structure: array of n arrays , each subarray |
||||
specifying the number of addresses to be populated with coins |
||||
at each depth (for now, this will only populate coins into 'receive' addresses) |
||||
mean_amt: the number of coins (in btc units) in each address as above |
||||
sdev_amt: if randomness in amouts is desired, specify here. |
||||
Returns: a dict of dicts of form {0:{'seed':seed,'wallet':Wallet object},1:..,} |
||||
Default Wallet constructor is joinmarket.Wallet, else use TestWallet, |
||||
which takes a password parameter as in the list passwords. |
||||
''' |
||||
if len(wallet_structures) != n: |
||||
raise Exception("Number of wallets doesn't match wallet structures") |
||||
if not fixed_seeds: |
||||
seeds = chunks(binascii.hexlify(os.urandom(15 * n)), 15 * 2) |
||||
else: |
||||
seeds = fixed_seeds |
||||
wallets = {} |
||||
for i in range(n): |
||||
if test_wallet: |
||||
w = TestWallet(seeds[i], max_mix_depth=5, pwd=passwords[i]) |
||||
else: |
||||
w = Wallet(seeds[i], max_mix_depth=5) |
||||
wallets[i + start_index] = {'seed': seeds[i], |
||||
'wallet': w} |
||||
for j in range(5): |
||||
for k in range(wallet_structures[i][j]): |
||||
deviation = sdev_amt * random.random() |
||||
amt = mean_amt - sdev_amt / 2.0 + deviation |
||||
if amt < 0: amt = 0.001 |
||||
amt = float(Decimal(amt).quantize(Decimal(10)**-8)) |
||||
jm_single().bc_interface.grab_coins( |
||||
wallets[i + start_index]['wallet'].get_external_addr(j), |
||||
amt) |
||||
#reset the index so the coins can be seen if running in same script |
||||
wallets[i + start_index]['wallet'].index[j][0] -= wallet_structures[i][j] |
||||
return wallets |
||||
|
||||
|
||||
def interact(process, inputs, expected): |
||||
if len(inputs) != len(expected): |
||||
raise Exception("Invalid inputs to interact()") |
||||
for i, inp in enumerate(inputs): |
||||
process.expect(expected[i]) |
||||
process.sendline(inp) |
||||
@ -0,0 +1,54 @@
|
||||
import pytest |
||||
import os |
||||
import time |
||||
import subprocess |
||||
from commontest import local_command |
||||
from jmclient import load_program_config |
||||
|
||||
bitcoin_path = None |
||||
bitcoin_conf = None |
||||
bitcoin_rpcpassword = None |
||||
bitcoin_rpcusername = None |
||||
|
||||
|
||||
def pytest_addoption(parser): |
||||
parser.addoption("--btcroot", action="store", default='', |
||||
help="the fully qualified path to the directory containing "+\ |
||||
"the bitcoin binaries, e.g. /home/user/bitcoin/bin/") |
||||
parser.addoption("--btcconf", action="store", |
||||
help="the fully qualified path to the location of the "+\ |
||||
"bitcoin configuration file you use for testing, e.g. "+\ |
||||
"/home/user/.bitcoin/bitcoin.conf") |
||||
parser.addoption("--btcpwd", |
||||
action="store", |
||||
help="the RPC password for your test bitcoin instance") |
||||
parser.addoption("--btcuser", |
||||
action="store", |
||||
default='bitcoinrpc', |
||||
help="the RPC username for your test bitcoin instance (default=bitcoinrpc)") |
||||
|
||||
def teardown(): |
||||
#shut down bitcoin and remove the regtest dir |
||||
local_command([bitcoin_path + "bitcoin-cli", "-regtest", "-rpcuser=" + bitcoin_rpcusername, |
||||
"-rpcpassword=" + bitcoin_rpcpassword, "stop"]) |
||||
#note, it is better to clean out ~/.bitcoin/regtest but too |
||||
#dangerous to automate it here perhaps |
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True) |
||||
def setup(request): |
||||
request.addfinalizer(teardown) |
||||
|
||||
global bitcoin_conf, bitcoin_path, bitcoin_rpcpassword, bitcoin_rpcusername |
||||
bitcoin_path = request.config.getoption("--btcroot") |
||||
bitcoin_conf = request.config.getoption("--btcconf") |
||||
bitcoin_rpcpassword = request.config.getoption("--btcpwd") |
||||
bitcoin_rpcusername = request.config.getoption("--btcuser") |
||||
|
||||
#start up regtest blockchain |
||||
btc_proc = subprocess.call([bitcoin_path + "bitcoind", "-regtest", |
||||
"-daemon", "-conf=" + bitcoin_conf]) |
||||
time.sleep(3) |
||||
#generate blocks |
||||
local_command([bitcoin_path + "bitcoin-cli", "-regtest", "-rpcuser=" + bitcoin_rpcusername, |
||||
"-rpcpassword=" + bitcoin_rpcpassword, "generate", "101"]) |
||||
@ -0,0 +1,29 @@
|
||||
import jmclient.slowaes as sa |
||||
"""test general AES operation; probably not needed. |
||||
Not included in coverage, but should be included in suite.""" |
||||
import os |
||||
import pytest |
||||
|
||||
def test_pkcs7_bad_padding(): |
||||
#used in seed decryption; check that it throws |
||||
#if wrongly padded (this caused a REAL bug before!) |
||||
bad_padded = ['\x07'*14, '\x07'*31, '\x07'*31+'\x11', '\x07'*31+'\x00', |
||||
'\x07'*14+'\x01\x02'] |
||||
for b in bad_padded: |
||||
with pytest.raises(Exception) as e_info: |
||||
fake_unpadded = sa.strip_PKCS7_padding(b) |
||||
|
||||
def test_aes(): |
||||
cleartext = "This is a test!" |
||||
iv = [103, 35, 148, 239, 76, 213, 47, 118, 255, 222, 123, 176, 106, 134, 98, |
||||
92] |
||||
for ks in [16,24,32]: |
||||
for mode in ["CFB", "CBC", "OFB"]: |
||||
cypherkey = map(ord, os.urandom(ks)) |
||||
moo = sa.AESModeOfOperation() |
||||
mode, orig_len, ciph = moo.encrypt(cleartext, moo.modeOfOperation[mode], |
||||
cypherkey, ks, |
||||
iv) |
||||
decr = moo.decrypt(ciph, orig_len, mode, cypherkey, |
||||
ks, iv) |
||||
assert decr==cleartext |
||||
@ -0,0 +1,55 @@
|
||||
from jmclient import old_mnemonic |
||||
|
||||
import pytest |
||||
|
||||
@pytest.mark.parametrize( |
||||
"seedphrase, key, valid", |
||||
[ |
||||
(["spiral", "squeeze", "strain", "sunset", "suspend", "sympathy", |
||||
"thigh", "throne", "total", "unseen", "weapon", "weary"], |
||||
'0028644c0028644f0028645200286455', |
||||
True), |
||||
(["pair", "bury", "lung", "swim", "orange", "doctor", "numb", "interest", |
||||
"shock", "bloom", "fragile", "screen"], |
||||
'fa92999d01431f961a26c876f55d3f6c', |
||||
True), |
||||
(["check", "squeeze", "strain", "sunset", "suspend", "sympathy", |
||||
"thigh", "throne", "total", "unseen", "weapon", "weary"], |
||||
'0028644c0028644f0028645200286455', |
||||
False), |
||||
(["qwerty", "check", "strain", "sunset", "suspend", "sympathy", |
||||
"thigh", "throne", "total", "unseen", "weapon", "weary"], |
||||
'', |
||||
False), |
||||
(["", "check", "strain", "sunset", "suspend", "sympathy", |
||||
"thigh", "throne", "total", "unseen", "weapon", "weary"], |
||||
'', |
||||
False), |
||||
(["strain", "sunset"], |
||||
'', |
||||
False), |
||||
]) |
||||
def test_old_mnemonic(seedphrase, key, valid): |
||||
if valid: |
||||
assert old_mnemonic.mn_decode(seedphrase) == key |
||||
assert old_mnemonic.mn_encode(key) == seedphrase |
||||
else: |
||||
if len(key) > 0: |
||||
#test cases where the seedphrase is valid |
||||
#but must not match the provided master private key |
||||
assert old_mnemonic.mn_decode(seedphrase) != key |
||||
else: |
||||
#test cases where the seedphrase is intrinsically invalid |
||||
#Already known error condition: an incorrectly short |
||||
#word list will NOT throw an error; this is handled by calling code |
||||
if len(seedphrase) < 12: |
||||
print "For known failure case of seedphrase less than 12: " |
||||
print old_mnemonic.mn_decode(seedphrase) |
||||
else: |
||||
with pytest.raises(Exception) as e_info: |
||||
dummy = old_mnemonic.mn_decode(seedphrase) |
||||
print "Got this return value from mn_decode: " + str(dummy) |
||||
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,251 @@
|
||||
#! /usr/bin/env python |
||||
from __future__ import absolute_import |
||||
'''Wallet functionality tests.''' |
||||
|
||||
import sys |
||||
import os |
||||
import time |
||||
import binascii |
||||
import pexpect |
||||
import random |
||||
import subprocess |
||||
import datetime |
||||
import unittest |
||||
from ConfigParser import SafeConfigParser, NoSectionError |
||||
from decimal import Decimal |
||||
from commontest import (local_command, interact, make_wallets, make_sign_and_push, |
||||
DummyBlockchainInterface, TestWallet) |
||||
import json |
||||
|
||||
import jmbitcoin as bitcoin |
||||
import pytest |
||||
from jmclient import (load_program_config, jm_single, sync_wallet, AbstractWallet, |
||||
get_p2pk_vbyte, get_log, Wallet, select, select_gradual, |
||||
select_greedy, select_greediest, estimate_tx_fee, encryptData, |
||||
get_network) |
||||
from jmbase.support import chunks |
||||
from taker_test_data import t_obtained_tx |
||||
|
||||
log = get_log() |
||||
|
||||
def do_tx(wallet, amount): |
||||
ins_full = wallet.select_utxos(0, amount) |
||||
cj_addr = wallet.get_internal_addr(1) |
||||
change_addr = wallet.get_internal_addr(0) |
||||
wallet.update_cache_index() |
||||
txid = make_sign_and_push(ins_full, wallet, amount, |
||||
output_addr=cj_addr, |
||||
change_addr=change_addr, |
||||
estimate_fee=True) |
||||
assert txid |
||||
time.sleep(2) #blocks |
||||
jm_single().bc_interface.sync_unspent(wallet) |
||||
|
||||
def test_absurd_fee(setup_wallets): |
||||
jm_single().config.set("POLICY", "absurd_fee_per_kb", "1000") |
||||
with pytest.raises(ValueError) as e_info: |
||||
estimate_tx_fee(10,2) |
||||
load_program_config() |
||||
|
||||
def test_abstract_wallet(setup_wallets): |
||||
class DoNothingWallet(AbstractWallet): |
||||
pass |
||||
for algo in ["default", "gradual", "greedy", "greediest", "none"]: |
||||
jm_single().config.set("POLICY", "merge_algorithm", algo) |
||||
if algo == "none": |
||||
with pytest.raises(Exception) as e_info: |
||||
dnw = DoNothingWallet() |
||||
#also test if the config is blank |
||||
jm_single().config = SafeConfigParser() |
||||
dnw = DoNothingWallet() |
||||
assert dnw.utxo_selector == select |
||||
else: |
||||
dnw = DoNothingWallet() |
||||
assert not dnw.get_key_from_addr("a") |
||||
assert not dnw.get_utxos_by_mixdepth() |
||||
assert not dnw.get_external_addr(1) |
||||
assert not dnw.get_internal_addr(0) |
||||
dnw.update_cache_index() |
||||
dnw.remove_old_utxos("a") |
||||
dnw.add_new_utxos("b", "c") |
||||
load_program_config() |
||||
|
||||
def create_default_testnet_wallet(): |
||||
walletdir = "wallets" |
||||
testwalletname = "testwallet.json" |
||||
pathtowallet = os.path.join(walletdir, testwalletname) |
||||
if os.path.exists(pathtowallet): |
||||
os.remove(pathtowallet) |
||||
seed = "hello" |
||||
return (walletdir, pathtowallet, testwalletname, Wallet(seed, |
||||
5, |
||||
6, |
||||
extend_mixdepth=False, |
||||
storepassword=False)) |
||||
|
||||
@pytest.mark.parametrize( |
||||
"includecache, wrongnet, storepwd, extendmd, pwdnumtries", |
||||
[ |
||||
(False, False, False, False, 1000), |
||||
(True, False, False, True, 1), |
||||
(False, True, False, False, 1), |
||||
(False, False, True, False, 1) |
||||
]) |
||||
def test_wallet_create(setup_wallets, includecache, wrongnet, storepwd, extendmd, |
||||
pwdnumtries): |
||||
walletdir, pathtowallet, testwalletname, wallet = create_default_testnet_wallet() |
||||
assert wallet.get_key(4,1,17) == "1289ca322f96673acef83f396a9735840e3ab69f0459cf9bfa8d9985a876534401" |
||||
assert wallet.get_addr(2,0,5) == "myWPu9QJWHGE79XAmuKkwKgNk8vsr5evpk" |
||||
jm_single().bc_interface.wallet_synced = True |
||||
assert wallet.get_new_addr(1, 0) == "mi88ZgDGPmarzcsU6S437h9CY9BLmgH5M6" |
||||
assert wallet.get_external_addr(3) == "mvChQuChnXVhqvH67wfMxrodPQ7xccdVJU" |
||||
addr3internal = wallet.get_internal_addr(3) |
||||
assert addr3internal == "mv26o79Bauf2miJMoxoSu1vXmfXnk85YPQ" |
||||
assert wallet.get_key_from_addr(addr3internal) == "2a283c9a2168a25509e2fb944939637228c50c8b4fecd9024650316c4584246501" |
||||
dummyaddr = "mvw1NazKDRbeNufFANqpYNAANafsMC2zVU" |
||||
assert not wallet.get_key_from_addr(dummyaddr) |
||||
#Make a new Wallet(), and prepare a testnet wallet file for this wallet |
||||
|
||||
password = "dummypassword" |
||||
password_key = bitcoin.bin_dbl_sha256(password) |
||||
seed = bitcoin.sha256("\xaa"*64)[:32] |
||||
encrypted_seed = encryptData(password_key, seed.decode('hex')) |
||||
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") |
||||
net = get_network() if not wrongnet else 'mainnnet' |
||||
walletfilejson = {'creator': 'joinmarket project', |
||||
'creation_time': timestamp, |
||||
'encrypted_seed': encrypted_seed.encode('hex'), |
||||
'network': net} |
||||
if includecache: |
||||
mmd = wallet.max_mix_depth if not extendmd else wallet.max_mix_depth + 5 |
||||
print("using mmd: " + str(mmd)) |
||||
walletfilejson.update({'index_cache': [[0,0]]*mmd}) |
||||
walletfile = json.dumps(walletfilejson) |
||||
if not os.path.exists(walletdir): |
||||
os.makedirs(walletdir) |
||||
with open(pathtowallet, "wb") as f: |
||||
f.write(walletfile) |
||||
if wrongnet: |
||||
with pytest.raises(ValueError) as e_info: |
||||
TestWallet(testwalletname, 5, 6, extend_mixdepth=extendmd, |
||||
storepassword=storepwd, pwd=password) |
||||
return |
||||
from string import ascii_letters |
||||
for i in range(pwdnumtries): #multiple tries to ensure pkcs7 error is triggered |
||||
with pytest.raises(ValueError) as e_info: |
||||
wrongpwd = "".join([random.choice(ascii_letters) for _ in range(20)]) |
||||
TestWallet(testwalletname, 5, 6, extend_mixdepth=extendmd, |
||||
storepassword=storepwd, pwd=wrongpwd) |
||||
|
||||
with pytest.raises(ValueError) as e_info: |
||||
TestWallet(testwalletname, 5, 6, extend_mixdepth=extendmd, |
||||
storepassword=storepwd, pwd=None) |
||||
newwallet = TestWallet(testwalletname, 5, 6, extend_mixdepth=extendmd, |
||||
storepassword=storepwd, pwd=password) |
||||
assert newwallet.seed == seed |
||||
#now we have a functional wallet + file, update the cache; first try |
||||
#with failed paths |
||||
oldpath = newwallet.path |
||||
newwallet.path = None |
||||
newwallet.update_cache_index() |
||||
newwallet.path = "fake-path-definitely-doesnt-exist" |
||||
newwallet.update_cache_index() |
||||
#with real path |
||||
newwallet.path = oldpath |
||||
newwallet.index = [[1,1]]*5 |
||||
newwallet.update_cache_index() |
||||
|
||||
#ensure we cannot find a mainnet wallet from seed |
||||
seed = "goodbye" |
||||
jm_single().config.set("BLOCKCHAIN", "network", "mainnet") |
||||
with pytest.raises(IOError) as e_info: |
||||
Wallet(seed, 5, 6, False, False) |
||||
load_program_config() |
||||
|
||||
def test_imported_privkey(setup_wallets): |
||||
jm_single().config.set("BLOCKCHAIN", "network", "mainnet") |
||||
password = "dummypassword" |
||||
password_key = bitcoin.bin_dbl_sha256(password) |
||||
privkey = "L1RrrnXkcKut5DEMwtDthjwRcTTwED36thyL1DebVrKuwvohjMNi" |
||||
#to verify use from_wif_privkey and privkey_to_address |
||||
iaddr = "1LDsjB43N2NAQ1Vbc2xyHca4iBBciN8iwC" |
||||
privkey_bin = bitcoin.from_wif_privkey(privkey, |
||||
vbyte=get_p2pk_vbyte()).decode('hex')[:-1] |
||||
encrypted_privkey = encryptData(password_key, privkey_bin) |
||||
encrypted_privkey_bad = encryptData(password_key, privkey_bin[:6]) |
||||
walletdir = "wallets" |
||||
testwalletname = "testreal" |
||||
pathtowallet = os.path.join(walletdir, testwalletname) |
||||
seed = bitcoin.sha256("\xaa"*64)[:32] |
||||
encrypted_seed = encryptData(password_key, seed.decode('hex')) |
||||
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") |
||||
for ep in [encrypted_privkey, encrypted_privkey_bad]: |
||||
walletfilejson = {'creator': 'joinmarket project', |
||||
'creation_time': timestamp, |
||||
'encrypted_seed': encrypted_seed.encode('hex'), |
||||
'network': get_network(), |
||||
'index_cache': [[0,0]]*5, |
||||
'imported_keys': [ |
||||
{'encrypted_privkey': ep.encode('hex'), |
||||
'mixdepth': 0}]} |
||||
walletfile = json.dumps(walletfilejson) |
||||
if not os.path.exists(walletdir): |
||||
os.makedirs(walletdir) |
||||
with open(pathtowallet, "wb") as f: |
||||
f.write(walletfile) |
||||
if ep == encrypted_privkey_bad: |
||||
with pytest.raises(Exception) as e_info: |
||||
TestWallet(testwalletname, 5, 6, False, False, pwd=password) |
||||
continue |
||||
newwallet = TestWallet(testwalletname, 5, 6, False, False, pwd=password) |
||||
assert newwallet.seed == seed |
||||
#test accessing the key from the addr |
||||
assert newwallet.get_key_from_addr(iaddr) == bitcoin.from_wif_privkey(privkey) |
||||
load_program_config() |
||||
|
||||
def test_add_remove_utxos(setup_wallets): |
||||
#Make a fake wallet and inject and then remove fake utxos |
||||
walletdir, pathtowallet, testwalletname, wallet = create_default_testnet_wallet() |
||||
assert wallet.get_addr(2,0,5) == "myWPu9QJWHGE79XAmuKkwKgNk8vsr5evpk" |
||||
wallet.addr_cache["myWPu9QJWHGE79XAmuKkwKgNk8vsr5evpk"] = (2, 0, 5) |
||||
#'76a914c55738deaa9861b6022e53a129968cbf354898b488ac' |
||||
#these calls automatically update the addr_cache: |
||||
assert wallet.get_new_addr(1, 0) == "mi88ZgDGPmarzcsU6S437h9CY9BLmgH5M6" |
||||
#76a9141c9761f5fef73bef6aca378c930c59e7e795088488ac |
||||
assert wallet.get_external_addr(3) == "mvChQuChnXVhqvH67wfMxrodPQ7xccdVJU" |
||||
#76a914a115fa0394ce881437a96d443e236b39e07db1f988ac |
||||
#using the above pubkey scripts: |
||||
faketxforwallet = {'outs': |
||||
[{'script': '76a914c55738deaa9861b6022e53a129968cbf354898b488ac', |
||||
'value': 110000000}, |
||||
{'script': '76a9141c9761f5fef73bef6aca378c930c59e7e795088488ac', |
||||
'value': 89910900}, |
||||
{'script': '76a914a115fa0394ce881437a96d443e236b39e07db1f988ac', |
||||
'value': 90021000}, |
||||
{'script': '76a9145ece2dac945c8ff5b2b6635360ca0478ade305d488ac', #not ours |
||||
'value': 110000000}], |
||||
'version': 1} |
||||
wallet.add_new_utxos(faketxforwallet, "aa"*32) |
||||
faketxforspending = {'ins': |
||||
[{'outpoint': {'hash': 'aa'*32, |
||||
'index': 0}}, |
||||
{'outpoint': {'hash': 'aa'*32, |
||||
'index': 1}}, |
||||
{'outpoint': {'hash': 'aa'*32, |
||||
'index': 2}}, |
||||
{'outpoint': {'hash': '3f3ea820d706e08ad8dc1d2c392c98facb1b067ae4c671043ae9461057bd2a3c', |
||||
'index': 1}, |
||||
'script': '', |
||||
'sequence': 4294967295}]} |
||||
wallet.select_utxos(1, 100000) |
||||
with pytest.raises(Exception) as e_info: |
||||
wallet.select_utxos(0, 100000) |
||||
#ensure get_utxos_by_mixdepth can handle utxos outside of maxmixdepth |
||||
wallet.max_mix_depth = 2 |
||||
mul = wallet.get_utxos_by_mixdepth() |
||||
assert mul[3] != {} |
||||
wallet.remove_old_utxos(faketxforspending) |
||||
|
||||
@pytest.fixture(scope="module") |
||||
def setup_wallets(): |
||||
load_program_config() |
||||
Loading…
Reference in new issue