Browse Source

Merge JoinMarket-Org/joinmarket-clientserver#1462: Refactor and cleanup of blockchaininterface and related

3fc74fbe2a Refactor and cleanup of blockchaininterface and related (Kristaps Kaupe)

Pull request description:

  Summary of changes:

  * Add typehints to `jmclient/jmclient/blockchaininterface.py` and `jmclient/jmclient/jsonrpc.py`;
  * Move all methods called by external code to `BlockchainInterface` base class or add abstract methods there;
  * <del>Remove broken `ElectrumWalletInterface` (we can re-introduce it later from history if somebody wants to fix it);</del><ins> (done in #1477)</ins>
  * More dummy abstract method overrides for `DummyBlockchainInterface` (for tests);
  * Alphabetical ordering of imports and other minor stuff;
  * Behaviour change - previously fee estimation would fail if it could not get mempoolminfee, now will go with default 10 sat/vB.

  #1460 was part of this, but did separate PR for that one, as it is simpler to review small refactoring changes.

  This should make it more easy to: 1) write new code that uses blockchaininterface, 2) write new alternative implementations of `BlockchainInterface`.

Top commit has no ACKs.

Tree-SHA512: 7699de0419c1006ff3c6ac5da7d26055b8e3508c424b5bdd3117fea575d30ab21e8617b2d50ae7c8b2e76997d6f4b14781bb56fcdad4efc081d533a713666a31
master
Kristaps Kaupe 2 years ago
parent
commit
a6b13c2af1
No known key found for this signature in database
GPG Key ID: 33E472FE870C7E5D
  1. 388
      jmclient/jmclient/blockchaininterface.py
  2. 33
      jmclient/jmclient/jsonrpc.py
  3. 5
      jmclient/jmclient/payjoin.py
  4. 18
      jmclient/jmclient/wallet_service.py
  5. 73
      jmclient/test/commontest.py
  6. 4
      jmclient/test/test_psbt_wallet.py

388
jmclient/jmclient/blockchaininterface.py

