Browse Source

Tweak tumble schedule on failure to retry more effectively.

Separate tumbler log.
Also change schedules to lists from tuples for modification.
master
Adam Gibson 10 years ago
parent
commit
ad0f407d0c
No known key found for this signature in database
GPG Key ID: B3AE09F1E9A3197A
  1. 3
      jmclient/jmclient/__init__.py
  2. 84
      jmclient/jmclient/schedule.py
  3. 10
      jmclient/jmclient/taker.py
  4. 58
      jmclient/test/test_schedule.py
  5. 6
      jmclient/test/test_taker.py
  6. 2
      scripts/joinmarket-qt.py
  7. 2
      scripts/sendpayment.py
  8. 45
      scripts/tumbler.py

3
jmclient/jmclient/__init__.py

@ -30,7 +30,8 @@ from .podle import (set_commitment_file, get_commitment_file,
generate_podle_error_string, add_external_commitments,
PoDLE, generate_podle, get_podle_commitments,
update_commitments)
from .schedule import get_schedule, get_tumble_schedule, schedule_to_text
from .schedule import (get_schedule, get_tumble_schedule, schedule_to_text,
tweak_tumble_schedule)
from .commitment_utils import get_utxo_info, validate_utxo_data, quit
# Set default logging handler to avoid "No handler found" warnings.

84
jmclient/jmclient/schedule.py

