@@ -17,11 +17,6 @@ from jmbase.support import get_log
 log = get_log()


-def is_index_ahead_of_cache(wallet, mix_depth, forchange):
-    if mix_depth >= len(wallet.index_cache):
-        return True
-    return wallet.index[mix_depth][forchange] >= wallet.index_cache[mix_depth][
-        forchange]

 def sync_wallet(wallet, fast=False):
     """Wrapper function to choose fast syncing where it's
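The hunk above removes the module-level is_index_ahead_of_cache() helper (both of its callers are removed later in this diff) and shows only the first lines of the sync_wallet wrapper. For orientation: the wrapper merely dispatches between the fast, Core-only path and the generic two-phase path. A sketch of that dispatch; jm_single() and the sync_* names are taken from this codebase, but the body below is illustrative rather than the committed text:

    def sync_wallet(wallet, fast=False):
        """Wrapper function to choose fast syncing where it's
        available and requested."""
        if fast and isinstance(jm_single().bc_interface, BitcoinCoreInterface):
            # fast sync relies on Bitcoin Core reporting already-used,
            # already-imported addresses, so it is Core-specific
            jm_single().bc_interface.sync_wallet_fast(wallet)
        else:
            # the generic two-phase sync every backend must implement
            jm_single().bc_interface.sync_addresses(wallet)
            jm_single().bc_interface.sync_unspent(wallet)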
@@ -55,8 +50,7 @@ class BlockchainInterface(object):

     @abc.abstractmethod
     def sync_addresses(self, wallet):
-        """Finds which addresses have been used and sets
-        wallet.index appropriately"""
+        """Finds which addresses have been used"""

     @abc.abstractmethod
     def sync_unspent(self, wallet):
@@ -421,41 +415,37 @@ class BitcoinCoreInterface(BlockchainInterface):
         wallet_name = self.get_wallet_name(wallet)
         agd = self.rpc('listaddressgroupings', [])
         #flatten all groups into a single list; then, remove duplicates
-        fagd = [tuple(item) for sublist in agd for item in sublist]
+        fagd = (tuple(item) for sublist in agd for item in sublist)
         #"deduplicated flattened address grouping data" = dfagd
-        dfagd = list(set(fagd))
-        #for lookup, want dict of form {"address": amount}
-        used_address_dict = {}
+        dfagd = set(fagd)
+        used_addresses = set()
         for addr_info in dfagd:
             if len(addr_info) < 3 or addr_info[2] != wallet_name:
                 continue
-            used_address_dict[addr_info[0]] = (addr_info[1], addr_info[2])
+            used_addresses.add(addr_info[0])

-        #for a first run, import first chunk
-        if len(used_address_dict.keys()) == 0:
+        if not used_addresses:
             log.info("Detected new wallet, performing initial import")
-            for i in range(wallet.max_mix_depth):
-                for j in [0, 1]:
-                    addrs_to_import = []
-                    for k in range(wallet.gaplimit + 10):  # a few more for safety
-                        addrs_to_import.append(wallet.get_addr(i, j, k))
-                    self.import_addresses(addrs_to_import, wallet_name)
-                    wallet.index[i][j] = 0
+            # delegate initial address import to sync_addresses
+            # this should be fast because "getaddressesbyaccount" should return
+            # an empty list in this case
+            self.sync_addresses(wallet)
             self.wallet_synced = True
             return

         #Wallet has been used; scan forwards.
         log.debug("Fast sync in progress. Got this many used addresses: " + str(
-            len(used_address_dict)))
+            len(used_addresses)))
         #Need to have wallet.index point to the last used address
         #and fill addr_cache.
         #Algo:
-        # 1. Scan batch 1 of each branch, accumulate wallet addresses into dict.
-        # 2. Find matches between that dict and used addresses, add those to
-        #    used_indices dict and add to address cache.
-        # 3. Check if all addresses in 'used addresses' have been matched, if
+        # 1. Scan batch 1 of each branch, record matched wallet addresses.
+        # 2. Check if all addresses in 'used addresses' have been matched, if
         #    so, break.
-        # 4. Repeat the above for batch 2, 3.. up to max 20 batches.
-        # 5. If after all 20 batches not all used addresses were matched,
+        # 3. Repeat the above for batch 2, 3.. up to max 20 batches.
+        # 4. If after all 20 batches not all used addresses were matched,
         #    quit with error.
+        # 5. Calculate used indices.
         # 6. If all used addresses were matched, set wallet index to highest
         #    found index in each branch and mark wallet sync complete.
         #Rationale for this algo:
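To make the flatten-and-deduplicate step at the top of this hunk concrete: 'listaddressgroupings' returns a list of groups, each group a list of [address, amount] or [address, amount, account] entries, and only entries labelled with this wallet's account name count as used. A toy run with made-up data (the addresses, amounts and account label are placeholders):

    # fabricated RPC output: two groups, one address repeated across them
    agd = [
        [["1AddrA...", 0.5, "joinmarket-wallet-1234"],
         ["1AddrB...", 0.0, "joinmarket-wallet-1234"]],
        [["1AddrA...", 0.5, "joinmarket-wallet-1234"],
         ["1AddrC...", 1.2]],  # no account field: dropped by the len() check
    ]
    wallet_name = "joinmarket-wallet-1234"

    fagd = (tuple(item) for sublist in agd for item in sublist)
    dfagd = set(fagd)  # dedupe; the set consumes the generator directly
    used_addresses = set()
    for addr_info in dfagd:
        if len(addr_info) < 3 or addr_info[2] != wallet_name:
            continue
        used_addresses.add(addr_info[0])

    print(used_addresses)  # {'1AddrA...', '1AddrB...'}

The change from a list comprehension to a generator expression for fagd avoids materialising the flattened list only to throw it away after deduplication, and dropping used_address_dict for a plain set matches how the data is actually consumed: membership tests and a count.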
@@ -466,170 +456,143 @@ class BitcoinCoreInterface(BlockchainInterface):
         # not be exposed to the user; it is not the same as gap limit, in fact,
         # the concept of gap limit does not apply to this kind of sync, which
         # *assumes* that the most recent usage of addresses is indeed recorded.
-        used_indices = {}
-        local_addr_cache = {}
-        found_addresses = []
+        remaining_used_addresses = used_addresses.copy()
+        addresses, saved_indices = self._collect_addresses_init(wallet)
+        for addr in addresses:
+            remaining_used_addresses.discard(addr)
+
         BATCH_SIZE = 100
-        for j in range(20):
-            for md in range(wallet.max_mix_depth):
-                if md not in used_indices:
-                    used_indices[md] = {}
-                for fc in [0, 1]:
-                    if fc not in used_indices[md]:
-                        used_indices[md][fc] = []
-                    for i in range(j*BATCH_SIZE, (j+1)*BATCH_SIZE):
-                        local_addr_cache[(md, fc, i)] = wallet.get_addr(md, fc, i)
-            batch_found_addresses = [x for x in local_addr_cache.iteritems(
-                ) if x[1] in used_address_dict.keys()]
-            for x in batch_found_addresses:
-                md, fc, i = x[0]
-                addr = x[1]
-                used_indices[md][fc].append(i)
-                wallet.addr_cache[addr] = (md, fc, i)
-            found_addresses.extend(batch_found_addresses)
-            if len(found_addresses) == len(used_address_dict.keys()):
+        MAX_ITERATIONS = 20
+        for j in range(MAX_ITERATIONS):
+            if not remaining_used_addresses:
                 break
-            if j == 19:
+            for addr in \
+                    self._collect_addresses_gap(wallet, gap_limit=BATCH_SIZE):
+                remaining_used_addresses.discard(addr)
+        else:
             raise Exception("Failed to sync in fast mode after 20 batches; "
                             "please re-try wallet sync without --fast flag.")
-        #Find the highest index in each branch and set the wallet index
-        for md in range(wallet.max_mix_depth):
-            for fc in [0, 1]:
-                if len(used_indices[md][fc]):
-                    used_indices[md][fc].sort()
-                    wallet.index[md][fc] = used_indices[md][fc][-1] + 1
-                else:
-                    wallet.index[md][fc] = 0
-                if not is_index_ahead_of_cache(wallet, md, fc):
-                    wallet.index[md][fc] = wallet.index_cache[md][fc]
-        self.wallet_synced = True
+
+        # creating used_indices on-the-fly would be more efficient, but the
+        # overall performance gain is probably negligible
+        used_indices = self._get_used_indices(wallet, used_addresses)
+        self._rewind_wallet_indices(wallet, used_indices, saved_indices)
+        self.wallet_synced = True

     def sync_addresses(self, wallet, restart_cb=None):
-        log.debug('requesting detailed wallet history')
+        log.debug("requesting detailed wallet history")
         wallet_name = self.get_wallet_name(wallet)
-        #TODO It is worth considering making this user configurable:
-        addr_req_count = 20
-        wallet_addr_list = []
-        for mix_depth in range(wallet.max_mix_depth):
-            for forchange in [0, 1]:
-                #If we have an index-cache available, we can use it
-                #to decide how much to import (note that this list
-                #*always* starts from index 0 on each branch).
-                #In cases where the Bitcoin Core instance is fresh,
-                #this will allow the entire import+rescan to occur
-                #in 2 steps only.
-                if wallet.index_cache != [[0, 0]] * wallet.max_mix_depth:
-                    #Need to request N*addr_req_count where N is least s.t.
-                    #N*addr_req_count > index_cache val. This is so that the batching
-                    #process in the main loop *always* has already imported enough
-                    #addresses to complete.
-                    req_count = int(wallet.index_cache[mix_depth][forchange] /
-                                    addr_req_count) + 1
-                    req_count *= addr_req_count
-                else:
-                    #If we have *nothing* - no index_cache, and no info
-                    #in Core wallet (imports), we revert to a batching mode
-                    #with a default size.
-                    #In this scenario it could require several restarts *and*
-                    #rescans; perhaps user should set addr_req_count high
-                    #(see above TODO)
-                    req_count = addr_req_count
-                wallet_addr_list += [wallet.get_new_addr(mix_depth, forchange)
-                                     for _ in range(req_count)]
-                #Indices are reset here so that the next algorithm step starts
-                #from the beginning of each branch
-                wallet.index[mix_depth][forchange] = 0
-        # makes more sense to add these in an account called "joinmarket-imported" but its much
-        # simpler to add to the same account here
-        for privkey_list in wallet.imported_privkeys.values():
-            for privkey in privkey_list:
-                imported_addr = btc.privtoaddr(privkey,
-                                               magicbyte=get_p2pk_vbyte())
-                wallet_addr_list.append(imported_addr)
-        imported_addr_list = self.rpc('getaddressesbyaccount', [wallet_name])
-        if not set(wallet_addr_list).issubset(set(imported_addr_list)):
-            self.add_watchonly_addresses(wallet_addr_list, wallet_name, restart_cb)
-            return
-
-        buf = self.rpc('listtransactions', [wallet_name, 1000, 0, True])
-        txs = buf
-        # If the buffer's full, check for more, until it ain't
-        while len(buf) == 1000:
-            buf = self.rpc('listtransactions', [wallet_name, 1000, len(txs),
-                                                True])
-            txs += buf
-        # TODO check whether used_addr_list can be a set, may be faster (if
-        # its a hashset) and allows using issubset() here and setdiff() for
-        # finding which addresses need importing
-
-        # TODO also check the fastest way to build up python lists, i suspect
-        # using += is slow
-        used_addr_list = [tx['address']
-                          for tx in txs if tx['category'] == 'receive']
-        too_few_addr_mix_change = []
-        for mix_depth in range(wallet.max_mix_depth):
-            for forchange in [0, 1]:
-                unused_addr_count = 0
-                last_used_addr = ''
-                breakloop = False
-                while not breakloop:
-                    if unused_addr_count >= wallet.gaplimit and \
-                            is_index_ahead_of_cache(wallet, mix_depth,
-                                                    forchange):
-                        break
-                    mix_change_addrs = [
-                        wallet.get_new_addr(mix_depth, forchange)
-                        for _ in range(addr_req_count)
-                    ]
-                    for mc_addr in mix_change_addrs:
-                        if mc_addr not in imported_addr_list:
-                            too_few_addr_mix_change.append((mix_depth, forchange
-                                                            ))
-                            breakloop = True
-                            break
-                        if mc_addr in used_addr_list:
-                            last_used_addr = mc_addr
-                            unused_addr_count = 0
-                        else:
-                            unused_addr_count += 1
-                #index setting here depends on whether we broke out of the loop
-                #early; if we did, it means we need to prepare the index
-                #at the level of the last used address or zero so as to not
-                #miss any imports in add_watchonly_addresses.
-                #If we didn't, we need to respect the index_cache to avoid
-                #potential address reuse.
-                if breakloop:
-                    if last_used_addr == '':
-                        wallet.index[mix_depth][forchange] = 0
-                    else:
-                        wallet.index[mix_depth][forchange] = \
-                            wallet.addr_cache[last_used_addr][2] + 1
-                else:
-                    if last_used_addr == '':
-                        next_avail_idx = max([wallet.index_cache[mix_depth][
-                            forchange], 0])
-                    else:
-                        next_avail_idx = max([wallet.addr_cache[last_used_addr][
-                            2] + 1, wallet.index_cache[mix_depth][forchange]])
-                    wallet.index[mix_depth][forchange] = next_avail_idx
-
-        wallet_addr_list = []
-        if len(too_few_addr_mix_change) > 0:
-            indices = [wallet.index[mc[0]][mc[1]]
-                       for mc in too_few_addr_mix_change]
-            log.debug('too few addresses in ' + str(too_few_addr_mix_change) +
-                      ' at ' + str(indices))
-            for mix_depth, forchange in too_few_addr_mix_change:
-                wallet_addr_list += [
-                    wallet.get_new_addr(mix_depth, forchange)
-                    for _ in range(addr_req_count * 3)
-                ]
-
-            self.add_watchonly_addresses(wallet_addr_list, wallet_name, restart_cb)
+        addresses, saved_indices = self._collect_addresses_init(wallet)
+        imported_addresses = set(self.rpc('getaddressesbyaccount', [wallet_name]))
+
+        if not addresses.issubset(imported_addresses):
+            self.add_watchonly_addresses(addresses - imported_addresses,
+                                         wallet_name, restart_cb)
             return

-        self.wallet_synced = True
+        used_addresses_gen = (tx['address']
+                              for tx in self._yield_transactions(wallet_name)
+                              if tx['category'] == 'receive')
+
+        used_indices = self._get_used_indices(wallet, used_addresses_gen)
+        log.debug("got used indices: {}".format(used_indices))
+        gap_limit_used = not self._check_gap_indices(wallet, used_indices)
+        self._rewind_wallet_indices(wallet, used_indices, saved_indices)
+
+        new_addresses = self._collect_addresses_gap(wallet)
+        if not new_addresses.issubset(imported_addresses):
+            log.debug("Syncing iteration finished, additional step required")
+            self.add_watchonly_addresses(new_addresses - imported_addresses,
+                                         wallet_name, restart_cb)
+            self.wallet_synced = False
+        elif gap_limit_used:
+            log.debug("Syncing iteration finished, additional step required")
+            self.wallet_synced = False
+        else:
+            log.debug("Wallet successfully synced")
+            self._rewind_wallet_indices(wallet, used_indices, saved_indices)
+            self.wallet_synced = True
+
+    @staticmethod
+    def _rewind_wallet_indices(wallet, used_indices, saved_indices):
+        for md in used_indices:
+            for int_type in (0, 1):
+                index = max(used_indices[md][int_type],
+                            saved_indices[md][int_type])
+                wallet.set_next_index(md, int_type, index, force=True)
+
+    @staticmethod
+    def _get_used_indices(wallet, addr_gen):
+        indices = {x: [0, 0] for x in range(wallet.max_mixdepth + 1)}
+
+        for addr in addr_gen:
+            if not wallet.is_known_addr(addr):
+                continue
+            md, internal, index = wallet.get_details(
+                wallet.addr_to_path(addr))
+            if internal not in (0, 1):
+                assert internal == 'imported'
+                continue
+            indices[md][internal] = max(indices[md][internal], index + 1)
+
+        return indices
+
+    @staticmethod
+    def _check_gap_indices(wallet, used_indices):
+        for md in used_indices:
+            for internal in (0, 1):
+                if used_indices[md][internal] >\
+                        max(wallet.get_next_unused_index(md, internal), 0):
+                    return False
+        return True
+
+    @staticmethod
+    def _collect_addresses_init(wallet):
+        addresses = set()
+        saved_indices = dict()
+
+        for md in range(wallet.max_mixdepth + 1):
+            saved_indices[md] = [0, 0]
+            for internal in (0, 1):
+                next_unused = wallet.get_next_unused_index(md, internal)
+                for index in range(next_unused):
+                    addresses.add(wallet.get_addr(md, internal, index))
+                for index in range(wallet.gap_limit):
+                    addresses.add(wallet.get_new_addr(md, internal))
+                wallet.set_next_index(md, internal, next_unused)
+                saved_indices[md][internal] = next_unused
+            for path in wallet.yield_imported_paths(md):
+                addresses.add(wallet.get_addr_path(path))
+
+        return addresses, saved_indices
+
+    @staticmethod
+    def _collect_addresses_gap(wallet, gap_limit=None):
+        gap_limit = gap_limit or wallet.gap_limit
+        addresses = set()
+
+        for md in range(wallet.max_mixdepth + 1):
+            for internal in (True, False):
+                old_next = wallet.get_next_unused_index(md, internal)
+                for index in range(gap_limit):
+                    addresses.add(wallet.get_new_addr(md, internal))
+                wallet.set_next_index(md, internal, old_next)
+
+        return addresses
+
+    def _yield_transactions(self, wallet_name):
+        batch_size = 1000
+        iteration = 0
+        while True:
+            new = self.rpc(
+                'listtransactions',
+                [wallet_name, batch_size, iteration * batch_size, True])
+            for tx in new:
+                yield tx
+            if len(new) < batch_size:
+                return
+            iteration += 1

     def start_unspent_monitoring(self, wallet):
         self.unspent_monitoring_loop = task.LoopingCall(self.sync_unspent, wallet)
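One property of the new helpers is worth spelling out: _collect_addresses_init and _collect_addresses_gap "peek" ahead by calling wallet.get_new_addr, which advances the branch's index pointer, and then call wallet.set_next_index to rewind it, so collecting candidate addresses never permanently consumes indices. A self-contained sketch of that peek-and-rewind pattern (DummyWallet below is a minimal stand-in written for this illustration, not the jmclient wallet API):

    class DummyWallet(object):
        """Just enough state to demonstrate peek-and-rewind."""
        def __init__(self):
            # {mixdepth: [next external index, next internal index]}
            self.next_index = {0: [0, 0]}

        def get_new_addr(self, md, internal):
            index = self.next_index[md][internal]
            self.next_index[md][internal] += 1  # advances the pointer
            return "addr-{}-{}-{}".format(md, int(internal), index)

        def get_next_unused_index(self, md, internal):
            return self.next_index[md][internal]

        def set_next_index(self, md, internal, index):
            self.next_index[md][internal] = index  # rewind the pointer

    wallet = DummyWallet()
    old_next = wallet.get_next_unused_index(0, 0)
    batch = [wallet.get_new_addr(0, 0) for _ in range(5)]  # peek 5 ahead
    wallet.set_next_index(0, 0, old_next)  # ...then rewind, as the helpers do
    assert wallet.get_next_unused_index(0, 0) == old_next
    assert len(set(batch)) == 5  # distinct addresses were still produced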
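_yield_transactions replaces the old inline while-loop over 'listtransactions' with a generator that pages in fixed batches (count=1000, skip=iteration*1000) and terminates on the first short batch. Its termination logic is easy to sanity-check against a fake RPC; the stub below is a test double written for this note (the wallet name is a placeholder), not part of the diff:

    class FakeRpcHost(object):
        """Serves canned listtransactions pages to the generator."""
        def __init__(self, total_txs):
            self.txs = [{'address': 'addr%d' % i, 'category': 'receive'}
                        for i in range(total_txs)]

        def rpc(self, method, args):
            assert method == 'listtransactions'
            wallet_name, count, skip, watchonly = args
            return self.txs[skip:skip + count]

        def _yield_transactions(self, wallet_name):
            # same body as the generator added in the hunk above
            batch_size = 1000
            iteration = 0
            while True:
                new = self.rpc(
                    'listtransactions',
                    [wallet_name, batch_size, iteration * batch_size, True])
                for tx in new:
                    yield tx
                if len(new) < batch_size:
                    return
                iteration += 1

    host = FakeRpcHost(2500)
    assert len(list(host._yield_transactions('joinmarket-wallet-1234'))) == 2500

With 2500 stub transactions the generator issues three calls (1000, 1000, then 500 results) and stops; that matches the removed "while len(buf) == 1000" loop, but yields lazily instead of accumulating everything in a list first.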
@@ -654,8 +617,7 @@ class BitcoinCoreInterface(BlockchainInterface):
                 continue
             if u['account'] != wallet_name:
                 continue
-            # TODO
-            if u['address'] not in wallet.addr_cache:
+            if not wallet.is_known_addr(u['address']):
                 continue
             self._add_unspent_utxo(wallet, u)
         et = time.time()