@ -1,21 +1,20 @@
import atexit
import io
import logging
import os
import re
import socket
import sys
import subprocess
import atexit
from signal import SIGINT
import sys
from configparser import ConfigParser , NoOptionError
from signal import SIGINT
from typing import Any , List , Optional , Tuple
import jmbitcoin as btc
from jmclient . jsonrpc import JsonRpc
from jmbase . support import ( get_log , joinmarket_alert , core_alert , debug_silence ,
set_logging_level , jmprint , set_logging_color ,
JM_APP_NAME , lookup_appdata_folder , EXIT_FAILURE )
from jmclient . jsonrpc import JsonRpc
from jmclient . podle import set_commitment_file
log = get_log ( )
@ -83,7 +82,7 @@ global_singleton.commit_file_location = 'cmtdata/commitments.json'
global_singleton . wait_for_commitments = 0
def jm_single ( ) :
def jm_single ( ) - > AttributeDict :
return global_singleton
# FIXME: Add rpc_* options here in the future!
@ -535,13 +534,13 @@ polling_interval_minutes = 60
#This allows use of the jmclient package with a
#configuration set by an external caller; not to be used
#in conjuction with calls to load_program_config.
def set_config ( cfg , bcint = None ) :
def set_config ( cfg : ConfigParser , bcint = None ) - > None :
global_singleton . config = cfg
if bcint :
global_singleton . bc_interface = bcint
def get_mchannels ( mode = " TAKER " ) :
def get_mchannels ( mode : str = " TAKER " ) - > list :
SECTION_NAME = ' MESSAGING '
# FIXME: remove in future release
if jm_single ( ) . config . has_section ( SECTION_NAME ) :
@ -612,7 +611,7 @@ def get_mchannels(mode="TAKER"):
assert len ( onion_sections ) < 2
return irc_sections + onion_sections
def _get_irc_mchannels_old ( ) :
def _get_irc_mchannels_old ( ) - > list :
fields = [ ( " host " , str ) , ( " port " , int ) , ( " channel " , str ) , ( " usessl " , str ) ,
( " socks5 " , str ) , ( " socks5_host " , str ) , ( " socks5_port " , str ) ]
configdata = { }
@ -635,11 +634,11 @@ class JMPluginService(object):
any additional service ( such as SNICKER ) .
For now only covers logging .
"""
def __init__ ( self , name , requires_logging = True ) :
def __init__ ( self , name : str , requires_logging : bool = True ) - > None :
self . name = name
self . requires_logging = requires_logging
def start_plugin_logging ( self , wallet ) :
def start_plugin_logging ( self , wallet : str ) - > None :
""" This requires the name of the active wallet
to set the logfile ; TODO other plugin services may
need a different setup .
@ -649,10 +648,10 @@ class JMPluginService(object):
self . wallet . get_wallet_name ( ) )
self . start_logging ( )
def set_log_dir ( self , logdirname ) :
def set_log_dir ( self , logdirname : str ) - > None :
self . logdirname = logdirname
def start_logging ( self ) :
def start_logging ( self ) - > None :
logFormatter = logging . Formatter (
( ' %(asctime)s [ %(levelname)-5.5s ] {} - %(message)s ' . format (
self . name ) ) )
@ -661,11 +660,11 @@ class JMPluginService(object):
fileHandler . setFormatter ( logFormatter )
get_log ( ) . addHandler ( fileHandler )
def get_network ( ) :
def get_network ( ) - > str :
""" Returns network name """
return global_singleton . config . get ( " BLOCKCHAIN " , " network " )
def validate_address ( addr ) :
def validate_address ( addr : str ) - > Tuple [ bool , str ] :
try :
# automatically respects the network
# as set in btc.select_chain_params(...)
@ -682,23 +681,24 @@ def validate_address(addr):
_BURN_DESTINATION = " BURN "
def is_burn_destination ( destination ) :
def is_burn_destination ( destination : str ) - > bool :
return destination == _BURN_DESTINATION
def get_interest_rate ( ) :
def get_interest_rate ( ) - > float :
return float ( global_singleton . config . get ( ' POLICY ' , ' interest_rate ' ,
fallback = _DEFAULT_INTEREST_RATE ) )
def get_bondless_makers_allowance ( ) :
def get_bondless_makers_allowance ( ) - > float :
return float ( global_singleton . config . get ( ' POLICY ' , ' bondless_makers_allowance ' ,
fallback = _DEFAULT_BONDLESS_MAKERS_ALLOWANCE ) )
def remove_unwanted_default_settings ( config ) :
def _ remove_unwanted_default_settings( config : ConfigParser ) - > None :
for section in config . sections ( ) :
if section . startswith ( ' MESSAGING: ' ) :
config . remove_section ( section )
def load_program_config ( config_path = " " , bs = None , plugin_services = [ ] ) :
def load_program_config ( config_path : str = " " , bs : Optional [ str ] = None ,
plugin_services : List [ JMPluginService ] = [ ] ) - > None :
global_singleton . config . readfp ( io . StringIO ( defaultconfig ) )
if not config_path :
config_path = lookup_appdata_folder ( global_singleton . APPNAME )
@ -718,7 +718,7 @@ def load_program_config(config_path="", bs=None, plugin_services=[]):
global_singleton . config_location = os . path . join (
global_singleton . datadir , global_singleton . config_location )
remove_unwanted_default_settings ( global_singleton . config )
_ remove_unwanted_default_settings( global_singleton . config )
try :
loadedFiles = global_singleton . config . read (
[ global_singleton . config_location ] )
@ -810,12 +810,12 @@ def load_program_config(config_path="", bs=None, plugin_services=[]):
os . makedirs ( plogsdir )
p . set_log_dir ( plogsdir )
def gracefully_kill_subprocess ( p ) :
def gracefully_kill_subprocess ( p ) - > None :
# See https://stackoverflow.com/questions/43274476/is-there-a-way-to-check-if-a-subprocess-is-still-running
if p . poll ( ) is None :
p . send_signal ( SIGINT )
def check_and_start_tor ( ) :
def check_and_start_tor ( ) - > None :
sock = socket . socket ( socket . AF_INET , socket . SOCK_STREAM )
result = sock . connect_ex ( ( " 127.0.0.1 " , 9050 ) )
sock . close ( )
@ -838,7 +838,7 @@ def check_and_start_tor():
atexit . register ( gracefully_kill_subprocess , tor_subprocess )
log . debug ( " Started Tor subprocess with pid " + str ( tor_subprocess . pid ) )
def load_test_config ( * * kwargs ) :
def load_test_config ( * * kwargs ) - > None :
if " config_path " not in kwargs :
load_program_config ( config_path = " . " , * * kwargs )
else :
@ -847,7 +847,7 @@ def load_test_config(**kwargs):
##########################################################
## Returns a tuple (rpc_user: String, rpc_pass: String) ##
##########################################################
def get_bitcoin_rpc_credentials ( _config ) :
def _ get_bitcoin_rpc_credentials( _config : ConfigParser ) - > Tuple [ str , str ] :
filepath = None
try :
@ -868,7 +868,7 @@ def get_bitcoin_rpc_credentials(_config):
raise ValueError ( " Invalid RPC auth credentials `rpc_user` and `rpc_password` " )
return rpc_user , rpc_password
def get_blockchain_interface_instance ( _config ) :
def get_blockchain_interface_instance ( _config : ConfigParser ) :
# todo: refactor joinmarket module to get rid of loops
# importing here is necessary to avoid import loops
from jmclient . blockchaininterface import BitcoinCoreInterface , \
@ -892,7 +892,7 @@ def get_blockchain_interface_instance(_config):
rpc_port = 38332
else :
raise ValueError ( ' wrong network configured: ' + network )
rpc_user , rpc_password = get_bitcoin_rpc_credentials ( _config )
rpc_user , rpc_password = _ get_bitcoin_rpc_credentials( _config )
rpc_wallet_file = _config . get ( " BLOCKCHAIN " , " rpc_wallet_file " )
rpc = JsonRpc ( rpc_host , rpc_port , rpc_user , rpc_password )
if source == ' bitcoin-rpc ' : #pragma: no cover
@ -923,7 +923,7 @@ def get_blockchain_interface_instance(_config):
raise ValueError ( " Invalid blockchain source " )
return bc_interface
def update_persist_config ( section , name , value ) :
def update_persist_config ( section : str , name : str , value : Any ) - > bool :
""" Unfortunately we cannot persist an updated config
while preserving the full set of comments with ConfigParser ' s
model ( the ' set no-value settings ' doesn ' t cut it).
@ -977,21 +977,21 @@ def update_persist_config(section, name, value):
f . writelines ( [ x . encode ( " utf-8 " ) for x in newlines ] )
return True
def is_segwit_mode ( ) :
def is_segwit_mode ( ) - > bool :
return jm_single ( ) . config . get ( ' POLICY ' , ' segwit ' ) != ' false '
def is_native_segwit_mode ( ) :
def is_native_segwit_mode ( ) - > bool :
if not is_segwit_mode ( ) :
return False
return jm_single ( ) . config . get ( ' POLICY ' , ' native ' ) != ' false '
def process_shutdown ( mode = " command-line " ) :
def process_shutdown ( mode : str = " command-line " ) - > None :
if mode == " command-line " :
from twisted . internet import reactor
for dc in reactor . getDelayedCalls ( ) :
dc . cancel ( )
reactor . stop ( )
def process_startup ( ) :
def process_startup ( ) - > None :
from twisted . internet import reactor
reactor . run ( )