Browse Source

add new wallet classes to existing tests

master
undeath 8 years ago
parent
commit
89b5cd4280
  1. 2
      jmclient/jmclient/__init__.py
  2. 14
      jmclient/jmclient/taker.py
  3. 23
      jmclient/jmclient/wallet_utils.py
  4. 94
      jmclient/test/commontest.py
  5. 6
      jmclient/test/taker_test_data.py
  6. 5
      jmclient/test/test_podle.py
  7. 61
      jmclient/test/test_taker.py
  8. 18
      jmclient/test/test_tx_creation.py
  9. 522
      jmclient/test/test_wallets.py
  10. 36
      test/common.py
  11. 203
      test/test_segwit.py

2
jmclient/jmclient/__init__.py

@ -46,7 +46,7 @@ from .taker_utils import (tumbler_taker_finished_update, restart_waiter,
from .wallet_utils import (
wallet_tool_main, wallet_generate_recover_bip39, open_wallet,
open_test_wallet_maybe, create_wallet, get_wallet_cls, get_wallet_path,
wallet_display)
wallet_display, SewgitTestWallet)
from .maker import Maker
from .yieldgenerator import YieldGenerator, YieldGeneratorBasic, ygmain
# Set default logging handler to avoid "No handler found" warnings.

14
jmclient/jmclient/taker.py

@ -4,9 +4,7 @@ from __future__ import print_function
import base64
import pprint
import random
import sys
import time
import copy
from itertools import chain
import btc
from jmclient.configure import jm_single, get_p2pk_vbyte, get_p2sh_vbyte
@ -99,6 +97,7 @@ class Taker(object):
self.waiting_for_conf = False
self.txid = None
self.schedule_index = -1
self.utxos = {}
self.tdestaddrs = [] if not tdestaddrs else tdestaddrs
#allow custom wallet-based clients to use their own signing code;
#currently only setting "wallet" is allowed, calls wallet.sign_tx(tx)
@ -665,9 +664,12 @@ class Taker(object):
#in the transaction, about to be consumed, rather than use
#random utxos that will persist after. At this step we also
#allow use of external utxos in the json file.
if self.wallet.unspent:
if any(self.wallet.get_utxos_by_mixdepth_().values()):
utxos = {}
for mdutxo in self.wallet.get_utxos_by_mixdepth().values():
utxos.update(mdutxo)
priv_utxo_pairs, to, ts = priv_utxo_pairs_from_utxos(
self.wallet.unspent, age, amt)
utxos, age, amt)
#Pre-filter the set of external commitments that work for this
#transaction according to its size and age.
dummy, extdict = get_podle_commitments()
@ -688,7 +690,7 @@ class Taker(object):
"Commitment sourced OK")
else:
errmsgheader, errmsg = generate_podle_error_string(priv_utxo_pairs,
to, ts, self.wallet.unspent, self.cjamount,
to, ts, self.wallet.get_utxos_by_mixdepth(), self.cjamount,
jm_single().config.get("POLICY", "taker_utxo_age"),
jm_single().config.get("POLICY", "taker_utxo_amtpercent"))

23
jmclient/jmclient/wallet_utils.py

