10 changed files with 3104 additions and 484 deletions
@ -0,0 +1,266 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
|
||||
|
||||
from binascii import hexlify, unhexlify |
||||
from collections import OrderedDict |
||||
|
||||
|
||||
from . import btc |
||||
from .configure import get_network |
||||
|
||||
|
||||
TYPE_P2PKH, TYPE_P2SH_P2WPKH, TYPE_P2WPKH = range(3) |
||||
NET_MAINNET, NET_TESTNET = range(2) |
||||
NET_MAP = {'mainnet': NET_MAINNET, 'testnet': NET_TESTNET} |
||||
WIF_PREFIX_MAP = {'mainnet': 0x80, 'testnet': 0xef} |
||||
BIP44_COIN_MAP = {'mainnet': 2**31, 'testnet': 2**31 + 1} |
||||
|
||||
|
||||
# |
||||
# library stuff that should be in btc/jmbitcoin |
||||
# |
||||
|
||||
|
||||
P2PKH_PRE, P2PKH_POST = b'\x76\xa9\x14', b'\x88\xac' |
||||
P2SH_P2WPKH_PRE, P2SH_P2WPKH_POST = b'\xa9\x14', b'\x87' |
||||
P2WPKH_PRE = b'\x00\x14' |
||||
|
||||
|
||||
def _pubkey_to_script(pubkey, script_pre, script_post=b''): |
||||
# sanity check for public key |
||||
# see https://github.com/bitcoin/bitcoin/blob/master/src/pubkey.h |
||||
if not ((len(pubkey) == 33 and pubkey[0] in (b'\x02', b'\x03')) or |
||||
(len(pubkey) == 65 and pubkey[0] in (b'\x04', b'\x06', b'\x07'))): |
||||
raise Exception("Invalid public key!") |
||||
h = btc.bin_hash160(pubkey) |
||||
assert len(h) == 0x14 |
||||
assert script_pre[-1] == b'\x14' |
||||
return script_pre + h + script_post |
||||
|
||||
|
||||
def pubkey_to_p2pkh_script(pubkey): |
||||
return _pubkey_to_script(pubkey, P2PKH_PRE, P2PKH_POST) |
||||
|
||||
|
||||
def pubkey_to_p2sh_p2wpkh_script(pubkey): |
||||
wscript = pubkey_to_p2wpkh_script(pubkey) |
||||
return P2SH_P2WPKH_PRE + btc.bin_hash160(wscript) + P2SH_P2WPKH_POST |
||||
|
||||
|
||||
def pubkey_to_p2wpkh_script(pubkey): |
||||
return _pubkey_to_script(pubkey, P2WPKH_PRE) |
||||
|
||||
|
||||
class classproperty(object): |
||||
""" |
||||
from https://stackoverflow.com/a/5192374 |
||||
""" |
||||
def __init__(self, f): |
||||
self.f = f |
||||
|
||||
def __get__(self, obj, owner): |
||||
return self.f(owner) |
||||
|
||||
|
||||
class SimpleLruCache(OrderedDict): |
||||
""" |
||||
note: python3.2 has a lru cache in functools |
||||
""" |
||||
def __init__(self, max_size): |
||||
OrderedDict.__init__(self) |
||||
assert max_size > 0 |
||||
self.max_size = max_size |
||||
|
||||
def __setitem__(self, key, value): |
||||
OrderedDict.__setitem__(self, key, value) |
||||
self._adjust_size() |
||||
|
||||
def _adjust_size(self): |
||||
while len(self) > self.max_size: |
||||
self.popitem(last=False) |
||||
|
||||
|
||||
# |
||||
# library stuff end |
||||
# |
||||
|
||||
|
||||
class EngineError(Exception): |
||||
pass |
||||
|
||||
|
||||
class BTCEngine(object): |
||||
# must be set by subclasses |
||||
VBYTE = None |
||||
__LRU_KEY_CACHE = SimpleLruCache(50) |
||||
|
||||
@classproperty |
||||
def BIP32_priv_vbytes(cls): |
||||
return btc.PRIVATE[NET_MAP[get_network()]] |
||||
|
||||
@classproperty |
||||
def WIF_PREFIX(cls): |
||||
return WIF_PREFIX_MAP[get_network()] |
||||
|
||||
@classproperty |
||||
def BIP44_COIN_TYPE(cls): |
||||
return BIP44_COIN_MAP[get_network()] |
||||
|
||||
@staticmethod |
||||
def privkey_to_pubkey(privkey): |
||||
return btc.privkey_to_pubkey(privkey, False) |
||||
|
||||
@staticmethod |
||||
def address_to_script(addr): |
||||
return unhexlify(btc.address_to_script(addr)) |
||||
|
||||
@classmethod |
||||
def wif_to_privkey(cls, wif): |
||||
raw = btc.b58check_to_bin(wif) |
||||
vbyte = btc.get_version_byte(wif) |
||||
|
||||
if (btc.BTC_P2PK_VBYTE[get_network()] + cls.WIF_PREFIX) & 0xff == vbyte: |
||||
key_type = TYPE_P2PKH |
||||
elif (btc.BTC_P2SH_VBYTE[get_network()] + cls.WIF_PREFIX) & 0xff == vbyte: |
||||
key_type = TYPE_P2SH_P2WPKH |
||||
else: |
||||
key_type = None |
||||
|
||||
return raw, key_type |
||||
|
||||
@classmethod |
||||
def privkey_to_wif(cls, priv): |
||||
return btc.bin_to_b58check(priv, cls.WIF_PREFIX) |
||||
|
||||
@classmethod |
||||
def derive_bip32_master_key(cls, seed): |
||||
# FIXME: slight encoding mess |
||||
return btc.bip32_deserialize( |
||||
btc.bip32_master_key(seed, vbytes=cls.BIP32_priv_vbytes)) |
||||
|
||||
@classmethod |
||||
def derive_bip32_privkey(cls, master_key, path): |
||||
assert len(path) > 1 |
||||
return cls._walk_bip32_path(master_key, path)[-1] |
||||
|
||||
@classmethod |
||||
def derive_bip32_pub_export(cls, master_key, path): |
||||
priv = cls._walk_bip32_path(master_key, path) |
||||
return btc.bip32_serialize(btc.raw_bip32_privtopub(priv)) |
||||
|
||||
@classmethod |
||||
def derive_bip32_priv_export(cls, master_key, path): |
||||
return btc.bip32_serialize(cls._walk_bip32_path(master_key, path)) |
||||
|
||||
@classmethod |
||||
def _walk_bip32_path(cls, master_key, path): |
||||
key = master_key |
||||
for lvl in path[1:]: |
||||
assert lvl >= 0 |
||||
assert lvl < 2**32 |
||||
if (key, lvl) in cls.__LRU_KEY_CACHE: |
||||
key = cls.__LRU_KEY_CACHE[(key, lvl)] |
||||
else: |
||||
cls.__LRU_KEY_CACHE[(key, lvl)] = btc.raw_bip32_ckd(key, lvl) |
||||
key = cls.__LRU_KEY_CACHE[(key, lvl)] |
||||
return key |
||||
|
||||
@classmethod |
||||
def privkey_to_script(cls, privkey): |
||||
pub = cls.privkey_to_pubkey(privkey) |
||||
return cls.pubkey_to_script(pub) |
||||
|
||||
@classmethod |
||||
def pubkey_to_script(cls, pubkey): |
||||
raise NotImplementedError() |
||||
|
||||
@classmethod |
||||
def privkey_to_address(cls, privkey): |
||||
script = cls.privkey_to_script(privkey) |
||||
return btc.script_to_address(script, cls.VBYTE) |
||||
|
||||
@classmethod |
||||
def pubkey_to_address(cls, pubkey): |
||||
script = cls.pubkey_to_script(pubkey) |
||||
return btc.script_to_address(script, cls.VBYTE) |
||||
|
||||
@classmethod |
||||
def sign_transaction(cls, tx, index, privkey, amount): |
||||
raise NotImplementedError() |
||||
|
||||
@staticmethod |
||||
def sign_message(privkey, message): |
||||
""" |
||||
args: |
||||
privkey: bytes |
||||
message: bytes |
||||
returns: |
||||
base64-encoded signature |
||||
""" |
||||
return btc.ecdsa_sign(message, privkey, True, False) |
||||
|
||||
@classmethod |
||||
def script_to_address(cls, script): |
||||
return btc.script_to_address(script, vbyte=cls.VBYTE) |
||||
|
||||
|
||||
class BTC_P2PKH(BTCEngine): |
||||
@classproperty |
||||
def VBYTE(cls): |
||||
return btc.BTC_P2PK_VBYTE[get_network()] |
||||
|
||||
@classmethod |
||||
def pubkey_to_script(cls, pubkey): |
||||
return pubkey_to_p2pkh_script(pubkey) |
||||
|
||||
@classmethod |
||||
def sign_transaction(cls, tx, index, privkey, *args, **kwargs): |
||||
hashcode = kwargs.get('hashcode') or btc.SIGHASH_ALL |
||||
|
||||
pubkey = cls.privkey_to_pubkey(privkey) |
||||
script = cls.pubkey_to_script(pubkey) |
||||
|
||||
signing_tx = btc.serialize(btc.signature_form(tx, index, script, |
||||
hashcode=hashcode)) |
||||
# FIXME: encoding mess |
||||
sig = unhexlify(btc.ecdsa_tx_sign(signing_tx, hexlify(privkey), |
||||
**kwargs)) |
||||
|
||||
tx['ins'][index]['script'] = btc.serialize_script([sig, pubkey]) |
||||
|
||||
return tx |
||||
|
||||
|
||||
class BTC_P2SH_P2WPKH(BTCEngine): |
||||
# FIXME: implement different bip32 key export prefixes like electrum? |
||||
# see http://docs.electrum.org/en/latest/seedphrase.html#list-of-reserved-numbers |
||||
|
||||
@classproperty |
||||
def VBYTE(cls): |
||||
return btc.BTC_P2SH_VBYTE[get_network()] |
||||
|
||||
@classmethod |
||||
def pubkey_to_script(cls, pubkey): |
||||
return pubkey_to_p2sh_p2wpkh_script(pubkey) |
||||
|
||||
@classmethod |
||||
def sign_transaction(cls, tx, index, privkey, amount, |
||||
hashcode=btc.SIGHASH_ALL, **kwargs): |
||||
assert amount is not None |
||||
|
||||
pubkey = cls.privkey_to_pubkey(privkey) |
||||
wpkscript = pubkey_to_p2wpkh_script(pubkey) |
||||
pkscript = pubkey_to_p2pkh_script(pubkey) |
||||
|
||||
signing_tx = btc.segwit_signature_form(tx, index, pkscript, amount, |
||||
hashcode=hashcode, |
||||
decoder_func=lambda x: x) |
||||
# FIXME: encoding mess |
||||
sig = unhexlify(btc.ecdsa_tx_sign(signing_tx, hexlify(privkey), |
||||
hashcode=hashcode, **kwargs)) |
||||
|
||||
assert len(wpkscript) == 0x16 |
||||
tx['ins'][index]['script'] = b'\x16' + wpkscript |
||||
tx['ins'][index]['txinwitness'] = [sig, pubkey] |
||||
|
||||
return tx |
||||
@ -0,0 +1,327 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
|
||||
import os |
||||
import shutil |
||||
import atexit |
||||
import bencoder |
||||
import pyaes |
||||
from hashlib import sha256 |
||||
from argon2 import low_level |
||||
from .support import get_random_bytes |
||||
|
||||
|
||||
class Argon2Hash(object): |
||||
def __init__(self, password, salt=None, hash_len=32, salt_len=16, |
||||
time_cost=500, memory_cost=1000, parallelism=4, |
||||
argon2_type=low_level.Type.I, version=19): |
||||
""" |
||||
args: |
||||
password: password as bytes |
||||
salt: salt in bytes or None to create random one, must have length >= 8 |
||||
hash_len: generated hash length in bytes |
||||
salt_len: salt length in bytes, ignored if salt is not None, must be >= 8 |
||||
|
||||
Other arguments are argon2 settings. Only change those if you know what |
||||
you're doing. Optimized for slow hashing suitable for file encryption. |
||||
""" |
||||
# Default and recommended settings from argon2.PasswordHasher are for |
||||
# interactive logins. For encryption we want something much slower. |
||||
self.settings = { |
||||
'time_cost': time_cost, |
||||
'memory_cost': memory_cost, |
||||
'parallelism': parallelism, |
||||
'hash_len': hash_len, |
||||
'type': argon2_type, |
||||
'version': version |
||||
} |
||||
self.salt = salt if salt is not None else get_random_bytes(salt_len) |
||||
self.hash = low_level.hash_secret_raw(password, self.salt, |
||||
**self.settings) |
||||
|
||||
|
||||
class StorageError(Exception): |
||||
pass |
||||
|
||||
|
||||
class StoragePasswordError(StorageError): |
||||
pass |
||||
|
||||
|
||||
class Storage(object): |
||||
""" |
||||
Responsible for reading/writing [encrypted] data to disk. |
||||
|
||||
All data to be stored must be added to self.data which defaults to an |
||||
empty dict. |
||||
|
||||
self.data must contain serializable data (dict, list, tuple, bytes, numbers) |
||||
Having str objects anywhere in self.data will lead to undefined behaviour (py3). |
||||
All dict keys must be bytes. |
||||
|
||||
KDF: argon2, ENC: AES-256-CBC |
||||
""" |
||||
MAGIC_UNENC = b'JMWALLET' |
||||
MAGIC_ENC = b'JMENCWLT' |
||||
MAGIC_DETECT_ENC = b'JMWALLET' |
||||
|
||||
ENC_KEY_BYTES = 32 # AES-256 |
||||
SALT_LENGTH = 16 |
||||
|
||||
def __init__(self, path, password=None, create=False, read_only=False): |
||||
""" |
||||
args: |
||||
path: file path to storage |
||||
password: bytes or None for unencrypted file |
||||
create: create file if it does not exist |
||||
read_only: do not change anything on the file system |
||||
""" |
||||
self.path = path |
||||
self._lock_file = None |
||||
self._hash = None |
||||
self._data_checksum = None |
||||
self.data = None |
||||
self.changed = False |
||||
self.read_only = read_only |
||||
self.newly_created = False |
||||
|
||||
if not os.path.isfile(path): |
||||
if create and not read_only: |
||||
self._create_new(password) |
||||
self._save_file() |
||||
self.newly_created = True |
||||
else: |
||||
raise StorageError("File not found.") |
||||
else: |
||||
self._load_file(password) |
||||
|
||||
assert self.data is not None |
||||
assert self._data_checksum is not None |
||||
|
||||
self._create_lock() |
||||
|
||||
def is_encrypted(self): |
||||
return self._hash is not None |
||||
|
||||
def is_locked(self): |
||||
return self._lock_file and os.path.exists(self._lock_file) |
||||
|
||||
def was_changed(self): |
||||
""" |
||||
return True if data differs from data on disk |
||||
""" |
||||
return self._data_checksum != self._get_data_checksum() |
||||
|
||||
def change_password(self, password): |
||||
if self.read_only: |
||||
raise StorageError("Cannot change password of read-only file.") |
||||
self._set_hash(password) |
||||
self._save_file() |
||||
|
||||
def save(self): |
||||
""" |
||||
Write file to disk if data was modified |
||||
""" |
||||
#if not self.was_changed(): |
||||
# return |
||||
if self.read_only: |
||||
raise StorageError("Read-only storage cannot be saved.") |
||||
self._save_file() |
||||
|
||||
@classmethod |
||||
def is_storage_file(cls, path): |
||||
return cls._get_file_magic(path) in (cls.MAGIC_ENC, cls.MAGIC_UNENC) |
||||
|
||||
@classmethod |
||||
def is_encrypted_storage_file(cls, path): |
||||
return cls._get_file_magic(path) == cls.MAGIC_ENC |
||||
|
||||
@classmethod |
||||
def _get_file_magic(cls, path): |
||||
assert len(cls.MAGIC_ENC) == len(cls.MAGIC_UNENC) |
||||
with open(path, 'rb') as fh: |
||||
return fh.read(len(cls.MAGIC_ENC)) |
||||
|
||||
def _get_data_checksum(self): |
||||
if self.data is None: #pragma: no cover |
||||
return None |
||||
return sha256(self._serialize(self.data)).digest() |
||||
|
||||
def _update_data_hash(self): |
||||
self._data_checksum = self._get_data_checksum() |
||||
|
||||
def _create_new(self, password): |
||||
self.data = {} |
||||
self._set_hash(password) |
||||
|
||||
def _set_hash(self, password): |
||||
if password is None: |
||||
self._hash = None |
||||
else: |
||||
self._hash = self._hash_password(password) |
||||
|
||||
def _save_file(self): |
||||
assert self.read_only == False |
||||
data = self._serialize(self.data) |
||||
enc_data = self._encrypt_file(data) |
||||
|
||||
magic = self.MAGIC_UNENC if data is enc_data else self.MAGIC_ENC |
||||
self._write_file(magic + enc_data) |
||||
self._update_data_hash() |
||||
|
||||
def _load_file(self, password): |
||||
data = self._read_file() |
||||
assert len(self.MAGIC_ENC) == len(self.MAGIC_UNENC) == 8 |
||||
magic = data[:8] |
||||
|
||||
if magic not in (self.MAGIC_ENC, self.MAGIC_UNENC): |
||||
raise StorageError("File does not appear to be a joinmarket wallet.") |
||||
|
||||
data = data[8:] |
||||
|
||||
if magic == self.MAGIC_ENC: |
||||
if password is None: |
||||
raise StorageError("Password required to open wallet.") |
||||
data = self._decrypt_file(password, data) |
||||
else: |
||||
assert magic == self.MAGIC_UNENC |
||||
|
||||
self.data = self._deserialize(data) |
||||
self._update_data_hash() |
||||
|
||||
def _write_file(self, data): |
||||
assert self.read_only is False |
||||
|
||||
if not os.path.exists(self.path): |
||||
# newly created storage |
||||
with open(self.path, 'wb') as fh: |
||||
fh.write(data) |
||||
return |
||||
|
||||
# using a tmpfile ensures the write is atomic |
||||
tmpfile = '{}.tmp'.format(self.path) |
||||
|
||||
with open(tmpfile, 'wb') as fh: |
||||
shutil.copystat(self.path, tmpfile) |
||||
fh.write(data) |
||||
|
||||
#FIXME: behaviour with symlinks might be weird |
||||
shutil.move(tmpfile, self.path) |
||||
|
||||
def _read_file(self): |
||||
# this method mainly exists for easier mocking |
||||
with open(self.path, 'rb') as fh: |
||||
return fh.read() |
||||
|
||||
@staticmethod |
||||
def _serialize(data): |
||||
return bencoder.bencode(data) |
||||
|
||||
@staticmethod |
||||
def _deserialize(data): |
||||
return bencoder.bdecode(data) |
||||
|
||||
def _encrypt_file(self, data): |
||||
if not self.is_encrypted(): |
||||
return data |
||||
|
||||
iv = get_random_bytes(16) |
||||
container = { |
||||
b'enc': {b'salt': self._hash.salt, b'iv': iv}, |
||||
b'data': self._encrypt(data, iv) |
||||
} |
||||
return self._serialize(container) |
||||
|
||||
def _decrypt_file(self, password, data): |
||||
assert password is not None |
||||
|
||||
container = self._deserialize(data) |
||||
assert b'enc' in container |
||||
assert b'data' in container |
||||
|
||||
self._hash = self._hash_password(password, container[b'enc'][b'salt']) |
||||
|
||||
return self._decrypt(container[b'data'], container[b'enc'][b'iv']) |
||||
|
||||
def _encrypt(self, data, iv): |
||||
encrypter = pyaes.Encrypter( |
||||
pyaes.AESModeOfOperationCBC(self._hash.hash, iv=iv)) |
||||
enc_data = encrypter.feed(self.MAGIC_DETECT_ENC + data) |
||||
enc_data += encrypter.feed() |
||||
|
||||
return enc_data |
||||
|
||||
def _decrypt(self, data, iv): |
||||
decrypter = pyaes.Decrypter( |
||||
pyaes.AESModeOfOperationCBC(self._hash.hash, iv=iv)) |
||||
try: |
||||
dec_data = decrypter.feed(data) |
||||
dec_data += decrypter.feed() |
||||
except ValueError: |
||||
# in most "wrong password" cases the pkcs7 padding will be wrong |
||||
raise StoragePasswordError("Wrong password.") |
||||
|
||||
if not dec_data.startswith(self.MAGIC_DETECT_ENC): |
||||
raise StoragePasswordError("Wrong password.") |
||||
return dec_data[len(self.MAGIC_DETECT_ENC):] |
||||
|
||||
@classmethod |
||||
def _hash_password(cls, password, salt=None): |
||||
return Argon2Hash(password, salt, |
||||
hash_len=cls.ENC_KEY_BYTES, salt_len=cls.SALT_LENGTH) |
||||
|
||||
def _create_lock(self): |
||||
if self.read_only: |
||||
return |
||||
self._lock_file = '{}.lock'.format(self.path) |
||||
if os.path.exists(self._lock_file): |
||||
self._lock_file = None |
||||
raise StorageError("File is currently in use. If this is a " |
||||
"leftover from a crashed instance you need to " |
||||
"remove the lock file manually") |
||||
#FIXME: in python >=3.3 use mode xb |
||||
with open(self._lock_file, 'wb'): |
||||
pass |
||||
|
||||
atexit.register(self.close) |
||||
|
||||
def _remove_lock(self): |
||||
if self._lock_file: |
||||
os.remove(self._lock_file) |
||||
self._lock_file = None |
||||
|
||||
def close(self): |
||||
if not self.read_only and self.was_changed(): |
||||
self._save_file() |
||||
self._remove_lock() |
||||
self.read_only = True |
||||
|
||||
def __del__(self): |
||||
self.close() |
||||
|
||||
|
||||
class VolatileStorage(Storage): |
||||
""" |
||||
Storage that is never actually written to disk and only kept in memory. |
||||
|
||||
This exists for easier testing. |
||||
""" |
||||
|
||||
def __init__(self, password=None, data=None): |
||||
self.file_data = None |
||||
super(VolatileStorage, self).__init__('VOLATILE', password, |
||||
create=True) |
||||
if data: |
||||
self.file_data = data |
||||
self._load_file(password) |
||||
|
||||
def _create_lock(self): |
||||
pass |
||||
|
||||
def _remove_lock(self): |
||||
pass |
||||
|
||||
def _write_file(self, data): |
||||
self.file_data = data |
||||
|
||||
def _read_file(self): |
||||
return self.file_data |
||||
@ -0,0 +1,35 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
|
||||
from jmclient import Argon2Hash, get_random_bytes |
||||
import pytest |
||||
|
||||
|
||||
def test_argon2_sanity(): |
||||
pwd = b'password' |
||||
salt = b'saltsalt' |
||||
|
||||
h = Argon2Hash(pwd, salt, 16) |
||||
|
||||
assert len(h.hash) == 16 |
||||
assert h.salt == salt |
||||
assert h.hash == b'\x05;V\xd7fy\xdfI\xa4\xe7F$_\\3\xcb' |
||||
|
||||
|
||||
def test_get_random_bytes(): |
||||
assert len(get_random_bytes(16)) == 16 |
||||
assert get_random_bytes(16) != get_random_bytes(16) |
||||
|
||||
|
||||
def test_argon2(): |
||||
pwd = b'testpass' |
||||
h = Argon2Hash(pwd, hash_len=16, salt_len=22) |
||||
|
||||
assert len(h.hash) == 16 |
||||
assert len(h.salt) == 22 |
||||
|
||||
h2 = Argon2Hash(pwd, h.salt, hash_len=16) |
||||
|
||||
assert h.settings == h2.settings |
||||
assert h.hash == h2.hash |
||||
assert h.salt == h2.salt |
||||
|
||||
@ -0,0 +1,149 @@
|
||||
from __future__ import absolute_import, print_function |
||||
|
||||
"""Blockchaininterface functionality tests.""" |
||||
|
||||
import binascii |
||||
from commontest import create_wallet_for_sync, make_sign_and_push |
||||
|
||||
import pytest |
||||
from jmclient import load_program_config, jm_single, sync_wallet, get_log |
||||
|
||||
log = get_log() |
||||
|
||||
|
||||
def sync_test_wallet(fast, wallet): |
||||
sync_count = 0 |
||||
jm_single().bc_interface.wallet_synced = False |
||||
while not jm_single().bc_interface.wallet_synced: |
||||
sync_wallet(wallet, fast=fast) |
||||
sync_count += 1 |
||||
# avoid infinite loop |
||||
assert sync_count < 10 |
||||
log.debug("Tried " + str(sync_count) + " times") |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast', (False, True)) |
||||
def test_empty_wallet_sync(setup_wallets, fast): |
||||
wallet = create_wallet_for_sync([0, 0, 0, 0, 0], ['test_empty_wallet_sync']) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
broken = True |
||||
for md in range(wallet.max_mixdepth + 1): |
||||
for internal in (True, False): |
||||
broken = False |
||||
assert 0 == wallet.get_next_unused_index(md, internal) |
||||
assert not broken |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast,internal', ( |
||||
(False, False), (False, True), |
||||
(True, False), (True, True))) |
||||
def test_sequentially_used_wallet_sync(setup_wallets, fast, internal): |
||||
used_count = [1, 3, 6, 2, 23] |
||||
wallet = create_wallet_for_sync( |
||||
used_count, ['test_sequentially_used_wallet_sync'], |
||||
populate_internal=internal) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
broken = True |
||||
for md in range(len(used_count)): |
||||
broken = False |
||||
assert used_count[md] == wallet.get_next_unused_index(md, internal) |
||||
assert not broken |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast', (False, True)) |
||||
def test_gap_used_wallet_sync(setup_wallets, fast): |
||||
used_count = [1, 3, 6, 2, 23] |
||||
wallet = create_wallet_for_sync(used_count, ['test_gap_used_wallet_sync']) |
||||
wallet.gap_limit = 20 |
||||
|
||||
for md in range(len(used_count)): |
||||
x = -1 |
||||
for x in range(md): |
||||
assert x <= wallet.gap_limit, "test broken" |
||||
# create some unused addresses |
||||
wallet.get_new_script(md, True) |
||||
wallet.get_new_script(md, False) |
||||
used_count[md] += x + 2 |
||||
jm_single().bc_interface.grab_coins(wallet.get_new_addr(md, True), 1) |
||||
jm_single().bc_interface.grab_coins(wallet.get_new_addr(md, False), 1) |
||||
|
||||
# reset indices to simulate completely unsynced wallet |
||||
for md in range(wallet.max_mixdepth + 1): |
||||
wallet.set_next_index(md, True, 0) |
||||
wallet.set_next_index(md, False, 0) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
broken = True |
||||
for md in range(len(used_count)): |
||||
broken = False |
||||
assert md + 1 == wallet.get_next_unused_index(md, True) |
||||
assert used_count[md] == wallet.get_next_unused_index(md, False) |
||||
assert not broken |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast', (False, True)) |
||||
def test_multigap_used_wallet_sync(setup_wallets, fast): |
||||
start_index = 5 |
||||
used_count = [start_index, 0, 0, 0, 0] |
||||
wallet = create_wallet_for_sync(used_count, ['test_multigap_used_wallet_sync']) |
||||
wallet.gap_limit = 5 |
||||
|
||||
mixdepth = 0 |
||||
for w in range(5): |
||||
for x in range(int(wallet.gap_limit * 0.6)): |
||||
assert x <= wallet.gap_limit, "test broken" |
||||
# create some unused addresses |
||||
wallet.get_new_script(mixdepth, True) |
||||
wallet.get_new_script(mixdepth, False) |
||||
used_count[mixdepth] += x + 2 |
||||
jm_single().bc_interface.grab_coins(wallet.get_new_addr(mixdepth, True), 1) |
||||
jm_single().bc_interface.grab_coins(wallet.get_new_addr(mixdepth, False), 1) |
||||
|
||||
# reset indices to simulate completely unsynced wallet |
||||
for md in range(wallet.max_mixdepth + 1): |
||||
wallet.set_next_index(md, True, 0) |
||||
wallet.set_next_index(md, False, 0) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
assert used_count[mixdepth] - start_index == wallet.get_next_unused_index(mixdepth, True) |
||||
assert used_count[mixdepth] == wallet.get_next_unused_index(mixdepth, False) |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast', (False, True)) |
||||
def test_retain_unused_indices_wallet_sync(setup_wallets, fast): |
||||
used_count = [0, 0, 0, 0, 0] |
||||
wallet = create_wallet_for_sync(used_count, ['test_retain_unused_indices_wallet_sync']) |
||||
|
||||
for x in range(9): |
||||
wallet.get_new_script(0, 1) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
assert wallet.get_next_unused_index(0, 1) == 9 |
||||
|
||||
|
||||
@pytest.mark.parametrize('fast', (False, True)) |
||||
def test_imported_wallet_sync(setup_wallets, fast): |
||||
used_count = [0, 0, 0, 0, 0] |
||||
wallet = create_wallet_for_sync(used_count, ['test_imported_wallet_sync']) |
||||
source_wallet = create_wallet_for_sync(used_count, ['test_imported_wallet_sync_origin']) |
||||
|
||||
address = source_wallet.get_new_addr(0, 1) |
||||
wallet.import_private_key(0, source_wallet.get_wif(0, 1, 0)) |
||||
txid = binascii.unhexlify(jm_single().bc_interface.grab_coins(address, 1)) |
||||
|
||||
sync_test_wallet(fast, wallet) |
||||
|
||||
assert wallet._utxos.have_utxo(txid, 0) == 0 |
||||
|
||||
|
||||
@pytest.fixture(scope='module') |
||||
def setup_wallets(): |
||||
load_program_config() |
||||
jm_single().bc_interface.tick_forward_chain_interval = 1 |
||||
@ -0,0 +1,128 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
|
||||
from jmclient import storage |
||||
import pytest |
||||
|
||||
|
||||
class MockStorage(storage.Storage): |
||||
def __init__(self, data, *args, **kwargs): |
||||
self.file_data = data |
||||
self.locked = False |
||||
super(type(self), self).__init__(*args, **kwargs) |
||||
|
||||
def _read_file(self): |
||||
if hasattr(self, 'file_data'): |
||||
return self.file_data |
||||
return b'' |
||||
|
||||
def _write_file(self, data): |
||||
self.file_data = data |
||||
|
||||
def _create_lock(self): |
||||
self.locked = not self.read_only |
||||
|
||||
def _remove_lock(self): |
||||
self.locked = False |
||||
|
||||
|
||||
def test_storage(): |
||||
s = MockStorage(None, 'nonexistant', b'password', create=True) |
||||
assert s.file_data.startswith(s.MAGIC_ENC) |
||||
assert s.locked |
||||
assert s.is_encrypted() |
||||
assert not s.was_changed() |
||||
|
||||
old_data = s.file_data |
||||
|
||||
s.data[b'mydata'] = b'test' |
||||
assert s.was_changed() |
||||
s.save() |
||||
assert s.file_data != old_data |
||||
enc_data = s.file_data |
||||
|
||||
old_data = s.file_data |
||||
s.change_password(b'newpass') |
||||
assert s.is_encrypted() |
||||
assert not s.was_changed() |
||||
assert s.file_data != old_data |
||||
|
||||
old_data = s.file_data |
||||
s.change_password(None) |
||||
assert not s.is_encrypted() |
||||
assert not s.was_changed() |
||||
assert s.file_data != old_data |
||||
assert s.file_data.startswith(s.MAGIC_UNENC) |
||||
|
||||
s2 = MockStorage(enc_data, __file__, b'password') |
||||
assert s2.locked |
||||
assert s2.is_encrypted() |
||||
assert not s2.was_changed() |
||||
assert s2.data[b'mydata'] == b'test' |
||||
|
||||
|
||||
def test_storage_invalid(): |
||||
with pytest.raises(storage.StorageError, message="File does not exist"): |
||||
MockStorage(None, 'nonexistant', b'password') |
||||
|
||||
s = MockStorage(None, 'nonexistant', b'password', create=True) |
||||
with pytest.raises(storage.StorageError, message="Wrong password"): |
||||
MockStorage(s.file_data, __file__, b'wrongpass') |
||||
|
||||
with pytest.raises(storage.StorageError, message="No password"): |
||||
MockStorage(s.file_data, __file__) |
||||
|
||||
with pytest.raises(storage.StorageError, message="Non-wallet file, unencrypted"): |
||||
MockStorage(b'garbagefile', __file__) |
||||
|
||||
with pytest.raises(storage.StorageError, message="Non-wallet file, encrypted"): |
||||
MockStorage(b'garbagefile', __file__, b'password') |
||||
|
||||
|
||||
def test_storage_readonly(): |
||||
s = MockStorage(None, 'nonexistant', b'password', create=True) |
||||
s = MockStorage(s.file_data, __file__, b'password', read_only=True) |
||||
s.data[b'mydata'] = b'test' |
||||
|
||||
assert not s.locked |
||||
assert s.was_changed() |
||||
|
||||
with pytest.raises(storage.StorageError): |
||||
s.save() |
||||
|
||||
with pytest.raises(storage.StorageError): |
||||
s.change_password(b'newpass') |
||||
|
||||
|
||||
def test_storage_lock(tmpdir): |
||||
p = str(tmpdir.join('test.jmdat')) |
||||
pw = None |
||||
|
||||
with pytest.raises(storage.StorageError, message="File does not exist"): |
||||
storage.Storage(p, pw) |
||||
|
||||
s = storage.Storage(p, pw, create=True) |
||||
assert s.is_locked() |
||||
assert not s.is_encrypted() |
||||
assert s.data == {} |
||||
|
||||
with pytest.raises(storage.StorageError, message="File is locked"): |
||||
storage.Storage(p, pw) |
||||
|
||||
assert storage.Storage.is_storage_file(p) |
||||
assert not storage.Storage.is_encrypted_storage_file(p) |
||||
|
||||
s.data[b'test'] = b'value' |
||||
s.save() |
||||
s.close() |
||||
del s |
||||
|
||||
s = storage.Storage(p, pw, read_only=True) |
||||
assert not s.is_locked() |
||||
assert s.data == {b'test': b'value'} |
||||
s.close() |
||||
del s |
||||
|
||||
s = storage.Storage(p, pw) |
||||
assert s.is_locked() |
||||
assert s.data == {b'test': b'value'} |
||||
|
||||
@ -0,0 +1,93 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
|
||||
from jmclient.wallet import UTXOManager |
||||
from test_storage import MockStorage |
||||
import pytest |
||||
|
||||
from jmclient import load_program_config |
||||
import jmclient |
||||
from commontest import DummyBlockchainInterface |
||||
|
||||
|
||||
def select(unspent, value): |
||||
return unspent |
||||
|
||||
|
||||
def test_utxomanager_persist(setup_env_nodeps): |
||||
storage = MockStorage(None, 'wallet.jmdat', None, create=True) |
||||
UTXOManager.initialize(storage) |
||||
um = UTXOManager(storage, select) |
||||
|
||||
txid = b'\x00' * UTXOManager.TXID_LEN |
||||
index = 0 |
||||
path = (0,) |
||||
mixdepth = 0 |
||||
value = 500 |
||||
|
||||
um.add_utxo(txid, index, path, value, mixdepth) |
||||
um.add_utxo(txid, index+1, path, value, mixdepth+1) |
||||
|
||||
um.save() |
||||
del um |
||||
|
||||
um = UTXOManager(storage, select) |
||||
|
||||
assert um.have_utxo(txid, index) is mixdepth |
||||
assert um.have_utxo(txid, index+1) is mixdepth + 1 |
||||
assert um.have_utxo(txid, index+2) is False |
||||
|
||||
utxos = um.get_utxos_by_mixdepth() |
||||
assert len(utxos[mixdepth]) == 1 |
||||
assert len(utxos[mixdepth+1]) == 1 |
||||
assert len(utxos[mixdepth+2]) == 0 |
||||
|
||||
balances = um.get_balance_by_mixdepth() |
||||
assert balances[mixdepth] == value |
||||
assert balances[mixdepth+1] == value |
||||
|
||||
um.remove_utxo(txid, index, mixdepth) |
||||
assert um.have_utxo(txid, index) is False |
||||
|
||||
um.save() |
||||
del um |
||||
|
||||
um = UTXOManager(storage, select) |
||||
|
||||
assert um.have_utxo(txid, index) is False |
||||
assert um.have_utxo(txid, index+1) is mixdepth + 1 |
||||
|
||||
utxos = um.get_utxos_by_mixdepth() |
||||
assert len(utxos[mixdepth]) == 0 |
||||
assert len(utxos[mixdepth+1]) == 1 |
||||
|
||||
balances = um.get_balance_by_mixdepth() |
||||
assert balances[mixdepth] == 0 |
||||
assert balances[mixdepth+1] == value |
||||
assert balances[mixdepth+2] == 0 |
||||
|
||||
|
||||
def test_utxomanager_select(setup_env_nodeps): |
||||
storage = MockStorage(None, 'wallet.jmdat', None, create=True) |
||||
UTXOManager.initialize(storage) |
||||
um = UTXOManager(storage, select) |
||||
|
||||
txid = b'\x00' * UTXOManager.TXID_LEN |
||||
index = 0 |
||||
path = (0,) |
||||
mixdepth = 0 |
||||
value = 500 |
||||
|
||||
um.add_utxo(txid, index, path, value, mixdepth) |
||||
|
||||
assert len(um.select_utxos(mixdepth, value)) is 1 |
||||
assert len(um.select_utxos(mixdepth+1, value)) is 0 |
||||
|
||||
um.add_utxo(txid, index+1, path, value, mixdepth) |
||||
assert len(um.select_utxos(mixdepth, value)) is 2 |
||||
|
||||
|
||||
@pytest.fixture |
||||
def setup_env_nodeps(monkeypatch): |
||||
monkeypatch.setattr(jmclient.configure, 'get_blockchain_interface_instance', |
||||
lambda x: DummyBlockchainInterface()) |
||||
load_program_config() |
||||
@ -0,0 +1,590 @@
|
||||
from __future__ import print_function, absolute_import, division, unicode_literals |
||||
'''Wallet functionality tests.''' |
||||
|
||||
import os |
||||
import json |
||||
from binascii import hexlify, unhexlify |
||||
|
||||
import pytest |
||||
import jmbitcoin as btc |
||||
from commontest import binarize_tx |
||||
from jmclient import load_program_config, jm_single, get_log,\ |
||||
SegwitLegacyWallet,BIP32Wallet, BIP49Wallet, LegacyWallet,\ |
||||
VolatileStorage, get_network, cryptoengine, WalletError |
||||
|
||||
testdir = os.path.dirname(os.path.realpath(__file__)) |
||||
log = get_log() |
||||
|
||||
|
||||
def signed_tx_is_segwit(tx): |
||||
for inp in tx['ins']: |
||||
if 'txinwitness' not in inp: |
||||
return False |
||||
return True |
||||
|
||||
|
||||
def assert_segwit(tx): |
||||
assert signed_tx_is_segwit(tx) |
||||
|
||||
|
||||
def assert_not_segwit(tx): |
||||
assert not signed_tx_is_segwit(tx) |
||||
|
||||
|
||||
def get_populated_wallet(amount=10**8, num=3): |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
# fund three wallet addresses at mixdepth 0 |
||||
for i in range(num): |
||||
fund_wallet_addr(wallet, wallet.get_internal_addr(0), amount / 10**8) |
||||
|
||||
return wallet |
||||
|
||||
|
||||
def fund_wallet_addr(wallet, addr, value_btc=1): |
||||
txin_id = jm_single().bc_interface.grab_coins(addr, value_btc) |
||||
txinfo = jm_single().bc_interface.rpc('gettransaction', [txin_id]) |
||||
txin = btc.deserialize(unhexlify(txinfo['hex'])) |
||||
utxo_in = wallet.add_new_utxos_(txin, unhexlify(txin_id)) |
||||
assert len(utxo_in) == 1 |
||||
return list(utxo_in.keys())[0] |
||||
|
||||
|
||||
def get_bip39_vectors(): |
||||
fh = open(os.path.join(testdir, 'bip39vectors.json')) |
||||
data = json.load(fh)['english'] |
||||
fh.close() |
||||
return data |
||||
|
||||
|
||||
@pytest.mark.parametrize('entropy,mnemonic,key,xpriv', get_bip39_vectors()) |
||||
def test_bip39_seeds(monkeypatch, setup_wallet, entropy, mnemonic, key, xpriv): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'mainnet') |
||||
created_entropy = SegwitLegacyWallet.entropy_from_mnemonic(mnemonic) |
||||
assert entropy == hexlify(created_entropy) |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize( |
||||
storage, get_network(), entropy=created_entropy, |
||||
entropy_extension=b'TREZOR', max_mixdepth=4) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
assert (mnemonic, b'TREZOR') == wallet.get_mnemonic_words() |
||||
assert key == hexlify(wallet._create_master_key()) |
||||
|
||||
# need to monkeypatch this, else we'll default to the BIP-49 path |
||||
monkeypatch.setattr(SegwitLegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
assert xpriv == wallet.get_bip32_priv_export() |
||||
|
||||
|
||||
def test_bip49_seed(monkeypatch, setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
mnemonic = 'abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about' |
||||
master_xpriv = 'tprv8ZgxMBicQKsPe5YMU9gHen4Ez3ApihUfykaqUorj9t6FDqy3nP6eoXiAo2ssvpAjoLroQxHqr3R5nE3a5dU3DHTjTgJDd7zrbniJr6nrCzd' |
||||
account0_xpriv = 'tprv8gRrNu65W2Msef2BdBSUgFdRTGzC8EwVXnV7UGS3faeXtuMVtGfEdidVeGbThs4ELEoayCAzZQ4uUji9DUiAs7erdVskqju7hrBcDvDsdbY' |
||||
addr0_script_hash = '336caa13e08b96080a32b5d818d59b4ab3b36742' |
||||
|
||||
entropy = SegwitLegacyWallet.entropy_from_mnemonic(mnemonic) |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=0) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
assert (mnemonic, None) == wallet.get_mnemonic_words() |
||||
assert account0_xpriv == wallet.get_bip32_priv_export(0) |
||||
assert addr0_script_hash == hexlify(wallet.get_external_script(0)[2:-1]) |
||||
|
||||
# FIXME: is this desired behaviour? BIP49 wallet will not return xpriv for |
||||
# the root key but only for key after base path |
||||
monkeypatch.setattr(SegwitLegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
assert master_xpriv == wallet.get_bip32_priv_export() |
||||
|
||||
|
||||
def test_bip32_test_vector_1(monkeypatch, setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'mainnet') |
||||
|
||||
entropy = unhexlify('000102030405060708090a0b0c0d0e0f') |
||||
storage = VolatileStorage() |
||||
LegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=0) |
||||
|
||||
# test vector 1 is using hardened derivation for the account/mixdepth level |
||||
monkeypatch.setattr(LegacyWallet, '_get_mixdepth_from_path', |
||||
BIP49Wallet._get_mixdepth_from_path) |
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_mixdepth_path_level', |
||||
BIP49Wallet._get_bip32_mixdepth_path_level) |
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
monkeypatch.setattr(LegacyWallet, '_create_master_key', |
||||
BIP32Wallet._create_master_key) |
||||
|
||||
wallet = LegacyWallet(storage) |
||||
|
||||
assert wallet.get_bip32_priv_export() == 'xprv9s21ZrQH143K3QTDL4LXw2F7HEK3wJUD2nW2nRk4stbPy6cq3jPPqjiChkVvvNKmPGJxWUtg6LnF5kejMRNNU3TGtRBeJgk33yuGBxrMPHi' |
||||
assert wallet.get_bip32_pub_export() == 'xpub661MyMwAqRbcFtXgS5sYJABqqG9YLmC4Q1Rdap9gSE8NqtwybGhePY2gZ29ESFjqJoCu1Rupje8YtGqsefD265TMg7usUDFdp6W1EGMcet8' |
||||
assert wallet.get_bip32_priv_export(0) == 'xprv9uHRZZhk6KAJC1avXpDAp4MDc3sQKNxDiPvvkX8Br5ngLNv1TxvUxt4cV1rGL5hj6KCesnDYUhd7oWgT11eZG7XnxHrnYeSvkzY7d2bhkJ7' |
||||
assert wallet.get_bip32_pub_export(0) == 'xpub68Gmy5EdvgibQVfPdqkBBCHxA5htiqg55crXYuXoQRKfDBFA1WEjWgP6LHhwBZeNK1VTsfTFUHCdrfp1bgwQ9xv5ski8PX9rL2dZXvgGDnw' |
||||
assert wallet.get_bip32_priv_export(0, 1) == 'xprv9wTYmMFdV23N2TdNG573QoEsfRrWKQgWeibmLntzniatZvR9BmLnvSxqu53Kw1UmYPxLgboyZQaXwTCg8MSY3H2EU4pWcQDnRnrVA1xe8fs' |
||||
assert wallet.get_bip32_pub_export(0, 1) == 'xpub6ASuArnXKPbfEwhqN6e3mwBcDTgzisQN1wXN9BJcM47sSikHjJf3UFHKkNAWbWMiGj7Wf5uMash7SyYq527Hqck2AxYysAA7xmALppuCkwQ' |
||||
# there are more test vectors but those don't match joinmarket's wallet |
||||
# structure, hence they make litte sense to test here |
||||
|
||||
|
||||
def test_bip32_test_vector_2(monkeypatch, setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'mainnet') |
||||
|
||||
entropy = unhexlify('fffcf9f6f3f0edeae7e4e1dedbd8d5d2cfccc9c6c3c0bdbab7b4b1aeaba8a5a29f9c999693908d8a8784817e7b7875726f6c696663605d5a5754514e4b484542') |
||||
storage = VolatileStorage() |
||||
LegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=0) |
||||
|
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
monkeypatch.setattr(LegacyWallet, '_create_master_key', |
||||
BIP32Wallet._create_master_key) |
||||
|
||||
wallet = LegacyWallet(storage) |
||||
|
||||
assert wallet.get_bip32_priv_export() == 'xprv9s21ZrQH143K31xYSDQpPDxsXRTUcvj2iNHm5NUtrGiGG5e2DtALGdso3pGz6ssrdK4PFmM8NSpSBHNqPqm55Qn3LqFtT2emdEXVYsCzC2U' |
||||
assert wallet.get_bip32_pub_export() == 'xpub661MyMwAqRbcFW31YEwpkMuc5THy2PSt5bDMsktWQcFF8syAmRUapSCGu8ED9W6oDMSgv6Zz8idoc4a6mr8BDzTJY47LJhkJ8UB7WEGuduB' |
||||
assert wallet.get_bip32_priv_export(0) == 'xprv9vHkqa6EV4sPZHYqZznhT2NPtPCjKuDKGY38FBWLvgaDx45zo9WQRUT3dKYnjwih2yJD9mkrocEZXo1ex8G81dwSM1fwqWpWkeS3v86pgKt' |
||||
assert wallet.get_bip32_pub_export(0) == 'xpub69H7F5d8KSRgmmdJg2KhpAK8SR3DjMwAdkxj3ZuxV27CprR9LgpeyGmXUbC6wb7ERfvrnKZjXoUmmDznezpbZb7ap6r1D3tgFxHmwMkQTPH' |
||||
# there are more test vectors but those don't match joinmarket's wallet |
||||
# structure, hence they make litte sense to test here |
||||
|
||||
|
||||
def test_bip32_test_vector_3(monkeypatch, setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'mainnet') |
||||
|
||||
entropy = unhexlify('4b381541583be4423346c643850da4b320e46a87ae3d2a4e6da11eba819cd4acba45d239319ac14f863b8d5ab5a0d0c64d2e8a1e7d1457df2e5a3c51c73235be') |
||||
storage = VolatileStorage() |
||||
LegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=0) |
||||
|
||||
# test vector 3 is using hardened derivation for the account/mixdepth level |
||||
monkeypatch.setattr(LegacyWallet, '_get_mixdepth_from_path', |
||||
BIP49Wallet._get_mixdepth_from_path) |
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_mixdepth_path_level', |
||||
BIP49Wallet._get_bip32_mixdepth_path_level) |
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
monkeypatch.setattr(LegacyWallet, '_create_master_key', |
||||
BIP32Wallet._create_master_key) |
||||
|
||||
wallet = LegacyWallet(storage) |
||||
|
||||
assert wallet.get_bip32_priv_export() == 'xprv9s21ZrQH143K25QhxbucbDDuQ4naNntJRi4KUfWT7xo4EKsHt2QJDu7KXp1A3u7Bi1j8ph3EGsZ9Xvz9dGuVrtHHs7pXeTzjuxBrCmmhgC6' |
||||
assert wallet.get_bip32_pub_export() == 'xpub661MyMwAqRbcEZVB4dScxMAdx6d4nFc9nvyvH3v4gJL378CSRZiYmhRoP7mBy6gSPSCYk6SzXPTf3ND1cZAceL7SfJ1Z3GC8vBgp2epUt13' |
||||
assert wallet.get_bip32_priv_export(0) == 'xprv9uPDJpEQgRQfDcW7BkF7eTya6RPxXeJCqCJGHuCJ4GiRVLzkTXBAJMu2qaMWPrS7AANYqdq6vcBcBUdJCVVFceUvJFjaPdGZ2y9WACViL4L' |
||||
assert wallet.get_bip32_pub_export(0) == 'xpub68NZiKmJWnxxS6aaHmn81bvJeTESw724CRDs6HbuccFQN9Ku14VQrADWgqbhhTHBaohPX4CjNLf9fq9MYo6oDaPPLPxSb7gwQN3ih19Zm4Y' |
||||
|
||||
|
||||
@pytest.mark.parametrize('mixdepth,internal,index,address,wif', [ |
||||
[0, 0, 0, 'mpCX9EbdXpcrKMtjEe1fqFhvzctkfzMYTX', 'cVqtSSoVxFyPqTRGfeESi31uCYfgTF4tGWRtGeVs84fzybiX5TPk'], |
||||
[0, 0, 5, 'mtj85a3pFppRhrxNcFig1k7ECshrZjJ9XC', 'cMsFXc4TRw9PTcCTv7x9mr88rDeGXBTLEV67mKaw2cxCkjkhL32G'], |
||||
[0, 1, 3, 'n1EaQuqvTRm719hsSJ7yRsj49JfoG1C86q', 'cUgSTqnAtvYoQRXCYy4wCFfaks2Zrz1d55m6mVhFyVhQbkDi7JGJ'], |
||||
[2, 1, 2, 'mfxkBk7uDhmF5PJGS9d1NonGiAxPwJqQP4', 'cPcZXSiXPuS5eiT4oDrDKi1mFumw5D1RcWzK2gkGdEHjEz99eyXn'] |
||||
]) |
||||
def test_bip32_addresses_p2pkh(monkeypatch, setup_wallet, mixdepth, internal, index, address, wif): |
||||
""" |
||||
Test with a random but fixed entropy |
||||
""" |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
|
||||
entropy = unhexlify('2e0339ba89b4a1272cdf78b27ee62669ee01992a59e836e2807051be128ca817') |
||||
storage = VolatileStorage() |
||||
LegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=3) |
||||
|
||||
monkeypatch.setattr(LegacyWallet, '_get_bip32_base_path', |
||||
BIP32Wallet._get_bip32_base_path) |
||||
monkeypatch.setattr(LegacyWallet, '_create_master_key', |
||||
BIP32Wallet._create_master_key) |
||||
|
||||
wallet = LegacyWallet(storage) |
||||
|
||||
# wallet needs to know about all intermediate keys |
||||
for i in range(index + 1): |
||||
wallet.get_new_script(mixdepth, internal) |
||||
|
||||
assert wif == wallet.get_wif(mixdepth, internal, index) |
||||
assert address == wallet.get_addr(mixdepth, internal, index) |
||||
|
||||
|
||||
@pytest.mark.parametrize('mixdepth,internal,index,address,wif', [ |
||||
[0, 0, 0, '2MzY5yyonUY7zpHspg7jB7WQs1uJxKafQe4', 'cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM'], |
||||
[0, 0, 5, '2MsKvqPGStp3yXT8UivuAaGwfPzT7xYwSWk', 'cSo3h7nRuV4fwhVPXeTDJx6cBCkjAzS9VM8APXViyjoSaMq85ZKn'], |
||||
[0, 1, 3, '2N7k6wiQqkuMaApwGhk3HKrifprUSDydqUv', 'cTwq3UsZa8STVmwZR94dDphgqgdLFeuaRFD1Ea44qjbjFfKEb1n5'], |
||||
[2, 1, 2, '2MtE6gzHgmEXeWzKsmCJFEqkrpNuBDvoRnz', 'cPV8FZuCvrRpk4RhmhpjnSucHhaQZUan4Vbyo1NVQtuAxurW9grb'] |
||||
]) |
||||
def test_bip32_addresses_p2sh_p2wpkh(setup_wallet, mixdepth, internal, index, address, wif): |
||||
""" |
||||
Test with a random but fixed entropy |
||||
""" |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
|
||||
entropy = unhexlify('2e0339ba89b4a1272cdf78b27ee62669ee01992a59e836e2807051be128ca817') |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize( |
||||
storage, get_network(), entropy=entropy, max_mixdepth=3) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
# wallet needs to know about all intermediate keys |
||||
for i in range(index + 1): |
||||
wallet.get_new_script(mixdepth, internal) |
||||
|
||||
assert wif == wallet.get_wif(mixdepth, internal, index) |
||||
assert address == wallet.get_addr(mixdepth, internal, index) |
||||
|
||||
|
||||
def test_import_key(setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
wallet.import_private_key( |
||||
0, 'cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM', |
||||
cryptoengine.TYPE_P2SH_P2WPKH) |
||||
wallet.import_private_key( |
||||
1, 'cVqtSSoVxFyPqTRGfeESi31uCYfgTF4tGWRtGeVs84fzybiX5TPk', |
||||
cryptoengine.TYPE_P2PKH) |
||||
|
||||
with pytest.raises(WalletError): |
||||
wallet.import_private_key( |
||||
1, 'cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM', |
||||
cryptoengine.TYPE_P2SH_P2WPKH) |
||||
|
||||
# test persist imported keys |
||||
wallet.save() |
||||
data = storage.file_data |
||||
|
||||
del wallet |
||||
del storage |
||||
|
||||
storage = VolatileStorage(data=data) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
imported_paths_md0 = list(wallet.yield_imported_paths(0)) |
||||
imported_paths_md1 = list(wallet.yield_imported_paths(1)) |
||||
assert len(imported_paths_md0) == 1 |
||||
assert len(imported_paths_md1) == 1 |
||||
|
||||
# verify imported addresses |
||||
assert wallet.get_addr_path(imported_paths_md0[0]) == '2MzY5yyonUY7zpHspg7jB7WQs1uJxKafQe4' |
||||
assert wallet.get_addr_path(imported_paths_md1[0]) == 'mpCX9EbdXpcrKMtjEe1fqFhvzctkfzMYTX' |
||||
|
||||
# test remove key |
||||
wallet.remove_imported_key(path=imported_paths_md0[0]) |
||||
assert not list(wallet.yield_imported_paths(0)) |
||||
|
||||
assert wallet.get_details(imported_paths_md1[0]) == (1, 'imported', 0) |
||||
|
||||
|
||||
@pytest.mark.parametrize('wif,keytype,type_check', [ |
||||
['cVqtSSoVxFyPqTRGfeESi31uCYfgTF4tGWRtGeVs84fzybiX5TPk', |
||||
cryptoengine.TYPE_P2PKH, assert_not_segwit], |
||||
['cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM', |
||||
cryptoengine.TYPE_P2SH_P2WPKH, assert_segwit] |
||||
]) |
||||
def test_signing_imported(setup_wallet, wif, keytype, type_check): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
MIXDEPTH = 0 |
||||
path = wallet.import_private_key(MIXDEPTH, wif, keytype) |
||||
utxo = fund_wallet_addr(wallet, wallet.get_addr_path(path)) |
||||
tx = btc.deserialize(btc.mktx(['{}:{}'.format(hexlify(utxo[0]), utxo[1])], |
||||
['00'*17 + ':' + str(10**8 - 9000)])) |
||||
binarize_tx(tx) |
||||
script = wallet.get_script_path(path) |
||||
wallet.sign_tx(tx, {0: (script, 10**8)}) |
||||
type_check(tx) |
||||
txout = jm_single().bc_interface.pushtx(hexlify(btc.serialize(tx))) |
||||
assert txout |
||||
|
||||
|
||||
@pytest.mark.parametrize('wallet_cls,type_check', [ |
||||
[LegacyWallet, assert_not_segwit], |
||||
[SegwitLegacyWallet, assert_segwit] |
||||
]) |
||||
def test_signing_simple(setup_wallet, wallet_cls, type_check): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
storage = VolatileStorage() |
||||
wallet_cls.initialize(storage, get_network()) |
||||
wallet = wallet_cls(storage) |
||||
utxo = fund_wallet_addr(wallet, wallet.get_internal_addr(0)) |
||||
tx = btc.deserialize(btc.mktx(['{}:{}'.format(hexlify(utxo[0]), utxo[1])], |
||||
['00'*17 + ':' + str(10**8 - 9000)])) |
||||
binarize_tx(tx) |
||||
script = wallet.get_script(0, 1, 0) |
||||
wallet.sign_tx(tx, {0: (script, 10**8)}) |
||||
type_check(tx) |
||||
txout = jm_single().bc_interface.pushtx(hexlify(btc.serialize(tx))) |
||||
assert txout |
||||
|
||||
|
||||
def test_add_utxos(setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
amount = 10**8 |
||||
num_tx = 3 |
||||
|
||||
wallet = get_populated_wallet(amount, num_tx) |
||||
|
||||
balances = wallet.get_balance_by_mixdepth() |
||||
assert balances[0] == num_tx * amount |
||||
for md in range(1, wallet.max_mixdepth + 1): |
||||
assert balances[md] == 0 |
||||
|
||||
utxos = wallet.get_utxos_by_mixdepth_() |
||||
assert len(utxos[0]) == num_tx |
||||
for md in range(1, wallet.max_mixdepth + 1): |
||||
assert not utxos[md] |
||||
|
||||
with pytest.raises(Exception): |
||||
# no funds in mixdepth |
||||
wallet.select_utxos_(1, amount) |
||||
|
||||
with pytest.raises(Exception): |
||||
# not enough funds |
||||
wallet.select_utxos_(0, amount * (num_tx + 1)) |
||||
|
||||
wallet.reset_utxos() |
||||
assert wallet.get_balance_by_mixdepth()[0] == 0 |
||||
|
||||
|
||||
def test_select_utxos(setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
amount = 10**8 |
||||
|
||||
wallet = get_populated_wallet(amount) |
||||
utxos = wallet.select_utxos_(0, amount // 2) |
||||
|
||||
assert len(utxos) == 1 |
||||
utxos = list(utxos.keys()) |
||||
|
||||
more_utxos = wallet.select_utxos_(0, int(amount * 1.5), utxo_filter=utxos) |
||||
assert len(more_utxos) == 2 |
||||
assert utxos[0] not in more_utxos |
||||
|
||||
|
||||
def test_add_new_utxos(setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
wallet = get_populated_wallet(num=1) |
||||
|
||||
scripts = [wallet.get_new_script(x, True) for x in range(3)] |
||||
tx_scripts = list(scripts) |
||||
tx_scripts.append(b'\x22'*17) |
||||
|
||||
tx = btc.deserialize(btc.mktx( |
||||
['0'*64 + ':2'], [{'script': hexlify(s), 'value': 10**8} |
||||
for s in tx_scripts])) |
||||
binarize_tx(tx) |
||||
txid = b'\x01' * 32 |
||||
added = wallet.add_new_utxos_(tx, txid) |
||||
assert len(added) == len(scripts) |
||||
|
||||
added_scripts = {x['script'] for x in added.values()} |
||||
for s in scripts: |
||||
assert s in added_scripts |
||||
|
||||
balances = wallet.get_balance_by_mixdepth() |
||||
assert balances[0] == 2 * 10**8 |
||||
assert balances[1] == 10**8 |
||||
assert balances[2] == 10**8 |
||||
assert len(balances) == wallet.max_mixdepth + 1 |
||||
|
||||
|
||||
def test_remove_old_utxos(setup_wallet): |
||||
jm_single().config.set('BLOCKCHAIN', 'network', 'testnet') |
||||
wallet = get_populated_wallet() |
||||
|
||||
# add some more utxos to mixdepth 1 |
||||
for i in range(3): |
||||
txin = jm_single().bc_interface.grab_coins( |
||||
wallet.get_internal_addr(1), 1) |
||||
wallet.add_utxo(unhexlify(txin), 0, wallet.get_script(1, 1, i), 10**8) |
||||
|
||||
inputs = wallet.select_utxos_(0, 10**8) |
||||
inputs.update(wallet.select_utxos_(1, 2 * 10**8)) |
||||
assert len(inputs) == 3 |
||||
|
||||
tx_inputs = list(inputs.keys()) |
||||
tx_inputs.append((b'\x12'*32, 6)) |
||||
|
||||
tx = btc.deserialize(btc.mktx( |
||||
['{}:{}'.format(hexlify(txid), i) for txid, i in tx_inputs], |
||||
['0' * 36 + ':' + str(3 * 10**8 - 1000)])) |
||||
binarize_tx(tx) |
||||
|
||||
removed = wallet.remove_old_utxos_(tx) |
||||
assert len(removed) == len(inputs) |
||||
|
||||
for txid in removed: |
||||
assert txid in inputs |
||||
|
||||
balances = wallet.get_balance_by_mixdepth() |
||||
assert balances[0] == 2 * 10**8 |
||||
assert balances[1] == 10**8 |
||||
assert balances[2] == 0 |
||||
assert len(balances) == wallet.max_mixdepth + 1 |
||||
|
||||
|
||||
def test_initialize_twice(setup_wallet): |
||||
wallet = get_populated_wallet(num=0) |
||||
storage = wallet._storage |
||||
with pytest.raises(WalletError): |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
|
||||
|
||||
def test_is_known(setup_wallet): |
||||
wallet = get_populated_wallet(num=0) |
||||
script = wallet.get_new_script(1, True) |
||||
addr = wallet.get_new_addr(2, False) |
||||
|
||||
assert wallet.is_known_script(script) |
||||
assert wallet.is_known_addr(addr) |
||||
assert wallet.is_known_addr(wallet.script_to_addr(script)) |
||||
assert wallet.is_known_script(wallet.addr_to_script(addr)) |
||||
|
||||
assert not wallet.is_known_script(b'\x12' * len(script)) |
||||
assert not wallet.is_known_addr('2MzY5yyonUY7zpHspg7jB7WQs1uJxKafQe4') |
||||
|
||||
|
||||
def test_wallet_save(setup_wallet): |
||||
wallet = get_populated_wallet() |
||||
|
||||
script = wallet.get_external_script(1) |
||||
|
||||
wallet.save() |
||||
storage = wallet._storage |
||||
data = storage.file_data |
||||
|
||||
del wallet |
||||
del storage |
||||
|
||||
storage = VolatileStorage(data=data) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
assert wallet.get_next_unused_index(0, True) == 3 |
||||
assert wallet.get_next_unused_index(0, False) == 0 |
||||
assert wallet.get_next_unused_index(1, True) == 0 |
||||
assert wallet.get_next_unused_index(1, False) == 1 |
||||
assert wallet.is_known_script(script) |
||||
|
||||
|
||||
def test_set_next_index(setup_wallet): |
||||
wallet = get_populated_wallet() |
||||
|
||||
assert wallet.get_next_unused_index(0, True) == 3 |
||||
|
||||
with pytest.raises(Exception): |
||||
# cannot advance index without force=True |
||||
wallet.set_next_index(0, True, 5) |
||||
|
||||
wallet.set_next_index(0, True, 1) |
||||
assert wallet.get_next_unused_index(0, True) == 1 |
||||
|
||||
wallet.set_next_index(0, True, 20, force=True) |
||||
assert wallet.get_next_unused_index(0, True) == 20 |
||||
|
||||
script = wallet.get_new_script(0, True) |
||||
path = wallet.script_to_path(script) |
||||
index = wallet.get_details(path)[2] |
||||
assert index == 20 |
||||
|
||||
|
||||
def test_path_repr(setup_wallet): |
||||
wallet = get_populated_wallet() |
||||
path = wallet.get_path(2, False, 0) |
||||
path_repr = wallet.get_path_repr(path) |
||||
path_new = wallet.path_repr_to_path(path_repr) |
||||
|
||||
assert path_new == path |
||||
|
||||
|
||||
def test_path_repr_imported(setup_wallet): |
||||
wallet = get_populated_wallet(num=0) |
||||
path = wallet.import_private_key( |
||||
0, 'cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM', |
||||
cryptoengine.TYPE_P2SH_P2WPKH) |
||||
path_repr = wallet.get_path_repr(path) |
||||
path_new = wallet.path_repr_to_path(path_repr) |
||||
|
||||
assert path_new == path |
||||
|
||||
|
||||
def test_wrong_wallet_cls(setup_wallet): |
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
wallet.save() |
||||
data = storage.file_data |
||||
|
||||
del wallet |
||||
del storage |
||||
|
||||
storage = VolatileStorage(data=data) |
||||
|
||||
with pytest.raises(Exception): |
||||
LegacyWallet(storage) |
||||
|
||||
|
||||
def test_wallet_id(setup_wallet): |
||||
storage1 = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage1, get_network()) |
||||
wallet1 = SegwitLegacyWallet(storage1) |
||||
|
||||
storage2 = VolatileStorage() |
||||
LegacyWallet.initialize(storage2, get_network(), entropy=wallet1._entropy) |
||||
wallet2 = LegacyWallet(storage2) |
||||
|
||||
assert wallet1.get_wallet_id() != wallet2.get_wallet_id() |
||||
|
||||
storage2 = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage2, get_network(), |
||||
entropy=wallet1._entropy) |
||||
wallet2 = SegwitLegacyWallet(storage2) |
||||
|
||||
assert wallet1.get_wallet_id() == wallet2.get_wallet_id() |
||||
|
||||
|
||||
def test_addr_script_conversion(setup_wallet): |
||||
wallet = get_populated_wallet(num=1) |
||||
|
||||
path = wallet.get_path(0, True, 0) |
||||
script = wallet.get_script_path(path) |
||||
addr = wallet.script_to_addr(script) |
||||
|
||||
assert script == wallet.addr_to_script(addr) |
||||
addr_path = wallet.addr_to_path(addr) |
||||
assert path == addr_path |
||||
|
||||
|
||||
def test_imported_key_removed(setup_wallet): |
||||
wif = 'cRAGLvPmhpzJNgdMT4W2gVwEW3fusfaDqdQWM2vnWLgXKzCWKtcM' |
||||
key_type = cryptoengine.TYPE_P2SH_P2WPKH |
||||
|
||||
storage = VolatileStorage() |
||||
SegwitLegacyWallet.initialize(storage, get_network()) |
||||
wallet = SegwitLegacyWallet(storage) |
||||
|
||||
path = wallet.import_private_key(1, wif, key_type) |
||||
script = wallet.get_script_path(path) |
||||
assert wallet.is_known_script(script) |
||||
|
||||
wallet.remove_imported_key(path=path) |
||||
assert not wallet.is_known_script(script) |
||||
|
||||
with pytest.raises(WalletError): |
||||
wallet.get_script_path(path) |
||||
|
||||
|
||||
@pytest.fixture(scope='module') |
||||
def setup_wallet(): |
||||
load_program_config() |
||||
jm_single().bc_interface.tick_forward_chain_interval = 2 |
||||
@ -0,0 +1,151 @@
|
||||
#!/usr/bin/env python2 |
||||
|
||||
import argparse |
||||
import json |
||||
import os.path |
||||
from hashlib import sha256 |
||||
from binascii import hexlify, unhexlify |
||||
from collections import defaultdict |
||||
|
||||
from jmclient import Storage, decryptData, load_program_config |
||||
from jmclient.wallet_utils import get_password, get_wallet_cls,\ |
||||
cli_get_wallet_passphrase_check, get_wallet_path |
||||
from jmbitcoin import wif_compressed_privkey |
||||
|
||||
|
||||
class ConvertException(Exception): |
||||
pass |
||||
|
||||
|
||||
def get_max_mixdepth(data): |
||||
return max(1, len(data.get('index_cache', [1])) - 1, |
||||
*data.get('imported', {}).keys()) |
||||
|
||||
|
||||
def is_encrypted(wallet_data): |
||||
return 'encrypted_seed' in wallet_data or 'encrypted_entropy' in wallet_data |
||||
|
||||
|
||||
def double_sha256(plaintext): |
||||
return sha256(sha256(plaintext).digest()).digest() |
||||
|
||||
|
||||
def decrypt_entropy_extension(enc_data, key): |
||||
data = decryptData(key, unhexlify(enc_data)) |
||||
if data[-9] != b'\xff': |
||||
raise ConvertException("Wrong password.") |
||||
chunks = data.split(b'\xff') |
||||
if len(chunks) < 3 or data[-8:] != double_sha256(chunks[1])[:8]: |
||||
raise ConvertException("Wrong password.") |
||||
return chunks[1] |
||||
|
||||
|
||||
def decrypt_wallet_data(data, password): |
||||
key = double_sha256(password.encode('utf-8')) |
||||
|
||||
enc_entropy = data.get('encrypted_seed') or data.get('encrypted_entropy') |
||||
enc_entropy_ext = data.get('encrypted_mnemonic_extension') |
||||
enc_imported = data.get('imported_keys') |
||||
|
||||
entropy = decryptData(key, unhexlify(enc_entropy)) |
||||
data['entropy'] = entropy |
||||
if enc_entropy_ext: |
||||
data['entropy_ext'] = decrypt_entropy_extension(enc_entropy_ext, key) |
||||
|
||||
if enc_imported: |
||||
imported_keys = defaultdict(list) |
||||
for e in enc_imported: |
||||
md = int(e['mixdepth']) |
||||
imported_enc_key = unhexlify(e['encrypted_privkey']) |
||||
imported_key = decryptData(key, imported_enc_key) |
||||
imported_keys[md].append(imported_key) |
||||
data['imported'] = imported_keys |
||||
|
||||
|
||||
def new_wallet_from_data(data, file_name): |
||||
print("Creating new wallet file.") |
||||
new_pw = cli_get_wallet_passphrase_check() |
||||
if new_pw is False: |
||||
return False |
||||
|
||||
storage = Storage(file_name, create=True, password=new_pw) |
||||
wallet_cls = get_wallet_cls() |
||||
|
||||
kwdata = { |
||||
'entropy': data['entropy'], |
||||
'timestamp': data.get('creation_time'), |
||||
'max_mixdepth': get_max_mixdepth(data) |
||||
} |
||||
|
||||
if 'entropy_ext' in data: |
||||
kwdata['entropy_extension'] = data['entropy_ext'] |
||||
|
||||
wallet_cls.initialize(storage, data['network'], **kwdata) |
||||
wallet = wallet_cls(storage) |
||||
|
||||
if 'index_cache' in data: |
||||
for md, indices in enumerate(data['index_cache']): |
||||
wallet.set_next_index(md, 0, indices[0], force=True) |
||||
wallet.set_next_index(md, 1, indices[1], force=True) |
||||
|
||||
if 'imported' in data: |
||||
for md in data['imported']: |
||||
for privkey in data['imported'][md]: |
||||
privkey += b'\x01' |
||||
wif = wif_compressed_privkey(hexlify(privkey)) |
||||
wallet.import_private_key(md, wif) |
||||
|
||||
wallet.save() |
||||
wallet.close() |
||||
return True |
||||
|
||||
|
||||
def parse_old_wallet(fh): |
||||
file_data = json.load(fh) |
||||
|
||||
if is_encrypted(file_data): |
||||
pw = get_password("Enter password for old wallet file: ") |
||||
try: |
||||
decrypt_wallet_data(file_data, pw) |
||||
except ValueError: |
||||
print("Failed to open wallet: bad password") |
||||
return |
||||
except Exception as e: |
||||
print("Error: {}".format(e)) |
||||
print("Failed to open wallet. Wrong password?") |
||||
return |
||||
|
||||
return file_data |
||||
|
||||
|
||||
def main(): |
||||
parser = argparse.ArgumentParser( |
||||
description="Convert old joinmarket json wallet format to new jmdat " |
||||
"format") |
||||
parser.add_argument('old_wallet_file', type=open) |
||||
parser.add_argument('--name', '-n', required=False, dest='name', |
||||
help="Name of the new wallet file. Default: [old wallet name].jmdat") |
||||
|
||||
try: |
||||
args = parser.parse_args() |
||||
except Exception as e: |
||||
print("Error: {}".format(e)) |
||||
return |
||||
|
||||
data = parse_old_wallet(args.old_wallet_file) |
||||
|
||||
if not data: |
||||
return |
||||
|
||||
file_name = args.name or\ |
||||
os.path.split(args.old_wallet_file.name)[-1].rsplit('.', 1)[0] + '.jmdat' |
||||
wallet_path = get_wallet_path(file_name, None) |
||||
if new_wallet_from_data(data, wallet_path): |
||||
print("New wallet file created at {}".format(wallet_path)) |
||||
else: |
||||
print("Failed to convert wallet.") |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
load_program_config() |
||||
main() |
||||
Loading…
Reference in new issue