From 1f309678ca82aeb2bbc8a24a356a3574f4d077f3 Mon Sep 17 00:00:00 2001 From: undeath Date: Fri, 6 Jul 2018 14:57:39 +0200 Subject: [PATCH] adopt blockchaininterface for new wallet --- jmclient/jmclient/blockchaininterface.py | 328 ++++++++++------------- jmclient/jmclient/electruminterface.py | 2 +- test/test_segwit.py | 6 +- 3 files changed, 149 insertions(+), 187 deletions(-) diff --git a/jmclient/jmclient/blockchaininterface.py b/jmclient/jmclient/blockchaininterface.py index 447255c..d5d7f8e 100644 --- a/jmclient/jmclient/blockchaininterface.py +++ b/jmclient/jmclient/blockchaininterface.py @@ -17,11 +17,6 @@ from jmbase.support import get_log log = get_log() -def is_index_ahead_of_cache(wallet, mix_depth, forchange): - if mix_depth >= len(wallet.index_cache): - return True - return wallet.index[mix_depth][forchange] >= wallet.index_cache[mix_depth][ - forchange] def sync_wallet(wallet, fast=False): """Wrapper function to choose fast syncing where it's @@ -55,8 +50,7 @@ class BlockchainInterface(object): @abc.abstractmethod def sync_addresses(self, wallet): - """Finds which addresses have been used and sets - wallet.index appropriately""" + """Finds which addresses have been used""" @abc.abstractmethod def sync_unspent(self, wallet): @@ -421,41 +415,37 @@ class BitcoinCoreInterface(BlockchainInterface): wallet_name = self.get_wallet_name(wallet) agd = self.rpc('listaddressgroupings', []) #flatten all groups into a single list; then, remove duplicates - fagd = [tuple(item) for sublist in agd for item in sublist] + fagd = (tuple(item) for sublist in agd for item in sublist) #"deduplicated flattened address grouping data" = dfagd - dfagd = list(set(fagd)) - #for lookup, want dict of form {"address": amount} - used_address_dict = {} + dfagd = set(fagd) + used_addresses = set() for addr_info in dfagd: if len(addr_info) < 3 or addr_info[2] != wallet_name: continue - used_address_dict[addr_info[0]] = (addr_info[1], addr_info[2]) + used_addresses.add(addr_info[0]) + 
#for a first run, import first chunk - if len(used_address_dict.keys()) == 0: + if not used_addresses: log.info("Detected new wallet, performing initial import") - for i in range(wallet.max_mix_depth): - for j in [0, 1]: - addrs_to_import = [] - for k in range(wallet.gaplimit + 10): # a few more for safety - addrs_to_import.append(wallet.get_addr(i, j, k)) - self.import_addresses(addrs_to_import, wallet_name) - wallet.index[i][j] = 0 + # delegate initial address import to sync_addresses + # this should be fast because "getaddressesbyaccount" should return + # an empty list in this case + self.sync_addresses(wallet) self.wallet_synced = True return + #Wallet has been used; scan forwards. log.debug("Fast sync in progress. Got this many used addresses: " + str( - len(used_address_dict))) + len(used_addresses))) #Need to have wallet.index point to the last used address - #and fill addr_cache. #Algo: - # 1. Scan batch 1 of each branch, accumulate wallet addresses into dict. - # 2. Find matches between that dict and used addresses, add those to - # used_indices dict and add to address cache. - # 3. Check if all addresses in 'used addresses' have been matched, if + # 1. Scan batch 1 of each branch, record matched wallet addresses. + # 2. Check if all addresses in 'used addresses' have been matched, if # so, break. - # 4. Repeat the above for batch 2, 3.. up to max 20 batches. - # 5. If after all 20 batches not all used addresses were matched, + # 3. Repeat the above for batch 2, 3.. up to max 20 batches. + # 4. If after all 20 batches not all used addresses were matched, # quit with error. + # 5. Calculate used indices. # 6. If all used addresses were matched, set wallet index to highest # found index in each branch and mark wallet sync complete. 
#Rationale for this algo: @@ -466,170 +456,143 @@ class BitcoinCoreInterface(BlockchainInterface): # not be exposed to the user; it is not the same as gap limit, in fact, # the concept of gap limit does not apply to this kind of sync, which # *assumes* that the most recent usage of addresses is indeed recorded. - used_indices = {} - local_addr_cache = {} - found_addresses = [] + remaining_used_addresses = used_addresses.copy() + addresses, saved_indices = self._collect_addresses_init(wallet) + for addr in addresses: + remaining_used_addresses.discard(addr) + BATCH_SIZE = 100 - for j in range(20): - for md in range(wallet.max_mix_depth): - if md not in used_indices: - used_indices[md] = {} - for fc in [0, 1]: - if fc not in used_indices[md]: - used_indices[md][fc] = [] - for i in range(j*BATCH_SIZE, (j+1)*BATCH_SIZE): - local_addr_cache[(md, fc, i)] = wallet.get_addr(md, fc, i) - batch_found_addresses = [x for x in local_addr_cache.iteritems( - ) if x[1] in used_address_dict.keys()] - for x in batch_found_addresses: - md, fc, i = x[0] - addr = x[1] - used_indices[md][fc].append(i) - wallet.addr_cache[addr] = (md, fc, i) - found_addresses.extend(batch_found_addresses) - if len(found_addresses) == len(used_address_dict.keys()): + MAX_ITERATIONS = 20 + for j in range(MAX_ITERATIONS): + if not remaining_used_addresses: break - if j == 19: + for addr in \ + self._collect_addresses_gap(wallet, gap_limit=BATCH_SIZE): + remaining_used_addresses.discard(addr) + else: raise Exception("Failed to sync in fast mode after 20 batches; " "please re-try wallet sync without --fast flag.") - #Find the highest index in each branch and set the wallet index - for md in range(wallet.max_mix_depth): - for fc in [0, 1]: - if len(used_indices[md][fc]): - used_indices[md][fc].sort() - wallet.index[md][fc] = used_indices[md][fc][-1] + 1 - else: - wallet.index[md][fc] = 0 - if not is_index_ahead_of_cache(wallet, md, fc): - wallet.index[md][fc] = wallet.index_cache[md][fc] - self.wallet_synced = 
True + # creating used_indices on-the-fly would be more efficient, but the + # overall performance gain is probably negligible + used_indices = self._get_used_indices(wallet, used_addresses) + self._rewind_wallet_indices(wallet, used_indices, saved_indices) + self.wallet_synced = True def sync_addresses(self, wallet, restart_cb=None): - log.debug('requesting detailed wallet history') + log.debug("requesting detailed wallet history") wallet_name = self.get_wallet_name(wallet) - #TODO It is worth considering making this user configurable: - addr_req_count = 20 - wallet_addr_list = [] - for mix_depth in range(wallet.max_mix_depth): - for forchange in [0, 1]: - #If we have an index-cache available, we can use it - #to decide how much to import (note that this list - #*always* starts from index 0 on each branch). - #In cases where the Bitcoin Core instance is fresh, - #this will allow the entire import+rescan to occur - #in 2 steps only. - if wallet.index_cache != [[0, 0]] * wallet.max_mix_depth: - #Need to request N*addr_req_count where N is least s.t. - #N*addr_req_count > index_cache val. This is so that the batching - #process in the main loop *always* has already imported enough - #addresses to complete. - req_count = int(wallet.index_cache[mix_depth][forchange] / - addr_req_count) + 1 - req_count *= addr_req_count - else: - #If we have *nothing* - no index_cache, and no info - #in Core wallet (imports), we revert to a batching mode - #with a default size. 
- #In this scenario it could require several restarts *and* - #rescans; perhaps user should set addr_req_count high - #(see above TODO) - req_count = addr_req_count - wallet_addr_list += [wallet.get_new_addr(mix_depth, forchange) - for _ in range(req_count)] - #Indices are reset here so that the next algorithm step starts - #from the beginning of each branch - wallet.index[mix_depth][forchange] = 0 - # makes more sense to add these in an account called "joinmarket-imported" but its much - # simpler to add to the same account here - for privkey_list in wallet.imported_privkeys.values(): - for privkey in privkey_list: - imported_addr = btc.privtoaddr(privkey, - magicbyte=get_p2pk_vbyte()) - wallet_addr_list.append(imported_addr) - imported_addr_list = self.rpc('getaddressesbyaccount', [wallet_name]) - if not set(wallet_addr_list).issubset(set(imported_addr_list)): - self.add_watchonly_addresses(wallet_addr_list, wallet_name, restart_cb) - return - buf = self.rpc('listtransactions', [wallet_name, 1000, 0, True]) - txs = buf - # If the buffer's full, check for more, until it ain't - while len(buf) == 1000: - buf = self.rpc('listtransactions', [wallet_name, 1000, len(txs), - True]) - txs += buf - # TODO check whether used_addr_list can be a set, may be faster (if - # its a hashset) and allows using issubset() here and setdiff() for - # finding which addresses need importing - - # TODO also check the fastest way to build up python lists, i suspect - # using += is slow - used_addr_list = [tx['address'] - for tx in txs if tx['category'] == 'receive'] - too_few_addr_mix_change = [] - for mix_depth in range(wallet.max_mix_depth): - for forchange in [0, 1]: - unused_addr_count = 0 - last_used_addr = '' - breakloop = False - while not breakloop: - if unused_addr_count >= wallet.gaplimit and \ - is_index_ahead_of_cache(wallet, mix_depth, - forchange): - break - mix_change_addrs = [ - wallet.get_new_addr(mix_depth, forchange) - for _ in range(addr_req_count) - ] - for mc_addr in 
mix_change_addrs: - if mc_addr not in imported_addr_list: - too_few_addr_mix_change.append((mix_depth, forchange - )) - breakloop = True - break - if mc_addr in used_addr_list: - last_used_addr = mc_addr - unused_addr_count = 0 - else: - unused_addr_count += 1 -#index setting here depends on whether we broke out of the loop -#early; if we did, it means we need to prepare the index -#at the level of the last used address or zero so as to not -#miss any imports in add_watchonly_addresses. -#If we didn't, we need to respect the index_cache to avoid -#potential address reuse. - if breakloop: - if last_used_addr == '': - wallet.index[mix_depth][forchange] = 0 - else: - wallet.index[mix_depth][forchange] = \ - wallet.addr_cache[last_used_addr][2] + 1 - else: - if last_used_addr == '': - next_avail_idx = max([wallet.index_cache[mix_depth][ - forchange], 0]) - else: - next_avail_idx = max([wallet.addr_cache[last_used_addr][ - 2] + 1, wallet.index_cache[mix_depth][forchange]]) - wallet.index[mix_depth][forchange] = next_avail_idx - - wallet_addr_list = [] - if len(too_few_addr_mix_change) > 0: - indices = [wallet.index[mc[0]][mc[1]] - for mc in too_few_addr_mix_change] - log.debug('too few addresses in ' + str(too_few_addr_mix_change) + - ' at ' + str(indices)) - for mix_depth, forchange in too_few_addr_mix_change: - wallet_addr_list += [ - wallet.get_new_addr(mix_depth, forchange) - for _ in range(addr_req_count * 3) - ] - - self.add_watchonly_addresses(wallet_addr_list, wallet_name, restart_cb) + addresses, saved_indices = self._collect_addresses_init(wallet) + imported_addresses = set(self.rpc('getaddressesbyaccount', [wallet_name])) + + if not addresses.issubset(imported_addresses): + self.add_watchonly_addresses(addresses - imported_addresses, + wallet_name, restart_cb) return - self.wallet_synced = True + used_addresses_gen = (tx['address'] + for tx in self._yield_transactions(wallet_name) + if tx['category'] == 'receive') + + used_indices = 
self._get_used_indices(wallet, used_addresses_gen) + log.debug("got used indices: {}".format(used_indices)) + gap_limit_used = not self._check_gap_indices(wallet, used_indices) + self._rewind_wallet_indices(wallet, used_indices, saved_indices) + + new_addresses = self._collect_addresses_gap(wallet) + if not new_addresses.issubset(imported_addresses): + log.debug("Syncing iteration finished, additional step required") + self.add_watchonly_addresses(new_addresses - imported_addresses, + wallet_name, restart_cb) + self.wallet_synced = False + elif gap_limit_used: + log.debug("Syncing iteration finished, additional step required") + self.wallet_synced = False + else: + log.debug("Wallet successfully synced") + self._rewind_wallet_indices(wallet, used_indices, saved_indices) + self.wallet_synced = True + + @staticmethod + def _rewind_wallet_indices(wallet, used_indices, saved_indices): + for md in used_indices: + for int_type in (0, 1): + index = max(used_indices[md][int_type], + saved_indices[md][int_type]) + wallet.set_next_index(md, int_type, index, force=True) + + @staticmethod + def _get_used_indices(wallet, addr_gen): + indices = {x: [0, 0] for x in range(wallet.max_mixdepth + 1)} + + for addr in addr_gen: + if not wallet.is_known_addr(addr): + continue + md, internal, index = wallet.get_details( + wallet.addr_to_path(addr)) + if internal not in (0, 1): + assert internal == 'imported' + continue + indices[md][internal] = max(indices[md][internal], index + 1) + + return indices + + @staticmethod + def _check_gap_indices(wallet, used_indices): + for md in used_indices: + for internal in (0, 1): + if used_indices[md][internal] >\ + max(wallet.get_next_unused_index(md, internal), 0): + return False + return True + + @staticmethod + def _collect_addresses_init(wallet): + addresses = set() + saved_indices = dict() + + for md in range(wallet.max_mixdepth + 1): + saved_indices[md] = [0, 0] + for internal in (0, 1): + next_unused = wallet.get_next_unused_index(md, 
internal) + for index in range(next_unused): + addresses.add(wallet.get_addr(md, internal, index)) + for index in range(wallet.gap_limit): + addresses.add(wallet.get_new_addr(md, internal)) + wallet.set_next_index(md, internal, next_unused) + saved_indices[md][internal] = next_unused + for path in wallet.yield_imported_paths(md): + addresses.add(wallet.get_addr_path(path)) + + return addresses, saved_indices + + @staticmethod + def _collect_addresses_gap(wallet, gap_limit=None): + gap_limit = gap_limit or wallet.gap_limit + addresses = set() + + for md in range(wallet.max_mixdepth + 1): + for internal in (True, False): + old_next = wallet.get_next_unused_index(md, internal) + for index in range(gap_limit): + addresses.add(wallet.get_new_addr(md, internal)) + wallet.set_next_index(md, internal, old_next) + + return addresses + + def _yield_transactions(self, wallet_name): + batch_size = 1000 + iteration = 0 + while True: + new = self.rpc( + 'listtransactions', + [wallet_name, batch_size, iteration * batch_size, True]) + for tx in new: + yield tx + if len(new) < batch_size: + return + iteration += 1 def start_unspent_monitoring(self, wallet): self.unspent_monitoring_loop = task.LoopingCall(self.sync_unspent, wallet) @@ -654,8 +617,7 @@ class BitcoinCoreInterface(BlockchainInterface): continue if u['account'] != wallet_name: continue - # TODO - if u['address'] not in wallet.addr_cache: + if not wallet.is_known_addr(u['address']): continue self._add_unspent_utxo(wallet, u) et = time.time() diff --git a/jmclient/jmclient/electruminterface.py b/jmclient/jmclient/electruminterface.py index cf3cd5b..b7edd86 100644 --- a/jmclient/jmclient/electruminterface.py +++ b/jmclient/jmclient/electruminterface.py @@ -356,7 +356,7 @@ class ElectrumInterface(BlockchainInterface): self.listunspent_calls = len(addrs) for a in addrs: # FIXME: update to protocol version 1.1 and use scripthash instead - script = wallet.address_to_script(a) + script = wallet.addr_to_script(a) d = 
self.get_from_electrum('blockchain.address.listunspent', a) d.addCallback(self.process_listunspent_data, wallet, script) diff --git a/test/test_segwit.py b/test/test_segwit.py index cfe8b1f..b590500 100644 --- a/test/test_segwit.py +++ b/test/test_segwit.py @@ -113,7 +113,7 @@ def test_spend_p2sh_p2wpkh_multi(setup_segwit, wallet_structure, in_amt, amount, # import new addresses to bitcoind jm_single().bc_interface.import_addresses( - [nsw_wallet.script_to_address(x) + [nsw_wallet.script_to_addr(x) for x in [cj_script, change_script]], jm_single().bc_interface.get_wallet_name(nsw_wallet)) @@ -137,8 +137,8 @@ def test_spend_p2sh_p2wpkh_multi(setup_segwit, wallet_structure, in_amt, amount, assert txid balances = jm_single().bc_interface.get_received_by_addr( - [nsw_wallet.script_to_address(cj_script), - nsw_wallet.script_to_address(change_script)], None)['data'] + [nsw_wallet.script_to_addr(cj_script), + nsw_wallet.script_to_addr(change_script)], None)['data'] assert balances[0]['balance'] == amount assert balances[1]['balance'] == change_amt