Browse Source

aiorpcx: remove callback based code, add session to Interface

master
Janus 7 years ago committed by SomberNight
parent
commit
8f36c9167d
No known key found for this signature in database
GPG Key ID: B33B5F232C6271E9
  1. 18
      electrum/address_synchronizer.py
  2. 5
      electrum/gui/kivy/main_window.py
  3. 4
      electrum/gui/qt/network_dialog.py
  4. 41
      electrum/interface.py
  5. 607
      electrum/network.py
  6. 36
      electrum/verifier.py
  7. 2
      electrum/version.py

18
electrum/address_synchronizer.py

@ -135,15 +135,25 @@ class AddressSynchronizer(PrintError):
# add it in case it was previously unconfirmed # add it in case it was previously unconfirmed
self.add_unverified_tx(tx_hash, tx_height) self.add_unverified_tx(tx_hash, tx_height)
def on_default_server_changed(self, evt):
for i in self.network.futures:
if i.done() and i.exception():
raise i.exception()
if not i.done():
i.cancel()
self.network.futures.clear()
self.network.futures.append(asyncio.get_event_loop().create_task(self.verifier.main()))
self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.send_subscriptions()))
self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.handle_status()))
assert self.network.interface.session is not None
self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.main()))
def start_threads(self, network): def start_threads(self, network):
self.network = network self.network = network
if self.network is not None: if self.network is not None:
self.verifier = SPV(self.network, self) self.verifier = SPV(self.network, self)
self.synchronizer = Synchronizer(self) self.synchronizer = Synchronizer(self)
#network.add_jobs([self.verifier]) self.network.register_callback(self.on_default_server_changed, ['default_server_changed'])
self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.send_subscriptions(), self.network.asyncio_loop))
self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.handle_status(), self.network.asyncio_loop))
self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.main(), self.network.asyncio_loop))
else: else:
self.verifier = None self.verifier = None
self.synchronizer = None self.synchronizer = None

5
electrum/gui/kivy/main_window.py

@ -15,6 +15,7 @@ from electrum.util import profiler, InvalidPassword
from electrum.plugin import run_hook from electrum.plugin import run_hook
from electrum.util import format_satoshis, format_satoshis_plain from electrum.util import format_satoshis, format_satoshis_plain
from electrum.paymentrequest import PR_UNPAID, PR_PAID, PR_UNKNOWN, PR_EXPIRED from electrum.paymentrequest import PR_UNPAID, PR_PAID, PR_UNKNOWN, PR_EXPIRED
from electrum import blockchain
from .i18n import _ from .i18n import _
from kivy.app import App from kivy.app import App
@ -114,10 +115,10 @@ class ElectrumWindow(App):
from .uix.dialogs.choice_dialog import ChoiceDialog from .uix.dialogs.choice_dialog import ChoiceDialog
chains = self.network.get_blockchains() chains = self.network.get_blockchains()
def cb(name): def cb(name):
for index, b in self.network.blockchains.items(): for index, b in blockchain.blockchains.items():
if name == b.get_name(): if name == b.get_name():
self.network.follow_chain(index) self.network.follow_chain(index)
names = [self.network.blockchains[b].get_name() for b in chains] names = [blockchain.blockchains[b].get_name() for b in chains]
if len(names) > 1: if len(names) > 1:
cur_chain = self.network.blockchain().get_name() cur_chain = self.network.blockchain().get_name()
ChoiceDialog(_('Choose your chain'), names, cur_chain, cb).open() ChoiceDialog(_('Choose your chain'), names, cur_chain, cb).open()

4
electrum/gui/qt/network_dialog.py

@ -31,7 +31,7 @@ from PyQt5.QtWidgets import *
import PyQt5.QtCore as QtCore import PyQt5.QtCore as QtCore
from electrum.i18n import _ from electrum.i18n import _
from electrum import constants from electrum import constants, blockchain
from electrum.util import print_error from electrum.util import print_error
from electrum.network import serialize_server, deserialize_server from electrum.network import serialize_server, deserialize_server
@ -103,7 +103,7 @@ class NodesListWidget(QTreeWidget):
chains = network.get_blockchains() chains = network.get_blockchains()
n_chains = len(chains) n_chains = len(chains)
for k, items in chains.items(): for k, items in chains.items():
b = network.blockchains[k] b = blockchain.blockchains[k]
name = b.get_name() name = b.get_name()
if n_chains >1: if n_chains >1:
x = QTreeWidgetItem([name + '@%d'%b.get_forkpoint(), '%d'%b.height()]) x = QTreeWidgetItem([name + '@%d'%b.get_forkpoint(), '%d'%b.height()])

41
electrum/interface.py