@ -1,7 +1,9 @@
#!/usr/bin/env python
from __future__ import print_function
import copy
from pprint import pformat
from jmclient import (validate_address, rand_exp_array,
rand_norm_array, rand_pow_array)
rand_norm_array, rand_pow_array, jm_single)
"""Utility functions for dealing with Taker schedules.
- get_schedule(filename):
@ -24,7 +26,12 @@ def get_schedule(filename):
return (False, "Failed to parse schedule line: " + sl)
try:
mixdepth = int(mixdepth)
amount = int(amount)
#TODO this isn't the right way, but floats must be allowed
#for any persisted tumbler-style schedule
if "." in amount:
amount = float(amount)
else:
amount = int(amount)
makercount = int(makercount)
destaddr = destaddr.strip()
waittime = float(waittime)
@ -34,9 +41,17 @@ def get_schedule(filename):
success, errmsg = validate_address(destaddr)
if not success:
return (False, "Invalid address: " + destaddr + "," + errmsg)
schedule.append((mixdepth, amount, makercount, destaddr, waittime))
schedule.append([mixdepth, amount, makercount, destaddr, waittime])
return (True, schedule)
def get_amount_fractions(power, count):
"""Get 'count' fractions following power law distn according to
parameter 'power'
"""
amount_fractions = rand_pow_array(power, count)
amount_fractions = [1.0 - x for x in amount_fractions]
return [x / sum(amount_fractions) for x in amount_fractions]
def get_tumble_schedule(options, destaddrs):
"""for the general intent and design of the tumbler algo, see the docs in
joinmarket-org/joinmarket.
@ -62,10 +77,7 @@ def get_tumble_schedule(options, destaddrs):
# amount_fraction cant be 1.0, some coins must be left over
if txcount == 1:
txcount = 2
# assume that the sizes of outputs will follow a power law
amount_fractions = rand_pow_array(options['amountpower'], txcount)
amount_fractions = [1.0 - x for x in amount_fractions]
amount_fractions = [x / sum(amount_fractions) for x in amount_fractions]
amount_fractions = get_amount_fractions(options['amountpower'], txcount)
# transaction times are uncorrelated
# time between events in a poisson process followed exp
waits = rand_exp_array(options['timelambda'], txcount)
@ -106,9 +118,63 @@ def get_tumble_schedule(options, destaddrs):
[tx_list.remove(t) for t in tx_list_remove]
schedule = []
for t in tx_list:
schedule.append((t['srcmixdepth'], t['amount_fraction'],
t['makercount'], t['destination'], t['wait']))
schedule.append([t['srcmixdepth'], t['amount_fraction'],
t['makercount'], t['destination'], t['wait']])
return schedule
def tweak_tumble_schedule(options, schedule, last_completed):
"""If a tx in a schedule failed for some reason, and we want
to make a best effort to complete the schedule, we can tweak
the failed entry to improve the odds of success on re-try.
Both the size/amount and the number of counterparties may have
been a cause for failure, so we change both of those where
possible.
Returns a new, altered schedule file (should continue at same index)
"""
new_schedule = copy.deepcopy(schedule)
altered = new_schedule[last_completed + 1]
#For sweeps, we'll try with a lower number of counterparties if we can.
#Note that this is usually counterproductive for non-sweeps, which fall
#back and so benefit in reliability from *higher* counterparty numbers.
if altered[1] == 0:
new_n_cp = altered[2] - 1
if new_n_cp < jm_single().config.getint("POLICY", "minimum_makers"):
new_n_cp = jm_single().config.getint("POLICY", "minimum_makers")
altered[2] = new_n_cp
if not altered[1] == 0:
#For non-sweeps, there's a fractional amount (tumbler).
#Increasing or decreasing the amount could improve the odds of success,
#since it depends on liquidity and minsizes, so we tweak in both
#directions randomly.
#Strategy:
#1. calculate the total percentage remaining in the mixdepth.
#2. calculate the number remaining incl. sweep.
#3. Re-use 'getamountfracs' algo for this reduced number, then scale it
#to the number remaining.
#4. As before, reset the final to '0' for sweep.
#find the number of entries remaining, not including the final sweep,
#for this mixdepth:
#First get all sched entries for this mixdepth
this_mixdepth_entries = [s for s in new_schedule if s[0] == altered[0]]
already_done = this_mixdepth_entries[:this_mixdepth_entries.index(altered)]
tobedone = this_mixdepth_entries[this_mixdepth_entries.index(altered):]
#find total frac left to be spent
alreadyspent = sum([x[1] for x in already_done])
tobespent = 1.0 - alreadyspent
#power law for what's left:
new_fracs = get_amount_fractions(options['amountpower'], len(tobedone))
#rescale; the sum must be 'tobespent':
new_fracs = [x*tobespent for x in new_fracs]
#starting from the known 'last_completed+1' index, apply these new
#fractions, with 0 at the end for sweep
for i, j in enumerate(range(
last_completed + 1, last_completed + 1 + len(tobedone))):
new_schedule[j][1] = new_fracs[i]
#reset the sweep
new_schedule[last_completed + 1 + len(tobedone) - 1][1] = 0
return new_schedule
def schedule_to_text(schedule):
return "\n".join([",".join([str(y) for y in x]) for x in schedule])

10
jmclient/jmclient/taker.py

@ -98,8 +98,14 @@ class Taker(object):
#non-integer coinjoin amounts are treated as fractions
#this is currently used by the tumbler algo
if isinstance(self.cjamount, float):
mixdepthbal = self.wallet.get_balance_by_mixdepth()[self.mixdepth]
self.cjamount = int(self.cjamount * mixdepthbal)
#the mixdepth balance is fixed at the *start* of each new
#mixdepth in tumble schedules:
if self.schedule_index == 0 or si[0] != self.schedule[
self.schedule_index - 1]:
self.mixdepthbal = self.wallet.get_balance_by_mixdepth(
)[self.mixdepth]
#reset to satoshis
self.cjamount = int(self.cjamount * self.mixdepthbal)
if self.cjamount < jm_single().mincjamount:
jlog.debug("Coinjoin amount too low, bringing up.")
self.cjamount = jm_single().mincjamount

58
jmclient/test/test_schedule.py

