Browse Source

adopt blockchaininterface for new wallet

master
undeath 8 years ago
parent
commit
1f309678ca
  1. 328
      jmclient/jmclient/blockchaininterface.py
  2. 2
      jmclient/jmclient/electruminterface.py
  3. 6
      test/test_segwit.py

328
jmclient/jmclient/blockchaininterface.py

@ -17,11 +17,6 @@ from jmbase.support import get_log
log = get_log()
def is_index_ahead_of_cache(wallet, mix_depth, forchange):
if mix_depth >= len(wallet.index_cache):
return True
return wallet.index[mix_depth][forchange] >= wallet.index_cache[mix_depth][
forchange]
def sync_wallet(wallet, fast=False):
"""Wrapper function to choose fast syncing where it's
@ -55,8 +50,7 @@ class BlockchainInterface(object):
@abc.abstractmethod
def sync_addresses(self, wallet):
"""Finds which addresses have been used and sets
wallet.index appropriately"""
"""Finds which addresses have been used"""
@abc.abstractmethod
def sync_unspent(self, wallet):
@ -421,41 +415,37 @@ class BitcoinCoreInterface(BlockchainInterface):
wallet_name = self.get_wallet_name(wallet)
agd = self.rpc('listaddressgroupings', [])
#flatten all groups into a single list; then, remove duplicates
fagd = [tuple(item) for sublist in agd for item in sublist]
fagd = (tuple(item) for sublist in agd for item in sublist)
#"deduplicated flattened address grouping data" = dfagd
dfagd = list(set(fagd))
#for lookup, want dict of form {"address": amount}
used_address_dict = {}
dfagd = set(fagd)
used_addresses = set()
for addr_info in dfagd:
if len(addr_info) < 3 or addr_info[2] != wallet_name:
continue
used_address_dict[addr_info[0]] = (addr_info[1], addr_info[2])
used_addresses.add(addr_info[0])
#for a first run, import first chunk
if len(used_address_dict.keys()) == 0:
if not used_addresses:
log.info("Detected new wallet, performing initial import")
for i in range(wallet.max_mix_depth):
for j in [0, 1]:
addrs_to_import = []
for k in range(wallet.gaplimit + 10): # a few more for safety
addrs_to_import.append(wallet.get_addr(i, j, k))
self.import_addresses(addrs_to_import, wallet_name)
wallet.index[i][j] = 0
# delegate inital address import to sync_addresses
# this should be fast because "getaddressesbyaccount" should return
# an empty list in this case
self.sync_addresses(wallet)
self.wallet_synced = True
return
#Wallet has been used; scan forwards.
log.debug("Fast sync in progress. Got this many used addresses: " + str(
len(used_address_dict)))
len(used_addresses)))
#Need to have wallet.index point to the last used address
#and fill addr_cache.
#Algo:
# 1. Scan batch 1 of each branch, accumulate wallet addresses into dict.
# 2. Find matches between that dict and used addresses, add those to
# used_indices dict and add to address cache.
# 3. Check if all addresses in 'used addresses' have been matched, if
# 1. Scan batch 1 of each branch, record matched wallet addresses.
# 2. Check if all addresses in 'used addresses' have been matched, if
# so, break.
# 4. Repeat the above for batch 2, 3.. up to max 20 batches.
# 5. If after all 20 batches not all used addresses were matched,
# 3. Repeat the above for batch 2, 3.. up to max 20 batches.
# 4. If after all 20 batches not all used addresses were matched,
# quit with error.
# 5. Calculate used indices.
# 6. If all used addresses were matched, set wallet index to highest
# found index in each branch and mark wallet sync complete.
#Rationale for this algo:
@ -466,170 +456,143 @@ class BitcoinCoreInterface(BlockchainInterface):
# not be exposed to the user; it is not the same as gap limit, in fact,
# the concept of gap limit does not apply to this kind of sync, which
# *assumes* that the most recent usage of addresses is indeed recorded.
used_indices = {}
local_addr_cache = {}
found_addresses = []
remaining_used_addresses = used_addresses.copy()
addresses, saved_indices = self._collect_addresses_init(wallet)
for addr in addresses:
remaining_used_addresses.discard(addr)
BATCH_SIZE = 100
for j in range(20):
for md in range(wallet.max_mix_depth):
if md not in used_indices:
used_indices[md] = {}
for fc in [0, 1]:
if fc not in used_indices[md]:
used_indices[md][fc] = []
for i in range(j*BATCH_SIZE, (j+1)*BATCH_SIZE):
local_addr_cache[(md, fc, i)] = wallet.get_addr(md, fc, i)
batch_found_addresses = [x for x in local_addr_cache.iteritems(
) if x[1] in used_address_dict.keys()]
for x in batch_found_addresses:
md, fc, i = x[0]
addr = x[1]
used_indices[md][fc].append(i)
wallet.addr_cache[addr] = (md, fc, i)
found_addresses.extend(batch_found_addresses)
if len(found_addresses) == len(used_address_dict.keys()):
MAX_ITERATIONS = 20
for j in range(MAX_ITERATIONS):
if not remaining_used_addresses:
break
if j == 19:
for addr in \
self._collect_addresses_gap(wallet, gap_limit=BATCH_SIZE):
remaining_used_addresses.discard(addr)
else:
raise Exception("Failed to sync in fast mode after 20 batches; "
"please re-try wallet sync without --fast flag.")
#Find the highest index in each branch and set the wallet index
for md in range(wallet.max_mix_depth):
for fc in [0, 1]:
if len(used_indices[md][fc]):
used_indices[md][fc].sort()
wallet.index[md][fc] = used_indices[md][fc][-1] + 1
else:
wallet.index[md][fc] = 0
if not is_index_ahead_of_cache(wallet, md, fc):
wallet.index[md][fc] = wallet.index_cache[md][fc]
self.wallet_synced = True
# creating used_indices on-the-fly would be more efficient, but the
# overall performance gain is probably negligible
used_indices = self._get_used_indices(wallet, used_addresses)
self._rewind_wallet_indices(wallet, used_indices, saved_indices)
self.wallet_synced = True
def sync_addresses(self, wallet, restart_cb=None):
log.debug('requesting detailed wallet history')
log.debug("requesting detailed wallet history")
wallet_name = self.get_wallet_name(wallet)
#TODO It is worth considering making this user configurable:
addr_req_count = 20
wallet_addr_list = []
for mix_depth in range(wallet.max_mix_depth):
for forchange in [0, 1]:
#If we have an index-cache available, we can use it
#to decide how much to import (note that this list
#*always* starts from index 0 on each branch).
#In cases where the Bitcoin Core instance is fresh,
#this will allow the entire import+rescan to occur
#in 2 steps only.
if wallet.index_cache != [[0, 0]] * wallet.max_mix_depth:
#Need to request N*addr_req_count where N is least s.t.
#N*addr_req_count > index_cache val. This is so that the batching
#process in the main loop *always* has already imported enough
#addresses to complete.
req_count = int(wallet.index_cache[mix_depth][forchange] /
addr_req_count) + 1
req_count *= addr_req_count
else:
#If we have *nothing* - no index_cache, and no info
#in Core wallet (imports), we revert to a batching mode
#with a default size.
#In this scenario it could require several restarts *and*
#rescans; perhaps user should set addr_req_count high
#(see above TODO)
req_count = addr_req_count
wallet_addr_list += [wallet.get_new_addr(mix_depth, forchange)
for _ in range(req_count)]
#Indices are reset here so that the next algorithm step starts
#from the beginning of each branch
wallet.index[mix_depth][forchange] = 0
# makes more sense to add these in an account called "joinmarket-imported" but its much
# simpler to add to the same account here
for privkey_list in wallet.imported_privkeys.values():
for privkey in privkey_list:
imported_addr = btc.privtoaddr(privkey,
magicbyte=get_p2pk_vbyte())
wallet_addr_list.append(imported_addr)
imported_addr_list = self.rpc('getaddressesbyaccount', [wallet_name])
if not set(wallet_addr_list).issubset(set(imported_addr_list)):
self.add_watchonly_addresses(wallet_addr_list, wallet_name, restart_cb)
return
buf = self.rpc('listtransactions', [wallet_name, 1000, 0, True])
txs = buf
# If the buffer's full, check for more, until it ain't
while len(buf) == 1000:
buf = self.rpc('listtransactions', [wallet_name, 1000, len(txs),
True])
txs += buf
# TODO check whether used_addr_list can be a set, may be faster (if
# its a hashset) and allows using issubset() here and setdiff() for
# finding which addresses need importing
# TODO also check the fastest way to build up python lists, i suspect
# using += is slow
used_addr_list = [tx['address']
for tx in txs if tx['category'] == 'receive']
too_few_addr_mix_change = []
for mix_depth in range(wallet.max_mix_depth):
for forchange in [0, 1]:
unused_addr_count = 0
last_used_addr = ''
breakloop = False
while not breakloop:
if unused_addr_count >= wallet.gaplimit and \
is_index_ahead_of_cache(wallet, mix_depth,
forchange):
break
mix_change_addrs = [
wallet.get_new_addr(mix_depth, forchange)
for _ in range(addr_req_count)
]
for mc_addr in mix_change_addrs:
if mc_addr not in imported_addr_list:
too_few_addr_mix_change.append((mix_depth, forchange
))
breakloop = True
break
if mc_addr in used_addr_list:
last_used_addr = mc_addr
unused_addr_count = 0
else:
unused_addr_count += 1
#index setting here depends on whether we broke out of the loop
#early; if we did, it means we need to prepare the index
#at the level of the last used address or zero so as to not
#miss any imports in add_watchonly_addresses.
#If we didn't, we need to respect the index_cache to avoid
#potential address reuse.
if breakloop:
if last_used_addr == '':
wallet.index[mix_depth][forchange] = 0
else:
wallet.index[mix_depth][forchange] = \
wallet.addr_cache[last_used_addr][2] + 1
else:
if last_used_addr == '':
next_avail_idx = max([wallet.index_cache[mix_depth][
forchange], 0])
else:
next_avail_idx = max([wallet.addr_cache[last_used_addr][
2] + 1, wallet.index_cache[mix_depth][forchange]])
wallet.index[mix_depth][forchange] = next_avail_idx
wallet_addr_list = []
if len(too_few_addr_mix_change) > 0:
indices = [wallet.index[mc[0]][mc[1]]
for mc in too_few_addr_mix_change]
log.debug('too few addresses in ' + str(too_few_addr_mix_change) +
' at ' + str(indices))
for mix_depth, forchange in too_few_addr_mix_change:
wallet_addr_list += [
wallet.get_new_addr(mix_depth, forchange)
for _ in range(addr_req_count * 3)
]
self.add_watchonly_addresses(wallet_addr_list, wallet_name, restart_cb)
addresses, saved_indices = self._collect_addresses_init(wallet)
imported_addresses = set(self.rpc('getaddressesbyaccount', [wallet_name]))
if not addresses.issubset(imported_addresses):
self.add_watchonly_addresses(addresses - imported_addresses,
wallet_name, restart_cb)
return
self.wallet_synced = True
used_addresses_gen = (tx['address']
for tx in self._yield_transactions(wallet_name)
if tx['category'] == 'receive')
used_indices = self._get_used_indices(wallet, used_addresses_gen)
log.debug("got used indices: {}".format(used_indices))
gap_limit_used = not self._check_gap_indices(wallet, used_indices)
self._rewind_wallet_indices(wallet, used_indices, saved_indices)
new_addresses = self._collect_addresses_gap(wallet)
if not new_addresses.issubset(imported_addresses):
log.debug("Syncing iteration finished, additional step required")
self.add_watchonly_addresses(new_addresses - imported_addresses,
wallet_name, restart_cb)
self.wallet_synced = False
elif gap_limit_used:
log.debug("Syncing iteration finished, additional step required")
self.wallet_synced = False
else:
log.debug("Wallet successfully synced")
self._rewind_wallet_indices(wallet, used_indices, saved_indices)
self.wallet_synced = True
@staticmethod
def _rewind_wallet_indices(wallet, used_indices, saved_indices):
for md in used_indices:
for int_type in (0, 1):
index = max(used_indices[md][int_type],
saved_indices[md][int_type])
wallet.set_next_index(md, int_type, index, force=True)
@staticmethod
def _get_used_indices(wallet, addr_gen):
indices = {x: [0, 0] for x in range(wallet.max_mixdepth + 1)}
for addr in addr_gen:
if not wallet.is_known_addr(addr):
continue
md, internal, index = wallet.get_details(
wallet.addr_to_path(addr))
if internal not in (0, 1):
assert internal == 'imported'
continue
indices[md][internal] = max(indices[md][internal], index + 1)
return indices
@staticmethod
def _check_gap_indices(wallet, used_indices):
for md in used_indices:
for internal in (0, 1):
if used_indices[md][internal] >\
max(wallet.get_next_unused_index(md, internal), 0):
return False
return True
@staticmethod
def _collect_addresses_init(wallet):
addresses = set()
saved_indices = dict()
for md in range(wallet.max_mixdepth + 1):
saved_indices[md] = [0, 0]
for internal in (0, 1):
next_unused = wallet.get_next_unused_index(md, internal)
for index in range(next_unused):
addresses.add(wallet.get_addr(md, internal, index))
for index in range(wallet.gap_limit):
addresses.add(wallet.get_new_addr(md, internal))
wallet.set_next_index(md, internal, next_unused)
saved_indices[md][internal] = next_unused
for path in wallet.yield_imported_paths(md):
addresses.add(wallet.get_addr_path(path))
return addresses, saved_indices
@staticmethod
def _collect_addresses_gap(wallet, gap_limit=None):
gap_limit = gap_limit or wallet.gap_limit
addresses = set()
for md in range(wallet.max_mixdepth + 1):
for internal in (True, False):
old_next = wallet.get_next_unused_index(md, internal)
for index in range(gap_limit):
addresses.add(wallet.get_new_addr(md, internal))
wallet.set_next_index(md, internal, old_next)
return addresses
def _yield_transactions(self, wallet_name):
batch_size = 1000
iteration = 0
while True:
new = self.rpc(
'listtransactions',
[wallet_name, batch_size, iteration * batch_size, True])
for tx in new:
yield tx
if len(new) < batch_size:
return
iteration += 1
def start_unspent_monitoring(self, wallet):
self.unspent_monitoring_loop = task.LoopingCall(self.sync_unspent, wallet)
@ -654,8 +617,7 @@ class BitcoinCoreInterface(BlockchainInterface):
continue
if u['account'] != wallet_name:
continue
# TODO
if u['address'] not in wallet.addr_cache:
if not wallet.is_known_addr(u['address']):
continue
self._add_unspent_utxo(wallet, u)
et = time.time()

2
jmclient/jmclient/electruminterface.py

@ -356,7 +356,7 @@ class ElectrumInterface(BlockchainInterface):
self.listunspent_calls = len(addrs)
for a in addrs:
# FIXME: update to protocol version 1.1 and use scripthash instead
script = wallet.address_to_script(a)
script = wallet.addr_to_script(a)
d = self.get_from_electrum('blockchain.address.listunspent', a)
d.addCallback(self.process_listunspent_data, wallet, script)

6
test/test_segwit.py

@ -113,7 +113,7 @@ def test_spend_p2sh_p2wpkh_multi(setup_segwit, wallet_structure, in_amt, amount,
# import new addresses to bitcoind
jm_single().bc_interface.import_addresses(
[nsw_wallet.script_to_address(x)
[nsw_wallet.script_to_addr(x)
for x in [cj_script, change_script]],
jm_single().bc_interface.get_wallet_name(nsw_wallet))
@ -137,8 +137,8 @@ def test_spend_p2sh_p2wpkh_multi(setup_segwit, wallet_structure, in_amt, amount,
assert txid
balances = jm_single().bc_interface.get_received_by_addr(
[nsw_wallet.script_to_address(cj_script),
nsw_wallet.script_to_address(change_script)], None)['data']
[nsw_wallet.script_to_addr(cj_script),
nsw_wallet.script_to_addr(change_script)], None)['data']
assert balances[0]['balance'] == amount
assert balances[1]['balance'] == change_amt

Loading…
Cancel
Save