Browse Source

fix test/unified/test_bumpfee.py

add_frost_channel_encryption
zebra-lucky 4 months ago
parent
commit
d512c58543
  1. 62
      test/unified/common.py
  2. 807
      test/unified/test_bumpfee.py

62
test/unified/common.py

@ -10,6 +10,9 @@ from decimal import Decimal
data_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) data_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, os.path.join(data_dir)) sys.path.insert(0, os.path.join(data_dir))
from unittest import IsolatedAsyncioTestCase
from twisted.trial.unittest import TestCase as TrialTestCase
from jmbase import get_log from jmbase import get_log
from jmclient import open_test_wallet_maybe, BIP32Wallet, SegwitWallet, \ from jmclient import open_test_wallet_maybe, BIP32Wallet, SegwitWallet, \
estimate_tx_fee, jm_single, WalletService, BaseWallet, WALLET_IMPLEMENTATIONS estimate_tx_fee, jm_single, WalletService, BaseWallet, WALLET_IMPLEMENTATIONS
@ -19,13 +22,24 @@ from jmbase import chunks
log = get_log() log = get_log()
def make_sign_and_push(ins_full,
wallet_service, class TrialAsyncioTestCase(TrialTestCase, IsolatedAsyncioTestCase):
amount,
output_addr=None, def __init__(self, methodName='runTest'):
change_addr=None, IsolatedAsyncioTestCase.__init__(self, methodName)
hashcode=btc.SIGHASH_ALL, TrialTestCase.__init__(self, methodName)
estimate_fee = False):
def __call__(self, *args, **kwds):
return IsolatedAsyncioTestCase.run(self, *args, **kwds)
async def make_sign_and_push(ins_full,
wallet_service,
amount,
output_addr=None,
change_addr=None,
hashcode=btc.SIGHASH_ALL,
estimate_fee = False):
"""Utility function for easily building transactions """Utility function for easily building transactions
from wallets. from wallets.
""" """
@ -33,8 +47,12 @@ def make_sign_and_push(ins_full,
total = sum(x['value'] for x in ins_full.values()) total = sum(x['value'] for x in ins_full.values())
ins = ins_full.keys() ins = ins_full.keys()
#random output address and change addr #random output address and change addr
output_addr = wallet_service.get_new_addr(1, BaseWallet.ADDRESS_TYPE_INTERNAL) if not output_addr else output_addr output_addr = await wallet_service.get_new_addr(
change_addr = wallet_service.get_new_addr(0, BaseWallet.ADDRESS_TYPE_INTERNAL) if not change_addr else change_addr 1,
BaseWallet.ADDRESS_TYPE_INTERNAL) if not output_addr else output_addr
change_addr = wallet_service.get_new_addr(
0,
BaseWallet.ADDRESS_TYPE_INTERNAL) if not change_addr else change_addr
fee_est = estimate_tx_fee(len(ins), 2) if estimate_fee else 10000 fee_est = estimate_tx_fee(len(ins), 2) if estimate_fee else 10000
outs = [{'value': amount, outs = [{'value': amount,
'address': output_addr}, {'value': total - amount - fee_est, 'address': output_addr}, {'value': total - amount - fee_est,
@ -57,17 +75,17 @@ def make_sign_and_push(ins_full,
else: else:
return False return False
def make_wallets(n, async def make_wallets(n,
wallet_structures=None, wallet_structures=None,
mean_amt=1, mean_amt=1,
sdev_amt=0, sdev_amt=0,
start_index=0, start_index=0,
fixed_seeds=None, fixed_seeds=None,
test_wallet=False, test_wallet=False,
passwords=None, passwords=None,
walletclass=SegwitWallet, walletclass=SegwitWallet,
mixdepths=5, mixdepths=5,
fb_indices=[]): fb_indices=[]):
'''n: number of wallets to be created '''n: number of wallets to be created
wallet_structure: array of n arrays , each subarray wallet_structure: array of n arrays , each subarray
specifying the number of addresses to be populated with coins specifying the number of addresses to be populated with coins
@ -105,8 +123,8 @@ def make_wallets(n,
print("for index: {}, we got wallet type: {}".format(i, wc)) print("for index: {}, we got wallet type: {}".format(i, wc))
else: else:
wc = walletclass wc = walletclass
w = open_test_wallet_maybe(seeds[i], seeds[i], mixdepths - 1, w = await open_test_wallet_maybe(seeds[i], seeds[i], mixdepths - 1,
test_wallet_cls=wc) test_wallet_cls=wc)
wallet_service = WalletService(w) wallet_service = WalletService(w)
wallets[i + start_index] = {'seed': seeds[i].decode('ascii'), wallets[i + start_index] = {'seed': seeds[i].decode('ascii'),

807
test/unified/test_bumpfee.py

@ -7,6 +7,8 @@ from scripts.bumpfee import (
check_valid_candidate, compute_bump_fee, check_valid_candidate, compute_bump_fee,
create_bumped_tx, sign_transaction, sign_psbt) create_bumped_tx, sign_transaction, sign_psbt)
from common import TrialAsyncioTestCase
def fund_wallet_addr(wallet, addr, value_btc=1): def fund_wallet_addr(wallet, addr, value_btc=1):
# special case, grab_coins returns hex from rpc: # special case, grab_coins returns hex from rpc:
txin_id = hextobin(jm_single().bc_interface.grab_coins(addr, value_btc)) txin_id = hextobin(jm_single().bc_interface.grab_coins(addr, value_btc))
@ -16,396 +18,415 @@ def fund_wallet_addr(wallet, addr, value_btc=1):
assert len(utxo_in) == 1 assert len(utxo_in) == 1
return list(utxo_in.keys())[0] return list(utxo_in.keys())[0]
def test_tx_vsize(setup_wallet):
# tests that we correctly compute the transaction size class BumpFeeTests(TrialAsyncioTestCase):
wallet = setup_wallet[0]
wallet_service = setup_wallet[1] async def asyncSetUp(self):
wallet_service.resync_wallet() load_test_config()
addr = wallet.get_external_addr(0) btc.select_chain_params("bitcoin/regtest")
utxo = fund_wallet_addr(wallet, addr) #see note in cryptoengine.py:
amount_sats = 10**7 cryptoengine.BTC_P2WPKH.VBYTE = 100
tx = btc.mktx([utxo], jm_single().bc_interface.tick_forward_chain_interval = 2
[{"address": str(btc.CCoinAddress.from_scriptPubKey( jm_single().config.set('BLOCKCHAIN', 'network', 'testnet')
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), mnemonic = 'zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo abstract'
"value": amount_sats}, entropy = SegwitWallet.entropy_from_mnemonic(mnemonic)
{"address": wallet.get_internal_addr(0), storage = VolatileStorage()
"value": 10**8 - amount_sats - 142}]) SegwitWallet.initialize(
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable storage, get_network(), entropy=entropy, max_mixdepth=1)
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) self.wallet = wallet = SegwitWallet(storage)
await wallet.async_init(storage)
assert btc.tx_vsize(tx) in (142, 143) # transaction size may vary due to signature self.wallet_service = WalletService(wallet)
def test_check_valid_candidate_confirmed_tx(setup_wallet): async def test_tx_vsize(self):
# test that the replaceable transaction is unconfirmed # tests that we correctly compute the transaction size
wallet = setup_wallet[0] wallet = self.wallet
wallet_service = setup_wallet[1] wallet_service = self.wallet_service
wallet_service.resync_wallet() await wallet_service.resync_wallet()
addr = wallet.get_external_addr(0) addr = await wallet.get_external_addr(0)
utxo = fund_wallet_addr(wallet, addr) utxo = fund_wallet_addr(wallet, addr)
amount_sats = 10**7 amount_sats = 10**7
tx = btc.mktx([utxo], tx = btc.mktx([utxo],
[{"address": str(btc.CCoinAddress.from_scriptPubKey( [{"address": str(btc.CCoinAddress.from_scriptPubKey(
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
"value": amount_sats}, "value": amount_sats},
{"address": wallet.get_internal_addr(0), {"address": await wallet.get_internal_addr(0),
"value": 10**8 - amount_sats - 142}]) "value": 10**8 - amount_sats - 142}])
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
success = jm_single().bc_interface.pushtx(tx.serialize()) success, msg = await wallet.sign_tx(
jm_single().bc_interface.tick_forward_chain(1) tx, {0: (wallet.addr_to_script(addr), 10**8)})
with pytest.raises(RuntimeWarning, match="Transaction already confirmed. Nothing to do."): assert btc.tx_vsize(tx) in (142, 143) # transaction size may vary due to signature
check_valid_candidate(tx, wallet)
async def test_check_valid_candidate_confirmed_tx(self):
def test_check_valid_candidate_unowned_input(setup_wallet): # test that the replaceable transaction is unconfirmed
# tests that all inputs in the replaceable transaction belong to the wallet wallet = self.wallet
wallet = setup_wallet[0] wallet_service = self.wallet_service
wallet_service = setup_wallet[1] await wallet_service.resync_wallet()
wallet_service.resync_wallet() addr = await wallet.get_external_addr(0)
addr = wallet.get_external_addr(0) utxo = fund_wallet_addr(wallet, addr)
utxo = fund_wallet_addr(wallet, addr) amount_sats = 10**7
amount_sats = 10**7 tx = btc.mktx([utxo],
[{"address": str(btc.CCoinAddress.from_scriptPubKey(
mnemonic = 'abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about' btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
entropy = SegwitWallet.entropy_from_mnemonic(mnemonic) "value": amount_sats},
storage = VolatileStorage() {"address": await wallet.get_internal_addr(0),
SegwitWallet.initialize( "value": 10**8 - amount_sats - 142}])
storage, get_network(), entropy=entropy, max_mixdepth=0) success, msg = await wallet.sign_tx(
wallet_ext = SegwitWallet(storage) tx, {0: (wallet.addr_to_script(addr), 10**8)})
addr_ext = wallet_ext.get_external_addr(0) success = jm_single().bc_interface.pushtx(tx.serialize())
utxo_ext = fund_wallet_addr(wallet_ext, addr_ext) jm_single().bc_interface.tick_forward_chain(1)
tx = btc.mktx([utxo, utxo_ext], with pytest.raises(RuntimeWarning, match="Transaction already confirmed. Nothing to do."):
[{"address": str(btc.CCoinAddress.from_scriptPubKey( check_valid_candidate(tx, wallet)
btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
"value": amount_sats}, async def test_check_valid_candidate_unowned_input(self):
{"address": wallet.get_internal_addr(0), # tests that all inputs in the replaceable transaction belong to the wallet
"value": (2 * 10**8) - amount_sats - 210}]) wallet = self.wallet
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable wallet_service = self.wallet_service
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) await wallet_service.resync_wallet()
success, msg = wallet_ext.sign_tx(tx, {1: (wallet_ext.addr_to_script(addr_ext), 10**8)}) addr = await wallet.get_external_addr(0)
success = jm_single().bc_interface.pushtx(tx.serialize()) utxo = fund_wallet_addr(wallet, addr)
amount_sats = 10**7
with pytest.raises(ValueError, match="Transaction inputs should belong to the wallet."):
check_valid_candidate(tx, wallet) mnemonic = 'abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about'
entropy = SegwitWallet.entropy_from_mnemonic(mnemonic)
def test_check_valid_candidate_explicit_output_index(setup_wallet): storage = VolatileStorage()
# tests that there's at least one output that we own and can deduct fees SegwitWallet.initialize(
wallet = setup_wallet[0] storage, get_network(), entropy=entropy, max_mixdepth=0)
wallet_service = setup_wallet[1] wallet_ext = SegwitWallet(storage)
wallet_service.resync_wallet() await wallet_ext.async_init(storage)
addr = wallet.get_external_addr(0) addr_ext = await wallet_ext.get_external_addr(0)
utxo = fund_wallet_addr(wallet, addr) utxo_ext = fund_wallet_addr(wallet_ext, addr_ext)
amount_sats = 10**7
tx = btc.mktx([utxo], tx = btc.mktx([utxo, utxo_ext],
[{"address": str(btc.CCoinAddress.from_scriptPubKey( [{"address": str(btc.CCoinAddress.from_scriptPubKey(
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
"value": 10**8 - amount_sats - 143}, "value": amount_sats},
{"address": str(btc.CCoinAddress.from_scriptPubKey( {"address": await wallet.get_internal_addr(0),
btc.CScript(b"\x01").to_p2sh_scriptPubKey())), "value": (2 * 10**8) - amount_sats - 210}])
"value": amount_sats}]) tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable success, msg = await wallet.sign_tx(
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) tx, {0: (wallet.addr_to_script(addr), 10**8)})
success = jm_single().bc_interface.pushtx(tx.serialize()) success, msg = await wallet_ext.sign_tx(
tx, {1: (wallet_ext.addr_to_script(addr_ext), 10**8)})
assert check_valid_candidate(tx, wallet, 0) == None success = jm_single().bc_interface.pushtx(tx.serialize())
def test_check_valid_candidate_one_output(setup_wallet): with pytest.raises(ValueError, match="Transaction inputs should belong to the wallet."):
# tests that there's at least one output that we own and can deduct fees check_valid_candidate(tx, wallet)
wallet = setup_wallet[0]
wallet_service = setup_wallet[1] async def test_check_valid_candidate_explicit_output_index(self):
wallet_service.resync_wallet() # tests that there's at least one output that we own and can deduct fees
addr = wallet.get_external_addr(0) wallet = self.wallet
utxo = fund_wallet_addr(wallet, addr) wallet_service = self.wallet_service
amount_sats = 10**7 await wallet_service.resync_wallet()
tx = btc.mktx([utxo], addr = await wallet.get_external_addr(0)
[{"address": str(btc.CCoinAddress.from_scriptPubKey( utxo = fund_wallet_addr(wallet, addr)
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), amount_sats = 10**7
"value": 10**8 - 111}]) tx = btc.mktx([utxo],
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable [{"address": str(btc.CCoinAddress.from_scriptPubKey(
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
success = jm_single().bc_interface.pushtx(tx.serialize()) "value": 10**8 - amount_sats - 143},
{"address": str(btc.CCoinAddress.from_scriptPubKey(
assert check_valid_candidate(tx, wallet) == None btc.CScript(b"\x01").to_p2sh_scriptPubKey())),
"value": amount_sats}])
def test_check_valid_candidate_no_owned_outputs(setup_wallet): tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
# tests that there's at least one output that we own and can deduct fees success, msg = await wallet.sign_tx(
wallet = setup_wallet[0] tx, {0: (wallet.addr_to_script(addr), 10**8)})
wallet_service = setup_wallet[1] success = jm_single().bc_interface.pushtx(tx.serialize())
wallet_service.resync_wallet()
addr = wallet.get_external_addr(0) assert check_valid_candidate(tx, wallet, 0) == None
utxo = fund_wallet_addr(wallet, addr)
amount_sats = 10**7 async def test_check_valid_candidate_one_output(self):
tx = btc.mktx([utxo], # tests that there's at least one output that we own and can deduct fees
[{"address": str(btc.CCoinAddress.from_scriptPubKey( wallet = self.wallet
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), wallet_service = self.wallet_service
"value": 10**8 - amount_sats - 143}, await wallet_service.resync_wallet()
{"address": str(btc.CCoinAddress.from_scriptPubKey( addr = await wallet.get_external_addr(0)
btc.CScript(b"\x01").to_p2sh_scriptPubKey())), utxo = fund_wallet_addr(wallet, addr)
"value": amount_sats}]) amount_sats = 10**7
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable tx = btc.mktx([utxo],
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) [{"address": str(btc.CCoinAddress.from_scriptPubKey(
success = jm_single().bc_interface.pushtx(tx.serialize()) btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
"value": 10**8 - 111}])
with pytest.raises(ValueError, match="Transaction has no obvious output we can deduct fees from. " tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
"Specify the output to pay from using the -o option."): success, msg = await wallet.sign_tx(
check_valid_candidate(tx, wallet) tx, {0: (wallet.addr_to_script(addr), 10**8)})
success = jm_single().bc_interface.pushtx(tx.serialize())
def test_check_valid_candidate(setup_wallet):
# tests that all checks are passed for a valid replaceable transaction assert check_valid_candidate(tx, wallet) == None
wallet = setup_wallet[0]
wallet_service = setup_wallet[1] async def test_check_valid_candidate_no_owned_outputs(self):
wallet_service.resync_wallet() # tests that there's at least one output that we own and can deduct fees
addr = wallet.get_external_addr(0) wallet = self.wallet
utxo = fund_wallet_addr(wallet, addr) wallet_service = self.wallet_service
amount_sats = 10**7 await wallet_service.resync_wallet()
tx = btc.mktx([utxo], addr = await wallet.get_external_addr(0)
[{"address": str(btc.CCoinAddress.from_scriptPubKey( utxo = fund_wallet_addr(wallet, addr)
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), amount_sats = 10**7
"value": amount_sats}, tx = btc.mktx([utxo],
{"address": wallet.get_internal_addr(0), [{"address": str(btc.CCoinAddress.from_scriptPubKey(
"value": 10**8 - amount_sats - 142}]) btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable "value": 10**8 - amount_sats - 143},
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) {"address": str(btc.CCoinAddress.from_scriptPubKey(
success = jm_single().bc_interface.pushtx(tx.serialize()) btc.CScript(b"\x01").to_p2sh_scriptPubKey())),
"value": amount_sats}])
assert check_valid_candidate(tx, wallet) == None tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
success, msg = await wallet.sign_tx(
def test_compute_bump_fee(setup_wallet): tx, {0: (wallet.addr_to_script(addr), 10**8)})
# tests that the compute_bump_fee method correctly calculates success = jm_single().bc_interface.pushtx(tx.serialize())
# the fee by which to bump the transaction
wallet = setup_wallet[0] with pytest.raises(ValueError, match="Transaction has no obvious output we can deduct fees from. "
wallet_service = setup_wallet[1] "Specify the output to pay from using the -o option."):
wallet_service.resync_wallet() check_valid_candidate(tx, wallet)
addr = wallet.get_external_addr(0)
utxo = fund_wallet_addr(wallet, addr) async def test_check_valid_candidate(self):
amount_sats = 10**7 # tests that all checks are passed for a valid replaceable transaction
tx = btc.mktx([utxo], wallet = self.wallet
[{"address": str(btc.CCoinAddress.from_scriptPubKey( wallet_service = self.wallet_service
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), await wallet_service.resync_wallet()
"value": amount_sats}, addr = await wallet.get_external_addr(0)
{"address": wallet.get_internal_addr(0), utxo = fund_wallet_addr(wallet, addr)
"value": 10**8 - amount_sats - 142}]) amount_sats = 10**7
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable tx = btc.mktx([utxo],
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) [{"address": str(btc.CCoinAddress.from_scriptPubKey(
success = jm_single().bc_interface.pushtx(tx.serialize()) btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
"value": amount_sats},
assert compute_bump_fee(tx, 2000) in (142, 144) # will vary depending on signature size {"address": await wallet.get_internal_addr(0),
"value": 10**8 - amount_sats - 142}])
def test_create_bumped_tx(setup_wallet): tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
# tests that the bumped transaction has a change output with amount success, msg = await wallet.sign_tx(
# less the bump fee tx, {0: (wallet.addr_to_script(addr), 10**8)})
wallet = setup_wallet[0] success = jm_single().bc_interface.pushtx(tx.serialize())
wallet_service = setup_wallet[1]
wallet_service.resync_wallet() assert check_valid_candidate(tx, wallet) == None
addr = wallet.get_external_addr(0)
utxo = fund_wallet_addr(wallet, addr) async def test_compute_bump_fee(self):
amount_sats = 10**7 # tests that the compute_bump_fee method correctly calculates
tx = btc.mktx([utxo], # the fee by which to bump the transaction
[{"address": str(btc.CCoinAddress.from_scriptPubKey( wallet = self.wallet
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), wallet_service = self.wallet_service
"value": amount_sats}, await wallet_service.resync_wallet()
{"address": wallet.get_internal_addr(0), addr = await wallet.get_external_addr(0)
"value": 10**8 - amount_sats - 142}]) utxo = fund_wallet_addr(wallet, addr)
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable amount_sats = 10**7
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) tx = btc.mktx([utxo],
success = jm_single().bc_interface.pushtx(tx.serialize()) [{"address": str(btc.CCoinAddress.from_scriptPubKey(
orig_tx = tx.clone() btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
"value": amount_sats},
bumped_tx = create_bumped_tx(tx, 2000, wallet) {"address": await wallet.get_internal_addr(0),
"value": 10**8 - amount_sats - 142}])
assert orig_tx.vin[0] == bumped_tx.vin[0] tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
assert orig_tx.vout[0] == bumped_tx.vout[0] success, msg = await wallet.sign_tx(
assert (orig_tx.vout[1].nValue - bumped_tx.vout[1].nValue) in (142, 144) tx, {0: (wallet.addr_to_script(addr), 10**8)})
success = jm_single().bc_interface.pushtx(tx.serialize())
def test_create_bumped_tx_dust_change(setup_wallet):
# tests that the change output gets dropped when it's at or below dust assert compute_bump_fee(tx, 2000) in (142, 144) # will vary depending on signature size
wallet = setup_wallet[0]
wallet_service = setup_wallet[1] async def test_create_bumped_tx(self):
wallet_service.resync_wallet() # tests that the bumped transaction has a change output with amount
addr = wallet.get_external_addr(0) # less the bump fee
utxo = fund_wallet_addr(wallet, addr) wallet = self.wallet
amount_sats = 10**8 - jm_single().BITCOIN_DUST_THRESHOLD - 142 wallet_service = self.wallet_service
change_sats = 10**8 - amount_sats - 142 await wallet_service.resync_wallet()
tx = btc.mktx([utxo], addr = await wallet.get_external_addr(0)
[{"address": str(btc.CCoinAddress.from_scriptPubKey( utxo = fund_wallet_addr(wallet, addr)
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), amount_sats = 10**7
"value": amount_sats}, tx = btc.mktx([utxo],
{"address": wallet.get_internal_addr(0), [{"address": str(btc.CCoinAddress.from_scriptPubKey(
"value": change_sats}]) btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable "value": amount_sats},
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) {"address": await wallet.get_internal_addr(0),
success = jm_single().bc_interface.pushtx(tx.serialize()) "value": 10**8 - amount_sats - 142}])
orig_tx = tx.clone() tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
success, msg = await wallet.sign_tx(
bumped_tx = create_bumped_tx(tx, 2000, wallet) tx, {0: (wallet.addr_to_script(addr), 10**8)})
success = jm_single().bc_interface.pushtx(tx.serialize())
assert orig_tx.vin[0] == bumped_tx.vin[0] orig_tx = tx.clone()
assert orig_tx.vout[0] == bumped_tx.vout[0]
assert len(bumped_tx.vout) == 1 bumped_tx = create_bumped_tx(tx, 2000, wallet)
def test_create_bumped_tx_multi_dust_change(setup_wallet): assert orig_tx.vin[0] == bumped_tx.vin[0]
# tests that several change outputs get dropped when they are at or below dust assert orig_tx.vout[0] == bumped_tx.vout[0]
# to fulfill fee requirements assert (orig_tx.vout[1].nValue - bumped_tx.vout[1].nValue) in (142, 144)
wallet = setup_wallet[0]
wallet_service = setup_wallet[1] async def test_create_bumped_tx_dust_change(self):
wallet_service.resync_wallet() # tests that the change output gets dropped when it's at or below dust
addr = wallet.get_external_addr(0) wallet = self.wallet
utxo = fund_wallet_addr(wallet, addr) wallet_service = self.wallet_service
amount_sats = 10**8 - (546*18) - 669 await wallet_service.resync_wallet()
change_sats = 546 addr = await wallet.get_external_addr(0)
tx = btc.mktx([utxo], utxo = fund_wallet_addr(wallet, addr)
[{"address": str(btc.CCoinAddress.from_scriptPubKey( amount_sats = 10**8 - jm_single().BITCOIN_DUST_THRESHOLD - 142
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), change_sats = 10**8 - amount_sats - 142
"value": amount_sats}] + tx = btc.mktx([utxo],
[{"address": wallet.get_internal_addr(0), [{"address": str(btc.CCoinAddress.from_scriptPubKey(
"value": change_sats} for ix in range(18)]) btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable "value": amount_sats},
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) {"address": await wallet.get_internal_addr(0),
success = jm_single().bc_interface.pushtx(tx.serialize()) "value": change_sats}])
orig_tx = tx.clone() tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
success, msg = await wallet.sign_tx(
bumped_tx = create_bumped_tx(tx, 3000, wallet) tx, {0: (wallet.addr_to_script(addr), 10**8)})
success = jm_single().bc_interface.pushtx(tx.serialize())
assert orig_tx.vin[0] == bumped_tx.vin[0] orig_tx = tx.clone()
assert orig_tx.vout[0] == bumped_tx.vout[0]
assert len(bumped_tx.vout) == 16 bumped_tx = create_bumped_tx(tx, 2000, wallet)
def test_create_bumped_tx_single_output(setup_wallet): assert orig_tx.vin[0] == bumped_tx.vin[0]
# tests that fees are deducted from the only output available assert orig_tx.vout[0] == bumped_tx.vout[0]
# in the transaction assert len(bumped_tx.vout) == 1
wallet = setup_wallet[0]
wallet_service = setup_wallet[1] async def test_create_bumped_tx_multi_dust_change(self):
wallet_service.resync_wallet() # tests that several change outputs get dropped when they are at or below dust
addr = wallet.get_external_addr(0) # to fulfill fee requirements
utxo = fund_wallet_addr(wallet, addr) wallet = self.wallet
amount_sats = 10**8 - 111 wallet_service = self.wallet_service
tx = btc.mktx([utxo], await wallet_service.resync_wallet()
[{"address": str(btc.CCoinAddress.from_scriptPubKey( addr = await wallet.get_external_addr(0)
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), utxo = fund_wallet_addr(wallet, addr)
"value": amount_sats}]) amount_sats = 10**8 - (546*18) - 669
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable change_sats = 546
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) tx = btc.mktx([utxo],
success = jm_single().bc_interface.pushtx(tx.serialize()) [{"address": str(btc.CCoinAddress.from_scriptPubKey(
orig_tx = tx.clone() btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
"value": amount_sats}] +
bumped_tx = create_bumped_tx(tx, 2000, wallet) [{"address": await wallet.get_internal_addr(0),
"value": change_sats} for ix in range(18)])
assert orig_tx.vin[0] == bumped_tx.vin[0] tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
assert (orig_tx.vout[0].nValue - bumped_tx.vout[0].nValue) in (111, 113) success, msg = await wallet.sign_tx(
tx, {0: (wallet.addr_to_script(addr), 10**8)})
def test_create_bumped_tx_output_index(setup_wallet): success = jm_single().bc_interface.pushtx(tx.serialize())
# tests that the bumped transaction deducts its fee from the specified orig_tx = tx.clone()
# output even if it is an external wallet address
wallet = setup_wallet[0] bumped_tx = create_bumped_tx(tx, 3000, wallet)
wallet_service = setup_wallet[1]
wallet_service.resync_wallet() assert orig_tx.vin[0] == bumped_tx.vin[0]
addr = wallet.get_external_addr(0) assert orig_tx.vout[0] == bumped_tx.vout[0]
utxo = fund_wallet_addr(wallet, addr) assert len(bumped_tx.vout) == 16
amount_sats = 10**7
tx = btc.mktx([utxo], async def test_create_bumped_tx_single_output(self):
[{"address": str(btc.CCoinAddress.from_scriptPubKey( # tests that fees are deducted from the only output available
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), # in the transaction
"value": amount_sats}, wallet = self.wallet
{"address": wallet.get_internal_addr(0), wallet_service = self.wallet_service
"value": 10**8 - amount_sats - 142}]) await wallet_service.resync_wallet()
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable addr = await wallet.get_external_addr(0)
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) utxo = fund_wallet_addr(wallet, addr)
success = jm_single().bc_interface.pushtx(tx.serialize()) amount_sats = 10**8 - 111
orig_tx = tx.clone() tx = btc.mktx([utxo],
[{"address": str(btc.CCoinAddress.from_scriptPubKey(
bumped_tx = create_bumped_tx(tx, 2000, wallet, 0) btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
"value": amount_sats}])
assert orig_tx.vin[0] == bumped_tx.vin[0] tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
assert orig_tx.vout[1] == bumped_tx.vout[1] success, msg = await wallet.sign_tx(
assert (orig_tx.vout[0].nValue - bumped_tx.vout[0].nValue) in (142, 144) tx, {0: (wallet.addr_to_script(addr), 10**8)})
success = jm_single().bc_interface.pushtx(tx.serialize())
def test_create_bumped_tx_no_change(setup_wallet): orig_tx = tx.clone()
# tests that the bumped transaction is the same as the original if fees
# cannot be deducted bumped_tx = create_bumped_tx(tx, 2000, wallet)
wallet = setup_wallet[0]
wallet_service = setup_wallet[1] assert orig_tx.vin[0] == bumped_tx.vin[0]
wallet_service.resync_wallet() assert (orig_tx.vout[0].nValue - bumped_tx.vout[0].nValue) in (111, 113)
addr = wallet.get_external_addr(0)
utxo = fund_wallet_addr(wallet, addr, 0.00002843) async def test_create_bumped_tx_output_index(self):
amount_sats = 2730 # tests that the bumped transaction deducts its fee from the specified
tx = btc.mktx([utxo], # output even if it is an external wallet address
[{"address": str(btc.CCoinAddress.from_scriptPubKey( wallet = self.wallet
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), wallet_service = self.wallet_service
"value": amount_sats}]) await wallet_service.resync_wallet()
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable addr = await wallet.get_external_addr(0)
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 2843)}) utxo = fund_wallet_addr(wallet, addr)
success = jm_single().bc_interface.pushtx(tx.serialize()) amount_sats = 10**7
orig_tx = tx.clone() tx = btc.mktx([utxo],
[{"address": str(btc.CCoinAddress.from_scriptPubKey(
bumped_tx = create_bumped_tx(tx, 3000, wallet) btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
"value": amount_sats},
assert orig_tx.vin[0] == bumped_tx.vin[0] {"address": await wallet.get_internal_addr(0),
assert orig_tx.vout[0] == bumped_tx.vout[0] "value": 10**8 - amount_sats - 142}])
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
def test_sign_and_broadcast(setup_wallet): success, msg = await wallet.sign_tx(
# tests that we can correctly sign and broadcast a replaced transaction tx, {0: (wallet.addr_to_script(addr), 10**8)})
wallet = setup_wallet[0] success = jm_single().bc_interface.pushtx(tx.serialize())
wallet_service = setup_wallet[1] orig_tx = tx.clone()
wallet_service.resync_wallet()
addr = wallet.get_external_addr(0) bumped_tx = create_bumped_tx(tx, 2000, wallet, 0)
utxo = fund_wallet_addr(wallet, addr)
amount_sats = 10**8 - jm_single().BITCOIN_DUST_THRESHOLD - 142 assert orig_tx.vin[0] == bumped_tx.vin[0]
change_sats = 10**8 - amount_sats - 142 assert orig_tx.vout[1] == bumped_tx.vout[1]
tx = btc.mktx([utxo], assert (orig_tx.vout[0].nValue - bumped_tx.vout[0].nValue) in (142, 144)
[{"address": str(btc.CCoinAddress.from_scriptPubKey(
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), async def test_create_bumped_tx_no_change(self):
"value": amount_sats}, # tests that the bumped transaction is the same as the original if fees
{"address": wallet.get_internal_addr(0), # cannot be deducted
"value": change_sats}]) wallet = self.wallet
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable wallet_service = self.wallet_service
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) await wallet_service.resync_wallet()
success = jm_single().bc_interface.pushtx(tx.serialize()) addr = await wallet.get_external_addr(0)
orig_tx = tx.clone() utxo = fund_wallet_addr(wallet, addr, 0.00002843)
amount_sats = 2730
bumped_tx = create_bumped_tx(tx, 2000, wallet) tx = btc.mktx([utxo],
sign_transaction(bumped_tx, orig_tx, wallet_service) [{"address": str(btc.CCoinAddress.from_scriptPubKey(
btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
assert jm_single().bc_interface.pushtx(bumped_tx.serialize()) == True "value": amount_sats}])
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
def test_sign_psbt_broadcast(setup_wallet): success, msg = await wallet.sign_tx(
# tests that we can correctly sign and broadcast a replaced psbt transaction tx, {0: (wallet.addr_to_script(addr), 2843)})
wallet = setup_wallet[0] success = jm_single().bc_interface.pushtx(tx.serialize())
wallet_service = setup_wallet[1] orig_tx = tx.clone()
wallet_service.resync_wallet()
addr = wallet.get_external_addr(0) bumped_tx = create_bumped_tx(tx, 3000, wallet)
utxo = fund_wallet_addr(wallet, addr)
amount_sats = 10**8 - jm_single().BITCOIN_DUST_THRESHOLD - 142 assert orig_tx.vin[0] == bumped_tx.vin[0]
change_sats = 10**8 - amount_sats - 142 assert orig_tx.vout[0] == bumped_tx.vout[0]
tx = btc.mktx([utxo],
[{"address": str(btc.CCoinAddress.from_scriptPubKey( async def test_sign_and_broadcast(self):
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), # tests that we can correctly sign and broadcast a replaced transaction
"value": amount_sats}, wallet = self.wallet
{"address": wallet.get_internal_addr(0), wallet_service = self.wallet_service
"value": change_sats}]) await wallet_service.resync_wallet()
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable addr = await wallet.get_external_addr(0)
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) utxo = fund_wallet_addr(wallet, addr)
success = jm_single().bc_interface.pushtx(tx.serialize()) amount_sats = 10**8 - jm_single().BITCOIN_DUST_THRESHOLD - 142
orig_tx = tx.clone() change_sats = 10**8 - amount_sats - 142
tx = btc.mktx([utxo],
bumped_tx = create_bumped_tx(tx, 2000, wallet) [{"address": str(btc.CCoinAddress.from_scriptPubKey(
psbt = sign_psbt(bumped_tx, orig_tx, wallet_service) btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
"value": amount_sats},
assert jm_single().bc_interface.pushtx(psbt.extract_transaction().serialize()) == True {"address": await wallet.get_internal_addr(0),
"value": change_sats}])
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
@pytest.fixture(scope='module') success, msg = await wallet.sign_tx(
def setup_wallet(request): tx, {0: (wallet.addr_to_script(addr), 10**8)})
load_test_config() success = jm_single().bc_interface.pushtx(tx.serialize())
btc.select_chain_params("bitcoin/regtest") orig_tx = tx.clone()
#see note in cryptoengine.py:
cryptoengine.BTC_P2WPKH.VBYTE = 100 bumped_tx = create_bumped_tx(tx, 2000, wallet)
jm_single().bc_interface.tick_forward_chain_interval = 2 await sign_transaction(bumped_tx, orig_tx, wallet_service)
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet')
mnemonic = 'zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo abstract' assert jm_single().bc_interface.pushtx(bumped_tx.serialize()) == True
entropy = SegwitWallet.entropy_from_mnemonic(mnemonic)
storage = VolatileStorage() async def test_sign_psbt_broadcast(self):
SegwitWallet.initialize( # tests that we can correctly sign and broadcast a replaced psbt transaction
storage, get_network(), entropy=entropy, max_mixdepth=1) wallet = self.wallet
wallet = SegwitWallet(storage) wallet_service = self.wallet_service
wallet_service = WalletService(wallet) await wallet_service.resync_wallet()
return [wallet, wallet_service] addr = await wallet.get_external_addr(0)
utxo = fund_wallet_addr(wallet, addr)
amount_sats = 10**8 - jm_single().BITCOIN_DUST_THRESHOLD - 142
change_sats = 10**8 - amount_sats - 142
tx = btc.mktx([utxo],
[{"address": str(btc.CCoinAddress.from_scriptPubKey(
btc.CScript(b"\x00").to_p2sh_scriptPubKey())),
"value": amount_sats},
{"address": await wallet.get_internal_addr(0),
"value": change_sats}])
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable
success, msg = await wallet.sign_tx(
tx, {0: (wallet.addr_to_script(addr), 10**8)})
success = jm_single().bc_interface.pushtx(tx.serialize())
orig_tx = tx.clone()
bumped_tx = create_bumped_tx(tx, 2000, wallet)
psbt = await sign_psbt(bumped_tx, orig_tx, wallet_service)
assert jm_single().bc_interface.pushtx(psbt.extract_transaction().serialize()) == True

Loading…
Cancel
Save