@ -31,6 +31,7 @@ import threading
import traceback import traceback
import aiorpcx import aiorpcx
import asyncio import asyncio
import concurrent.futures
import requests import requests
@ -43,18 +44,23 @@ from . import x509
from . import pem from . import pem
from .version import ELECTRUM_VERSION, PROTOCOL_VERSION from .version import ELECTRUM_VERSION, PROTOCOL_VERSION
from .util import NotificationSession from .util import NotificationSession
from . import blockchain
class Interface(PrintError): class Interface(PrintError):
def __init__(self, server, config_path, connecting, proxy): def __init__(self, network, server, config_path, proxy):
self.exception = None self.exception = None
self.connecting = connecting self.ready = asyncio.Future()
self.server = server self.server = server
self.host, self.port, self.protocol = self.server.split(':') self.host, self.port, self.protocol = self.server.split(':')
self.port = int(self.port) self.port = int(self.port)
self.config_path = config_path self.config_path = config_path
self.cert_path = os.path.join(self.config_path, 'certs', self.host) self.cert_path = os.path.join(self.config_path, 'certs', self.host)
self.fut = asyncio.get_event_loop().create_task(self.run()) self.fut = asyncio.get_event_loop().create_task(self.run())
self.tip_header = None
self.tip = 0
self.blockchain = None
self.network = network
if proxy: if proxy:
proxy['user'] = proxy.get('user', '') proxy['user'] = proxy.get('user', '')
if proxy['user'] == '': if proxy['user'] == '':
@ -71,7 +77,7 @@ class Interface(PrintError):
elif proxy['mode'] == "socks5": elif proxy['mode'] == "socks5":
self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS5, auth) self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS5, auth)
else: else:
raise NotImplementedError raise NotImplementedError # http proxy not available with aiorpcx
else: else:
self.proxy = None self.proxy = None
@ -128,7 +134,17 @@ class Interface(PrintError):
assert False assert False
def mark_ready(self): def mark_ready(self):
self.connecting.remove(self.server) assert self.tip_header
chain = blockchain.check_header(self.tip_header)
if not chain:
self.blockchain = blockchain.blockchains[0]
else:
self.blockchain = chain
self.print_error("set blockchain with height", self.blockchain.height())
if not self.ready.done():
self.ready.set_result(1)
async def save_certificate(self): async def save_certificate(self):
if not os.path.exists(self.cert_path): if not os.path.exists(self.cert_path):
@ -161,16 +177,25 @@ class Interface(PrintError):
return None return None
async def open_session(self, sslc, do_sleep=True, execute_after_connect=lambda: None): async def open_session(self, sslc, do_sleep=True, execute_after_connect=lambda: None):
async with NotificationSession(None, None, self.host, self.port, ssl=sslc, proxy=self.proxy) as session: q = asyncio.Queue()
async with NotificationSession(None, q, self.host, self.port, ssl=sslc, proxy=self.proxy) as session:
ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION]) ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])
print(ver) self.print_error(ver, do_sleep, self.host)
connect_hook_executed = False connect_hook_executed = False
while do_sleep: while do_sleep:
if not connect_hook_executed: if not connect_hook_executed:
connect_hook_executed = True connect_hook_executed = True
res = await session.send_request('blockchain.headers.subscribe')
self.tip_header = blockchain.deserialize_header(bfh(res['hex']), res['height'])
self.tip = res['height']
execute_after_connect() execute_after_connect()
await asyncio.wait_for(session.send_request('server.ping'), 5) self.session = session
await asyncio.sleep(300) try:
new_header = await asyncio.wait_for(q.get(), 300)
self.tip_header = new_header
self.tip = new_header['block_height']
except concurrent.futures.TimeoutError:
await asyncio.wait_for(session.send_request('server.ping'), 5)
def queue_request(self, method, params, msg_id): def queue_request(self, method, params, msg_id):
pass pass

607
electrum/network.py

