diff --git a/jmclient/jmclient/configure.py b/jmclient/jmclient/configure.py index cf5f05d..c9bbaa7 100644 --- a/jmclient/jmclient/configure.py +++ b/jmclient/jmclient/configure.py @@ -1,21 +1,20 @@ - +import atexit import io import logging import os import re import socket -import sys import subprocess -import atexit -from signal import SIGINT - +import sys from configparser import ConfigParser, NoOptionError +from signal import SIGINT +from typing import Any, List, Optional, Tuple import jmbitcoin as btc -from jmclient.jsonrpc import JsonRpc from jmbase.support import (get_log, joinmarket_alert, core_alert, debug_silence, set_logging_level, jmprint, set_logging_color, JM_APP_NAME, lookup_appdata_folder, EXIT_FAILURE) +from jmclient.jsonrpc import JsonRpc from jmclient.podle import set_commitment_file log = get_log() @@ -83,7 +82,7 @@ global_singleton.commit_file_location = 'cmtdata/commitments.json' global_singleton.wait_for_commitments = 0 -def jm_single(): +def jm_single() -> AttributeDict: return global_singleton # FIXME: Add rpc_* options here in the future! @@ -535,13 +534,13 @@ polling_interval_minutes = 60 #This allows use of the jmclient package with a #configuration set by an external caller; not to be used #in conjuction with calls to load_program_config. -def set_config(cfg, bcint=None): +def set_config(cfg: ConfigParser, bcint = None) -> None: global_singleton.config = cfg if bcint: global_singleton.bc_interface = bcint -def get_mchannels(mode="TAKER"): +def get_mchannels(mode: str = "TAKER") -> list: SECTION_NAME = 'MESSAGING' # FIXME: remove in future release if jm_single().config.has_section(SECTION_NAME): @@ -612,7 +611,7 @@ def get_mchannels(mode="TAKER"): assert len(onion_sections) < 2 return irc_sections + onion_sections -def _get_irc_mchannels_old(): +def _get_irc_mchannels_old() -> list: fields = [("host", str), ("port", int), ("channel", str), ("usessl", str), ("socks5", str), ("socks5_host", str), ("socks5_port", str)] configdata = {} @@ -635,11 +634,11 @@ class JMPluginService(object): any additional service (such as SNICKER). For now only covers logging. """ - def __init__(self, name, requires_logging=True): + def __init__(self, name: str, requires_logging: bool = True) -> None: self.name = name self.requires_logging = requires_logging - def start_plugin_logging(self, wallet): + def start_plugin_logging(self, wallet: str) -> None: """ This requires the name of the active wallet to set the logfile; TODO other plugin services may need a different setup. @@ -649,10 +648,10 @@ class JMPluginService(object): self.wallet.get_wallet_name()) self.start_logging() - def set_log_dir(self, logdirname): + def set_log_dir(self, logdirname: str) -> None: self.logdirname = logdirname - def start_logging(self): + def start_logging(self) -> None: logFormatter = logging.Formatter( ('%(asctime)s [%(levelname)-5.5s] {} - %(message)s'.format( self.name))) @@ -661,11 +660,11 @@ class JMPluginService(object): fileHandler.setFormatter(logFormatter) get_log().addHandler(fileHandler) -def get_network(): +def get_network() -> str: """Returns network name""" return global_singleton.config.get("BLOCKCHAIN", "network") -def validate_address(addr): +def validate_address(addr: str) -> Tuple[bool, str]: try: # automatically respects the network # as set in btc.select_chain_params(...) @@ -682,23 +681,24 @@ def validate_address(addr): _BURN_DESTINATION = "BURN" -def is_burn_destination(destination): +def is_burn_destination(destination: str) -> bool: return destination == _BURN_DESTINATION -def get_interest_rate(): +def get_interest_rate() -> float: return float(global_singleton.config.get('POLICY', 'interest_rate', fallback=_DEFAULT_INTEREST_RATE)) -def get_bondless_makers_allowance(): +def get_bondless_makers_allowance() -> float: return float(global_singleton.config.get('POLICY', 'bondless_makers_allowance', fallback=_DEFAULT_BONDLESS_MAKERS_ALLOWANCE)) -def remove_unwanted_default_settings(config): +def _remove_unwanted_default_settings(config: ConfigParser) -> None: for section in config.sections(): if section.startswith('MESSAGING:'): config.remove_section(section) -def load_program_config(config_path="", bs=None, plugin_services=[]): +def load_program_config(config_path: str = "", bs: Optional[str] = None, + plugin_services: List[JMPluginService] = []) -> None: global_singleton.config.readfp(io.StringIO(defaultconfig)) if not config_path: config_path = lookup_appdata_folder(global_singleton.APPNAME) @@ -718,7 +718,7 @@ def load_program_config(config_path="", bs=None, plugin_services=[]): global_singleton.config_location = os.path.join( global_singleton.datadir, global_singleton.config_location) - remove_unwanted_default_settings(global_singleton.config) + _remove_unwanted_default_settings(global_singleton.config) try: loadedFiles = global_singleton.config.read( [global_singleton.config_location]) @@ -810,12 +810,12 @@ def load_program_config(config_path="", bs=None, plugin_services=[]): os.makedirs(plogsdir) p.set_log_dir(plogsdir) -def gracefully_kill_subprocess(p): +def gracefully_kill_subprocess(p) -> None: # See https://stackoverflow.com/questions/43274476/is-there-a-way-to-check-if-a-subprocess-is-still-running if p.poll() is None: p.send_signal(SIGINT) -def check_and_start_tor(): +def check_and_start_tor() -> None: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex(("127.0.0.1", 9050)) sock.close() @@ -838,7 +838,7 @@ def check_and_start_tor(): atexit.register(gracefully_kill_subprocess, tor_subprocess) log.debug("Started Tor subprocess with pid " + str(tor_subprocess.pid)) -def load_test_config(**kwargs): +def load_test_config(**kwargs) -> None: if "config_path" not in kwargs: load_program_config(config_path=".", **kwargs) else: @@ -847,7 +847,7 @@ def load_test_config(**kwargs): ########################################################## ## Returns a tuple (rpc_user: String, rpc_pass: String) ## ########################################################## -def get_bitcoin_rpc_credentials(_config): +def _get_bitcoin_rpc_credentials(_config: ConfigParser) -> Tuple[str, str]: filepath = None try: @@ -868,7 +868,7 @@ def get_bitcoin_rpc_credentials(_config): raise ValueError("Invalid RPC auth credentials `rpc_user` and `rpc_password`") return rpc_user, rpc_password -def get_blockchain_interface_instance(_config): +def get_blockchain_interface_instance(_config: ConfigParser): # todo: refactor joinmarket module to get rid of loops # importing here is necessary to avoid import loops from jmclient.blockchaininterface import BitcoinCoreInterface, \ @@ -892,7 +892,7 @@ def get_blockchain_interface_instance(_config): rpc_port = 38332 else: raise ValueError('wrong network configured: ' + network) - rpc_user, rpc_password = get_bitcoin_rpc_credentials(_config) + rpc_user, rpc_password = _get_bitcoin_rpc_credentials(_config) rpc_wallet_file = _config.get("BLOCKCHAIN", "rpc_wallet_file") rpc = JsonRpc(rpc_host, rpc_port, rpc_user, rpc_password) if source == 'bitcoin-rpc': #pragma: no cover @@ -923,7 +923,7 @@ def get_blockchain_interface_instance(_config): raise ValueError("Invalid blockchain source") return bc_interface -def update_persist_config(section, name, value): +def update_persist_config(section: str, name: str, value: Any) -> bool: """ Unfortunately we cannot persist an updated config while preserving the full set of comments with ConfigParser's model (the 'set no-value settings' doesn't cut it). @@ -977,21 +977,21 @@ def update_persist_config(section, name, value): f.writelines([x.encode("utf-8") for x in newlines]) return True -def is_segwit_mode(): +def is_segwit_mode() -> bool: return jm_single().config.get('POLICY', 'segwit') != 'false' -def is_native_segwit_mode(): +def is_native_segwit_mode() -> bool: if not is_segwit_mode(): return False return jm_single().config.get('POLICY', 'native') != 'false' -def process_shutdown(mode="command-line"): +def process_shutdown(mode: str = "command-line") -> None: if mode=="command-line": from twisted.internet import reactor for dc in reactor.getDelayedCalls(): dc.cancel() reactor.stop() -def process_startup(): +def process_startup() -> None: from twisted.internet import reactor reactor.run()