You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

2037 lines
88 KiB

import asyncio
import base64
import binascii
import json
import os
import sqlite3
import sys
import datetime
from optparse import OptionParser
from numbers import Integral
from collections import Counter, defaultdict
from itertools import islice, chain
from typing import Callable, Optional, Tuple, Union
from jmclient import (get_network, WALLET_IMPLEMENTATIONS, Storage, podle,
jm_single, WalletError, BaseWallet, VolatileStorage, DKGRecoveryStorage,
StoragePasswordError, is_taproot_mode, is_segwit_mode, SegwitLegacyWallet,
LegacyWallet, SegwitWallet, FidelityBondMixin, FidelityBondWatchonlyWallet,
TaprootWallet, is_native_segwit_mode, load_program_config, is_frost_mode,
add_base_options, check_regtest, JMClientProtocolFactory, start_reactor,
FrostWallet, DKGStorage)
from jmclient.blockchaininterface import (BitcoinCoreInterface,
BitcoinCoreNoHistoryInterface)
from jmclient.wallet_service import WalletService
from jmbase.support import (get_password, jmprint, EXIT_FAILURE,
EXIT_ARGERROR, utxo_to_utxostr, hextobin, bintohex,
IndentedHelpFormatterWithNL, dict_factory,
cli_prompt_user_yesno, twisted_sys_exit)
from jmfrost.chilldkg_ref.chilldkg import hostpubkey_gen
from .frost_clients import FROSTClient
from .frost_ipc import FrostIPCServer, FrostIPCClient
from .cryptoengine import (
TYPE_P2PKH, TYPE_P2SH_P2WPKH, TYPE_P2WPKH,
TYPE_SEGWIT_WALLET_FIDELITY_BONDS, TYPE_P2TR, TYPE_P2TR_FROST,
TYPE_TAPROOT_WALLET_FIDELITY_BONDS)
from .output import fmt_utxo
import jmbitcoin as btc
from .descriptor import descsum_create
# used for creating new wallets
DEFAULT_MIXDEPTH = 4
def get_wallettool_parser():
description ="""Use this script to monitor and manage your Joinmarket wallet.
The method is one of the following:
(display) Shows addresses and balances.
(displayall) Shows ALL addresses and balances.
(summary) Shows a summary of mixing depth balances.
(generate) Generates a new wallet.
(changepass) Changes the encryption passphrase of the wallet.
(history) Show all historical transaction details. Requires Bitcoin Core.
(recover) Recovers a wallet from the 12 word recovery seed.
(showutxos) Shows all utxos in the wallet.
(showseed) Shows the wallet recovery seed and hex seed.
(importprivkey) Adds privkeys to this wallet, privkeys are spaces or commas separated.
(dumpprivkey) Export a single private key, specify an hd wallet path.
(signmessage) Sign a message with the private key from an address in
the wallet. Use with -H and specify an HD wallet path for the address.
(signpsbt) Sign PSBT with JoinMarket wallet.
(freeze) Freeze or un-freeze UTXOs. Specify mixdepth with -m.
(gettimelockaddress) Obtain a timelocked address. Argument is locktime value as yyyy-mm. For example `2021-03`.
(addtxoutproof) Add a tx out proof as metadata to a burner transaction. Specify path with
-H and proof which is output of Bitcoin Core\'s RPC call gettxoutproof.
(createwatchonly) Create a watch-only fidelity bond wallet.
(setlabel) Set the label associated with the given address.
(hostpubkey) display host public key
(servefrost) run only as DKG/FROST counterparty
(dkgrecover) Recovers a wallet DKG data from Recovery DKG File
(dkgls) display FrostWallet dkg data
(dkgrm) rm FrostWallet dkg data by session_id list
(recdkgls) display Recovery DKG File data
(recdkgrm) rm Recovery DKG File data by session_id list
(testdkg) run only as test of DKG process
(testfrost) run only as test of FROST signing
"""
parser = OptionParser(usage='usage: %prog [options] [wallet file] [method] [args..]',
description=description, formatter=IndentedHelpFormatterWithNL())
add_base_options(parser)
parser.add_option('-p',
'--privkey',
action='store_true',
dest='showprivkey',
help='print private key along with address, default false')
parser.add_option('-m',
'--mixdepth',
action='store',
type='int',
dest='mixdepth',
help="Mixdepth(s) to use in the wallet. Default: {}"
.format(DEFAULT_MIXDEPTH),
default=None)
parser.add_option('-g',
'--gap-limit',
type="int",
action='store',
dest='gaplimit',
help='gap limit for wallet, default=6',
default=6)
parser.add_option('--csv',
action='store_true',
dest='csv',
default=False,
help=('When using the history method, output as csv'))
parser.add_option('-v', '--verbosity',
action='store',
type='int',
dest='verbosity',
default=1,
help=('History method verbosity, 0 (least) to 6 (most), '
'<=2 batches earnings, even values also list TXIDs'))
parser.add_option('-H',
'--hd',
action='store',
type='str',
dest='hd_path',
help='hd wallet path (e.g. m/0/0/0/000)')
return parser
"""The classes in this module manage representations
of wallet states; but they know nothing about Bitcoin,
so do not attempt to validate addresses, keys, BIP32 or relationships.
A console based output is provided as default, but underlying serializations
can be used by calling classes for UIs.
"""
bip32sep = '/'
def bip32pathparse(path):
if not path.startswith('m'):
return False
elements = path.split(bip32sep)[1:]
ret_elements = []
for e in elements:
if e[-1] == "'": e = e[:-1]
try:
x = int(e)
except:
return False
if not x >= -1:
#-1 is allowed for dummy branches for imported keys
return False
ret_elements.append(x)
return ret_elements
"""
WalletView* classes manage wallet representations.
"""
class WalletViewBase(object):
def __init__(self, wallet_path_repr, children=None, serclass=str,
custom_separator=None):
self.wallet_path_repr = wallet_path_repr
self.children = children
self.serclass = serclass
self.separator = custom_separator if custom_separator else "\t"
def get_balance(self, include_unconf=True):
if not include_unconf:
raise NotImplementedError("Separate conf/unconf balances not impl.")
return sum([x.get_balance() for x in self.children])
def get_available_balance(self):
return sum([x.get_available_balance() for x in self.children])
def get_balances(self, include_unconf=True):
return (self.get_balance(include_unconf=include_unconf),
self.get_available_balance())
def get_fmt_balance(self, include_unconf=True):
total_balance, available_balance = self.get_balances(
include_unconf=include_unconf)
if available_balance != total_balance:
return "{0:.08f} ({1:.08f})".format(available_balance, total_balance)
else:
return "{0:.08f}".format(total_balance)
def get_fmt_balance_json(self, include_unconf=True):
total_balance, available_balance = self.get_balances(
include_unconf=include_unconf)
return {self.balance_key_name: "{0:.08f}".format(total_balance),
"available_balance": "{0:.08f}".format(available_balance)}
class WalletViewEntry(WalletViewBase):
balance_key_name = "amount"
def __init__(self, wallet_path_repr, account, address_type, aindex, addr, amounts,
status = 'new', serclass=str, priv=None, custom_separator=None,
label=None):
super().__init__(wallet_path_repr, serclass=serclass,
custom_separator=custom_separator)
self.account = account
assert address_type in [SegwitWallet.BIP32_EXT_ID,
SegwitWallet.BIP32_INT_ID, -1, FidelityBondMixin.BIP32_TIMELOCK_ID,
FidelityBondMixin.BIP32_BURN_ID]
self.address_type = address_type
assert isinstance(aindex, Integral)
assert aindex >= 0
self.aindex = aindex
self.address = addr
self.unconfirmed_amount, self.confirmed_amount = amounts
#note no validation here
self.private_key = priv
self.status = status
self.label = label
def is_locked(self):
return "[LOCKED]" in self.status
def is_frozen(self):
return "[FROZEN]" in self.status
def get_balance(self, include_unconf=True):
"""Overwrites base class since no children
"""
if not include_unconf:
raise NotImplementedError("Separate conf/unconf balances not impl.")
return self.unconfirmed_amount/1e8
def get_available_balance(self, include_unconf=True):
return 0 if self.is_locked() or self.is_frozen() else self.get_balance()
def serialize(self):
left = self.serialize_wallet_position()
addr = self.serialize_address()
amounts = self.serialize_amounts()
status = self.serialize_status()
label = self.serialize_label()
extradata = self.serialize_extra_data()
return self.serclass(self.separator.join([
left, addr, amounts, status, label, extradata]))
def serialize_json(self):
json_serialized = {"hd_path": self.wallet_path_repr,
"address": self.serialize_address(),
"status": self.serialize_status(),
"label": self.serialize_label(),
"extradata": self.serialize_extra_data()}
json_serialized.update(self.get_fmt_balance_json())
return json_serialized
def serialize_wallet_position(self):
return self.wallet_path_repr.ljust(20)
def serialize_address(self):
return self.serclass(self.address)
def serialize_amounts(self, unconf_separate=False, denom="BTC"):
if denom != "BTC":
raise NotImplementedError("Altern. denominations not yet implemented.")
if unconf_separate:
raise NotImplementedError("Separate handling of unconfirmed funds "
"not yet implemented.")
return self.serclass("{0:.08f}".format(self.unconfirmed_amount/1e8))
def serialize_status(self):
return self.serclass(self.status)
def serialize_label(self):
if self.label:
return self.serclass(self.label)
else:
return self.serclass("")
def serialize_extra_data(self):
if self.private_key:
return self.serclass(self.private_key)
else:
return self.serclass("")
class WalletViewEntryBurnOutput(WalletViewEntry):
# balance in burn outputs shouldnt be counted
# towards the total balance
def get_balance(self, include_unconf=True):
return 0
class WalletViewBranch(WalletViewBase):
balance_key_name = "balance"
def __init__(self, wallet_path_repr, account, address_type, branchentries=None,
xpub=None, serclass=str, custom_separator=None):
super().__init__(wallet_path_repr, children=branchentries,
serclass=serclass, custom_separator=custom_separator)
self.account = account
assert address_type in [SegwitWallet.BIP32_EXT_ID,
SegwitWallet.BIP32_INT_ID, -1, FidelityBondMixin.BIP32_TIMELOCK_ID,
FidelityBondMixin.BIP32_BURN_ID]
self.address_type = address_type
if xpub:
assert xpub.startswith('xpub') or xpub.startswith('tpub')
self.xpub = xpub if xpub else ""
self.branchentries = branchentries
def serialize(self, entryseparator="\n", summarize=False):
if summarize:
return ""
lines = [self.serialize_branch_header()]
for we in self.branchentries:
lines.append(we.serialize())
footer = "Balance:" + self.separator + self.get_fmt_balance()
lines.append(footer)
return self.serclass(entryseparator.join(lines))
def serialize_json(self, summarize=False):
if summarize:
return {}
json_serialized = {"branch": self.serialize_branch_header(),
"entries": [x.serialize_json() for x in self.branchentries]}
json_serialized.update(self.get_fmt_balance_json())
return json_serialized
def serialize_branch_header(self):
start = "external addresses" if self.address_type == 0 else "internal addresses"
if self.address_type == -1:
start = "Imported keys"
return self.serclass(self.separator.join([start, self.wallet_path_repr,
self.xpub]))
class WalletViewAccount(WalletViewBase):
balance_key_name = "account_balance"
def __init__(self, wallet_path_repr, account, branches=None, account_name="mixdepth",
serclass=str, custom_separator=None, xpub=None):
super().__init__(wallet_path_repr, children=branches, serclass=serclass,
custom_separator=custom_separator)
self.account = account
self.account_name = account_name
self.xpub = xpub
if branches:
assert len(branches) in [2, 3, 4, 5] #3 if imported keys, 4 if fidelity bonds
#5 if all those plus imported
assert all([isinstance(x, WalletViewBranch) for x in branches])
self.branches = branches
def serialize(self, entryseparator="\n", summarize=False):
header = self.account_name + self.separator + str(self.account)
if self.xpub:
header = header + self.separator + self.xpub
footer = "Balance for mixdepth " + str(
self.account) + ":" + self.separator + self.get_fmt_balance()
if summarize:
return self.serclass(entryseparator.join(
[x.serialize("", summarize=True) for x in self.branches] + [footer]))
else:
return self.serclass(entryseparator.join([header] + [
x.serialize(entryseparator) for x in self.branches] + [footer]))
def serialize_json(self, summarize=False):
json_serialized = {"account": str(self.account)}
json_serialized.update(self.get_fmt_balance_json())
if summarize:
return json_serialized
json_serialized["branches"] = [x.serialize_json() for x in self.branches]
return json_serialized
class WalletView(WalletViewBase):
balance_key_name = "total_balance"
def __init__(self, wallet_path_repr, accounts, wallet_name="JM wallet",
serclass=str, custom_separator=None):
super().__init__(wallet_path_repr, children=accounts, serclass=serclass,
custom_separator=custom_separator)
self.wallet_name = wallet_name
assert all([isinstance(x, WalletViewAccount) for x in accounts])
self.accounts = accounts
def serialize(self, entryseparator="\n", summarize=False):
header = self.wallet_name
if len(self.accounts) > 1:
footer = "Total balance:" + self.separator + \
self.get_fmt_balance()
else:
footer = ""
if summarize:
return self.serclass(entryseparator.join([header] + [
x.serialize("", summarize=True) for x in self.accounts] + [footer]))
else:
return self.serclass(entryseparator.join([header] + [
x.serialize(entryseparator, summarize=False) for x in self.accounts] + [footer]))
def serialize_json(self, summarize=False):
json_serialized = {"wallet_name": self.wallet_name,
"accounts": [x.serialize_json(summarize=summarize) for x in self.accounts]}
json_serialized.update(self.get_fmt_balance_json())
return json_serialized
def get_tx_info(txid: bytes, tx_cache: Optional[dict] = None) -> Tuple[
bool, int, int, dict, int, btc.CTransaction]:
"""
Retrieve some basic information about the given transaction.
:param txid: txid as binary
:param tx_cache: optional cache (dictionary) for get_transaction results
:return: tuple
is_coinjoin: bool
cj_amount: int, only useful if is_coinjoin==True
cj_n: int, number of cj participants, only useful if is_coinjoin==True
output_script_values: {script: value} dict including all outputs
blocktime: int, blocktime this tx was mined
txd: deserialized transaction object (hex-encoded data)
"""
if tx_cache is not None and txid in tx_cache:
rpctx, rpctx_deser = tx_cache[txid]
else:
rpctx = jm_single().bc_interface.get_transaction(txid)
rpctx_deser = jm_single().bc_interface.get_deser_from_gettransaction(
rpctx)
if tx_cache is not None:
tx_cache[txid] = (rpctx, rpctx_deser)
output_script_values = {x.scriptPubKey: x.nValue for x in rpctx_deser.vout}
value_freq_list = sorted(
Counter(output_script_values.values()).most_common(),
key=lambda x: -x[1])
non_cj_freq = (0 if len(value_freq_list) == 1 else
sum(next(islice(zip(*value_freq_list[1:]), 1, None))))
is_coinjoin = (value_freq_list[0][1] > 1 and
value_freq_list[0][1] in
[non_cj_freq, non_cj_freq + 1])
cj_amount = value_freq_list[0][0]
cj_n = value_freq_list[0][1]
return is_coinjoin, cj_amount, cj_n, output_script_values,\
rpctx.get('blocktime', 0), rpctx_deser
async def get_imported_privkey_branch(wallet_service, m, showprivkey):
entries = []
balance_by_script = defaultdict(int)
_utxos = await wallet_service.get_utxos_at_mixdepth(
m, include_disabled=True)
for data in _utxos.values():
balance_by_script[data['script']] += data['value']
for path in wallet_service.yield_imported_paths(m):
addr = await wallet_service.get_address_from_path(path)
script = await wallet_service.get_script_from_path(path)
balance = balance_by_script.get(script, 0)
status = ('used' if balance else 'empty')
if showprivkey:
wip_privkey = wallet_service.get_wif_path(path)
else:
wip_privkey = ''
entries.append(WalletViewEntry(wallet_service.get_path_repr(path), m, -1,
0, addr, [balance, balance],
status=status, priv=wip_privkey))
if entries:
return WalletViewBranch("m/0", m, -1, branchentries=entries)
return None
async def wallet_showutxos(wallet_service: WalletService, showprivkey: bool,
limit_mixdepth: Optional[int] = None) -> str:
unsp = {}
max_tries = jm_single().config.getint("POLICY", "taker_utxo_retries")
_utxos = await wallet_service.get_utxos_by_mixdepth(
include_disabled=True, includeconfs=True,
limit_mixdepth=limit_mixdepth)
for md in _utxos:
(enabled, disabled) = await get_utxos_enabled_disabled(wallet_service,
md)
for u, av in _utxos[md].items():
success, us = utxo_to_utxostr(u)
assert success
key = wallet_service._get_key_from_path(av["path"])[0]
if FidelityBondMixin.is_timelocked_path(av["path"]):
key, locktime = key
else:
locktime = None
tries = podle.get_podle_tries(u, key, max_tries)
tries_remaining = max(0, max_tries - tries)
mixdepth = wallet_service.wallet.get_details(av['path'])[0]
unsp[us] = {'address': av['address'],
'path': wallet_service.get_path_repr(av['path']),
'label': av['label'] if av['label'] else "",
'value': av['value'],
'tries': tries, 'tries_remaining': tries_remaining,
'external': False,
'mixdepth': mixdepth,
'confirmations': av['confs'],
'frozen': u in disabled}
if showprivkey:
unsp[us]['privkey'] = wallet_service.get_wif_path(av['path'])
if locktime:
unsp[us]["locktime"] = str(
datetime.datetime.fromtimestamp(0, datetime.UTC) +
datetime.timedelta(seconds=locktime))
used_commitments, external_commitments = podle.get_podle_commitments()
for u, ec in external_commitments.items():
success, us = utxo_to_utxostr(u)
assert success
tries = podle.get_podle_tries(utxo=u, max_tries=max_tries,
external=True)
tries_remaining = max(0, max_tries - tries)
unsp[us] = {'tries': tries, 'tries_remaining': tries_remaining,
'external': True}
return json.dumps(unsp, indent=4, ensure_ascii=False)
def get_utxo_status_string(utxos, utxos_enabled, path):
has_frozen_utxo = False
has_pending_utxo = False
for utxo, utxodata in utxos.items():
if path == utxodata["path"]:
if not utxo in utxos_enabled:
has_frozen_utxo = True
if utxodata['confs'] <= 0:
has_pending_utxo = True
utxo_status_string = ""
if has_frozen_utxo:
utxo_status_string += ' [FROZEN]'
if has_pending_utxo:
utxo_status_string += ' [PENDING]'
return utxo_status_string
async def wallet_display(wallet_service, showprivkey, displayall=False,
serialized=True, summarized=False, mixdepth=None, jsonified=False):
"""build the walletview object,
then return its serialization directly if serialized,
else return the WalletView object.
"""
def get_addr_status(addr_path, utxos, utxos_enabled, is_new, is_internal):
addr_balance = 0
status = []
for utxo, utxodata in utxos.items():
if addr_path != utxodata['path']:
continue
addr_balance += utxodata['value']
#TODO it is a failure of abstraction here that
# the bitcoin core interface is used directly
#the function should either be removed or added to bci
#or possibly add some kind of `gettransaction` function
# to bci
if not isinstance(jm_single().bc_interface, BitcoinCoreNoHistoryInterface):
is_coinjoin, cj_amount, cj_n = \
get_tx_info(utxo[0])[:3]
if is_coinjoin and utxodata['value'] == cj_amount:
status.append('cj-out')
elif is_coinjoin:
status.append('change-out')
elif is_internal:
status.append('non-cj-change')
else:
status.append('deposit')
out_status = 'new' if is_new else 'used'
if len(status) > 1:
out_status = 'reused'
elif len(status) == 1:
out_status = status[0]
out_status += get_utxo_status_string(utxos, utxos_enabled, addr_path)
return addr_balance, out_status
acctlist = []
utxos = await wallet_service.get_utxos_by_mixdepth(
include_disabled=True, includeconfs=True)
utxos_enabled = await wallet_service.get_utxos_by_mixdepth()
if mixdepth:
md_range = range(mixdepth, mixdepth + 1)
else:
md_range = range(wallet_service.mixdepth + 1)
for m in md_range:
branchlist = []
for address_type in [BaseWallet.ADDRESS_TYPE_EXTERNAL,
BaseWallet.ADDRESS_TYPE_INTERNAL]:
entrylist = []
if address_type == BaseWallet.ADDRESS_TYPE_EXTERNAL:
# users would only want to hand out the xpub for externals
xpub_key = wallet_service.get_bip32_pub_export(m, address_type)
else:
xpub_key = ""
unused_index = wallet_service.get_next_unused_index(m, address_type)
gap_addrs = []
for k in range(unused_index + wallet_service.gap_limit):
path = wallet_service.get_path(m, address_type, k)
addr = await wallet_service.get_address_from_path(path)
if k >= unused_index:
if isinstance(wallet_service.wallet,
(TaprootWallet, FrostWallet)):
P = await wallet_service.get_pubkey(m, address_type, k)
desc = f'tr({bytes(P)[1:].hex()})'
gap_addrs.append(f'{descsum_create(desc)}')
else:
gap_addrs.append(addr)
label = wallet_service.get_address_label(addr)
balance, status = get_addr_status(
path, utxos[m], utxos_enabled[m], k >= unused_index, address_type)
if showprivkey:
privkey = wallet_service.get_wif_path(path)
else:
privkey = ''
if (displayall or balance > 0 or
(status == 'new' and address_type == 0)):
entrylist.append(WalletViewEntry(
wallet_service.get_path_repr(path), m, address_type, k, addr,
[balance, balance], priv=privkey, status=status, label=label))
# ensure that we never display un-imported addresses (this will generally duplicate
# the import of each new address gap limit times, but one importmulti call
# per mixdepth is cheap enough.
# This only applies to the external branch, because it only applies to addresses
# displayed for user deposit.
# It also does not apply to fidelity bond addresses which are created manually.
if address_type == BaseWallet.ADDRESS_TYPE_EXTERNAL and wallet_service.bci is not None:
if isinstance(wallet_service.wallet,
(TaprootWallet, FrostWallet)):
wallet_service.bci.import_descriptors(
gap_addrs, wallet_service.get_wallet_name())
else:
wallet_service.bci.import_addresses(
gap_addrs, wallet_service.get_wallet_name())
wallet_service.set_next_index(m, address_type, unused_index)
path = wallet_service.get_path_repr(wallet_service.get_path(m, address_type))
branchlist.append(WalletViewBranch(path, m, address_type, entrylist,
xpub=xpub_key))
if m == FidelityBondMixin.FIDELITY_BOND_MIXDEPTH and \
isinstance(wallet_service.wallet, FidelityBondMixin):
address_type = FidelityBondMixin.BIP32_TIMELOCK_ID
entrylist = []
for timenumber in range(FidelityBondMixin.TIMENUMBER_COUNT):
path = wallet_service.get_path(m, address_type, timenumber)
addr = await wallet_service.get_address_from_path(path)
label = wallet_service.get_address_label(addr)
timelock = (datetime.datetime.fromtimestamp(0, datetime.UTC) +
datetime.timedelta(seconds=path[-1]))
balance = sum([utxodata["value"] for _, utxodata in
utxos[m].items() if path == utxodata["path"]])
status = (timelock.strftime("%Y-%m-%d") + " [" + (
"LOCKED" if datetime.datetime.now(datetime.UTC) < timelock
else "UNLOCKED") + "]")
status += get_utxo_status_string(utxos[m], utxos_enabled[m], path)
privkey = ""
if showprivkey:
privkey = wallet_service.get_wif_path(path)
if displayall or balance > 0:
entrylist.append(WalletViewEntry(
wallet_service.get_path_repr(path), m, address_type, k,
addr, [balance, balance], priv=privkey, status=status,
label=label))
xpub_key = wallet_service.get_bip32_pub_export(m, address_type)
path = wallet_service.get_path_repr(wallet_service.get_path(m, address_type))
branchlist.append(WalletViewBranch(path, m, address_type, entrylist,
xpub=xpub_key))
entrylist = []
address_type = FidelityBondMixin.BIP32_BURN_ID
unused_index = wallet_service.get_next_unused_index(m, address_type)
burner_outputs = wallet_service.wallet.get_burner_outputs()
wallet_service.set_next_index(m, address_type, unused_index +
wallet_service.wallet.gap_limit, force=True)
for k in range(unused_index + wallet_service.wallet.gap_limit):
path = wallet_service.get_path(m, address_type, k)
path_repr = wallet_service.get_path_repr(path)
path_repr_b = path_repr.encode()
privkey, engine = wallet_service._get_key_from_path(path)
pubkey = engine.privkey_to_pubkey(privkey)
pubkeyhash = btc.Hash160(pubkey)
output = "BURN-" + binascii.hexlify(pubkeyhash).decode()
balance = 0
status = "no transaction"
if path_repr_b in burner_outputs:
txhex, blockheight, merkle_branch, blockindex = burner_outputs[path_repr_b]
txd = btc.CMutableTransaction.deserialize(txhex)
assert len(txd.vout) == 1
balance = txd.vout[0].nValue
script = txd.vout[0].scriptPubKey
assert script[0] == 0x6a #OP_RETURN
tx_pubkeyhash = script[2:]
assert tx_pubkeyhash == pubkeyhash
status = bintohex(txd.GetTxid()) + (" [NO MERKLE PROOF]" if
merkle_branch == FidelityBondMixin.MERKLE_BRANCH_UNAVAILABLE else "")
privkey = (wallet_service.get_wif_path(path) if showprivkey else "")
if displayall or balance > 0:
entrylist.append(WalletViewEntryBurnOutput(path_repr, m,
address_type, k, output, [balance, balance],
priv=privkey, status=status))
wallet_service.set_next_index(m, address_type, unused_index)
xpub_key = wallet_service.get_bip32_pub_export(m, address_type)
path = wallet_service.get_path_repr(wallet_service.get_path(m, address_type))
branchlist.append(WalletViewBranch(path, m, address_type, entrylist,
xpub=xpub_key))
ipb = await get_imported_privkey_branch(wallet_service, m, showprivkey)
if ipb:
branchlist.append(ipb)
#get the xpub key of the whole account
xpub_account = wallet_service.get_bip32_pub_export(mixdepth=m)
path = wallet_service.get_path_repr(wallet_service.get_path(m))
acctlist.append(WalletViewAccount(path, m, branchlist,
xpub=xpub_account))
path = wallet_service.get_path_repr(wallet_service.get_path())
walletview = WalletView(path, acctlist)
if serialized:
if jsonified:
return walletview.serialize_json(summarize=summarized)
else:
return walletview.serialize(summarize=summarized)
else:
return walletview
def cli_get_wallet_passphrase_check() -> Optional[str]:
password = get_password("Enter new passphrase to encrypt wallet: ")
password2 = get_password("Reenter new passphrase to encrypt wallet: ")
if password != password2:
jmprint('ERROR. Passwords did not match', "error")
return None
return password
def cli_get_wallet_file_name(defaultname: str = "wallet.jmdat") -> str:
return input(f'Input wallet file name (default: {defaultname}): ')
def cli_display_user_words(words: str, mnemonic_extension: str) -> None:
text = 'Write down this wallet recovery mnemonic\n\n' + words +'\n'
if mnemonic_extension:
text += '\nAnd this mnemonic extension: ' + mnemonic_extension.decode(
'utf-8') + '\n'
jmprint(text, "important")
def cli_user_mnemonic_entry() -> Tuple[Optional[str], Optional[str]]:
mnemonic_phrase = input("Input mnemonic recovery phrase: ")
mnemonic_extension = input("Input mnemonic extension, leave blank if there isnt one: ")
if len(mnemonic_extension.strip()) == 0:
mnemonic_extension = None
return (mnemonic_phrase, mnemonic_extension)
def cli_do_use_mnemonic_extension() -> bool:
if cli_prompt_user_yesno("Would you like to use a two-factor mnemonic "
"recovery phrase? "
"Write 'n' if you don't know what this is"):
return True
else:
jmprint("Not using mnemonic extension", "info")
return False #no mnemonic extension
def cli_get_mnemonic_extension() -> str:
jmprint("Note: This will be stored in a reversible way. Do not reuse!",
"info")
return input("Enter mnemonic extension: ")
def cli_do_support_fidelity_bonds() -> bool:
if cli_prompt_user_yesno("Would you like this wallet to support "
"fidelity bonds? "
"Write 'n' if you don't know what this is"):
return True
else:
jmprint("Not supporting fidelity bonds", "info")
return False
async def wallet_generate_recover_bip39(
method: str,
walletspath: str,
default_wallet_name: str,
display_seed_callback: Callable[[str, str], None],
enter_seed_callback: Optional[Callable[[], Tuple[Optional[str], Optional[str]]]],
enter_wallet_password_callback: Callable[[], str],
enter_wallet_file_name_callback: Callable[[], str],
enter_if_use_seed_extension: Optional[Callable[[], bool]],
enter_seed_extension_callback: Optional[Callable[[], Optional[str]]],
enter_do_support_fidelity_bonds: Callable[[], bool],
mixdepth: int = DEFAULT_MIXDEPTH) -> bool:
entropy = None
mnemonic_extension = None
if method == "generate":
cb_res = enter_if_use_seed_extension()
if asyncio.iscoroutine(cb_res):
cb_res = await cb_res
if cb_res:
mnemonic_extension = enter_seed_extension_callback()
if asyncio.iscoroutine(mnemonic_extension):
mnemonic_extension = await mnemonic_extension
if not mnemonic_extension:
return False
elif method == 'recover':
if enter_seed_callback:
cb_res = enter_seed_callback()
if asyncio.iscoroutine(cb_res):
cb_res = await cb_res
words, mnemonic_extension = cb_res
words = words and words.strip()
if not words:
return False
mnemonic_extension = mnemonic_extension and mnemonic_extension.strip()
try:
entropy = SegwitLegacyWallet.entropy_from_mnemonic(words)
except WalletError:
return False
else:
raise Exception("unknown method for wallet creation: '{}'"
.format(method))
password = enter_wallet_password_callback()
if asyncio.iscoroutine(password):
password = await password
if not password:
if get_network() == 'mainnet':
return False
password = None
wallet_name = enter_wallet_file_name_callback()
if asyncio.iscoroutine(wallet_name):
wallet_name = await wallet_name
if wallet_name == "cancelled":
# currently used only by Qt, because user has option
# to click cancel in dialog.
return False
if not wallet_name:
wallet_name = default_wallet_name
wallet_path = os.path.join(walletspath, wallet_name)
if is_frost_mode():
support_fidelity_bonds = False
else:
support_fidelity_bonds = enter_do_support_fidelity_bonds()
if asyncio.iscoroutine(support_fidelity_bonds):
support_fidelity_bonds = await support_fidelity_bonds
wallet_cls = get_wallet_cls(get_configured_wallet_type(support_fidelity_bonds))
wallet = await create_wallet(
wallet_path, password, mixdepth, wallet_cls, entropy=entropy,
entropy_extension=mnemonic_extension)
mnemonic, mnext = wallet.get_mnemonic_words()
if display_seed_callback:
cb_res = display_seed_callback(mnemonic, mnext or '')
if asyncio.iscoroutine(cb_res):
cb_res = await cb_res
wallet.close()
return True
async def wallet_generate_recover(method, walletspath,
default_wallet_name='wallet.jmdat',
mixdepth=DEFAULT_MIXDEPTH):
if is_frost_mode():
return await wallet_generate_recover_bip39(
method, walletspath,
default_wallet_name, cli_display_user_words, cli_user_mnemonic_entry,
cli_get_wallet_passphrase_check, cli_get_wallet_file_name,
cli_do_use_mnemonic_extension, cli_get_mnemonic_extension,
cli_do_support_fidelity_bonds, mixdepth=mixdepth)
elif is_taproot_mode():
return await wallet_generate_recover_bip39(
method, walletspath,
default_wallet_name, cli_display_user_words, cli_user_mnemonic_entry,
cli_get_wallet_passphrase_check, cli_get_wallet_file_name,
cli_do_use_mnemonic_extension, cli_get_mnemonic_extension,
cli_do_support_fidelity_bonds, mixdepth=mixdepth)
elif is_segwit_mode():
#Here using default callbacks for scripts (not used in Qt)
return await wallet_generate_recover_bip39(
method, walletspath,
default_wallet_name, cli_display_user_words, cli_user_mnemonic_entry,
cli_get_wallet_passphrase_check, cli_get_wallet_file_name,
cli_do_use_mnemonic_extension, cli_get_mnemonic_extension,
cli_do_support_fidelity_bonds, mixdepth=mixdepth)
entropy = None
if method == 'recover':
seed = input("Input 12 word recovery seed: ")
try:
entropy = LegacyWallet.entropy_from_mnemonic(seed)
except WalletError as e:
jmprint("Unable to restore seed: {}".format(e.message), "error")
return ""
elif method != 'generate':
raise Exception("unknown method for wallet creation: '{}'"
.format(method))
password = cli_get_wallet_passphrase_check()
if not password:
return ""
wallet_name = cli_get_wallet_file_name()
if not wallet_name:
wallet_name = default_wallet_name
wallet_path = os.path.join(walletspath, wallet_name)
wallet = await create_wallet(wallet_path, password, mixdepth,
wallet_cls=LegacyWallet, entropy=entropy)
jmprint("Write down and safely store this wallet recovery seed\n\n{}\n"
.format(wallet.get_mnemonic_words()[0]), "important")
wallet.close()
return True
async def wallet_change_passphrase(
walletservice,
enter_wallet_passphrase_callback=cli_get_wallet_passphrase_check
):
passphrase = enter_wallet_passphrase_callback()
if asyncio.iscoroutine(passphrase):
passphrase = await passphrase
if passphrase:
walletservice.change_wallet_passphrase(passphrase)
return True
async def wallet_fetch_history(wallet, options):
# sort txes in a db because python can be really bad with large lists
con = sqlite3.connect(":memory:")
con.row_factory = dict_factory
tx_db = con.cursor()
tx_db.execute("CREATE TABLE transactions(txid TEXT, "
"blockhash TEXT, blocktime INTEGER, conflicts INTEGER);")
jm_single().debug_silence[0] = True
wallet_name = wallet.get_wallet_name()
buf = range(1000)
t = 0
while len(buf) == 1000:
buf = jm_single().bc_interface.list_transactions(1000, t)
t += len(buf)
# confirmed
tx_data = ((tx['txid'], tx['blockhash'], tx['blocktime'], 0) for tx
in buf if 'txid' in tx and 'blockhash' in tx and 'blocktime'
in tx)
tx_db.executemany('INSERT INTO transactions VALUES(?, ?, ?, ?);',
tx_data)
# unconfirmed
uc_tx_data = ((tx['txid'], None, None, len(tx['walletconflicts'])) for
tx in buf if 'txid' in tx and 'blockhash' not in tx and
'blocktme' not in tx)
tx_db.executemany('INSERT INTO transactions VALUES(?, ?, ?, ?);',
uc_tx_data)
txes = tx_db.execute(
'SELECT DISTINCT txid, blockhash, blocktime '
'FROM transactions '
'WHERE (blockhash IS NOT NULL AND blocktime IS NOT NULL) OR conflicts = 0 '
'ORDER BY blocktime').fetchall()
wallet_script_set = set([await wallet.get_script_from_path(p)
for p in wallet.yield_known_paths()])
def s():
return ',' if options.csv else ' '
def sat_to_str_na(sat):
if sat == 0:
return "N/A "
else:
return btc.sat_to_str(sat)
def skip_n1(v):
return '% 2s'%(str(v)) if v != -1 else ' #'
def skip_n1_btc(v):
return btc.sat_to_str(v) if v != -1 else '#' + ' '*10
def print_row(index, time, tx_type, amount, delta, balance, cj_n,
total_fees, utxo_count, mixdepth_src, mixdepth_dst, txid):
data = [
index,
datetime.datetime.fromtimestamp(time).strftime("%Y-%m-%d %H:%M"),
tx_type, btc.sat_to_str(abs(amount)),
btc.sat_to_str_p(delta),
btc.sat_to_str(balance),
skip_n1(cj_n),
sat_to_str_na(total_fees),
'% 3d' % utxo_count,
skip_n1(mixdepth_src),
skip_n1(mixdepth_dst)
]
if options.verbosity % 2 == 0: data += [txid]
jmprint(s().join(map('"{}"'.format, data)), "info")
field_names = ['tx#', 'timestamp', 'type', 'amount/btc',
'balance-change/btc', 'balance/btc', 'coinjoin-n', 'total-fees',
'utxo-count', 'mixdepth-from', 'mixdepth-to']
if options.verbosity % 2 == 0: field_names += ['txid']
if options.csv:
#jmprint('Bumping verbosity level to 4 due to --csv flag', "debug")
options.verbosity = 4
if options.verbosity > 0: jmprint(s().join(field_names), "info")
if options.verbosity <= 2: cj_batch = [0]*8 + [[]]*2
balance = 0
unconfirmed_balance = 0
utxo_count = 0
unconfirmed_utxo_count = 0
deposits = []
deposit_times = []
tx_number = 0
tx_cache = {}
for tx in txes:
is_coinjoin, cj_amount, cj_n, output_script_values, blocktime, txd =\
get_tx_info(hextobin(tx['txid']), tx_cache=tx_cache)
# unconfirmed transactions don't have blocktime, get_tx_info() returns
# 0 in that case
is_confirmed = (blocktime != 0)
our_output_scripts = wallet_script_set.intersection(
output_script_values.keys())
rpc_inputs = []
for ins in txd.vin:
if ins.prevout.hash[::-1] in tx_cache:
wallet_tx, wallet_tx_deser = tx_cache[ins.prevout.hash[::-1]]
else:
wallet_tx = jm_single().bc_interface.get_transaction(
ins.prevout.hash[::-1])
if wallet_tx:
wallet_tx_deser = jm_single(
).bc_interface.get_deser_from_gettransaction(
wallet_tx)
tx_cache[ins.prevout.hash[::-1]] = (wallet_tx,
wallet_tx_deser)
else:
tx_cache[ins.prevout.hash[::-1]] = (None, None)
if wallet_tx is None:
continue
inp = wallet_tx_deser.vout[ins.prevout.n]
input_dict = {"script": inp.scriptPubKey, "value": inp.nValue}
rpc_inputs.append(input_dict)
rpc_input_scripts = set(ind['script'] for ind in rpc_inputs)
our_input_scripts = wallet_script_set.intersection(rpc_input_scripts)
our_input_values = [
ind['value'] for ind in rpc_inputs
if ind['script'] in our_input_scripts]
our_input_value = sum(our_input_values)
utxos_consumed = len(our_input_values)
tx_type = None
amount = 0
delta_balance = 0
fees = 0
mixdepth_src = -1
mixdepth_dst = -1
#TODO this seems to assume all the input addresses are from the same
# mixdepth, which might not be true
if len(our_input_scripts) == 0 and len(our_output_scripts) > 0:
#payment to us
amount = sum([output_script_values[a] for a in our_output_scripts])
tx_type = 'deposit '
cj_n = -1
delta_balance = amount
mixdepth_dst = tuple(wallet.get_script_mixdepth(a)
for a in our_output_scripts)
if len(mixdepth_dst) == 1:
mixdepth_dst = mixdepth_dst[0]
elif len(our_input_scripts) == 0 and len(our_output_scripts) == 0:
continue # skip those that don't belong to our wallet
elif len(our_input_scripts) > 0 and len(our_output_scripts) == 0:
# we swept coins elsewhere
if is_coinjoin:
tx_type = 'cj sweepout'
amount = cj_amount
fees = our_input_value - cj_amount
else:
tx_type = 'sweep out '
amount = sum([v for v in output_script_values.values()])
fees = our_input_value - amount
delta_balance = -our_input_value
mixdepth_src = wallet.get_script_mixdepth(list(our_input_scripts)[0])
elif len(our_input_scripts) > 0 and len(our_output_scripts) == 1:
our_output_script = list(our_output_scripts)[0]
our_output_value = output_script_values[our_output_script]
fees = our_input_value - our_output_value - cj_amount
if is_coinjoin:
amount = cj_amount
if our_output_value == cj_amount:
#a sweep coinjoin with no change address back to our wallet
tx_type = 'cj intsweep'
mixdepth_dst = wallet.get_script_mixdepth(our_output_script)
fees = 0
else:
#payment elsewhere with our change address getting the remaining
#our_output_value is the change output
tx_type = 'cj withdraw'
else:
tx_type = 'withdraw '
#TODO does tx_fee go here? not my_tx_fee only?
amount = our_input_value - our_output_value
cj_n = -1
fees = 0
delta_balance = our_output_value - our_input_value
mixdepth_src = wallet.get_script_mixdepth(list(our_input_scripts)[0])
elif len(our_input_scripts) > 0 and len(our_output_scripts) == 2 and is_coinjoin:
out_value = sum([output_script_values[a] for a in our_output_scripts])
amount = cj_amount
delta_balance = out_value - our_input_value
mixdepth_src = wallet.get_script_mixdepth(list(our_input_scripts)[0])
tx_type = 'cj internal'
cj_script = list(set([a for a, v in output_script_values.items()
if v == cj_amount]).intersection(our_output_scripts))[0]
mixdepth_dst = wallet.get_script_mixdepth(cj_script)
elif len(our_input_scripts) > 0 and len(our_output_scripts) == len(output_script_values):
out_value = sum([output_script_values[a] for a in our_output_scripts])
amount = sum([output_script_values[a] for a in our_output_scripts])
delta_balance = out_value - our_input_value
mixdepth_src = wallet.get_script_mixdepth(list(our_input_scripts)[0])
tx_type = 'internal '
mixdepth_dst = wallet.get_script_mixdepth(list(our_output_scripts)[0])
else:
tx_type = 'unknown '
out_value = sum([output_script_values[a] for a in our_output_scripts])
amount = out_value
delta_balance = out_value - our_input_value
mixdepth_src = wallet.get_script_mixdepth(
list(our_input_scripts)[0])
mixdepth_dst = wallet.get_script_mixdepth(
list(our_output_scripts)[0])
#jmprint('our utxos: ' + str(len(our_input_scripts)) \
# + ' in, ' + str(len(our_output_scripts)) + ' out')
if is_confirmed:
balance += delta_balance
utxo_count += (len(our_output_scripts) - utxos_consumed)
index = '%4d'%(tx_number)
tx_number += 1
if options.verbosity > 0:
if options.verbosity <= 2:
n = cj_batch[0]
if tx_type == 'cj internal':
cj_batch[0] += 1
cj_batch[1] += blocktime
cj_batch[2] += amount
cj_batch[3] += delta_balance
cj_batch[4] = balance
cj_batch[5] += cj_n
cj_batch[6] += fees
cj_batch[7] += utxo_count
cj_batch[8] += [mixdepth_src]
cj_batch[9] += [mixdepth_dst]
else:
if n > 0:
# print the previously-accumulated batch
print_row('N='+"%2d"%n, cj_batch[1]/n, 'cj batch ',
cj_batch[2], cj_batch[3], cj_batch[4],
cj_batch[5]/n, cj_batch[6], cj_batch[7]/n,
min(cj_batch[8]), max(cj_batch[9]), '...')
cj_batch = [0]*8 + [[]]*2 # reset the batch collector
# print batch terminating row
print_row(index, blocktime, tx_type, amount,
delta_balance, balance, cj_n, fees, utxo_count,
mixdepth_src, mixdepth_dst, tx['txid'])
elif options.verbosity >= 3:
print_row(index, blocktime, tx_type, amount,
delta_balance, balance, cj_n, fees, utxo_count,
mixdepth_src, mixdepth_dst, tx['txid'])
if tx_type != 'cj internal':
deposits.append(delta_balance)
deposit_times.append(blocktime)
else:
unconfirmed_balance += delta_balance
utxo_count += (len(our_output_scripts) - utxos_consumed)
# we could have a leftover batch!
if options.verbosity <= 2:
n = cj_batch[0]
if n > 0:
print_row('N='+"%2d"%n, cj_batch[1]/n, 'cj batch ', cj_batch[2],
cj_batch[3], cj_batch[4], cj_batch[5]/n, cj_batch[6],
cj_batch[7]/n, min(cj_batch[8]), max(cj_batch[9]), '...')
# don't display summaries if csv export
if options.csv:
return ''
bestblockhash = jm_single().bc_interface.get_best_block_hash()
now = jm_single().bc_interface.get_block_time(bestblockhash)
jmprint(' %s best block is %s' % (
datetime.datetime.fromtimestamp(now).strftime("%Y-%m-%d %H:%M"),
bestblockhash))
total_profit = float(balance - sum(deposits)) / float(100000000)
jmprint('total profit = %.8f BTC' % total_profit)
if abs(total_profit) > 0:
try:
# https://gist.github.com/chris-belcher/647da261ce718fc8ca10
import numpy as np
from scipy.optimize import brentq
deposit_times = np.array(deposit_times)
now -= deposit_times[0]
deposit_times -= deposit_times[0]
deposits = np.array(deposits)
def f(r, deposits, deposit_times, now, final_balance):
return np.sum(np.exp((now - deposit_times) / 60.0 / 60 / 24 /
365)**r * deposits) - final_balance
r = brentq(f, a=1, b=-1, args=(deposits, deposit_times, now, balance))
jmprint('continuously compounded equivalent annual interest rate = ' +
str(r * 100) + ' %')
jmprint('(as if yield generator was a bank account)')
except ImportError:
jmprint('scipy not installed, unable to predict accumulation rate')
jmprint('to add it to this virtual environment, use `pip install scipy`')
# includes disabled utxos in accounting:
total_wallet_balance = sum(wallet.get_balance_by_mixdepth(
include_disabled=True).values())
if balance + unconfirmed_balance != total_wallet_balance:
jmprint(('BUG ERROR: wallet balance (%s) does not match balance from ' +
'history (%s)') % (btc.sat_to_str(total_wallet_balance),
btc.sat_to_str(balance)))
_utxos = await wallet.get_utxos_by_mixdepth(include_disabled=True)
wallet_utxo_count = sum(map(len, _utxos.values()))
if utxo_count + unconfirmed_utxo_count != wallet_utxo_count:
jmprint(('BUG ERROR: wallet utxo count (%d) does not match utxo count from ' +
'history (%s)') % (wallet_utxo_count, utxo_count))
if unconfirmed_balance != 0:
jmprint('unconfirmed balance change = %s BTC' % btc.sat_to_str(unconfirmed_balance))
# wallet-tool.py prints return value, so return empty string instead of None here
return ''
def wallet_showseed(wallet):
seed, extension = wallet.get_mnemonic_words()
text = "Wallet mnemonic recovery phrase:\n\n{}\n".format(seed)
if extension:
text += "\nWallet mnemonic extension: {}\n".format(extension.decode('utf-8'))
return text
async def wallet_importprivkey(wallet, mixdepth):
jmprint("WARNING: This imported key will not be recoverable with your 12 "
"word mnemonic phrase. Make sure you have backups.", "warning")
jmprint("WARNING: Make sure that the type of the public address previously "
"derived from this private key matches the wallet type you are "
"currently using.", "warning")
jmprint("WARNING: Handling of raw ECDSA bitcoin private keys can lead to "
"non-intuitive behaviour and loss of funds.\n Recommended instead "
"is to use the \'sweep\' feature of sendpayment.py.", "warning")
privkeys = input("Enter private key(s) to import: ")
privkeys = privkeys.split(',') if ',' in privkeys else privkeys.split()
imported_addr = []
import_failed = 0
# TODO read also one key for each line
for wif in privkeys:
# TODO is there any point in only accepting wif format? check what
# other wallets do
try:
path = wallet.import_private_key(mixdepth, wif)
except WalletError as e:
print("Failed to import key {}: {}".format(wif, e))
import_failed += 1
else:
imported_addr.append(await wallet.get_address_from_path(path))
if not imported_addr:
jmprint("Warning: No keys imported!", "error")
return
wallet.save()
# show addresses to user so they can verify everything went as expected
jmprint("Imported keys for addresses:\n{}".format('\n'.join(imported_addr)),
"success")
if import_failed:
jmprint("Warning: failed to import {} keys".format(import_failed),
"error")
def wallet_dumpprivkey(wallet, hdpath):
if not hdpath:
jmprint("Error: no hd wallet path supplied", "error")
return ""
path = wallet.path_repr_to_path(hdpath)
return wallet.get_wif_path(path) # will raise exception on invalid path
async def wallet_signmessage(
wallet, hdpath: str, message: str,
out_str: bool = True) -> Union[Tuple[str, str, str], str]:
""" Given a wallet, a BIP32 HD path (as can be output
from the display method) and a message string, returns
a base64 encoded signature along with the corresponding
address and message.
If `out_str` is True, returns human readable representation,
otherwise returns tuple of (signature, message, address).
"""
if not get_network() == "mainnet":
return "Error: message signing is only supported on mainnet."
msg = message.encode('utf-8')
if not hdpath:
return "Error: no key path for signing specified"
if not message:
return "Error: no message specified"
path = wallet.path_repr_to_path(hdpath)
addr, sig = await wallet.sign_message(msg, path)
if not out_str:
return (sig, message, addr)
return ("Signature: {}\nMessage: {}\nAddress: {}\n"
"To verify this in Electrum use Tools->Sign/verify "
"message.".format(sig, message, addr))
async def wallet_signpsbt(wallet_service, psbt):
if not psbt:
return "Error: no PSBT specified"
signed_psbt_and_signresult, err = await wallet_service.sign_psbt(
base64.b64decode(psbt.encode('ascii')), with_sign_result=True)
if err:
return "Failed to sign PSBT, quitting. Error message: {}".format(err)
signresult, signedpsbt = signed_psbt_and_signresult
jmprint(wallet_service.human_readable_psbt(signedpsbt))
jmprint("Base64 of the above PSBT:")
jmprint(signedpsbt.to_base64())
if signresult.is_final:
if not cli_prompt_user_yesno("Above PSBT is fully signed. Do you want to broadcast?"):
jmprint("Not broadcasting.")
else:
jmprint("Broadcasting...")
tx = signedpsbt.extract_transaction()
if jm_single().bc_interface.pushtx(tx.serialize()):
jmprint("Transaction sent: " + bintohex(
tx.GetTxid()[::-1]))
else:
jmprint("Transaction broadcast failed!", "error")
else:
# if the signing action did not result in a finalized PSBT,
# inform the user of the current status:
jmprint("The PSBT is not yet fully signed, we signed: {} "
"inputs.".format(signresult.num_inputs_signed))
return ""
async def display_utxos_for_disable_choice_default(wallet_service,
utxos_enabled,
utxos_disabled):
""" CLI implementation of the callback required as described in
wallet_disableutxo
"""
def default_user_choice(umax):
jmprint("Choose an index 0 .. {} to freeze/unfreeze or "
"-1 to just quit, or -2 to (un)freeze all".format(umax))
while True:
try:
ret = int(input())
except ValueError:
jmprint("Invalid choice, must be an integer.", "error")
continue
if ret < -2 or ret > umax:
jmprint("Invalid choice, must be between: -2 and {}, "
"try again.".format(umax), "error")
continue
break
return ret
async def output_utxos(utxos, status, start=0):
for (txid, idx), v in utxos.items():
value = v['value']
jmprint("{:4}: {} ({}): {} -- {}".format(
start, fmt_utxo((txid, idx)),
await wallet_service.wallet.script_to_addr(v["script"]),
btc.amount_to_str(value), status))
start += 1
yield txid, idx
jmprint("List of UTXOs:")
ulist = list(await output_utxos(utxos_disabled, 'FROZEN'))
disabled_max = len(ulist) - 1
ulist.extend(await output_utxos(utxos_enabled, 'NOT FROZEN',
start=len(ulist)))
max_id = len(ulist) - 1
chosen_idx = default_user_choice(max_id)
if chosen_idx == -1:
return None
if chosen_idx == -2:
return "all"
# the return value 'disable' is the action we are going to take;
# so it should be true if the utxos is currently unfrozen/enabled.
disable = False if chosen_idx <= disabled_max else True
return ulist[chosen_idx], disable
async def get_utxos_enabled_disabled(wallet_service: WalletService,
md: int) -> Tuple[dict, dict]:
""" Returns dicts for enabled and disabled separately
"""
utxos_enabled = await wallet_service.get_utxos_at_mixdepth(md)
utxos_all = await wallet_service.get_utxos_at_mixdepth(
md, include_disabled=True)
utxos_disabled_keyset = set(utxos_all).difference(set(utxos_enabled))
utxos_disabled = {}
for u in utxos_disabled_keyset:
utxos_disabled[u] = utxos_all[u]
return utxos_enabled, utxos_disabled
async def wallet_freezeutxo(wallet_service, md,
display_callback=None, info_callback=None):
""" Given a wallet and a mixdepth, display to the user
the set of available utxos, indexed by integer, and accept a choice
of index to "freeze", then commit this disabling to the wallet storage,
so that this disable-ment is persisted. Also allow unfreezing of a
chosen utxo which is currently frozen.
Callbacks for display and reporting can be specified in the keyword
arguments as explained below, otherwise default CLI is used.
** display_callback signature:
args:
1. wallet_service
2. utxos_enabled ; dict of utxos as format in wallet.py.
3. utxos_disabled ; as above, for disabled
returns:
1.((txid(str), index(int)), disabled(bool)) of chosen utxo
for freezing/unfreezing, or None for no action/cancel.
** info_callback signature:
args:
1. message (str)
2. type (str) ("info", "error" etc as per jmprint)
returns: None
"""
if display_callback is None:
display_callback = display_utxos_for_disable_choice_default
if info_callback is None:
info_callback = jmprint
if md is None:
info_callback("Specify the mixdepth with the -m flag", "error")
return "Failed"
while True:
utxos_enabled, utxos_disabled = await get_utxos_enabled_disabled(
wallet_service, md)
if utxos_disabled == {} and utxos_enabled == {}:
info_callback("The mixdepth: " + str(md) + \
" contains no utxos to freeze/unfreeze.", "error")
return "Failed"
display_ret = display_callback(
wallet_service, utxos_enabled, utxos_disabled)
if asyncio.iscoroutine(display_ret):
display_ret = await display_ret
if display_ret is None:
break
if display_ret == "all":
disable = (len(utxos_disabled) == 0)
info_callback("Setting all UTXOs to " + ("frozen" if disable else "unfrozen")
+ " . . .")
for txid, index in chain(utxos_enabled, utxos_disabled):
wallet_service.disable_utxo(txid, index, disable)
else:
(txid, index), disable = display_ret
wallet_service.disable_utxo(txid, index, disable)
if disable:
info_callback("Utxo: {} is now frozen and unavailable for spending."
.format(fmt_utxo((txid, index))))
else:
info_callback("Utxo: {} is now unfrozen and available for spending."
.format(fmt_utxo((txid, index))))
return "Done"
async def wallet_gettimelockaddress(wallet, locktime_string):
if not isinstance(wallet, FidelityBondMixin):
jmprint("Error: not a fidelity bond wallet", "error")
return ""
lock_datetime = datetime.datetime.strptime(locktime_string, "%Y-%m")
if (jm_single().config.get("BLOCKCHAIN", "network") == "mainnet"
and lock_datetime <= datetime.datetime.now()):
jmprint("Error: locktime must be a future date", "error")
return ""
m = FidelityBondMixin.FIDELITY_BOND_MIXDEPTH
address_type = FidelityBondMixin.BIP32_TIMELOCK_ID
timenumber = FidelityBondMixin.datetime_to_time_number(lock_datetime)
path = wallet.get_path(m, address_type, timenumber)
jmprint("path = " + wallet.get_path_repr(path), "info")
jmprint("Coins sent to this address will not be spendable until "
+ lock_datetime.strftime("%B %Y") + ". Full date: "
+ str(lock_datetime))
jmprint("WARNING: You should send coins to this address only once."
+ " Only single biggest value UTXO will be announced as a fidelity bond."
+ " Sending coins to this address multiple times will not increase"
+ " fidelity bond value.")
jmprint("WARNING: Only send coins here which are from coinjoins or otherwise"
+ " not linked to your identity. Also, use a sweep transaction when funding the"
+ " timelocked address, i.e. Don't create a change address. See the privacy warnings in"
+ " fidelity-bonds.md")
addr = await wallet.get_address_from_path(path)
return addr
def wallet_addtxoutproof(wallet_service, hdpath, txoutproof):
if not isinstance(wallet_service.wallet, FidelityBondMixin):
jmprint("Error: not a fidelity bond wallet", "error")
return ""
path = hdpath.encode()
if path not in wallet_service.wallet.get_burner_outputs():
jmprint("Error: unknown burner transaction with on that path", "error")
return ""
txhex, block_height, old_merkle_branch, block_index = \
wallet_service.wallet.get_burner_outputs()[path]
new_merkle_branch = jm_single().bc_interface.core_proof_to_merkle_branch(txoutproof)
txhex = binascii.hexlify(txhex).decode()
txid = btc.txhash(txhex)
if not jm_single().bc_interface.verify_tx_merkle_branch(txid, block_height,
new_merkle_branch):
jmprint("Error: tx out proof invalid", "error")
return ""
wallet_service.wallet.add_burner_output(hdpath, txhex, block_height,
new_merkle_branch, block_index)
return "Done"
async def wallet_createwatchonly(wallet_root_path, master_pub_key):
wallet_name = cli_get_wallet_file_name(defaultname="watchonly.jmdat")
if not wallet_name:
DEFAULT_WATCHONLY_WALLET_NAME = "watchonly.jmdat"
wallet_name = DEFAULT_WATCHONLY_WALLET_NAME
wallet_path = os.path.join(wallet_root_path, wallet_name)
password = cli_get_wallet_passphrase_check()
if not password:
return ""
entropy = FidelityBondMixin.get_xpub_from_fidelity_bond_master_pub_key(master_pub_key)
if not entropy:
jmprint("Error with provided master pub key", "error")
return ""
entropy = entropy.encode()
wallet = await create_wallet(wallet_path, password,
max_mixdepth=FidelityBondMixin.FIDELITY_BOND_MIXDEPTH,
wallet_cls=FidelityBondWatchonlyWallet, entropy=entropy)
return "Done"
def get_configured_wallet_type(support_fidelity_bonds):
configured_type = TYPE_P2PKH
if is_frost_mode():
return TYPE_P2TR_FROST
if is_taproot_mode():
configured_type = TYPE_P2TR
elif is_segwit_mode():
if is_native_segwit_mode():
configured_type = TYPE_P2WPKH
else:
configured_type = TYPE_P2SH_P2WPKH
if not support_fidelity_bonds:
return configured_type
if configured_type == TYPE_P2WPKH:
return TYPE_SEGWIT_WALLET_FIDELITY_BONDS
elif configured_type == TYPE_P2TR:
return TYPE_TAPROOT_WALLET_FIDELITY_BONDS
else:
raise ValueError("Fidelity bonds not supported with the configured "
"options of segwit and native. Edit joinmarket.cfg")
def get_wallet_cls(wtype):
cls = WALLET_IMPLEMENTATIONS.get(wtype)
if not cls:
raise WalletError("No wallet implementation found for type {}."
"".format(wtype))
return cls
async def create_wallet(path, password, max_mixdepth, wallet_cls, **kwargs):
storage = Storage(path, password, create=True)
gap_limit = jm_single().config.getint("POLICY", "gaplimit")
if wallet_cls == FrostWallet:
dkg_path = DKGStorage.dkg_path(path)
dkg_storage = DKGStorage(dkg_path, create=True)
dkg_recovery_path = DKGRecoveryStorage.dkg_recovery_path(path)
recovery_storage = DKGRecoveryStorage(dkg_recovery_path, create=True)
wallet_cls.initialize(storage, dkg_storage, recovery_storage,
get_network(), max_mixdepth=max_mixdepth,
**kwargs)
storage.save()
dkg_storage.save()
recovery_storage.save()
wallet = wallet_cls(storage, dkg_storage, recovery_storage,
gap_limit=gap_limit)
await wallet.async_init(storage, gap_limit=gap_limit)
return wallet
else:
wallet_cls.initialize(storage, get_network(),
max_mixdepth=max_mixdepth, **kwargs)
storage.save()
wallet = wallet_cls(storage, gap_limit=gap_limit)
await wallet.async_init(storage, gap_limit=gap_limit)
return wallet
async def open_test_wallet_maybe(
path, seed, max_mixdepth, test_wallet_cls=SegwitWallet,
wallet_password_stdin=False, **kwargs):
"""
Create a volatile test wallet if path is a hex-encoded string of length 64,
otherwise run open_wallet().
params:
path: path to wallet file, ignored for test wallets
seed: hex-encoded test seed
max_mixdepth: maximum mixdepth to use
kwargs: see open_wallet()
returns:
wallet object
"""
# If the native flag is set in the config, it overrides the argument
# test_wallet_cls
if jm_single().config.get("POLICY", "native") == "false":
test_wallet_cls = SegwitLegacyWallet
if len(seed) == test_wallet_cls.ENTROPY_BYTES * 2:
try:
seed = binascii.unhexlify(seed)
except binascii.Error:
pass
else:
if max_mixdepth is None:
max_mixdepth = DEFAULT_MIXDEPTH
storage = VolatileStorage()
test_wallet_cls.initialize(
storage, get_network(), max_mixdepth=max_mixdepth,
entropy=seed)
#wallet instantiation insists on no unexpected kwargs,
#but Qt caller opens both test and mainnet with same args,
#hence these checks/deletes of unwanted args for tests.
if 'ask_for_password' in kwargs:
del kwargs['ask_for_password']
if 'password' in kwargs:
del kwargs['password']
if 'read_only' in kwargs:
del kwargs['read_only']
wallet = test_wallet_cls(storage, **kwargs)
await wallet.async_init(storage, **kwargs)
return wallet
if wallet_password_stdin is True:
password = read_password_stdin()
return await open_wallet(
path, ask_for_password=False, password=password,
mixdepth=max_mixdepth, **kwargs)
return await open_wallet(path, mixdepth=max_mixdepth, **kwargs)
async def open_wallet(path, ask_for_password=True, password=None,
read_only=False, load_dkg=False, dkg_read_only=True,
**kwargs):
"""
Open the wallet file at path and return the corresponding wallet object.
params:
path: str, full path to wallet file
ask_for_password: bool, if False password is assumed unset and user
will not be asked to type it
password: password for storage, ignored if ask_for_password is True
read_only: bool, if True, open wallet in read-only mode
kwargs: additional options to pass to wallet's init method
returns:
wallet object
"""
if not read_only and not dkg_read_only:
raise Exception('open_wallet: params read_only and dkg_read_only'
' can not be mutually unset')
if not os.path.isfile(path):
raise Exception("Failed to open wallet at '{}': not a file".format(path))
if not Storage.is_storage_file(path):
raise Exception("Failed to open wallet at '{}': not a valid joinmarket"
" wallet.".format(path))
if ask_for_password and Storage.is_encrypted_storage_file(path):
while True:
try:
# Verify lock status if not read only before trying to open wallet.
if not read_only:
Storage.verify_lock(path)
# do not try empty password, assume unencrypted on empty password
pwd = get_password("Enter passphrase to decrypt wallet: ") or None
storage = Storage(path, password=pwd, read_only=read_only)
except StoragePasswordError:
jmprint("Wrong password, try again.", "warning")
continue
except Exception as e:
jmprint("Failed to load wallet, error message: " + repr(e),
"error")
raise e
break
else:
storage = Storage(path, password, read_only=read_only)
load_cache = True
if jm_single().config.get("POLICY", "wallet_caching_disabled") == "true":
load_cache = False
wallet_cls = get_wallet_cls_from_storage(storage)
if wallet_cls == FrostWallet:
dkg_storage = None
recovery_storage = None
if load_dkg:
dkg_path = DKGStorage.dkg_path(path)
if not os.path.isfile(dkg_path):
raise Exception(f"Failed to open DKG File at "
f"'{dkg_path}': not a file")
if not DKGStorage.is_storage_file(dkg_path):
raise Exception(f"Failed to open DKG File at "
f"'{dkg_path}': not a valid file magic.")
try:
if not dkg_read_only:
DKGStorage.verify_lock(dkg_path)
dkg_storage = DKGStorage(dkg_path, read_only=dkg_read_only)
except Exception as e:
jmprint(f"Failed to load DKG File, "
f"error message: {repr(e)}",
"error")
raise e
dkg_recovery_path = DKGRecoveryStorage.dkg_recovery_path(path)
if not os.path.isfile(dkg_recovery_path):
raise Exception(f"Failed to open DKG Recovery File at "
f"'{dkg_recovery_path}': not a file")
if not DKGRecoveryStorage.is_storage_file(dkg_recovery_path):
raise Exception(f"Failed to open DKG Recovery File at "
f"'{dkg_recovery_path}': not a valid "
f"file magic.")
try:
if not dkg_read_only:
DKGRecoveryStorage.verify_lock(dkg_recovery_path)
recovery_storage = DKGRecoveryStorage(
dkg_recovery_path, read_only=dkg_read_only)
except Exception as e:
jmprint(f"Failed to load DKG Recovery File, "
f"error message: {repr(e)}",
"error")
raise e
wallet = wallet_cls(storage, dkg_storage, recovery_storage,
load_cache=load_cache, **kwargs)
await wallet.async_init(storage, load_cache=load_cache, **kwargs)
else:
wallet = wallet_cls(storage, load_cache=load_cache, **kwargs)
await wallet.async_init(storage, load_cache=load_cache, **kwargs)
wallet_sanity_check(wallet)
return wallet
def get_wallet_cls_from_storage(storage):
wtype = storage.data.get(b'wallet_type')
if wtype is None:
raise WalletError("File {} is not a valid wallet.".format(storage.path))
return get_wallet_cls(wtype)
def wallet_sanity_check(wallet):
if wallet.network != get_network():
raise Exception("Wallet network mismatch: we are on '{}' but wallet "
"is on '{}'.".format(get_network(), wallet.network))
def get_wallet_path(file_name, wallet_dir=None):
if not wallet_dir:
wallet_dir = os.path.join(jm_single().datadir, 'wallets')
return os.path.join(wallet_dir, file_name)
def read_password_stdin():
return sys.stdin.readline().replace('\n','').encode('utf-8')
async def wallet_tool_main(wallet_root_path):
"""Main wallet tool script function; returned is a string (output or error)
"""
parser = get_wallettool_parser()
(options, args) = parser.parse_args()
load_program_config(config_path=options.datadir)
check_regtest(blockchain_start=False)
# full path to the wallets/ subdirectory in the user data area:
wallet_root_path = os.path.join(jm_single().datadir, wallet_root_path)
noseed_methods = ['generate', 'recover', 'createwatchonly']
methods = ['display', 'displayall', 'summary', 'showseed', 'importprivkey',
'history', 'showutxos', 'freeze', 'gettimelockaddress',
'addtxoutproof', 'changepass', 'setlabel']
methods.extend(noseed_methods)
noscan_methods = ['showseed', 'importprivkey', 'dumpprivkey', 'signmessage',
'changepass']
readonly_methods = ['display', 'displayall', 'summary', 'showseed',
'history', 'showutxos', 'dumpprivkey', 'signmessage',
'gettimelockaddress']
# FrostWallet related methods
frost_load_dkg_methods = ['hostpubkey', 'servefrost', 'dkgrecover',
'dkgls', 'dkgrm', 'recdkgls', 'recdkgrm']
frost_noscan_methods = ['hostpubkey', 'servefrost', 'dkgrecover',
'dkgls', 'dkgrm', 'recdkgls', 'recdkgrm',
'testdkg', 'testfrost']
frost_readonly_methods = ['hostpubkey', 'dkgls', 'recdkgls',
'testdkg', 'testfrost']
noscan_methods.extend(frost_noscan_methods)
readonly_methods.extend(frost_readonly_methods)
if len(args) < 1:
parser.error('Needs a wallet file or method')
twisted_sys_exit(EXIT_ARGERROR)
if options.mixdepth is not None and options.mixdepth < 0:
parser.error("Must have at least one mixdepth.")
twisted_sys_exit(EXIT_ARGERROR)
if args[0] in noseed_methods:
method = args[0]
if options.mixdepth is None:
options.mixdepth = DEFAULT_MIXDEPTH
else:
seed = args[0]
wallet_path = get_wallet_path(seed, wallet_root_path)
method = ('display' if len(args) == 1 else args[1].lower())
# no-seed methods are incompatible with a provided wallet:
if method in noseed_methods:
parser.error("The method '" + method + \
"' is not compatible with a wallet filename.")
twisted_sys_exit(EXIT_ARGERROR)
config = jm_single().config
if config.has_option('POLICY', 'frost'):
policy_frost = config.get("POLICY", "frost")
if policy_frost == 'true':
readonly_methods.remove('display')
readonly_methods.remove('displayall')
read_only = method in readonly_methods
#special case needed for fidelity bond burner outputs
#maybe theres a better way to do this
if options.recoversync:
read_only = False
if method in frost_load_dkg_methods:
load_dkg = True
read_only = True
dkg_read_only = True if method in frost_readonly_methods else False
else:
load_dkg = False
dkg_read_only = True
wallet = await open_test_wallet_maybe(
wallet_path, seed, options.mixdepth, read_only=read_only,
load_dkg=load_dkg, dkg_read_only=dkg_read_only,
wallet_password_stdin=options.wallet_password_stdin, gap_limit=options.gaplimit)
# this object is only to respect the layering,
# the service will not be started since this is a synchronous script:
wallet_service = WalletService(wallet)
if wallet_service.rpc_error:
twisted_sys_exit(EXIT_FAILURE)
if (isinstance(wallet, FrostWallet) and
method not in frost_load_dkg_methods):
ipc_client = FrostIPCClient(wallet)
await ipc_client.async_init()
wallet.set_ipc_client(ipc_client)
if method not in noscan_methods and jm_single().bc_interface is not None:
# if nothing was configured, we override bitcoind's options so that
# unconfirmed balance is included in the wallet display by default
if 'listunspent_args' not in jm_single().config.options('POLICY'):
jm_single().config.set('POLICY','listunspent_args', '[0]')
while True:
if await wallet_service.sync_wallet(
fast=not options.recoversync):
break
#Now the wallet/data is prepared, execute the script according to the method
if method == "display":
return await wallet_display(wallet_service, options.showprivkey,
mixdepth=options.mixdepth)
elif method == "displayall":
return await wallet_display(wallet_service, options.showprivkey,
displayall=True, mixdepth=options.mixdepth)
elif method == "summary":
return await wallet_display(wallet_service, options.showprivkey,
summarized=True, mixdepth=options.mixdepth)
elif method == "history":
if not isinstance(jm_single().bc_interface, BitcoinCoreInterface):
jmprint('showing history only available when using the Bitcoin Core ' +
'blockchain interface', "error")
twisted_sys_exit(EXIT_ARGERROR)
else:
return await wallet_fetch_history(wallet_service, options)
elif method == "generate":
retval = await wallet_generate_recover(
"generate", wallet_root_path, mixdepth=options.mixdepth)
return "Generated wallet OK" if retval else "Failed"
elif method == "recover":
retval = await wallet_generate_recover(
"recover", wallet_root_path, mixdepth=options.mixdepth)
return "Recovered wallet OK" if retval else "Failed"
elif method == "changepass":
retval = await wallet_change_passphrase(wallet_service)
return "Changed encryption passphrase OK" if retval else "Failed"
elif method == "showutxos":
return await wallet_showutxos(wallet_service,
showprivkey=options.showprivkey,
limit_mixdepth=options.mixdepth)
elif method == "showseed":
return wallet_showseed(wallet_service)
elif method == "dumpprivkey":
return wallet_dumpprivkey(wallet_service, options.hd_path)
elif method == "importprivkey":
#note: must be interactive (security)
if options.mixdepth is None:
parser.error("You need to specify a mixdepth with -m")
await wallet_importprivkey(wallet_service, options.mixdepth)
return "Key import completed."
elif method == "signmessage":
if len(args) < 3:
jmprint('Must provide message to sign', "error")
twisted_sys_exit(EXIT_ARGERROR)
return await wallet_signmessage(
wallet_service, options.hd_path, args[2])
elif method == "signpsbt":
if len(args) < 3:
jmprint("Must provide PSBT to sign", "error")
twisted_sys_exit(EXIT_ARGERROR)
return await wallet_signpsbt(wallet_service, args[2])
elif method == "freeze":
return await wallet_freezeutxo(wallet_service, options.mixdepth)
elif method == "gettimelockaddress":
if len(args) < 3:
jmprint('Must have locktime value yyyy-mm. For example 2021-03', "error")
twisted_sys_exit(EXIT_ARGERROR)
return await wallet_gettimelockaddress(wallet_service.wallet, args[2])
elif method == "addtxoutproof":
if len(args) < 3:
jmprint('Must have txout proof, which is the output of Bitcoin '
+ 'Core\'s RPC call gettxoutproof', "error")
twisted_sys_exit(EXIT_ARGERROR)
return wallet_addtxoutproof(wallet_service, options.hd_path, args[2])
elif method == "createwatchonly":
if len(args) < 2:
jmprint("args: [master public key]", "error")
twisted_sys_exit(EXIT_ARGERROR)
return await wallet_createwatchonly(wallet_root_path, args[1])
elif method == "setlabel":
if len(args) < 4:
jmprint("args: address label", "error")
twisted_sys_exit(EXIT_ARGERROR)
wallet.set_address_label(args[2], args[3])
if args[3]:
return "Address label set"
else:
return "Address label removed"
elif method == "servefrost":
if not isinstance(wallet, FrostWallet):
return 'Command "servefrost" used only for FROST wallets'
client = FROSTClient(wallet_service)
cfactory = JMClientProtocolFactory(client, proto_type="MAKER")
wallet.set_client_factory(cfactory)
async def wait_jm_up():
while True:
await asyncio.sleep(1)
if client.jm_up:
break
start_reactor(
jm_single().config.get("DAEMON", "daemon_host"),
jm_single().config.getint("DAEMON", "daemon_port"),
cfactory,
ish=True,
daemon=True,
gui=True)
await wait_jm_up()
ipc_server = FrostIPCServer(wallet)
await ipc_server.async_init()
await ipc_server.serve_forever()
return
elif method == "testdkg":
if not isinstance(wallet, FrostWallet):
return 'Command "testdgk" used only for FROST wallets'
md = address_type = index = 0
pubkey = await wallet.ipc_client.get_dkg_pubkey(
md, address_type, index, session_id=b'\x00'*32)
if pubkey:
return f'pubkey: {pubkey.hex()}'
elif method == "testfrost":
if not isinstance(wallet, FrostWallet):
return 'Command "testfrost" used only for FROST wallets'
from hashlib import sha256
from bitcointx.core.key import XOnlyPubKey
msg = 'testmsg'
md = address_type = index = 0
msghash = sha256(msg.encode()).digest()
sig, pubkey, tweaked_pubkey = await wallet.ipc_client.frost_req(
md, address_type, index, msghash)
verify_pubkey = XOnlyPubKey(tweaked_pubkey[1:])
if verify_pubkey.verify_schnorr(msghash, sig):
return "Schnorr signature successfully verified"
else:
jmprint("Schnorr signature verify failed", "error")
return
elif method == "hostpubkey":
if not isinstance(wallet, FrostWallet):
return 'Command "hostpubkey" used only for FROST wallets'
hostseckey = wallet._hostseckey
if hostseckey:
hostpubkey = hostpubkey_gen(hostseckey[:32])
return hostpubkey.hex()
else:
return 'No hostseckey available'
elif method == "dkgrecover":
if not isinstance(wallet, FrostWallet):
return 'Command "dkgrecover" used only for FROST wallets'
dkgrec_path = args[2]
dkgrec_storage = DKGRecoveryStorage(
dkgrec_path, create=False, read_only=True)
return wallet_service.dkg.dkg_recover(dkgrec_storage)
elif method == "dkgls":
if not isinstance(wallet, FrostWallet):
return 'Command "dkgls" used only for FROST wallets'
return wallet_service.dkg.dkg_ls()
elif method == "dkgrm":
if not isinstance(wallet, FrostWallet):
return 'Command "dkgrm" used only for FROST wallets'
session_ids = args[2:]
if not session_ids:
return jmprint("no session ids specified", "error")
session_ids = list(dict.fromkeys(session_ids)) # make unique
return wallet_service.dkg.dkg_rm(session_ids)
elif method == "recdkgls":
if not isinstance(wallet, FrostWallet):
return 'Command "recdkgls" used only for FROST wallets'
return wallet_service.dkg.recdkg_ls()
elif method == "recdkgrm":
if not isinstance(wallet, FrostWallet):
return 'Command "recdkgrm" used only for FROST wallets'
session_ids = args[2:]
if not session_ids:
return jmprint("no session ids specified", "error")
session_ids = list(dict.fromkeys(session_ids)) # make unique
return wallet_service.dkg.recdkg_rm(session_ids)
else:
parser.error("Unknown wallet-tool method: " + method)
twisted_sys_exit(EXIT_ARGERROR)