Browse Source

add stallMonitor to retry txs in tumbler

master
Adam Gibson 9 years ago
parent
commit
72e155e1ab
No known key found for this signature in database
GPG Key ID: B3AE09F1E9A3197A
  1. 2
      README.md
  2. 33
      jmclient/jmclient/client_protocol.py
  3. 8
      jmclient/jmclient/taker.py
  4. 28
      scripts/joinmarket-qt.py
  5. 9
      scripts/tumbler.py
  6. 6
      test/regtest_joinmarket.cfg

2
README.md

@ -23,7 +23,7 @@ Joinmarket's own [messaging protocol](https://github.com/JoinMarket-Org/JoinMark
The client and server currently communicate using twisted.protocol.amp, see
[AMP](https://amp-protocol.net/),
and the specification of the communication between the client and server is isolated to
[this](https://github.com/AdamISZ/joinmarket-clientserver/blob/master/jmbase/commands.py) module.
[this](https://github.com/AdamISZ/joinmarket-clientserver/blob/master/jmbase/jmbase/commands.py) module.
Currently the messaging layer of Joinmarket is IRC-only (but easily extensible, see [here](https://github.com/JoinMarket-Org/joinmarket/issues/650).
The IRC layer is also implemented here using Twisted, reducing the complexity required with threading.

33
jmclient/jmclient/client_protocol.py

@ -80,6 +80,12 @@ class JMTakerClientProtocol(amp.AMP):
irc_configs = get_irc_mchannels()
minmakers = jm_single().config.getint("POLICY", "minimum_makers")
maker_timeout_sec = jm_single().maker_timeout_sec
#To avoid creating yet another config variable, we set the timeout
#to 20 * maker_timeout_sec.
reactor.callLater(20*maker_timeout_sec, self.stallMonitor,
self.taker.schedule_index+1)
d = self.callRemote(commands.JMInit,
bcsource=blockchain_source,
network=network,
@ -88,6 +94,33 @@ class JMTakerClientProtocol(amp.AMP):
maker_timeout_sec=maker_timeout_sec)
self.defaultCallbacks(d)
def stallMonitor(self, schedule_index):
"""Diagnoses whether long wait is due to any kind of failure;
if so, calls the taker on_finished_callback with a failure
flag so that the transaction can be re-tried or abandoned, as desired.
Note that this *MUST* not trigger any action once the coinjoin transaction
is seen on the network (hence waiting_for_conf).
The schedule index parameter tells us whether the processing has moved
on to the next item before we were woken up.
"""
jlog.info("STALL MONITOR:")
if not self.taker.schedule_index == schedule_index:
#TODO pre-initialize() ?
jlog.info("No stall detected, continuing")
return
if self.taker.waiting_for_conf:
#Don't restart if the tx is already on the network!
jlog.info("No stall detected, continuing")
return
if not self.taker.txid:
#txid is set on pushing; if it's not there, we have failed.
jlog.info("Stall detected. Regenerating transactions and retrying.")
self.taker.on_finished_callback(False, True, 0.0)
else:
#This shouldn't really happen; if the tx confirmed,
#the finished callback should already be called.
jlog.info("Tx was already pushed; ignoring")
def set_nick(self):
self.nick_pubkey = btc.privtopub(self.nick_priv)
self.nick_pkh_raw = hashlib.sha256(self.nick_pubkey).digest()[

8
jmclient/jmclient/taker.py

@ -44,6 +44,7 @@ class Taker(object):
self.schedule = schedule
self.order_chooser = order_chooser
self.ignored_makers = None
self.waiting_for_conf = False
self.txid = None
self.schedule_index = -1
#allow custom wallet-based clients to use their own signing code;
@ -140,6 +141,11 @@ class Taker(object):
self.orderbook, self.total_cj_fee = choose_orders(
orderbook, self.cjamount, self.n_counterparties, self.order_chooser,
self.ignored_makers)
if self.orderbook is None:
#Failure to get an orderbook means order selection failed
#for some reason; no action is taken, we let the stallMonitor
# + the finished callback decide whether to retry.
return False
if self.filter_orders_callback:
accepted = self.filter_orders_callback([self.orderbook,
self.total_cj_fee],
@ -593,8 +599,10 @@ class Taker(object):
def unconfirm_callback(self, txd, txid):
jlog.debug("Transaction seen on network, waiting for confirmation")
self.waiting_for_conf = True
def confirm_callback(self, txd, txid, confirmations):
self.waiting_for_conf = False
jlog.debug("Confirmed callback in taker, confs: " + str(confirmations))
fromtx=False if self.schedule_index + 1 == len(self.schedule) else True
waittime = self.schedule[self.schedule_index][4]

28
scripts/joinmarket-qt.py

@ -50,7 +50,7 @@ from jmclient import (load_program_config, get_network, Wallet,
JMTakerClientProtocolFactory, WalletError,
start_reactor, get_schedule, get_tumble_schedule,
schedule_to_text, mn_decode, mn_encode, create_wallet_file,
get_blockchain_interface_instance)
get_blockchain_interface_instance, sync_wallet)
from qtsupport import (ScheduleWizard, warnings, config_tips, config_types,
TaskThread, QtHandler, XStream, Buttons, CloseButton,
@ -178,7 +178,7 @@ class SettingsTab(QDialog):
#an awkward design element from the core code: maker_timeout_sec
#is set outside the config, if it doesn't exist in the config.
#Add it here and it will be in the newly updated config file.
if section == 'MESSAGING' and 'maker_timeout_sec' not in [
if section == 'TIMEOUT' and 'maker_timeout_sec' not in [
_[0] for _ in pairs
]:
jm_single().config.set(section, 'maker_timeout_sec', '60')
@ -236,6 +236,9 @@ class SettingsTab(QDialog):
if str(t[0].text()) == 'blockchain_source':
jm_single().bc_interface = get_blockchain_interface_instance(
jm_single().config)
if str(t[0].text()) == 'maker_timeout_sec':
jm_single().maker_timeout_sec = int(t[1].text())
log.debug("Set maker timeout sec to : " + str(jm_single().maker_timeout_sec))
def getSettingsFields(self, section, names):
results = []
@ -486,18 +489,14 @@ class SpendTab(QWidget):
JMQtMessageBox(
self,
"Connecting to IRC.\nView real-time log in the lower pane.",
title="Sendpayment")
title="Coinjoin starting")
self.toggleButtons(False, sched=multiple)
log.debug('starting coinjoin ..')
w.statusBar().showMessage("Syncing wallet ...")
if jm_single().config.get("BLOCKCHAIN", "blockchain_source") not in [
"blockr", "bc.i", "electrum-server"]:
jm_single().bc_interface.sync_wallet(w.wallet, fast=True)
else:
jm_single().bc_interface.sync_wallet(w.wallet)
sync_wallet(w.wallet, fast=True)
if not multiple:
destaddr = str(self.widgets[0][1].text())
#convert from bitcoins (enforced by QDoubleValidator) to satoshis
@ -659,7 +658,7 @@ class SpendTab(QWidget):
self.giveUp()
def startNextTransaction(self):
jm_single().bc_interface.sync_wallet(w.wallet)
sync_wallet(w.wallet, fast=True)
self.clientfactory.getClient().clientStart()
def takerFinished(self):
@ -678,7 +677,14 @@ class SpendTab(QWidget):
QtCore.QTimer.singleShot(self.taker_finished_waittime,
self.startNextTransaction)
else:
#a transaction failed; just stop
#a transaction failed to reach broadcast;
#restart processing from the failed schedule entry;
#note that for some failure vectors this is essentially
#an infinite loop, but the user can abort any time (or
#modify the wallet e.g. to add commitment utxos).
self.taker.schedule_index -= 1
log.info("Transaction failed after timeout, trying again")
QtCore.QTimer.singleShot(0, self.startNextTransaction)
self.giveUp()
else:
#the final, or a permanent failure
@ -1274,7 +1280,7 @@ class JMMainWindow(QMainWindow):
jm_single().config.set('POLICY', 'listunspent_args', '[0]')
assert self.wallet, "No wallet loaded"
thread = TaskThread(self)
task = partial(jm_single().bc_interface.sync_wallet, self.wallet)
task = partial(sync_wallet, self.wallet, True)
thread.add(task, on_done=self.updateWalletInfo)
self.statusBar().showMessage("Reading wallet from blockchain ...")
return True

9
scripts/tumbler.py

@ -72,8 +72,11 @@ def main():
log.info("Waiting for: " + str(waittime) + " seconds.")
reactor.callLater(waittime, clientfactory.getClient().clientStart)
else:
#a transaction failed; just stop
reactor.stop()
#a transaction failed; tumbler is aggressive in trying to
#complete, so restart processing from the failed schedule entry:
clientfactory.getClient().taker.schedule_index -= 1
log.info("Transaction failed after timeout, trying again")
reactor.callLater(0, clientfactory.getClient().clientStart)
else:
if not res:
log.info("Did not complete successfully, shutting down")
@ -84,6 +87,7 @@ def main():
#to allow testing of confirm/unconfirm callback for multiple txs
if isinstance(jm_single().bc_interface, RegtestBitcoinCoreInterface):
jm_single().bc_interface.tick_forward_chain_interval = 10
jm_single().maker_timeout_sec = 5
#instantiate Taker with given schedule and run
taker = Taker(wallet,
@ -97,7 +101,6 @@ def main():
jm_single().config.getint("DAEMON", "daemon_port"),
clientfactory, daemon=daemon)
if __name__ == "__main__":
main()
print('done')

6
test/regtest_joinmarket.cfg

@ -1,5 +1,11 @@
#NOTE: This configuration file is for testing with regtest only
#For mainnet usage, running a JoinMarket script will create the default file
[DAEMON]
no_daemon = 1
daemon_port = 27183
daemon_host = localhost
use_ssl = false
[BLOCKCHAIN]
blockchain_source = regtest
rpc_host = localhost

Loading…
Cancel
Save