diff --git a/jmbitcoin/jmbitcoin/secp256k1_transaction.py b/jmbitcoin/jmbitcoin/secp256k1_transaction.py index 1c7bf63..40b5bbf 100644 --- a/jmbitcoin/jmbitcoin/secp256k1_transaction.py +++ b/jmbitcoin/jmbitcoin/secp256k1_transaction.py @@ -327,12 +327,16 @@ def address_to_script(addr): # Output script to address representation +def is_p2pkh_script(script): + if script[:3] == b'\x76\xa9\x14' and script[-2:] == b'\x88\xac' and len( + script) == 25: + return True + return False def script_to_address(script, vbyte=0): if re.match('^[0-9a-fA-F]*$', script): script = binascii.unhexlify(script) - if script[:3] == b'\x76\xa9\x14' and script[-2:] == b'\x88\xac' and len( - script) == 25: + if is_p2pkh_script(script): return bin_to_b58check(script[3:-2], vbyte) # pubkey hash addresses else: # BIP0016 scripthash addresses: requires explicit vbyte set diff --git a/jmclient/jmclient/blockchaininterface.py b/jmclient/jmclient/blockchaininterface.py index 8f7dfd9..35e2668 100644 --- a/jmclient/jmclient/blockchaininterface.py +++ b/jmclient/jmclient/blockchaininterface.py @@ -49,6 +49,10 @@ class BlockchainInterface(object): self.sync_addresses(wallet, restart_cb) self.sync_unspent(wallet) + @staticmethod + def get_wallet_name(wallet): + return 'joinmarket-wallet-' + btc.dbl_sha256(wallet.keys[0][0])[:6] + @abc.abstractmethod def sync_addresses(self, wallet): """Finds which addresses have been used and sets @@ -75,18 +79,23 @@ class BlockchainInterface(object): """ if not vb: vb = get_p2pk_vbyte() - one_addr_imported = False - for outs in txd['outs']: - addr = btc.script_to_address(outs['script'], vb) - if self.rpc('getaccount', [addr]) != '': - one_addr_imported = True - break - if not one_addr_imported: - self.rpc('importaddress', [notifyaddr, 'joinmarket-notify', False]) + if isinstance(self, BitcoinCoreInterface) or isinstance(self, + RegtestBitcoinCoreInterface): + #This code ensures that a walletnotify is triggered, by + #ensuring that at least one of the output addresses is + #imported into the wallet (note the sweep special case, where + #none of the output addresses belong to me). + one_addr_imported = False + for outs in txd['outs']: + addr = btc.script_to_address(outs['script'], vb) + if self.rpc('getaccount', [addr]) != '': + one_addr_imported = True + break + if not one_addr_imported: + self.rpc('importaddress', [notifyaddr, 'joinmarket-notify', False]) #Warning! In case of txid_flag false, this is *not* a valid txid, - #but only a hash of an incomplete transaction serialization; but, - #it still suffices as a unique key for tracking, in this case. + #but only a hash of an incomplete transaction serialization. txid = btc.txhash(btc.serialize(txd)) if not txid_flag: tx_output_set = set([(sv['script'], sv['value']) for sv in txd['outs']]) @@ -283,10 +292,6 @@ class BitcoinCoreInterface(BlockchainInterface): #spent true/false), ..} self.tx_watcher_loops = {} - @staticmethod - def get_wallet_name(wallet): - return 'joinmarket-wallet-' + btc.dbl_sha256(wallet.keys[0][0])[:6] - def get_block(self, blockheight): """Returns full serialized block at a given height. """ diff --git a/jmclient/jmclient/client_protocol.py b/jmclient/jmclient/client_protocol.py index 0266476..c996d24 100644 --- a/jmclient/jmclient/client_protocol.py +++ b/jmclient/jmclient/client_protocol.py @@ -1,7 +1,7 @@ #! /usr/bin/env python from __future__ import print_function from twisted.python.log import startLogging, err -from twisted.internet import protocol, reactor +from twisted.internet import protocol, reactor, task from twisted.internet.task import LoopingCall from twisted.internet.error import (ConnectionLost, ConnectionAborted, ConnectionClosed, ConnectionDone) @@ -143,11 +143,26 @@ class JMMakerClientProtocol(JMClientProtocol): @commands.JMUp.responder def on_JM_UP(self): + #wait until ready locally to submit offers (can be delayed + #if wallet sync is slow). + self.offers_ready_loop_counter = 0 + self.offers_ready_loop = task.LoopingCall(self.submitOffers) + self.offers_ready_loop.start(2.0) + return {'accepted': True} + + def submitOffers(self): + self.offers_ready_loop_counter += 1 + if self.offers_ready_loop_counter == 300: + jlog.info("Failed to start after 10 minutes, giving up.") + self.offers_ready_loop.stop() + reactor.stop() + if not self.client.offerlist: + return + self.offers_ready_loop.stop() d = self.callRemote(commands.JMSetup, role="MAKER", initdata=json.dumps(self.client.offerlist)) self.defaultCallbacks(d) - return {'accepted': True} @commands.JMSetupDone.responder def on_JM_SETUP_DONE(self): @@ -257,10 +272,17 @@ class JMMakerClientProtocol(JMClientProtocol): if not offerinfo: jlog.info("Failed to find notified unconfirmed transaction: " + txid) return + jm_single().bc_interface.wallet_synced = False jm_single().bc_interface.sync_unspent(self.client.wallet) jlog.info('tx in a block: ' + txid) - #TODO track the earning - #jlog.info('earned = ' + str(self.real_cjfee - self.txfee)) + self.wait_for_sync_loop = task.LoopingCall(self.modify_orders, offerinfo, + confirmations, txid) + self.wait_for_sync_loop.start(2.0) + + def modify_orders(self, offerinfo, confirmations, txid): + if not jm_single().bc_interface.wallet_synced: + return + self.wait_for_sync_loop.stop() to_cancel, to_announce = self.client.on_tx_confirmed(offerinfo, confirmations, txid) self.client.modify_orders(to_cancel, to_announce) diff --git a/jmclient/jmclient/electruminterface.py b/jmclient/jmclient/electruminterface.py index a711c66..ca87c99 100644 --- a/jmclient/jmclient/electruminterface.py +++ b/jmclient/jmclient/electruminterface.py @@ -13,7 +13,7 @@ from twisted.internet.protocol import ClientFactory, Protocol from twisted.protocols.basic import LineReceiver from twisted.internet import reactor, task, defer from .blockchaininterface import BlockchainInterface, is_index_ahead_of_cache -from .configure import get_p2pk_vbyte +from .configure import get_p2sh_vbyte from .support import get_log log = get_log() @@ -264,6 +264,8 @@ def set_electrum_testnet(): 'testnet.hsmiths.com': {'t':'53011', 's':'53012'}, 'electrum.akinbo.org': {'t':'51001', 's':'51002'}, 'ELEX05.blackpole.online': {'t':'52011', 's':'52002'},} + #Replace with for regtest: + #'localhost': {'t': '50001', 's': '51002'},} class TxElectrumClientProtocol(LineReceiver): #map deferreds to msgids to correctly link response with request @@ -285,7 +287,6 @@ class TxElectrumClientProtocol(LineReceiver): pingloop.start(60.0) def ping(self): - print('sending server ping') #We dont bother tracking response to this; #just for keeping connection active self.call_server_method('server.version') @@ -330,21 +331,92 @@ class TxElectrumClientProtocolFactory(ClientFactory): def clientConnectionFailed(self,connector,reason): print('connection failed') +class ElectrumConn(threading.Thread): + + def __init__(self, server, port): + threading.Thread.__init__(self) + self.daemon = True + self.msg_id = 0 + self.RetQueue = Queue.Queue() + try: + self.s = socket.create_connection((server,int(port))) + except Exception as e: + log.error("Error connecting to electrum server. " + "Try again to connect to a random server or set a " + "server in the config.") + os._exit(1) + self.ping() + + def run(self): + while True: + all_data = None + while True: + data = self.s.recv(1024) + if data is None: + continue + if all_data is None: + all_data = data + else: + all_data = all_data + data + if '\n' in all_data: + break + data_json = json.loads(all_data[:-1].decode()) + self.RetQueue.put(data_json) + + def ping(self): + log.debug('Sending Electrum server ping') + self.send_json({'id':0,'method':'server.version','params':[]}) + t = threading.Timer(60, self.ping) + t.daemon = True + t.start() + + def send_json(self, json_data): + data = json.dumps(json_data).encode() + self.s.send(data + b'\n') + + def call_server_method(self, method, params=[]): + self.msg_id = self.msg_id + 1 + current_id = self.msg_id + method_dict = { + 'id': current_id, + 'method': method, + 'params': params + } + self.send_json(method_dict) + while True: + ret_data = self.RetQueue.get() + if ret_data.get('id', None) == current_id: + return ret_data + else: + log.debug(json.dumps(ret_data)) + class ElectrumInterface(BlockchainInterface): BATCH_SIZE = 8 def __init__(self, testnet=False, electrum_server=None): + self.synctype = "sync-only" if testnet: set_electrum_testnet() self.server, self.port = self.get_server(electrum_server) self.factory = TxElectrumClientProtocolFactory(self) reactor.connectTCP(self.server, self.port, self.factory) + #start the thread for blocking calls during execution + self.electrum_conn = ElectrumConn(self.server, self.port) + self.electrum_conn.start() + #used to hold open server conn + self.electrum_conn.call_server_method('blockchain.numblocks.subscribe') + #task.LoopingCall objects that track transactions, keyed by txids. + #Format: {"txid": (loop, unconfirmed true/false, confirmed true/false, + #spent true/false), ..} + self.tx_watcher_loops = {} + self.wallet_synced = False def sync_wallet(self, wallet, restart_cb=False): self.wallet = wallet #wipe the temporary cache of address histories self.temp_addr_history = {} - startLogging(sys.stdout) - reactor.run() + if self.synctype == "sync-only": + startLogging(sys.stdout) + reactor.run() def get_server(self, electrum_server): if not electrum_server: @@ -354,9 +426,12 @@ class ElectrumInterface(BlockchainInterface): print('Trying to connect to Electrum server: ' + str(electrum_server)) return (s, p) - def get_from_electrum(self, method, params=[]): + def get_from_electrum(self, method, params=[], blocking=False): params = [params] if type(params) is not list else params - return self.factory.client.call_server_method(method, params) + if blocking: + return self.electrum_conn.call_server_method(method, params) + else: + return self.factory.client.call_server_method(method, params) def sync_addresses(self, wallet, restart_cb=None): log.debug("downloading wallet history from Electrum server ...") @@ -365,8 +440,9 @@ class ElectrumInterface(BlockchainInterface): self.synchronize_batch(wallet, mixdepth, forchange, 0) def synchronize_batch(self, wallet, mixdepth, forchange, start_index): - log.debug("Syncing address batch, m, fc, i: " + ",".join( - [str(x) for x in [mixdepth, forchange, start_index]])) + #for debugging only: + #log.debug("Syncing address batch, m, fc, i: " + ",".join( + # [str(x) for x in [mixdepth, forchange, start_index]])) if mixdepth not in self.temp_addr_history: self.temp_addr_history[mixdepth] = {} if forchange not in self.temp_addr_history[mixdepth]: @@ -383,7 +459,6 @@ class ElectrumInterface(BlockchainInterface): self.temp_addr_history[mixdepth][forchange][i] = {'synced': False, 'addr': a, 'used': False} - log.info("added tah entry: " + str(self.temp_addr_history[mixdepth][forchange][i])) def process_address_history(self, history, wallet, mixdepth, forchange, i, addr, start_index): @@ -445,7 +520,12 @@ class ElectrumInterface(BlockchainInterface): addrs.extend(branch_list) if len(addrs) == 0: log.debug('no tx used') + if self.synctype == 'sync-only': + reactor.stop() return + #make sure to add any addresses during the run (a subset of those + #added to the address cache) + addrs = list(set(self.wallet.addr_cache.keys()).union(set(addrs))) self.listunspent_calls = 0 for a in addrs: d = self.get_from_electrum('blockchain.address.listunspent', a) @@ -460,10 +540,13 @@ class ElectrumInterface(BlockchainInterface): if self.listunspent_calls == n: for u in wallet.spent_utxos: wallet.unspent.pop(u, None) - reactor.stop() + self.wallet_synced = True + if self.synctype == "sync-only": + reactor.stop() def pushtx(self, txhex): - brcst_res = self.get_from_electrum('blockchain.transaction.broadcast', txhex) + brcst_res = self.get_from_electrum('blockchain.transaction.broadcast', + txhex, blocking=True) brcst_status = brcst_res['result'] if isinstance(brcst_status, str) and len(brcst_status) == 64: return (True, brcst_status) @@ -472,14 +555,16 @@ class ElectrumInterface(BlockchainInterface): def query_utxo_set(self, txout, includeconf=False): self.current_height = self.get_from_electrum( - "blockchain.numblocks.subscribe")['result'] + "blockchain.numblocks.subscribe", blocking=True)['result'] if not isinstance(txout, list): txout = [txout] utxos = [[t[:64],int(t[65:])] for t in txout] result = [] for ut in utxos: - address = self.get_from_electrum("blockchain.utxo.get_address", ut)['result'] - utxo_info = self.get_from_electrum("blockchain.address.listunspent", address)['result'] + address = self.get_from_electrum("blockchain.utxo.get_address", + ut, blocking=True)['result'] + utxo_info = self.get_from_electrum("blockchain.address.listunspent", + address, blocking=True)['result'] utxo = None for u in utxo_info: if u['tx_hash'] == ut[0] and u['tx_pos'] == ut[1]: @@ -503,7 +588,11 @@ class ElectrumInterface(BlockchainInterface): return result def estimate_fee_per_kb(self, N): - fee_info = self.get_from_electrum('blockchain.estimatefee', N) + print("N is: " + str(N)) + if super(ElectrumInterface, self).fee_per_kb_has_been_manually_set(N): + return int(random.uniform(N * float(0.8), N * float(1.2))) + fee_info = self.get_from_electrum('blockchain.estimatefee', N, blocking=True) + print('got fee info result: ' + str(fee_info)) fee = fee_info.get('result') fee_per_kb_sat = int(float(fee) * 100000000) return fee_per_kb_sat @@ -519,13 +608,15 @@ class ElectrumInterface(BlockchainInterface): wl = self.tx_watcher_loops[notifyaddr] print('txoutset=' + pprint.pformat(tx_output_set)) unconftx = self.get_from_electrum('blockchain.address.get_mempool', - notifyaddr).get('result') + notifyaddr, blocking=True).get('result') unconftxs = set([str(t['tx_hash']) for t in unconftx]) if len(unconftxs): txdatas = [] for txid in unconftxs: - txdatas.append(str(self.get_from_electrum('blockchain.transaction.get', - txid).get('result'))) + txdatas.append({'id': txid, + 'hex':str(self.get_from_electrum( + 'blockchain.transaction.get',txid, + blocking=True).get('result'))}) unconfirmed_txid = None for txdata in txdatas: txhex = txdata['hex'] @@ -538,19 +629,19 @@ class ElectrumInterface(BlockchainInterface): break #call unconf callback if it was found in the mempool if unconfirmed_txid and not wl[1]: - print("Tx: " + str(real_txid) + " seen on network.") - unconfirmfun(txd, real_txid) + print("Tx: " + str(unconfirmed_txid) + " seen on network.") + unconfirmfun(btc.deserialize(unconfirmed_txhex), unconfirmed_txid) wl[1] = True return conftx = self.get_from_electrum('blockchain.address.listunspent', - notifyaddr).get('result') + notifyaddr, blocking=True).get('result') conftxs = set([str(t['tx_hash']) for t in conftx]) if len(conftxs): txdatas = [] for txid in conftxs: txdata = str(self.get_from_electrum('blockchain.transaction.get', - txid).get('result')) + txid, blocking=True).get('result')) txdatas.append({'hex':txdata,'id':txid}) confirmed_txid = None for txdata in txdatas: @@ -571,35 +662,44 @@ class ElectrumInterface(BlockchainInterface): def tx_watcher(self, txd, unconfirmfun, confirmfun, spentfun, c, n): """Called at a polling interval, checks if the given deserialized transaction (which must be fully signed) is (a) broadcast, (b) confirmed - and (c) spent from at index n, and notifies confirmation if number - of confs = c. - TODO: Deal with conflicts correctly. Here just abandons monitoring. + and (c) spent from. (c, n ignored in electrum version, just supports + registering first confirmation). + TODO: There is no handling of conflicts here. """ txid = btc.txhash(btc.serialize(txd)) wl = self.tx_watcher_loops[txid] - txdata = str(self.get_from_electrum('blockchain.transaction.get', - txid).get('result')) - txdatas.append({'hex':txdata,'id':txid}) - if "confirmations" not in res: - print("Malformed gettx result: " + str(res)) + #first check if in mempool (unconfirmed) + #choose an output address for the query. Filter out + #p2pkh addresses, assume p2sh (thus would fail to find tx on + #some nonstandard script type) + addr = None + for i in range(len(txd['outs'])): + if not btc.is_p2pkh_script(txd['outs'][i]['script']): + addr = btc.script_to_address(txd['outs'][i]['script'], get_p2sh_vbyte()) + break + if not addr: + log.error("Failed to find any p2sh output, cannot be a standard " + "joinmarket transaction, fatal error!") + reactor.stop() return - if not wl[1] and res["confirmations"] == 0: + unconftxs_res = self.get_from_electrum('blockchain.address.get_mempool', + addr, blocking=True).get('result') + unconftxs = [str(t['tx_hash']) for t in unconftxs_res] + + if not wl[1] and txid in unconftxs: print("Tx: " + str(txid) + " seen on network.") unconfirmfun(txd, txid) wl[1] = True return - if not wl[2] and res["confirmations"] > 0: - print("Tx: " + str(txid) + " has " + str( - res["confirmations"]) + " confirmations.") - confirmfun(txd, txid, res["confirmations"]) - if c <= res["confirmations"]: - wl[2] = True - #Note we do not stop the monitoring loop when - #confirmations occur, since we are also monitoring for spending. - return - if res["confirmations"] < 0: - print("Tx: " + str(txid) + " has a conflict. Abandoning.") - wl[0].stop() + conftx = self.get_from_electrum('blockchain.address.listunspent', + addr, blocking=True).get('result') + conftxs = [str(t['tx_hash']) for t in conftx] + if not wl[2] and len(conftxs) and txid in conftxs: + print("Tx: " + str(txid) + " is confirmed.") + confirmfun(txd, txid, 1) + wl[2] = True + #Note we do not stop the monitoring loop when + #confirmations occur, since we are also monitoring for spending. return if not spentfun or wl[3]: return diff --git a/jmclient/jmclient/maker.py b/jmclient/jmclient/maker.py index e1b70fe..e099ea9 100644 --- a/jmclient/jmclient/maker.py +++ b/jmclient/jmclient/maker.py @@ -15,18 +15,27 @@ from jmclient.support import (calc_cj_fee) from jmclient.wallet import estimate_tx_fee from jmclient.podle import (generate_podle, get_podle_commitments, verify_podle, PoDLE, PoDLEError, generate_podle_error_string) +from twisted.internet import task jlog = get_log() - + class Maker(object): def __init__(self, wallet): self.active_orders = {} self.wallet = wallet self.nextoid = -1 + self.sync_wait_loop = task.LoopingCall(self.try_to_create_my_orders) + self.sync_wait_loop.start(2.0) + self.offerlist = None + self.aborted = False + + def try_to_create_my_orders(self): + if not jm_single().bc_interface.wallet_synced: + return self.offerlist = self.create_my_orders() + self.sync_wait_loop.stop() if not self.offerlist: - #If we cannot create an offer at startup, quit + jlog.info("Failed to create offers, giving up.") sys.exit(0) - self.aborted = False def on_auth_received(self, nick, offer, commitment, cr, amount, kphex): """Receives data on proposed transaction offer from daemon, verifies diff --git a/jmclient/jmclient/yieldgenerator.py b/jmclient/jmclient/yieldgenerator.py index b2623a0..df08851 100644 --- a/jmclient/jmclient/yieldgenerator.py +++ b/jmclient/jmclient/yieldgenerator.py @@ -5,6 +5,7 @@ import datetime import os import time import abc +from twisted.python.log import startLogging from optparse import OptionParser from jmbase import get_password from jmclient import (Maker, jm_single, get_network, load_program_config, get_log, @@ -248,6 +249,8 @@ def ygmain(ygclass, txfee=1000, cjfee_a=200, cjfee_r=0.002, ordertype='swreloffe print("Failed to load wallet, error message: " + repr(e)) sys.exit(0) break + if jm_single().config.get("BLOCKCHAIN", "blockchain_source") == "electrum-server": + jm_single().bc_interface.synctype = "with-script" sync_wallet(wallet, fast=options.fastsync) maker = ygclass(wallet, [options.txfee, cjfee_a, cjfee_r, @@ -257,6 +260,8 @@ def ygmain(ygclass, txfee=1000, cjfee_a=200, cjfee_r=0.002, ordertype='swreloffe nodaemon = jm_single().config.getint("DAEMON", "no_daemon") daemon = True if nodaemon == 1 else False + if jm_single().config.get("BLOCKCHAIN", "network") in ["regtest", "testnet"]: + startLogging(sys.stdout) start_reactor(jm_single().config.get("DAEMON", "daemon_host"), jm_single().config.getint("DAEMON", "daemon_port"), clientfactory, daemon=daemon) diff --git a/scripts/sendpayment.py b/scripts/sendpayment.py index 24f4878..26aee83 100644 --- a/scripts/sendpayment.py +++ b/scripts/sendpayment.py @@ -25,7 +25,7 @@ from jmclient import (Taker, load_program_config, get_schedule, Wallet, BitcoinCoreWallet, sync_wallet, RegtestBitcoinCoreInterface, estimate_tx_fee, direct_send, SegwitWallet) - +from twisted.python.log import startLogging from jmbase.support import get_log, debug_dump_object, get_password from cli_options import get_sendpayment_parser @@ -144,8 +144,11 @@ def main(): break else: wallet = BitcoinCoreWallet(fromaccount=wallet_name) + if jm_single().config.get("BLOCKCHAIN", + "blockchain_source") == "electrum-server" and options.makercount != 0: + jm_single().bc_interface.synctype = "with-script" + #wallet sync will now only occur on reactor start if we're joining. sync_wallet(wallet, fast=options.fastsync) - if options.makercount == 0: if isinstance(wallet, BitcoinCoreWallet): raise NotImplementedError("Direct send only supported for JM wallets") @@ -210,6 +213,8 @@ def main(): clientfactory = JMClientProtocolFactory(taker) nodaemon = jm_single().config.getint("DAEMON", "no_daemon") daemon = True if nodaemon == 1 else False + if jm_single().config.get("BLOCKCHAIN", "network") in ["regtest", "testnet"]: + startLogging(sys.stdout) start_reactor(jm_single().config.get("DAEMON", "daemon_host"), jm_single().config.getint("DAEMON", "daemon_port"), clientfactory, daemon=daemon) diff --git a/scripts/tumbler.py b/scripts/tumbler.py index e16c3d3..2f1cf12 100644 --- a/scripts/tumbler.py +++ b/scripts/tumbler.py @@ -11,7 +11,7 @@ import os import pprint import copy import logging - +from twisted.python.log import startLogging from jmclient import (Taker, load_program_config, get_schedule, weighted_order_choose, JMClientProtocolFactory, start_reactor, validate_address, jm_single, WalletError, @@ -53,6 +53,9 @@ def main(): print("Failed to load wallet, error message: " + repr(e)) sys.exit(0) break + if jm_single().config.get("BLOCKCHAIN", + "blockchain_source") == "electrum-server": + jm_single().bc_interface.synctype = "with-script" sync_wallet(wallet, fast=options['fastsync']) #Parse options and generate schedule @@ -133,6 +136,8 @@ def main(): clientfactory = JMClientProtocolFactory(taker) nodaemon = jm_single().config.getint("DAEMON", "no_daemon") daemon = True if nodaemon == 1 else False + if jm_single().config.get("BLOCKCHAIN", "network") in ["regtest", "testnet"]: + startLogging(sys.stdout) start_reactor(jm_single().config.get("DAEMON", "daemon_host"), jm_single().config.getint("DAEMON", "daemon_port"), clientfactory, daemon=daemon)