Browse Source
refactoring to improve readability and testing added support for taking the fee from multiple owned outputs as needed added unit tests removed same mixdepth restriction for inputs added option for specifying which output to deduct fees from added additional test cases refactoring: moved tx_vsize to jmbitcoin package added a sanity check to ensure that the transaction id supplied belongs to our wallet fixed a linting errormaster
3 changed files with 723 additions and 0 deletions
@ -0,0 +1,283 @@
|
||||
#!/usr/bin/env python3 |
||||
|
||||
from decimal import Decimal |
||||
from jmbase import get_log, hextobin, bintohex |
||||
from jmbase.support import EXIT_SUCCESS, EXIT_FAILURE, EXIT_ARGERROR, jmprint |
||||
from jmclient import jm_single, load_program_config, open_test_wallet_maybe, get_wallet_path, WalletService |
||||
from jmclient.cli_options import OptionParser, add_base_options |
||||
import jmbitcoin as btc |
||||
import sys |
||||
|
||||
jlog = get_log() |
||||
parser = OptionParser( |
||||
usage='usage: %prog [options] [wallet file] txid', |
||||
description= |
||||
'Bumps the fee on a wallet transaction') |
||||
parser.add_option('--psbt', |
||||
action='store_true', |
||||
dest='with_psbt', |
||||
default=False, |
||||
help='output as psbt instead of ' |
||||
'broadcasting the transaction.') |
||||
parser.add_option('-o', |
||||
'--output', |
||||
action='store', |
||||
type='int', |
||||
dest='output', |
||||
default=-1, |
||||
help='optionally specify which output to deduct the fee from. Outputs ' |
||||
'are 0-indexed meaning the first output has an index value of 0 and the ' |
||||
'second, an index value of 1 and so on.') |
||||
parser.add_option('-f', |
||||
'--txfee', |
||||
action='store', |
||||
type='int', |
||||
dest='txfee', |
||||
default=-1, |
||||
help='Bitcoin miner tx_fee to use for transaction(s). A number higher ' |
||||
'than 1000 is used as "satoshi per KB" tx fee. A number lower than that ' |
||||
'uses the dynamic fee estimation of your blockchain provider as ' |
||||
'confirmation target. This temporarily overrides the "tx_fees" setting ' |
||||
'in your joinmarket.cfg. Works the same way as described in it. Check ' |
||||
'it for examples.') |
||||
parser.add_option('-a', |
||||
'--amtmixdepths', |
||||
action='store', |
||||
type='int', |
||||
dest='amtmixdepths', |
||||
help='number of mixdepths in wallet, default 5', |
||||
default=5) |
||||
parser.add_option('-g', |
||||
'--gap-limit', |
||||
type="int", |
||||
action='store', |
||||
dest='gaplimit', |
||||
help='gap limit for wallet, default=6', |
||||
default=6) |
||||
parser.add_option('--yes', |
||||
action='store_true', |
||||
dest='answeryes', |
||||
default=False, |
||||
help='answer yes to everything') |
||||
add_base_options(parser) |
||||
|
||||
def check_valid_candidate(orig_tx, wallet, output_index=-1): |
||||
orig_tx_info = jm_single().bc_interface.get_transaction(orig_tx.GetTxid()[::-1]) |
||||
# check that the transaction is still unconfirmed |
||||
if orig_tx_info['confirmations'] > 0: |
||||
raise RuntimeWarning('Transaction already confirmed. Nothing to do.') |
||||
|
||||
# all transaction inputs must belong to the wallet |
||||
own_inputs_n = len(wallet.inputs_consumed_by_tx(orig_tx)) |
||||
tx_inputs_n = len(orig_tx.vin) |
||||
|
||||
if own_inputs_n != tx_inputs_n: |
||||
raise ValueError('Transaction inputs should belong to the wallet.') |
||||
|
||||
# at least one input should signal opt-in rbf |
||||
if not any([vin.nSequence <= 0xffffffff - 2 for vin in orig_tx.vin]): |
||||
raise ValueError('Transaction not replaceable.') |
||||
|
||||
# 1. If output_index is specified, check that the output exist |
||||
# 2. If not, check that we have only one output |
||||
# 3. If not, wallet should own at least one output that we can deduct |
||||
# fees from. |
||||
if output_index >= 0 and len(orig_tx.vout) > output_index: |
||||
return None |
||||
elif len(orig_tx.vout) == 1: |
||||
return None |
||||
elif not any( |
||||
[wallet.is_known_script(vout.scriptPubKey) for vout in orig_tx.vout] |
||||
): |
||||
raise ValueError('Transaction has no obvious output we can deduct fees ' |
||||
'from. Specify the output to pay from using the -o ' |
||||
'option.') |
||||
|
||||
def compute_bump_fee(tx, fee_per_kb): |
||||
tx_info = jm_single().bc_interface.get_transaction(tx.GetTxid()[::-1]) |
||||
tx_size_n = btc.tx_vsize(tx) |
||||
tx_fee = btc.amount_to_sat(abs(tx_info['fee'])) |
||||
proposed_fee_rate = fee_per_kb / Decimal(1000.0) |
||||
proposed_fee = int(tx_size_n * proposed_fee_rate) |
||||
min_proposed_fee = tx_fee + tx_size_n |
||||
min_proposed_fee_rate = min_proposed_fee / Decimal(tx_size_n) |
||||
|
||||
if proposed_fee < (tx_fee + tx_size_n): |
||||
raise ValueError('Proposed fee for transaction replacement: ' |
||||
'%d sats (%.1f sat/vB) is below minimum required ' |
||||
'for relay: %d sats (%.1f sat/vB). ' |
||||
'Try using a higher fee setting.' |
||||
% (proposed_fee, proposed_fee_rate, |
||||
min_proposed_fee, min_proposed_fee_rate)) |
||||
|
||||
return (proposed_fee - tx_fee) |
||||
|
||||
def prepare_transaction(new_tx, old_tx, wallet): |
||||
input_scripts = {} |
||||
spent_outs = [] |
||||
for ix, vin in enumerate(new_tx.vin): |
||||
script = wallet.pubkey_to_script( |
||||
btc.extract_pubkey_from_witness(old_tx, ix)[0]) |
||||
tx_info = jm_single().bc_interface.get_transaction( |
||||
hextobin(btc.b2lx(new_tx.vin[ix].prevout.hash))) |
||||
prev_tx = btc.CTransaction.deserialize(hextobin(tx_info['hex'])) |
||||
amount = prev_tx.vout[new_tx.vin[ix].prevout.n].nValue |
||||
|
||||
input_scripts[ix] = (script, amount) |
||||
spent_outs.append(btc.CMutableTxOut(amount, script)) |
||||
|
||||
return (input_scripts, spent_outs) |
||||
|
||||
def sign_transaction(new_tx, old_tx, wallet_service): |
||||
input_scripts, _ = prepare_transaction(new_tx, old_tx, wallet_service.wallet) |
||||
success, msg = wallet_service.sign_tx(new_tx, input_scripts) |
||||
if not success: |
||||
raise RuntimeError("Failed to sign transaction, quitting. Error msg: " + msg) |
||||
|
||||
def sign_psbt(new_tx, old_tx, wallet_service): |
||||
_, spent_outs = prepare_transaction(new_tx, old_tx, wallet_service.wallet) |
||||
unsigned_psbt = wallet_service.create_psbt_from_tx( |
||||
new_tx, spent_outs=spent_outs) |
||||
signed_psbt, err = wallet_service.sign_psbt(unsigned_psbt.serialize()) |
||||
|
||||
if err: |
||||
raise RuntimeError("Failed to sign PSBT, quitting. Error message: " + err) |
||||
|
||||
return btc.PartiallySignedTransaction.deserialize(signed_psbt) |
||||
|
||||
def create_bumped_tx(tx, fee_per_kb, wallet, output_index=-1): |
||||
check_valid_candidate(tx, wallet, output_index) |
||||
fee = compute_bump_fee(tx, fee_per_kb) |
||||
|
||||
if ( |
||||
len(tx.vout) == 1 |
||||
and tx.vout[0].nValue >= (fee + jm_single().BITCOIN_DUST_THRESHOLD) |
||||
): |
||||
tx.vout[0].nValue -= fee |
||||
fee = 0 |
||||
elif ( |
||||
output_index >= 0 |
||||
and len(tx.vout) > output_index |
||||
and tx.vout[output_index].nValue >= (fee + jm_single().BITCOIN_DUST_THRESHOLD) |
||||
): |
||||
tx.vout[output_index].nValue -= fee |
||||
fee = 0 |
||||
else: |
||||
for ix, vout in enumerate(tx.vout): |
||||
if wallet.is_known_script(vout.scriptPubKey) and fee > 0: |
||||
# check if the output is a change address |
||||
if wallet.script_to_path(vout.scriptPubKey)[-2] != 1: |
||||
continue |
||||
|
||||
# deduct fee from the change |
||||
tx.vout[ix].nValue -= fee |
||||
fee = 0 |
||||
|
||||
# if the output value is less than zero, remove it |
||||
if tx.vout[ix].nValue < 0: |
||||
jlog.info("Dynamically calculated change lower than zero; dropping.") |
||||
|
||||
# update fee to the additional amount needed to pay for the tx |
||||
# accounting for the removal of the output |
||||
fee = abs(tx.vout[ix].nValue) - len(tx.vout[ix].serialize()) |
||||
tx.vout.remove(vout) |
||||
continue |
||||
|
||||
# if the output value is below the dust threshold, remove it |
||||
if tx.vout[ix].nValue <= jm_single().BITCOIN_DUST_THRESHOLD: |
||||
jlog.info("Dynamically calculated change lower than dust: " + |
||||
btc.amount_to_str(tx.vout[ix].nValue) + "; dropping.") |
||||
tx.vout.remove(vout) |
||||
break |
||||
|
||||
# create new transaction from the old |
||||
# there's the possibility that it returns the same transaction as the old |
||||
# if no outputs were available to deduct a fee from |
||||
return btc.CMutableTransaction( |
||||
tx.vin, tx.vout, nLockTime=tx.nLockTime, |
||||
nVersion=tx.nVersion) |
||||
|
||||
if __name__ == '__main__': |
||||
(options, args) = parser.parse_args() |
||||
load_program_config(config_path=options.datadir) |
||||
if len(args) < 2: |
||||
parser.error("JoinMarket bumpfee needs arguments:" |
||||
" wallet file and txid.") |
||||
sys.exit(EXIT_ARGERROR) |
||||
|
||||
wallet_name = args[0] |
||||
txid = args[1] |
||||
|
||||
# If tx_fees are set manually by CLI argument, override joinmarket.cfg: |
||||
if int(options.txfee) > 0: |
||||
jm_single().config.set("POLICY", "tx_fees", str(options.txfee)) |
||||
fee_per_kb = jm_single().bc_interface.estimate_fee_per_kb( |
||||
jm_single().config.getint("POLICY", "tx_fees")) |
||||
if fee_per_kb is None: |
||||
raise RuntimeError("Cannot estimate fee per kB, possibly" + |
||||
" a failure of connection to the blockchain.") |
||||
|
||||
# open the wallet and synchronize it |
||||
wallet_path = get_wallet_path(wallet_name, None) |
||||
wallet = open_test_wallet_maybe( |
||||
wallet_path, wallet_name, options.amtmixdepths - 1, |
||||
wallet_password_stdin=options.wallet_password_stdin, |
||||
gap_limit=options.gaplimit) |
||||
wallet_service = WalletService(wallet) |
||||
if wallet_service.rpc_error: |
||||
sys.exit(EXIT_FAILURE) |
||||
while not wallet_service.synced: |
||||
wallet_service.sync_wallet(fast=not options.recoversync) |
||||
wallet_service.startService() |
||||
|
||||
orig_tx = wallet_service.get_transaction(hextobin(txid)) |
||||
|
||||
if not orig_tx: |
||||
jlog.error("Could not retrieve the transaction! Maybe it doesn't belong to this wallet?") |
||||
sys.exit(EXIT_FAILURE) |
||||
|
||||
try: |
||||
bumped_tx = create_bumped_tx(orig_tx, fee_per_kb, wallet, options.output) |
||||
except ValueError as e: |
||||
jmprint(str(e), 'error') |
||||
sys.exit(EXIT_FAILURE) |
||||
except RuntimeWarning as w: |
||||
jmprint(str(w)) |
||||
sys.exit(EXIT_SUCCESS) |
||||
|
||||
# sign the transaction |
||||
if options.with_psbt: |
||||
try: |
||||
psbt = sign_psbt(bumped_tx, orig_tx, wallet_service) |
||||
|
||||
print("Completed PSBT created: ") |
||||
print(wallet_service.human_readable_psbt(psbt)) |
||||
jlog.info("This PSBT is fully signed and can be sent externally for " |
||||
"broadcasting:") |
||||
jlog.info(psbt.to_base64()) |
||||
sys.exit(EXIT_SUCCESS) |
||||
except RuntimeError as e: |
||||
jlog.error(str(e)) |
||||
sys.exit(EXIT_FAILURE) |
||||
else: |
||||
try: |
||||
sign_transaction(bumped_tx, orig_tx, wallet_service) |
||||
except RuntimeError as e: |
||||
jlog.error(str(e)) |
||||
sys.exit(EXIT_FAILURE) |
||||
|
||||
jlog.info("Got signed transaction:") |
||||
jlog.info(btc.human_readable_transaction(bumped_tx)) |
||||
|
||||
if not options.answeryes: |
||||
if input('Would you like to push to the network? (y/n):')[0] != 'y': |
||||
jlog.info("You chose not to broadcast the transaction, quitting.") |
||||
sys.exit(EXIT_SUCCESS) |
||||
|
||||
if jm_single().bc_interface.pushtx(bumped_tx.serialize()): |
||||
txid = bintohex(bumped_tx.GetTxid()[::-1]) |
||||
jlog.info("Transaction sent: " + txid) |
||||
else: |
||||
jlog.error("Transaction broadcast failed!") |
||||
sys.exit(EXIT_FAILURE) |
||||
|
||||
@ -0,0 +1,431 @@
|
||||
import pytest |
||||
from jmbase import hextobin |
||||
import jmbitcoin as btc |
||||
from jmclient import load_test_config, cryptoengine, jm_single, SegwitWallet, \ |
||||
VolatileStorage, get_network, WalletService |
||||
from scripts.bumpfee import ( |
||||
check_valid_candidate, compute_bump_fee, |
||||
create_bumped_tx, sign_transaction, sign_psbt) |
||||
|
||||
def fund_wallet_addr(wallet, addr, value_btc=1): |
||||
# special case, grab_coins returns hex from rpc: |
||||
txin_id = hextobin(jm_single().bc_interface.grab_coins(addr, value_btc)) |
||||
txinfo = jm_single().bc_interface.get_transaction(txin_id) |
||||
txin = btc.CMutableTransaction.deserialize(btc.x(txinfo["hex"])) |
||||
utxo_in = wallet.add_new_utxos(txin, 1) |
||||
assert len(utxo_in) == 1 |
||||
return list(utxo_in.keys())[0] |
||||
|
||||
def test_tx_vsize(setup_wallet): |
||||
# tests that we correctly compute the transaction size |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**7 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}, |
||||
{"address": wallet.get_internal_addr(0), |
||||
"value": 10**8 - amount_sats - 142}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
|
||||
assert btc.tx_vsize(tx) in (142, 143) # transaction size may vary due to signature |
||||
|
||||
def test_check_valid_candidate_confirmed_tx(setup_wallet): |
||||
# test that the replaceable transaction is unconfirmed |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**7 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}, |
||||
{"address": wallet.get_internal_addr(0), |
||||
"value": 10**8 - amount_sats - 142}]) |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
jm_single().bc_interface.tick_forward_chain(1) |
||||
|
||||
with pytest.raises(RuntimeWarning, match="Transaction already confirmed. Nothing to do."): |
||||
check_valid_candidate(tx, wallet) |
||||
|
||||
def test_check_valid_candidate_unowned_input(setup_wallet): |
||||
# tests that all inputs in the replaceable transaction belong to the wallet |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**7 |
||||
|
||||
mnemonic = 'abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about' |
||||
entropy = SegwitWallet.entropy_from_mnemonic(mnemonic) |
||||
storage = VolatileStorage() |
||||
SegwitWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=0) |
||||
wallet_ext = SegwitWallet(storage) |
||||
addr_ext = wallet_ext.get_external_addr(0) |
||||
utxo_ext = fund_wallet_addr(wallet_ext, addr_ext) |
||||
|
||||
tx = btc.mktx([utxo, utxo_ext], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}, |
||||
{"address": wallet.get_internal_addr(0), |
||||
"value": (2 * 10**8) - amount_sats - 210}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success, msg = wallet_ext.sign_tx(tx, {1: (wallet_ext.addr_to_script(addr_ext), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
|
||||
with pytest.raises(ValueError, match="Transaction inputs should belong to the wallet."): |
||||
check_valid_candidate(tx, wallet) |
||||
|
||||
def test_check_valid_candidate_not_replaceable(setup_wallet): |
||||
# tests that the transaction is replaceable |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**7 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}, |
||||
{"address": wallet.get_internal_addr(0), |
||||
"value": 10**8 - amount_sats - 142}]) |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
|
||||
with pytest.raises(ValueError, match="Transaction not replaceable."): |
||||
check_valid_candidate(tx, wallet) |
||||
|
||||
def test_check_valid_candidate_explicit_output_index(setup_wallet): |
||||
# tests that there's at least one output that we own and can deduct fees |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**7 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": 10**8 - amount_sats - 143}, |
||||
{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x01").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
|
||||
assert check_valid_candidate(tx, wallet, 0) == None |
||||
|
||||
def test_check_valid_candidate_one_output(setup_wallet): |
||||
# tests that there's at least one output that we own and can deduct fees |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**7 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": 10**8 - 111}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
|
||||
assert check_valid_candidate(tx, wallet) == None |
||||
|
||||
def test_check_valid_candidate_no_owned_outputs(setup_wallet): |
||||
# tests that there's at least one output that we own and can deduct fees |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**7 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": 10**8 - amount_sats - 143}, |
||||
{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x01").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
|
||||
with pytest.raises(ValueError, match="Transaction has no obvious output we can deduct fees from. " |
||||
"Specify the output to pay from using the -o option."): |
||||
check_valid_candidate(tx, wallet) |
||||
|
||||
def test_check_valid_candidate(setup_wallet): |
||||
# tests that all checks are passed for a valid replaceable transaction |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**7 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}, |
||||
{"address": wallet.get_internal_addr(0), |
||||
"value": 10**8 - amount_sats - 142}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
|
||||
assert check_valid_candidate(tx, wallet) == None |
||||
|
||||
def test_compute_bump_fee(setup_wallet): |
||||
# tests that the compute_bump_fee method correctly calculates |
||||
# the fee by which to bump the transaction |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**7 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}, |
||||
{"address": wallet.get_internal_addr(0), |
||||
"value": 10**8 - amount_sats - 142}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
|
||||
assert compute_bump_fee(tx, 2000) in (142, 144) # will vary depending on signature size |
||||
|
||||
def test_create_bumped_tx(setup_wallet): |
||||
# tests that the bumped transaction has a change output with amount |
||||
# less the bump fee |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**7 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}, |
||||
{"address": wallet.get_internal_addr(0), |
||||
"value": 10**8 - amount_sats - 142}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
orig_tx = tx.clone() |
||||
|
||||
bumped_tx = create_bumped_tx(tx, 2000, wallet) |
||||
|
||||
assert orig_tx.vin[0] == bumped_tx.vin[0] |
||||
assert orig_tx.vout[0] == bumped_tx.vout[0] |
||||
assert (orig_tx.vout[1].nValue - bumped_tx.vout[1].nValue) in (142, 144) |
||||
|
||||
def test_create_bumped_tx_dust_change(setup_wallet): |
||||
# tests that the change output gets dropped when it's at or below dust |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**8 - jm_single().BITCOIN_DUST_THRESHOLD - 142 |
||||
change_sats = 10**8 - amount_sats - 142 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}, |
||||
{"address": wallet.get_internal_addr(0), |
||||
"value": change_sats}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
orig_tx = tx.clone() |
||||
|
||||
bumped_tx = create_bumped_tx(tx, 2000, wallet) |
||||
|
||||
assert orig_tx.vin[0] == bumped_tx.vin[0] |
||||
assert orig_tx.vout[0] == bumped_tx.vout[0] |
||||
assert len(bumped_tx.vout) == 1 |
||||
|
||||
def test_create_bumped_tx_multi_dust_change(setup_wallet): |
||||
# tests that several change outputs get dropped when they are at or below dust |
||||
# to fulfill fee requirements |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**8 - (546*18) - 669 |
||||
change_sats = 546 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}] + |
||||
[{"address": wallet.get_internal_addr(0), |
||||
"value": change_sats} for ix in range(18)]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
orig_tx = tx.clone() |
||||
|
||||
bumped_tx = create_bumped_tx(tx, 3000, wallet) |
||||
|
||||
assert orig_tx.vin[0] == bumped_tx.vin[0] |
||||
assert orig_tx.vout[0] == bumped_tx.vout[0] |
||||
assert len(bumped_tx.vout) == 16 |
||||
|
||||
def test_create_bumped_tx_single_output(setup_wallet): |
||||
# tests that fees are deducted from the only output available |
||||
# in the transaction |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**8 - 111 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
orig_tx = tx.clone() |
||||
|
||||
bumped_tx = create_bumped_tx(tx, 2000, wallet) |
||||
|
||||
assert orig_tx.vin[0] == bumped_tx.vin[0] |
||||
assert (orig_tx.vout[0].nValue - bumped_tx.vout[0].nValue) in (111, 113) |
||||
|
||||
def test_create_bumped_tx_output_index(setup_wallet): |
||||
# tests that the bumped transaction deducts its fee from the specified |
||||
# output even if it is an external wallet address |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**7 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}, |
||||
{"address": wallet.get_internal_addr(0), |
||||
"value": 10**8 - amount_sats - 142}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
orig_tx = tx.clone() |
||||
|
||||
bumped_tx = create_bumped_tx(tx, 2000, wallet, 0) |
||||
|
||||
assert orig_tx.vin[0] == bumped_tx.vin[0] |
||||
assert orig_tx.vout[1] == bumped_tx.vout[1] |
||||
assert (orig_tx.vout[0].nValue - bumped_tx.vout[0].nValue) in (142, 144) |
||||
|
||||
def test_create_bumped_tx_no_change(setup_wallet): |
||||
# tests that the bumped transaction is the same as the original if fees |
||||
# cannot be deducted |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr, 0.00002843) |
||||
amount_sats = 2730 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 2843)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
orig_tx = tx.clone() |
||||
|
||||
bumped_tx = create_bumped_tx(tx, 3000, wallet) |
||||
|
||||
assert orig_tx.vin[0] == bumped_tx.vin[0] |
||||
assert orig_tx.vout[0] == bumped_tx.vout[0] |
||||
|
||||
def test_sign_and_broadcast(setup_wallet): |
||||
# tests that we can correctly sign and broadcast a replaced transaction |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**8 - jm_single().BITCOIN_DUST_THRESHOLD - 142 |
||||
change_sats = 10**8 - amount_sats - 142 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}, |
||||
{"address": wallet.get_internal_addr(0), |
||||
"value": change_sats}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
orig_tx = tx.clone() |
||||
|
||||
bumped_tx = create_bumped_tx(tx, 2000, wallet) |
||||
sign_transaction(bumped_tx, orig_tx, wallet_service) |
||||
|
||||
assert jm_single().bc_interface.pushtx(bumped_tx.serialize()) == True |
||||
|
||||
def test_sign_psbt_broadcast(setup_wallet): |
||||
# tests that we can correctly sign and broadcast a replaced psbt transaction |
||||
wallet = setup_wallet[0] |
||||
wallet_service = setup_wallet[1] |
||||
wallet_service.resync_wallet() |
||||
addr = wallet.get_external_addr(0) |
||||
utxo = fund_wallet_addr(wallet, addr) |
||||
amount_sats = 10**8 - jm_single().BITCOIN_DUST_THRESHOLD - 142 |
||||
change_sats = 10**8 - amount_sats - 142 |
||||
tx = btc.mktx([utxo], |
||||
[{"address": str(btc.CCoinAddress.from_scriptPubKey( |
||||
btc.CScript(b"\x00").to_p2sh_scriptPubKey())), |
||||
"value": amount_sats}, |
||||
{"address": wallet.get_internal_addr(0), |
||||
"value": change_sats}]) |
||||
tx.vin[0].nSequence = 0xffffffff - 2 # mark as replaceable |
||||
success, msg = wallet.sign_tx(tx, {0: (wallet.addr_to_script(addr), 10**8)}) |
||||
success = jm_single().bc_interface.pushtx(tx.serialize()) |
||||
orig_tx = tx.clone() |
||||
|
||||
bumped_tx = create_bumped_tx(tx, 2000, wallet) |
||||
psbt = sign_psbt(bumped_tx, orig_tx, wallet_service) |
||||
|
||||
assert jm_single().bc_interface.pushtx(psbt.extract_transaction().serialize()) == True |
||||
|
||||
|
||||
@pytest.fixture(scope='module') |
||||
def setup_wallet(request): |
||||
load_test_config() |
||||
btc.select_chain_params("bitcoin/regtest") |
||||
#see note in cryptoengine.py: |
||||
cryptoengine.BTC_P2WPKH.VBYTE = 100 |
||||
jm_single().bc_interface.tick_forward_chain_interval = 2 |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
mnemonic = 'zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo abstract' |
||||
entropy = SegwitWallet.entropy_from_mnemonic(mnemonic) |
||||
storage = VolatileStorage() |
||||
SegwitWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=1) |
||||
wallet = SegwitWallet(storage) |
||||
wallet_service = WalletService(wallet) |
||||
return [wallet, wallet_service] |
||||
Loading…
Reference in new issue