@ -20,14 +20,12 @@
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE. # SOFTWARE.
import asyncio import concurrent.futures
import time import time
import queue import queue
import os import os
import errno
import random import random
import re import re
import select
from collections import defaultdict from collections import defaultdict
import threading import threading
import socket import socket
@ -39,18 +37,15 @@ import dns
import dns.resolver import dns.resolver
from . import util from . import util
from .util import print_error, PrintError from .util import PrintError, print_error, bfh
from . import bitcoin
from .bitcoin import COIN from .bitcoin import COIN
from . import constants from . import constants
from . import blockchain from . import blockchain
from .version import ELECTRUM_VERSION, PROTOCOL_VERSION
from .i18n import _
from .blockchain import InvalidHeader
from .interface import Interface from .interface import Interface
import asyncio import asyncio
import concurrent.futures import concurrent.futures
from .version import PROTOCOL_VERSION
NODES_RETRY_INTERVAL = 60 NODES_RETRY_INTERVAL = 60
SERVER_RETRY_INTERVAL = 10 SERVER_RETRY_INTERVAL = 10
@ -163,7 +158,6 @@ def deserialize_server(server_str):
def serialize_server(host, port, protocol): def serialize_server(host, port, protocol):
return str(':'.join([host, port, protocol])) return str(':'.join([host, port, protocol]))
class Network(PrintError): class Network(PrintError):
"""The Network class manages a set of connections to remote electrum """The Network class manages a set of connections to remote electrum
servers, each connected socket is handled by an Interface() object. servers, each connected socket is handled by an Interface() object.
@ -183,10 +177,10 @@ class Network(PrintError):
config = {} # Do not use mutables as default values! config = {} # Do not use mutables as default values!
self.config = SimpleConfig(config) if isinstance(config, dict) else config self.config = SimpleConfig(config) if isinstance(config, dict) else config
self.num_server = 10 if not self.config.get('oneserver') else 0 self.num_server = 10 if not self.config.get('oneserver') else 0
self.blockchains = blockchain.read_blockchains(self.config) # note: needs self.blockchains_lock blockchain.blockchains = blockchain.read_blockchains(self.config) # note: needs self.blockchains_lock
self.print_error("blockchains", self.blockchains.keys()) self.print_error("blockchains", list(blockchain.blockchains.keys()))
self.blockchain_index = config.get('blockchain_index', 0) self.blockchain_index = config.get('blockchain_index', 0)
if self.blockchain_index not in self.blockchains.keys(): if self.blockchain_index not in blockchain.blockchains.keys():
self.blockchain_index = 0 self.blockchain_index = 0
# Server for addresses and transactions # Server for addresses and transactions
self.default_server = self.config.get('server', None) self.default_server = self.config.get('server', None)
@ -201,6 +195,7 @@ class Network(PrintError):
self.default_server = pick_random_server() self.default_server = pick_random_server()
# locks: if you need to take multiple ones, acquire them in the order they are defined here! # locks: if you need to take multiple ones, acquire them in the order they are defined here!
self.bhi_lock = asyncio.Lock()
self.interface_lock = threading.RLock() # <- re-entrant self.interface_lock = threading.RLock() # <- re-entrant
self.callback_lock = threading.Lock() self.callback_lock = threading.Lock()
self.pending_sends_lock = threading.Lock() self.pending_sends_lock = threading.Lock()
@ -322,29 +317,6 @@ class Network(PrintError):
def is_connecting(self): def is_connecting(self):
return self.connection_status == 'connecting' return self.connection_status == 'connecting'
@with_interface_lock
def queue_request(self, method, params, interface=None):
# If you want to queue a request on any interface it must go
# through this function so message ids are properly tracked
if interface is None:
interface = self.interface
if interface is None:
self.print_error('warning: dropping request', method, params)
return
message_id = self.message_id
self.message_id += 1
if self.debug:
self.print_error(interface.host, "-->", method, params, message_id)
interface.queue_request(method, params, message_id)
return message_id
def request_fee_estimates(self):
from .simple_config import FEE_ETA_TARGETS
self.config.requested_fee_estimates()
self.queue_request('mempool.get_fee_histogram', [])
for i in FEE_ETA_TARGETS:
self.queue_request('blockchain.estimatefee', [i])
def get_status_value(self, key): def get_status_value(self, key):
if key == 'status': if key == 'status':
value = self.connection_status value = self.connection_status
@ -415,11 +387,6 @@ class Network(PrintError):
self.start_interface(server) self.start_interface(server)
return server return server
def start_interfaces(self):
self.start_interface(self.default_server)
for i in range(self.num_server - 1):
self.start_random_interface()
def set_proxy(self, proxy): def set_proxy(self, proxy):
self.proxy = proxy self.proxy = proxy
# Store these somewhere so we can un-monkey-patch # Store these somewhere so we can un-monkey-patch
@ -475,7 +442,6 @@ class Network(PrintError):
self.disconnected_servers = set([]) # note: needs self.interface_lock self.disconnected_servers = set([]) # note: needs self.interface_lock
self.protocol = protocol self.protocol = protocol
self.set_proxy(proxy) self.set_proxy(proxy)
self.start_interfaces()
@with_interface_lock @with_interface_lock
def stop_network(self): def stop_network(self):
@ -558,6 +524,7 @@ class Network(PrintError):
# fixme: we don't want to close headers sub # fixme: we don't want to close headers sub
#self.close_interface(self.interface) #self.close_interface(self.interface)
self.interface = i self.interface = i
self.trigger_callback('default_server_changed')
self.set_status('connected') self.set_status('connected')
self.notify('updated') self.notify('updated')
self.notify('interfaces') self.notify('interfaces')
@ -638,91 +605,6 @@ class Network(PrintError):
""" hashable index for subscriptions and cache""" """ hashable index for subscriptions and cache"""
return str(method) + (':' + str(params[0]) if params else '') return str(method) + (':' + str(params[0]) if params else '')
def process_responses(self, interface):
responses = interface.get_responses()
for request, response in responses:
if request:
method, params, message_id = request
k = self.get_index(method, params)
# client requests go through self.send() with a
# callback, are only sent to the current interface,
# and are placed in the unanswered_requests dictionary
client_req = self.unanswered_requests.pop(message_id, None)
if client_req:
if interface != self.interface:
# we probably changed the current interface
# in the meantime; drop this.
return
callbacks = [client_req[2]]
else:
# fixme: will only work for subscriptions
k = self.get_index(method, params)
callbacks = list(self.subscriptions.get(k, []))
# Copy the request method and params to the response
response['method'] = method
response['params'] = params
else:
if not response: # Closed remotely / misbehaving
self.connection_down(interface.server)
break
# Rewrite response shape to match subscription request response
method = response.get('method')
params = response.get('params')
k = self.get_index(method, params)
if method == 'blockchain.headers.subscribe':
response['result'] = params[0]
response['params'] = []
elif method == 'blockchain.scripthash.subscribe':
response['params'] = [params[0]] # addr
response['result'] = params[1]
callbacks = list(self.subscriptions.get(k, []))
# update cache if it's a subscription
if method.endswith('.subscribe'):
with self.interface_lock:
self.sub_cache[k] = response
# Response is now in canonical form
self.process_response(interface, response, callbacks)
def send(self, messages, callback):
'''Messages is a list of (method, params) tuples'''
messages = list(messages)
with self.pending_sends_lock:
self.pending_sends.append((messages, callback))
@with_interface_lock
def process_pending_sends(self):
# Requests needs connectivity. If we don't have an interface,
# we cannot process them.
if not self.interface:
return
with self.pending_sends_lock:
sends = self.pending_sends
self.pending_sends = []
for messages, callback in sends:
for method, params in messages:
r = None
if method.endswith('.subscribe'):
k = self.get_index(method, params)
# add callback to list
l = list(self.subscriptions.get(k, []))
if callback not in l:
l.append(callback)
with self.callback_lock:
self.subscriptions[k] = l
# check cached response for subscriptions
r = self.sub_cache.get(k)
if r is not None:
self.print_error("cache hit", k)
callback(r)
else:
message_id = self.queue_request(method, params)
self.unanswered_requests[message_id] = method, params, callback
def unsubscribe(self, callback): def unsubscribe(self, callback):
'''Unsubscribe a callback to free object references to enable GC.''' '''Unsubscribe a callback to free object references to enable GC.'''
# Note: we can't unsubscribe from the server, so if we receive # Note: we can't unsubscribe from the server, so if we receive
@ -744,297 +626,37 @@ class Network(PrintError):
self.close_interface(self.interfaces[server]) self.close_interface(self.interfaces[server])
self.notify('interfaces') self.notify('interfaces')
with self.blockchains_lock: with self.blockchains_lock:
for b in self.blockchains.values(): for b in blockchain.blockchains.values():
if b.catch_up == server: if b.catch_up == server:
b.catch_up = None b.catch_up = None
@util.aiosafe
async def new_interface(self, server): async def new_interface(self, server):
# todo: get tip first, then decide which checkpoint to use. # todo: get tip first, then decide which checkpoint to use.
self.add_recent_server(server) self.add_recent_server(server)
interface = Interface(server, self.config.path, self.connecting, self.proxy)
interface.blockchain = None
interface.tip_header = None
interface.tip = 0
interface.mode = 'default'
interface.request = None
with self.interface_lock:
self.interfaces[server] = interface
# server.version should be the first message
params = [ELECTRUM_VERSION, PROTOCOL_VERSION]
self.queue_request('server.version', params, interface)
self.queue_request('blockchain.headers.subscribe', [True], interface)
if server == self.default_server:
self.switch_to_interface(server)
#self.notify('interfaces')
def maintain_sockets(self):
'''Socket maintenance.'''
# Responses to connection attempts?
while not self.socket_queue.empty():
server = self.socket_queue.get()
if server in self.connecting:
self.connecting.remove(server)
if socket:
self.new_interface(server)
else:
self.connection_down(server)
# Send pings and shut down stale interfaces
# must use copy of values
with self.interface_lock:
interfaces = list(self.interfaces.values())
for interface in interfaces:
if interface.has_timed_out():
self.connection_down(interface.server)
elif interface.ping_required():
self.queue_request('server.ping', [], interface)
now = time.time()
# nodes
with self.interface_lock:
if len(self.interfaces) + len(self.connecting) < self.num_server:
self.start_random_interface()
if now - self.nodes_retry_time > NODES_RETRY_INTERVAL:
self.print_error('network: retrying connections')
self.disconnected_servers = set([])
self.nodes_retry_time = now
# main interface
with self.interface_lock:
if not self.is_connected():
if self.auto_connect:
if not self.is_connecting():
self.switch_to_random_interface()
else:
if self.default_server in self.disconnected_servers:
if now - self.server_retry_time > SERVER_RETRY_INTERVAL:
self.disconnected_servers.remove(self.default_server)
self.server_retry_time = now
else:
self.switch_to_interface(self.default_server)
else:
if self.config.is_fee_estimates_update_required():
self.request_fee_estimates()
def request_chunk(self, interface, index): interface = Interface(self, server, self.config.path, self.proxy)
if index in self.requested_chunks: try:
return await asyncio.wait_for(interface.ready, 5)
interface.print_error("requesting chunk %d" % index) except BaseException as e:
self.requested_chunks.add(index) import traceback
height = index * 2016 traceback.print_exc()
self.queue_request('blockchain.block.headers', [height, 2016], self.print_error(interface.server, "couldn't launch because", str(e), str(type(e)))
interface)
def on_block_headers(self, interface, response):
'''Handle receiving a chunk of block headers'''
error = response.get('error')
result = response.get('result')
params = response.get('params')
blockchain = interface.blockchain
if result is None or params is None or error is not None:
interface.print_error(error or 'bad response')
return
# Ignore unsolicited chunks
height = params[0]
index = height // 2016
if index * 2016 != height or index not in self.requested_chunks:
interface.print_error("received chunk %d (unsolicited)" % index)
return
else:
interface.print_error("received chunk %d" % index)
self.requested_chunks.remove(index)
hexdata = result['hex']
connect = blockchain.connect_chunk(index, hexdata)
if not connect:
self.connection_down(interface.server)
return
if index >= len(blockchain.checkpoints):
# If not finished, get the next chunk
if blockchain.height() < interface.tip:
self.request_chunk(interface, index+1)
else:
interface.mode = 'default'
interface.print_error('catch up done', blockchain.height())
blockchain.catch_up = None
else:
# the verifier must have asked for this chunk
pass
self.notify('updated')
def on_get_header(self, interface, response):
'''Handle receiving a single block header'''
header = response.get('result')
if not header:
interface.print_error(response)
self.connection_down(interface.server)
return
height = header.get('block_height')
#interface.print_error('got header', height, blockchain.hash_header(header))
if interface.request != height:
interface.print_error("unsolicited header",interface.request, height)
self.connection_down(interface.server) self.connection_down(interface.server)
return return
chain = blockchain.check_header(header) finally:
if interface.mode == 'backward': self.connecting.remove(server)
can_connect = blockchain.can_connect(header)
if can_connect and can_connect.catch_up is None:
interface.mode = 'catch_up'
interface.blockchain = can_connect
interface.blockchain.save_header(header)
next_height = height + 1
interface.blockchain.catch_up = interface.server
elif chain:
# FIXME should await "initial chunk download".
# binary search will NOT do the correct thing if we don't yet
# have all headers up to the fork height
interface.print_error("binary search")
interface.mode = 'binary'
interface.blockchain = chain
interface.good = height
next_height = (interface.bad + interface.good) // 2
assert next_height >= self.max_checkpoint(), (interface.bad, interface.good)
else:
if height == 0:
self.connection_down(interface.server)
next_height = None
else:
interface.bad = height
interface.bad_header = header
delta = interface.tip - height
next_height = max(self.max_checkpoint(), interface.tip - 2 * delta)
if height == next_height:
self.connection_down(interface.server)
next_height = None
elif interface.mode == 'binary':
if chain:
interface.good = height
interface.blockchain = chain
else:
interface.bad = height
interface.bad_header = header
if interface.bad != interface.good + 1:
next_height = (interface.bad + interface.good) // 2
assert next_height >= self.max_checkpoint()
elif not interface.blockchain.can_connect(interface.bad_header, check_height=False):
self.connection_down(interface.server)
next_height = None
else:
branch = self.blockchains.get(interface.bad)
if branch is not None:
if branch.check_header(interface.bad_header):
interface.print_error('joining chain', interface.bad)
next_height = None
elif branch.parent().check_header(header):
interface.print_error('reorg', interface.bad, interface.tip)
interface.blockchain = branch.parent()
next_height = interface.bad
else:
interface.print_error('forkpoint conflicts with existing fork', branch.path())
branch.write(b'', 0)
branch.save_header(interface.bad_header)
interface.mode = 'catch_up'
interface.blockchain = branch
next_height = interface.bad + 1
interface.blockchain.catch_up = interface.server
else:
bh = interface.blockchain.height()
next_height = None
if bh > interface.good:
if not interface.blockchain.check_header(interface.bad_header):
b = interface.blockchain.fork(interface.bad_header)
with self.blockchains_lock:
self.blockchains[interface.bad] = b
interface.blockchain = b
interface.print_error("new chain", b.forkpoint)
interface.mode = 'catch_up'
maybe_next_height = interface.bad + 1
if maybe_next_height <= interface.tip:
next_height = maybe_next_height
interface.blockchain.catch_up = interface.server
else:
assert bh == interface.good
if interface.blockchain.catch_up is None and bh < interface.tip:
interface.print_error("catching up from %d"% (bh + 1))
interface.mode = 'catch_up'
next_height = bh + 1
interface.blockchain.catch_up = interface.server
self.notify('updated')
elif interface.mode == 'catch_up':
can_connect = interface.blockchain.can_connect(header)
if can_connect:
interface.blockchain.save_header(header)
next_height = height + 1 if height < interface.tip else None
else:
# go back
interface.print_error("cannot connect", height)
interface.mode = 'backward'
interface.bad = height
interface.bad_header = header
next_height = height - 1
if next_height is None:
# exit catch_up state
interface.print_error('catch up done', interface.blockchain.height())
interface.blockchain.catch_up = None
self.switch_lagging_interface()
self.notify('updated')
else:
raise Exception(interface.mode)
# If not finished, get the next header
if next_height is not None:
if next_height < 0:
self.connection_down(interface.server)
next_height = None
elif interface.mode == 'catch_up' and interface.tip > next_height + 50:
self.request_chunk(interface, next_height // 2016)
else:
self.request_header(interface, next_height)
if next_height is None:
interface.mode = 'default'
interface.request = None
self.notify('updated')
# refresh network dialog
self.notify('interfaces')
def maintain_requests(self):
with self.interface_lock: with self.interface_lock:
interfaces = list(self.interfaces.values()) self.interfaces[server] = interface
for interface in interfaces:
if interface.request and time.time() - interface.request_time > 20:
interface.print_error("blockchain request timed out")
self.connection_down(interface.server)
continue
def wait_on_sockets(self): if server == self.default_server:
# Python docs say Windows doesn't like empty selects. self.switch_to_interface(server)
# Sleep to prevent busy looping
if not self.interfaces: #self.notify('interfaces')
time.sleep(0.1)
return
with self.interface_lock:
interfaces = list(self.interfaces.values())
rin = [i for i in interfaces]
win = [i for i in interfaces if i.num_requests()]
try:
rout, wout, xout = select.select(rin, win, [], 0.1)
except socket.error as e:
if e.errno == errno.EINTR:
return
raise
assert not xout
for interface in wout:
interface.send_requests()
for interface in rout:
self.process_responses(interface)
def init_headers_file(self): def init_headers_file(self):
b = self.blockchains[0] b = blockchain.blockchains[0]
filename = b.path() filename = b.path()
length = 80 * len(constants.net.CHECKPOINTS) * 2016 length = 80 * len(constants.net.CHECKPOINTS) * 2016
if not os.path.exists(filename) or os.path.getsize(filename) < length: if not os.path.exists(filename) or os.path.getsize(filename) < length:
@ -1052,73 +674,18 @@ class Network(PrintError):
self.asyncio_loop.run_until_complete(self.gat) self.asyncio_loop.run_until_complete(self.gat)
except concurrent.futures.CancelledError: except concurrent.futures.CancelledError:
pass pass
[f.cancel() for f in self.futures]
def on_notify_header(self, interface, header_dict):
try:
header_hex, height = header_dict['hex'], header_dict['height']
except KeyError:
# no point in keeping this connection without headers sub
self.connection_down(interface.server)
return
try:
header = blockchain.deserialize_header(util.bfh(header_hex), height)
except InvalidHeader:
self.connection_down(interface.server)
return
#interface.print_error('notified of header', height, blockchain.hash_header(header))
if height < self.max_checkpoint():
self.connection_down(interface.server)
return
interface.tip_header = header
interface.tip = height
if interface.mode != 'default':
return
b = blockchain.check_header(header)
if b:
interface.blockchain = b
self.switch_lagging_interface()
self.notify('updated')
self.notify('interfaces')
return
b = blockchain.can_connect(header)
if b:
interface.blockchain = b
b.save_header(header)
self.switch_lagging_interface()
self.notify('updated')
self.notify('interfaces')
return
with self.blockchains_lock:
tip = max([x.height() for x in self.blockchains.values()])
if tip >=0:
interface.mode = 'backward'
interface.bad = height
interface.bad_header = header
self.request_header(interface, min(tip +1, height - 1))
else:
chain = self.blockchains[0]
if chain.catch_up is None:
chain.catch_up = interface
interface.mode = 'catch_up'
interface.blockchain = chain
with self.blockchains_lock:
self.print_error("switching to catchup mode", tip, self.blockchains)
self.request_header(interface, 0)
else:
self.print_error("chain already catching up with", chain.catch_up.server)
@with_interface_lock @with_interface_lock
def blockchain(self): def blockchain(self):
if self.interface and self.interface.blockchain is not None: if self.interface and self.interface.blockchain is not None:
self.blockchain_index = self.interface.blockchain.forkpoint self.blockchain_index = self.interface.blockchain.forkpoint
return self.blockchains[self.blockchain_index] return blockchain.blockchains[self.blockchain_index]
@with_interface_lock @with_interface_lock
def get_blockchains(self): def get_blockchains(self):
out = {} out = {}
with self.blockchains_lock: with self.blockchains_lock:
blockchain_items = list(self.blockchains.items()) blockchain_items = list(blockchain.blockchains.items())
for k, b in blockchain_items: for k, b in blockchain_items:
r = list(filter(lambda i: i.blockchain==b, list(self.interfaces.values()))) r = list(filter(lambda i: i.blockchain==b, list(self.interfaces.values())))
if r: if r:
@ -1126,14 +693,14 @@ class Network(PrintError):
return out return out
def follow_chain(self, index): def follow_chain(self, index):
blockchain = self.blockchains.get(index) bc = blockchain.blockchains.get(index)
if blockchain: if bc:
self.blockchain_index = index self.blockchain_index = index
self.config.set_key('blockchain_index', index) self.config.set_key('blockchain_index', index)
with self.interface_lock: with self.interface_lock:
interfaces = list(self.interfaces.values()) interfaces = list(self.interfaces.values())
for i in interfaces: for i in interfaces:
if i.blockchain == blockchain: if i.blockchain == bc:
self.switch_to_interface(i.server) self.switch_to_interface(i.server)
break break
else: else:
@ -1149,119 +716,6 @@ class Network(PrintError):
def get_local_height(self): def get_local_height(self):
return self.blockchain().height() return self.blockchain().height()
@staticmethod
def __wait_for(it):
"""Wait for the result of calling lambda `it`."""
q = queue.Queue()
it(q.put)
try:
result = q.get(block=True, timeout=30)
except queue.Empty:
raise util.TimeoutException(_('Server did not answer'))
if result.get('error'):
raise Exception(result.get('error'))
return result.get('result')
@staticmethod
def __with_default_synchronous_callback(invocation, callback):
""" Use this method if you want to make the network request
synchronous. """
if not callback:
return Network.__wait_for(invocation)
invocation(callback)
def request_header(self, interface, height):
self.queue_request('blockchain.block.get_header', [height], interface)
interface.request = height
interface.req_time = time.time()
def map_scripthash_to_address(self, callback):
def cb2(x):
x2 = x.copy()
p = x2.pop('params')
addr = self.h2addr[p[0]]
x2['params'] = [addr]
callback(x2)
return cb2
# NOTE this method handles exceptions and a special edge case, counter to
# what the other ElectrumX methods do. This is unexpected.
def broadcast_transaction(self, transaction, callback=None):
command = 'blockchain.transaction.broadcast'
invocation = lambda c: self.send([(command, [str(transaction)])], c)
if callback:
invocation(callback)
return
try:
out = Network.__wait_for(invocation)
except BaseException as e:
return False, "error: " + str(e)
if out != transaction.txid():
return False, "error: " + out
return True, out
def get_history_for_scripthash(self, hash, callback=None):
command = 'blockchain.scripthash.get_history'
invocation = lambda c: self.send([(command, [hash])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def subscribe_to_headers(self, callback=None):
command = 'blockchain.headers.subscribe'
invocation = lambda c: self.send([(command, [True])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def subscribe_to_address(self, address, callback=None):
command = 'blockchain.address.subscribe'
invocation = lambda c: self.send([(command, [address])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def get_merkle_for_transaction(self, tx_hash, tx_height, callback=None):
command = 'blockchain.transaction.get_merkle'
invocation = lambda c: self.send([(command, [tx_hash, tx_height])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def subscribe_to_scripthash(self, scripthash, callback=None):
command = 'blockchain.scripthash.subscribe'
invocation = lambda c: self.send([(command, [scripthash])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def get_transaction(self, transaction_hash, callback=None):
command = 'blockchain.transaction.get'
invocation = lambda c: self.send([(command, [transaction_hash])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def get_transactions(self, transaction_hashes, callback=None):
command = 'blockchain.transaction.get'
messages = [(command, [tx_hash]) for tx_hash in transaction_hashes]
invocation = lambda c: self.send(messages, c)
return Network.__with_default_synchronous_callback(invocation, callback)
def listunspent_for_scripthash(self, scripthash, callback=None):
command = 'blockchain.scripthash.listunspent'
invocation = lambda c: self.send([(command, [scripthash])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def get_balance_for_scripthash(self, scripthash, callback=None):
command = 'blockchain.scripthash.get_balance'
invocation = lambda c: self.send([(command, [scripthash])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def export_checkpoints(self, path): def export_checkpoints(self, path):
# run manually from the console to generate checkpoints # run manually from the console to generate checkpoints
cp = self.blockchain().get_checkpoints() cp = self.blockchain().get_checkpoints()
@ -1310,3 +764,4 @@ class Network(PrintError):
if changed: if changed:
self.notify('updated') self.notify('updated')
await asyncio.sleep(1) await asyncio.sleep(1)

36
electrum/verifier.py

@ -21,9 +21,10 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE. # SOFTWARE.
import asyncio
from typing import Sequence, Optional from typing import Sequence, Optional
from .util import ThreadJob, bh2u, VerifiedTxInfo from .util import ThreadJob, bh2u, VerifiedTxInfo, aiosafe
from .bitcoin import Hash, hash_decode, hash_encode from .bitcoin import Hash, hash_decode, hash_encode
from .transaction import Transaction from .transaction import Transaction
from .blockchain import hash_header from .blockchain import hash_header
@ -45,17 +46,21 @@ class SPV(ThreadJob):
self.merkle_roots = {} # txid -> merkle root (once it has been verified) self.merkle_roots = {} # txid -> merkle root (once it has been verified)
self.requested_merkle = set() # txid set of pending requests self.requested_merkle = set() # txid set of pending requests
def run(self): @aiosafe
interface = self.network.interface async def main(self):
if not interface: while True:
return await self.run()
await asyncio.sleep(1)
blockchain = interface.blockchain async def run(self):
blockchain = self.network.blockchain()
if not blockchain: if not blockchain:
self.print_error("no blockchain")
return return
local_height = self.network.get_local_height() local_height = self.network.get_local_height()
unverified = self.wallet.get_unverified_txs() unverified = self.wallet.get_unverified_txs()
#print("verifier run", len(unverified))
for tx_hash, tx_height in unverified.items(): for tx_hash, tx_height in unverified.items():
# do not request merkle branch before headers are available # do not request merkle branch before headers are available
if tx_height <= 0 or tx_height > local_height: if tx_height <= 0 or tx_height > local_height:
@ -65,31 +70,26 @@ class SPV(ThreadJob):
if header is None: if header is None:
index = tx_height // 2016 index = tx_height // 2016
if index < len(blockchain.checkpoints): if index < len(blockchain.checkpoints):
self.network.request_chunk(interface, index) # FIXME disabled until async block header download has been merged
pass #await self.network.request_chunk(tx_height, None)
elif (tx_hash not in self.requested_merkle elif (tx_hash not in self.requested_merkle
and tx_hash not in self.merkle_roots): and tx_hash not in self.merkle_roots):
self.network.get_merkle_for_transaction(
tx_hash,
tx_height,
self.verify_merkle)
self.print_error('requested merkle', tx_hash) self.print_error('requested merkle', tx_hash)
self.requested_merkle.add(tx_hash) self.requested_merkle.add(tx_hash)
self.verify_merkle(tx_hash, await self.network.get_merkle_for_transaction(
tx_hash,
tx_height
))
if self.network.blockchain() != self.blockchain: if self.network.blockchain() != self.blockchain:
self.blockchain = self.network.blockchain() self.blockchain = self.network.blockchain()
self.undo_verifications() self.undo_verifications()
def verify_merkle(self, response): def verify_merkle(self, tx_hash, merkle):
if self.wallet.verifier is None: if self.wallet.verifier is None:
return # we have been killed, this was just an orphan callback return # we have been killed, this was just an orphan callback
if response.get('error'):
self.print_error('received an error:', response)
return
params = response['params']
merkle = response['result']
# Verify the hash of the server-provided merkle branch to a # Verify the hash of the server-provided merkle branch to a
# transaction matches the merkle root of its block # transaction matches the merkle root of its block
tx_hash = params[0]
tx_height = merkle.get('block_height') tx_height = merkle.get('block_height')
pos = merkle.get('pos') pos = merkle.get('pos')
merkle_branch = merkle.get('merkle') merkle_branch = merkle.get('merkle')

2
electrum/version.py

@ -1,7 +1,7 @@
ELECTRUM_VERSION = '3.2.3' # version of the client package ELECTRUM_VERSION = '3.2.3' # version of the client package
APK_VERSION = '3.2.3.1' # read by buildozer.spec APK_VERSION = '3.2.3.1' # read by buildozer.spec
PROTOCOL_VERSION = '1.2' # protocol version requested PROTOCOL_VERSION = '1.4' # protocol version requested
# The hash of the mnemonic seed must begin with this # The hash of the mnemonic seed must begin with this
SEED_PREFIX = '01' # Standard wallet SEED_PREFIX = '01' # Standard wallet

Loading…
Cancel
Save