@ -1,14 +1,12 @@
from __future__ import print_function
import json
import os
import pprint
import sys
import sqlite3
import binascii
from datetime import datetime
from mnemonic import Mnemonic
from optparse import OptionParser
import getpass
from jmclient import (get_network, WALLET_IMPLEMENTATIONS, Storage, podle,
encryptData, get_p2sh_vbyte, get_p2pk_vbyte, jm_single, mn_decode,
mn_encode, BitcoinCoreInterface, JsonRpcError, sync_wallet, WalletError,
@ -16,6 +14,11 @@ from jmclient import (get_network, WALLET_IMPLEMENTATIONS, Storage, podle,
from jmbase.support import get_password
import jmclient.btc as btc
class SewgitTestWallet(ImportWalletMixin, BIP49Wallet):
TYPE = 'p2sh-p2wpkh'
def get_wallettool_parser():
description = (
'Use this script to monitor and manage your Joinmarket wallet.\n'
@ -836,7 +839,8 @@ def create_wallet(path, password, max_mixdepth, **kwargs):
**kwargs)
def open_test_wallet_maybe(path, seed, max_mixdepth, **kwargs):
def open_test_wallet_maybe(path, seed, max_mixdepth,
test_wallet_cls=SewgitTestWallet, **kwargs):
"""
Create a volatile test wallet if path is a hex-encoded string of length 64,
otherwise run open_wallet().
@ -850,22 +854,19 @@ def open_test_wallet_maybe(path, seed, max_mixdepth, **kwargs):
returns:
wallet object
"""
class SewgitTestWallet(ImportWalletMixin, BIP49Wallet):
TYPE = 'p2sh-p2wpkh'
if len(seed) == SewgitTestWallet.ENTROPY_BYTES * 2:
if len(seed) == test_wallet_cls.ENTROPY_BYTES * 2:
try:
seed = binascii.unhexlify(seed)
except binascii.Error:
pass
else:
storage = VolatileStorage()
SewgitTestWallet.initialize(
test_wallet_cls.initialize(
storage, get_network(), max_mixdepth=max_mixdepth,
entropy=seed)
assert 'ask_for_password' not in kwargs
assert 'read_only' not in kwargs
return SewgitTestWallet(storage, **kwargs)
return test_wallet_cls(storage, **kwargs)
return open_wallet(path, **kwargs)
@ -920,8 +921,8 @@ def get_wallet_cls_from_storage(storage):
def wallet_sanity_check(wallet):
if wallet.network != get_network():
raise Exception("Wallet network mismatch: we are on {} but wallet is "
"on {}".format(get_network(), wallet.network))
raise Exception("Wallet network mismatch: we are on '{}' but wallet "
"is on '{}'.".format(get_network(), wallet.network))
def get_wallet_path(file_name, wallet_dir):

94
jmclient/test/commontest.py

@ -1,19 +1,15 @@
#! /usr/bin/env python
from __future__ import absolute_import
from __future__ import absolute_import, print_function
'''Some helper functions for testing'''
import sys
import os
import time
import binascii
import pexpect
import random
import subprocess
import platform
from decimal import Decimal
from jmclient import (jm_single, Wallet, get_log, estimate_tx_fee,
BlockchainInterface, get_p2sh_vbyte)
from jmclient import (
jm_single, open_test_wallet_maybe, get_log, estimate_tx_fee,
BlockchainInterface, get_p2sh_vbyte, BIP32Wallet, SegwitLegacyWallet)
from jmbase.support import chunks
import jmbitcoin as btc
@ -35,6 +31,8 @@ class DummyBlockchainInterface(BlockchainInterface):
pass
def sync_unspent(self, wallet):
pass
def import_addresses(self, addr_list, wallet_name):
pass
def outputs_watcher(self, wallet_name, notifyaddr,
tx_output_set, uf, cf, tf):
pass
@ -106,29 +104,21 @@ class DummyBlockchainInterface(BlockchainInterface):
def estimate_fee_per_kb(self, N):
return 30000
class TestWallet(Wallet):
"""Implementation of wallet
that allows passing in a password
for removal of command line interrupt.
"""
def __init__(self,
seedarg,
max_mix_depth=2,
gaplimit=6,
extend_mixdepth=False,
storepassword=False,
pwd=None):
self.given_pwd = pwd
super(TestWallet, self).__init__(seedarg,
max_mix_depth,
gaplimit,
extend_mixdepth,
storepassword)
def read_wallet_file_data(self, filename):
return super(TestWallet, self).read_wallet_file_data(
filename, self.given_pwd)
def create_wallet_for_sync(wallet_structure, a, **kwargs):
#We need a distinct seed for each run so as not to step over each other;
#make it through a deterministic hash
seedh = btc.sha256("".join([str(x) for x in a]))[:32]
return make_wallets(
1, [wallet_structure], fixed_seeds=[seedh], **kwargs)[0]['wallet']
def binarize_tx(tx):
for o in tx['outs']:
o['script'] = binascii.unhexlify(o['script'])
for i in tx['ins']:
i['outpoint']['hash'] = binascii.unhexlify(i['outpoint']['hash'])
def make_sign_and_push(ins_full,
wallet,
@ -150,17 +140,17 @@ def make_sign_and_push(ins_full,
'address': output_addr}, {'value': total - amount - fee_est,
'address': change_addr}]
tx = btc.mktx(ins, outs)
de_tx = btc.deserialize(tx)
de_tx = btc.deserialize(btc.mktx(ins, outs))
scripts = {}
for index, ins in enumerate(de_tx['ins']):
utxo = ins['outpoint']['hash'] + ':' + str(ins['outpoint']['index'])
addr = ins_full[utxo]['address']
priv = wallet.get_key_from_addr(addr)
if index % 2:
priv = binascii.unhexlify(priv)
tx = btc.sign(tx, index, priv, hashcode=hashcode)
script = wallet.addr_to_script(ins_full[utxo]['address'])
scripts[index] = (script, ins_full[utxo]['value'])
binarize_tx(de_tx)
de_tx = wallet.sign_tx(de_tx, scripts, hashcode=hashcode)
#pushtx returns False on any error
print btc.deserialize(tx)
print(de_tx)
tx = binascii.hexlify(btc.serialize(de_tx))
push_succeed = jm_single().bc_interface.pushtx(tx)
if push_succeed:
return btc.txhash(tx)
@ -173,8 +163,9 @@ def make_wallets(n,
sdev_amt=0,
start_index=0,
fixed_seeds=None,
test_wallet=False,
passwords=None):
wallet_cls=SegwitLegacyWallet,
mixdepths=5,
populate_internal=False):
'''n: number of wallets to be created
wallet_structure: array of n arrays , each subarray
specifying the number of addresses to be populated with coins
@ -182,34 +173,33 @@ def make_wallets(n,
mean_amt: the number of coins (in btc units) in each address as above
sdev_amt: if randomness in amouts is desired, specify here.
Returns: a dict of dicts of form {0:{'seed':seed,'wallet':Wallet object},1:..,}
Default Wallet constructor is joinmarket.Wallet, else use TestWallet,
which takes a password parameter as in the list passwords.
'''
# FIXME: this is basically the same code as test/common.py
assert mixdepths > 0
if len(wallet_structures) != n:
raise Exception("Number of wallets doesn't match wallet structures")
if not fixed_seeds:
seeds = chunks(binascii.hexlify(os.urandom(15 * n)), 15 * 2)
seeds = chunks(binascii.hexlify(os.urandom(BIP32Wallet.ENTROPY_BYTES * n)),
BIP32Wallet.ENTROPY_BYTES * 2)
else:
seeds = fixed_seeds
wallets = {}
for i in range(n):
if test_wallet:
w = Wallet(seeds[i], passwords[i], max_mix_depth=5)
else:
w = Wallet(seeds[i], None, max_mix_depth=5)
assert len(seeds[i]) == BIP32Wallet.ENTROPY_BYTES * 2
w = open_test_wallet_maybe(seeds[i], seeds[i], mixdepths - 1,
test_wallet_cls=wallet_cls)
wallets[i + start_index] = {'seed': seeds[i],
'wallet': w}
for j in range(5):
for j in range(mixdepths):
for k in range(wallet_structures[i][j]):
deviation = sdev_amt * random.random()
amt = mean_amt - sdev_amt / 2.0 + deviation
if amt < 0: amt = 0.001
amt = float(Decimal(amt).quantize(Decimal(10)**-8))
jm_single().bc_interface.grab_coins(
wallets[i + start_index]['wallet'].get_external_addr(j),
amt)
#reset the index so the coins can be seen if running in same script
wallets[i + start_index]['wallet'].index[j][0] -= wallet_structures[i][j]
w.get_new_addr(j, populate_internal), amt)
return wallets

6
jmclient/test/taker_test_data.py

@ -45,11 +45,11 @@ t_chosen_orders = {u'J659UPUSLLjHJpaB': {u'cjfee': u'0.0002',
"""
t_utxos_by_mixdepth = {0: {u'534b635ed8891f16c4ec5b8236ae86164783903e8e8bb47fa9ef2ca31f3c2d7a:0': {'address': u'mrcNu71ztWjAQA6ww9kHiW3zBWSQidHXTQ',
'value': 200000000}},
1: {u'0780d6e5e381bff01a3519997bb4fcba002493103a198fde334fd264f9835d75:1': {'address': u'mvtY8DVgn3TtvjHbVsauYoSQjAhNqVyqmM',
1: {u'0780d6e5e381bff01a3519997bb4fcba002493103a198fde334fd264f9835d75:1': {'address': u'n31WD8pkfAjg2APV78GnbDTdZb1QonBi5D',
'value': 200000000},
u'7e574db96a4d43a99786b3ea653cda9e4388f377848f489332577e018380cff1:0': {'address': u'n3nELhmU2D7ebGYzJnGFWgVDK3cYErmTcQ',
u'7e574db96a4d43a99786b3ea653cda9e4388f377848f489332577e018380cff1:0': {'address': u'mmVEKH61BZbLbnVEmk9VmojreB4G4PmBPd',
'value': 200000000},
u'dd9711a2ef340750db21efb761f5f7d665d94b312332dc354e252c77e9c48349:0': {'address': u'mxeLuX8PP7qLkcM8uarHmdZyvP1b5e1Ynf',
u'dd9711a2ef340750db21efb761f5f7d665d94b312332dc354e252c77e9c48349:0': {'address': u'msxyyydNXTiBmt3SushXbH5Qh2ukBAThk3',
'value': 200000000}},
2: {},
3: {},

5
jmclient/test/test_podle.py

@ -7,11 +7,6 @@ import binascii
import json
import pytest
import copy
import subprocess
import signal
from commontest import make_wallets
import time
from pprint import pformat
from jmclient import (load_program_config, get_log, jm_single, generate_podle,
generate_podle_error_string, set_commitment_file,
get_commitment_file, PoDLE, get_podle_commitments,

61
jmclient/test/test_taker.py

@ -9,26 +9,53 @@ import shutil
import pytest
import json
from base64 import b64encode
from jmclient import (load_program_config, jm_single, set_commitment_file,
get_commitment_file, AbstractWallet, Taker, SegwitWallet,
get_p2sh_vbyte, get_p2pk_vbyte)
from jmclient import (
load_program_config, jm_single, set_commitment_file, get_commitment_file,
LegacyWallet, Taker, VolatileStorage, get_p2sh_vbyte, get_network)
from taker_test_data import (t_utxos_by_mixdepth, t_selected_utxos, t_orderbook,
t_maker_response, t_chosen_orders, t_dummy_ext)
class DummyWallet(AbstractWallet):
class DummyWallet(LegacyWallet):
def __init__(self):
super(DummyWallet, self).__init__()
self.max_mix_depth = 5
storage = VolatileStorage()
super(DummyWallet, self).initialize(storage, get_network(),
max_mixdepth=5)
super(DummyWallet, self).__init__(storage)
self._add_utxos()
self.inject_addr_get_failure = False
def _add_utxos(self):
for md, utxo in t_utxos_by_mixdepth.items():
for i, (txid, data) in enumerate(utxo.items()):
txid, index = txid.split(':')
self._utxos.add_utxo(binascii.unhexlify(txid), int(index),
(b'dummy', md, i), data['value'], md)
def get_utxos_by_mixdepth(self, verbose=True):
return t_utxos_by_mixdepth
def get_utxos_by_mixdepth_(self, verbose=True):
utxos = self.get_utxos_by_mixdepth(verbose)
utxos_conv = {}
for md, utxo_data in utxos.items():
md_utxo = utxos_conv.setdefault(md, {})
for i, (utxo_hex, data) in enumerate(utxo_data.items()):
utxo, index = utxo_hex.split(':')
data_conv = {
'script': self._ENGINE.address_to_script(data['address']),
'path': (b'dummy', md, i),
'value': data['value']
}
md_utxo[(binascii.unhexlify(utxo), int(index))] = data_conv
return utxos_conv
def select_utxos(self, mixdepth, amount):
if amount > self.get_balance_by_mixdepth()[mixdepth]:
raise Exception("Not enough funds")
return self.get_utxos_by_mixdepth()[mixdepth]
return t_utxos_by_mixdepth[mixdepth]
def get_internal_addr(self, mixing_depth):
if self.inject_addr_get_failure:
@ -135,22 +162,8 @@ def test_make_commitment(createcmtdata, failquery, external):
mixdepth = 0
amount = 110000000
taker = get_taker([(mixdepth, amount, 3, "mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw")])
taker.wallet.unspent = {'f34b635ed8891f16c4ec5b8236ae86164783903e8e8bb47fa9ef2ca31f3c2d7a:0':
{'address': u'n31WD8pkfAjg2APV78GnbDTdZb1QonBi5D',
'value': 10000000},
'f780d6e5e381bff01a3519997bb4fcba002493103a198fde334fd264f9835d75:1':
{'address': u'mmVEKH61BZbLbnVEmk9VmojreB4G4PmBPd',
'value': 20000000},
'fe574db96a4d43a99786b3ea653cda9e4388f377848f489332577e018380cff1:0':
{'address': u'msxyyydNXTiBmt3SushXbH5Qh2ukBAThk3',
'value': 500000000},
'fd9711a2ef340750db21efb761f5f7d665d94b312332dc354e252c77e9c48349:0':
{'address': u'musGZczug3BAbqobmYherywCwL9REgNaNm',
'value': 500000000}}
taker.cjamount = amount
taker.input_utxos = {'f34b635ed8891f16c4ec5b8236ae86164783903e8e8bb47fa9ef2ca31f3c2d7a:0':
{'address': u'n31WD8pkfAjg2APV78GnbDTdZb1QonBi5D',
'value': 10000000}}
taker.input_utxos = t_utxos_by_mixdepth[0]
if failquery:
jm_single().bc_interface.setQUSFail(True)
taker.make_commitment()
@ -461,7 +474,3 @@ def createcmtdata(request):
load_program_config()
jm_single().bc_interface = DummyBlockchainInterface()
jm_single().config.set("BLOCKCHAIN", "network", "testnet")

18
jmclient/test/test_tx_creation.py

@ -3,18 +3,15 @@ from __future__ import absolute_import
'''Test of unusual transaction types creation and push to
network to check validity.'''
import sys
import os
import time
import binascii
import random
from commontest import make_wallets, make_sign_and_push
import jmbitcoin as bitcoin
import pytest
from jmclient import (load_program_config, jm_single, sync_wallet,
get_p2pk_vbyte, get_log, Wallet, select_gradual,
select, select_greedy, select_greediest, estimate_tx_fee)
from jmclient import (
load_program_config, jm_single, sync_wallet, get_p2pk_vbyte, get_log,
select_gradual, select, select_greedy, select_greediest, estimate_tx_fee)
log = get_log()
#just a random selection of pubkeys for receiving multisigs;
@ -177,15 +174,6 @@ def test_create_sighash_txs(setup_tx_creation):
txid = make_sign_and_push(ins_full, wallet, amount, hashcode=sighash)
assert txid
#Create an invalid sighash single (too many inputs)
extra = wallet.select_utxos(4, 100000000) #just a few more inputs
ins_full.update(extra)
with pytest.raises(Exception) as e_info:
txid = make_sign_and_push(ins_full,
wallet,
amount,
hashcode=bitcoin.SIGHASH_SINGLE)
#trigger insufficient funds
with pytest.raises(Exception) as e_info:
fake_utxos = wallet.select_utxos(4, 1000000000)

522
jmclient/test/test_wallets.py

@ -6,27 +6,15 @@ import sys
import os
import time
import binascii
import random
import subprocess
import datetime
import unittest
from mnemonic import Mnemonic
from ConfigParser import SafeConfigParser, NoSectionError
from decimal import Decimal
from commontest import (interact, make_wallets,
make_sign_and_push)
from commontest import create_wallet_for_sync, make_sign_and_push
import json
import jmbitcoin as bitcoin
import pytest
from jmclient import (load_program_config, jm_single, sync_wallet,
AbstractWallet, get_p2pk_vbyte, get_log, Wallet, select,
select_gradual, select_greedy, select_greediest,
estimate_tx_fee, encryptData, get_network, WalletError,
BitcoinCoreInterface, SegwitWallet,
wallet_generate_recover_bip39, decryptData, encryptData)
from jmbase.support import chunks
from taker_test_data import t_obtained_tx, t_raw_signed_tx
from jmclient import (
load_program_config, jm_single, sync_wallet, get_log,
estimate_tx_fee, BitcoinCoreInterface)
from taker_test_data import t_raw_signed_tx
testdir = os.path.dirname(os.path.realpath(__file__))
log = get_log()
@ -51,8 +39,7 @@ def do_tx(wallet, amount):
def test_query_utxo_set(setup_wallets):
load_program_config()
jm_single().bc_interface.tick_forward_chain_interval = 1
wallet = create_wallet_for_sync("wallet4utxo.json", "4utxo",
[2, 3, 0, 0, 0],
wallet = create_wallet_for_sync([2, 3, 0, 0, 0],
["wallet4utxo.json", "4utxo", [2, 3]])
sync_wallet(wallet, fast=True)
txid = do_tx(wallet, 90000000)
@ -72,199 +59,9 @@ def test_query_utxo_set(setup_wallets):
assert res3 == [None]
def create_wallet_for_sync(wallet_file, password, wallet_structure, a):
#Prepare a testnet wallet file for this wallet
password_key = bitcoin.bin_dbl_sha256(password)
#We need a distinct seed for each run so as not to step over each other;
#make it through a deterministic hash
seedh = bitcoin.sha256("".join([str(x) for x in a]))[:32]
encrypted_seed = encryptData(password_key, seedh.decode('hex'))
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
walletfilejson = {'creator': 'joinmarket project',
'creation_time': timestamp,
'encrypted_seed': encrypted_seed.encode('hex'),
'network': get_network()}
walletfile = json.dumps(walletfilejson)
if not os.path.exists('wallets'):
os.makedirs('wallets')
with open(os.path.join('wallets', wallet_file), "wb") as f:
f.write(walletfile)
#The call to Wallet() in make_wallets should now find the file
#and read from it:
return make_wallets(1,
[wallet_structure],
fixed_seeds=[wallet_file],
test_wallet=True,
passwords=[password])[0]['wallet']
@pytest.mark.parametrize(
"num_txs, fake_count, wallet_structure, amount, wallet_file, password",
[
(3, 13, [11, 3, 4, 5, 6], 150000000, 'test_import_wallet.json',
'import-pwd'),
#Uncomment all these for thorough tests. Passing currently.
#Lots of used addresses
#(7, 1, [51, 3, 4, 5, 6], 150000000, 'test_import_wallet.json',
# 'import-pwd'),
#(3, 1, [3, 1, 4, 5, 6], 50000000, 'test_import_wallet.json',
# 'import-pwd'),
#No spams/fakes
#(2, 0, [5, 20, 1, 1, 1], 50000000, 'test_import_wallet.json',
# 'import-pwd'),
#Lots of transactions and fakes
#(25, 30, [30, 20, 1, 1, 1], 50000000, 'test_import_wallet.json',
# 'import-pwd'),
])
def test_wallet_sync_with_fast(setup_wallets, num_txs, fake_count,
wallet_structure, amount, wallet_file, password):
jm_single().bc_interface.tick_forward_chain_interval = 1
wallet = create_wallet_for_sync(wallet_file, password, wallet_structure,
[num_txs, fake_count, wallet_structure,
amount, wallet_file, password])
sync_count = 0
jm_single().bc_interface.wallet_synced = False
while not jm_single().bc_interface.wallet_synced:
sync_wallet(wallet)
sync_count += 1
#avoid infinite loop
assert sync_count < 10
log.debug("Tried " + str(sync_count) + " times")
assert jm_single().bc_interface.wallet_synced
assert not jm_single().bc_interface.fast_sync_called
#do some transactions with the wallet, then close, then resync
for i in range(num_txs):
do_tx(wallet, amount)
log.debug("After doing a tx, index is now: " + str(wallet.index))
#simulate a spammer requesting a bunch of transactions. This
#mimics what happens in CoinJoinOrder.__init__()
for j in range(fake_count):
#Note that as in a real script run,
#the initial call to sync_wallet will
#have set wallet_synced to True, so these will
#trigger actual imports.
cj_addr = wallet.get_internal_addr(0)
change_addr = wallet.get_internal_addr(0)
wallet.update_cache_index()
log.debug("After doing a spam, index is now: " + str(wallet.index))
assert wallet.index[0][1] == num_txs + fake_count * 2 * num_txs
#Attempt re-sync, simulating a script restart.
jm_single().bc_interface.wallet_synced = False
sync_count = 0
#Probably should be fixed in main code:
#wallet.index_cache is only assigned in Wallet.__init__(),
#meaning a second sync in the same script, after some transactions,
#will not know about the latest index_cache value (see is_index_ahead_of_cache),
#whereas a real re-sync will involve reading the cache from disk.
#Hence, simulation of the fact that the cache index will
#be read from the file on restart:
wallet.index_cache = wallet.index
while not jm_single().bc_interface.wallet_synced:
#Wallet.__init__() resets index to zero.
wallet.index = []
for i in range(5):
wallet.index.append([0, 0])
#Wallet.__init__() also updates the cache index
#from file, but we can reuse from the above pre-loop setting,
#since nothing else in sync will overwrite the cache.
#for regtest add_watchonly_addresses does not exit(), so can
#just repeat as many times as possible. This might
#be usable for non-test code (i.e. no need to restart the
#script over and over again)?
sync_count += 1
log.debug("TRYING SYNC NUMBER: " + str(sync_count))
sync_wallet(wallet, fast=True)
assert jm_single().bc_interface.fast_sync_called
#avoid infinite loop on failure.
assert sync_count < 10
#Wallet should recognize index_cache on fast sync, so should not need to
#run sync process more than once.
assert sync_count == 1
#validate the wallet index values after sync
for i, ws in enumerate(wallet_structure):
assert wallet.index[i][0] == ws #spends into external only
#Same number as above; note it includes the spammer's extras.
assert wallet.index[0][1] == num_txs + fake_count * 2 * num_txs
assert wallet.index[1][1] == num_txs #one change per transaction
for i in range(2, 5):
assert wallet.index[i][1] == 0 #unused
#Now try to do more transactions as sanity check.
do_tx(wallet, 50000000)
@pytest.mark.parametrize(
"wallet_structure, wallet_file, password, ic",
[
#As usual, more test cases are preferable but time
#of build test is too long, so only one activated.
#([11,3,4,5,6], 'test_import_wallet.json', 'import-pwd',
# [(12,3),(100,99),(7, 40), (200, 201), (10,0)]
# ),
([1, 3, 0, 2, 9], 'test_import_wallet.json', 'import-pwd',
[(1, 7), (100, 99), (0, 0), (200, 201), (21, 41)]),
])
def test_wallet_sync_from_scratch(setup_wallets, wallet_structure, wallet_file,
password, ic):
"""Simulate a scenario in which we use a new bitcoind, thusly:
generate a new wallet and simply pretend that it has an existing
index_cache. This will force import of all addresses up to
the index_cache values.
"""
wallet = create_wallet_for_sync(wallet_file, password, wallet_structure,
[wallet_structure, wallet_file, password,
ic])
sync_count = 0
jm_single().bc_interface.wallet_synced = False
wallet.index_cache = ic
while not jm_single().bc_interface.wallet_synced:
wallet.index = []
for i in range(5):
wallet.index.append([0, 0])
#will call with fast=False but index_cache exists; should use slow-sync
sync_wallet(wallet)
sync_count += 1
#avoid infinite loop
assert sync_count < 10
log.debug("Tried " + str(sync_count) + " times")
#after #586 we expect to ALWAYS succeed within 2 rounds
assert sync_count <= 2
#for each external branch, the new index may be higher than
#the original index_cache if there was a higher used address
expected_wallet_index = []
for i, val in enumerate(wallet_structure):
if val > wallet.index_cache[i][0]:
expected_wallet_index.append([val, wallet.index_cache[i][1]])
else:
expected_wallet_index.append([wallet.index_cache[i][0],
wallet.index_cache[i][1]])
assert wallet.index == expected_wallet_index
log.debug("This is wallet unspent: ")
log.debug(json.dumps(wallet.unspent, indent=4))
"""Purely blockchaininterface related error condition tests"""
def test_index_ahead_cache(setup_wallets):
"""Artificial test; look into finding a sync mode that triggers this
"""
class NonWallet(object):
pass
wallet = NonWallet()
wallet.index_cache = [[0, 0], [0, 2]]
from jmclient.blockchaininterface import is_index_ahead_of_cache
assert is_index_ahead_of_cache(wallet, 3, 1)
def test_wrong_network_bci(setup_wallets):
rpc = jm_single().bc_interface.jsonRpc
with pytest.raises(Exception) as e_info:
@ -293,31 +90,6 @@ def test_absurd_fee(setup_wallets):
load_program_config()
def test_abstract_wallet(setup_wallets):
class DoNothingWallet(AbstractWallet):
pass
for algo in ["default", "gradual", "greedy", "greediest", "none"]:
jm_single().config.set("POLICY", "merge_algorithm", algo)
if algo == "none":
with pytest.raises(Exception) as e_info:
dnw = DoNothingWallet()
#also test if the config is blank
jm_single().config = SafeConfigParser()
dnw = DoNothingWallet()
assert dnw.utxo_selector == select
else:
dnw = DoNothingWallet()
assert not dnw.get_key_from_addr("a")
assert not dnw.get_utxos_by_mixdepth()
assert not dnw.get_external_addr(1)
assert not dnw.get_internal_addr(0)
dnw.update_cache_index()
dnw.remove_old_utxos("a")
dnw.add_new_utxos("b", "c")
load_program_config()
def check_bip39_case(vectors, language="english"):
mnemo = Mnemonic(language)
for v in vectors:
@ -346,288 +118,6 @@ def test_bip39_vectors(setup_wallets):
vectors = filter(lambda x: len(x[1].split())==12, vectors)
check_bip39_case(vectors)
@pytest.mark.parametrize(
"pwd, me, valid", [
("asingleword", "1234aaaaaaaaaaaaaaaaa", True),
("a whole set of words", "a whole set of words", True),
("wordwithtrailingspaces ", "A few words with trailing ", True),
("monkey", "verylongpasswordindeedxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz", True),
("blablah", "invalidcontainsnonascii\xee", False)
])
def test_create_bip39_with_me(setup_wallets, pwd, me, valid):
def dummyDisplayWords(a, b):
pass
def getMnemonic():
return ("legal winner thank year wave sausage worth useful legal winner thank yellow",
me)
def getPassword():
return pwd
def getWalletFileName():
return "bip39-test-wallet-name-from-callback.json"
def promptMnemonicExtension():
return me
if os.path.exists(os.path.join("wallets", getWalletFileName())):
os.remove(os.path.join("wallets", getWalletFileName()))
success = wallet_generate_recover_bip39("generate",
"wallets",
"wallet.json",
callbacks=(dummyDisplayWords,
getMnemonic,
getPassword,
getWalletFileName,
promptMnemonicExtension))
if not valid:
#wgrb39 returns false for failed wallet creation case
assert not success
return
assert success
#open the wallet file, and decrypt the encrypted mnemonic extension and check
#it's the one we intended.
with open(os.path.join("wallets", getWalletFileName()), 'r') as f:
walletdata = json.load(f)
password_key = bitcoin.bin_dbl_sha256(getPassword())
cleartext = decryptData(password_key,
walletdata['encrypted_mnemonic_extension'].decode('hex'))
assert len(cleartext) >= 79
#throws if not len == 3
padding, me2, checksum = cleartext.split('\xff')
strippedme = me.strip()
assert strippedme == me2
assert checksum == bitcoin.dbl_sha256(strippedme)[:8]
#also test recovery from this combination of mnemonic + extension
if os.path.exists(os.path.join("wallets", getWalletFileName())):
os.remove(os.path.join("wallets", getWalletFileName()))
success = wallet_generate_recover_bip39("recover", "wallets", "wallet.json",
callbacks=(dummyDisplayWords,
getMnemonic,
getPassword,
getWalletFileName,
None))
assert success
with open(os.path.join("wallets", getWalletFileName()), 'r') as f:
walletdata = json.load(f)
password_key = bitcoin.bin_dbl_sha256(getPassword())
cleartext = decryptData(password_key,
walletdata['encrypted_entropy'].decode('hex')).encode('hex')
assert cleartext == "7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f"
def create_default_testnet_wallet():
walletdir = "wallets"
testwalletname = "testwallet.json"
pathtowallet = os.path.join(walletdir, testwalletname)
if os.path.exists(pathtowallet):
os.remove(pathtowallet)
seed = "deadbeef"
return (walletdir, pathtowallet, testwalletname,
SegwitWallet(seed,
None,
5,
6,
extend_mixdepth=False,
storepassword=False))
@pytest.mark.parametrize(
"includecache, wrongnet, storepwd, extendmd, pwdnumtries", [
(False, False, False, False, 100),
(True, False, False, True, 1),
(False, True, False, False, 1),
(False, False, True, False, 1)
])
def test_wallet_create(setup_wallets, includecache, wrongnet, storepwd,
extendmd, pwdnumtries):
walletdir, pathtowallet, testwalletname, wallet = create_default_testnet_wallet(
)
assert wallet.get_key(
4, 1,
17) == "96095d7542e4e832c476b9df7e49ca9e5be61ad3bb8c8a3bdd8e141e2f4caf9101"
assert wallet.get_addr(2, 0, 5) == "2NBUxbEQrGPKrYCV6d4o7Y4AtJ34Uy6gZZg"
jm_single().bc_interface.wallet_synced = True
assert wallet.get_new_addr(1, 0) == "2Mz817RE6zqywgkG2h9cATUoiXwnFSxufk2"
assert wallet.get_external_addr(3) == "2N3gn65WXEzbLnjk5FLDZPc1pL6ebvZAmoA"
addr3internal = wallet.get_internal_addr(3)
assert addr3internal == "2N5NMTYogAyrGhDtWBnVQUp1kgwwFzcf7UM"
assert wallet.get_key_from_addr(
addr3internal) == "089a7173314d29f99e02a37e36da517ce41537a317c83284db1f33dda0af0cc201"
dummyaddr = "mvw1NazKDRbeNufFANqpYNAANafsMC2zVU"
assert not wallet.get_key_from_addr(dummyaddr)
#Make a new Wallet(), and prepare a testnet wallet file for this wallet
password = "dummypassword"
password_key = bitcoin.bin_dbl_sha256(password)
seed = bitcoin.sha256("\xaa" * 64)[:32]
encrypted_seed = encryptData(password_key, seed.decode('hex'))
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
net = get_network() if not wrongnet else 'mainnnet'
walletfilejson = {'creator': 'joinmarket project',
'creation_time': timestamp,
'encrypted_seed': encrypted_seed.encode('hex'),
'network': net}
if includecache:
mmd = wallet.max_mix_depth if not extendmd else wallet.max_mix_depth + 5
print("using mmd: " + str(mmd))
walletfilejson.update({'index_cache': [[0, 0]] * mmd})
walletfile = json.dumps(walletfilejson)
if not os.path.exists(walletdir):
os.makedirs(walletdir)
with open(pathtowallet, "wb") as f:
f.write(walletfile)
if wrongnet:
with pytest.raises(ValueError) as e_info:
SegwitWallet(testwalletname,
password,
5,
6,
extend_mixdepth=extendmd,
storepassword=storepwd)
return
from string import ascii_letters
for i in range(
pwdnumtries): #multiple tries to ensure pkcs7 error is triggered
with pytest.raises(WalletError) as e_info:
wrongpwd = "".join([random.choice(ascii_letters) for _ in range(20)
])
SegwitWallet(testwalletname,
wrongpwd,
5,
6,
extend_mixdepth=extendmd,
storepassword=storepwd)
with pytest.raises(WalletError) as e_info:
SegwitWallet(testwalletname,
None,
5,
6,
extend_mixdepth=extendmd,
storepassword=storepwd)
newwallet = SegwitWallet(testwalletname,
password,
5,
6,
extend_mixdepth=extendmd,
storepassword=storepwd)
assert newwallet.seed == wallet.wallet_data_to_seed(seed)
#now we have a functional wallet + file, update the cache; first try
#with failed paths
oldpath = newwallet.path
newwallet.path = None
newwallet.update_cache_index()
newwallet.path = "fake-path-definitely-doesnt-exist"
newwallet.update_cache_index()
#with real path
newwallet.path = oldpath
newwallet.index = [[1, 1]] * 5
newwallet.update_cache_index()
#ensure we cannot find a mainnet wallet from seed
seed = "goodbye"
jm_single().config.set("BLOCKCHAIN", "network", "mainnet")
with pytest.raises(IOError) as e_info:
Wallet(seed, 5, 6, False, False)
load_program_config()
def test_imported_privkey(setup_wallets):
for n in ["mainnet", "testnet"]:
privkey = "7d998b45c219a1e38e99e7cbd312ef67f77a455a9b50c730c27f02c6f730dfb401"
jm_single().config.set("BLOCKCHAIN", "network", n)
password = "dummypassword"
password_key = bitcoin.bin_dbl_sha256(password)
wifprivkey = bitcoin.wif_compressed_privkey(privkey, get_p2pk_vbyte())
#mainnet is "L1RrrnXkcKut5DEMwtDthjwRcTTwED36thyL1DebVrKuwvohjMNi"
#to verify use from_wif_privkey and privkey_to_address
if n == "mainnet":
iaddr = "1LDsjB43N2NAQ1Vbc2xyHca4iBBciN8iwC"
else:
iaddr = "mzjq2E92B3oRB7yDKbwM7XnPaAnKfRERw2"
privkey_bin = bitcoin.from_wif_privkey(
wifprivkey,
vbyte=get_p2pk_vbyte()).decode('hex')[:-1]
encrypted_privkey = encryptData(password_key, privkey_bin)
encrypted_privkey_bad = encryptData(password_key, privkey_bin[:6])
walletdir = "wallets"
testwalletname = "test" + n
pathtowallet = os.path.join(walletdir, testwalletname)
seed = bitcoin.sha256("\xaa" * 64)[:32]
encrypted_seed = encryptData(password_key, seed.decode('hex'))
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
for ep in [encrypted_privkey, encrypted_privkey_bad]:
walletfilejson = {'creator': 'joinmarket project',
'creation_time': timestamp,
'encrypted_seed': encrypted_seed.encode('hex'),
'network': n,
'index_cache': [[0, 0]] * 5,
'imported_keys': [
{'encrypted_privkey': ep.encode('hex'),
'mixdepth': 0}
]}
walletfile = json.dumps(walletfilejson)
if not os.path.exists(walletdir):
os.makedirs(walletdir)
with open(pathtowallet, "wb") as f:
f.write(walletfile)
if ep == encrypted_privkey_bad:
with pytest.raises(Exception) as e_info:
Wallet(testwalletname, password, 5, 6, False, False)
continue
newwallet = Wallet(testwalletname, password, 5, 6, False, False)
assert newwallet.seed == seed
#test accessing the key from the addr
assert newwallet.get_key_from_addr(
iaddr) == bitcoin.from_wif_privkey(wifprivkey,
vbyte=get_p2pk_vbyte())
if n == "testnet":
jm_single().bc_interface.sync_wallet(newwallet)
load_program_config()
def test_add_remove_utxos(setup_wallets):
#Make a fake wallet and inject and then remove fake utxos
walletdir, pathtowallet, testwalletname, wallet = create_default_testnet_wallet()
assert wallet.get_addr(2, 0, 5) == "2NBUxbEQrGPKrYCV6d4o7Y4AtJ34Uy6gZZg"
wallet.addr_cache["2NBUxbEQrGPKrYCV6d4o7Y4AtJ34Uy6gZZg"] = (2, 0, 5)
#'a914c80b3c03b96c0da5ef983942d9e541cb788aed8787'
#these calls automatically update the addr_cache:
assert wallet.get_new_addr(1, 0) == "2Mz817RE6zqywgkG2h9cATUoiXwnFSxufk2"
#a9144b6b3836a1708fd38d4728e41b86e69d5bb15d5187
assert wallet.get_external_addr(3) == "2N3gn65WXEzbLnjk5FLDZPc1pL6ebvZAmoA"
#a914728673d95ceafa892ed82f9cc23c8bf1700b6c6187
#using the above pubkey scripts:
faketxforwallet = {'outs': [
{'script': 'a914c80b3c03b96c0da5ef983942d9e541cb788aed8787',
'value': 110000000},
{'script': 'a9144b6b3836a1708fd38d4728e41b86e69d5bb15d5187',
'value': 89910900},
{'script': 'a914728673d95ceafa892ed82f9cc23c8bf1700b6c6187',
'value': 90021000},
{'script':
'76a9145ece2dac945c8ff5b2b6635360ca0478ade305d488ac', #not ours
'value': 110000000}
],
'version': 1}
wallet.add_new_utxos(faketxforwallet, "aa" * 32)
faketxforspending = {'ins': [
{'outpoint': {'hash': 'aa' * 32,
'index': 0}}, {'outpoint': {'hash': 'aa' * 32,
'index': 1}}, {'outpoint':
{'hash':
'aa' * 32,
'index': 2}},
{'outpoint':
{'hash':
'3f3ea820d706e08ad8dc1d2c392c98facb1b067ae4c671043ae9461057bd2a3c',
'index': 1},
'script': '',
'sequence': 4294967295}
]}
wallet.select_utxos(1, 100000)
with pytest.raises(Exception) as e_info:
wallet.select_utxos(0, 100000)
#ensure get_utxos_by_mixdepth can handle utxos outside of maxmixdepth
wallet.max_mix_depth = 2
mul = wallet.get_utxos_by_mixdepth()
assert mul[3] != {}
wallet.remove_old_utxos(faketxforspending)
@pytest.fixture(scope="module")
def setup_wallets():

36
test/common.py

@ -4,7 +4,6 @@ from __future__ import absolute_import
import sys
import os
import time
import binascii
import random
from decimal import Decimal
@ -12,7 +11,8 @@ from decimal import Decimal
data_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, os.path.join(data_dir))
from jmclient import get_wallet_cls, get_log, estimate_tx_fee, jm_single
from jmclient import open_test_wallet_maybe, BIP32Wallet, SegwitLegacyWallet, \
get_log, estimate_tx_fee, jm_single
import jmbitcoin as btc
from jmbase import chunks
@ -63,7 +63,8 @@ def make_wallets(n,
fixed_seeds=None,
test_wallet=False,
passwords=None,
walletclass=None):
walletclass=SegwitLegacyWallet,
mixdepths=5):
'''n: number of wallets to be created
wallet_structure: array of n arrays , each subarray
specifying the number of addresses to be populated with coins
@ -74,35 +75,36 @@ def make_wallets(n,
Default Wallet constructor is joinmarket.Wallet, else use TestWallet,
which takes a password parameter as in the list passwords.
'''
# FIXME: this is basically the same code as jmclient/test/commontest.py
if len(wallet_structures) != n:
raise Exception("Number of wallets doesn't match wallet structures")
if not fixed_seeds:
seeds = chunks(binascii.hexlify(os.urandom(15 * n)), 15 * 2)
seeds = chunks(binascii.hexlify(os.urandom(BIP32Wallet.ENTROPY_BYTES * n)),
BIP32Wallet.ENTROPY_BYTES * 2)
else:
seeds = fixed_seeds
wallets = {}
for i in range(n):
if test_wallet:
w = TestWallet(seeds[i], max_mix_depth=5, pwd=passwords[i])
assert len(seeds[i]) == BIP32Wallet.ENTROPY_BYTES * 2
# FIXME: pwd is ignored (but do we really need this anyway?)
if test_wallet and passwords and i < len(passwords):
pwd = passwords[i]
else:
if walletclass:
wc = walletclass
else:
wc = get_wallet_cls()
w = wc(seeds[i], pwd=None, max_mix_depth=5)
pwd = None
w = open_test_wallet_maybe(seeds[i], seeds[i], mixdepths,
test_wallet_cls=walletclass)
wallets[i + start_index] = {'seed': seeds[i],
'wallet': w}
for j in range(5):
for j in range(mixdepths):
for k in range(wallet_structures[i][j]):
deviation = sdev_amt * random.random()
amt = mean_amt - sdev_amt / 2.0 + deviation
if amt < 0: amt = 0.001
amt = float(Decimal(amt).quantize(Decimal(10)**-8))
jm_single().bc_interface.grab_coins(
wallets[i + start_index]['wallet'].get_external_addr(j),
amt)
#reset the index so the coins can be seen if running in same script
wallets[i + start_index]['wallet'].index[j][0] -= wallet_structures[i][j]
jm_single().bc_interface.grab_coins(w.get_external_addr(j), amt)
return wallets

203
test/test_segwit.py

@ -2,17 +2,13 @@
from __future__ import absolute_import
'''Test creation of segwit transactions.'''
import sys
import os
import time
import binascii
import json
from common import make_wallets
from pprint import pformat
import jmbitcoin as btc
import pytest
from jmclient import (load_program_config, jm_single, get_p2pk_vbyte,
get_log, get_p2sh_vbyte, Wallet)
from jmclient import load_program_config, jm_single, get_log, LegacyWallet
log = get_log()
@ -30,80 +26,11 @@ def test_segwit_valid_txs(setup_segwit):
#and compare the json values
def get_utxo_from_txid(txid, addr):
"""Given a txid and an address for one of the outputs,
return "txid:n" where n is the index of the output
"""
rawtx = jm_single().bc_interface.rpc("getrawtransaction", [txid, 1])
ins = []
for u in rawtx["vout"]:
if u["scriptPubKey"]["addresses"][0] == addr:
ins.append(txid + ":" + str(u["n"]))
assert len(ins) == 1
return ins[0]
def make_sign_and_push(ins_sw,
wallet,
amount,
other_ins=None,
output_addr=None,
change_addr=None,
hashcode=btc.SIGHASH_ALL):
"""A more complicated version of the function in test_tx_creation;
will merge to this one once finished.
ins_sw have this structure:
{"txid:n":(amount, priv, index), "txid2:n2":(amount2, priv2, index2), ..}
if other_ins is not None, it has the same format,
these inputs are assumed to be plain p2pkh.
All of these inputs in these two sets will be consumed.
They are ordered according to the "index" fields (to allow testing
of specific ordering)
It's assumed that they contain sufficient coins to satisy the
required output specified in "amount", plus some extra for fees and a
change output.
The output_addr and change_addr, if None, are taken from the wallet
and are ordinary p2pkh outputs.
All amounts are in satoshis and only converted to btc for grab_coins
"""
#total value of all inputs
print ins_sw
print other_ins
total = sum([x[0] for x in ins_sw.values()])
total += sum([x[0] for x in other_ins.values()])
#construct the other inputs
ins1 = other_ins
ins1.update(ins_sw)
ins1 = sorted(ins1.keys(), key=lambda k: ins1[k][2])
#random output address and change addr
output_addr = wallet.get_new_addr(1, 1) if not output_addr else output_addr
change_addr = wallet.get_new_addr(1, 0) if not change_addr else change_addr
outs = [{'value': amount,
'address': output_addr}, {'value': total - amount - 10000,
'address': change_addr}]
tx = btc.mktx(ins1, outs)
de_tx = btc.deserialize(tx)
for index, ins in enumerate(de_tx['ins']):
utxo = ins['outpoint']['hash'] + ':' + str(ins['outpoint']['index'])
temp_ins = ins_sw if utxo in ins_sw.keys() else other_ins
amt, priv, n = temp_ins[utxo]
temp_amt = amt if utxo in ins_sw.keys() else None
#for better test code coverage
print "signing tx index: " + str(index) + ", priv: " + priv
if index % 2:
priv = binascii.unhexlify(priv)
ms = "other" if not temp_amt else "amount: " + str(temp_amt)
print ms
tx = btc.sign(tx, index, priv, hashcode=hashcode, amount=temp_amt)
print pformat(btc.deserialize(tx))
txid = jm_single().bc_interface.pushtx(tx)
time.sleep(3)
received = jm_single().bc_interface.get_received_by_addr(
[output_addr], None)['data'][0]['balance']
#check coins were transferred as expected
assert received == amount
#pushtx returns False on any error
return txid
def binarize_tx(tx):
for o in tx['outs']:
o['script'] = binascii.unhexlify(o['script'])
for i in tx['ins']:
i['outpoint']['hash'] = binascii.unhexlify(i['outpoint']['hash'])
@pytest.mark.parametrize(
@ -111,7 +38,7 @@ def make_sign_and_push(ins_sw,
([[1, 0, 0, 0, 0]], 1, 1000000, 1, [0, 1, 2], []),
([[4, 0, 0, 0, 1]], 3, 100000000, 1, [0, 2], [1, 3]),
([[4, 0, 0, 0, 1]], 3, 100000000, 1, [0, 5], [1, 2, 3, 4]),
([[2, 0, 0, 0, 2]], 2, 200000007, 0.3, [0, 1, 4, 5], [2, 3, 6]),
([[4, 0, 0, 0, 0]], 2, 200000007, 0.3, [0, 1, 4, 5], [2, 3, 6]),
])
def test_spend_p2sh_p2wpkh_multi(setup_segwit, wallet_structure, in_amt, amount,
segwit_amt, segwit_ins, o_ins):
@ -127,38 +54,94 @@ def test_spend_p2sh_p2wpkh_multi(setup_segwit, wallet_structure, in_amt, amount,
segwit_ins is a list of input indices (where to place the funding segwit utxos)
other_ins is a list of input indices (where to place the funding non-sw utxos)
"""
wallet = make_wallets(1, wallet_structure, in_amt, walletclass=Wallet)[0]['wallet']
jm_single().bc_interface.sync_wallet(wallet, fast=True)
other_ins = {}
ctr = 0
for k, v in wallet.unspent.iteritems():
#only extract as many non-segwit utxos as we need;
#doesn't matter which they are
if ctr == len(o_ins):
break
other_ins[k] = (v["value"], wallet.get_key_from_addr(v["address"]),
o_ins[ctr])
ctr += 1
ins_sw = {}
for i in range(len(segwit_ins)):
#build segwit ins from "deterministic-random" keys;
#intended to be the same for each run with the same parameters
seed = json.dumps([i, wallet_structure, in_amt, amount, segwit_ins,
other_ins])
priv = btc.sha256(seed) + "01"
pub = btc.privtopub(priv)
#magicbyte is testnet p2sh
addr1 = btc.pubkey_to_p2sh_p2wpkh_address(pub, magicbyte=196)
print "got address for p2shp2wpkh: " + addr1
txid = jm_single().bc_interface.grab_coins(addr1, segwit_amt)
#TODO - int cast, fix?
ins_sw[get_utxo_from_txid(txid, addr1)] = (int(segwit_amt * 100000000),
priv, segwit_ins[i])
#make_sign_and_push will sanity check the received amount is correct
txid = make_sign_and_push(ins_sw, wallet, amount, other_ins)
#will always be False if it didn't push.
MIXDEPTH = 0
# set up wallets and inputs
nsw_wallet = make_wallets(1, wallet_structure, in_amt,
walletclass=LegacyWallet)[0]['wallet']
jm_single().bc_interface.sync_wallet(nsw_wallet, fast=True)
sw_wallet = make_wallets(1, [[len(segwit_ins), 0, 0, 0, 0]], segwit_amt)[0]['wallet']
jm_single().bc_interface.sync_wallet(sw_wallet, fast=True)
nsw_utxos = nsw_wallet.get_utxos_by_mixdepth_()[MIXDEPTH]
sw_utxos = sw_wallet.get_utxos_by_mixdepth_()[MIXDEPTH]
assert len(o_ins) <= len(nsw_utxos), "sync failed"
assert len(segwit_ins) <= len(sw_utxos), "sync failed"
total_amt_in_sat = 0
nsw_ins = {}
for nsw_in_index in o_ins:
total_amt_in_sat += in_amt * 10**8
nsw_ins[nsw_in_index] = nsw_utxos.popitem()
sw_ins = {}
for sw_in_index in segwit_ins:
total_amt_in_sat += int(segwit_amt * 10**8)
sw_ins[sw_in_index] = sw_utxos.popitem()
all_ins = {}
all_ins.update(nsw_ins)
all_ins.update(sw_ins)
# sanity checks
assert len(all_ins) == len(nsw_ins) + len(sw_ins), \
"test broken, duplicate index"
for k in all_ins:
assert 0 <= k < len(all_ins), "test broken, missing input index"
# FIXME: encoding mess, mktx should accept binary input formats
tx_ins = []
for i, (txid, data) in sorted(all_ins.items(), key=lambda x: x[0]):
tx_ins.append('{}:{}'.format(binascii.hexlify(txid[0]), txid[1]))
# create outputs
FEE = 50000
assert FEE < total_amt_in_sat - amount, "test broken, not enough funds"
cj_script = nsw_wallet.get_new_script(MIXDEPTH + 1, True)
change_script = nsw_wallet.get_new_script(MIXDEPTH, True)
change_amt = total_amt_in_sat - amount - FEE
tx_outs = [
{'script': binascii.hexlify(cj_script),
'value': amount},
{'script': binascii.hexlify(change_script),
'value': change_amt}]
tx = btc.deserialize(btc.mktx(tx_ins, tx_outs))
binarize_tx(tx)
# import new addresses to bitcoind
jm_single().bc_interface.import_addresses(
[nsw_wallet.script_to_address(x)
for x in [cj_script, change_script]],
jm_single().bc_interface.get_wallet_name(nsw_wallet))
# sign tx
scripts = {}
for nsw_in_index in o_ins:
inp = nsw_ins[nsw_in_index][1]
scripts[nsw_in_index] = (inp['script'], inp['value'])
nsw_wallet.sign_tx(tx, scripts)
scripts = {}
for sw_in_index in segwit_ins:
inp = sw_ins[sw_in_index][1]
scripts[sw_in_index] = (inp['script'], inp['value'])
sw_wallet.sign_tx(tx, scripts)
print(tx)
# push and verify
txid = jm_single().bc_interface.pushtx(binascii.hexlify(btc.serialize(tx)))
assert txid
balances = jm_single().bc_interface.get_received_by_addr(
[nsw_wallet.script_to_address(cj_script),
nsw_wallet.script_to_address(change_script)], None)['data']
assert balances[0]['balance'] == amount
assert balances[1]['balance'] == change_amt
@pytest.fixture(scope="module")
def setup_segwit():

Loading…
Cancel
Save