You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

363 lines
11 KiB

import logging, sys
import binascii
import random
import socket
from getpass import getpass
from os import path, environ
from functools import wraps
from optparse import IndentedHelpFormatter
from sqlite3 import Cursor, Row
from typing import List
import urllib.parse as urlparse
# JoinMarket version
JM_CORE_VERSION = '0.9.11dev'
# global Joinmarket constants
JM_WALLET_NAME_PREFIX = "joinmarket-wallet-"
JM_APP_NAME = "joinmarket"
# Exit status codes
EXIT_SUCCESS = 0
EXIT_FAILURE = 1
EXIT_ARGERROR = 2
# optparse munges description paragraphs. We sometimes
# don't want that.
class IndentedHelpFormatterWithNL(IndentedHelpFormatter):
def format_description(self, description):
return description
from chromalog.log import (
ColorizingStreamHandler,
ColorizingFormatter,
)
from chromalog.colorizer import GenericColorizer, MonochromaticColorizer
from colorama import Fore, Back, Style
# magic; importing e.g. 'info' actually instantiates
# that as a function that uses the color map
# defined below. ( noqa because flake doesn't understand)
from chromalog.mark.helpers.simple import ( # noqa: F401
debug,
info,
important,
success,
warning,
error,
critical,
)
# our chosen colorings for log messages in JM:
jm_color_map = {
'debug': (Style.DIM + Fore.LIGHTBLUE_EX, Style.RESET_ALL),
'info': (Style.BRIGHT + Fore.BLUE, Style.RESET_ALL),
'important': (Style.BRIGHT, Style.RESET_ALL),
'success': (Fore.GREEN, Style.RESET_ALL),
'warning': (Fore.YELLOW, Style.RESET_ALL),
'error': (Fore.RED, Style.RESET_ALL),
'critical': (Back.RED, Style.RESET_ALL),
}
class JMColorizer(GenericColorizer):
default_color_map = jm_color_map
jm_colorizer = JMColorizer()
logFormatter = ColorizingFormatter(
"%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger('joinmarket')
log.setLevel(logging.DEBUG)
joinmarket_alert = ['']
core_alert = ['']
debug_silence = [False]
class JoinMarketStreamHandler(ColorizingStreamHandler):
def __init__(self):
if sys.stdout.isatty():
super().__init__(colorizer=jm_colorizer, stream=sys.stdout)
else:
super().__init__(colorizer=MonochromaticColorizer,
stream=sys.stdout)
def emit(self, record):
if joinmarket_alert[0]:
print('JoinMarket Alert Message: ' + joinmarket_alert[0])
if core_alert[0]:
print('Core Alert Message: ' + core_alert[0])
if not debug_silence[0]:
super().emit(record)
handler = JoinMarketStreamHandler()
handler.setFormatter(logFormatter)
log.addHandler(handler)
# hex/binary conversion routines used by dependent packages
def hextobin(h):
"""Convert a hex string to bytes"""
return binascii.unhexlify(h.encode('utf8'))
def bintohex(b):
"""Convert bytes to a hex string"""
return binascii.hexlify(b).decode('utf8')
def lehextobin(h):
"""Convert a little-endian hex string to bytes
Lets you write uint256's and uint160's the way the Satoshi codebase shows
them.
"""
return binascii.unhexlify(h.encode('utf8'))[::-1]
def bintolehex(b):
"""Convert bytes to a little-endian hex string
Lets you show uint256's and uint160's the way the Satoshi codebase shows
them.
"""
return binascii.hexlify(b[::-1]).decode('utf8')
def utxostr_to_utxo(x):
if not isinstance(x, str):
return (False, "not a string")
y = x.split(":")
if len(y) != 2:
return (False,
"string is not two items separated by :")
try:
n = int(y[1])
except:
return (False, "utxo index was not an integer.")
if n < 0:
return (False, "utxo index must not be negative.")
if len(y[0]) != 64:
return (False, "txid is not 64 hex characters.")
try:
txid = binascii.unhexlify(y[0])
except:
return (False, "txid is not hex.")
return (True, (txid, n))
def utxo_to_utxostr(u):
if not isinstance(u, tuple):
return (False, "utxo is not a tuple.")
if not len(u) == 2:
return (False, "utxo should have two elements.")
if not isinstance(u[0], bytes):
return (False, "txid should be bytes.")
if not isinstance(u[1], int):
return (False, "index should be int.")
if u[1] < 0:
return (False, "index must be a positive integer.")
if not len(u[0]) == 32:
return (False, "txid must be 32 bytes.")
txid = binascii.hexlify(u[0]).decode("ascii")
return (True, txid + ":" + str(u[1]))
def jmprint(msg, level="info"):
""" Provides the ability to print messages
with consistent formatting, outside the logging system
(in case you don't want the standard log format).
Example applications are: REPL style stuff, and/or
some very important / user workflow affecting communication.
Note that this exclusively for console printout, NOT for
logging to file (chromalog will handle file streams
properly, but this will not).
"""
if not level in jm_color_map.keys():
raise Exception("Unsupported formatting")
if sys.stdout.isatty():
# .colorize_message function does a .format() on the string,
# which does not work with string-ified json; this should
# result in output as intended:
msg = msg.replace('{', '{{')
msg = msg.replace('}', '}}')
fmtfn = eval(level)
fmtd_msg = fmtfn(msg)
if sys.stdout.isatty():
print(jm_colorizer.colorize_message(fmtd_msg))
else:
print(fmtd_msg)
def get_log():
"""
provides joinmarket logging instance
:return: log instance
"""
return log
def set_logging_level(level):
handler.setLevel(level)
def set_logging_color(colored=False):
if colored and sys.stdout.isatty():
handler.colorizer = jm_colorizer
else:
handler.colorizer = MonochromaticColorizer()
def chunks(d, n):
return [d[x:x + n] for x in range(0, len(d), n)]
def get_password(msg): #pragma: no cover
password = getpass(msg)
if not isinstance(password, bytes):
password = password.encode('utf-8')
return password
def lookup_appdata_folder(appname):
""" Given an appname as a string,
return the correct directory for storing
data for the given OS environment.
"""
if sys.platform == 'darwin':
if "HOME" in environ:
data_folder = path.join(environ["HOME"],
"Library/Application support/",
appname) + '/'
else:
jmprint("Could not find home folder")
sys.exit(EXIT_FAILURE)
elif 'win32' in sys.platform or 'win64' in sys.platform:
data_folder = path.join(environ['APPDATA'], appname) + '\\'
else:
data_folder = path.expanduser(path.join("~", "." + appname + "/"))
return data_folder
def get_jm_version_str():
return "JoinMarket " + JM_CORE_VERSION
def print_jm_version(option, opt_str, value, parser):
print(get_jm_version_str())
sys.exit(EXIT_SUCCESS)
# helper functions for conversions of format between over-the-wire JM
# and internal. See details in hexbin() docstring.
def _convert(x):
good, utxo = utxostr_to_utxo(x)
if good:
return utxo
else:
try:
b = hextobin(x)
return b
except:
return x
def listchanger(l):
rlist = []
for x in l:
if isinstance(x, list):
rlist.append(listchanger(x))
elif isinstance(x, dict):
rlist.append(dictchanger(x))
else:
rlist.append(_convert(x))
return rlist
def dictchanger(d):
rdict = {}
for k, v in d.items():
if isinstance(v, dict):
rdict[_convert(k)] = dictchanger(v)
elif isinstance(v, list):
rdict[_convert(k)] = listchanger(v)
else:
rdict[_convert(k)] = _convert(v)
return rdict
def hexbin(func):
""" Decorator for functions of taker and maker receiving over
the wire AMP arguments that may be in hex or hextxid:n format
and converting all to binary.
Functions to which this decorator applies should have all arguments
be one of:
- hex string (keys), converted here to binary
- lists of keys or txid:n strings (converted here to binary, or
(txidbytes, n))
- lists of lists or dicts, to which these rules apply recursively.
- any other string (unchanged)
- dicts with keys as per above; values are altered recursively according
to the rules above.
"""
@wraps(func)
def func_wrapper(inst, *args, **kwargs):
newargs = []
for arg in args:
if isinstance(arg, (list, tuple)):
newargs.append(listchanger(arg))
elif isinstance(arg, dict):
newargs.append(dictchanger(arg))
else:
newargs.append(_convert(arg))
return func(inst, *newargs, **kwargs)
return func_wrapper
def wrapped_urlparse(url):
""" This wrapper is unfortunately necessary as there appears
to be a bug in the urlparse handling of *.onion strings:
If http:// is prepended, the url parses correctly, but if it
is not, the .hostname property is erroneously None.
"""
if isinstance(url, str):
a, b = (".onion", "http://")
else:
a, b = (b".onion", b"http://")
if url.endswith(a) and not url.startswith(b):
url = b + url
return urlparse.urlparse(url)
def bdict_sdict_convert(d, output_binary=False):
""" Useful for converting dicts from url parameter sets
to a form that can be handled by json dumps/loads.
This code only works if *all* keys in the dict
are binary strings, and all values are lists of same.
This code could be extended if needed.
If output_binary is True, the reverse operation is performed.
"""
newd = {}
for k, v in d.items():
if output_binary:
newv = [a.encode("utf-8") for a in v]
newd[k.encode("utf-8")] = newv
else:
newv = [a.decode("utf-8") for a in v]
newd[k.decode("utf-8")] = newv
return newd
def random_insert(old, new):
""" Insert elements of new at random indices in
the old list, without changing the ordering of the old list.
"""
for n in new:
insertion_index = random.randint(0, len(old))
old[:] = old[:insertion_index] + [n] + old[insertion_index:]
def get_free_tcp_ports(num_ports: int) -> List[int]:
""" Get first free TCP ports you can bind to on localhost.
"""
sockets = []
ports = []
for i in range(num_ports):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", 0))
s.listen(1)
ports.append(s.getsockname()[1])
sockets.append(s)
for s in sockets:
s.close()
return ports
def dict_factory(cursor: Cursor, row: Row) -> dict:
fields = [column[0] for column in cursor.description]
return {key: value for key, value in zip(fields, row)}