Browse Source

initial async wallet sync working

master
AdamISZ 8 years ago
parent
commit
67529015cf
No known key found for this signature in database
GPG Key ID: B3AE09F1E9A3197A
  1. 146
      jmclient/jmclient/blockchaininterface.py
  2. 732
      jmclient/jmclient/electruminterface.py

146
jmclient/jmclient/blockchaininterface.py

@ -59,20 +59,85 @@ class BlockchainInterface(object):
"""Finds the unspent transaction outputs belonging to this wallet,
sets wallet.unspent """
def add_tx_notify(self, txd, unconfirmfun, confirmfun, notifyaddr,
wallet_name=None, timeoutfun=None, spentfun=None, txid_flag=True,
n=0, c=1, vb=None):
"""Given a deserialized transaction txd,
callback functions for broadcast and confirmation of the transaction,
an address to import, and a callback function for timeout, set up
a polling loop to check for events on the transaction. Also optionally set
to trigger "confirmed" callback on number of confirmations c. Also checks
for spending (if spentfun is not None) of the outpoint n.
If txid_flag is True, we create a watcher loop on the txid (hence only
really usable in a segwit context, and only on fully formed transactions),
else we create a watcher loop on the output set of the transaction (taken
from the outs field of the txd).
"""
if not vb:
vb = get_p2pk_vbyte()
one_addr_imported = False
for outs in txd['outs']:
addr = btc.script_to_address(outs['script'], vb)
if self.rpc('getaccount', [addr]) != '':
one_addr_imported = True
break
if not one_addr_imported:
self.rpc('importaddress', [notifyaddr, 'joinmarket-notify', False])
#Warning! In case of txid_flag false, this is *not* a valid txid,
#but only a hash of an incomplete transaction serialization; but,
#it still suffices as a unique key for tracking, in this case.
txid = btc.txhash(btc.serialize(txd))
if not txid_flag:
tx_output_set = set([(sv['script'], sv['value']) for sv in txd['outs']])
loop = task.LoopingCall(self.outputs_watcher, wallet_name, notifyaddr,
tx_output_set, unconfirmfun, confirmfun,
timeoutfun)
log.debug("Created watcher loop for address: " + notifyaddr)
loopkey = notifyaddr
else:
loop = task.LoopingCall(self.tx_watcher, txd, unconfirmfun, confirmfun,
spentfun, c, n)
log.debug("Created watcher loop for txid: " + txid)
loopkey = txid
self.tx_watcher_loops[loopkey] = [loop, False, False, False]
#Hardcoded polling interval, but in any case it can be very short.
loop.start(5.0)
#TODO Hardcoded very long timeout interval
reactor.callLater(7200, self.tx_timeout, txd, loopkey, timeoutfun)
def tx_timeout(self, txd, loopkey, timeoutfun):
#TODO: 'loopkey' is an address not a txid for Makers, handle that.
if not timeoutfun:
return
if not txid in self.tx_watcher_loops:
return
if not self.tx_watcher_loops[loopkey][1]:
#Not confirmed after 2 hours; give up
log.info("Timed out waiting for confirmation of: " + str(loopkey))
self.tx_watcher_loops[loopkey][0].stop()
timeoutfun(txd, loopkey)
@abc.abstractmethod
def add_tx_notify(self,
txd,
unconfirmfun,
confirmfun,
notifyaddr,
timeoutfun=None,
vb=None):
def outputs_watcher(self, wallet_name, notifyaddr, tx_output_set,
unconfirmfun, confirmfun, timeoutfun):
"""Given a key for the watcher loop (notifyaddr), a wallet name (account),
a set of outputs, and unconfirm, confirm and timeout callbacks,
check to see if a transaction matching that output set has appeared in
the wallet. Call the callbacks and update the watcher loop state.
End the loop when the confirmation has been seen (no spent monitoring here).
"""
Invokes unconfirmfun and confirmfun when tx is seen on the network
If timeoutfun not None, called with boolean argument that tells
whether this is the timeout for unconfirmed or confirmed
timeout for uncontirmed = False
pass
@abc.abstractmethod
def tx_watcher(self, txd, unconfirmfun, confirmfun, spentfun, c, n):
"""Called at a polling interval, checks if the given deserialized
transaction (which must be fully signed) is (a) broadcast, (b) confirmed
and (c) spent from at index n, and notifies confirmation if number
of confs = c.
TODO: Deal with conflicts correctly. Here just abandons monitoring.
"""
pass
@abc.abstractmethod
def pushtx(self, txhex):
@ -547,65 +612,6 @@ class BitcoinCoreInterface(BlockchainInterface):
et = time.time()
log.debug('bitcoind sync_unspent took ' + str((et - st)) + 'sec')
def add_tx_notify(self, txd, unconfirmfun, confirmfun, notifyaddr,
wallet_name=None, timeoutfun=None, spentfun=None, txid_flag=True,
n=0, c=1, vb=None):
"""Given a deserialized transaction txd,
callback functions for broadcast and confirmation of the transaction,
an address to import, and a callback function for timeout, set up
a polling loop to check for events on the transaction. Also optionally set
to trigger "confirmed" callback on number of confirmations c. Also checks
for spending (if spentfun is not None) of the outpoint n.
If txid_flag is True, we create a watcher loop on the txid (hence only
really usable in a segwit context, and only on fully formed transactions),
else we create a watcher loop on the output set of the transaction (taken
from the outs field of the txd).
"""
if not vb:
vb = get_p2pk_vbyte()
one_addr_imported = False
for outs in txd['outs']:
addr = btc.script_to_address(outs['script'], vb)
if self.rpc('getaccount', [addr]) != '':
one_addr_imported = True
break
if not one_addr_imported:
self.rpc('importaddress', [notifyaddr, 'joinmarket-notify', False])
#Warning! In case of txid_flag false, this is *not* a valid txid,
#but only a hash of an incomplete transaction serialization; but,
#it still suffices as a unique key for tracking, in this case.
txid = btc.txhash(btc.serialize(txd))
if not txid_flag:
tx_output_set = set([(sv['script'], sv['value']) for sv in txd['outs']])
loop = task.LoopingCall(self.outputs_watcher, wallet_name, notifyaddr,
tx_output_set, unconfirmfun, confirmfun,
timeoutfun)
log.debug("Created watcher loop for address: " + notifyaddr)
loopkey = notifyaddr
else:
loop = task.LoopingCall(self.tx_watcher, txd, unconfirmfun, confirmfun,
spentfun, c, n)
log.debug("Created watcher loop for txid: " + txid)
loopkey = txid
self.tx_watcher_loops[loopkey] = [loop, False, False, False]
#Hardcoded polling interval, but in any case it can be very short.
loop.start(5.0)
#TODO Hardcoded very long timeout interval
reactor.callLater(7200, self.tx_timeout, txd, loopkey, timeoutfun)
def tx_timeout(self, txd, loopkey, timeoutfun):
#TODO: 'loopkey' is an address not a txid for Makers, handle that.
if not timeoutfun:
return
if not txid in self.tx_watcher_loops:
return
if not self.tx_watcher_loops[loopkey][1]:
#Not confirmed after 2 hours; give up
log.info("Timed out waiting for confirmation of: " + str(loopkey))
self.tx_watcher_loops[loopkey][0].stop()
timeoutfun(txd, loopkey)
def get_deser_from_gettransaction(self, rpcretval):
"""Get full transaction deserialization from a call
to `gettransaction`

732
jmclient/jmclient/electruminterface.py

@ -7,6 +7,11 @@ import random
import socket
import threading
import time
import sys
from twisted.python.log import startLogging
from twisted.internet.protocol import ClientFactory, Protocol
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor, task, defer
from .blockchaininterface import BlockchainInterface, is_index_ahead_of_cache
from .configure import get_p2pk_vbyte
from .support import get_log
@ -14,233 +19,448 @@ from .support import get_log
log = get_log()
# Default server list from electrum client
# https://github.com/spesmilo/electrum/blob/753a28b452dca1023fbde548469c36a34555dc95/lib/network.py
DEFAULT_ELECTRUM_SERVER_LIST = [
'erbium1.sytes.net:50001',
'ecdsa.net:50001',
'electrum0.electricnewyear.net:50001',
'VPS.hsmiths.com:50001',
'ELECTRUM.jdubya.info:50001',
'electrum.no-ip.org:50001',
'us.electrum.be:50001',
'bitcoins.sk:50001',
'electrum.petrkr.net:50001',
'electrum.dragonzone.net:50001',
'Electrum.hsmiths.com:8080',
'electrum3.hachre.de:50001',
'elec.luggs.co:80',
'btc.smsys.me:110',
'electrum.online:50001',
]
# https://github.com/spesmilo/electrum, file https://github.com/spesmilo/electrum/blob/7dbd612d5dad13cd6f1c0df32534a578bad331ad/lib/servers.json
DEFAULT_PORTS = {'t':'50001', 's':'50002'}
class ElectrumInterface(BlockchainInterface):
DEFAULT_SERVERS = {
"E-X.not.fyi": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"ELECTRUMX.not.fyi": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"ELEX01.blackpole.online": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"VPS.hsmiths.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"bitcoin.freedomnode.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"btc.smsys.me": {
"pruning": "-",
"s": "995",
"version": "1.1"
},
"currentlane.lovebitco.in": {
"pruning": "-",
"t": "50001",
"version": "1.1"
},
"daedalus.bauerj.eu": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"de01.hamster.science": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"ecdsa.net": {
"pruning": "-",
"s": "110",
"t": "50001",
"version": "1.1"
},
"elec.luggs.co": {
"pruning": "-",
"s": "443",
"version": "1.1"
},
"electrum.akinbo.org": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"electrum.antumbra.se": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"electrum.be": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"electrum.coinucopia.io": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"electrum.cutie.ga": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"electrum.festivaldelhumor.org": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"electrum.hsmiths.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"electrum.qtornado.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"electrum.vom-stausee.de": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"electrum3.hachre.de": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"electrumx.bot.nu": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"electrumx.westeurope.cloudapp.azure.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"elx01.knas.systems": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"ex-btc.server-on.net": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"helicarrier.bauerj.eu": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"mooo.not.fyi": {
"pruning": "-",
"s": "50012",
"t": "50011",
"version": "1.1"
},
"ndnd.selfhost.eu": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"node.arihanc.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"node.xbt.eu": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"node1.volatilevictory.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"noserver4u.de": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"qmebr.spdns.org": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"raspi.hsmiths.com": {
"pruning": "-",
"s": "51002",
"t": "51001",
"version": "1.1"
},
"s2.noip.pl": {
"pruning": "-",
"s": "50102",
"version": "1.1"
},
"s5.noip.pl": {
"pruning": "-",
"s": "50105",
"version": "1.1"
},
"songbird.bauerj.eu": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"us.electrum.be": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
},
"us01.hamster.science": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.1"
}
}
class ElectrumConn(threading.Thread):
def __init__(self, electrum_server):
threading.Thread.__init__(self)
self.daemon = True
self.msg_id = 0
self.RetQueue = Queue.Queue()
try:
self.s = socket.create_connection((electrum_server.split(':')[0],
int(electrum_server.split(':')[1])))
except Exception as e:
log.error("Error connecting to electrum server. "
"Try again to connect to a random server or set a "
"server in the config.")
os._exit(1)
self.ping()
def run(self):
while True:
all_data = None
while True:
data = self.s.recv(1024)
if data is None:
continue
if all_data is None:
all_data = data
else:
all_data = all_data + data
if '\n' in all_data:
break
data_json = json.loads(all_data[:-1].decode())
self.RetQueue.put(data_json)
def ping(self):
log.debug('sending server ping')
self.send_json({'id':0,'method':'server.version','params':[]})
t = threading.Timer(60, self.ping)
t.daemon = True
t.start()
def send_json(self, json_data):
data = json.dumps(json_data).encode()
self.s.send(data + b'\n')
def call_server_method(self, method, params=[]):
self.msg_id = self.msg_id + 1
current_id = self.msg_id
method_dict = {
'id': current_id,
'method': method,
'params': params
}
self.send_json(method_dict)
while True:
ret_data = self.RetQueue.get()
if ret_data.get('id', None) == current_id:
return ret_data
else:
log.debug(json.dumps(ret_data))
def set_electrum_testnet():
global DEFAULT_PORTS, DEFAULT_SERVERS
DEFAULT_PORTS = {'t':'51001', 's':'51002'}
DEFAULT_SERVERS = {
'testnetnode.arihanc.com': {'t':'51001', 's':'51002'},
'testnet1.bauerj.eu': {'t':'51001', 's':'51002'},
'14.3.140.101': {'t':'51001', 's':'51002'},
'testnet.hsmiths.com': {'t':'53011', 's':'53012'},
'electrum.akinbo.org': {'t':'51001', 's':'51002'},
'ELEX05.blackpole.online': {'t':'52011', 's':'52002'},}
def __init__(self, testnet=False, electrum_server=None):
super(ElectrumInterface, self).__init__()
class TxElectrumClientProtocol(LineReceiver):
#map deferreds to msgids to correctly link response with request
deferreds = {}
delimiter = "\n"
def __init__(self, factory):
self.factory = factory
def connectionMade(self):
print('connection to Electrum made')
self.msg_id = 0
self.factory.bci.sync_addresses(self.factory.bci.wallet)
self.start_ping()
self.call_server_method('blockchain.numblocks.subscribe')
def start_ping(self):
pingloop = task.LoopingCall(self.ping)
pingloop.start(60.0)
def ping(self):
print('sending server ping')
#We dont bother tracking response to this;
#just for keeping connection active
self.call_server_method('server.version')
def send_json(self, json_data):
data = json.dumps(json_data).encode()
self.sendLine(data)
def call_server_method(self, method, params=[]):
self.msg_id = self.msg_id + 1
current_id = self.msg_id
self.deferreds[current_id] = defer.Deferred()
method_dict = {
'id': current_id,
'method': method,
'params': params
}
self.send_json(method_dict)
return self.deferreds[current_id]
def lineReceived(self, line):
try:
parsed = json.loads(line)
msgid = parsed['id']
linked_deferred = self.deferreds[msgid]
except:
log.debug("Ignored response from Electrum server: " + str(line))
return
linked_deferred.callback(parsed)
class TxElectrumClientProtocolFactory(ClientFactory):
def __init__(self, bci):
self.bci = bci
def buildProtocol(self,addr):
self.client = TxElectrumClientProtocol(self)
return self.client
def clientConnectionLost(self,connector,reason):
print('connection lost')
def clientConnectionFailed(self,connector,reason):
print('connection failed')
class ElectrumInterface(BlockchainInterface):
BATCH_SIZE = 8
def __init__(self, testnet=False, electrum_server=None):
if testnet:
raise Exception(NotImplemented)
if electrum_server is None:
electrum_server = random.choice(DEFAULT_ELECTRUM_SERVER_LIST)
self.server_domain = electrum_server.split(':')[0]
self.last_sync_unspent = 0
self.electrum_conn = self.ElectrumConn(electrum_server)
self.electrum_conn.start()
# used to hold open server conn
self.electrum_conn.call_server_method('blockchain.numblocks.subscribe')
set_electrum_testnet()
self.server, self.port = self.get_server(electrum_server)
self.factory = TxElectrumClientProtocolFactory(self)
reactor.connectTCP(self.server, self.port, self.factory)
def sync_wallet(self, wallet, restart_cb=False):
self.wallet = wallet
#wipe the temporary cache of address histories
self.temp_addr_history = {}
startLogging(sys.stdout)
reactor.run()
def get_server(self, electrum_server):
if not electrum_server:
electrum_server = random.choice(DEFAULT_SERVERS.keys())
s = electrum_server
p = int(DEFAULT_SERVERS[electrum_server]['t'])
print('Trying to connect to Electrum server: ' + str(electrum_server))
return (s, p)
def get_from_electrum(self, method, params=[]):
params = [params] if type(params) is not list else params
return self.electrum_conn.call_server_method(method, params)
return self.factory.client.call_server_method(method, params)
def sync_addresses(self, wallet):
log.debug("downloading wallet history from electrum server")
for mix_depth in range(wallet.max_mix_depth):
def sync_addresses(self, wallet, restart_cb=None):
log.debug("downloading wallet history from Electrum server ...")
for mixdepth in range(wallet.max_mix_depth):
for forchange in [0, 1]:
unused_addr_count = 0
last_used_addr = ''
while (unused_addr_count < wallet.gaplimit or not is_index_ahead_of_cache(wallet, mix_depth, forchange)):
addr = wallet.get_new_addr(mix_depth, forchange)
addr_hist_info = self.get_from_electrum('blockchain.address.get_history', addr)
if len(addr_hist_info['result']) > 0:
last_used_addr = addr
unused_addr_count = 0
else:
unused_addr_count += 1
if last_used_addr == '':
wallet.index[mix_depth][forchange] = 0
self.synchronize_batch(wallet, mixdepth, forchange, 0)
def synchronize_batch(self, wallet, mixdepth, forchange, start_index):
log.debug("Syncing address batch, m, fc, i: " + ",".join(
[str(x) for x in [mixdepth, forchange, start_index]]))
if mixdepth not in self.temp_addr_history:
self.temp_addr_history[mixdepth] = {}
if forchange not in self.temp_addr_history[mixdepth]:
self.temp_addr_history[mixdepth][forchange] = {"finished": False}
for i in range(start_index, start_index + self.BATCH_SIZE):
#get_new_addr is OK here, as guaranteed to be sequential *on this branch*
a = wallet.get_new_addr(mixdepth, forchange)
d = self.get_from_electrum('blockchain.address.get_history', a)
d.addCallback(self.process_address_history, wallet,
mixdepth, forchange, i, a, start_index)
#makes sure entries in temporary address history are ready
#to be accessed.
if i not in self.temp_addr_history[mixdepth][forchange]:
self.temp_addr_history[mixdepth][forchange][i] = {'synced': False,
'addr': a,
'used': False}
log.info("added tah entry: " + str(self.temp_addr_history[mixdepth][forchange][i]))
def process_address_history(self, history, wallet, mixdepth, forchange, i,
addr, start_index):
"""Given the history data for an address from Electrum, update the current view
of the wallet's usage at mixdepth mixdepth and account forchange, address addr at
index i. Once all addresses from index start_index to start_index + self.BATCH_SIZE
have been thus updated, trigger either continuation to the next batch, or, if
conditions are fulfilled, end syncing for this (mixdepth, forchange) branch, and
if all such branches are finished, proceed to the sync_unspent step.
"""
tah = self.temp_addr_history[mixdepth][forchange]
if len(history['result']) > 0:
tah[i]['used'] = True
tah[i]['synced'] = True
#Having updated this specific record, check if the entire batch from start_index
#has been synchronized
if all([tah[i]['synced'] for i in range(start_index, start_index + self.BATCH_SIZE)]):
#check if unused goes back as much as gaplimit; if so, end, else, continue
#to next batch
if all([tah[i]['used'] is False for i in range(
start_index+self.BATCH_SIZE-wallet.gaplimit,
start_index+self.BATCH_SIZE)]):
last_used_addr = None
#to find last used, note that it may be in the *previous* batch;
#may as well just search from the start, since it takes no time.
for i in range(start_index + self.BATCH_SIZE):
if tah[i]['used']:
last_used_addr = tah[i]['addr']
if last_used_addr:
wallet.index[mixdepth][forchange] = wallet.addr_cache[last_used_addr][2] + 1
else:
wallet.index[mix_depth][forchange] = wallet.addr_cache[last_used_addr][2] + 1
wallet.index[mixdepth][forchange] = 0
tah["finished"] = True
#check if all branches are finished to trigger next stage of sync.
addr_sync_complete = True
for m in range(wallet.max_mix_depth):
for fc in [0, 1]:
if not self.temp_addr_history[m][fc]["finished"]:
addr_sync_complete = False
if addr_sync_complete:
self.sync_unspent(wallet)
else:
#continue search forwards on this branch
self.synchronize_batch(wallet, mixdepth, forchange, start_index + self.BATCH_SIZE)
def sync_unspent(self, wallet):
# finds utxos in the wallet
st = time.time()
# dont refresh unspent dict more often than 5 minutes
rate_limit_time = 5 * 60
if st - self.last_sync_unspent < rate_limit_time:
log.debug('electrum sync_unspent() happened too recently (%dsec), skipping' % (st - self.last_sync_unspent))
return
wallet.unspent = {}
addrs = wallet.addr_cache.keys()
#Prepare list of all used addresses
addrs = []
for m in range(wallet.max_mix_depth):
for fc in [0, 1]:
branch_list = []
for k, v in self.temp_addr_history[m][fc].iteritems():
if k == "finished":
continue
if v["used"]:
branch_list.append(v["addr"])
addrs.extend(branch_list)
if len(addrs) == 0:
log.debug('no tx used')
return
self.listunspent_calls = 0
for a in addrs:
unspent_info = self.get_from_electrum('blockchain.address.listunspent', a)
res = unspent_info['result']
for u in res:
wallet.unspent[str(u['tx_hash']) + ':' + str(u['tx_pos'])] = {'address': a, 'value': int(u['value'])}
for u in wallet.spent_utxos:
wallet.unspent.pop(u, None)
self.last_sync_unspent = time.time()
log.debug('electrum sync_unspent took ' + str((self.last_sync_unspent - st)) + 'sec')
def add_tx_notify(self, txd, unconfirmfun, confirmfun, notifyaddr):
unconfirm_timeout = 10 * 60 # seconds
unconfirm_poll_period = 5
confirm_timeout = 2 * 60 * 60
confirm_poll_period = 5 * 60
class NotifyThread(threading.Thread):
def __init__(self, blockchaininterface, txd, unconfirmfun, confirmfun):
threading.Thread.__init__(self)
self.daemon = True
self.blockchaininterface = blockchaininterface
self.unconfirmfun = unconfirmfun
self.confirmfun = confirmfun
self.tx_output_set = set([(sv['script'], sv['value']) for sv in txd['outs']])
self.output_addresses = [btc.script_to_address(scrval[0], get_p2pk_vbyte()) for scrval in self.tx_output_set]
log.debug('txoutset=' + pprint.pformat(self.tx_output_set))
log.debug('outaddrs=' + ','.join(self.output_addresses))
def run(self):
st = int(time.time())
unconfirmed_txid = None
unconfirmed_txhex = None
while not unconfirmed_txid:
time.sleep(unconfirm_poll_period)
if int(time.time()) - st > unconfirm_timeout:
log.debug('checking for unconfirmed tx timed out')
return
shared_txid = None
for a in self.output_addresses:
unconftx = self.blockchaininterface.get_from_electrum('blockchain.address.get_mempool', a).get('result')
unconftxs = set([str(t['tx_hash']) for t in unconftx])
if not shared_txid:
shared_txid = unconftxs
else:
shared_txid = shared_txid.intersection(unconftxs)
log.debug('sharedtxid = ' + str(shared_txid))
if len(shared_txid) == 0:
continue
data = []
for txid in shared_txid:
txdata = str(self.blockchaininterface.get_from_electrum('blockchain.transaction.get', txid).get('result'))
data.append({'hex':txdata,'id':txid})
for txdata in data:
txhex = txdata['hex']
outs = set([(sv['script'], sv['value']) for sv in btc.deserialize(txhex)['outs']])
log.debug('unconfirm query outs = ' + str(outs))
if outs == self.tx_output_set:
unconfirmed_txid = txdata['id']
unconfirmed_txhex = txhex
break
self.unconfirmfun(btc.deserialize(unconfirmed_txhex), unconfirmed_txid)
st = int(time.time())
confirmed_txid = None
confirmed_txhex = None
while not confirmed_txid:
time.sleep(confirm_poll_period)
if int(time.time()) - st > confirm_timeout:
log.debug('checking for confirmed tx timed out')
return
shared_txid = None
for a in self.output_addresses:
conftx = self.blockchaininterface.get_from_electrum('blockchain.address.listunspent', a).get('result')
conftxs = set([str(t['tx_hash']) for t in conftx])
if not shared_txid:
shared_txid = conftxs
else:
shared_txid = shared_txid.intersection(conftxs)
log.debug('sharedtxid = ' + str(shared_txid))
if len(shared_txid) == 0:
continue
data = []
for txid in shared_txid:
txdata = str(self.blockchaininterface.get_from_electrum('blockchain.transaction.get', txid).get('result'))
data.append({'hex':txdata,'id':txid})
for txdata in data:
txhex = txdata['hex']
outs = set([(sv['script'], sv['value']) for sv in btc.deserialize(txhex)['outs']])
log.debug('confirm query outs = ' + str(outs))
if outs == self.tx_output_set:
confirmed_txid = txdata['id']
confirmed_txhex = txhex
break
self.confirmfun(btc.deserialize(confirmed_txhex), confirmed_txid, 1)
NotifyThread(self, txd, unconfirmfun, confirmfun).start()
d = self.get_from_electrum('blockchain.address.listunspent', a)
d.addCallback(self.process_listunspent_data, wallet, a, len(addrs))
def process_listunspent_data(self, unspent_info, wallet, address, n):
self.listunspent_calls += 1
res = unspent_info['result']
for u in res:
wallet.unspent[str(u['tx_hash']) + ':' + str(
u['tx_pos'])] = {'address': address, 'value': int(u['value'])}
if self.listunspent_calls == n:
for u in wallet.spent_utxos:
wallet.unspent.pop(u, None)
reactor.stop()
def pushtx(self, txhex):
brcst_res = self.get_from_electrum('blockchain.transaction.broadcast', txhex)
@ -288,3 +508,99 @@ class ElectrumInterface(BlockchainInterface):
fee_per_kb_sat = int(float(fee) * 100000000)
return fee_per_kb_sat
def outputs_watcher(self, wallet_name, notifyaddr, tx_output_set,
unconfirmfun, confirmfun, timeoutfun):
"""Given a key for the watcher loop (notifyaddr), a wallet name (account),
a set of outputs, and unconfirm, confirm and timeout callbacks,
check to see if a transaction matching that output set has appeared in
the wallet. Call the callbacks and update the watcher loop state.
End the loop when the confirmation has been seen (no spent monitoring here).
"""
wl = self.tx_watcher_loops[notifyaddr]
print('txoutset=' + pprint.pformat(tx_output_set))
unconftx = self.get_from_electrum('blockchain.address.get_mempool',
notifyaddr).get('result')
unconftxs = set([str(t['tx_hash']) for t in unconftx])
if len(unconftxs):
txdatas = []
for txid in unconftxs:
txdatas.append(str(self.get_from_electrum('blockchain.transaction.get',
txid).get('result')))
unconfirmed_txid = None
for txdata in txdatas:
txhex = txdata['hex']
outs = set([(sv['script'], sv['value']) for sv in btc.deserialize(
txhex)['outs']])
print('unconfirm query outs = ' + str(outs))
if outs == tx_output_set:
unconfirmed_txid = txdata['id']
unconfirmed_txhex = txhex
break
#call unconf callback if it was found in the mempool
if unconfirmed_txid and not wl[1]:
print("Tx: " + str(real_txid) + " seen on network.")
unconfirmfun(txd, real_txid)
wl[1] = True
return
conftx = self.get_from_electrum('blockchain.address.listunspent',
notifyaddr).get('result')
conftxs = set([str(t['tx_hash']) for t in conftx])
if len(conftxs):
txdatas = []
for txid in conftxs:
txdata = str(self.get_from_electrum('blockchain.transaction.get',
txid).get('result'))
txdatas.append({'hex':txdata,'id':txid})
confirmed_txid = None
for txdata in txdatas:
txhex = txdata['hex']
outs = set([(sv['script'], sv['value']) for sv in btc.deserialize(
txhex)['outs']])
print('confirm query outs = ' + str(outs))
if outs == tx_output_set:
confirmed_txid = txdata['id']
confirmed_txhex = txhex
break
if confirmed_txid and not wl[2]:
confirmfun(btc.deserialize(confirmed_txhex), confirmed_txid, 1)
wl[2] = True
wl[0].stop()
return
def tx_watcher(self, txd, unconfirmfun, confirmfun, spentfun, c, n):
"""Called at a polling interval, checks if the given deserialized
transaction (which must be fully signed) is (a) broadcast, (b) confirmed
and (c) spent from at index n, and notifies confirmation if number
of confs = c.
TODO: Deal with conflicts correctly. Here just abandons monitoring.
"""
txid = btc.txhash(btc.serialize(txd))
wl = self.tx_watcher_loops[txid]
txdata = str(self.get_from_electrum('blockchain.transaction.get',
txid).get('result'))
txdatas.append({'hex':txdata,'id':txid})
if "confirmations" not in res:
print("Malformed gettx result: " + str(res))
return
if not wl[1] and res["confirmations"] == 0:
print("Tx: " + str(txid) + " seen on network.")
unconfirmfun(txd, txid)
wl[1] = True
return
if not wl[2] and res["confirmations"] > 0:
print("Tx: " + str(txid) + " has " + str(
res["confirmations"]) + " confirmations.")
confirmfun(txd, txid, res["confirmations"])
if c <= res["confirmations"]:
wl[2] = True
#Note we do not stop the monitoring loop when
#confirmations occur, since we are also monitoring for spending.
return
if res["confirmations"] < 0:
print("Tx: " + str(txid) + " has a conflict. Abandoning.")
wl[0].stop()
return
if not spentfun or wl[3]:
return

Loading…
Cancel
Save