@ -3,7 +3,8 @@ from __future__ import absolute_import
'''test schedule module.'''
import pytest
from jmclient import (get_schedule, get_tumble_schedule, load_program_config)
from jmclient import (get_schedule, get_tumble_schedule,
tweak_tumble_schedule, load_program_config)
import os
valids = """#sample for testing
@ -89,8 +90,55 @@ def test_tumble_schedule(destaddrs, txcparams, mixdepthcount):
schedule = get_tumble_schedule(options, destaddrs)
dests = [x[3] for x in schedule]
assert set(destaddrs).issubset(set(dests))
@pytest.mark.parametrize(
"destaddrs, txcparams, mixdepthcount, lastcompleted, makercountrange",
[
(["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i",
"mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8",
"mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (6,0), 5, 17, (6,0)),
#edge case: very first transaction
(["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i",
"mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8",
"mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (3,0), 4, -1, (6,0)),
#edge case: hit minimum_makers limit
(["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i",
"mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8",
"mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (3,0), 4, -1, (2,0)),
#edge case: it's a sweep
(["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i",
"mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8",
"mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (3,0), 4, 1, (5,0)),
#mid-run case in 2nd mixdepth
(["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i",
"mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8",
"mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (6,0), 4, 7, (5,0)),
#sanity check, typical parameters
(["mzzAYbtPpANxpNVGCVBAhZYzrxyZtoix7i",
"mifCWfmygxKhsP3qM3HZi3ZjBEJu7m39h8",
"mnTn9KVQQT9zy9R4E2ZGzWPK4EfcEcV9Y5"], (4,1), 4, 8, (6,1)),
])
def test_tumble_tweak(destaddrs, txcparams, mixdepthcount, lastcompleted,
makercountrange):
load_program_config()
options = get_options()
options['mixdepthcount'] = mixdepthcount
options['txcountparams'] = txcparams
options['makercountrange'] = makercountrange
schedule = get_tumble_schedule(options, destaddrs)
dests = [x[3] for x in schedule]
assert set(destaddrs).issubset(set(dests))
new_schedule = tweak_tumble_schedule(options, schedule, lastcompleted)
#sanity check: each amount fraction list should add up to near 1.0,
#so some is left over for sweep
for i in range(mixdepthcount):
entries = [x for x in new_schedule if x[0] == i]
total_frac_for_mixdepth = sum([x[1] for x in entries])
#TODO spurious failure is possible here, not an ideal check
print('got total frac for mixdepth: ', str(total_frac_for_mixdepth))
assert total_frac_for_mixdepth < 0.999
from pprint import pformat
print("here is the new schedule: ")
print(pformat(new_schedule))
print("and old:")
print(pformat(schedule))

6
jmclient/test/test_taker.py

@ -70,7 +70,7 @@ def get_taker(schedule=None, schedule_len=0, sign_method=None, on_finished=None,
filter_orders=None):
if not schedule:
#note, for taker.initalize() this will result in junk
schedule = [('a', 'b', 'c', 'd', 'e')]*schedule_len
schedule = [['a', 'b', 'c', 'd', 'e']]*schedule_len
print("Using schedule: " + str(schedule))
on_finished_callback = on_finished if on_finished else taker_finished
filter_orders_callback = filter_orders if filter_orders else dummy_filter_orderbook
@ -83,11 +83,11 @@ def test_filter_rejection(createcmtdata):
print("calling filter orders rejection")
return False
taker = get_taker(filter_orders=filter_orders_reject)
taker.schedule = [(0, 20000000, 3, "mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw", 0)]
taker.schedule = [[0, 20000000, 3, "mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw", 0]]
res = taker.initialize(t_orderbook)
assert not res[0]
taker = get_taker(filter_orders=filter_orders_reject)
taker.schedule = [(0, 0, 3, "mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw", 0)]
taker.schedule = [[0, 0, 3, "mnsquzxrHXpFsZeL42qwbKdCP2y1esN3qw", 0]]
res = taker.initialize(t_orderbook)
assert not res[0]

2
scripts/joinmarket-qt.py

@ -505,7 +505,7 @@ class SpendTab(QWidget):
makercount = int(self.widgets[1][1].text())
mixdepth = int(self.widgets[2][1].text())
#note 'amount' is integer, so not interpreted as fraction
self.taker_schedule = [(mixdepth, amount, makercount, destaddr, 0)]
self.taker_schedule = [[mixdepth, amount, makercount, destaddr, 0]]
else:
assert self.loaded_schedule
self.taker_schedule = self.loaded_schedule

2
scripts/sendpayment.py

@ -107,7 +107,7 @@ def main():
if not addr_valid:
print('ERROR: Address invalid. ' + errormsg)
return
schedule = [(options.mixdepth, amount, options.makercount, destaddr)]
schedule = [[options.mixdepth, amount, options.makercount, destaddr]]
else:
result, schedule = get_schedule(options.schedule)
if not result:

45
scripts/tumbler.py

@ -9,18 +9,31 @@ import time
import os
import pprint
import copy
import logging
from jmclient import (Taker, load_program_config, get_schedule, weighted_order_choose,
JMTakerClientProtocolFactory, start_reactor,
validate_address, jm_single, WalletError,
Wallet, sync_wallet, get_tumble_schedule,
RegtestBitcoinCoreInterface, estimate_tx_fee)
RegtestBitcoinCoreInterface, estimate_tx_fee,
tweak_tumble_schedule)
from jmbase.support import get_log, debug_dump_object, get_password
from cli_options import get_tumbler_parser
log = get_log()
def main():
tumble_log = logging.getLogger('tumbler')
tumble_log.setLevel(logging.DEBUG)
logFormatter = logging.Formatter(
('%(asctime)s %(message)s'))
logsdir = os.path.join(os.path.dirname(
jm_single().config_location), "logs")
fileHandler = logging.FileHandler(
logsdir + '/TUMBLE.log')
fileHandler.setFormatter(logFormatter)
tumble_log.addHandler(fileHandler)
(options, args) = get_tumbler_parser().parse_args()
options = vars(options)
@ -56,7 +69,9 @@ def main():
schedule = get_tumble_schedule(options, destaddrs)
print("got schedule:")
print(pprint.pformat(schedule))
tumble_log.info("TUMBLE STARTING")
tumble_log.info("With this schedule: ")
tumble_log.info(pprint.pformat(schedule))
#callback for order checking; dummy/passthrough
def filter_orders_callback(orders_fees, cjamount):
return True
@ -64,14 +79,32 @@ def main():
def taker_finished(res, fromtx=False, waittime=0.0):
if fromtx:
if res:
tumble_log.info("Completed successfully.")
tumble_log.info("We sent: " + str(taker.cjamount) + \
" satoshis to address: " + taker.my_cj_addr + \
" from mixdepth: " + \
str(taker.schedule[taker.schedule_index][0]))
waiting_message = "Waiting for: " + str(waittime) + " seconds."
tumble_log.info(waiting_message)
sync_wallet(wallet, fast=options['fastsync'])
log.info("Waiting for: " + str(waittime) + " seconds.")
log.info(waiting_message)
reactor.callLater(waittime, clientfactory.getClient().clientStart)
else:
#a transaction failed; tumbler is aggressive in trying to
#complete, so restart processing from the failed schedule entry:
clientfactory.getClient().taker.schedule_index -= 1
log.info("Transaction failed after timeout, trying again")
#complete; we tweak the schedule from this point in the mixdepth,
#then try again:
tumble_log.info("Transaction attempt failed, tweaking schedule"
" and trying again.")
tumble_log.info("The paramaters of the failed attempt: ")
tumble_log.info(str(taker.schedule[taker.schedule_index]))
log.info("Schedule entry: " + str(
taker.schedule[taker.schedule_index]) + \
" failed after timeout, trying again")
taker.schedule_index -= 1
taker.schedule = tweak_tumble_schedule(options, taker.schedule,
taker.schedule_index)
tumble_log.info("We tweaked the schedule, the new schedule is:")
tumble_log.info(pprint.pformat(taker.schedule))
reactor.callLater(0, clientfactory.getClient().clientStart)
else:
if not res:

Loading…
Cancel
Save