From 77c1c2ea0ea8b9d80ea634e5bed67e5ba99ec305 Mon Sep 17 00:00:00 2001 From: Adam Gibson Date: Mon, 19 Dec 2016 17:00:18 +0200 Subject: [PATCH] rewrite IRCMC to use twisted --- .coveragerc | 1 - jmdaemon/jmdaemon/__init__.py | 2 +- jmdaemon/jmdaemon/daemon_protocol.py | 13 +- jmdaemon/jmdaemon/irc.py | 746 +++++++++++--------------- jmdaemon/jmdaemon/message_channel.py | 32 +- jmdaemon/jmdaemon/orderbookwatch.py | 1 - jmdaemon/setup.py | 2 +- jmdaemon/test/test_daemon_protocol.py | 26 +- jmdaemon/test/test_irc_messaging.py | 144 +++-- jmdaemon/test/test_message_channel.py | 31 +- 10 files changed, 425 insertions(+), 573 deletions(-) diff --git a/.coveragerc b/.coveragerc index a0e8796..807c3d8 100644 --- a/.coveragerc +++ b/.coveragerc @@ -13,4 +13,3 @@ omit = jmdaemon/test/* jmdaemon/setup.py jmdaemon/jmdaemon/socks.py - jmdaemon/jmdaemon/irc.py diff --git a/jmdaemon/jmdaemon/__init__.py b/jmdaemon/jmdaemon/__init__.py index 2b20230..9eba1ca 100644 --- a/jmdaemon/jmdaemon/__init__.py +++ b/jmdaemon/jmdaemon/__init__.py @@ -4,7 +4,7 @@ import logging from protocol import * from .enc_wrapper import as_init_encryption, decode_decrypt, \ encrypt_encode, init_keypair, init_pubkey, get_pubkey, NaclError -from .irc import IRCMessageChannel, B_PER_SEC +from .irc import IRCMessageChannel from jmbase.support import get_log from .message_channel import MessageChannel, MessageChannelCollection from .orderbookwatch import OrderbookWatch diff --git a/jmdaemon/jmdaemon/daemon_protocol.py b/jmdaemon/jmdaemon/daemon_protocol.py index 46edb1d..a97e907 100644 --- a/jmdaemon/jmdaemon/daemon_protocol.py +++ b/jmdaemon/jmdaemon/daemon_protocol.py @@ -34,17 +34,6 @@ The client-daemon two-way communication is documented in jmbase.commands.py """ -class MCThread(threading.Thread): - - def __init__(self, mc): - threading.Thread.__init__(self, name='MCThread') - self.mc = mc - self.daemon = True - - def run(self): - self.mc.run() - - class JMProtocolError(Exception): pass @@ -132,7 +121,7 @@ class JMDaemonServerProtocol(amp.AMP, OrderbookWatch): """ self.jm_state = 0 #uninited if self.restart_mc_required: - MCThread(self.mcc).start() + self.mcc.run() self.restart_mc_required = False else: #if we are not restarting the MC, diff --git a/jmdaemon/jmdaemon/irc.py b/jmdaemon/jmdaemon/irc.py index 4fbb23a..4a530af 100644 --- a/jmdaemon/jmdaemon/irc.py +++ b/jmdaemon/jmdaemon/irc.py @@ -4,217 +4,198 @@ import base64 import random import socket import ssl +#TODO: SSL support (can it be done without back-end openssl?) import threading import time -import Queue - - +from twisted.internet import reactor, protocol +from twisted.internet.endpoints import TCP4ClientEndpoint +from twisted.internet.ssl import ClientContextFactory +from twisted.logger import Logger +from twisted.words.protocols import irc from jmdaemon.message_channel import MessageChannel from jmbase.support import get_log, chunks -from jmdaemon.socks import socksocket, setdefaultproxy, PROXY_TYPE_SOCKS5 +from txsocksx.client import SOCKS5ClientEndpoint +from txsocksx.tls import TLSWrapClientEndpoint from jmdaemon.protocol import * MAX_PRIVMSG_LEN = 450 -PING_INTERVAL = 300 -PING_TIMEOUT = 60 - -#Throttling parameters; data from -#tests by @chris-belcher: -##worked (bytes per sec/bytes per sec interval / counterparties / max_privmsg_len) -#300/4 / 6 / 400 -#600/4 / 6 / 400 -#450/4 / 10 / 400 -#450/4 / 10 / 450 -#525/4 / 10 / 450 -##didnt work -#600/4 / 10 / 450 -#600/4 / 10 / 400 -#2000/2 / 10 / 400 -#450/4 / 10 / 475 -MSG_INTERVAL = 0.001 -B_PER_SEC = 450 -B_PER_SEC_INTERVAL = 4.0 - -def get_config_irc_channel(chan_name, btcnet): - channel = "#" + chan_name - if btcnet == "testnet": - channel += "-test" - return channel log = get_log() +def wlog(*x): + """Simplifier to add lists to the debug log + """ + msg = " ".join([str(a) for a in x]) + log.debug(msg) + def get_irc_text(line): return line[line[1:].find(':') + 2:] def get_irc_nick(source): - full_nick = source[1:source.find('!')] + full_nick = source[0:source.find('!')] return full_nick[:NICK_MAX_ENCODED+2] -class ThrottleThread(threading.Thread): +def get_config_irc_channel(chan_name, btcnet): + channel = "#" + chan_name + if btcnet == "testnet": + channel += "-test" + return channel - def __init__(self, irc): - threading.Thread.__init__(self, name='ThrottleThread') - self.daemon = True - self.irc = irc - self.msg_buffer = [] +class TxIRCFactory(protocol.ClientFactory): + def __init__(self, wrapper): + self.wrapper = wrapper + self.channel = self.wrapper.channel - def run(self): - log.debug("starting throttle thread") - last_msg_time = 0 - print_throttle_msg = True - while not self.irc.give_up: - self.irc.lockthrottle.acquire() - while not (self.irc.throttleQ.empty() and self.irc.obQ.empty() - and self.irc.pingQ.empty()): - time.sleep(0.0001) #need to avoid cpu spinning if throttled - try: - pingmsg = self.irc.pingQ.get(block=False) - #ping messages are not counted to throttling totals, - #so send immediately - self.irc.sock.sendall(pingmsg + '\r\n') - continue - except Queue.Empty: - pass - except: - log.warn("failed to send ping message on socket") - break - #First throttling mechanism: no more than 1 line - #per MSG_INTERVAL seconds. - x = time.time() - last_msg_time - if x < MSG_INTERVAL: - continue - #Second throttling mechanism: limited kB/s rate - #over the most recent period. - q = time.time() - B_PER_SEC_INTERVAL - #clean out old messages - self.msg_buffer = [_ for _ in self.msg_buffer if _[1] > q] - bytes_recent = sum(len(i[0]) for i in self.msg_buffer) - if bytes_recent > B_PER_SEC * B_PER_SEC_INTERVAL: - if print_throttle_msg: - log.debug("Throttling triggered, with: "+str( - bytes_recent)+ " bytes in the last "+str( - B_PER_SEC_INTERVAL)+" seconds.") - print_throttle_msg = False - continue - print_throttle_msg = True - try: - throttled_msg = self.irc.throttleQ.get(block=False) - except Queue.Empty: - try: - throttled_msg = self.irc.obQ.get(block=False) - except Queue.Empty: - #this code *should* be unreachable. - continue - try: - self.irc.sock.sendall(throttled_msg+'\r\n') - last_msg_time = time.time() - self.msg_buffer.append((throttled_msg, last_msg_time)) - except: - log.error("failed to send on socket") - try: - self.irc.fd.close() - except: pass - break - self.irc.lockthrottle.wait() - self.irc.lockthrottle.release() - - log.debug("Ended throttling thread.") - -class PingThread(threading.Thread): - - def __init__(self, irc): - threading.Thread.__init__(self, name='PingThread') - self.daemon = True - self.irc = irc + def buildProtocol(self, addr): + p = txIRC_Client(self.wrapper) + p.factory = self + self.wrapper.set_tx_irc_client(p) + return p + + def clientConnectionLost(self, connector, reason): + log.info('IRC connection lost: ' + str(reason)) + if not self.wrapper.give_up: + log.info('Attempting to reconnect...') + reactor.callLater(self.wrapper.reconnect_interval, + connector.connect()) + + def clientConnectionFailed(self, connector, reason): + log.info('IRC connection failed: ' + reason) - def run(self): - log.debug('starting ping thread') - while not self.irc.give_up: - time.sleep(PING_INTERVAL) - try: - self.irc.ping_reply = False - # maybe use this to calculate the lag one day - self.irc.lockcond.acquire() - self.irc.send_raw('PING LAG' + str(int(time.time() * 1000))) - self.irc.lockcond.wait(PING_TIMEOUT) - self.irc.lockcond.release() - if not self.irc.ping_reply: - log.warn('irc ping timed out') - try: - self.irc.close() - except: - pass - try: - self.irc.fd.close() - except: - pass - try: - self.irc.sock.shutdown(socket.SHUT_RDWR) - self.irc.sock.close() - except: - pass - except IOError as e: - log.debug('ping thread: ' + repr(e)) - log.debug('ended ping thread') - - -# handle one channel at a time class IRCMessageChannel(MessageChannel): - # close implies it will attempt to reconnect - def close(self): - try: - self.sock.sendall("QUIT\r\n") - except IOError as e: - log.info('errored while trying to quit: ' + repr(e)) + + def __init__(self, + configdata, + username='username', + realname='realname', + password=None, + daemon=None): + MessageChannel.__init__(self, daemon=daemon) + self.give_up = True + self.serverport = (configdata['host'], configdata['port']) + #default hostid for use with miniircd which doesnt send NETWORK + self.hostid = configdata['host'] + str(configdata['port']) + self.socks5 = configdata["socks5"] + self.usessl = configdata["usessl"] + self.socks5_host = configdata["socks5_host"] + self.socks5_port = int(configdata["socks5_port"]) + self.channel = get_config_irc_channel(configdata["channel"], + configdata["btcnet"]) + self.userrealname = (username, realname) + if password and len(password) == 0: + password = None + self.password = password + + self.tx_irc_client = None + #TODO can be configuration var, how long between reconnect attempts: + self.reconnect_interval = 10 + #implementation of abstract base class methods; + #these are mostly but not exclusively acting as pass through + #to the wrapped twisted IRC client protocol + def run(self): + self.give_up = False + self.build_irc() def shutdown(self): - self.close() + self.tx_irc_client.quit() self.give_up = True - # Maker callbacks - def _announce_orders(self, orderlist): - """This publishes orders to the pit and to - counterparties. Note that it does *not* use chunking. - So, it tries to optimise space usage thusly: - As many complete orderlines are fit onto one line - as possible, and overflow goes onto another line. - Each list entry in orderlist must have format: - !ordername + def _pubmsg(self, msg): + self.tx_irc_client._pubmsg(msg) - Then, what is published is lines of form: - !ordername !ordername .. + def _privmsg(self, nick, cmd, msg): + self.tx_irc_client._privmsg(nick, cmd, msg) - fitting as many list entries as possible onto one line, - up to the limit of the IRC parameters (see MAX_PRIVMSG_LEN). + def change_nick(self, new_nick): + self.tx_irc_client.setNick(new_nick) - Order announce in private is handled by privmsg/_privmsg - using chunking, no longer using this function. + def _announce_orders(self, offerlist): + self.tx_irc_client._announce_orders(offerlist) + #end ABC impl. + + def set_tx_irc_client(self, txircclt): + self.tx_irc_client = txircclt + + def build_irc(self): + """The main starting method that creates a protocol object + according to the config variables, ready for whenever + the reactor starts running. """ - header = 'PRIVMSG ' + self.channel + ' :' - orderlines = [] - for i, order in enumerate(orderlist): - orderlines.append(order) - line = header + ''.join(orderlines) + ' ~' - if len(line) > MAX_PRIVMSG_LEN or i == len(orderlist) - 1: - if i < len(orderlist) - 1: - line = header + ''.join(orderlines[:-1]) + ' ~' - self.send_raw(line) - orderlines = [orderlines[-1]] + wlog('building irc') + if self.tx_irc_client: + raise Exception('irc already built') + if self.usessl.lower() == 'true': + factory = TxIRCFactory(self) + ctx = ClientContextFactory() + reactor.connectSSL(self.serverport[0], self.serverport[1], + factory, ctx) + elif self.socks5.lower() == 'true': + #TODO not yet tested! to say it needs to be is a slight understatement. + factory = TxIRCFactory(self) + torEndpoint = TCP4ClientEndpoint(reactor, self.socks5_host, + self.socks5_port) + ircEndpoint = SOCKS5ClientEndpoint(self.serverport[0], + self.serverport[1], torEndpoint) + if self.usessl: + ctx = ClientContextFactory() + tlsEndpoint = TLSWrapClientEndpoint(ctx, ircEndpoint) + tlsEndpoint.connect(factory) + else: + ircEndpoint.connect(factory) + else: + try: + factory = TxIRCFactory(self) + wlog('build_irc: ', self.serverport[0], self.serverport[1], + self.channel) + self.tcp_connector = reactor.connectTCP( + self.serverport[0], self.serverport[1], factory) + except Exception as e: + wlog('error in buildirc: ' + repr(e)) + +class txIRC_Client(irc.IRCClient, object): + """ + lineRate is a class variable in the superclass used to limit + messages / second. heartbeat is what you'd think + TODO check this handles throttling as necessary, should do. + """ + lineRate = 0.5 + heartbeatinterval = 60 + + def __init__(self, wrapper): + self.wrapper = wrapper + self.channel = self.wrapper.channel + self.nickname = self.wrapper.nick + self.password = self.wrapper.password + self.hostname = self.wrapper.serverport[0] + self.built_privmsg = {} + # todo: build pong timeout watchdot + + def irc_unknown(self, prefix, command, params): + pass + + def irc_PONG(self, *args, **kwargs): + # todo: pong called getattr() style. use for health + pass + + def connectionMade(self): + return irc.IRCClient.connectionMade(self) + + def connectionLost(self, reason=protocol.connectionDone): + wlog('connectionLost:') + if self.wrapper.on_disconnect: + reactor.callLater(0.0, self.wrapper.on_disconnect, self.wrapper) + return irc.IRCClient.connectionLost(self, reason) + + def send(self, send_to, msg): + # todo: use proper twisted IRC support (encoding + sendCommand) + omsg = 'PRIVMSG %s :' % (send_to,) + msg + self.sendLine(omsg.encode('ascii')) def _pubmsg(self, message): - line = "PRIVMSG " + self.channel + " :" + message - assert len(line) <= MAX_PRIVMSG_LEN - ob = False - if any([x in line for x in offername_list]): - ob = True - self.send_raw(line, ob) + self.send(self.channel, message) def _privmsg(self, nick, cmd, message): - """Send a privmsg to an irc counterparty, - using chunking as appropriate for long messages. - """ - ob = True if cmd in offername_list else False header = "PRIVMSG " + nick + " :" max_chunk_len = MAX_PRIVMSG_LEN - len(header) - len(cmd) - 4 # 1 for command prefix 1 for space 2 for trailer @@ -226,253 +207,174 @@ class IRCMessageChannel(MessageChannel): trailer = ' ~' if m == message_chunks[-1] else ' ;' if m == message_chunks[0]: m = COMMAND_PREFIX + cmd + ' ' + m - self.send_raw(header + m + trailer, ob) + self.send(nick, m + trailer) - def change_nick(self, new_nick): - self.nick = new_nick - self.send_raw('NICK ' + self.nick) - - def send_raw(self, line, ob=False): - # Messages are queued and prioritised. - # This is an addressing of github #300 - if line.startswith("PING") or line.startswith("PONG"): - self.pingQ.put(line) - elif ob: - self.obQ.put(line) - else: - self.throttleQ.put(line) - self.lockthrottle.acquire() - self.lockthrottle.notify() - self.lockthrottle.release() - - def __handle_privmsg(self, source, target, message): - nick = get_irc_nick(source) - #ensure return value 'parsed' is length > 2 - if len(message) < 4: - return - if target == self.nick: - if message[0] == '\x01': - endindex = message[1:].find('\x01') - if endindex == -1: - return - ctcp = message[1:endindex + 1] - if ctcp.upper() == 'VERSION': - self.send_raw('PRIVMSG ' + nick + - ' :\x01VERSION xchat 2.8.8 Ubuntu\x01') - return - - if nick not in self.built_privmsg: - self.built_privmsg[nick] = message[:-2] - else: - self.built_privmsg[nick] += message[:-2] - if message[-1] == '~': - parsed = self.built_privmsg[nick] - # wipe the message buffer waiting for the next one - del self.built_privmsg[nick] - log.debug("< - def __init__(self, - configdata, - username='username', - realname='realname', - password=None, - daemon=None): - MessageChannel.__init__(self, daemon=daemon) - self.give_up = True - self.serverport = (configdata['host'], configdata['port']) - #default hostid for use with miniircd which doesnt send NETWORK - self.hostid = configdata['host'] + str(configdata['port']) - self.socks5 = configdata["socks5"] - self.usessl = configdata["usessl"] - self.socks5_host = configdata["socks5_host"] - self.socks5_port = int(configdata["socks5_port"]) - self.channel = get_config_irc_channel(configdata["channel"], - configdata["btcnet"]) - self.userrealname = (username, realname) - if password and len(password) == 0: - password = None - self.given_password = password - self.pingQ = Queue.Queue() - self.throttleQ = Queue.Queue() - self.obQ = Queue.Queue() - self.reconnect_interval = 30 - - def set_reconnect_interval(self, interval): - """For testing reconnection functions. - """ - self.reconnect_interval = interval + Then, what is published is lines of form: + !ordername !ordername .. - def run(self): - self.give_up = False - self.ping_reply = True - self.lockcond = threading.Condition() - self.lockthrottle = threading.Condition() - PingThread(self).start() - ThrottleThread(self).start() + fitting as many list entries as possible onto one line, + up to the limit of the IRC parameters (see MAX_PRIVMSG_LEN). - while not self.give_up: - try: - log.info("connecting to host %s" % - (self.hostid)) - if self.socks5.lower() == 'true': - log.debug("Using socks5 proxy %s:%d" % - (self.socks5_host, self.socks5_port)) - setdefaultproxy(PROXY_TYPE_SOCKS5, - self.socks5_host, self.socks5_port, - True) - self.sock = socksocket() + Order announce in private is handled by privmsg/_privmsg + using chunking, no longer using this function. + """ + header = 'PRIVMSG ' + self.channel + ' :' + offerlines = [] + for i, offer in enumerate(offerlist): + offerlines.append(offer) + line = header + ''.join(offerlines) + ' ~' + if len(line) > MAX_PRIVMSG_LEN or i == len(offerlist) - 1: + if i < len(offerlist) - 1: + line = header + ''.join(offerlines[:-1]) + ' ~' + self.sendLine(line) + offerlines = [offerlines[-1]] + # --------------------------------------------- + # general callbacks from superclass + # --------------------------------------------- + + def signedOn(self): + wlog('signedOn:') + self.join(self.factory.channel) + + def joined(self, channel): + wlog('joined: ', channel) + #Use as trigger for start to mcc: + reactor.callLater(0.0, self.wrapper.on_welcome, self.wrapper) + + def privmsg(self, userIn, channel, msg): + reactor.callLater(0.0, self.handle_privmsg, + userIn, channel, msg) + + def __on_privmsg(self, nick, msg): + self.wrapper.on_privmsg(nick, msg) + + def __on_pubmsg(self, nick, msg): + self.wrapper.on_pubmsg(nick, msg) + + def handle_privmsg(self, sent_from, sent_to, message): + try: + nick = get_irc_nick(sent_from) + # todo: kludge - we need this elsewhere. rearchitect!! + self.from_to = (nick, sent_to) + if sent_to == self.wrapper.nick: + if nick not in self.built_privmsg: + if message[0] != COMMAND_PREFIX: + wlog('bad command ', message[0]) + return + + # new message starting + cmd_string = message[1:].split(' ')[0] + self.built_privmsg[nick] = [cmd_string, message[:-2]] else: - self.sock = socket.socket(socket.AF_INET, - socket.SOCK_STREAM) - self.sock.connect(self.serverport) - if self.usessl.lower() == 'true': - self.sock = ssl.wrap_socket(self.sock) - self.fd = self.sock.makefile() - self.password = None - if self.given_password: - self.password = self.given_password - self.send_raw('CAP REQ :sasl') - self.send_raw('USER %s b c :%s' % self.userrealname) - self.nick = self.given_nick - self.send_raw('NICK ' + self.nick) - while 1: - try: - line = self.fd.readline() - except AttributeError as e: - raise IOError(repr(e)) - if line is None: - log.debug("line returned null from %s" % - (self.hostid)) - break - if len(line) == 0: - log.debug("line was zero length from %s" % - (self.hostid)) - break - self.__handle_line(line) - except IOError as e: - import traceback - log.debug("logging traceback from %s: \n" % - (self.hostid) + traceback.format_exc()) - finally: - try: - self.fd.close() - self.sock.close() - except Exception as e: + self.built_privmsg[nick][1] += message[:-2] + if message[-1] == ';': pass - if self.on_disconnect: - self.on_disconnect(self) - log.info("disconnected from irc host %s" % - (self.hostid)) - if not self.give_up: - time.sleep(self.reconnect_interval) - log.info('ending irc') - self.give_up = True + elif message[-1] == '~': + parsed = self.built_privmsg[nick][1] + # wipe the message buffer waiting for the next one + del self.built_privmsg[nick] + self.__on_privmsg(nick, parsed) + else: + # drop the bad nick + del self.built_privmsg[nick] + elif sent_to == self.channel: + self.__on_pubmsg(nick, message) + else: + wlog('what is this?: ', sent_from, sent_to, message[:80]) + except: + wlog('unable to parse privmsg, msg: ', message) + + def action(self, user, channel, msg): + wlog('unhandled action: ', user, channel, msg) + + def alterCollidedNick(self, nickname): + """ + Generate an altered version of a nickname that caused a collision in an + effort to create an unused related name for subsequent registration. + :param nickname: + """ + newnick = nickname + '_' + wlog('nickname collision, changed to ', newnick) + return newnick + + def modeChanged(self, user, channel, _set, modes, args): + wlog('(unhandled) modeChanged: ', user, channel, _set, modes, args) + + def pong(self, user, secs): + wlog('pong: ', user, secs) + + def userJoined(self, user, channel): + wlog('user joined: ', user, channel) + + def userKicked(self, kickee, channel, kicker, message): + wlog('kicked: ', kickee, channel, kicker, message) + if self.wrapper.on_nick_leave: + reactor.callLater(0.0, self.wrapper.on_nick_leave, kickee, self.wrapper) + + def userLeft(self, user, channel): + wlog('left: ', user, channel) + if self.wrapper.on_nick_leave: + reactor.callLater(0.0, self.wrapper.on_nick_leave, user, self.wrapper) + + def userRenamed(self, oldname, newname): + wlog('rename: ', oldname, newname) + #TODO nick change handling + + def userQuit(self, user, quitMessage): + wlog('userQuit: ', user, quitMessage) + if self.wrapper.on_nick_leave: + reactor.callLater(0.0, self.wrapper.on_nick_leave, user, self.wrapper) + + def topicUpdated(self, user, channel, newTopic): + wlog('topicUpdated: ', user, channel, newTopic) + if self.wrapper.on_set_topic: + reactor.callLater(0.0, self.wrapper.on_set_topic, newTopic) + + def receivedMOTD(self, motd): + wlog('motd: ', motd) + + def created(self, when): + wlog('(unhandled) created: ', when) + + def yourHost(self, info): + wlog('(unhandled) yourhost: ', info) + + def isupport(self, options): + """Used to set the name of the IRC *network* + (as distinct from the individual server), used + for signature replay defence (see signing code in message_channel.py). + If this option ("NETWORK") is not found, we fallback to the default + hostid = servername+port as shown in IRCMessageChannel (should only + happen in testing). + """ + for o in options: + try: + k, v = o.split('=') + if k == 'NETWORK': + self.wrapper.hostid = v + except Exception as e: + wlog('failed to parse isupport option, ignoring') + + def myInfo(self, servername, version, umodes, cmodes): + wlog('(unhandled) myInfo: ', servername, version, umodes, cmodes) + + def luserChannels(self, channels): + wlog('(unhandled) luserChannels: ', channels) + + def bounce(self, info): + wlog('(unhandled) bounce: ', info) + + def left(self, channel): + wlog('(unhandled) left: ', channel) + + def noticed(self, user, channel, message): + wlog('(unhandled) noticed: ', user, channel, message) \ No newline at end of file diff --git a/jmdaemon/jmdaemon/message_channel.py b/jmdaemon/jmdaemon/message_channel.py index 7a76c7b..0184e16 100644 --- a/jmdaemon/jmdaemon/message_channel.py +++ b/jmdaemon/jmdaemon/message_channel.py @@ -167,36 +167,8 @@ class MessageChannelCollection(object): self.nicks_seen[mc] = self.nicks_seen[mc].difference(set([nick])) def run(self, failures=None): - """At the moment this is effectively a - do-nothing main loop. May be suboptimal. - For now it allows us to receive the - shutdown() signal for all message channels - and propagate it. - Additionally, for testing, a parameter 'failures' - may be passed, a tuple (type, message channel index, count) - which will perform a connection shutdown of type type - after iteration count count on message channel - self.mchannels[channel index]. - """ for mc in self.mchannels: - MChannelThread(mc).start() - i = 0 - while True: - time.sleep(1) - i += 1 - if self.give_up: - log.info("Shutting down all connections") - break - #feature only used for testing: - #deliberately shutdown a connection at a certain time. - #TODO may not be sufficiently deterministic. - if failures and i == failures[2]: - if failures[0] == 'break': - self.mchannels[failures[1]].close() - elif failures[0] == 'shutdown': - self.mchannels[failures[1]].shutdown() - else: - raise NotImplementedError("Failure injection type unknown") + mc.run() #UNCONDITIONAL PUBLIC/BROADCAST: use all message #channels for these functions. @@ -696,7 +668,7 @@ class MessageChannel(object): """Send a message to a specific counterparty""" @abc.abstractmethod - def _announce_orders(self, orderlist, nick): + def _announce_orders(self, offerlist, nick): """Send orders defined in list orderlist either to the shared public channel (pit), if nick=None, or to an individual counterparty nick. Note that diff --git a/jmdaemon/jmdaemon/orderbookwatch.py b/jmdaemon/jmdaemon/orderbookwatch.py index 28e2588..d1a51f6 100644 --- a/jmdaemon/jmdaemon/orderbookwatch.py +++ b/jmdaemon/jmdaemon/orderbookwatch.py @@ -13,7 +13,6 @@ from decimal import InvalidOperation, Decimal from jmdaemon.protocol import JM_VERSION from jmbase.support import get_log, joinmarket_alert, DUST_THRESHOLD -from jmdaemon.irc import B_PER_SEC log = get_log() diff --git a/jmdaemon/setup.py b/jmdaemon/setup.py index 1d874ac..3e87569 100644 --- a/jmdaemon/setup.py +++ b/jmdaemon/setup.py @@ -9,5 +9,5 @@ setup(name='joinmarketdaemon', author_email='ekaggata@gmail.com', license='GPL', packages=['jmdaemon'], - install_requires=['libnacl', 'joinmarketbase'], + install_requires=['txsocksx', 'pyopenssl', 'libnacl', 'joinmarketbase'], zip_safe=False) diff --git a/jmdaemon/test/test_daemon_protocol.py b/jmdaemon/test/test_daemon_protocol.py index 0a59b4e..e3a6537 100644 --- a/jmdaemon/test/test_daemon_protocol.py +++ b/jmdaemon/test/test_daemon_protocol.py @@ -27,10 +27,17 @@ import json import time import base64 from dummy_mc import DummyMessageChannel +from test_message_channel import make_valid_nick test_completed = False end_early = False jlog = get_log() +class DummyMC(DummyMessageChannel): + #override run() for twisted compatibility + def run(self): + if self.on_welcome: + reactor.callLater(1, self.on_welcome, self) + class JMProtocolError(Exception): pass @@ -215,7 +222,7 @@ class JMDaemonTestServerProtocol(JMDaemonServerProtocol): maker_timeout_sec): self.maker_timeout_sec = int(maker_timeout_sec) self.minmakers = int(minmakers) - mcs = [DummyMessageChannel(None)] + mcs = [DummyMC(None)] self.mcc = MessageChannelCollection(mcs) #The following is a hack to get the counterparties marked seen/active; #note it must happen before callign set_msgchan for OrderbookWatch @@ -274,6 +281,16 @@ class JMDaemonTestServerProtocolFactory(ServerFactory): return JMDaemonTestServerProtocol(self) +class JMDaemonTest2ServerProtocol(JMDaemonServerProtocol): + #override here to avoid actually instantiating IRCMessageChannels + def init_connections(self, nick): + self.mc_shutdown() + +class JMDaemonTest2ServerProtocolFactory(ServerFactory): + protocol = JMDaemonTest2ServerProtocol + def buildProtocol(self, addr): + return JMDaemonTest2ServerProtocol(self) + class TrialTestJMDaemonProto(unittest.TestCase): def setUp(self): @@ -284,10 +301,8 @@ class TrialTestJMDaemonProto(unittest.TestCase): clientconn = reactor.connectTCP("localhost", 27184, JMTestClientProtocolFactory()) self.addCleanup(clientconn.disconnect) - print("Got here") def test_waiter(self): - print("test_main()") return task.deferLater(reactor, 12, self._called_by_deffered) def _called_by_deffered(self): @@ -299,18 +314,15 @@ class TestJMDaemonProtoInit(unittest.TestCase): def setUp(self): global end_early end_early = True - print("setUp()") load_program_config() jm_single().maker_timeout_sec = 1 - self.port = reactor.listenTCP(27184, JMDaemonServerProtocolFactory()) + self.port = reactor.listenTCP(27184, JMDaemonTest2ServerProtocolFactory()) self.addCleanup(self.port.stopListening) clientconn = reactor.connectTCP("localhost", 27184, JMTestClientProtocolFactory()) self.addCleanup(clientconn.disconnect) - print("Got here") def test_waiter(self): - print("test_main()") return task.deferLater(reactor, 5, self._called_by_deffered) def _called_by_deffered(self): diff --git a/jmdaemon/test/test_irc_messaging.py b/jmdaemon/test/test_irc_messaging.py index 6c498a8..cf07a28 100644 --- a/jmdaemon/test/test_irc_messaging.py +++ b/jmdaemon/test/test_irc_messaging.py @@ -9,18 +9,18 @@ import pytest import time import threading import hashlib +from twisted.trial import unittest +from twisted.internet import reactor, task, defer import jmbitcoin as btc from jmdaemon import (JOINMARKET_NICK_HEADER, NICK_HASH_LENGTH, - NICK_MAX_ENCODED, IRCMessageChannel) + NICK_MAX_ENCODED, IRCMessageChannel, + MessageChannelCollection) from jmdaemon.message_channel import CJPeerError import jmdaemon #needed for test framework from jmclient import (load_program_config, get_irc_mchannels, jm_single) -python_cmd = "python2" -yg_cmd = "yield-generator-basic.py" -yg_name = "ygtest" -si = 3 +si = 1 class DummyDaemon(object): def request_signature_verify(self, a, b, c, d, e, f, g, h): @@ -29,28 +29,19 @@ class DummyDaemon(object): class DummyMC(IRCMessageChannel): def __init__(self, configdata, nick, daemon): super(DummyMC, self).__init__(configdata, daemon=daemon) - """ - #hacked in here to allow auth without mc-collection - nick_priv = hashlib.sha256(os.urandom(16)).hexdigest() + '01' - nick_pubkey = btc.privtopub(nick_priv) - nick_pkh_raw = hashlib.sha256(nick_pubkey).digest()[ - :NICK_HASH_LENGTH] - nick_pkh = btc.changebase(nick_pkh_raw, 256, 58) - #right pad to maximum possible; b58 is not fixed length. - #Use 'O' as one of the 4 not included chars in base58. - nick_pkh += 'O' * (NICK_MAX_ENCODED - len(nick_pkh)) - #The constructed length will be 1 + 1 + NICK_MAX_ENCODED - nick = JOINMARKET_NICK_HEADER + str( - jm_single().JM_VERSION) + nick_pkh - jm_single().nickname = nick - """ self.daemon = daemon self.set_nick(nick) def on_connect(x): print('simulated on-connect') -def on_welcome(x): +def on_welcome(mc): print('simulated on-welcome') + if mc.nick == "irc_publisher": + d = task.deferLater(reactor, 3.0, junk_pubmsgs, mc) + d.addCallback(junk_longmsgs) + d.addCallback(junk_announce) + d.addCallback(junk_fill) + def on_disconnect(x): print('simulated on-disconnect') @@ -62,40 +53,9 @@ def on_order_seen(dummy, counterparty, oid, ordertype, minsize, def on_pubkey(pubkey): print "received pubkey: " + pubkey -class RawIRCThread(threading.Thread): - - def __init__(self, ircmsgchan): - threading.Thread.__init__(self, name='RawIRCThread') - self.daemon = True - self.ircmsgchan = ircmsgchan - - def run(self): - self.ircmsgchan.run() - -def test_junk_messages(setup_messaging): - #start a yg bot just to receive messages - """ - wallets = make_wallets(1, - wallet_structures=[[1,0,0,0,0]], - mean_amt=1) - wallet = wallets[0]['wallet'] - ygp = local_command([python_cmd, yg_cmd,\ - str(wallets[0]['seed'])], bg=True) - """ - #time.sleep(90) +def junk_pubmsgs(mc): #start a raw IRCMessageChannel instance in a thread; #then call send_* on it with various errant messages - dm = DummyDaemon() - mc = DummyMC(get_irc_mchannels()[0], "irc_ping_test", dm) - mc.register_orderbookwatch_callbacks(on_order_seen=on_order_seen) - mc.register_taker_callbacks(on_pubkey=on_pubkey) - mc.on_connect = on_connect - mc.on_disconnect = on_disconnect - mc.on_welcome = on_welcome - RawIRCThread(mc).start() - #start up a fake counterparty - mc2 = DummyMC(get_irc_mchannels()[0], yg_name, dm) - RawIRCThread(mc2).start() time.sleep(si) mc.request_orderbook() time.sleep(si) @@ -104,50 +64,84 @@ def test_junk_messages(setup_messaging): time.sleep(si) #should be ignored; can we check? mc.pubmsg("!orderbook!orderbook") - time.sleep(si) + return mc + +def junk_longmsgs(mc): #assuming MAX_PRIVMSG_LEN is not something crazy #big like 550, this should fail - with pytest.raises(AssertionError) as e_info: - mc.pubmsg("junk and crap"*40) + #with pytest.raises(AssertionError) as e_info: + mc.pubmsg("junk and crap"*40) time.sleep(si) #assuming MAX_PRIVMSG_LEN is not something crazy #small like 180, this should succeed mc.pubmsg("junk and crap"*15) time.sleep(si) + return mc + +def junk_announce(mc): #try a long order announcement in public #because we don't want to build a real orderbook, #call the underlying IRC announce function. #TODO: how to test that the sent format was correct? + print('got here') mc._announce_orders(["!abc def gh 0001"]*30) time.sleep(si) + return mc + +def junk_fill(mc): + cpname = "irc_receiver" #send a fill with an invalid pubkey to the existing yg; #this should trigger a NaclError but should NOT kill it. - mc._privmsg(yg_name, "fill", "0 10000000 abcdef") - #Test that null privmsg does not cause crash; TODO check maker log? - mc.send_raw("PRIVMSG " + yg_name + " :") - time.sleep(si) + mc._privmsg(cpname, "fill", "0 10000000 abcdef") #Try with ob flag mc._pubmsg("!reloffer stuff") time.sleep(si) #Trigger throttling with large messages - mc._privmsg(yg_name, "tx", "aa"*5000) + mc._privmsg(cpname, "tx", "aa"*5000) time.sleep(si) #with pytest.raises(CJPeerError) as e_info: - mc.send_error(yg_name, "fly you fools!") + mc.send_error(cpname, "fly you fools!") time.sleep(si) - #Test the effect of shutting down the connection - mc.set_reconnect_interval(si-1) - mc.close() - mc._announce_orders(["!abc def gh 0001"]*30) - time.sleep(si+2) - #kill the connection at socket level - mc.shutdown() - -@pytest.fixture(scope="module") -def setup_messaging(): - #Trigger PING LAG sending artificially - jmdaemon.irc.PING_INTERVAL = 3 - load_program_config() + return mc + +def getmc(nick): + dm = DummyDaemon() + mc = DummyMC(get_irc_mchannels()[0], nick, dm) + mc.register_orderbookwatch_callbacks(on_order_seen=on_order_seen) + mc.register_taker_callbacks(on_pubkey=on_pubkey) + mc.on_connect = on_connect + mc.on_disconnect = on_disconnect + mc.on_welcome = on_welcome + mcc = MessageChannelCollection([mc]) + return dm, mc, mcc + +class TrialIRC(unittest.TestCase): + + def setUp(self): + load_program_config() + print(get_irc_mchannels()[0]) + jm_single().maker_timeout_sec = 1 + dm, mc, mcc = getmc("irc_publisher") + dm2, mc2, mcc2 = getmc("irc_receiver") + mcc.run() + mcc2.run() + def cb(m): + #don't try to reconnect + m.give_up = True + m.tcp_connector.disconnect() + self.addCleanup(cb, mc) + self.addCleanup(cb, mc2) + #test_junk_messages() + print("Got here") + + def test_waiter(self): + print("test_main()") + #reactor.callLater(1.0, junk_messages, self.mcc) + return task.deferLater(reactor, 32, self._called_by_deffered) + + def _called_by_deffered(self): + pass + diff --git a/jmdaemon/test/test_message_channel.py b/jmdaemon/test/test_message_channel.py index b86333a..b76462d 100644 --- a/jmdaemon/test/test_message_channel.py +++ b/jmdaemon/test/test_message_channel.py @@ -22,7 +22,7 @@ import traceback import threading import jmbitcoin as bitcoin from dummy_mc import DummyMessageChannel - +from twisted.internet import reactor jlog = get_log() def make_valid_nick(i=0): @@ -350,25 +350,10 @@ def test_mc_run(failuretype, mcindex, wait): ob.set_msgchan(mcc) dummydaemon = DaemonForSigns(mcc) mcc.set_daemon(dummydaemon) - #to externally trigger give up condition, start mcc itself in a thread - MChannelThread(mcc).start() - time.sleep(0.2) - mcc.give_up = True - time.sleep(1.2) - #wipe state, this time use failure injections - mcc = MessageChannelCollection(dmcs) - #to test exception raise on bad failure inject, don't use thread: - if failuretype == "bad": - with pytest.raises(NotImplementedError) as e_info: - mcc.run(failures=[failuretype, mcindex, wait]) - else: - #need to override thread run() - class FIThread(MChannelThread): - def run(self): - self.mc.run(failures=self.failures) - fi = FIThread(mcc) - fi.failures = [failuretype, mcindex, wait] - fi.start() - time.sleep(wait+0.5) - - \ No newline at end of file + #need to override thread run() + class FIThread(MChannelThread): + def run(self): + self.mc.run() + fi = FIThread(mcc) + fi.start() + time.sleep(wait+0.5)