Browse Source
mastera0c1d5aadd upgrade notes (undeath)8885e61revert bad assert fix (undeath)a929cf3make log output human-readable again (undeath)aa2c1d9fix some bugs in wallet_utils (undeath)9dd1dc7fix wallet sync in fast mode (undeath)98f41f7make SimpleLruCache an actual LRU cache (undeath)703ae04remove wallet.sign() (undeath)34f8600fix wallet syncing (undeath)747c227fix some max_mixdepth off-by-one errors (undeath)39e4276change default wallet name (undeath)8b9abefadd is_segwit_mode() utility function (undeath)8ca6cfcmake sure new addresses always get imported (undeath)914a40eadopt wallet_utils for new wallet (undeath)cdbb345remove uses of internal wallet data from electruminterface. NOTE: changes untested, probably breaks electruminterface somehow (undeath)1f30967adopt blockchaininterface for new wallet (undeath)705d41dremove usages of wallet.unspent (undeath)89b5cd4add new wallet classes to existing tests (undeath)3cf9926remove references to old wallet classes (undeath)2a0757cremove BitcoinCoreWallet (undeath)6aaabb2change yieldgenerator using new wallet implementation, start porting wallet_utils (undeath)995c123replace old wallet implementation with new one (undeath)474a77dadd setup.py dependencies (undeath)ca57a14add new wallet implementation (undeath)
42 changed files with 4312 additions and 2122 deletions
@ -0,0 +1,29 @@
|
||||
Joinmarket-clientserver future: |
||||
=============================== |
||||
|
||||
|
||||
Upgrading |
||||
========= |
||||
|
||||
To upgrade: run the `install.sh` script as mentioned in the README. When prompted to overwrite the directory `jmvenv`, accept. |
||||
|
||||
A new wallet format has been introduced. Old wallets require conversion. In order to convert your existing wallet to the new format you can use the included conversion tool at `scripts/convert_old_wallet.py`. |
||||
|
||||
usage: |
||||
|
||||
python convert_old_wallet.py full/path/to/wallets/wallet.json |
||||
|
||||
This will place the newly converted `wallet.jmdat` file in the existing joinmarket `wallets/` directory. The wallet name will be adopted accordingly if it differs from `wallet`. |
||||
|
||||
|
||||
Notable changes |
||||
=============== |
||||
|
||||
|
||||
Credits |
||||
======= |
||||
|
||||
Thanks to everyone who directly contributed to this release - |
||||
|
||||
|
||||
And thanks also to those who submitted bug reports, tested and otherwise helped out. |
||||
@ -0,0 +1,271 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
|
||||
|
||||
from binascii import hexlify, unhexlify |
||||
from collections import OrderedDict |
||||
|
||||
|
||||
from . import btc |
||||
from .configure import get_network |
||||
|
||||
|
||||
TYPE_P2PKH, TYPE_P2SH_P2WPKH, TYPE_P2WPKH = range(3) |
||||
NET_MAINNET, NET_TESTNET = range(2) |
||||
NET_MAP = {'mainnet': NET_MAINNET, 'testnet': NET_TESTNET} |
||||
WIF_PREFIX_MAP = {'mainnet': 0x80, 'testnet': 0xef} |
||||
BIP44_COIN_MAP = {'mainnet': 2**31, 'testnet': 2**31 + 1} |
||||
|
||||
|
||||
# |
||||
# library stuff that should be in btc/jmbitcoin |
||||
# |
||||
|
||||
|
||||
P2PKH_PRE, P2PKH_POST = b'\x76\xa9\x14', b'\x88\xac' |
||||
P2SH_P2WPKH_PRE, P2SH_P2WPKH_POST = b'\xa9\x14', b'\x87' |
||||
P2WPKH_PRE = b'\x00\x14' |
||||
|
||||
|
||||
def _pubkey_to_script(pubkey, script_pre, script_post=b''): |
||||
# sanity check for public key |
||||
# see https://github.com/bitcoin/bitcoin/blob/master/src/pubkey.h |
||||
if not ((len(pubkey) == 33 and pubkey[0] in (b'\x02', b'\x03')) or |
||||
(len(pubkey) == 65 and pubkey[0] in (b'\x04', b'\x06', b'\x07'))): |
||||
raise Exception("Invalid public key!") |
||||
h = btc.bin_hash160(pubkey) |
||||
assert len(h) == 0x14 |
||||
assert script_pre[-1] == b'\x14' |
||||
return script_pre + h + script_post |
||||
|
||||
|
||||
def pubkey_to_p2pkh_script(pubkey): |
||||
return _pubkey_to_script(pubkey, P2PKH_PRE, P2PKH_POST) |
||||
|
||||
|
||||
def pubkey_to_p2sh_p2wpkh_script(pubkey): |
||||
wscript = pubkey_to_p2wpkh_script(pubkey) |
||||
return P2SH_P2WPKH_PRE + btc.bin_hash160(wscript) + P2SH_P2WPKH_POST |
||||
|
||||
|
||||
def pubkey_to_p2wpkh_script(pubkey): |
||||
return _pubkey_to_script(pubkey, P2WPKH_PRE) |
||||
|
||||
|
||||
class classproperty(object): |
||||
""" |
||||
from https://stackoverflow.com/a/5192374 |
||||
""" |
||||
def __init__(self, f): |
||||
self.f = f |
||||
|
||||
def __get__(self, obj, owner): |
||||
return self.f(owner) |
||||
|
||||
|
||||
class SimpleLruCache(OrderedDict): |
||||
""" |
||||
note: python3.2 has a lru cache in functools |
||||
""" |
||||
def __init__(self, max_size): |
||||
OrderedDict.__init__(self) |
||||
assert max_size > 0 |
||||
self.max_size = max_size |
||||
|
||||
def __setitem__(self, key, value): |
||||
OrderedDict.__setitem__(self, key, value) |
||||
self._adjust_size() |
||||
|
||||
def __getitem__(self, item): |
||||
e = OrderedDict.__getitem__(self, item) |
||||
del self[item] |
||||
OrderedDict.__setitem__(self, item, e) |
||||
return e |
||||
|
||||
def _adjust_size(self): |
||||
while len(self) > self.max_size: |
||||
self.popitem(last=False) |
||||
|
||||
|
||||
# |
||||
# library stuff end |
||||
# |
||||
|
||||
|
||||
class EngineError(Exception): |
||||
pass |
||||
|
||||
|
||||
class BTCEngine(object): |
||||
# must be set by subclasses |
||||
VBYTE = None |
||||
__LRU_KEY_CACHE = SimpleLruCache(50) |
||||
|
||||
@classproperty |
||||
def BIP32_priv_vbytes(cls): |
||||
return btc.PRIVATE[NET_MAP[get_network()]] |
||||
|
||||
@classproperty |
||||
def WIF_PREFIX(cls): |
||||
return WIF_PREFIX_MAP[get_network()] |
||||
|
||||
@classproperty |
||||
def BIP44_COIN_TYPE(cls): |
||||
return BIP44_COIN_MAP[get_network()] |
||||
|
||||
@staticmethod |
||||
def privkey_to_pubkey(privkey): |
||||
return btc.privkey_to_pubkey(privkey, False) |
||||
|
||||
@staticmethod |
||||
def address_to_script(addr): |
||||
return unhexlify(btc.address_to_script(addr)) |
||||
|
||||
@classmethod |
||||
def wif_to_privkey(cls, wif): |
||||
raw = btc.b58check_to_bin(wif) |
||||
vbyte = btc.get_version_byte(wif) |
||||
|
||||
if (btc.BTC_P2PK_VBYTE[get_network()] + cls.WIF_PREFIX) & 0xff == vbyte: |
||||
key_type = TYPE_P2PKH |
||||
elif (btc.BTC_P2SH_VBYTE[get_network()] + cls.WIF_PREFIX) & 0xff == vbyte: |
||||
key_type = TYPE_P2SH_P2WPKH |
||||
else: |
||||
key_type = None |
||||
|
||||
return raw, key_type |
||||
|
||||
@classmethod |
||||
def privkey_to_wif(cls, priv): |
||||
return btc.bin_to_b58check(priv, cls.WIF_PREFIX) |
||||
|
||||
@classmethod |
||||
def derive_bip32_master_key(cls, seed): |
||||
# FIXME: slight encoding mess |
||||
return btc.bip32_deserialize( |
||||
btc.bip32_master_key(seed, vbytes=cls.BIP32_priv_vbytes)) |
||||
|
||||
@classmethod |
||||
def derive_bip32_privkey(cls, master_key, path): |
||||
assert len(path) > 1 |
||||
return cls._walk_bip32_path(master_key, path)[-1] |
||||
|
||||
@classmethod |
||||
def derive_bip32_pub_export(cls, master_key, path): |
||||
priv = cls._walk_bip32_path(master_key, path) |
||||
return btc.bip32_serialize(btc.raw_bip32_privtopub(priv)) |
||||
|
||||
@classmethod |
||||
def derive_bip32_priv_export(cls, master_key, path): |
||||
return btc.bip32_serialize(cls._walk_bip32_path(master_key, path)) |
||||
|
||||
@classmethod |
||||
def _walk_bip32_path(cls, master_key, path): |
||||
key = master_key |
||||
for lvl in path[1:]: |
||||
assert 0 <= lvl < 2**32 |
||||
if (key, lvl) in cls.__LRU_KEY_CACHE: |
||||
key = cls.__LRU_KEY_CACHE[(key, lvl)] |
||||
else: |
||||
cls.__LRU_KEY_CACHE[(key, lvl)] = btc.raw_bip32_ckd(key, lvl) |
||||
key = cls.__LRU_KEY_CACHE[(key, lvl)] |
||||
return key |
||||
|
||||
@classmethod |
||||
def privkey_to_script(cls, privkey): |
||||
pub = cls.privkey_to_pubkey(privkey) |
||||
return cls.pubkey_to_script(pub) |
||||
|
||||
@classmethod |
||||
def pubkey_to_script(cls, pubkey): |
||||
raise NotImplementedError() |
||||
|
||||
@classmethod |
||||
def privkey_to_address(cls, privkey): |
||||
script = cls.privkey_to_script(privkey) |
||||
return btc.script_to_address(script, cls.VBYTE) |
||||
|
||||
@classmethod |
||||
def pubkey_to_address(cls, pubkey): |
||||
script = cls.pubkey_to_script(pubkey) |
||||
return btc.script_to_address(script, cls.VBYTE) |
||||
|
||||
@classmethod |
||||
def sign_transaction(cls, tx, index, privkey, amount): |
||||
raise NotImplementedError() |
||||
|
||||
@staticmethod |
||||
def sign_message(privkey, message): |
||||
""" |
||||
args: |
||||
privkey: bytes |
||||
message: bytes |
||||
returns: |
||||
base64-encoded signature |
||||
""" |
||||
return btc.ecdsa_sign(message, privkey, True, False) |
||||
|
||||
@classmethod |
||||
def script_to_address(cls, script): |
||||
return btc.script_to_address(script, vbyte=cls.VBYTE) |
||||
|
||||
|
||||
class BTC_P2PKH(BTCEngine): |
||||
@classproperty |
||||
def VBYTE(cls): |
||||
return btc.BTC_P2PK_VBYTE[get_network()] |
||||
|
||||
@classmethod |
||||
def pubkey_to_script(cls, pubkey): |
||||
return pubkey_to_p2pkh_script(pubkey) |
||||
|
||||
@classmethod |
||||
def sign_transaction(cls, tx, index, privkey, *args, **kwargs): |
||||
hashcode = kwargs.get('hashcode') or btc.SIGHASH_ALL |
||||
|
||||
pubkey = cls.privkey_to_pubkey(privkey) |
||||
script = cls.pubkey_to_script(pubkey) |
||||
|
||||
signing_tx = btc.serialize(btc.signature_form(tx, index, script, |
||||
hashcode=hashcode)) |
||||
# FIXME: encoding mess |
||||
sig = unhexlify(btc.ecdsa_tx_sign(signing_tx, hexlify(privkey), |
||||
**kwargs)) |
||||
|
||||
tx['ins'][index]['script'] = btc.serialize_script([sig, pubkey]) |
||||
|
||||
return tx |
||||
|
||||
|
||||
class BTC_P2SH_P2WPKH(BTCEngine): |
||||
# FIXME: implement different bip32 key export prefixes like electrum? |
||||
# see http://docs.electrum.org/en/latest/seedphrase.html#list-of-reserved-numbers |
||||
|
||||
@classproperty |
||||
def VBYTE(cls): |
||||
return btc.BTC_P2SH_VBYTE[get_network()] |
||||
|
||||
@classmethod |
||||
def pubkey_to_script(cls, pubkey): |
||||
return pubkey_to_p2sh_p2wpkh_script(pubkey) |
||||
|
||||
@classmethod |
||||
def sign_transaction(cls, tx, index, privkey, amount, |
||||
hashcode=btc.SIGHASH_ALL, **kwargs): |
||||
assert amount is not None |
||||
|
||||
pubkey = cls.privkey_to_pubkey(privkey) |
||||
wpkscript = pubkey_to_p2wpkh_script(pubkey) |
||||
pkscript = pubkey_to_p2pkh_script(pubkey) |
||||
|
||||
signing_tx = btc.segwit_signature_form(tx, index, pkscript, amount, |
||||
hashcode=hashcode, |
||||
decoder_func=lambda x: x) |
||||
# FIXME: encoding mess |
||||
sig = unhexlify(btc.ecdsa_tx_sign(signing_tx, hexlify(privkey), |
||||
hashcode=hashcode, **kwargs)) |
||||
|
||||
assert len(wpkscript) == 0x16 |
||||
tx['ins'][index]['script'] = b'\x16' + wpkscript |
||||
tx['ins'][index]['txinwitness'] = [sig, pubkey] |
||||
|
||||
return tx |
||||
@ -0,0 +1,69 @@
|
||||
from binascii import hexlify |
||||
|
||||
|
||||
def fmt_utxos(utxos, wallet, prefix=''): |
||||
output = [] |
||||
for u in utxos: |
||||
utxo_str = '{}{} - {}'.format( |
||||
prefix, fmt_utxo(u), fmt_tx_data(utxos[u], wallet)) |
||||
output.append(utxo_str) |
||||
return '\n'.join(output) |
||||
|
||||
|
||||
def fmt_utxo(utxo): |
||||
return '{}:{}'.format(hexlify(utxo[0]), utxo[1]) |
||||
|
||||
|
||||
def fmt_tx_data(tx_data, wallet): |
||||
return 'path: {}, address: {}, value: {}'.format( |
||||
wallet.get_path_repr(wallet.script_to_path(tx_data['script'])), |
||||
wallet.script_to_addr(tx_data['script']), tx_data['value']) |
||||
|
||||
|
||||
def generate_podle_error_string(priv_utxo_pairs, to, ts, wallet, cjamount, |
||||
taker_utxo_age, taker_utxo_amtpercent): |
||||
"""Gives detailed error information on why commitment sourcing failed. |
||||
""" |
||||
errmsg = "" |
||||
errmsgheader = ("Failed to source a commitment; this debugging information" |
||||
" may help:\n\n") |
||||
errmsg += ("1: Utxos that passed age and size limits, but have " |
||||
"been used too many times (see taker_utxo_retries " |
||||
"in the config):\n") |
||||
if len(priv_utxo_pairs) == 0: |
||||
errmsg += ("None\n") |
||||
else: |
||||
for p, u in priv_utxo_pairs: |
||||
errmsg += (str(u) + "\n") |
||||
errmsg += "2: Utxos that have less than " + taker_utxo_age + " confirmations:\n" |
||||
if len(to) == 0: |
||||
errmsg += ("None\n") |
||||
else: |
||||
for t in to: |
||||
errmsg += (str(t) + "\n") |
||||
errmsg += ("3: Utxos that were not at least " + taker_utxo_amtpercent + \ |
||||
"% of the size of the coinjoin amount " + str(cjamount) + "\n") |
||||
if len(ts) == 0: |
||||
errmsg += ("None\n") |
||||
else: |
||||
for t in ts: |
||||
errmsg += (str(t) + "\n") |
||||
errmsg += ('***\n') |
||||
errmsg += ("Utxos that appeared in item 1 cannot be used again.\n") |
||||
errmsg += ("Utxos only in item 2 can be used by waiting for more " |
||||
"confirmations, (set by the value of taker_utxo_age).\n") |
||||
errmsg += ("Utxos only in item 3 are not big enough for this " |
||||
"coinjoin transaction, set by the value " |
||||
"of taker_utxo_amtpercent.\n") |
||||
errmsg += ("If you cannot source a utxo from your wallet according " |
||||
"to these rules, use the tool add-utxo.py to source a " |
||||
"utxo external to your joinmarket wallet. Read the help " |
||||
"with 'python add-utxo.py --help'\n\n") |
||||
errmsg += ("***\nFor reference, here are the utxos in your wallet:\n") |
||||
|
||||
for md, utxos in wallet.get_utxos_by_mixdepth_().items(): |
||||
if not utxos: |
||||
continue |
||||
errmsg += ("\nmixdepth {}:\n{}".format( |
||||
md, fmt_utxos(utxos, wallet, prefix=' '))) |
||||
return (errmsgheader, errmsg) |
||||
@ -0,0 +1,327 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
|
||||
import os |
||||
import shutil |
||||
import atexit |
||||
import bencoder |
||||
import pyaes |
||||
from hashlib import sha256 |
||||
from argon2 import low_level |
||||
from .support import get_random_bytes |
||||
|
||||
|
||||
class Argon2Hash(object): |
||||
def __init__(self, password, salt=None, hash_len=32, salt_len=16, |
||||
time_cost=500, memory_cost=1000, parallelism=4, |
||||
argon2_type=low_level.Type.I, version=19): |
||||
""" |
||||
args: |
||||
password: password as bytes |
||||
salt: salt in bytes or None to create random one, must have length >= 8 |
||||
hash_len: generated hash length in bytes |
||||
salt_len: salt length in bytes, ignored if salt is not None, must be >= 8 |
||||
|
||||
Other arguments are argon2 settings. Only change those if you know what |
||||
you're doing. Optimized for slow hashing suitable for file encryption. |
||||
""" |
||||
# Default and recommended settings from argon2.PasswordHasher are for |
||||
# interactive logins. For encryption we want something much slower. |
||||
self.settings = { |
||||
'time_cost': time_cost, |
||||
'memory_cost': memory_cost, |
||||
'parallelism': parallelism, |
||||
'hash_len': hash_len, |
||||
'type': argon2_type, |
||||
'version': version |
||||
} |
||||
self.salt = salt if salt is not None else get_random_bytes(salt_len) |
||||
self.hash = low_level.hash_secret_raw(password, self.salt, |
||||
**self.settings) |
||||
|
||||
|
||||
class StorageError(Exception): |
||||
pass |
||||
|
||||
|
||||
class StoragePasswordError(StorageError): |
||||
pass |
||||
|
||||
|
||||
class Storage(object): |
||||
""" |
||||
Responsible for reading/writing [encrypted] data to disk. |
||||
|
||||
All data to be stored must be added to self.data which defaults to an |
||||
empty dict. |
||||
|
||||
self.data must contain serializable data (dict, list, tuple, bytes, numbers) |
||||
Having str objects anywhere in self.data will lead to undefined behaviour (py3). |
||||
All dict keys must be bytes. |
||||
|
||||
KDF: argon2, ENC: AES-256-CBC |
||||
""" |
||||
MAGIC_UNENC = b'JMWALLET' |
||||
MAGIC_ENC = b'JMENCWLT' |
||||
MAGIC_DETECT_ENC = b'JMWALLET' |
||||
|
||||
ENC_KEY_BYTES = 32 # AES-256 |
||||
SALT_LENGTH = 16 |
||||
|
||||
def __init__(self, path, password=None, create=False, read_only=False): |
||||
""" |
||||
args: |
||||
path: file path to storage |
||||
password: bytes or None for unencrypted file |
||||
create: create file if it does not exist |
||||
read_only: do not change anything on the file system |
||||
""" |
||||
self.path = path |
||||
self._lock_file = None |
||||
self._hash = None |
||||
self._data_checksum = None |
||||
self.data = None |
||||
self.changed = False |
||||
self.read_only = read_only |
||||
self.newly_created = False |
||||
|
||||
if not os.path.isfile(path): |
||||
if create and not read_only: |
||||
self._create_new(password) |
||||
self._save_file() |
||||
self.newly_created = True |
||||
else: |
||||
raise StorageError("File not found.") |
||||
else: |
||||
self._load_file(password) |
||||
|
||||
assert self.data is not None |
||||
assert self._data_checksum is not None |
||||
|
||||
self._create_lock() |
||||
|
||||
def is_encrypted(self): |
||||
return self._hash is not None |
||||
|
||||
def is_locked(self): |
||||
return self._lock_file and os.path.exists(self._lock_file) |
||||
|
||||
def was_changed(self): |
||||
""" |
||||
return True if data differs from data on disk |
||||
""" |
||||
return self._data_checksum != self._get_data_checksum() |
||||
|
||||
def change_password(self, password): |
||||
if self.read_only: |
||||
raise StorageError("Cannot change password of read-only file.") |
||||
self._set_hash(password) |
||||
self._save_file() |
||||
|
||||
def save(self): |
||||
""" |
||||
Write file to disk if data was modified |
||||
""" |
||||
#if not self.was_changed(): |
||||
# return |
||||
if self.read_only: |
||||
raise StorageError("Read-only storage cannot be saved.") |
||||
self._save_file() |
||||
|
||||
@classmethod |
||||
def is_storage_file(cls, path): |
||||
return cls._get_file_magic(path) in (cls.MAGIC_ENC, cls.MAGIC_UNENC) |
||||
|
||||
@classmethod |
||||
def is_encrypted_storage_file(cls, path): |
||||
return cls._get_file_magic(path) == cls.MAGIC_ENC |
||||
|
||||
@classmethod |
||||
def _get_file_magic(cls, path): |
||||
assert len(cls.MAGIC_ENC) == len(cls.MAGIC_UNENC) |
||||
with open(path, 'rb') as fh: |
||||
return fh.read(len(cls.MAGIC_ENC)) |
||||
|
||||
def _get_data_checksum(self): |
||||
if self.data is None: #pragma: no cover |
||||
return None |
||||
return sha256(self._serialize(self.data)).digest() |
||||
|
||||
def _update_data_hash(self): |
||||
self._data_checksum = self._get_data_checksum() |
||||
|
||||
def _create_new(self, password): |
||||
self.data = {} |
||||
self._set_hash(password) |
||||
|
||||
def _set_hash(self, password): |
||||
if password is None: |
||||
self._hash = None |
||||
else: |
||||
self._hash = self._hash_password(password) |
||||
|
||||
def _save_file(self): |
||||
assert self.read_only == False |
||||
data = self._serialize(self.data) |
||||
enc_data = self._encrypt_file(data) |
||||
|
||||
magic = self.MAGIC_UNENC if data is enc_data else self.MAGIC_ENC |
||||
self._write_file(magic + enc_data) |
||||
self._update_data_hash() |
||||
|
||||
def _load_file(self, password): |
||||
data = self._read_file() |
||||
assert len(self.MAGIC_ENC) == len(self.MAGIC_UNENC) == 8 |
||||
magic = data[:8] |
||||
|
||||
if magic not in (self.MAGIC_ENC, self.MAGIC_UNENC): |
||||
raise StorageError("File does not appear to be a joinmarket wallet.") |
||||
|
||||
data = data[8:] |
||||
|
||||
if magic == self.MAGIC_ENC: |
||||
if password is None: |
||||
raise StorageError("Password required to open wallet.") |
||||
data = self._decrypt_file(password, data) |
||||
else: |
||||
assert magic == self.MAGIC_UNENC |
||||
|
||||
self.data = self._deserialize(data) |
||||
self._update_data_hash() |
||||
|
||||
def _write_file(self, data): |
||||
assert self.read_only is False |
||||
|
||||
if not os.path.exists(self.path): |
||||
# newly created storage |
||||
with open(self.path, 'wb') as fh: |
||||
fh.write(data) |
||||
return |
||||
|
||||
# using a tmpfile ensures the write is atomic |
||||
tmpfile = '{}.tmp'.format(self.path) |
||||
|
||||
with open(tmpfile, 'wb') as fh: |
||||
shutil.copystat(self.path, tmpfile) |
||||
fh.write(data) |
||||
|
||||
#FIXME: behaviour with symlinks might be weird |
||||
shutil.move(tmpfile, self.path) |
||||
|
||||
def _read_file(self): |
||||
# this method mainly exists for easier mocking |
||||
with open(self.path, 'rb') as fh: |
||||
return fh.read() |
||||
|
||||
@staticmethod |
||||
def _serialize(data): |
||||
return bencoder.bencode(data) |
||||
|
||||
@staticmethod |
||||
def _deserialize(data): |
||||
return bencoder.bdecode(data) |
||||
|
||||
def _encrypt_file(self, data): |
||||
if not self.is_encrypted(): |
||||
return data |
||||
|
||||
iv = get_random_bytes(16) |
||||
container = { |
||||
b'enc': {b'salt': self._hash.salt, b'iv': iv}, |
||||
b'data': self._encrypt(data, iv) |
||||
} |
||||
return self._serialize(container) |
||||
|
||||
def _decrypt_file(self, password, data): |
||||
assert password is not None |
||||
|
||||
container = self._deserialize(data) |
||||
assert b'enc' in container |
||||
assert b'data' in container |
||||
|
||||
self._hash = self._hash_password(password, container[b'enc'][b'salt']) |
||||
|
||||
return self._decrypt(container[b'data'], container[b'enc'][b'iv']) |
||||
|
||||
def _encrypt(self, data, iv): |
||||
encrypter = pyaes.Encrypter( |
||||
pyaes.AESModeOfOperationCBC(self._hash.hash, iv=iv)) |
||||
enc_data = encrypter.feed(self.MAGIC_DETECT_ENC + data) |
||||
enc_data += encrypter.feed() |
||||
|
||||
return enc_data |
||||
|
||||
def _decrypt(self, data, iv): |
||||
decrypter = pyaes.Decrypter( |
||||
pyaes.AESModeOfOperationCBC(self._hash.hash, iv=iv)) |
||||
try: |
||||
dec_data = decrypter.feed(data) |
||||
dec_data += decrypter.feed() |
||||
except ValueError: |
||||
# in most "wrong password" cases the pkcs7 padding will be wrong |
||||
raise StoragePasswordError("Wrong password.") |
||||
|
||||
if not dec_data.startswith(self.MAGIC_DETECT_ENC): |
||||
raise StoragePasswordError("Wrong password.") |
||||
return dec_data[len(self.MAGIC_DETECT_ENC):] |
||||
|
||||
@classmethod |
||||
def _hash_password(cls, password, salt=None): |
||||
return Argon2Hash(password, salt, |
||||
hash_len=cls.ENC_KEY_BYTES, salt_len=cls.SALT_LENGTH) |
||||
|
||||
def _create_lock(self): |
||||
if self.read_only: |
||||
return |
||||
self._lock_file = '{}.lock'.format(self.path) |
||||
if os.path.exists(self._lock_file): |
||||
self._lock_file = None |
||||
raise StorageError("File is currently in use. If this is a " |
||||
"leftover from a crashed instance you need to " |
||||
"remove the lock file manually") |
||||
#FIXME: in python >=3.3 use mode xb |
||||
with open(self._lock_file, 'wb'): |
||||
pass |
||||
|
||||
atexit.register(self.close) |
||||
|
||||
def _remove_lock(self): |
||||
if self._lock_file: |
||||
os.remove(self._lock_file) |
||||
self._lock_file = None |
||||
|
||||
def close(self): |
||||
if not self.read_only and self.was_changed(): |
||||
self._save_file() |
||||
self._remove_lock() |
||||
self.read_only = True |
||||
|
||||
def __del__(self): |
||||
self.close() |
||||
|
||||
|
||||
class VolatileStorage(Storage): |
||||
""" |
||||
Storage that is never actually written to disk and only kept in memory. |
||||
|
||||
This exists for easier testing. |
||||
""" |
||||
|
||||
def __init__(self, password=None, data=None): |
||||
self.file_data = None |
||||
super(VolatileStorage, self).__init__('VOLATILE', password, |
||||
create=True) |
||||
if data: |
||||
self.file_data = data |
||||
self._load_file(password) |
||||
|
||||
def _create_lock(self): |
||||
pass |
||||
|
||||
def _remove_lock(self): |
||||
pass |
||||
|
||||
def _write_file(self, data): |
||||
self.file_data = data |
||||
|
||||
def _read_file(self): |
||||
return self.file_data |
||||
@ -0,0 +1,35 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
|
||||
from jmclient import Argon2Hash, get_random_bytes |
||||
import pytest |
||||
|
||||
|
||||
def test_argon2_sanity(): |
||||
pwd = b'password' |
||||
salt = b'saltsalt' |
||||
|
||||
h = Argon2Hash(pwd, salt, 16) |
||||
|
||||
assert len(h.hash) == 16 |
||||
assert h.salt == salt |
||||
assert h.hash == b'\x05;V\xd7fy\xdfI\xa4\xe7F$_\\3\xcb' |
||||
|
||||
|
||||
def test_get_random_bytes(): |
||||
assert len(get_random_bytes(16)) == 16 |
||||
assert get_random_bytes(16) != get_random_bytes(16) |
||||
|
||||
|
||||
def test_argon2(): |
||||
pwd = b'testpass' |
||||
h = Argon2Hash(pwd, hash_len=16, salt_len=22) |
||||
|
||||
assert len(h.hash) == 16 |
||||
assert len(h.salt) == 22 |
||||
|
||||
h2 = Argon2Hash(pwd, h.salt, hash_len=16) |
||||
|
||||
assert h.settings == h2.settings |
||||
assert h.hash == h2.hash |
||||
assert h.salt == h2.salt |
||||
|
||||
@ -0,0 +1,149 @@
|
||||
from __future__ import absolute_import, print_function |
||||
|
||||
"""Blockchaininterface functionality tests.""" |
||||
|
||||
import binascii |
||||
from commontest import create_wallet_for_sync, make_sign_and_push |
||||
|
||||
import pytest |
||||
from jmclient import load_program_config, jm_single, sync_wallet, get_log |
||||
|
||||
log = get_log() |
||||
|
||||
|
||||
def sync_test_wallet(fast, wallet): |
||||
sync_count = 0 |
||||
jm_single().bc_interface.wallet_synced = False |
||||
while not jm_single().bc_interface.wallet_synced: |
||||
sync_wallet(wallet, fast=fast) |
||||
sync_count += 1 |
||||
# avoid infinite loop |
||||
assert sync_count < 10 |
||||
log.debug("Tried " + str(sync_count) + " times") |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast', (False, True)) |
||||
def test_empty_wallet_sync(setup_wallets, fast): |
||||
wallet = create_wallet_for_sync([0, 0, 0, 0, 0], ['test_empty_wallet_sync']) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
broken = True |
||||
for md in range(wallet.max_mixdepth + 1): |
||||
for internal in (True, False): |
||||
broken = False |
||||
assert 0 == wallet.get_next_unused_index(md, internal) |
||||
assert not broken |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast,internal', ( |
||||
(False, False), (False, True), |
||||
(True, False), (True, True))) |
||||
def test_sequentially_used_wallet_sync(setup_wallets, fast, internal): |
||||
used_count = [1, 3, 6, 2, 23] |
||||
wallet = create_wallet_for_sync( |
||||
used_count, ['test_sequentially_used_wallet_sync'], |
||||
populate_internal=internal) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
broken = True |
||||
for md in range(len(used_count)): |
||||
broken = False |
||||
assert used_count[md] == wallet.get_next_unused_index(md, internal) |
||||
assert not broken |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast', (False, True)) |
||||
def test_gap_used_wallet_sync(setup_wallets, fast): |
||||
used_count = [1, 3, 6, 2, 23] |
||||
wallet = create_wallet_for_sync(used_count, ['test_gap_used_wallet_sync']) |
||||
wallet.gap_limit = 20 |
||||
|
||||
for md in range(len(used_count)): |
||||
x = -1 |
||||
for x in range(md): |
||||
assert x <= wallet.gap_limit, "test broken" |
||||
# create some unused addresses |
||||
wallet.get_new_script(md, True) |
||||
wallet.get_new_script(md, False) |
||||
used_count[md] += x + 2 |
||||
jm_single().bc_interface.grab_coins(wallet.get_new_addr(md, True), 1) |
||||
jm_single().bc_interface.grab_coins(wallet.get_new_addr(md, False), 1) |
||||
|
||||
# reset indices to simulate completely unsynced wallet |
||||
for md in range(wallet.max_mixdepth + 1): |
||||
wallet.set_next_index(md, True, 0) |
||||
wallet.set_next_index(md, False, 0) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
broken = True |
||||
for md in range(len(used_count)): |
||||
broken = False |
||||
assert md + 1 == wallet.get_next_unused_index(md, True) |
||||
assert used_count[md] == wallet.get_next_unused_index(md, False) |
||||
assert not broken |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast', (False, True)) |
||||
def test_multigap_used_wallet_sync(setup_wallets, fast): |
||||
start_index = 5 |
||||
used_count = [start_index, 0, 0, 0, 0] |
||||
wallet = create_wallet_for_sync(used_count, ['test_multigap_used_wallet_sync']) |
||||
wallet.gap_limit = 5 |
||||
|
||||
mixdepth = 0 |
||||
for w in range(5): |
||||
for x in range(int(wallet.gap_limit * 0.6)): |
||||
assert x <= wallet.gap_limit, "test broken" |
||||
# create some unused addresses |
||||
wallet.get_new_script(mixdepth, True) |
||||
wallet.get_new_script(mixdepth, False) |
||||
used_count[mixdepth] += x + 2 |
||||
jm_single().bc_interface.grab_coins(wallet.get_new_addr(mixdepth, True), 1) |
||||
jm_single().bc_interface.grab_coins(wallet.get_new_addr(mixdepth, False), 1) |
||||
|
||||
# reset indices to simulate completely unsynced wallet |
||||
for md in range(wallet.max_mixdepth + 1): |
||||
wallet.set_next_index(md, True, 0) |
||||
wallet.set_next_index(md, False, 0) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
assert used_count[mixdepth] - start_index == wallet.get_next_unused_index(mixdepth, True) |
||||
assert used_count[mixdepth] == wallet.get_next_unused_index(mixdepth, False) |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast', (False, True)) |
||||
def test_retain_unused_indices_wallet_sync(setup_wallets, fast): |
||||
used_count = [0, 0, 0, 0, 0] |
||||
wallet = create_wallet_for_sync(used_count, ['test_retain_unused_indices_wallet_sync']) |
||||
|
||||
for x in range(9): |
||||
wallet.get_new_script(0, 1) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
assert wallet.get_next_unused_index(0, 1) == 9 |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast', (False, True)) |
||||
def test_imported_wallet_sync(setup_wallets, fast): |
||||
used_count = [0, 0, 0, 0, 0] |
||||
wallet = create_wallet_for_sync(used_count, ['test_imported_wallet_sync']) |
||||
source_wallet = create_wallet_for_sync(used_count, ['test_imported_wallet_sync_origin']) |
||||
|
||||
address = source_wallet.get_new_addr(0, 1) |
||||
wallet.import_private_key(0, source_wallet.get_wif(0, 1, 0)) |
||||
txid = binascii.unhexlify(jm_single().bc_interface.grab_coins(address, 1)) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
assert wallet._utxos.have_utxo(txid, 0) == 0 |
||||
|
||||
|
||||
@pytest.fixture(scope='module') |
||||
def setup_wallets(): |
||||
load_program_config() |
||||
jm_single().bc_interface.tick_forward_chain_interval = 1 |
||||
@ -0,0 +1,128 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
|
||||
from jmclient import storage |
||||
import pytest |
||||
|
||||
|
||||
class MockStorage(storage.Storage): |
||||
def __init__(self, data, *args, **kwargs): |
||||
self.file_data = data |
||||
self.locked = False |
||||
super(type(self), self).__init__(*args, **kwargs) |
||||
|
||||
def _read_file(self): |
||||
if hasattr(self, 'file_data'): |
||||
return self.file_data |
||||
return b'' |
||||
|
||||
def _write_file(self, data): |
||||
self.file_data = data |
||||
|
||||
def _create_lock(self): |
||||
self.locked = not self.read_only |
||||
|
||||
def _remove_lock(self): |
||||
self.locked = False |
||||
|
||||
|
||||
def test_storage(): |
||||
s = MockStorage(None, 'nonexistant', b'password', create=True) |
||||
assert s.file_data.startswith(s.MAGIC_ENC) |
||||
assert s.locked |
||||
assert s.is_encrypted() |
||||
assert not s.was_changed() |
||||
|
||||
old_data = s.file_data |
||||
|
||||
s.data[b'mydata'] = b'test' |
||||
assert s.was_changed() |
||||
s.save() |
||||
assert s.file_data != old_data |
||||
enc_data = s.file_data |
||||
|
||||
old_data = s.file_data |
||||
s.change_password(b'newpass') |
||||
assert s.is_encrypted() |
||||
assert not s.was_changed() |
||||
assert s.file_data != old_data |
||||
|
||||
old_data = s.file_data |
||||
s.change_password(None) |
||||
assert not s.is_encrypted() |
||||
assert not s.was_changed() |
||||
assert s.file_data != old_data |
||||
assert s.file_data.startswith(s.MAGIC_UNENC) |
||||
|
||||
s2 = MockStorage(enc_data, __file__, b'password') |
||||
assert s2.locked |
||||
assert s2.is_encrypted() |
||||
assert not s2.was_changed() |
||||
assert s2.data[b'mydata'] == b'test' |
||||
|
||||
|
||||
def test_storage_invalid(): |
||||
with pytest.raises(storage.StorageError, message="File does not exist"): |
||||
MockStorage(None, 'nonexistant', b'password') |
||||
|
||||
s = MockStorage(None, 'nonexistant', b'password', create=True) |
||||
with pytest.raises(storage.StorageError, message="Wrong password"): |
||||
MockStorage(s.file_data, __file__, b'wrongpass') |
||||
|
||||
with pytest.raises(storage.StorageError, message="No password"): |
||||
MockStorage(s.file_data, __file__) |
||||
|
||||
with pytest.raises(storage.StorageError, message="Non-wallet file, unencrypted"): |
||||
MockStorage(b'garbagefile', __file__) |
||||
|
||||
with pytest.raises(storage.StorageError, message="Non-wallet file, encrypted"): |
||||
MockStorage(b'garbagefile', __file__, b'password') |
||||
|
||||
|
||||
def test_storage_readonly(): |
||||
s = MockStorage(None, 'nonexistant', b'password', create=True) |
||||
s = MockStorage(s.file_data, __file__, b'password', read_only=True) |
||||
s.data[b'mydata'] = b'test' |
||||
|
||||
assert not s.locked |
||||
assert s.was_changed() |
||||
|
||||
with pytest.raises(storage.StorageError): |
||||
s.save() |
||||
|
||||
with pytest.raises(storage.StorageError): |
||||
s.change_password(b'newpass') |
||||
|
||||
|
||||
def test_storage_lock(tmpdir): |
||||
p = str(tmpdir.join('test.jmdat')) |
||||
pw = None |
||||
|
||||
with pytest.raises(storage.StorageError, message="File does not exist"): |
||||
storage.Storage(p, pw) |
||||
|
||||
s = storage.Storage(p, pw, create=True) |
||||
assert s.is_locked() |
||||
assert not s.is_encrypted() |
||||
assert s.data == {} |
||||
|
||||
with pytest.raises(storage.StorageError, message="File is locked"): |
||||
storage.Storage(p, pw) |
||||
|
||||
assert storage.Storage.is_storage_file(p) |
||||
assert not storage.Storage.is_encrypted_storage_file(p) |
||||
|
||||
s.data[b'test'] = b'value' |
||||
s.save() |
||||
s.close() |
||||
del s |
||||
|
||||
s = storage.Storage(p, pw, read_only=True) |
||||
assert not s.is_locked() |
||||
assert s.data == {b'test': b'value'} |
||||
s.close() |
||||
del s |
||||
|
||||
s = storage.Storage(p, pw) |
||||
assert s.is_locked() |
||||
assert s.data == {b'test': b'value'} |
||||
|
||||
@ -0,0 +1,93 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
|
||||
from jmclient.wallet import UTXOManager |
||||
from test_storage import MockStorage |
||||
import pytest |
||||
|
||||
from jmclient import load_program_config |
||||
import jmclient |
||||
from commontest import DummyBlockchainInterface |
||||
|
||||
|
||||
def select(unspent, value): |
||||
return unspent |
||||
|
||||
|
||||
def test_utxomanager_persist(setup_env_nodeps): |
||||
storage = MockStorage(None, 'wallet.jmdat', None, create=True) |
||||
UTXOManager.initialize(storage) |
||||
um = UTXOManager(storage, select) |
||||
|
||||
txid = b'\x00' * UTXOManager.TXID_LEN |
||||
index = 0 |
||||
path = (0,) |
||||
mixdepth = 0 |
||||
value = 500 |
||||
|
||||
um.add_utxo(txid, index, path, value, mixdepth) |
||||
um.add_utxo(txid, index+1, path, value, mixdepth+1) |
||||
|
||||
um.save() |
||||
del um |
||||
|
||||
um = UTXOManager(storage, select) |
||||
|
||||
assert um.have_utxo(txid, index) is mixdepth |
||||
assert um.have_utxo(txid, index+1) is mixdepth + 1 |
||||
assert um.have_utxo(txid, index+2) is False |
||||
|
||||
utxos = um.get_utxos_by_mixdepth() |
||||
assert len(utxos[mixdepth]) == 1 |
||||
assert len(utxos[mixdepth+1]) == 1 |
||||
assert len(utxos[mixdepth+2]) == 0 |
||||
|
||||
balances = um.get_balance_by_mixdepth() |
||||
assert balances[mixdepth] == value |
||||
assert balances[mixdepth+1] == value |
||||
|
||||
um.remove_utxo(txid, index, mixdepth) |
||||
assert um.have_utxo(txid, index) is False |
||||
|
||||
um.save() |
||||
del um |
||||
|
||||
um = UTXOManager(storage, select) |
||||
|
||||
assert um.have_utxo(txid, index) is False |
||||
assert um.have_utxo(txid, index+1) is mixdepth + 1 |
||||
|
||||
utxos = um.get_utxos_by_mixdepth() |
||||
assert len(utxos[mixdepth]) == 0 |
||||
assert len(utxos[mixdepth+1]) == 1 |
||||
|
||||
balances = um.get_balance_by_mixdepth() |
||||
assert balances[mixdepth] == 0 |
||||
assert balances[mixdepth+1] == value |
||||
assert balances[mixdepth+2] == 0 |
||||
|
||||
|
||||
def test_utxomanager_select(setup_env_nodeps): |
||||
storage = MockStorage(None, 'wallet.jmdat', None, create=True) |
||||
UTXOManager.initialize(storage) |
||||
um = UTXOManager(storage, select) |
||||
|
||||
txid = b'\x00' * UTXOManager.TXID_LEN |
||||
index = 0 |
||||
path = (0,) |
||||
mixdepth = 0 |
||||
value = 500 |
||||
|
||||
um.add_utxo(txid, index, path, value, mixdepth) |
||||
|
||||
assert len(um.select_utxos(mixdepth, value)) is 1 |
||||
assert len(um.select_utxos(mixdepth+1, value)) is 0 |
||||
|
||||
um.add_utxo(txid, index+1, path, value, mixdepth) |
||||
assert len(um.select_utxos(mixdepth, value)) is 2 |
||||
|
||||
|
||||
@pytest.fixture |
||||
def setup_env_nodeps(monkeypatch): |
||||
monkeypatch.setattr(jmclient.configure, 'get_blockchain_interface_instance', |
||||
lambda x: DummyBlockchainInterface()) |
||||
load_program_config() |
||||
@ -0,0 +1,590 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
'''Wallet functionality tests.''' |
||||
|
||||
import os |
||||
import json |
||||
from binascii import hexlify, unhexlify |
||||
|
||||
import pytest |
||||
import jmbitcoin as btc |
||||
from commontest import binarize_tx |
||||
from jmclient import load_program_config, jm_single, get_log,\ |
||||
SegwitLegacyWallet,BIP32Wallet, BIP49Wallet, LegacyWallet,\ |
||||
VolatileStorage, get_network, cryptoengine, WalletError |
||||
|
||||
testdir = os.path.dirname(os.path.realpath(__file__)) |
||||
log = get_log() |
||||
|
||||
|
||||
def signed_tx_is_segwit(tx): |
||||
for inp in tx['ins']: |
||||
if 'txinwitness' not in inp: |
||||
return False |
||||
return True |
||||
|
||||
|
||||
def assert_segwit(tx): |
||||
assert signed_tx_is_segwit(tx) |
||||
|
||||
|
||||
def assert_not_segwit(tx): |
||||
assert not signed_tx_is_segwit(tx) |
||||
|
||||
|
||||
def get_populated_wallet(amount=10**8, num=3): |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
# fund three wallet addresses at mixdepth 0 |
||||
for i in range(num): |
||||
fund_wallet_addr(wallet, wallet.get_internal_addr(0), amount / 10**8) |
||||
|
||||
return wallet |
||||
|
||||
|
||||
def fund_wallet_addr(wallet, addr, value_btc=1): |
||||
txin_id = jm_single().bc_interface.grab_coins(addr, value_btc) |
||||
txinfo = jm_single().bc_interface.rpc('gettransaction', [txin_id]) |
||||
txin = btc.deserialize(unhexlify(txinfo['hex'])) |
||||
utxo_in = wallet.add_new_utxos_(txin, unhexlify(txin_id)) |
||||
assert len(utxo_in) == 1 |
||||
return list(utxo_in.keys())[0] |
||||
|
||||
|
||||
def get_bip39_vectors(): |
||||
fh = open(os.path.join(testdir, 'bip39vectors.json')) |
||||
data = json.load(fh)['english'] |
||||
fh.close() |
||||
return data |
||||
|
||||
|
||||
@pytest.mark.parametrize('entropy,mnemonic,key,xpriv', get_bip39_vectors()) |
||||
def test_bip39_seeds(monkeypatch, setup_wallet, entropy, mnemonic, key, xpriv): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'mainnet') |
||||
created_entropy = SegwitLegacyWallet.entropy_from_mnemonic(mnemonic) |
||||
assert entropy == hexlify(created_entropy) |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize( |
||||
storage, get_network(), entropy=created_entropy, |
||||
entropy_extension=b'TREZOR', max_mixdepth=4) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
assert (mnemonic, b'TREZOR') == wallet.get_mnemonic_words() |
||||
assert key == hexlify(wallet._create_master_key()) |
||||
|
||||
# need to monkeypatch this, else we'll default to the BIP-49 path |
||||
monkeypatch.setattr(SegwitLegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
assert xpriv == wallet.get_bip32_priv_export() |
||||
|
||||
|
||||
def test_bip49_seed(monkeypatch, setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
mnemonic = 'abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about' |
||||
master_xpriv = 'tprv8ZgxMBicQKsPe5YMU9gHen4Ez3ApihUfykaqUorj9t6FDqy3nP6eoXiAo2ssvpAjoLroQxHqr3R5nE3a5dU3DHTjTgJDd7zrbniJr6nrCzd' |
||||
account0_xpriv = 'tprv8gRrNu65W2Msef2BdBSUgFdRTGzC8EwVXnV7UGS3faeXtuMVtGfEdidVeGbThs4ELEoayCAzZQ4uUji9DUiAs7erdVskqju7hrBcDvDsdbY' |
||||
addr0_script_hash = '336caa13e08b96080a32b5d818d59b4ab3b36742' |
||||
|
||||
entropy = SegwitLegacyWallet.entropy_from_mnemonic(mnemonic) |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=0) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
assert (mnemonic, None) == wallet.get_mnemonic_words() |
||||
assert account0_xpriv == wallet.get_bip32_priv_export(0) |
||||
assert addr0_script_hash == hexlify(wallet.get_external_script(0)[2:-1]) |
||||
|
||||
# FIXME: is this desired behaviour? BIP49 wallet will not return xpriv for |
||||
# the root key but only for key after base path |
||||
monkeypatch.setattr(SegwitLegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
assert master_xpriv == wallet.get_bip32_priv_export() |
||||
|
||||
|
||||
def test_bip32_test_vector_1(monkeypatch, setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'mainnet') |
||||
|
||||
entropy = unhexlify('000102030405060708090a0b0c0d0e0f') |
||||
storage = VolatileStorage() |
||||
LegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=0) |
||||
|
||||
# test vector 1 is using hardened derivation for the account/mixdepth level |
||||
monkeypatch.setattr(LegacyWallet, '_get_mixdepth_from_path', |
||||
BIP49Wallet._get_mixdepth_from_path) |
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_mixdepth_path_level', |
||||
BIP49Wallet._get_bip32_mixdepth_path_level) |
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
monkeypatch.setattr(LegacyWallet, '_create_master_key', |
||||
BIP32Wallet._create_master_key) |
||||
|
||||
wallet = LegacyWallet(storage) |
||||
|
||||
assert wallet.get_bip32_priv_export() == 'xprv9s21ZrQH143K3QTDL4LXw2F7HEK3wJUD2nW2nRk4stbPy6cq3jPPqjiChkVvvNKmPGJxWUtg6LnF5kejMRNNU3TGtRBeJgk33yuGBxrMPHi' |
||||
assert wallet.get_bip32_pub_export() == 'xpub661MyMwAqRbcFtXgS5sYJABqqG9YLmC4Q1Rdap9gSE8NqtwybGhePY2gZ29ESFjqJoCu1Rupje8YtGqsefD265TMg7usUDFdp6W1EGMcet8' |
||||
assert wallet.get_bip32_priv_export(0) == 'xprv9uHRZZhk6KAJC1avXpDAp4MDc3sQKNxDiPvvkX8Br5ngLNv1TxvUxt4cV1rGL5hj6KCesnDYUhd7oWgT11eZG7XnxHrnYeSvkzY7d2bhkJ7' |
||||
assert wallet.get_bip32_pub_export(0) == 'xpub68Gmy5EdvgibQVfPdqkBBCHxA5htiqg55crXYuXoQRKfDBFA1WEjWgP6LHhwBZeNK1VTsfTFUHCdrfp1bgwQ9xv5ski8PX9rL2dZXvgGDnw' |
||||
assert wallet.get_bip32_priv_export(0, 1) == 'xprv9wTYmMFdV23N2TdNG573QoEsfRrWKQgWeibmLntzniatZvR9BmLnvSxqu53Kw1UmYPxLgboyZQaXwTCg8MSY3H2EU4pWcQDnRnrVA1xe8fs' |
||||
assert wallet.get_bip32_pub_export(0, 1) == 'xpub6ASuArnXKPbfEwhqN6e3mwBcDTgzisQN1wXN9BJcM47sSikHjJf3UFHKkNAWbWMiGj7Wf5uMash7SyYq527Hqck2AxYysAA7xmALppuCkwQ' |
||||
# there are more test vectors but those don't match joinmarket's wallet |
||||
# structure, hence they make litte sense to test here |
||||
|
||||
|
||||
def test_bip32_test_vector_2(monkeypatch, setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'mainnet') |
||||
|
||||
entropy = unhexlify('fffcf9f6f3f0edeae7e4e1dedbd8d5d2cfccc9c6c3c0bdbab7b4b1aeaba8a5a29f9c999693908d8a8784817e7b7875726f6c696663605d5a5754514e4b484542') |
||||
storage = VolatileStorage() |
||||
LegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=0) |
||||
|
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
monkeypatch.setattr(LegacyWallet, '_create_master_key', |
||||
BIP32Wallet._create_master_key) |
||||
|
||||
wallet = LegacyWallet(storage) |
||||
|
||||
assert wallet.get_bip32_priv_export() == 'xprv9s21ZrQH143K31xYSDQpPDxsXRTUcvj2iNHm5NUtrGiGG5e2DtALGdso3pGz6ssrdK4PFmM8NSpSBHNqPqm55Qn3LqFtT2emdEXVYsCzC2U' |
||||
assert wallet.get_bip32_pub_export() == 'xpub661MyMwAqRbcFW31YEwpkMuc5THy2PSt5bDMsktWQcFF8syAmRUapSCGu8ED9W6oDMSgv6Zz8idoc4a6mr8BDzTJY47LJhkJ8UB7WEGuduB' |
||||
assert wallet.get_bip32_priv_export(0) == 'xprv9vHkqa6EV4sPZHYqZznhT2NPtPCjKuDKGY38FBWLvgaDx45zo9WQRUT3dKYnjwih2yJD9mkrocEZXo1ex8G81dwSM1fwqWpWkeS3v86pgKt' |
||||
assert wallet.get_bip32_pub_export(0) == 'xpub69H7F5d8KSRgmmdJg2KhpAK8SR3DjMwAdkxj3ZuxV27CprR9LgpeyGmXUbC6wb7ERfvrnKZjXoUmmDznezpbZb7ap6r1D3tgFxHmwMkQTPH' |
||||
# there are more test vectors but those don't match joinmarket's wallet |
||||
# structure, hence they make litte sense to test here |
||||
|
||||
|
||||
def test_bip32_test_vector_3(monkeypatch, setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'mainnet') |
||||
|
||||
entropy = unhexlify('4b381541583be4423346c643850da4b320e46a87ae3d2a4e6da11eba819cd4acba45d239319ac14f863b8d5ab5a0d0c64d2e8a1e7d1457df2e5a3c51c73235be') |
||||
storage = VolatileStorage() |
||||
LegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=0) |
||||
|
||||
# test vector 3 is using hardened derivation for the account/mixdepth level |
||||
monkeypatch.setattr(LegacyWallet, '_get_mixdepth_from_path', |
||||
BIP49Wallet._get_mixdepth_from_path) |
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_mixdepth_path_level', |
||||
BIP49Wallet._get_bip32_mixdepth_path_level) |
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
monkeypatch.setattr(LegacyWallet, '_create_master_key', |
||||
BIP32Wallet._create_master_key) |
||||
|
||||
wallet = LegacyWallet(storage) |
||||
|
||||
assert wallet.get_bip32_priv_export() == 'xprv9s21ZrQH143K25QhxbucbDDuQ4naNntJRi4KUfWT7xo4EKsHt2QJDu7KXp1A3u7Bi1j8ph3EGsZ9Xvz9dGuVrtHHs7pXeTzjuxBrCmmhgC6' |
||||
assert wallet.get_bip32_pub_export() == 'xpub661MyMwAqRbcEZVB4dScxMAdx6d4nFc9nvyvH3v4gJL378CSRZiYmhRoP7mBy6gSPSCYk6SzXPTf3ND1cZAceL7SfJ1Z3GC8vBgp2epUt13' |
||||
assert wallet.get_bip32_priv_export(0) == 'xprv9uPDJpEQgRQfDcW7BkF7eTya6RPxXeJCqCJGHuCJ4GiRVLzkTXBAJMu2qaMWPrS7AANYqdq6vcBcBUdJCVVFceUvJFjaPdGZ2y9WACViL4L' |
||||
assert wallet.get_bip32_pub_export(0) == 'xpub68NZiKmJWnxxS6aaHmn81bvJeTESw724CRDs6HbuccFQN9Ku14VQrADWgqbhhTHBaohPX4CjNLf9fq9MYo6oDaPPLPxSb7gwQN3ih19Zm4Y' |
||||
|
||||
|
||||
@pytest.mark.parametrize('mixdepth,internal,index,address,wif', [ |
||||
[0, 0, 0, 'mpCX9EbdXpcrKMtjEe1fqFhvzctkfzMYTX', 'cVqtSSoVxFyPqTRGfeESi31uCYfgTF4tGWRtGeVs84fzybiX5TPk'], |
||||
[0, 0, 5, 'mtj85a3pFppRhrxNcFig1k7ECshrZjJ9XC', 'cMsFXc4TRw9PTcCTv7x9mr88rDeGXBTLEV67mKaw2cxCkjkhL32G'], |
||||
[0, 1, 3, 'n1EaQuqvTRm719hsSJ7yRsj49JfoG1C86q', 'cUgSTqnAtvYoQRXCYy4wCFfaks2Zrz1d55m6mVhFyVhQbkDi7JGJ'], |
||||
[2, 1, 2, 'mfxkBk7uDhmF5PJGS9d1NonGiAxPwJqQP4', 'cPcZXSiXPuS5eiT4oDrDKi1mFumw5D1RcWzK2gkGdEHjEz99eyXn'] |
||||
]) |
||||
def test_bip32_addresses_p2pkh(monkeypatch, setup_wallet, mixdepth, internal, index, address, wif): |
||||
""" |
||||
Test with a random but fixed entropy |
||||
""" |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
|
||||
entropy = unhexlify('2e0339ba89b4a1272cdf78b27ee62669ee01992a59e836e2807051be128ca817') |
||||
storage = VolatileStorage() |
||||
LegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=3) |
||||
|
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
monkeypatch.setattr(LegacyWallet, '_create_master_key', |
||||
BIP32Wallet._create_master_key) |
||||
|
||||
wallet = LegacyWallet(storage) |
||||
|
||||
# wallet needs to know about all intermediate keys |
||||
for i in range(index + 1): |
||||
wallet.get_new_script(mixdepth, internal) |
||||
|
||||
assert wif == wallet.get_wif(mixdepth, internal, index) |
||||
assert address == wallet.get_addr(mixdepth, internal, index) |
||||
|
||||
|
||||
@pytest.mark.parametrize('mixdepth,internal,index,address,wif', [ |
||||
[0, 0, 0, '2MzY5yyonUY7zpHspg7jB7WQs1uJxKafQe4', 'cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM'], |
||||
[0, 0, 5, '2MsKvqPGStp3yXT8UivuAaGwfPzT7xYwSWk', 'cSo3h7nRuV4fwhVPXeTDJx6cBCkjAzS9VM8APXViyjoSaMq85ZKn'], |
||||
[0, 1, 3, '2N7k6wiQqkuMaApwGhk3HKrifprUSDydqUv', 'cTwq3UsZa8STVmwZR94dDphgqgdLFeuaRFD1Ea44qjbjFfKEb1n5'], |
||||
[2, 1, 2, '2MtE6gzHgmEXeWzKsmCJFEqkrpNuBDvoRnz', 'cPV8FZuCvrRpk4RhmhpjnSucHhaQZUan4Vbyo1NVQtuAxurW9grb'] |
||||
]) |
||||
def test_bip32_addresses_p2sh_p2wpkh(setup_wallet, mixdepth, internal, index, address, wif): |
||||
""" |
||||
Test with a random but fixed entropy |
||||
""" |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
|
||||
entropy = unhexlify('2e0339ba89b4a1272cdf78b27ee62669ee01992a59e836e2807051be128ca817') |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=3) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
# wallet needs to know about all intermediate keys |
||||
for i in range(index + 1): |
||||
wallet.get_new_script(mixdepth, internal) |
||||
|
||||
assert wif == wallet.get_wif(mixdepth, internal, index) |
||||
assert address == wallet.get_addr(mixdepth, internal, index) |
||||
|
||||
|
||||
def test_import_key(setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
wallet.import_private_key( |
||||
0, 'cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM', |
||||
cryptoengine.TYPE_P2SH_P2WPKH) |
||||
wallet.import_private_key( |
||||
1, 'cVqtSSoVxFyPqTRGfeESi31uCYfgTF4tGWRtGeVs84fzybiX5TPk', |
||||
cryptoengine.TYPE_P2PKH) |
||||
|
||||
with pytest.raises(WalletError): |
||||
wallet.import_private_key( |
||||
1, 'cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM', |
||||
cryptoengine.TYPE_P2SH_P2WPKH) |
||||
|
||||
# test persist imported keys |
||||
wallet.save() |
||||
data = storage.file_data |
||||
|
||||
del wallet |
||||
del storage |
||||
|
||||
storage = VolatileStorage(data=data) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
imported_paths_md0 = list(wallet.yield_imported_paths(0)) |
||||
imported_paths_md1 = list(wallet.yield_imported_paths(1)) |
||||
assert len(imported_paths_md0) == 1 |
||||
assert len(imported_paths_md1) == 1 |
||||
|
||||
# verify imported addresses |
||||
assert wallet.get_addr_path(imported_paths_md0[0]) == '2MzY5yyonUY7zpHspg7jB7WQs1uJxKafQe4' |
||||
assert wallet.get_addr_path(imported_paths_md1[0]) == 'mpCX9EbdXpcrKMtjEe1fqFhvzctkfzMYTX' |
||||
|
||||
# test remove key |
||||
wallet.remove_imported_key(path=imported_paths_md0[0]) |
||||
assert not list(wallet.yield_imported_paths(0)) |
||||
|
||||
assert wallet.get_details(imported_paths_md1[0]) == (1, 'imported', 0) |
||||
|
||||
|
||||
@pytest.mark.parametrize('wif,keytype,type_check', [ |
||||
['cVqtSSoVxFyPqTRGfeESi31uCYfgTF4tGWRtGeVs84fzybiX5TPk', |
||||
cryptoengine.TYPE_P2PKH, assert_not_segwit], |
||||
['cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM', |
||||
cryptoengine.TYPE_P2SH_P2WPKH, assert_segwit] |
||||
]) |
||||
def test_signing_imported(setup_wallet, wif, keytype, type_check): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
MIXDEPTH = 0 |
||||
path = wallet.import_private_key(MIXDEPTH, wif, keytype) |
||||
utxo = fund_wallet_addr(wallet, wallet.get_addr_path(path)) |
||||
tx = btc.deserialize(btc.mktx(['{}:{}'.format(hexlify(utxo[0]), utxo[1])], |
||||
['00'*17 + ':' + str(10**8 - 9000)])) |
||||
binarize_tx(tx) |
||||
script = wallet.get_script_path(path) |
||||
wallet.sign_tx(tx, {0: (script, 10**8)}) |
||||
type_check(tx) |
||||
txout = jm_single().bc_interface.pushtx(hexlify(btc.serialize(tx))) |
||||
assert txout |
||||
|
||||
|
||||
@pytest.mark.parametrize('wallet_cls,type_check', [ |
||||
[LegacyWallet, assert_not_segwit], |
||||
[SegwitLegacyWallet, assert_segwit] |
||||
]) |
||||
def test_signing_simple(setup_wallet, wallet_cls, type_check): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
storage = VolatileStorage() |
||||
wallet_cls.initialize(storage, get_network()) |
||||
wallet = wallet_cls(storage) |
||||
utxo = fund_wallet_addr(wallet, wallet.get_internal_addr(0)) |
||||
tx = btc.deserialize(btc.mktx(['{}:{}'.format(hexlify(utxo[0]), utxo[1])], |
||||
['00'*17 + ':' + str(10**8 - 9000)])) |
||||
binarize_tx(tx) |
||||
script = wallet.get_script(0, 1, 0) |
||||
wallet.sign_tx(tx, {0: (script, 10**8)}) |
||||
type_check(tx) |
||||
txout = jm_single().bc_interface.pushtx(hexlify(btc.serialize(tx))) |
||||
assert txout |
||||
|
||||
|
||||
def test_add_utxos(setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
amount = 10**8 |
||||
num_tx = 3 |
||||
|
||||
wallet = get_populated_wallet(amount, num_tx) |
||||
|
||||
balances = wallet.get_balance_by_mixdepth() |
||||
assert balances[0] == num_tx * amount |
||||
for md in range(1, wallet.max_mixdepth + 1): |
||||
assert balances[md] == 0 |
||||
|
||||
utxos = wallet.get_utxos_by_mixdepth_() |
||||
assert len(utxos[0]) == num_tx |
||||
for md in range(1, wallet.max_mixdepth + 1): |
||||
assert not utxos[md] |
||||
|
||||
with pytest.raises(Exception): |
||||
# no funds in mixdepth |
||||
wallet.select_utxos_(1, amount) |
||||
|
||||
with pytest.raises(Exception): |
||||
# not enough funds |
||||
wallet.select_utxos_(0, amount * (num_tx + 1)) |
||||
|
||||
wallet.reset_utxos() |
||||
assert wallet.get_balance_by_mixdepth()[0] == 0 |
||||
|
||||
|
||||
def test_select_utxos(setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
amount = 10**8 |
||||
|
||||
wallet = get_populated_wallet(amount) |
||||
utxos = wallet.select_utxos_(0, amount // 2) |
||||
|
||||
assert len(utxos) == 1 |
||||
utxos = list(utxos.keys()) |
||||
|
||||
more_utxos = wallet.select_utxos_(0, int(amount * 1.5), utxo_filter=utxos) |
||||
assert len(more_utxos) == 2 |
||||
assert utxos[0] not in more_utxos |
||||
|
||||
|
||||
def test_add_new_utxos(setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
wallet = get_populated_wallet(num=1) |
||||
|
||||
scripts = [wallet.get_new_script(x, True) for x in range(3)] |
||||
tx_scripts = list(scripts) |
||||
tx_scripts.append(b'\x22'*17) |
||||
|
||||
tx = btc.deserialize(btc.mktx( |
||||
['0'*64 + ':2'], [{'script': hexlify(s), 'value': 10**8} |
||||
for s in tx_scripts])) |
||||
binarize_tx(tx) |
||||
txid = b'\x01' * 32 |
||||
added = wallet.add_new_utxos_(tx, txid) |
||||
assert len(added) == len(scripts) |
||||
|
||||
added_scripts = {x['script'] for x in added.values()} |
||||
for s in scripts: |
||||
assert s in added_scripts |
||||
|
||||
balances = wallet.get_balance_by_mixdepth() |
||||
assert balances[0] == 2 * 10**8 |
||||
assert balances[1] == 10**8 |
||||
assert balances[2] == 10**8 |
||||
assert len(balances) == wallet.max_mixdepth + 1 |
||||
|
||||
|
||||
def test_remove_old_utxos(setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
wallet = get_populated_wallet() |
||||
|
||||
# add some more utxos to mixdepth 1 |
||||
for i in range(3): |
||||
txin = jm_single().bc_interface.grab_coins( |
||||
wallet.get_internal_addr(1), 1) |
||||
wallet.add_utxo(unhexlify(txin), 0, wallet.get_script(1, 1, i), 10**8) |
||||
|
||||
inputs = wallet.select_utxos_(0, 10**8) |
||||
inputs.update(wallet.select_utxos_(1, 2 * 10**8)) |
||||
assert len(inputs) == 3 |
||||
|
||||
tx_inputs = list(inputs.keys()) |
||||
tx_inputs.append((b'\x12'*32, 6)) |
||||
|
||||
tx = btc.deserialize(btc.mktx( |
||||
['{}:{}'.format(hexlify(txid), i) for txid, i in tx_inputs], |
||||
['0' * 36 + ':' + str(3 * 10**8 - 1000)])) |
||||
binarize_tx(tx) |
||||
|
||||
removed = wallet.remove_old_utxos_(tx) |
||||
assert len(removed) == len(inputs) |
||||
|
||||
for txid in removed: |
||||
assert txid in inputs |
||||
|
||||
balances = wallet.get_balance_by_mixdepth() |
||||
assert balances[0] == 2 * 10**8 |
||||
assert balances[1] == 10**8 |
||||
assert balances[2] == 0 |
||||
assert len(balances) == wallet.max_mixdepth + 1 |
||||
|
||||
|
||||
def test_initialize_twice(setup_wallet): |
||||
wallet = get_populated_wallet(num=0) |
||||
storage = wallet._storage |
||||
with pytest.raises(WalletError): |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
|
||||
|
||||
def test_is_known(setup_wallet): |
||||
wallet = get_populated_wallet(num=0) |
||||
script = wallet.get_new_script(1, True) |
||||
addr = wallet.get_new_addr(2, False) |
||||
|
||||
assert wallet.is_known_script(script) |
||||
assert wallet.is_known_addr(addr) |
||||
assert wallet.is_known_addr(wallet.script_to_addr(script)) |
||||
assert wallet.is_known_script(wallet.addr_to_script(addr)) |
||||
|
||||
assert not wallet.is_known_script(b'\x12' * len(script)) |
||||
assert not wallet.is_known_addr('2MzY5yyonUY7zpHspg7jB7WQs1uJxKafQe4') |
||||
|
||||
|
||||
def test_wallet_save(setup_wallet): |
||||
wallet = get_populated_wallet() |
||||
|
||||
script = wallet.get_external_script(1) |
||||
|
||||
wallet.save() |
||||
storage = wallet._storage |
||||
data = storage.file_data |
||||
|
||||
del wallet |
||||
del storage |
||||
|
||||
storage = VolatileStorage(data=data) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
assert wallet.get_next_unused_index(0, True) == 3 |
||||
assert wallet.get_next_unused_index(0, False) == 0 |
||||
assert wallet.get_next_unused_index(1, True) == 0 |
||||
assert wallet.get_next_unused_index(1, False) == 1 |
||||
assert wallet.is_known_script(script) |
||||
|
||||
|
||||
def test_set_next_index(setup_wallet): |
||||
wallet = get_populated_wallet() |
||||
|
||||
assert wallet.get_next_unused_index(0, True) == 3 |
||||
|
||||
with pytest.raises(Exception): |
||||
# cannot advance index without force=True |
||||
wallet.set_next_index(0, True, 5) |
||||
|
||||
wallet.set_next_index(0, True, 1) |
||||
assert wallet.get_next_unused_index(0, True) == 1 |
||||
|
||||
wallet.set_next_index(0, True, 20, force=True) |
||||
assert wallet.get_next_unused_index(0, True) == 20 |
||||
|
||||
script = wallet.get_new_script(0, True) |
||||
path = wallet.script_to_path(script) |
||||
index = wallet.get_details(path)[2] |
||||
assert index == 20 |
||||
|
||||
|
||||
def test_path_repr(setup_wallet): |
||||
wallet = get_populated_wallet() |
||||
path = wallet.get_path(2, False, 0) |
||||
path_repr = wallet.get_path_repr(path) |
||||
path_new = wallet.path_repr_to_path(path_repr) |
||||
|
||||
assert path_new == path |
||||
|
||||
|
||||
def test_path_repr_imported(setup_wallet): |
||||
wallet = get_populated_wallet(num=0) |
||||
path = wallet.import_private_key( |
||||
0, 'cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM', |
||||
cryptoengine.TYPE_P2SH_P2WPKH) |
||||
path_repr = wallet.get_path_repr(path) |
||||
path_new = wallet.path_repr_to_path(path_repr) |
||||
|
||||
assert path_new == path |
||||
|
||||
|
||||
def test_wrong_wallet_cls(setup_wallet): |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
wallet.save() |
||||
data = storage.file_data |
||||
|
||||
del wallet |
||||
del storage |
||||
|
||||
storage = VolatileStorage(data=data) |
||||
|
||||
with pytest.raises(Exception): |
||||
LegacyWallet(storage) |
||||
|
||||
|
||||
def test_wallet_id(setup_wallet): |
||||
storage1 = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage1, get_network()) |
||||
wallet1 = SegwitLegacyWallet(storage1) |
||||
|
||||
storage2 = VolatileStorage() |
||||
LegacyWallet.initialize(storage2, get_network(), entropy=wallet1._entropy) |
||||
wallet2 = LegacyWallet(storage2) |
||||
|
||||
assert wallet1.get_wallet_id() != wallet2.get_wallet_id() |
||||
|
||||
storage2 = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage2, get_network(), |
||||
entropy=wallet1._entropy) |
||||
wallet2 = SegwitLegacyWallet(storage2) |
||||
|
||||
assert wallet1.get_wallet_id() == wallet2.get_wallet_id() |
||||
|
||||
|
||||
def test_addr_script_conversion(setup_wallet): |
||||
wallet = get_populated_wallet(num=1) |
||||
|
||||
path = wallet.get_path(0, True, 0) |
||||
script = wallet.get_script_path(path) |
||||
addr = wallet.script_to_addr(script) |
||||
|
||||
assert script == wallet.addr_to_script(addr) |
||||
addr_path = wallet.addr_to_path(addr) |
||||
assert path == addr_path |
||||
|
||||
|
||||
def test_imported_key_removed(setup_wallet): |
||||
wif = 'cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM' |
||||
key_type = cryptoengine.TYPE_P2SH_P2WPKH |
||||
|
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
path = wallet.import_private_key(1, wif, key_type) |
||||
script = wallet.get_script_path(path) |
||||
assert wallet.is_known_script(script) |
||||
|
||||
wallet.remove_imported_key(path=path) |
||||
assert not wallet.is_known_script(script) |
||||
|
||||
with pytest.raises(WalletError): |
||||
wallet.get_script_path(path) |
||||
|
||||
|
||||
@pytest.fixture(scope='module') |
||||
def setup_wallet(): |
||||
load_program_config() |
||||
jm_single().bc_interface.tick_forward_chain_interval = 2 |
||||
@ -0,0 +1,151 @@
|
||||
#!/usr/bin/env python2 |
||||
|
||||
import argparse |
||||
import json |
||||
import os.path |
||||
from hashlib import sha256 |
||||
from binascii import hexlify, unhexlify |
||||
from collections import defaultdict |
||||
|
||||
from jmclient import Storage, decryptData, load_program_config |
||||
from jmclient.wallet_utils import get_password, get_wallet_cls,\ |
||||
cli_get_wallet_passphrase_check, get_wallet_path |
||||
from jmbitcoin import wif_compressed_privkey |
||||
|
||||
|
||||
class ConvertException(Exception): |
||||
pass |
||||
|
||||
|
||||
def get_max_mixdepth(data): |
||||
return max(1, len(data.get('index_cache', [1])) - 1, |
||||
*data.get('imported', {}).keys()) |
||||
|
||||
|
||||
def is_encrypted(wallet_data): |
||||
return 'encrypted_seed' in wallet_data or 'encrypted_entropy' in wallet_data |
||||
|
||||
|
||||
def double_sha256(plaintext): |
||||
return sha256(sha256(plaintext).digest()).digest() |
||||
|
||||
|
||||
def decrypt_entropy_extension(enc_data, key): |
||||
data = decryptData(key, unhexlify(enc_data)) |
||||
if data[-9] != b'\xff': |
||||
raise ConvertException("Wrong password.") |
||||
chunks = data.split(b'\xff') |
||||
if len(chunks) < 3 or data[-8:] != double_sha256(chunks[1])[:8]: |
||||
raise ConvertException("Wrong password.") |
||||
return chunks[1] |
||||
|
||||
|
||||
def decrypt_wallet_data(data, password): |
||||
key = double_sha256(password.encode('utf-8')) |
||||
|
||||
enc_entropy = data.get('encrypted_seed') or data.get('encrypted_entropy') |
||||
enc_entropy_ext = data.get('encrypted_mnemonic_extension') |
||||
enc_imported = data.get('imported_keys') |
||||
|
||||
entropy = decryptData(key, unhexlify(enc_entropy)) |
||||
data['entropy'] = entropy |
||||
if enc_entropy_ext: |
||||
data['entropy_ext'] = decrypt_entropy_extension(enc_entropy_ext, key) |
||||
|
||||
if enc_imported: |
||||
imported_keys = defaultdict(list) |
||||
for e in enc_imported: |
||||
md = int(e['mixdepth']) |
||||
imported_enc_key = unhexlify(e['encrypted_privkey']) |
||||
imported_key = decryptData(key, imported_enc_key) |
||||
imported_keys[md].append(imported_key) |
||||
data['imported'] = imported_keys |
||||
|
||||
|
||||
def new_wallet_from_data(data, file_name): |
||||
print("Creating new wallet file.") |
||||
new_pw = cli_get_wallet_passphrase_check() |
||||
if new_pw is False: |
||||
return False |
||||
|
||||
storage = Storage(file_name, create=True, password=new_pw) |
||||
wallet_cls = get_wallet_cls() |
||||
|
||||
kwdata = { |
||||
'entropy': data['entropy'], |
||||
'timestamp': data.get('creation_time'), |
||||
'max_mixdepth': get_max_mixdepth(data) |
||||
} |
||||
|
||||
if 'entropy_ext' in data: |
||||
kwdata['entropy_extension'] = data['entropy_ext'] |
||||
|
||||
wallet_cls.initialize(storage, data['network'], **kwdata) |
||||
wallet = wallet_cls(storage) |
||||
|
||||
if 'index_cache' in data: |
||||
for md, indices in enumerate(data['index_cache']): |
||||
wallet.set_next_index(md, 0, indices[0], force=True) |
||||
wallet.set_next_index(md, 1, indices[1], force=True) |
||||
|
||||
if 'imported' in data: |
||||
for md in data['imported']: |
||||
for privkey in data['imported'][md]: |
||||
privkey += b'\x01' |
||||
wif = wif_compressed_privkey(hexlify(privkey)) |
||||
wallet.import_private_key(md, wif) |
||||
|
||||
wallet.save() |
||||
wallet.close() |
||||
return True |
||||
|
||||
|
||||
def parse_old_wallet(fh): |
||||
file_data = json.load(fh) |
||||
|
||||
if is_encrypted(file_data): |
||||
pw = get_password("Enter password for old wallet file: ") |
||||
try: |
||||
decrypt_wallet_data(file_data, pw) |
||||
except ValueError: |
||||
print("Failed to open wallet: bad password") |
||||
return |
||||
except Exception as e: |
||||
print("Error: {}".format(e)) |
||||
print("Failed to open wallet. Wrong password?") |
||||
return |
||||
|
||||
return file_data |
||||
|
||||
|
||||
def main(): |
||||
parser = argparse.ArgumentParser( |
||||
description="Convert old joinmarket json wallet format to new jmdat " |
||||
"format") |
||||
parser.add_argument('old_wallet_file', type=open) |
||||
parser.add_argument('--name', '-n', required=False, dest='name', |
||||
help="Name of the new wallet file. Default: [old wallet name].jmdat") |
||||
|
||||
try: |
||||
args = parser.parse_args() |
||||
except Exception as e: |
||||
print("Error: {}".format(e)) |
||||
return |
||||
|
||||
data = parse_old_wallet(args.old_wallet_file) |
||||
|
||||
if not data: |
||||
return |
||||
|
||||
file_name = args.name or\ |
||||
os.path.split(args.old_wallet_file.name)[-1].rsplit('.', 1)[0] + '.jmdat' |
||||
wallet_path = get_wallet_path(file_name, None) |
||||
if new_wallet_from_data(data, wallet_path): |
||||
print("New wallet file created at {}".format(wallet_path)) |
||||
else: |
||||
print("Failed to convert wallet.") |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
load_program_config() |
||||
main() |
||||
Loading…
Reference in new issue