@ -5,14 +5,15 @@ import sys
import time
from abc import ABC, abstractmethod
from decimal import Decimal
from typing import Optional, Tuple
from typing import *
from twisted.internet import reactor, task
import jmbitcoin as btc
from jmbase import bintohex, hextobin, stop_reactor
from jmbase.support import get_log, jmprint, EXIT_FAILURE
from jmclient.configure import jm_single
from jmclient.jsonrpc import JsonRpcConnectionError, JsonRpcError
from jmclient.jsonrpc import JsonRpc, JsonRpcConnectionError, JsonRpcError
# an inaccessible blockheight; consider rewriting in 1900 years
@ -22,11 +23,11 @@ log = get_log()
class BlockchainInterface(ABC):
def __init__(self):
def __init__(self) -> None:
pass
@abstractmethod
def is_address_imported(self, addr):
def is_address_imported(self, addr: str) -> bool:
"""checks that address is already imported"""
@abstractmethod
@ -34,28 +35,142 @@ class BlockchainInterface(ABC):
"""checks that UTXO belongs to the JM wallet"""
@abstractmethod
def pushtx(self, txhex):
"""pushes tx to the network, returns False if failed"""
def pushtx(self, txbin: bytes) -> bool:
""" Given a binary serialized valid bitcoin transaction,
broadcasts it to the network.
"""
@abstractmethod
def query_utxo_set(self, txouts, includeconfs=False):
"""
takes a utxo or a list of utxos
returns None if they are spend or unconfirmed
otherwise returns value in satoshis, address and output script
optionally return the coin age in number of blocks
def query_utxo_set(self,
txouts: Union[Tuple[bytes, int], List[Tuple[bytes, int]]],
includeconfs: bool = False,
include_mempool: bool = True) -> List[Optional[dict]]:
"""If txout is either (a) a single utxo in (txidbin, n) form,
or a list of the same, returns, as a list for each txout item,
the result of gettxout from the bitcoind rpc for those utxos;
if any utxo is invalid, None is returned.
includeconfs: if this is True, the current number of confirmations
of the prescribed utxo is included in the returned result dict.
include_mempool: if True, the contents of the mempool are included;
this *both* means that utxos that are spent in in-mempool transactions
are *not* returned, *and* means that utxos that are created in the
mempool but have zero confirmations *are* returned.
If the utxo is of a non-standard type such that there is no address,
the address field in the dict is None.
"""
# address and output script contain the same information btw
@abstractmethod
def get_wallet_rescan_status(self) -> Tuple[bool, Optional[Decimal]]:
"""Returns pair of True/False is wallet currently rescanning and
Optional[Decimal] with current rescan progress status."""
def import_addresses_if_needed(self, addresses, wallet_name):
@abstractmethod
def rescanblockchain(self, start_height: int, end_height: Optional[int] = None) -> None:
"""Rescan the local blockchain for wallet related transactions.
"""
@abstractmethod
def import_addresses_if_needed(self, addresses: Set[str], wallet_name: str) -> bool:
"""import addresses to the underlying blockchain interface if needed
returns True if the sync call needs to do a system exit"""
@abstractmethod
def import_addresses(self, addr_list: Iterable[str], wallet_name: str,
restart_cb: Optional[Callable[[str], None]] = None) -> None:
"""Imports addresses in a batch during initial sync.
Refuses to proceed if keys are found to be under control
of another account/label (see console output), and quits.
"""
@abstractmethod
def list_transactions(self, num: int, skip: int = 0) -> List[dict]:
""" Return a list of the last `num` transactions seen
in the wallet (under any label/account), optionally
skipping some.
"""
@abstractmethod
def get_deser_from_gettransaction(self, rpcretval: dict) -> Optional[btc.CMutableTransaction]:
"""Get full transaction deserialization from a call
to get_transaction().
"""
@abstractmethod
def get_transaction(self, txid: bytes) -> Optional[dict]:
""" Argument txid is passed in binary.
Returns a serialized transaction for txid txid,
in hex as returned by Bitcoin Core rpc, or None
if no transaction can be retrieved. Works also for
watch-only wallets.
"""
@abstractmethod
def get_block(self, blockheight: int) -> str:
"""Returns full hex serialized block at a given height.
"""
@abstractmethod
def get_current_block_height(self) -> int:
"""Returns the height of the most-work fully-validated chain.
"""
@abstractmethod
def get_best_block_hash(self) -> str:
"""Returns the hash of the best (tip) block in the most-work
fully-validated chain.
"""
@abstractmethod
def get_best_block_median_time(self) -> int:
"""Returns median time for the current best block.
"""
@abstractmethod
def get_block_height(self, blockhash: str) -> int:
"""Returns the block height for a specific block hash.
"""
@abstractmethod
def get_block_time(self, blockhash: str) -> int:
"""Returns the block time expressed in UNIX epoch time for a specific
block hash.
"""
@abstractmethod
def get_block_hash(self, height: int) -> str:
"""Returns hash of block in best-block-chain at height provided.
"""
@abstractmethod
def get_tx_merkle_branch(self, txid: str,
blockhash: Optional[str] = None) -> bytes:
"""TODO: describe method.
"""
@abstractmethod
def verify_tx_merkle_branch(self, txid: str, block_height: int,
merkle_branch: bytes) -> bool:
"""TODO: describe method.
"""
@abstractmethod
def listaddressgroupings(self) -> list:
"""Lists groups of addresses which have had their common ownership
made public by common use as inputs or as the resulting change
in past transactions.
"""
@abstractmethod
def listunspent(self, minconf: Optional[int] = None) -> List[dict]:
"""Returns list of unspent transaction output info dicts,
optionally filtering by minimum confirmations.
"""
@abstractmethod
def testmempoolaccept(self, rawtx: str) -> bool:
"""Checks that raw transaction would be accepted by mempool.
"""
@abstractmethod
def _get_mempool_min_fee(self) -> Optional[int]:
"""Returns minimum mempool fee as a floor to avoid relay problems
@ -70,6 +185,48 @@ class BlockchainInterface(ABC):
or None in case of error.
"""
def yield_transactions(self) -> Generator[dict, None, None]:
""" Generates a lazily fetched sequence of transactions seen in the
wallet (under any label/account), yielded in newest-first order. Care
is taken to avoid yielding duplicates even when new transactions are
actively being added to the wallet while the iteration is ongoing.
"""
num, skip = 1, 0
txs = self.list_transactions(num, skip)
if not txs:
return
yielded_tx = txs[0]
yield yielded_tx
while True:
num *= 2
txs = self.list_transactions(num, skip)
if not txs:
return
try:
idx = [(tx['txid'], tx['vout'], tx['category']) for tx in txs
].index((yielded_tx['txid'], yielded_tx['vout'],
yielded_tx['category']))
except ValueError:
skip += num
continue
for tx in reversed(txs[:idx]):
yielded_tx = tx # inefficient but more obvious
yield yielded_tx
if len(txs) < num:
return
skip += num - 1
def get_unspent_indices(self, transaction: btc.CTransaction) -> List[int]:
""" Given a CTransaction object, identify the list of
indices of outputs which are unspent (returned as list of ints).
"""
bintxid = transaction.GetTxid()[::-1]
res = self.query_utxo_set([(bintxid, i) for i in range(
len(transaction.vout))])
# QUS returns 'None' for spent outputs, so filter them out
# and return the indices of the others:
return [i for i, val in enumerate(res) if val]
def _fee_per_kb_has_been_manually_set(self, tx_fees: int) -> bool:
"""If the block target (tx_fees) is higher than 1000, interpret it
as manually set fee sats/kvB.
@ -147,15 +304,21 @@ class BlockchainInterface(ABC):
log.info(msg + ": " + btc.fee_per_kb_to_str(retval))
return int(retval)
def core_proof_to_merkle_branch(self, core_proof: str) -> bytes:
core_proof = binascii.unhexlify(core_proof)
#first 80 bytes of a proof given by core are just a block header
#so we can save space by replacing it with a 4-byte block height
return core_proof[80:]
class BitcoinCoreInterface(BlockchainInterface):
def __init__(self, jsonRpc, network, wallet_name):
def __init__(self, jsonRpc: JsonRpc, network: str, wallet_name: str) -> None:
super().__init__()
self.jsonRpc = jsonRpc
blockchainInfo = self._rpc("getblockchaininfo", [])
if not blockchainInfo:
# see note in BitcoinCoreInterface.rpc - here
# see note in BitcoinCoreInterface._rpc() - here
# we have to create this object before reactor start,
# so reactor is not stopped, so we override the 'swallowing'
# of the Exception that happened in self._rpc():
@ -185,26 +348,23 @@ class BitcoinCoreInterface(BlockchainInterface):
"setting in joinmarket.cfg) instead. See docs/USAGE.md "
"for details.")
def is_address_imported(self, addr):
def is_address_imported(self, addr: str) -> bool:
return len(self._rpc('getaddressinfo', [addr])['labels']) > 0
def get_block(self, blockheight):
def get_block(self, blockheight: int) -> str:
"""Returns full serialized block at a given height.
"""
block_hash = self.get_block_hash(blockheight)
block = self._rpc('getblock', [block_hash, False])
if not block:
return False
return block
return self._rpc('getblock', [block_hash, 0])
def rescanblockchain(self, start_height: int, end_height: Optional[int] = None) -> None:
# Threading is not used in Joinmarket but due to blocking
# nature of this very slow RPC call, we need to fire and forget.
from threading import Thread
Thread(target=self.rescan_in_thread, args=(start_height,),
Thread(target=self._rescan_in_thread, args=(start_height,),
daemon=True).start()
def rescan_in_thread(self, start_height: int) -> None:
def _rescan_in_thread(self, start_height: int) -> None:
""" In order to not conflict with the existing main
JsonRPC connection in the main thread, this rescanning
thread creates a distinct JsonRPC object, just to make
@ -239,7 +399,7 @@ class BitcoinCoreInterface(BlockchainInterface):
else:
return False, None
def _rpc(self, method: str, args: Optional[list] = None):
def _rpc(self, method: str, args: Union[dict, list] = []) -> Any:
""" Returns the result of an rpc call to the Bitcoin Core RPC API.
If the connection is permanently or unrecognizably broken, None
is returned *and the reactor is shutdown* (because we consider this
@ -274,15 +434,11 @@ class BitcoinCoreInterface(BlockchainInterface):
# so this is handled elsewhere in BitcoinCoreInterface.
return res
def is_address_labeled(self, utxo, walletname):
def is_address_labeled(self, utxo: dict, walletname: str) -> bool:
return ("label" in utxo and utxo["label"] == walletname)
def import_addresses(self, addr_list, wallet_name, restart_cb=None):
"""Imports addresses in a batch during initial sync.
Refuses to proceed if keys are found to be under control
of another account/label (see console output), and quits.
Do NOT use for in-run imports, use rpc('importaddress',..) instead.
"""
def import_addresses(self, addr_list: Iterable[str], wallet_name: str,
restart_cb: Callable[[str], None] = None) -> None:
requests = []
for addr in addr_list:
requests.append({
@ -313,7 +469,7 @@ class BitcoinCoreInterface(BlockchainInterface):
jmprint(fatal_msg, "important")
sys.exit(EXIT_FAILURE)
def import_addresses_if_needed(self, addresses, wallet_name):
def import_addresses_if_needed(self, addresses: Set[str], wallet_name: str) -> bool:
if wallet_name in self._rpc('listlabels', []):
imported_addresses = set(self._rpc('getaddressesbylabel',
[wallet_name]).keys())
@ -324,61 +480,17 @@ class BitcoinCoreInterface(BlockchainInterface):
self.import_addresses(addresses - imported_addresses, wallet_name)
return import_needed
def _yield_transactions(self):
""" Generates a lazily fetched sequence of transactions seen in the
wallet (under any label/account), yielded in newest-first order. Care
is taken to avoid yielding duplicates even when new transactions are
actively being added to the wallet while the iteration is ongoing.
"""
num, skip = 1, 0
txs = self.list_transactions(num, skip)
if not txs:
return
yielded_tx = txs[0]
yield yielded_tx
while True:
num *= 2
txs = self.list_transactions(num, skip)
if not txs:
return
try:
idx = [(tx['txid'], tx['vout'], tx['category']) for tx in txs
].index((yielded_tx['txid'], yielded_tx['vout'],
yielded_tx['category']))
except ValueError:
skip += num
continue
for tx in reversed(txs[:idx]):
yielded_tx = tx # inefficient but more obvious
yield yielded_tx
if len(txs) < num:
return
skip += num - 1
def get_deser_from_gettransaction(self, rpcretval):
"""Get full transaction deserialization from a call
to `gettransaction`
"""
def get_deser_from_gettransaction(self, rpcretval: dict) -> Optional[btc.CMutableTransaction]:
if not "hex" in rpcretval:
log.info("Malformed gettransaction output")
return None
return btc.CMutableTransaction.deserialize(
hextobin(rpcretval["hex"]))
def list_transactions(self, num, skip=0):
""" Return a list of the last `num` transactions seen
in the wallet (under any label/account), optionally
skipping some.
"""
def list_transactions(self, num: int, skip: int = 0) -> List[dict]:
return self._rpc("listtransactions", ["*", num, skip, True])
def get_transaction(self, txid):
""" Argument txid is passed in binary.
Returns a serialized transaction for txid txid,
in hex as returned by Bitcoin Core rpc, or None
if no transaction can be retrieved. Works also for
watch-only wallets.
"""
def get_transaction(self, txid: bytes) -> Optional[dict]:
htxid = bintohex(txid)
try:
res = self._rpc("gettransaction", [htxid, True])
@ -398,7 +510,7 @@ class BitcoinCoreInterface(BlockchainInterface):
return None
return res
def pushtx(self, txbin):
def pushtx(self, txbin: bytes) -> bool:
""" Given a binary serialized valid bitcoin transaction,
broadcasts it to the network.
"""
@ -413,24 +525,14 @@ class BitcoinCoreInterface(BlockchainInterface):
return False
return True
def query_utxo_set(self, txout, includeconfs=False, include_mempool=True):
"""If txout is either (a) a single utxo in (txidbin, n) form,
or a list of the same, returns, as a list for each txout item,
the result of gettxout from the bitcoind rpc for those utxos;
if any utxo is invalid, None is returned.
includeconfs: if this is True, the current number of confirmations
of the prescribed utxo is included in the returned result dict.
include_mempool: if True, the contents of the mempool are included;
this *both* means that utxos that are spent in in-mempool transactions
are *not* returned, *and* means that utxos that are created in the
mempool but have zero confirmations *are* returned.
If the utxo is of a non-standard type such that there is no address,
the address field in the dict is None.
"""
if not isinstance(txout, list):
txout = [txout]
def query_utxo_set(self,
txouts: Union[Tuple[bytes, int], List[Tuple[bytes, int]]],
includeconfs: bool = False,
include_mempool: bool = True) -> List[Optional[dict]]:
if not isinstance(txouts, list):
txouts = [txouts]
result = []
for txo in txout:
for txo in txouts:
txo_hex = bintohex(txo[0])
if len(txo_hex) != 64:
log.warn("Invalid utxo format, ignoring: {}".format(txo))
@ -454,17 +556,6 @@ class BitcoinCoreInterface(BlockchainInterface):
result.append(result_dict)
return result
def get_unspent_indices(self, transaction):
""" Given a CTransaction object, identify the list of
indices of outputs which are unspent (returned as list of ints).
"""
bintxid = transaction.GetTxid()[::-1]
res = self.query_utxo_set([(bintxid, i) for i in range(
len(transaction.vout))])
# QUS returns 'None' for spent outputs, so filter them out
# and return the indices of the others:
return [i for i, val in enumerate(res) if val]
def _get_mempool_min_fee(self) -> Optional[int]:
rpc_result = self._rpc('getmempoolinfo')
if not rpc_result:
@ -493,34 +584,33 @@ class BitcoinCoreInterface(BlockchainInterface):
log.warn("Could not source a fee estimate from Core")
return None
def get_current_block_height(self):
def get_current_block_height(self) -> int:
try:
res = self._rpc("getblockcount", [])
return self._rpc("getblockcount", [])
except JsonRpcError as e:
log.error("Getblockcount RPC failed with: %i, %s" % (
raise RuntimeError("Getblockcount RPC failed with: %i, %s" % (
e.code, e.message))
res = None
return res
def get_best_block_hash(self):
def get_best_block_hash(self) -> str:
return self._rpc('getbestblockhash', [])
def get_best_block_median_time(self):
def get_best_block_median_time(self) -> int:
return self._rpc('getblockchaininfo', [])['mediantime']
def _get_block_header_data(self, blockhash, key):
def _get_block_header_data(self, blockhash: str, key: str) -> Any:
return self._rpc('getblockheader', [blockhash])[key]
def get_block_height(self, blockhash):
def get_block_height(self, blockhash: str) -> int:
return self._get_block_header_data(blockhash, 'height')
def get_block_time(self, blockhash):
def get_block_time(self, blockhash: str) -> int:
return self._get_block_header_data(blockhash, 'time')
def get_block_hash(self, height):
def get_block_hash(self, height: int) -> str:
return self._rpc("getblockhash", [height])
def get_tx_merkle_branch(self, txid, blockhash=None):
def get_tx_merkle_branch(self, txid: str,
blockhash: Optional[str] = None) -> bytes:
if not blockhash:
tx = self._rpc("gettransaction", [txid])
if tx["confirmations"] < 1:
@ -532,23 +622,18 @@ class BitcoinCoreInterface(BlockchainInterface):
raise ValueError("Block containing transaction is pruned")
return self.core_proof_to_merkle_branch(core_proof)
def core_proof_to_merkle_branch(self, core_proof):
core_proof = binascii.unhexlify(core_proof)
#first 80 bytes of a proof given by core are just a block header
#so we can save space by replacing it with a 4-byte block height
return core_proof[80:]
def verify_tx_merkle_branch(self, txid, block_height, merkle_branch):
def verify_tx_merkle_branch(self, txid: str, block_height: int,
merkle_branch: bytes) -> bool:
block_hash = self.get_block_hash(block_height)
core_proof = self._rpc("getblockheader", [block_hash, False]) + \
binascii.hexlify(merkle_branch).decode()
ret = self._rpc("verifytxoutproof", [core_proof])
return len(ret) == 1 and ret[0] == txid
def listaddressgroupings(self):
def listaddressgroupings(self) -> list:
return self._rpc('listaddressgroupings', [])
def listunspent(self, minconf=None):
def listunspent(self, minconf: Optional[int] = None) -> List[dict]:
listunspent_args = []
if 'listunspent_args' in jm_single().config.options('POLICY'):
listunspent_args = ast.literal_eval(jm_single().config.get(
@ -557,8 +642,9 @@ class BitcoinCoreInterface(BlockchainInterface):
listunspent_args[0] = minconf
return self._rpc('listunspent', listunspent_args)
def testmempoolaccept(self, rawtx):
return self._rpc('testmempoolaccept', [[rawtx]])
def testmempoolaccept(self, rawtx: str) -> bool:
res = self._rpc('testmempoolaccept', [[rawtx]])
return res[0]["allowed"]
class RegtestBitcoinCoreMixin():
@ -566,7 +652,7 @@ class RegtestBitcoinCoreMixin():
This Mixin provides helper functions that are used in Interface classes
requiring some functionality only useful on the regtest network.
"""
def tick_forward_chain(self, n):
def tick_forward_chain(self, n: int) -> None:
"""
Special method for regtest only;
instruct to mine n blocks.
@ -581,7 +667,7 @@ class RegtestBitcoinCoreMixin():
"Failed to generate blocks, looks like the bitcoin daemon \
has been shut down. Ignoring.")
def grab_coins(self, receiving_addr, amt=50):
def grab_coins(self, receiving_addr: str, amt: int = 50) -> str:
"""
NOTE! amt is passed in Coins, not Satoshis!
Special method for regtest only:
@ -610,17 +696,17 @@ class RegtestBitcoinCoreMixin():
class BitcoinCoreNoHistoryInterface(BitcoinCoreInterface, RegtestBitcoinCoreMixin):
def __init__(self, jsonRpc, network, wallet_name):
def __init__(self, jsonRpc: JsonRpc, network: str, wallet_name: str) -> None:
super().__init__(jsonRpc, network, wallet_name)
self.import_addresses_call_count = 0
self.wallet_name = None
self.scan_result = None
def import_addresses_if_needed(self, addresses, wallet_name):
def import_addresses_if_needed(self, addresses: Set[str], wallet_name: str) -> bool:
self.import_addresses_call_count += 1
if self.import_addresses_call_count == 1:
self.wallet_name = wallet_name
addr_list = ["addr(" + a + ")" for a in addresses]
addr_list = [btc.get_address_descriptor(a) for a in addresses]
log.debug("Starting scan of UTXO set")
st = time.time()
self._rpc("scantxoutset", ["abort", []])
@ -635,19 +721,23 @@ class BitcoinCoreNoHistoryInterface(BitcoinCoreInterface, RegtestBitcoinCoreMixi
assert False
return False
def _yield_transactions(self):
def yield_transactions(self) -> Generator[dict, None, None]:
for u in self.scan_result["unspents"]:
tx = {"category": "receive", "address":
btc.get_address_from_descriptor(u["desc"])}
yield tx
def list_transactions(self, num):
def list_transactions(self, num: int, skip: int = 0) -> List[dict]:
return []
def listaddressgroupings(self):
def listaddressgroupings(self) -> list:
raise RuntimeError("default sync not supported by bitcoin-rpc-nohistory, use --recoversync")
def listunspent(self):
def listunspent(self, minconf: Optional[int] = None) -> List[dict]:
if minconf == 0:
log.warning(
"Unconfirmed transactions are not seen by "
"bitcoin-rpc-nohistory.")
return [{
"address": btc.get_address_from_descriptor(u["desc"]),
"label": self.wallet_name,
@ -658,7 +748,7 @@ class BitcoinCoreNoHistoryInterface(BitcoinCoreInterface, RegtestBitcoinCoreMixi
"amount": u["amount"]
} for u in self.scan_result["unspents"]]
def set_wallet_no_history(self, wallet):
def set_wallet_no_history(self, wallet) -> None:
#make wallet-tool not display any new addresses
#because no-history cant tell if an address is used and empty
#so this is necessary to avoid address reuse
@ -667,7 +757,7 @@ class BitcoinCoreNoHistoryInterface(BitcoinCoreInterface, RegtestBitcoinCoreMixi
# avoidance of address reuse
wallet.disable_new_scripts = True
def tick_forward_chain(self, n):
def tick_forward_chain(self, n: int) -> None:
self.destn_addr = self._rpc("getnewaddress", [])
super().tick_forward_chain(n)
@ -678,7 +768,7 @@ class BitcoinCoreNoHistoryInterface(BitcoinCoreInterface, RegtestBitcoinCoreMixi
# with > 100 blocks.
class RegtestBitcoinCoreInterface(BitcoinCoreInterface, RegtestBitcoinCoreMixin): #pragma: no cover
def __init__(self, jsonRpc, wallet_name):
def __init__(self, jsonRpc: JsonRpc, wallet_name: str) -> None:
super().__init__(jsonRpc, 'regtest', wallet_name)
self.pushtx_failure_prob = 0
self.tick_forward_chain_interval = -1
@ -694,7 +784,7 @@ class RegtestBitcoinCoreInterface(BitcoinCoreInterface, RegtestBitcoinCoreMixin)
return jm_single().config.getint("POLICY",
"absurd_fee_per_kb") + 100
def tickchain(self):
def tickchain(self) -> None:
if self.tick_forward_chain_interval < 0:
log.debug('not ticking forward chain')
self.tickchainloop.stop()
@ -704,26 +794,26 @@ class RegtestBitcoinCoreInterface(BitcoinCoreInterface, RegtestBitcoinCoreMixin)
return
self.tick_forward_chain(1)
def simulate_blocks(self):
def simulate_blocks(self) -> None:
self.tickchainloop = task.LoopingCall(self.tickchain)
self.tickchainloop.start(self.tick_forward_chain_interval)
self.simulating = True
def pushtx(self, txhex):
def pushtx(self, txbin: bytes) -> bool:
if self.pushtx_failure_prob != 0 and random.random() <\
self.pushtx_failure_prob:
log.debug('randomly not broadcasting %0.1f%% of the time' %
(self.pushtx_failure_prob * 100))
return True
ret = super().pushtx(txhex)
ret = super().pushtx(txbin)
if not self.simulating and self.tick_forward_chain_interval > 0:
log.debug('will call tfc after ' + str(self.tick_forward_chain_interval) + ' seconds.')
reactor.callLater(self.tick_forward_chain_interval,
self.tick_forward_chain, 1)
return ret
def get_received_by_addr(self, addresses):
def get_received_by_addr(self, addresses: List[str]) -> dict:
# NB This will NOT return coinbase coins (but wont matter in our use
# case). allow importaddress to fail in case the address is already
# in the wallet

33
jmclient/jmclient/jsonrpc.py

@ -1,5 +1,6 @@
# Copyright (C) 2013,2015 by Daniel Kraft <d@domob.eu>
# Copyright (C) 2014 by phelix / blockchained.com
# Copyright (C) 2016-2023 JoinMarket developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
@ -19,12 +20,14 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import errno
import socket
import base64
import errno
import http.client
import json
import socket
from decimal import Decimal
from typing import Any, Union
from jmbase import get_log
jlog = get_log()
@ -54,7 +57,7 @@ class JsonRpc(object):
to connect to Bitcoin.
"""
def __init__(self, host, port, user, password, url=""):
def __init__(self, host: str, port: int, user: str, password: str, url: str = "") -> None:
self.host = host
self.port = int(port)
self.conn = http.client.HTTPConnection(self.host, self.port)
@ -62,16 +65,16 @@ class JsonRpc(object):
self.url = url
self.queryId = 1
def setURL(self, url):
def setURL(self, url: str) -> None:
self.url = url
def queryHTTP(self, obj):
def queryHTTP(self, obj: dict) -> Union[dict, str]:
"""
Send an appropriate HTTP query to the server. The JSON-RPC
request should be (as object) in 'obj'. If the call succeeds,
the resulting JSON object is returned. In case of an error
with the connection (not JSON-RPC itself), an exception is raised.
"""
Send an appropriate HTTP query to the server. The JSON-RPC
request should be (as object) in 'obj'. If the call succeeds,
the resulting JSON object is returned. In case of an error
with the connection (not JSON-RPC itself), an exception is raised.
"""
headers = {"User-Agent": "joinmarket",
"Content-Type": "application/json",
@ -108,19 +111,16 @@ class JsonRpc(object):
jlog.warn('Connection was reset, attempting reconnect.')
self.conn.close()
self.conn.connect()
continue
elif e.errno == errno.EPIPE:
jlog.warn('Connection had broken pipe, attempting '
'reconnect.')
self.conn.close()
self.conn.connect()
continue
elif e.errno == errno.EPROTOTYPE:
jlog.warn('Connection had protocol wrong type for socket '
'error, attempting reconnect.')
self.conn.close()
self.conn.connect()
continue
elif e.errno == errno.ECONNREFUSED:
# Will not reattempt in this case:
jlog.error("Connection refused.")
@ -132,12 +132,11 @@ class JsonRpc(object):
except Exception as exc:
raise JsonRpcConnectionError("JSON-RPC connection failed. Err:" +
repr(exc))
break
def call(self, method, params):
def call(self, method: str, params: Union[dict, list]) -> Any:
"""
Call a method over JSON-RPC.
"""
Call a method over JSON-RPC.
"""
currentId = self.queryId
self.queryId += 1

5
jmclient/jmclient/payjoin.py

@ -745,9 +745,8 @@ class PayjoinConverter(object):
# it is still safer to at least verify the validity of the signatures
# at this stage, to ensure no misbehaviour with using inputs
# that are not signed correctly:
res = jm_single().bc_interface.testmempoolaccept(bintohex(
self.manager.payment_tx.serialize()))
if not res[0]["allowed"]:
if not jm_single().bc_interface.testmempoolaccept(bintohex(
self.manager.payment_tx.serialize())):
return (False, "Proposed transaction was "
"rejected from mempool.",
"original-psbt-rejected")

18
jmclient/jmclient/wallet_service.py

@ -4,7 +4,7 @@ import collections
import itertools
import time
import sys
from typing import Optional, Tuple
from typing import Dict, List, Optional, Set, Tuple
from decimal import Decimal
from copy import deepcopy
from twisted.internet import reactor
@ -292,13 +292,13 @@ class WalletService(Service):
self.disable_utxo(*utxo)
def _yield_new_transactions(self):
""" Constrains the sequence generated by bci._yield_transactions so
""" Constrains the sequence generated by bci.yield_transactions so
that it stops just before it would yield the newest transaction
previously yielded by _yield_new_transactions.
"""
since_txid = self.last_seen_txid
last = True
for tx in self.bci._yield_transactions():
for tx in self.bci.yield_transactions():
if 'txid' in tx:
txid = tx['txid']
if txid == since_txid:
@ -495,7 +495,7 @@ class WalletService(Service):
# Don't attempt updates on transactions that existed
# before startup
self.last_seen_txid = next(
(tx['txid'] for tx in self.bci._yield_transactions()
(tx['txid'] for tx in self.bci.yield_transactions()
if 'txid' in tx), None)
if isinstance(self.bci, BitcoinCoreNoHistoryInterface):
self.bci.set_wallet_no_history(self.wallet)
@ -709,7 +709,7 @@ class WalletService(Service):
"""
res = []
processed_txids = set()
for r in self.bci._yield_transactions():
for r in self.bci.yield_transactions():
txid = r["txid"]
if txid not in processed_txids:
tx = self.bci.get_transaction(hextobin(txid))
@ -743,7 +743,7 @@ class WalletService(Service):
it is not a wallet tx and so can't be queried).
"""
txid = tx.GetTxid()[::-1]
return self.get_block_height(self.bci.get_transaction(
return self.bci.get_block_height(self.bci.get_transaction(
txid)["blockhash"])
def sync_addresses(self):
@ -765,7 +765,7 @@ class WalletService(Service):
if isinstance(self.wallet, FidelityBondMixin):
tx_receive = []
burner_txes = []
for tx in self.bci._yield_transactions():
for tx in self.bci.yield_transactions():
if tx['category'] == 'receive':
tx_receive.append(tx)
elif tx["category"] == "send":
@ -788,7 +788,7 @@ class WalletService(Service):
else:
#not fidelity bond wallet, significantly faster sync
used_addresses_gen = set(tx['address']
for tx in self.bci._yield_transactions()
for tx in self.bci.yield_transactions()
if tx['category'] == 'receive')
# needed for address-reuse check:
self.used_addresses = used_addresses_gen
@ -964,7 +964,7 @@ class WalletService(Service):
self.import_addr(addr)
return addr
def collect_addresses_init(self):
def collect_addresses_init(self) -> Tuple[Set[str], Dict[int, List[int]]]:
""" Collects the "current" set of addresses,
as defined by the indices recorded in the wallet's
index cache (persisted in the wallet file usually).

73
jmclient/test/commontest.py

@ -4,16 +4,15 @@
import os
import random
from decimal import Decimal
from typing import Optional, Tuple
from typing import Callable, List, Optional, Set, Tuple, Union
import jmbitcoin as btc
from jmbase import (get_log, hextobin, bintohex, dictchanger)
from jmbase.support import chunks
from jmclient import (
jm_single, open_test_wallet_maybe, estimate_tx_fee,
BlockchainInterface, BIP32Wallet, BaseWallet,
SegwitWallet, WalletService, BTC_P2SH_P2WPKH)
from jmbase.support import chunks
import jmbitcoin as btc
log = get_log()
'''This code is intended to provide
@ -30,28 +29,58 @@ default_max_cj_fee = (1, float('inf'))
def dummy_accept_callback(tx, destaddr, actual_amount, fee_est,
custom_change_addr):
return True
def dummy_info_callback(msg):
pass
class DummyBlockchainInterface(BlockchainInterface):
def __init__(self):
def __init__(self) -> None:
self.fake_query_results = None
self.qusfail = False
self.cbh = 1
self.default_confs = 20
self.confs_for_qus = {}
def rpc(self, a, b):
return None
def sync_addresses(self, wallet):
# Dummy abstract method overrides of base class
def is_address_imported(self, addr: str) -> bool:
pass
def sync_unspent(self, wallet):
def is_address_labeled(self, utxo: dict, walletname: str) -> bool:
pass
def import_addresses(self, addr_list, wallet_name, restart_cb=None):
def import_addresses_if_needed(self, addresses: Set[str], wallet_name: str) -> bool:
pass
def is_address_imported(self, addr):
def import_addresses(self, addr_list: List[str], wallet_name: str,
restart_cb: Callable[[str], None] = None) -> None:
pass
def is_address_labeled(self, utxo: dict, walletname: str) -> bool:
def list_transactions(self, num: int, skip: int = 0) -> List[dict]:
pass
def get_deser_from_gettransaction(self, rpcretval: dict) -> Optional[btc.CMutableTransaction]:
pass
def get_transaction(self, txid: bytes) -> Optional[dict]:
pass
def get_block(self, blockheight: int) -> Optional[str]:
pass
def get_best_block_hash(self) -> str:
pass
def get_best_block_median_time(self) -> int:
pass
def get_block_height(self, blockhash: str) -> int:
pass
def get_block_time(self, blockhash: str) -> int:
pass
def get_block_hash(self, height: int) -> str:
pass
def get_tx_merkle_branch(self, txid: str,
blockhash: Optional[str] = None) -> bytes:
pass
def verify_tx_merkle_branch(self, txid: str, block_height: int,
merkle_branch: bytes) -> bool:
pass
def listaddressgroupings(self) -> list:
pass
def listunspent(self, minconf: Optional[int] = None) -> List[dict]:
pass
def testmempoolaccept(self, rawtx: str) -> bool:
pass
def _get_mempool_min_fee(self) -> Optional[int]:
pass
@ -59,30 +88,34 @@ class DummyBlockchainInterface(BlockchainInterface):
pass
def get_wallet_rescan_status(self) -> Tuple[bool, Optional[Decimal]]:
pass
def rescanblockchain(self, start_height: int, end_height: Optional[int] = None) -> None:
pass
def get_current_block_height(self):
def get_current_block_height(self) -> int:
return 10**6
def pushtx(self, txhex):
def pushtx(self, txbin: bytes) -> bool:
return True
def insert_fake_query_results(self, fqr):
def insert_fake_query_results(self, fqr: List[dict]) -> None:
self.fake_query_results = fqr
def setQUSFail(self, state):
def setQUSFail(self, state: bool) -> None:
self.qusfail = state
def set_confs(self, confs_utxos):
def set_confs(self, confs_utxos) -> None:
# we hook specific confirmation results
# for specific utxos so that query_utxo_set
# can return a non-constant fake value.
self.confs_for_qus.update(confs_utxos)
def reset_confs(self):
def reset_confs(self) -> None:
self.confs_for_qus = {}
def query_utxo_set(self, txouts, includeconfs=False):
def query_utxo_set(self,
txouts: Union[Tuple[bytes, int], List[Tuple[bytes, int]]],
includeconfs: bool = False,
include_mempool: bool = True) -> List[Optional[dict]]:
if self.qusfail:
#simulate failure to find the utxo
return [None]

4
jmclient/test/test_psbt_wallet.py

@ -296,8 +296,8 @@ def test_payjoin_workflow(setup_psbt_wallet, payment_amt, wallet_cls_sender,
# don't want to push the tx right now, because of test structure
# (in production code this isn't really needed, we will not
# produce invalid payment transactions).
res = jm_single().bc_interface.testmempoolaccept(bintohex(extracted_tx))
assert res[0]["allowed"], "Payment transaction was rejected from mempool."
assert jm_single().bc_interface.testmempoolaccept(bintohex(extracted_tx)),\
"Payment transaction was rejected from mempool."
# *** STEP 2 ***
# **************

Loading…
Cancel
Save