@ -1,5 +1,3 @@
from commontest import DummyBlockchainInterface
import jmbitcoin as bitcoin
import binascii
import os
import copy
@ -10,6 +8,15 @@ import json
import struct
from base64 import b64encode
from typing import Optional
from unittest import IsolatedAsyncioTestCase
from unittest_parametrize import parametrize , ParametrizedTestCase
import jmclient # install asyncioreactor
from twisted . internet import reactor
from commontest import DummyBlockchainInterface
import jmbitcoin as bitcoin
from jmbase import utxostr_to_utxo , hextobin
from jmclient import load_test_config , jm_single , set_commitment_file , \
get_commitment_file , LegacyWallet , Taker , VolatileStorage , \
@ -29,10 +36,14 @@ def convert_utxos(utxodict):
return return_dict
class DummyWallet ( LegacyWallet ) :
def __init__ ( self ) :
storage = VolatileStorage ( )
super ( ) . initialize ( storage , get_network ( ) , max_mixdepth = 5 )
super ( ) . __init__ ( storage )
self . storage = VolatileStorage ( )
super ( ) . initialize ( self . storage , get_network ( ) , max_mixdepth = 5 )
super ( ) . __init__ ( self . storage )
async def async_init ( self , storage , * * kwargs ) :
await super ( ) . async_init ( storage )
self . _add_utxos ( )
self . ex_utxos = { }
self . inject_addr_get_failure = False
@ -72,7 +83,7 @@ class DummyWallet(LegacyWallet):
def remove_extra_utxo ( self , txid , index , md ) :
del self . ex_utxos [ ( txid , index ) ]
def get_utxos_by_mixdepth ( self , include_disabled : bool = False ,
async def get_utxos_by_mixdepth ( self , include_disabled : bool = False ,
verbose : bool = True ,
includeheight : bool = False ,
limit_mixdepth : Optional [ int ] = None ) :
@ -90,8 +101,8 @@ class DummyWallet(LegacyWallet):
retval [ md ] . update ( u )
return retval
def select_utxos ( self , mixdepth , amount , utxo_filter = None , select_fn = None ,
maxheight = None , includeaddr = False ,
async def select_utxos ( self , mixdepth , amount , utxo_filter = None ,
select_fn = None , maxheight = None , includeaddr = False ,
require_auth_address = False ) :
if amount > self . get_balance_by_mixdepth ( ) [ mixdepth ] :
raise NotEnoughFundsException ( amount , self . get_balance_by_mixdepth ( ) [ mixdepth ] )
@ -104,16 +115,16 @@ class DummyWallet(LegacyWallet):
retval [ u ] [ " script " ] = self . addr_to_script ( retval [ u ] [ " address " ] )
return retval
def get_internal_addr ( self , mixing_depth , bci = None ) :
async def get_internal_addr ( self , mixing_depth , bci = None ) :
if self . inject_addr_get_failure :
raise Exception ( " address get failure " )
return " mxeLuX8PP7qLkcM8uarHmdZyvP1b5e1Ynf "
def sign_tx ( self , tx , addrs ) :
async def sign_tx ( self , tx , addrs ) :
print ( " Pretending to sign on addresses: " + str ( addrs ) )
return True , None
def sign ( self , tx , i , priv , amount ) :
async def sign ( self , tx , i , priv , amount ) :
""" Sign a transaction; the amount field
triggers the segwit style signing .
"""
@ -153,17 +164,17 @@ class DummyWallet(LegacyWallet):
def get_path_repr ( self , path ) :
return ' / ' . join ( map ( str , path ) )
def is_standard_wallet_script ( self , path ) :
async def is_standard_wallet_script ( self , path ) :
if path [ 0 ] == " nonstandard_path " :
return False
return True
def script_to_addr ( self , script ,
async def script_to_addr ( self , script ,
validate_cache : bool = False ) :
if self . script_to_path ( script ) [ 0 ] == " nonstandard_path " :
return " dummyaddr "
return super ( ) . script_to_addr ( script ,
validate_cache = validate_cache )
return await super ( ) . script_to_addr (
script , validate_cache = validate_cache )
def dummy_order_chooser ( ) :
@ -176,7 +187,7 @@ def dummy_filter_orderbook(orders_fees, cjamount):
print ( " calling dummy filter orderbook " )
return True
def get_taker ( schedule = None , schedule_len = 0 , on_finished = None ,
async def get_taker ( schedule = None , schedule_len = 0 , on_finished = None ,
filter_orders = None , custom_change = None ) :
if not schedule :
#note, for taker.initalize() this will result in junk
@ -184,26 +195,43 @@ def get_taker(schedule=None, schedule_len=0, on_finished=None,
print ( " Using schedule: " + str ( schedule ) )
on_finished_callback = on_finished if on_finished else taker_finished
filter_orders_callback = filter_orders if filter_orders else dummy_filter_orderbook
taker = Taker ( WalletService ( DummyWallet ( ) ) , schedule , default_max_cj_fee ,
wallet = DummyWallet ( )
await wallet . async_init ( wallet . storage )
taker = Taker ( WalletService ( wallet ) , schedule , default_max_cj_fee ,
callbacks = [ filter_orders_callback , None , on_finished_callback ] ,
custom_change_address = custom_change )
taker . wallet_service . current_blockheight = 10 * * 6
return taker
def test_filter_rejection ( setup_taker ) :
class AsyncioTestCase ( IsolatedAsyncioTestCase , ParametrizedTestCase ) :
def setUp ( self ) :
if not os . path . exists ( " cmtdata " ) :
os . makedirs ( " cmtdata " )
load_test_config ( )
jm_single ( ) . bc_interface = DummyBlockchainInterface ( )
jm_single ( ) . config . set ( " BLOCKCHAIN " , " network " , " testnet " )
def tearDown ( self ) :
from twisted . internet import reactor
for dc in reactor . getDelayedCalls ( ) :
dc . cancel ( )
shutil . rmtree ( " cmtdata " )
async def test_filter_rejection ( self ) :
def filter_orders_reject ( orders_feesl , cjamount ) :
print ( " calling filter orders rejection " )
return False
taker = get_taker ( filter_orders = filter_orders_reject )
taker = await get_taker ( filter_orders = filter_orders_reject )
taker . schedule = [ [ 0 , 20000000 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , 0 , NO_ROUNDING ] ]
res = taker . initialize ( t_orderbook , [ ] )
res = await taker . initialize ( t_orderbook , [ ] )
assert not res [ 0 ]
taker = get_taker ( filter_orders = filter_orders_reject )
taker = await get_taker ( filter_orders = filter_orders_reject )
taker . schedule = [ [ 0 , 0 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , 0 , NO_ROUNDING ] ]
res = taker . initialize ( t_orderbook , [ ] )
res = await taker . initialize ( t_orderbook , [ ] )
assert not res [ 0 ]
@pytest . mark . parametrize (
@ parametrize(
" mixdepth, cjamt, failquery, external, expected_success, amtpercent, age, mixdepth_extras " ,
[
( 0 , 110000000 , False , False , True , 0 , 0 , { } ) ,
@ -228,8 +256,9 @@ def test_filter_rejection(setup_taker):
# the timelocked UTXO is big enough:
( 0 , 1110000000 , False , False , False , 20 , 5 , { " custom-script " : { 0 : [ 1000000000 ] } } ) ,
] )
def test_make_commitment ( setup_taker , mixdepth , cjamt , failquery , external ,
expected_success , amtpercent , age , mixdepth_extras ) :
async def test_make_commitment ( self , mixdepth , cjamt , failquery , external ,
expected_success , amtpercent , age ,
mixdepth_extras ) :
def clean_up ( ) :
jm_single ( ) . config . set ( " POLICY " , " taker_utxo_age " , old_taker_utxo_age )
jm_single ( ) . config . set ( " POLICY " , " taker_utxo_amtpercent " , old_taker_utxo_amtpercent )
@ -256,14 +285,16 @@ def test_make_commitment(setup_taker, mixdepth, cjamt, failquery, external,
jm_single ( ) . config . set ( " POLICY " , " taker_utxo_age " , newtua )
jm_single ( ) . config . set ( " POLICY " , " taker_utxo_amtpercent " , newtuap )
taker = get_taker ( [ ( mixdepth , cjamt , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , NO_ROUNDING ) ] )
taker = await get_taker (
[ ( mixdepth , cjamt , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , NO_ROUNDING ) ] )
# modify or add any extra utxos for this run:
for k , v in mixdepth_extras . items ( ) :
if k == " confchange " :
for k2 , v2 in v . items ( ) :
# set the utxos in mixdepth k2 to have confs v2:
cdict = taker . wallet_service . get_utxos_by_mixdepth ( ) [ k2 ]
cdict = (
await taker . wallet_service . get_utxos_by_mixdepth ( ) ) [ k2 ]
jm_single ( ) . bc_interface . set_confs ( { utxo : v2 for utxo in cdict . keys ( ) } )
elif k == " custom-script " :
# note: this is inspired by fidelity bonds, and currently
@ -287,11 +318,12 @@ def test_make_commitment(setup_taker, mixdepth, cjamt, failquery, external,
os . urandom ( 32 ) , 0 , value , k )
taker . cjamount = cjamt
taker . input_utxos = taker . wallet_service . get_utxos_by_mixdepth ( ) [ mixdepth ]
taker . input_utxos = (
await taker . wallet_service . get_utxos_by_mixdepth ( ) ) [ mixdepth ]
taker . mixdepth = mixdepth
if failquery :
jm_single ( ) . bc_interface . setQUSFail ( True )
comm , revelation , msg = taker . make_commitment ( )
comm , revelation , msg = await taker . make_commitment ( )
if expected_success and failquery :
# for manual tests, show the error message:
print ( " Failure case due to QUS fail: " )
@ -308,22 +340,24 @@ def test_make_commitment(setup_taker, mixdepth, cjamt, failquery, external,
assert not comm , " podle was generated but should not have been. "
clean_up ( )
def test_not_found_maker_utxos ( setup_taker ) :
taker = get_taker ( [ ( 0 , 20000000 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , 0 , NO_ROUNDING ) ] )
async def test_not_found_maker_utxos ( self ) :
taker = await get_taker (
[ ( 0 , 20000000 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , 0 , NO_ROUNDING ) ] )
orderbook = copy . deepcopy ( t_orderbook )
res = taker . initialize ( orderbook , [ ] )
res = await taker . initialize ( orderbook , [ ] )
taker . orderbook = copy . deepcopy ( t_chosen_orders ) #total_cjfee unaffected, all same
maker_response = copy . deepcopy ( t_maker_response )
jm_single ( ) . bc_interface . setQUSFail ( True )
res = taker . receive_utxos ( maker_response )
res = await taker . receive_utxos ( maker_response )
assert not res [ 0 ]
assert res [ 1 ] == " Not enough counterparties responded to fill, giving up "
jm_single ( ) . bc_interface . setQUSFail ( False )
def test_auth_pub_not_found ( setup_taker ) :
taker = get_taker ( [ ( 0 , 20000000 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , 0 , NO_ROUNDING ) ] )
async def test_auth_pub_not_found ( self ) :
taker = await get_taker (
[ ( 0 , 20000000 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , 0 , NO_ROUNDING ) ] )
orderbook = copy . deepcopy ( t_orderbook )
res = taker . initialize ( orderbook , [ ] )
res = await taker . initialize ( orderbook , [ ] )
taker . orderbook = copy . deepcopy ( t_chosen_orders ) #total_cjfee unaffected, all same
maker_response = copy . deepcopy ( t_maker_response )
utxos = [ utxostr_to_utxo ( x ) [ 1 ] for x in [
@ -336,12 +370,12 @@ def test_auth_pub_not_found(setup_taker):
' utxo ' : utxos [ i ] ,
' confirms ' : 20 } for i in range ( 3 ) ]
jm_single ( ) . bc_interface . insert_fake_query_results ( fake_query_results )
res = taker . receive_utxos ( maker_response )
res = await taker . receive_utxos ( maker_response )
assert not res [ 0 ]
assert res [ 1 ] == " Not enough counterparties responded to fill, giving up "
jm_single ( ) . bc_interface . insert_fake_query_results ( None )
@pytest . mark . parametrize (
@ parametrize(
" schedule, highfee, toomuchcoins, minmakers, notauthed, ignored, nocommit " ,
[
( [ ( 0 , 20000000 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , 0 , NO_ROUNDING ) ] , False , False ,
@ -375,7 +409,7 @@ def test_auth_pub_not_found(setup_taker):
( [ ( 0 , 0 , 5 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , 0 , NO_ROUNDING ) ] , False , False ,
2 , False , [ " J659UPUSLLjHJpaB " , " J65z23xdjxJjC7er " , 0 ] , None ) , #test inadequate for sweep
] )
def test_taker_init ( setup_taker , schedule , highfee , toomuchcoins , minmakers ,
async def test_taker_init ( self , schedule , highfee , toomuchcoins , minmakers ,
notauthed , ignored , nocommit ) :
#these tests do not trigger utxo_retries
oldtakerutxoretries = jm_single ( ) . config . get ( " POLICY " , " taker_utxo_retries " )
@ -393,7 +427,7 @@ def test_taker_init(setup_taker, schedule, highfee, toomuchcoins, minmakers,
oldminmakers = jm_single ( ) . config . get ( " POLICY " , " minimum_makers " )
jm_single ( ) . config . set ( " POLICY " , " minimum_makers " , str ( minmakers ) )
jm_single ( ) . config . set ( " POLICY " , " max_sweep_fee_change " , " 3.0 " )
taker = get_taker ( schedule )
taker = await get_taker ( schedule )
orderbook = copy . deepcopy ( t_orderbook )
if highfee :
for o in orderbook :
@ -406,11 +440,11 @@ def test_taker_init(setup_taker, schedule, highfee, toomuchcoins, minmakers,
if schedule [ 0 ] [ 1 ] == 0.2 :
#triggers calc-ing amount based on a fraction
jm_single ( ) . mincjamount = 50000000 #bigger than 40m = 0.2 * 200m
res = taker . initialize ( orderbook , [ ] )
res = await taker . initialize ( orderbook , [ ] )
assert res [ 0 ]
assert res [ 1 ] == jm_single ( ) . mincjamount
return clean_up ( )
res = taker . initialize ( orderbook , [ ] )
res = await taker . initialize ( orderbook , [ ] )
if toomuchcoins or ignored :
assert not res [ 0 ]
return clean_up ( )
@ -433,7 +467,7 @@ def test_taker_init(setup_taker, schedule, highfee, toomuchcoins, minmakers,
#simulate the effect of a maker giving us a lot more utxos
taker . utxos [ " dummy_for_negative_change " ] = [ ( struct . pack ( b " B " , a ) * 32 , a + 1 ) for a in range ( 7 , 12 ) ]
with pytest . raises ( ValueError ) as e_info :
res = taker . receive_utxos ( maker_response )
res = await taker . receive_utxos ( maker_response )
return clean_up ( )
if schedule [ 0 ] [ 1 ] == 199856001 :
#our own change is greater than zero but less than dust
@ -441,7 +475,7 @@ def test_taker_init(setup_taker, schedule, highfee, toomuchcoins, minmakers,
#(because we need tx creation to complete), but trigger case by
#bumping dust threshold
jm_single ( ) . BITCOIN_DUST_THRESHOLD = 14000
res = taker . receive_utxos ( maker_response )
res = await taker . receive_utxos ( maker_response )
#should have succeeded to build tx
assert res [ 0 ]
#change should be none
@ -457,7 +491,7 @@ def test_taker_init(setup_taker, schedule, highfee, toomuchcoins, minmakers,
#given that real_cjfee = -0.002*x
#change = 200000000 - x - 1000 - 0.002*x
#x*1.002 = 1999999000; x = 199599800
res = taker . receive_utxos ( maker_response )
res = await taker . receive_utxos ( maker_response )
assert not res [ 0 ]
assert res [ 1 ] == " Not enough counterparties responded to fill, giving up "
return clean_up ( )
@ -470,7 +504,7 @@ def test_taker_init(setup_taker, schedule, highfee, toomuchcoins, minmakers,
taker . input_utxos = copy . deepcopy ( t_utxos_by_mixdepth ) [ 0 ]
for k , v in taker . input_utxos . items ( ) :
v [ " value " ] = int ( 0.999805228 * v [ " value " ] )
res = taker . receive_utxos ( maker_response )
res = await taker . receive_utxos ( maker_response )
assert res [ 0 ]
return clean_up ( )
if schedule [ 0 ] [ 3 ] == " mteaYsGsLCL9a4cftZFTpGEWXNwZyDt5KS " :
@ -478,11 +512,11 @@ def test_taker_init(setup_taker, schedule, highfee, toomuchcoins, minmakers,
taker . input_utxos = copy . deepcopy ( t_utxos_by_mixdepth ) [ 0 ]
for k , v in taker . input_utxos . items ( ) :
v [ " value " ] = int ( 0.999805028 * v [ " value " ] )
res = taker . receive_utxos ( maker_response )
res = await taker . receive_utxos ( maker_response )
assert res [ 0 ]
return clean_up ( )
res = taker . receive_utxos ( maker_response )
res = await taker . receive_utxos ( maker_response )
if minmakers != 2 :
assert not res [ 0 ]
assert res [ 1 ] == " Not enough counterparties responded to fill, giving up "
@ -490,24 +524,24 @@ def test_taker_init(setup_taker, schedule, highfee, toomuchcoins, minmakers,
assert res [ 0 ]
#re-calling will trigger "finished" code, since schedule is "complete".
res = taker . initialize ( orderbook , [ ] )
res = await taker . initialize ( orderbook , [ ] )
assert not res [ 0 ]
#some exception cases: no coinjoin address, no change address:
#donations not yet implemented:
taker . my_cj_addr = None
with pytest . raises ( NotImplementedError ) as e_info :
taker . prepare_my_bitcoin_data ( )
await taker . prepare_my_bitcoin_data ( )
with pytest . raises ( NotImplementedError ) as e_info :
a = taker . coinjoin_address ( )
taker . wallet_service . wallet . inject_addr_get_failure = True
taker . my_cj_addr = " dummy "
taker . my_change_addr = None
assert not taker . prepare_my_bitcoin_data ( )
assert not awai t taker . prepare_my_bitcoin_data ( )
#clean up
return clean_up ( )
def test_custom_change ( setup_taker ) :
async def test_custom_change ( self ) :
# create three random custom change addresses, one of each
# known type in Joinmarket.
privs = [ x * 32 + b " \x01 " for x in [ struct . pack ( b ' B ' , y ) for y in range ( 1 , 4 ) ] ]
@ -515,12 +549,12 @@ def test_custom_change(setup_taker):
addrs = [ a . privkey_to_address ( i ) for a , i in zip ( [ BTC_P2PKH , BTC_P2SH_P2WPKH , BTC_P2WPKH ] , privs ) ]
schedule = [ ( 0 , 20000000 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , 0 , NO_ROUNDING ) ]
for script , addr in zip ( scripts , addrs ) :
taker = get_taker ( schedule , custom_change = addr )
taker = await get_taker ( schedule , custom_change = addr )
orderbook = copy . deepcopy ( t_orderbook )
res = taker . initialize ( orderbook , [ ] )
res = await taker . initialize ( orderbook , [ ] )
taker . orderbook = copy . deepcopy ( t_chosen_orders )
maker_response = copy . deepcopy ( t_maker_response )
res = taker . receive_utxos ( maker_response )
res = await taker . receive_utxos ( maker_response )
assert res [ 0 ]
# ensure that the transaction created for signing has
# the address we intended with the right amount:
@ -542,12 +576,12 @@ def test_custom_change(setup_taker):
custom_change_found = True
assert custom_change_found
@pytest . mark . parametrize (
@ parametrize(
" schedule_len " ,
[
( 7 ) ,
( 7 , ) ,
] )
def test_unconfirm_confirm ( setup_taker , schedule_len ) :
async def test_unconfirm_confirm ( self , schedule_len ) :
""" These functions are: do-nothing by default (unconfirm, for Taker),
and merely update schedule index for confirm ( useful for schedules / tumbles ) .
This tests that the on_finished callback correctly reports the fromtx
@ -555,14 +589,17 @@ def test_unconfirm_confirm(setup_taker, schedule_len):
The exception to the above is that the txd passed in must match
self . latest_tx , so we use a dummy value here for that .
"""
test_unconfirm_confirm = self . test_unconfirm_confirm_0 . __wrapped__
class DummyTx ( object ) :
pass
test_unconfirm_confirm . txflag = True
def finished_for_confirms ( res , fromtx = False , waittime = 0 , txdetails = None ) :
assert res #confirmed should always send true
test_unconfirm_confirm . txflag = fromtx
taker = get_taker ( schedule_len = schedule_len , on_finished = finished_for_confirms )
taker = await get_taker (
schedule_len = schedule_len , on_finished = finished_for_confirms )
taker . latest_tx = DummyTx ( )
taker . latest_tx . vout = " blah "
fake_txd = DummyTx ( )
@ -571,19 +608,19 @@ def test_unconfirm_confirm(setup_taker, schedule_len):
taker . unconfirm_callback ( fake_txd , " b " )
for i in range ( schedule_len - 1 ) :
taker . schedule_index + = 1
fromtx = taker . confirm_callback ( fake_txd , " b " , 1 )
fromtx = await taker . confirm_callback ( fake_txd , " b " , 1 )
assert test_unconfirm_confirm . txflag
taker . schedule_index + = 1
fromtx = taker . confirm_callback ( fake_txd , " b " , 1 )
fromtx = await taker . confirm_callback ( fake_txd , " b " , 1 )
assert not test_unconfirm_confirm . txflag
@pytest . mark . parametrize (
@ parametrize(
" dummyaddr, schedule " ,
[
( " mrcNu71ztWjAQA6ww9kHiW3zBWSQidHXTQ " ,
[ ( 0 , 20000000 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , 0 ) ] )
[ ( 0 , 20000000 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " , 0 ) ] , ) ,
] )
def test_on_sig ( setup_taker , dummyaddr , schedule ) :
async def test_on_sig ( self , dummyaddr , schedule ) :
#plan: create a new transaction with known inputs and dummy outputs;
#then, create a signature with various inputs, pass in in b64 to on_sig.
#in order for it to verify, the DummyBlockchainInterface will have to
@ -609,7 +646,7 @@ def test_on_sig(setup_taker, dummyaddr, schedule):
tx2 = bitcoin . mktx ( utxos , outs )
#prepare the Taker with the right intermediate data
taker = get_taker ( schedule = schedule )
taker = await get_taker ( schedule = schedule )
taker . nonrespondants = [ " cp1 " , " cp2 " , " cp3 " ]
taker . latest_tx = tx
#my inputs are the first 2 utxos
@ -632,15 +669,15 @@ def test_on_sig(setup_taker, dummyaddr, schedule):
sig , msg = bitcoin . sign ( tx2 , 2 , privs [ 2 ] )
assert sig , " Failed to sign: " + msg
sig3 = b64encode ( tx2 . vin [ 2 ] . scriptSig )
taker . on_sig ( " cp1 " , sig3 )
await taker . on_sig ( " cp1 " , sig3 )
#try sending the same sig again; should be ignored
taker . on_sig ( " cp1 " , sig3 )
await taker . on_sig ( " cp1 " , sig3 )
sig , msg = bitcoin . sign ( tx2 , 3 , privs [ 3 ] )
assert sig , " Failed to sign: " + msg
sig4 = b64encode ( tx2 . vin [ 3 ] . scriptSig )
#try sending junk instead of cp2's correct sig
assert not taker . on_sig ( " cp2 " , str ( " junk " ) ) , " incorrectly accepted junk signature "
taker . on_sig ( " cp2 " , sig4 )
assert not awai t taker . on_sig ( " cp2 " , str ( " junk " ) ) , " incorrectly accepted junk signature "
await taker . on_sig ( " cp2 " , sig4 )
sig , msg = bitcoin . sign ( tx2 , 4 , privs [ 4 ] )
assert sig , " Failed to sign: " + msg
#Before completing with the final signature, which will trigger our own
@ -648,19 +685,19 @@ def test_on_sig(setup_taker, dummyaddr, schedule):
#prevent this signature being accepted.
dbci . setQUSFail ( True )
sig5 = b64encode ( tx2 . vin [ 4 ] . scriptSig )
assert not taker . on_sig ( " cp3 " , sig5 ) , " incorrectly accepted sig5 "
assert not awai t taker . on_sig ( " cp3 " , sig5 ) , " incorrectly accepted sig5 "
#allow it to succeed, and try again
dbci . setQUSFail ( False )
#this should succeed and trigger the we-sign code
taker . on_sig ( " cp3 " , sig5 )
await taker . on_sig ( " cp3 " , sig5 )
@pytest . mark . parametrize (
@ parametrize(
" schedule " ,
[
( [ ( 0 , 20000000 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " ) ] ) ,
( [ ( 0 , 20000000 , 3 , " mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw " ) ] , ) ,
] )
def test_auth_counterparty ( setup_taker , schedule ) :
taker = get_taker ( schedule = schedule )
async def test_auth_counterparty ( self , schedule ) :
taker = await get_taker ( schedule = schedule )
first_maker_response = t_maker_response [ " J659UPUSLLjHJpaB " ]
utxo , auth_pub , cjaddr , changeaddr , sig , maker_pub = first_maker_response
auth_pub_tweaked = auth_pub [ : 8 ] + auth_pub [ 6 : 8 ] + auth_pub [ 10 : ]
@ -668,19 +705,3 @@ def test_auth_counterparty(setup_taker, schedule):
assert taker . auth_counterparty ( sig , auth_pub , maker_pub )
assert not taker . auth_counterparty ( sig , auth_pub_tweaked , maker_pub )
assert not taker . auth_counterparty ( sig_tweaked , auth_pub , maker_pub )
@pytest . fixture ( scope = " module " )
def setup_taker ( request ) :
def clean ( ) :
from twisted . internet import reactor
for dc in reactor . getDelayedCalls ( ) :
dc . cancel ( )
request . addfinalizer ( clean )
def cmtdatateardown ( ) :
shutil . rmtree ( " cmtdata " )
request . addfinalizer ( cmtdatateardown )
if not os . path . exists ( " cmtdata " ) :
os . makedirs ( " cmtdata " )
load_test_config ( )
jm_single ( ) . bc_interface = DummyBlockchainInterface ( )
jm_single ( ) . config . set ( " BLOCKCHAIN " , " network " , " testnet " )