diff --git a/jmclient/jmclient/blockchaininterface.py b/jmclient/jmclient/blockchaininterface.py index 6cb8fec..8f7dfd9 100644 --- a/jmclient/jmclient/blockchaininterface.py +++ b/jmclient/jmclient/blockchaininterface.py @@ -59,20 +59,85 @@ class BlockchainInterface(object): """Finds the unspent transaction outputs belonging to this wallet, sets wallet.unspent """ + def add_tx_notify(self, txd, unconfirmfun, confirmfun, notifyaddr, + wallet_name=None, timeoutfun=None, spentfun=None, txid_flag=True, + n=0, c=1, vb=None): + """Given a deserialized transaction txd, + callback functions for broadcast and confirmation of the transaction, + an address to import, and a callback function for timeout, set up + a polling loop to check for events on the transaction. Also optionally set + to trigger "confirmed" callback on number of confirmations c. Also checks + for spending (if spentfun is not None) of the outpoint n. + If txid_flag is True, we create a watcher loop on the txid (hence only + really usable in a segwit context, and only on fully formed transactions), + else we create a watcher loop on the output set of the transaction (taken + from the outs field of the txd). + """ + if not vb: + vb = get_p2pk_vbyte() + one_addr_imported = False + for outs in txd['outs']: + addr = btc.script_to_address(outs['script'], vb) + if self.rpc('getaccount', [addr]) != '': + one_addr_imported = True + break + if not one_addr_imported: + self.rpc('importaddress', [notifyaddr, 'joinmarket-notify', False]) + + #Warning! In case of txid_flag false, this is *not* a valid txid, + #but only a hash of an incomplete transaction serialization; but, + #it still suffices as a unique key for tracking, in this case. + txid = btc.txhash(btc.serialize(txd)) + if not txid_flag: + tx_output_set = set([(sv['script'], sv['value']) for sv in txd['outs']]) + loop = task.LoopingCall(self.outputs_watcher, wallet_name, notifyaddr, + tx_output_set, unconfirmfun, confirmfun, + timeoutfun) + log.debug("Created watcher loop for address: " + notifyaddr) + loopkey = notifyaddr + else: + loop = task.LoopingCall(self.tx_watcher, txd, unconfirmfun, confirmfun, + spentfun, c, n) + log.debug("Created watcher loop for txid: " + txid) + loopkey = txid + self.tx_watcher_loops[loopkey] = [loop, False, False, False] + #Hardcoded polling interval, but in any case it can be very short. + loop.start(5.0) + #TODO Hardcoded very long timeout interval + reactor.callLater(7200, self.tx_timeout, txd, loopkey, timeoutfun) + + def tx_timeout(self, txd, loopkey, timeoutfun): + #TODO: 'loopkey' is an address not a txid for Makers, handle that. + if not timeoutfun: + return + if not txid in self.tx_watcher_loops: + return + if not self.tx_watcher_loops[loopkey][1]: + #Not confirmed after 2 hours; give up + log.info("Timed out waiting for confirmation of: " + str(loopkey)) + self.tx_watcher_loops[loopkey][0].stop() + timeoutfun(txd, loopkey) + @abc.abstractmethod - def add_tx_notify(self, - txd, - unconfirmfun, - confirmfun, - notifyaddr, - timeoutfun=None, - vb=None): + def outputs_watcher(self, wallet_name, notifyaddr, tx_output_set, + unconfirmfun, confirmfun, timeoutfun): + """Given a key for the watcher loop (notifyaddr), a wallet name (account), + a set of outputs, and unconfirm, confirm and timeout callbacks, + check to see if a transaction matching that output set has appeared in + the wallet. Call the callbacks and update the watcher loop state. + End the loop when the confirmation has been seen (no spent monitoring here). """ - Invokes unconfirmfun and confirmfun when tx is seen on the network - If timeoutfun not None, called with boolean argument that tells - whether this is the timeout for unconfirmed or confirmed - timeout for uncontirmed = False + pass + + @abc.abstractmethod + def tx_watcher(self, txd, unconfirmfun, confirmfun, spentfun, c, n): + """Called at a polling interval, checks if the given deserialized + transaction (which must be fully signed) is (a) broadcast, (b) confirmed + and (c) spent from at index n, and notifies confirmation if number + of confs = c. + TODO: Deal with conflicts correctly. Here just abandons monitoring. """ + pass @abc.abstractmethod def pushtx(self, txhex): @@ -547,65 +612,6 @@ class BitcoinCoreInterface(BlockchainInterface): et = time.time() log.debug('bitcoind sync_unspent took ' + str((et - st)) + 'sec') - def add_tx_notify(self, txd, unconfirmfun, confirmfun, notifyaddr, - wallet_name=None, timeoutfun=None, spentfun=None, txid_flag=True, - n=0, c=1, vb=None): - """Given a deserialized transaction txd, - callback functions for broadcast and confirmation of the transaction, - an address to import, and a callback function for timeout, set up - a polling loop to check for events on the transaction. Also optionally set - to trigger "confirmed" callback on number of confirmations c. Also checks - for spending (if spentfun is not None) of the outpoint n. - If txid_flag is True, we create a watcher loop on the txid (hence only - really usable in a segwit context, and only on fully formed transactions), - else we create a watcher loop on the output set of the transaction (taken - from the outs field of the txd). - """ - if not vb: - vb = get_p2pk_vbyte() - one_addr_imported = False - for outs in txd['outs']: - addr = btc.script_to_address(outs['script'], vb) - if self.rpc('getaccount', [addr]) != '': - one_addr_imported = True - break - if not one_addr_imported: - self.rpc('importaddress', [notifyaddr, 'joinmarket-notify', False]) - - #Warning! In case of txid_flag false, this is *not* a valid txid, - #but only a hash of an incomplete transaction serialization; but, - #it still suffices as a unique key for tracking, in this case. - txid = btc.txhash(btc.serialize(txd)) - if not txid_flag: - tx_output_set = set([(sv['script'], sv['value']) for sv in txd['outs']]) - loop = task.LoopingCall(self.outputs_watcher, wallet_name, notifyaddr, - tx_output_set, unconfirmfun, confirmfun, - timeoutfun) - log.debug("Created watcher loop for address: " + notifyaddr) - loopkey = notifyaddr - else: - loop = task.LoopingCall(self.tx_watcher, txd, unconfirmfun, confirmfun, - spentfun, c, n) - log.debug("Created watcher loop for txid: " + txid) - loopkey = txid - self.tx_watcher_loops[loopkey] = [loop, False, False, False] - #Hardcoded polling interval, but in any case it can be very short. - loop.start(5.0) - #TODO Hardcoded very long timeout interval - reactor.callLater(7200, self.tx_timeout, txd, loopkey, timeoutfun) - - def tx_timeout(self, txd, loopkey, timeoutfun): - #TODO: 'loopkey' is an address not a txid for Makers, handle that. - if not timeoutfun: - return - if not txid in self.tx_watcher_loops: - return - if not self.tx_watcher_loops[loopkey][1]: - #Not confirmed after 2 hours; give up - log.info("Timed out waiting for confirmation of: " + str(loopkey)) - self.tx_watcher_loops[loopkey][0].stop() - timeoutfun(txd, loopkey) - def get_deser_from_gettransaction(self, rpcretval): """Get full transaction deserialization from a call to `gettransaction` diff --git a/jmclient/jmclient/electruminterface.py b/jmclient/jmclient/electruminterface.py index 7da6c8f..a711c66 100644 --- a/jmclient/jmclient/electruminterface.py +++ b/jmclient/jmclient/electruminterface.py @@ -7,6 +7,11 @@ import random import socket import threading import time +import sys +from twisted.python.log import startLogging +from twisted.internet.protocol import ClientFactory, Protocol +from twisted.protocols.basic import LineReceiver +from twisted.internet import reactor, task, defer from .blockchaininterface import BlockchainInterface, is_index_ahead_of_cache from .configure import get_p2pk_vbyte from .support import get_log @@ -14,233 +19,448 @@ from .support import get_log log = get_log() # Default server list from electrum client -# https://github.com/spesmilo/electrum/blob/753a28b452dca1023fbde548469c36a34555dc95/lib/network.py -DEFAULT_ELECTRUM_SERVER_LIST = [ - 'erbium1.sytes.net:50001', - 'ecdsa.net:50001', - 'electrum0.electricnewyear.net:50001', - 'VPS.hsmiths.com:50001', - 'ELECTRUM.jdubya.info:50001', - 'electrum.no-ip.org:50001', - 'us.electrum.be:50001', - 'bitcoins.sk:50001', - 'electrum.petrkr.net:50001', - 'electrum.dragonzone.net:50001', - 'Electrum.hsmiths.com:8080', - 'electrum3.hachre.de:50001', - 'elec.luggs.co:80', - 'btc.smsys.me:110', - 'electrum.online:50001', -] +# https://github.com/spesmilo/electrum, file https://github.com/spesmilo/electrum/blob/7dbd612d5dad13cd6f1c0df32534a578bad331ad/lib/servers.json +DEFAULT_PORTS = {'t':'50001', 's':'50002'} -class ElectrumInterface(BlockchainInterface): +DEFAULT_SERVERS = { + "E-X.not.fyi": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "ELECTRUMX.not.fyi": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "ELEX01.blackpole.online": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "VPS.hsmiths.com": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "bitcoin.freedomnode.com": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "btc.smsys.me": { + "pruning": "-", + "s": "995", + "version": "1.1" + }, + "currentlane.lovebitco.in": { + "pruning": "-", + "t": "50001", + "version": "1.1" + }, + "daedalus.bauerj.eu": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "de01.hamster.science": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "ecdsa.net": { + "pruning": "-", + "s": "110", + "t": "50001", + "version": "1.1" + }, + "elec.luggs.co": { + "pruning": "-", + "s": "443", + "version": "1.1" + }, + "electrum.akinbo.org": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "electrum.antumbra.se": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "electrum.be": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "electrum.coinucopia.io": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "electrum.cutie.ga": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "electrum.festivaldelhumor.org": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "electrum.hsmiths.com": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "electrum.qtornado.com": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "electrum.vom-stausee.de": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "electrum3.hachre.de": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "electrumx.bot.nu": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "electrumx.westeurope.cloudapp.azure.com": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "elx01.knas.systems": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "ex-btc.server-on.net": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "helicarrier.bauerj.eu": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "mooo.not.fyi": { + "pruning": "-", + "s": "50012", + "t": "50011", + "version": "1.1" + }, + "ndnd.selfhost.eu": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "node.arihanc.com": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "node.xbt.eu": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "node1.volatilevictory.com": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "noserver4u.de": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "qmebr.spdns.org": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "raspi.hsmiths.com": { + "pruning": "-", + "s": "51002", + "t": "51001", + "version": "1.1" + }, + "s2.noip.pl": { + "pruning": "-", + "s": "50102", + "version": "1.1" + }, + "s5.noip.pl": { + "pruning": "-", + "s": "50105", + "version": "1.1" + }, + "songbird.bauerj.eu": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "us.electrum.be": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + }, + "us01.hamster.science": { + "pruning": "-", + "s": "50002", + "t": "50001", + "version": "1.1" + } +} - class ElectrumConn(threading.Thread): - - def __init__(self, electrum_server): - threading.Thread.__init__(self) - self.daemon = True - self.msg_id = 0 - self.RetQueue = Queue.Queue() - try: - self.s = socket.create_connection((electrum_server.split(':')[0], - int(electrum_server.split(':')[1]))) - except Exception as e: - log.error("Error connecting to electrum server. " - "Try again to connect to a random server or set a " - "server in the config.") - os._exit(1) - self.ping() - - def run(self): - while True: - all_data = None - while True: - data = self.s.recv(1024) - if data is None: - continue - if all_data is None: - all_data = data - else: - all_data = all_data + data - if '\n' in all_data: - break - data_json = json.loads(all_data[:-1].decode()) - self.RetQueue.put(data_json) - - def ping(self): - log.debug('sending server ping') - self.send_json({'id':0,'method':'server.version','params':[]}) - t = threading.Timer(60, self.ping) - t.daemon = True - t.start() - - def send_json(self, json_data): - data = json.dumps(json_data).encode() - self.s.send(data + b'\n') - - def call_server_method(self, method, params=[]): - self.msg_id = self.msg_id + 1 - current_id = self.msg_id - method_dict = { - 'id': current_id, - 'method': method, - 'params': params - } - self.send_json(method_dict) - while True: - ret_data = self.RetQueue.get() - if ret_data.get('id', None) == current_id: - return ret_data - else: - log.debug(json.dumps(ret_data)) +def set_electrum_testnet(): + global DEFAULT_PORTS, DEFAULT_SERVERS + DEFAULT_PORTS = {'t':'51001', 's':'51002'} + DEFAULT_SERVERS = { + 'testnetnode.arihanc.com': {'t':'51001', 's':'51002'}, + 'testnet1.bauerj.eu': {'t':'51001', 's':'51002'}, + '14.3.140.101': {'t':'51001', 's':'51002'}, + 'testnet.hsmiths.com': {'t':'53011', 's':'53012'}, + 'electrum.akinbo.org': {'t':'51001', 's':'51002'}, + 'ELEX05.blackpole.online': {'t':'52011', 's':'52002'},} - def __init__(self, testnet=False, electrum_server=None): - super(ElectrumInterface, self).__init__() +class TxElectrumClientProtocol(LineReceiver): + #map deferreds to msgids to correctly link response with request + deferreds = {} + delimiter = "\n" + + def __init__(self, factory): + self.factory = factory + + def connectionMade(self): + print('connection to Electrum made') + self.msg_id = 0 + self.factory.bci.sync_addresses(self.factory.bci.wallet) + self.start_ping() + self.call_server_method('blockchain.numblocks.subscribe') + + def start_ping(self): + pingloop = task.LoopingCall(self.ping) + pingloop.start(60.0) + + def ping(self): + print('sending server ping') + #We dont bother tracking response to this; + #just for keeping connection active + self.call_server_method('server.version') + def send_json(self, json_data): + data = json.dumps(json_data).encode() + self.sendLine(data) + + def call_server_method(self, method, params=[]): + self.msg_id = self.msg_id + 1 + current_id = self.msg_id + self.deferreds[current_id] = defer.Deferred() + method_dict = { + 'id': current_id, + 'method': method, + 'params': params + } + self.send_json(method_dict) + return self.deferreds[current_id] + + def lineReceived(self, line): + try: + parsed = json.loads(line) + msgid = parsed['id'] + linked_deferred = self.deferreds[msgid] + except: + log.debug("Ignored response from Electrum server: " + str(line)) + return + linked_deferred.callback(parsed) + +class TxElectrumClientProtocolFactory(ClientFactory): + + def __init__(self, bci): + self.bci = bci + def buildProtocol(self,addr): + self.client = TxElectrumClientProtocol(self) + return self.client + + def clientConnectionLost(self,connector,reason): + print('connection lost') + + def clientConnectionFailed(self,connector,reason): + print('connection failed') + +class ElectrumInterface(BlockchainInterface): + BATCH_SIZE = 8 + def __init__(self, testnet=False, electrum_server=None): if testnet: - raise Exception(NotImplemented) - if electrum_server is None: - electrum_server = random.choice(DEFAULT_ELECTRUM_SERVER_LIST) - self.server_domain = electrum_server.split(':')[0] - self.last_sync_unspent = 0 - self.electrum_conn = self.ElectrumConn(electrum_server) - self.electrum_conn.start() - # used to hold open server conn - self.electrum_conn.call_server_method('blockchain.numblocks.subscribe') + set_electrum_testnet() + self.server, self.port = self.get_server(electrum_server) + self.factory = TxElectrumClientProtocolFactory(self) + reactor.connectTCP(self.server, self.port, self.factory) + + def sync_wallet(self, wallet, restart_cb=False): + self.wallet = wallet + #wipe the temporary cache of address histories + self.temp_addr_history = {} + startLogging(sys.stdout) + reactor.run() + + def get_server(self, electrum_server): + if not electrum_server: + electrum_server = random.choice(DEFAULT_SERVERS.keys()) + s = electrum_server + p = int(DEFAULT_SERVERS[electrum_server]['t']) + print('Trying to connect to Electrum server: ' + str(electrum_server)) + return (s, p) def get_from_electrum(self, method, params=[]): params = [params] if type(params) is not list else params - return self.electrum_conn.call_server_method(method, params) + return self.factory.client.call_server_method(method, params) - def sync_addresses(self, wallet): - log.debug("downloading wallet history from electrum server") - for mix_depth in range(wallet.max_mix_depth): + def sync_addresses(self, wallet, restart_cb=None): + log.debug("downloading wallet history from Electrum server ...") + for mixdepth in range(wallet.max_mix_depth): for forchange in [0, 1]: - unused_addr_count = 0 - last_used_addr = '' - while (unused_addr_count < wallet.gaplimit or not is_index_ahead_of_cache(wallet, mix_depth, forchange)): - addr = wallet.get_new_addr(mix_depth, forchange) - addr_hist_info = self.get_from_electrum('blockchain.address.get_history', addr) - if len(addr_hist_info['result']) > 0: - last_used_addr = addr - unused_addr_count = 0 - else: - unused_addr_count += 1 - if last_used_addr == '': - wallet.index[mix_depth][forchange] = 0 + self.synchronize_batch(wallet, mixdepth, forchange, 0) + + def synchronize_batch(self, wallet, mixdepth, forchange, start_index): + log.debug("Syncing address batch, m, fc, i: " + ",".join( + [str(x) for x in [mixdepth, forchange, start_index]])) + if mixdepth not in self.temp_addr_history: + self.temp_addr_history[mixdepth] = {} + if forchange not in self.temp_addr_history[mixdepth]: + self.temp_addr_history[mixdepth][forchange] = {"finished": False} + for i in range(start_index, start_index + self.BATCH_SIZE): + #get_new_addr is OK here, as guaranteed to be sequential *on this branch* + a = wallet.get_new_addr(mixdepth, forchange) + d = self.get_from_electrum('blockchain.address.get_history', a) + d.addCallback(self.process_address_history, wallet, + mixdepth, forchange, i, a, start_index) + #makes sure entries in temporary address history are ready + #to be accessed. + if i not in self.temp_addr_history[mixdepth][forchange]: + self.temp_addr_history[mixdepth][forchange][i] = {'synced': False, + 'addr': a, + 'used': False} + log.info("added tah entry: " + str(self.temp_addr_history[mixdepth][forchange][i])) + + def process_address_history(self, history, wallet, mixdepth, forchange, i, + addr, start_index): + """Given the history data for an address from Electrum, update the current view + of the wallet's usage at mixdepth mixdepth and account forchange, address addr at + index i. Once all addresses from index start_index to start_index + self.BATCH_SIZE + have been thus updated, trigger either continuation to the next batch, or, if + conditions are fulfilled, end syncing for this (mixdepth, forchange) branch, and + if all such branches are finished, proceed to the sync_unspent step. + """ + tah = self.temp_addr_history[mixdepth][forchange] + if len(history['result']) > 0: + tah[i]['used'] = True + tah[i]['synced'] = True + #Having updated this specific record, check if the entire batch from start_index + #has been synchronized + if all([tah[i]['synced'] for i in range(start_index, start_index + self.BATCH_SIZE)]): + #check if unused goes back as much as gaplimit; if so, end, else, continue + #to next batch + if all([tah[i]['used'] is False for i in range( + start_index+self.BATCH_SIZE-wallet.gaplimit, + start_index+self.BATCH_SIZE)]): + last_used_addr = None + #to find last used, note that it may be in the *previous* batch; + #may as well just search from the start, since it takes no time. + for i in range(start_index + self.BATCH_SIZE): + if tah[i]['used']: + last_used_addr = tah[i]['addr'] + if last_used_addr: + wallet.index[mixdepth][forchange] = wallet.addr_cache[last_used_addr][2] + 1 else: - wallet.index[mix_depth][forchange] = wallet.addr_cache[last_used_addr][2] + 1 + wallet.index[mixdepth][forchange] = 0 + tah["finished"] = True + #check if all branches are finished to trigger next stage of sync. + addr_sync_complete = True + for m in range(wallet.max_mix_depth): + for fc in [0, 1]: + if not self.temp_addr_history[m][fc]["finished"]: + addr_sync_complete = False + if addr_sync_complete: + self.sync_unspent(wallet) + else: + #continue search forwards on this branch + self.synchronize_batch(wallet, mixdepth, forchange, start_index + self.BATCH_SIZE) def sync_unspent(self, wallet): # finds utxos in the wallet - st = time.time() - # dont refresh unspent dict more often than 5 minutes - rate_limit_time = 5 * 60 - if st - self.last_sync_unspent < rate_limit_time: - log.debug('electrum sync_unspent() happened too recently (%dsec), skipping' % (st - self.last_sync_unspent)) - return wallet.unspent = {} - addrs = wallet.addr_cache.keys() + #Prepare list of all used addresses + addrs = [] + for m in range(wallet.max_mix_depth): + for fc in [0, 1]: + branch_list = [] + for k, v in self.temp_addr_history[m][fc].iteritems(): + if k == "finished": + continue + if v["used"]: + branch_list.append(v["addr"]) + addrs.extend(branch_list) if len(addrs) == 0: log.debug('no tx used') return + self.listunspent_calls = 0 for a in addrs: - unspent_info = self.get_from_electrum('blockchain.address.listunspent', a) - res = unspent_info['result'] - for u in res: - wallet.unspent[str(u['tx_hash']) + ':' + str(u['tx_pos'])] = {'address': a, 'value': int(u['value'])} - for u in wallet.spent_utxos: - wallet.unspent.pop(u, None) - self.last_sync_unspent = time.time() - log.debug('electrum sync_unspent took ' + str((self.last_sync_unspent - st)) + 'sec') - - def add_tx_notify(self, txd, unconfirmfun, confirmfun, notifyaddr): - unconfirm_timeout = 10 * 60 # seconds - unconfirm_poll_period = 5 - confirm_timeout = 2 * 60 * 60 - confirm_poll_period = 5 * 60 - - class NotifyThread(threading.Thread): - - def __init__(self, blockchaininterface, txd, unconfirmfun, confirmfun): - threading.Thread.__init__(self) - self.daemon = True - self.blockchaininterface = blockchaininterface - self.unconfirmfun = unconfirmfun - self.confirmfun = confirmfun - self.tx_output_set = set([(sv['script'], sv['value']) for sv in txd['outs']]) - self.output_addresses = [btc.script_to_address(scrval[0], get_p2pk_vbyte()) for scrval in self.tx_output_set] - log.debug('txoutset=' + pprint.pformat(self.tx_output_set)) - log.debug('outaddrs=' + ','.join(self.output_addresses)) - - def run(self): - st = int(time.time()) - unconfirmed_txid = None - unconfirmed_txhex = None - while not unconfirmed_txid: - time.sleep(unconfirm_poll_period) - if int(time.time()) - st > unconfirm_timeout: - log.debug('checking for unconfirmed tx timed out') - return - shared_txid = None - for a in self.output_addresses: - unconftx = self.blockchaininterface.get_from_electrum('blockchain.address.get_mempool', a).get('result') - unconftxs = set([str(t['tx_hash']) for t in unconftx]) - if not shared_txid: - shared_txid = unconftxs - else: - shared_txid = shared_txid.intersection(unconftxs) - log.debug('sharedtxid = ' + str(shared_txid)) - if len(shared_txid) == 0: - continue - data = [] - for txid in shared_txid: - txdata = str(self.blockchaininterface.get_from_electrum('blockchain.transaction.get', txid).get('result')) - data.append({'hex':txdata,'id':txid}) - for txdata in data: - txhex = txdata['hex'] - outs = set([(sv['script'], sv['value']) for sv in btc.deserialize(txhex)['outs']]) - log.debug('unconfirm query outs = ' + str(outs)) - if outs == self.tx_output_set: - unconfirmed_txid = txdata['id'] - unconfirmed_txhex = txhex - break - self.unconfirmfun(btc.deserialize(unconfirmed_txhex), unconfirmed_txid) - st = int(time.time()) - confirmed_txid = None - confirmed_txhex = None - while not confirmed_txid: - time.sleep(confirm_poll_period) - if int(time.time()) - st > confirm_timeout: - log.debug('checking for confirmed tx timed out') - return - shared_txid = None - for a in self.output_addresses: - conftx = self.blockchaininterface.get_from_electrum('blockchain.address.listunspent', a).get('result') - conftxs = set([str(t['tx_hash']) for t in conftx]) - if not shared_txid: - shared_txid = conftxs - else: - shared_txid = shared_txid.intersection(conftxs) - log.debug('sharedtxid = ' + str(shared_txid)) - if len(shared_txid) == 0: - continue - data = [] - for txid in shared_txid: - txdata = str(self.blockchaininterface.get_from_electrum('blockchain.transaction.get', txid).get('result')) - data.append({'hex':txdata,'id':txid}) - for txdata in data: - txhex = txdata['hex'] - outs = set([(sv['script'], sv['value']) for sv in btc.deserialize(txhex)['outs']]) - log.debug('confirm query outs = ' + str(outs)) - if outs == self.tx_output_set: - confirmed_txid = txdata['id'] - confirmed_txhex = txhex - break - self.confirmfun(btc.deserialize(confirmed_txhex), confirmed_txid, 1) - - NotifyThread(self, txd, unconfirmfun, confirmfun).start() + d = self.get_from_electrum('blockchain.address.listunspent', a) + d.addCallback(self.process_listunspent_data, wallet, a, len(addrs)) + + def process_listunspent_data(self, unspent_info, wallet, address, n): + self.listunspent_calls += 1 + res = unspent_info['result'] + for u in res: + wallet.unspent[str(u['tx_hash']) + ':' + str( + u['tx_pos'])] = {'address': address, 'value': int(u['value'])} + if self.listunspent_calls == n: + for u in wallet.spent_utxos: + wallet.unspent.pop(u, None) + reactor.stop() def pushtx(self, txhex): brcst_res = self.get_from_electrum('blockchain.transaction.broadcast', txhex) @@ -288,3 +508,99 @@ class ElectrumInterface(BlockchainInterface): fee_per_kb_sat = int(float(fee) * 100000000) return fee_per_kb_sat + def outputs_watcher(self, wallet_name, notifyaddr, tx_output_set, + unconfirmfun, confirmfun, timeoutfun): + """Given a key for the watcher loop (notifyaddr), a wallet name (account), + a set of outputs, and unconfirm, confirm and timeout callbacks, + check to see if a transaction matching that output set has appeared in + the wallet. Call the callbacks and update the watcher loop state. + End the loop when the confirmation has been seen (no spent monitoring here). + """ + wl = self.tx_watcher_loops[notifyaddr] + print('txoutset=' + pprint.pformat(tx_output_set)) + unconftx = self.get_from_electrum('blockchain.address.get_mempool', + notifyaddr).get('result') + unconftxs = set([str(t['tx_hash']) for t in unconftx]) + if len(unconftxs): + txdatas = [] + for txid in unconftxs: + txdatas.append(str(self.get_from_electrum('blockchain.transaction.get', + txid).get('result'))) + unconfirmed_txid = None + for txdata in txdatas: + txhex = txdata['hex'] + outs = set([(sv['script'], sv['value']) for sv in btc.deserialize( + txhex)['outs']]) + print('unconfirm query outs = ' + str(outs)) + if outs == tx_output_set: + unconfirmed_txid = txdata['id'] + unconfirmed_txhex = txhex + break + #call unconf callback if it was found in the mempool + if unconfirmed_txid and not wl[1]: + print("Tx: " + str(real_txid) + " seen on network.") + unconfirmfun(txd, real_txid) + wl[1] = True + return + + conftx = self.get_from_electrum('blockchain.address.listunspent', + notifyaddr).get('result') + conftxs = set([str(t['tx_hash']) for t in conftx]) + if len(conftxs): + txdatas = [] + for txid in conftxs: + txdata = str(self.get_from_electrum('blockchain.transaction.get', + txid).get('result')) + txdatas.append({'hex':txdata,'id':txid}) + confirmed_txid = None + for txdata in txdatas: + txhex = txdata['hex'] + outs = set([(sv['script'], sv['value']) for sv in btc.deserialize( + txhex)['outs']]) + print('confirm query outs = ' + str(outs)) + if outs == tx_output_set: + confirmed_txid = txdata['id'] + confirmed_txhex = txhex + break + if confirmed_txid and not wl[2]: + confirmfun(btc.deserialize(confirmed_txhex), confirmed_txid, 1) + wl[2] = True + wl[0].stop() + return + + def tx_watcher(self, txd, unconfirmfun, confirmfun, spentfun, c, n): + """Called at a polling interval, checks if the given deserialized + transaction (which must be fully signed) is (a) broadcast, (b) confirmed + and (c) spent from at index n, and notifies confirmation if number + of confs = c. + TODO: Deal with conflicts correctly. Here just abandons monitoring. + """ + txid = btc.txhash(btc.serialize(txd)) + wl = self.tx_watcher_loops[txid] + txdata = str(self.get_from_electrum('blockchain.transaction.get', + txid).get('result')) + txdatas.append({'hex':txdata,'id':txid}) + if "confirmations" not in res: + print("Malformed gettx result: " + str(res)) + return + if not wl[1] and res["confirmations"] == 0: + print("Tx: " + str(txid) + " seen on network.") + unconfirmfun(txd, txid) + wl[1] = True + return + if not wl[2] and res["confirmations"] > 0: + print("Tx: " + str(txid) + " has " + str( + res["confirmations"]) + " confirmations.") + confirmfun(txd, txid, res["confirmations"]) + if c <= res["confirmations"]: + wl[2] = True + #Note we do not stop the monitoring loop when + #confirmations occur, since we are also monitoring for spending. + return + if res["confirmations"] < 0: + print("Tx: " + str(txid) + " has a conflict. Abandoning.") + wl[0].stop() + return + if not spentfun or wl[3]: + return +