
aiorpcx address synchronizer

master · Janus authored 7 years ago, committed by SomberNight · commit b120584f97
5 changed files:
  1. contrib/requirements/requirements.txt (1 line changed)
  2. electrum/address_synchronizer.py (11 lines changed)
  3. electrum/network.py (46 lines changed)
  4. electrum/synchronizer.py (259 lines changed)
  5. run_electrum (1 line changed)

contrib/requirements/requirements.txt

@@ -8,3 +8,4 @@ jsonrpclib-pelix
 PySocks>=1.6.6
 qdarkstyle<3.0
 typing>=3.0.0
+aiorpcx>=0.7.1
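
The new requirement is the asyncio-based JSON-RPC library that the rewritten synchronizer below is built on. As a rough sketch of the small slice of the aiorpcx 0.7 API this commit relies on (a session used as an async context manager plus send_request for RPC calls) — the host, port and method below are placeholders, not part of the commit:

import asyncio
from aiorpcx import ClientSession  # aiorpcx>=0.7.1, as pinned above

async def get_banner(host, port):
    # Open a JSON-RPC session to an ElectrumX server and issue one request.
    # 'server.banner' is only an illustrative method; the synchronizer uses
    # the same call shape for 'server.version', 'blockchain.scripthash.*' etc.
    async with ClientSession(host, port) as session:
        return await session.send_request('server.banner', [])

# Usage (hypothetical server):
# loop = asyncio.get_event_loop()
# print(loop.run_until_complete(get_banner('electrumx.example.org', 50001)))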

electrum/address_synchronizer.py

@@ -22,6 +22,7 @@
 # SOFTWARE.

 import threading
+import asyncio
 import itertools
 from collections import defaultdict
@@ -138,16 +139,18 @@ class AddressSynchronizer(PrintError):
         self.network = network
         if self.network is not None:
             self.verifier = SPV(self.network, self)
-            self.synchronizer = Synchronizer(self, network)
-            #network.add_jobs([self.verifier, self.synchronizer])
+            self.synchronizer = Synchronizer(self)
+            #network.add_jobs([self.verifier])
+            self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.send_subscriptions(), self.network.asyncio_loop))
+            self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.handle_status(), self.network.asyncio_loop))
+            self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.main(), self.network.asyncio_loop))
         else:
             self.verifier = None
             self.synchronizer = None

     def stop_threads(self):
         if self.network:
-            #self.network.remove_jobs([self.synchronizer, self.verifier])
-            self.synchronizer.release()
+            #self.network.remove_jobs([self.verifier])
             self.synchronizer = None
             self.verifier = None
             # Now no references to the synchronizer or verifier
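
With the synchronizer no longer a ThreadJob, the wallet thread drives it by scheduling its coroutines onto the network's event loop and keeping the returned futures for shutdown. A minimal, self-contained sketch of that asyncio.run_coroutine_threadsafe pattern (the worker coroutine and loop here are stand-ins, not Electrum code):

import asyncio
import threading

async def worker(name):
    # Stand-in for synchronizer.send_subscriptions() / handle_status() / main()
    await asyncio.sleep(0.1)
    return '%s done' % name

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

# Schedule coroutines onto the loop from another thread; keep the
# concurrent.futures.Future handles, mirroring self.network.futures above.
futures = [asyncio.run_coroutine_threadsafe(worker(n), loop) for n in ('a', 'b')]
print([f.result() for f in futures])

# On shutdown, cancel whatever is still outstanding (cf. Network.stop() below)
# and stop the loop.
[f.cancel() for f in futures]
loop.call_soon_threadsafe(loop.stop)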

electrum/network.py

@@ -20,6 +20,7 @@
 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
+import asyncio
 import time
 import queue
 import os
@@ -204,7 +205,6 @@ class Network(PrintError):
         self.callback_lock = threading.Lock()
         self.pending_sends_lock = threading.Lock()
         self.recent_servers_lock = threading.RLock()       # <- re-entrant
-        self.subscribed_addresses_lock = threading.Lock()
         self.blockchains_lock = threading.Lock()

         self.pending_sends = []
@@ -226,7 +226,6 @@
         util.make_dir(dir_path)

         # subscriptions and requests
-        self.subscribed_addresses = set()  # note: needs self.subscribed_addresses_lock
         self.h2addr = {}
         # Requests from client we've not seen a response to
         self.unanswered_requests = {}
@@ -245,6 +244,7 @@
         self.start_network(deserialize_server(self.default_server)[2],
                            deserialize_proxy(self.config.get('proxy')))
         self.asyncio_loop = asyncio.get_event_loop()
+        self.futures = []

     def with_interface_lock(func):
         def func_wrapper(self, *args, **kwargs):
@@ -338,26 +338,6 @@
             interface.queue_request(method, params, message_id)
         return message_id

-    @with_interface_lock
-    def send_subscriptions(self):
-        assert self.interface
-        self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses))
-        self.sub_cache.clear()
-        # Resend unanswered requests
-        requests = self.unanswered_requests.values()
-        self.unanswered_requests = {}
-        for request in requests:
-            message_id = self.queue_request(request[0], request[1])
-            self.unanswered_requests[message_id] = request
-        self.queue_request('server.banner', [])
-        self.queue_request('server.donation_address', [])
-        self.queue_request('server.peers.subscribe', [])
-        self.request_fee_estimates()
-        self.queue_request('blockchain.relayfee', [])
-        with self.subscribed_addresses_lock:
-            for h in self.subscribed_addresses:
-                self.queue_request('blockchain.scripthash.subscribe', [h])
-
     def request_fee_estimates(self):
         from .simple_config import FEE_ETA_TARGETS
         self.config.requested_fee_estimates()
@@ -578,7 +558,6 @@
                 # fixme: we don't want to close headers sub
                 #self.close_interface(self.interface)
                 self.interface = i
-                self.send_subscriptions()
                 self.set_status('connected')
                 self.notify('updated')
                 self.notify('interfaces')
@@ -683,11 +662,6 @@
             # Copy the request method and params to the response
             response['method'] = method
             response['params'] = params
-            # Only once we've received a response to an addr subscription
-            # add it to the list; avoids double-sends on reconnection
-            if method == 'blockchain.scripthash.subscribe':
-                with self.subscribed_addresses_lock:
-                    self.subscribed_addresses.add(params[0])
         else:
             if not response:  # Closed remotely / misbehaving
                 self.connection_down(interface.server)
@@ -1078,6 +1052,7 @@
             self.asyncio_loop.run_until_complete(self.gat)
         except concurrent.futures.CancelledError:
             pass
+        [f.cancel() for f in self.futures]

     def on_notify_header(self, interface, header_dict):
         try:
@@ -1212,21 +1187,6 @@
                 callback(x2)
             return cb2

-    def subscribe_to_addresses(self, addresses, callback):
-        hash2address = {
-            bitcoin.address_to_scripthash(address): address
-            for address in addresses}
-        self.h2addr.update(hash2address)
-        msgs = [
-            ('blockchain.scripthash.subscribe', [x])
-            for x in hash2address.keys()]
-        self.send(msgs, self.map_scripthash_to_address(callback))
-
-    def request_address_history(self, address, callback):
-        h = bitcoin.address_to_scripthash(address)
-        self.h2addr.update({h: address})
-        self.send([('blockchain.scripthash.get_history', [h])], self.map_scripthash_to_address(callback))
-
     # NOTE this method handles exceptions and a special edge case, counter to
     # what the other ElectrumX methods do.  This is unexpected.
     def broadcast_transaction(self, transaction, callback=None):
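
Network stops tracking subscribed_addresses and no longer re-sends scripthash subscriptions or proxies get_history requests; that bookkeeping moves into the Synchronizer, which feeds addresses through an asyncio.Queue and talks to the server over its own session. A rough, dependency-free sketch of that queue/consumer shape (names here are illustrative; the actual code in synchronizer.py below spawns the handlers inside an aiorpcx TaskGroup):

import asyncio

async def send_subscriptions(add_queue, subscribe):
    # Drain addresses as they are queued and subscribe to each one
    # concurrently, like Synchronizer.send_subscriptions() below.
    while True:
        addr = await add_queue.get()
        asyncio.ensure_future(subscribe(addr))

async def demo():
    add_queue = asyncio.Queue()
    statuses = {}

    async def subscribe(addr):
        # Stand-in for 'blockchain.scripthash.subscribe' over the session.
        await asyncio.sleep(0)
        statuses[addr] = None  # a fresh address has no history yet

    consumer = asyncio.ensure_future(send_subscriptions(add_queue, subscribe))
    for addr in ('addr1', 'addr2'):   # stand-ins for wallet.get_addresses()
        add_queue.put_nowait(addr)    # this is what Synchronizer.add() does
    await asyncio.sleep(0.1)
    consumer.cancel()
    await asyncio.sleep(0)            # let the cancellation be processed
    return statuses

print(asyncio.get_event_loop().run_until_complete(demo()))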

electrum/synchronizer.py

@@ -22,102 +22,86 @@
 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
+import traceback
+import ssl
+import asyncio
+from aiorpcx import ClientSession, Request, Notification, TaskGroup
 from threading import Lock
 import hashlib
+import concurrent.futures

 # from .bitcoin import Hash, hash_encode
 from .transaction import Transaction
-from .util import ThreadJob, bh2u
+from .util import ThreadJob, bh2u, PrintError, aiosafe
+from .bitcoin import address_to_scripthash
+from .version import ELECTRUM_VERSION, PROTOCOL_VERSION


-class Synchronizer(ThreadJob):
+def history_status(h):
+    if not h:
+        return None
+    status = ''
+    for tx_hash, height in h:
+        status += tx_hash + ':%d:' % height
+    return bh2u(hashlib.sha256(status.encode('ascii')).digest())
+
+
+class NotificationSession(ClientSession):
+
+    def __init__(self, queue, *args, **kwargs):
+        super(NotificationSession, self).__init__(*args, **kwargs)
+        self.queue = queue
+
+    @aiosafe
+    async def handle_request(self, request):
+        if isinstance(request, Notification):
+            if request.method == 'blockchain.scripthash.subscribe':
+                args = request.args
+                await self.queue.put((args[0], args[1]))
+
+
+class Synchronizer(PrintError):
     '''The synchronizer keeps the wallet up-to-date with its set of
     addresses and their transactions.  It subscribes over the network
     to wallet addresses, gets the wallet to generate new addresses
     when necessary, requests the transaction history of any addresses
     we don't have the full history of, and requests binary transaction
     data of any transactions the wallet doesn't have.
-
-    External interface: __init__() and add() member functions.
     '''
-
-    def __init__(self, wallet, network):
+    def __init__(self, wallet):
         self.wallet = wallet
-        self.network = network
-        self.new_addresses = set()
-        # Entries are (tx_hash, tx_height) tuples
         self.requested_tx = {}
         self.requested_histories = {}
         self.requested_addrs = set()
-        self.lock = Lock()
-        self.initialized = False
-        self.initialize()
-
-    def parse_response(self, response):
-        if response.get('error'):
-            self.print_error("response error:", response)
-            return None, None
-        return response['params'], response['result']
+        self.scripthash_to_address = {}
+        # Queues
+        self.add_queue = asyncio.Queue()
+        self.status_queue = asyncio.Queue()
+
+    async def send_version(self):
+        r = await self.session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])

     def is_up_to_date(self):
-        return (not self.requested_tx and not self.requested_histories
-                and not self.requested_addrs)
-
-    def release(self):
-        self.network.unsubscribe(self.on_address_status)
-
-    def add(self, address):
-        '''This can be called from the proxy or GUI threads.'''
-        with self.lock:
-            self.new_addresses.add(address)
-
-    def subscribe_to_addresses(self, addresses):
-        if addresses:
-            self.requested_addrs |= addresses
-            self.network.subscribe_to_addresses(addresses, self.on_address_status)
-
-    def get_status(self, h):
-        if not h:
-            return None
-        status = ''
-        for tx_hash, height in h:
-            status += tx_hash + ':%d:' % height
-        return bh2u(hashlib.sha256(status.encode('ascii')).digest())
-
-    def on_address_status(self, response):
-        if self.wallet.synchronizer is None and self.initialized:
-            return  # we have been killed, this was just an orphan callback
-        params, result = self.parse_response(response)
-        if not params:
-            return
-        addr = params[0]
+        return (not self.requested_addrs and not self.requested_histories)
+
+    def add(self, addr):
+        self.requested_addrs.add(addr)
+        self.add_queue.put_nowait(addr)
+
+    async def on_address_status(self, addr, status):
         history = self.wallet.history.get(addr, [])
-        if self.get_status(history) != result:
-            # note that at this point 'result' can be None;
-            # if we had a history for addr but now the server is telling us
-            # there is no history
-            if addr not in self.requested_histories:
-                self.requested_histories[addr] = result
-                self.network.request_address_history(addr, self.on_address_history)
-        # remove addr from list only after it is added to requested_histories
-        if addr in self.requested_addrs:  # Notifications won't be in
-            self.requested_addrs.remove(addr)
-
-    def on_address_history(self, response):
-        if self.wallet.synchronizer is None and self.initialized:
-            return  # we have been killed, this was just an orphan callback
-        params, result = self.parse_response(response)
-        if not params:
-            return
-        addr = params[0]
-        try:
-            server_status = self.requested_histories[addr]
-        except KeyError:
-            # note: server_status can be None even if we asked for the history,
-            # so it is not sufficient to test that
-            self.print_error("receiving history (unsolicited)", addr, len(result))
-            return
+        if history_status(history) == status:
+            return
+        # note that at this point 'result' can be None;
+        # if we had a history for addr but now the server is telling us
+        # there is no history
+        if addr in self.requested_histories:
+            return
+        # request address history
+        self.requested_histories[addr] = status
+        h = address_to_scripthash(addr)
+        result = await self.session.send_request("blockchain.scripthash.get_history", [h])
         self.print_error("receiving history", addr, len(result))
         hashes = set(map(lambda item: item['tx_hash'], result))
         hist = list(map(lambda item: (item['tx_hash'], item['height']), result))
@@ -128,23 +112,44 @@ class Synchronizer(ThreadJob):
         if len(hashes) != len(result):
             self.print_error("error: server history has non-unique txids: %s"% addr)
         # Check that the status corresponds to what was announced
-        elif self.get_status(hist) != server_status:
+        elif history_status(hist) != status:
             self.print_error("error: status mismatch: %s" % addr)
         else:
             # Store received history
             self.wallet.receive_history_callback(addr, hist, tx_fees)
             # Request transactions we don't have
-            self.request_missing_txs(hist)
+            # "hist" is a list of [tx_hash, tx_height] lists
+            transaction_hashes = []
+            for tx_hash, tx_height in hist:
+                if tx_hash in self.requested_tx:
+                    continue
+                if tx_hash in self.wallet.transactions:
+                    continue
+                transaction_hashes.append(tx_hash)
+                self.requested_tx[tx_hash] = tx_height
+            for tx_hash in transaction_hashes:
+                await self.get_transaction(tx_hash)
         # Remove request; this allows up_to_date to be True
         self.requested_histories.pop(addr)

-    def on_tx_response(self, response):
-        if self.wallet.synchronizer is None and self.initialized:
-            return  # we have been killed, this was just an orphan callback
-        params, result = self.parse_response(response)
-        if not params:
-            return
-        tx_hash = params[0]
+    async def request_missing_txs(self, hist):
+        # "hist" is a list of [tx_hash, tx_height] lists
+        transaction_hashes = []
+        for tx_hash, tx_height in hist:
+            if tx_hash in self.requested_tx:
+                continue
+            if tx_hash in self.wallet.transactions:
+                continue
+            transaction_hashes.append(tx_hash)
+            self.requested_tx[tx_hash] = tx_height
+        for tx_hash in transaction_hashes:
+            await self.get_transaction(tx_hash)
+
+    async def get_transaction(self, tx_hash):
+        result = await self.session.send_request('blockchain.transaction.get', [tx_hash])
         tx = Transaction(result)
         try:
             tx.deserialize()
@@ -160,54 +165,44 @@ class Synchronizer(ThreadJob):
         self.print_error("received tx %s height: %d bytes: %d" %
                          (tx_hash, tx_height, len(tx.raw)))
         # callbacks
-        self.network.trigger_callback('new_transaction', tx)
-        if not self.requested_tx:
-            self.network.trigger_callback('updated')
-
-    def request_missing_txs(self, hist):
-        # "hist" is a list of [tx_hash, tx_height] lists
-        transaction_hashes = []
-        for tx_hash, tx_height in hist:
-            if tx_hash in self.requested_tx:
-                continue
-            if tx_hash in self.wallet.transactions:
-                continue
-            transaction_hashes.append(tx_hash)
-            self.requested_tx[tx_hash] = tx_height
-
-        self.network.get_transactions(transaction_hashes, self.on_tx_response)
-
-    def initialize(self):
-        '''Check the initial state of the wallet.  Subscribe to all its
-        addresses, and request any transactions in its address history
-        we don't have.
-        '''
-        for history in self.wallet.history.values():
-            # Old electrum servers returned ['*'] when all history for
-            # the address was pruned.  This no longer happens but may
-            # remain in old wallets.
-            if history == ['*']:
-                continue
-            self.request_missing_txs(history)
-
-        if self.requested_tx:
-            self.print_error("missing tx", self.requested_tx)
-        self.subscribe_to_addresses(set(self.wallet.get_addresses()))
-        self.initialized = True
-
-    def run(self):
-        '''Called from the network proxy thread main loop.'''
-        # 1. Create new addresses
-        self.wallet.synchronize()
-
-        # 2. Subscribe to new addresses
-        with self.lock:
-            addresses = self.new_addresses
-            self.new_addresses = set()
-        self.subscribe_to_addresses(addresses)
-
-        # 3. Detect if situation has changed
-        up_to_date = self.is_up_to_date()
-        if up_to_date != self.wallet.is_up_to_date():
-            self.wallet.set_up_to_date(up_to_date)
-            self.network.trigger_callback('updated')
+        self.wallet.network.trigger_callback('new_transaction', tx)
+
+    async def subscribe_to_address(self, addr):
+        h = address_to_scripthash(addr)
+        self.scripthash_to_address[h] = addr
+        status = await self.session.send_request('blockchain.scripthash.subscribe', [h])
+        await self.status_queue.put((h, status))
+        self.requested_addrs.remove(addr)
+
+    @aiosafe
+    async def send_subscriptions(self):
+        async with TaskGroup() as group:
+            while True:
+                addr = await self.add_queue.get()
+                await group.spawn(self.subscribe_to_address(addr))
+
+    @aiosafe
+    async def handle_status(self):
+        async with TaskGroup() as group:
+            while True:
+                h, status = await self.status_queue.get()
+                addr = self.scripthash_to_address[h]
+                await group.spawn(self.on_address_status(addr, status))
+
+    @aiosafe
+    async def main(self):
+        conn = self.wallet.network.default_server
+        host, port, protocol = conn.split(':')
+        sslc = ssl.SSLContext(ssl.PROTOCOL_TLS) if protocol == 's' else None
+        async with NotificationSession(self.status_queue, host, int(port), ssl=sslc) as session:
+            self.session = session
+            await self.send_version()
+            self.wallet.synchronizer = self
+            for addr in self.wallet.get_addresses(): self.add(addr)
+            while True:
+                await asyncio.sleep(1)
+                self.wallet.synchronize()
+                up_to_date = self.is_up_to_date()
+                if up_to_date != self.wallet.is_up_to_date():
+                    self.wallet.set_up_to_date(up_to_date)
+                    self.wallet.network.trigger_callback('updated')
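
The status string the server pushes for a scripthash is recomputed locally by history_status() (previously Synchronizer.get_status) so the wallet can tell whether its stored history is already current. A small worked example of that hash, with made-up txids and bh2u(sha256(...).digest()) written as hexdigest():

import hashlib

def history_status(h):
    # Same construction as above: concatenate 'txid:height:' pairs and
    # take the hex-encoded sha256 of the result; empty history maps to None.
    if not h:
        return None
    status = ''
    for tx_hash, height in h:
        status += tx_hash + ':%d:' % height
    return hashlib.sha256(status.encode('ascii')).hexdigest()

# Hypothetical (tx_hash, height) pairs as stored in wallet.history[addr]
hist = [('aa' * 32, 100), ('bb' * 32, 101)]
print(history_status(hist))        # compare against the server's status
print(history_status([]) is None)  # True: no history -> None status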

run_electrum

@@ -48,6 +48,7 @@ def check_imports():
         import qrcode
         import google.protobuf
         import jsonrpclib
+        import aiorpcx
     except ImportError as e:
         sys.exit("Error: %s. Try 'sudo pip install <module-name>'"%str(e))
     # the following imports are for pyinstaller
