From ad0f407d0c0ac2e24821383569b7b8698ed18889 Mon Sep 17 00:00:00 2001 From: Adam Gibson Date: Tue, 7 Feb 2017 15:24:13 +0200 Subject: [PATCH] Tweak tumble schedule on failure to retry more effectively. Separate tumbler log. Also change schedules to lists from tuples for modification. --- jmclient/jmclient/__init__.py | 3 +- jmclient/jmclient/schedule.py | 84 ++++++++++++++++++++++++++++++---- jmclient/jmclient/taker.py | 10 +++- jmclient/test/test_schedule.py | 58 +++++++++++++++++++++-- jmclient/test/test_taker.py | 6 +-- scripts/joinmarket-qt.py | 2 +- scripts/sendpayment.py | 2 +- scripts/tumbler.py | 45 +++++++++++++++--- 8 files changed, 182 insertions(+), 28 deletions(-) diff --git a/jmclient/jmclient/__init__.py b/jmclient/jmclient/__init__.py index 5b13e43..1895f9b 100644 --- a/jmclient/jmclient/__init__.py +++ b/jmclient/jmclient/__init__.py @@ -30,7 +30,8 @@ from .podle import (set_commitment_file, get_commitment_file, generate_podle_error_string, add_external_commitments, PoDLE, generate_podle, get_podle_commitments, update_commitments) -from .schedule import get_schedule, get_tumble_schedule, schedule_to_text +from .schedule import (get_schedule, get_tumble_schedule, schedule_to_text, + tweak_tumble_schedule) from .commitment_utils import get_utxo_info, validate_utxo_data, quit # Set default logging handler to avoid "No handler found" warnings. diff --git a/jmclient/jmclient/schedule.py b/jmclient/jmclient/schedule.py index dcd81d4..435591a 100644 --- a/jmclient/jmclient/schedule.py +++ b/jmclient/jmclient/schedule.py @@ -1,7 +1,9 @@ #!/usr/bin/env python from __future__ import print_function +import copy +from pprint import pformat from jmclient import (validate_address, rand_exp_array, - rand_norm_array, rand_pow_array) + rand_norm_array, rand_pow_array, jm_single) """Utility functions for dealing with Taker schedules. - get_schedule(filename): @@ -24,7 +26,12 @@ def get_schedule(filename): return (False, "Failed to parse schedule line: " + sl) try: mixdepth = int(mixdepth) - amount = int(amount) + #TODO this isn't the right way, but floats must be allowed + #for any persisted tumbler-style schedule + if "." in amount: + amount = float(amount) + else: + amount = int(amount) makercount = int(makercount) destaddr = destaddr.strip() waittime = float(waittime) @@ -34,9 +41,17 @@ def get_schedule(filename): success, errmsg = validate_address(destaddr) if not success: return (False, "Invalid address: " + destaddr + "," + errmsg) - schedule.append((mixdepth, amount, makercount, destaddr, waittime)) + schedule.append([mixdepth, amount, makercount, destaddr, waittime]) return (True, schedule) +def get_amount_fractions(power, count): + """Get 'count' fractions following power law distn according to + parameter 'power' + """ + amount_fractions = rand_pow_array(power, count) + amount_fractions = [1.0 - x for x in amount_fractions] + return [x / sum(amount_fractions) for x in amount_fractions] + def get_tumble_schedule(options, destaddrs): """for the general intent and design of the tumbler algo, see the docs in joinmarket-org/joinmarket. @@ -62,10 +77,7 @@ def get_tumble_schedule(options, destaddrs): # amount_fraction cant be 1.0, some coins must be left over if txcount == 1: txcount = 2 - # assume that the sizes of outputs will follow a power law - amount_fractions = rand_pow_array(options['amountpower'], txcount) - amount_fractions = [1.0 - x for x in amount_fractions] - amount_fractions = [x / sum(amount_fractions) for x in amount_fractions] + amount_fractions = get_amount_fractions(options['amountpower'], txcount) # transaction times are uncorrelated # time between events in a poisson process followed exp waits = rand_exp_array(options['timelambda'], txcount) @@ -106,9 +118,63 @@ def get_tumble_schedule(options, destaddrs): [tx_list.remove(t) for t in tx_list_remove] schedule = [] for t in tx_list: - schedule.append((t['srcmixdepth'], t['amount_fraction'], - t['makercount'], t['destination'], t['wait'])) + schedule.append([t['srcmixdepth'], t['amount_fraction'], + t['makercount'], t['destination'], t['wait']]) return schedule +def tweak_tumble_schedule(options, schedule, last_completed): + """If a tx in a schedule failed for some reason, and we want + to make a best effort to complete the schedule, we can tweak + the failed entry to improve the odds of success on re-try. + Both the size/amount and the number of counterparties may have + been a cause for failure, so we change both of those where + possible. + Returns a new, altered schedule file (should continue at same index) + """ + new_schedule = copy.deepcopy(schedule) + altered = new_schedule[last_completed + 1] + #For sweeps, we'll try with a lower number of counterparties if we can. + #Note that this is usually counterproductive for non-sweeps, which fall + #back and so benefit in reliability from *higher* counterparty numbers. + if altered[1] == 0: + new_n_cp = altered[2] - 1 + if new_n_cp < jm_single().config.getint("POLICY", "minimum_makers"): + new_n_cp = jm_single().config.getint("POLICY", "minimum_makers") + altered[2] = new_n_cp + if not altered[1] == 0: + #For non-sweeps, there's a fractional amount (tumbler). + #Increasing or decreasing the amount could improve the odds of success, + #since it depends on liquidity and minsizes, so we tweak in both + #directions randomly. + #Strategy: + #1. calculate the total percentage remaining in the mixdepth. + #2. calculate the number remaining incl. sweep. + #3. Re-use 'getamountfracs' algo for this reduced number, then scale it + #to the number remaining. + #4. As before, reset the final to '0' for sweep. + #find the number of entries remaining, not including the final sweep, + #for this mixdepth: + + #First get all sched entries for this mixdepth + this_mixdepth_entries = [s for s in new_schedule if s[0] == altered[0]] + already_done = this_mixdepth_entries[:this_mixdepth_entries.index(altered)] + tobedone = this_mixdepth_entries[this_mixdepth_entries.index(altered):] + + #find total frac left to be spent + alreadyspent = sum([x[1] for x in already_done]) + tobespent = 1.0 - alreadyspent + #power law for what's left: + new_fracs = get_amount_fractions(options['amountpower'], len(tobedone)) + #rescale; the sum must be 'tobespent': + new_fracs = [x*tobespent for x in new_fracs] + #starting from the known 'last_completed+1' index, apply these new + #fractions, with 0 at the end for sweep + for i, j in enumerate(range( + last_completed + 1, last_completed + 1 + len(tobedone))): + new_schedule[j][1] = new_fracs[i] + #reset the sweep + new_schedule[last_completed + 1 + len(tobedone) - 1][1] = 0 + return new_schedule + def schedule_to_text(schedule): return "\n".join([",".join([str(y) for y in x]) for x in schedule]) \ No newline at end of file diff --git a/jmclient/jmclient/taker.py b/jmclient/jmclient/taker.py index 357dfe1..92036a7 100644 --- a/jmclient/jmclient/taker.py +++ b/jmclient/jmclient/taker.py @@ -98,8 +98,14 @@ class Taker(object): #non-integer coinjoin amounts are treated as fractions #this is currently used by the tumbler algo if isinstance(self.cjamount, float): - mixdepthbal = self.wallet.get_balance_by_mixdepth()[self.mixdepth] - self.cjamount = int(self.cjamount * mixdepthbal) + #the mixdepth balance is fixed at the *start* of each new + #mixdepth in tumble schedules: + if self.schedule_index == 0 or si[0] != self.schedule[ + self.schedule_index - 1]: + self.mixdepthbal = self.wallet.get_balance_by_mixdepth( + )[self.mixdepth] + #reset to satoshis + self.cjamount = int(self.cjamount * self.mixdepthbal) if self.cjamount < jm_single().mincjamount: jlog.debug("Coinjoin amount too low, bringing up.") self.cjamount = jm_single().mincjamount diff --git a/jmclient/test/test_schedule.py b/jmclient/test/test_schedule.py index a250221..609f452 100644 --- a/jmclient/test/test_schedule.py +++ b/jmclient/test/test_schedule.py @@ -3,7 +3,8 @@ from __future__ import absolute_import '''test schedule module.''' import pytest -from jmclient import (get_schedule, get_tumble_schedule, load_program_config) +from jmclient import (get_schedule, get_tumble_schedule, + tweak_tumble_schedule, load_program_config) import os valids = """#sample for testing @@ -89,8 +90,55 @@ def test_tumble_schedule(destaddrs, txcparams, mixdepthcount): schedule = get_tumble_schedule(options, destaddrs) dests = [x[3] for x in schedule] assert set(destaddrs).issubset(set(dests)) - - - - \ No newline at end of file +@pytest.mark.parametrize( + "destaddrs, txcparams, mixdepthcount, lastcompleted, makercountrange", + [ + (["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i", + "mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8", + "mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (6,0), 5, 17, (6,0)), + #edge case: very first transaction + (["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i", + "mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8", + "mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (3,0), 4, -1, (6,0)), + #edge case: hit minimum_makers limit + (["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i", + "mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8", + "mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (3,0), 4, -1, (2,0)), + #edge case: it's a sweep + (["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i", + "mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8", + "mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (3,0), 4, 1, (5,0)), + #mid-run case in 2nd mixdepth + (["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i", + "mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8", + "mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (6,0), 4, 7, (5,0)), + #sanity check, typical parameters + (["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i", + "mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8", + "mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (4,1), 4, 8, (6,1)), + ]) +def test_tumble_tweak(destaddrs, txcparams, mixdepthcount, lastcompleted, + makercountrange): + load_program_config() + options = get_options() + options['mixdepthcount'] = mixdepthcount + options['txcountparams'] = txcparams + options['makercountrange'] = makercountrange + schedule = get_tumble_schedule(options, destaddrs) + dests = [x[3] for x in schedule] + assert set(destaddrs).issubset(set(dests)) + new_schedule = tweak_tumble_schedule(options, schedule, lastcompleted) + #sanity check: each amount fraction list should add up to near 1.0, + #so some is left over for sweep + for i in range(mixdepthcount): + entries = [x for x in new_schedule if x[0] == i] + total_frac_for_mixdepth = sum([x[1] for x in entries]) + #TODO spurious failure is possible here, not an ideal check + print('got total frac for mixdepth: ', str(total_frac_for_mixdepth)) + assert total_frac_for_mixdepth < 0.999 + from pprint import pformat + print("here is the new schedule: ") + print(pformat(new_schedule)) + print("and old:") + print(pformat(schedule)) diff --git a/jmclient/test/test_taker.py b/jmclient/test/test_taker.py index 5679afb..07431be 100644 --- a/jmclient/test/test_taker.py +++ b/jmclient/test/test_taker.py @@ -70,7 +70,7 @@ def get_taker(schedule=None, schedule_len=0, sign_method=None, on_finished=None, filter_orders=None): if not schedule: #note, for taker.initalize() this will result in junk - schedule = [('a', 'b', 'c', 'd', 'e')]*schedule_len + schedule = [['a', 'b', 'c', 'd', 'e']]*schedule_len print("Using schedule: " + str(schedule)) on_finished_callback = on_finished if on_finished else taker_finished filter_orders_callback = filter_orders if filter_orders else dummy_filter_orderbook @@ -83,11 +83,11 @@ def test_filter_rejection(createcmtdata): print("calling filter orders rejection") return False taker = get_taker(filter_orders=filter_orders_reject) - taker.schedule = [(0, 20000000, 3, "mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw", 0)] + taker.schedule = [[0, 20000000, 3, "mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw", 0]] res = taker.initialize(t_orderbook) assert not res[0] taker = get_taker(filter_orders=filter_orders_reject) - taker.schedule = [(0, 0, 3, "mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw", 0)] + taker.schedule = [[0, 0, 3, "mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw", 0]] res = taker.initialize(t_orderbook) assert not res[0] diff --git a/scripts/joinmarket-qt.py b/scripts/joinmarket-qt.py index 953208a..2cb3a4f 100644 --- a/scripts/joinmarket-qt.py +++ b/scripts/joinmarket-qt.py @@ -505,7 +505,7 @@ class SpendTab(QWidget): makercount = int(self.widgets[1][1].text()) mixdepth = int(self.widgets[2][1].text()) #note 'amount' is integer, so not interpreted as fraction - self.taker_schedule = [(mixdepth, amount, makercount, destaddr, 0)] + self.taker_schedule = [[mixdepth, amount, makercount, destaddr, 0]] else: assert self.loaded_schedule self.taker_schedule = self.loaded_schedule diff --git a/scripts/sendpayment.py b/scripts/sendpayment.py index 53504a1..3c7428d 100644 --- a/scripts/sendpayment.py +++ b/scripts/sendpayment.py @@ -107,7 +107,7 @@ def main(): if not addr_valid: print('ERROR: Address invalid. ' + errormsg) return - schedule = [(options.mixdepth, amount, options.makercount, destaddr)] + schedule = [[options.mixdepth, amount, options.makercount, destaddr]] else: result, schedule = get_schedule(options.schedule) if not result: diff --git a/scripts/tumbler.py b/scripts/tumbler.py index 2afe794..41b1fa8 100644 --- a/scripts/tumbler.py +++ b/scripts/tumbler.py @@ -9,18 +9,31 @@ import time import os import pprint import copy +import logging from jmclient import (Taker, load_program_config, get_schedule, weighted_order_choose, JMTakerClientProtocolFactory, start_reactor, validate_address, jm_single, WalletError, Wallet, sync_wallet, get_tumble_schedule, - RegtestBitcoinCoreInterface, estimate_tx_fee) + RegtestBitcoinCoreInterface, estimate_tx_fee, + tweak_tumble_schedule) from jmbase.support import get_log, debug_dump_object, get_password from cli_options import get_tumbler_parser log = get_log() def main(): + tumble_log = logging.getLogger('tumbler') + tumble_log.setLevel(logging.DEBUG) + logFormatter = logging.Formatter( + ('%(asctime)s %(message)s')) + logsdir = os.path.join(os.path.dirname( + jm_single().config_location), "logs") + fileHandler = logging.FileHandler( + logsdir + '/TUMBLE.log') + fileHandler.setFormatter(logFormatter) + tumble_log.addHandler(fileHandler) + (options, args) = get_tumbler_parser().parse_args() options = vars(options) @@ -56,7 +69,9 @@ def main(): schedule = get_tumble_schedule(options, destaddrs) print("got schedule:") print(pprint.pformat(schedule)) - + tumble_log.info("TUMBLE STARTING") + tumble_log.info("With this schedule: ") + tumble_log.info(pprint.pformat(schedule)) #callback for order checking; dummy/passthrough def filter_orders_callback(orders_fees, cjamount): return True @@ -64,14 +79,32 @@ def main(): def taker_finished(res, fromtx=False, waittime=0.0): if fromtx: if res: + tumble_log.info("Completed successfully.") + tumble_log.info("We sent: " + str(taker.cjamount) + \ + " satoshis to address: " + taker.my_cj_addr + \ + " from mixdepth: " + \ + str(taker.schedule[taker.schedule_index][0])) + waiting_message = "Waiting for: " + str(waittime) + " seconds." + tumble_log.info(waiting_message) sync_wallet(wallet, fast=options['fastsync']) - log.info("Waiting for: " + str(waittime) + " seconds.") + log.info(waiting_message) reactor.callLater(waittime, clientfactory.getClient().clientStart) else: #a transaction failed; tumbler is aggressive in trying to - #complete, so restart processing from the failed schedule entry: - clientfactory.getClient().taker.schedule_index -= 1 - log.info("Transaction failed after timeout, trying again") + #complete; we tweak the schedule from this point in the mixdepth, + #then try again: + tumble_log.info("Transaction attempt failed, tweaking schedule" + " and trying again.") + tumble_log.info("The paramaters of the failed attempt: ") + tumble_log.info(str(taker.schedule[taker.schedule_index])) + log.info("Schedule entry: " + str( + taker.schedule[taker.schedule_index]) + \ + " failed after timeout, trying again") + taker.schedule_index -= 1 + taker.schedule = tweak_tumble_schedule(options, taker.schedule, + taker.schedule_index) + tumble_log.info("We tweaked the schedule, the new schedule is:") + tumble_log.info(pprint.pformat(taker.schedule)) reactor.callLater(0, clientfactory.getClient().clientStart) else: if not res: