Browse Source

rewrite IRCMC to use twisted

master
Adam Gibson 9 years ago
parent
commit
77c1c2ea0e
No known key found for this signature in database
GPG Key ID: B3AE09F1E9A3197A
  1. 1
      .coveragerc
  2. 2
      jmdaemon/jmdaemon/__init__.py
  3. 13
      jmdaemon/jmdaemon/daemon_protocol.py
  4. 722
      jmdaemon/jmdaemon/irc.py
  5. 32
      jmdaemon/jmdaemon/message_channel.py
  6. 1
      jmdaemon/jmdaemon/orderbookwatch.py
  7. 2
      jmdaemon/setup.py
  8. 26
      jmdaemon/test/test_daemon_protocol.py
  9. 140
      jmdaemon/test/test_irc_messaging.py
  10. 19
      jmdaemon/test/test_message_channel.py

1
.coveragerc

@ -13,4 +13,3 @@ omit =
jmdaemon/test/* jmdaemon/test/*
jmdaemon/setup.py jmdaemon/setup.py
jmdaemon/jmdaemon/socks.py jmdaemon/jmdaemon/socks.py
jmdaemon/jmdaemon/irc.py

2
jmdaemon/jmdaemon/__init__.py

@ -4,7 +4,7 @@ import logging
from protocol import * from protocol import *
from .enc_wrapper import as_init_encryption, decode_decrypt, \ from .enc_wrapper import as_init_encryption, decode_decrypt, \
encrypt_encode, init_keypair, init_pubkey, get_pubkey, NaclError encrypt_encode, init_keypair, init_pubkey, get_pubkey, NaclError
from .irc import IRCMessageChannel, B_PER_SEC from .irc import IRCMessageChannel
from jmbase.support import get_log from jmbase.support import get_log
from .message_channel import MessageChannel, MessageChannelCollection from .message_channel import MessageChannel, MessageChannelCollection
from .orderbookwatch import OrderbookWatch from .orderbookwatch import OrderbookWatch

13
jmdaemon/jmdaemon/daemon_protocol.py

@ -34,17 +34,6 @@ The client-daemon two-way communication is documented in jmbase.commands.py
""" """
class MCThread(threading.Thread):
def __init__(self, mc):
threading.Thread.__init__(self, name='MCThread')
self.mc = mc
self.daemon = True
def run(self):
self.mc.run()
class JMProtocolError(Exception): class JMProtocolError(Exception):
pass pass
@ -132,7 +121,7 @@ class JMDaemonServerProtocol(amp.AMP, OrderbookWatch):
""" """
self.jm_state = 0 #uninited self.jm_state = 0 #uninited
if self.restart_mc_required: if self.restart_mc_required:
MCThread(self.mcc).start() self.mcc.run()
self.restart_mc_required = False self.restart_mc_required = False
else: else:
#if we are not restarting the MC, #if we are not restarting the MC,

722
jmdaemon/jmdaemon/irc.py

@ -4,176 +4,212 @@ import base64
import random import random
import socket import socket
import ssl import ssl
#TODO: SSL support (can it be done without back-end openssl?)
import threading import threading
import time import time
import Queue from twisted.internet import reactor, protocol
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.internet.ssl import ClientContextFactory
from twisted.logger import Logger
from twisted.words.protocols import irc
from jmdaemon.message_channel import MessageChannel from jmdaemon.message_channel import MessageChannel
from jmbase.support import get_log, chunks from jmbase.support import get_log, chunks
from jmdaemon.socks import socksocket, setdefaultproxy, PROXY_TYPE_SOCKS5 from txsocksx.client import SOCKS5ClientEndpoint
from txsocksx.tls import TLSWrapClientEndpoint
from jmdaemon.protocol import * from jmdaemon.protocol import *
MAX_PRIVMSG_LEN = 450 MAX_PRIVMSG_LEN = 450
PING_INTERVAL = 300
PING_TIMEOUT = 60
#Throttling parameters; data from
#tests by @chris-belcher:
##worked (bytes per sec/bytes per sec interval / counterparties / max_privmsg_len)
#300/4 / 6 / 400
#600/4 / 6 / 400
#450/4 / 10 / 400
#450/4 / 10 / 450
#525/4 / 10 / 450
##didnt work
#600/4 / 10 / 450
#600/4 / 10 / 400
#2000/2 / 10 / 400
#450/4 / 10 / 475
MSG_INTERVAL = 0.001
B_PER_SEC = 450
B_PER_SEC_INTERVAL = 4.0
def get_config_irc_channel(chan_name, btcnet):
channel = "#" + chan_name
if btcnet == "testnet":
channel += "-test"
return channel
log = get_log() log = get_log()
def wlog(*x):
"""Simplifier to add lists to the debug log
"""
msg = " ".join([str(a) for a in x])
log.debug(msg)
def get_irc_text(line): def get_irc_text(line):
return line[line[1:].find(':') + 2:] return line[line[1:].find(':') + 2:]
def get_irc_nick(source): def get_irc_nick(source):
full_nick = source[1:source.find('!')] full_nick = source[0:source.find('!')]
return full_nick[:NICK_MAX_ENCODED+2] return full_nick[:NICK_MAX_ENCODED+2]
class ThrottleThread(threading.Thread): def get_config_irc_channel(chan_name, btcnet):
channel = "#" + chan_name
if btcnet == "testnet":
channel += "-test"
return channel
def __init__(self, irc): class TxIRCFactory(protocol.ClientFactory):
threading.Thread.__init__(self, name='ThrottleThread') def __init__(self, wrapper):
self.daemon = True self.wrapper = wrapper
self.irc = irc self.channel = self.wrapper.channel
self.msg_buffer = []
def run(self): def buildProtocol(self, addr):
log.debug("starting throttle thread") p = txIRC_Client(self.wrapper)
last_msg_time = 0 p.factory = self
print_throttle_msg = True self.wrapper.set_tx_irc_client(p)
while not self.irc.give_up: return p
self.irc.lockthrottle.acquire()
while not (self.irc.throttleQ.empty() and self.irc.obQ.empty()
and self.irc.pingQ.empty()):
time.sleep(0.0001) #need to avoid cpu spinning if throttled
try:
pingmsg = self.irc.pingQ.get(block=False)
#ping messages are not counted to throttling totals,
#so send immediately
self.irc.sock.sendall(pingmsg + '\r\n')
continue
except Queue.Empty:
pass
except:
log.warn("failed to send ping message on socket")
break
#First throttling mechanism: no more than 1 line
#per MSG_INTERVAL seconds.
x = time.time() - last_msg_time
if x < MSG_INTERVAL:
continue
#Second throttling mechanism: limited kB/s rate
#over the most recent period.
q = time.time() - B_PER_SEC_INTERVAL
#clean out old messages
self.msg_buffer = [_ for _ in self.msg_buffer if _[1] > q]
bytes_recent = sum(len(i[0]) for i in self.msg_buffer)
if bytes_recent > B_PER_SEC * B_PER_SEC_INTERVAL:
if print_throttle_msg:
log.debug("Throttling triggered, with: "+str(
bytes_recent)+ " bytes in the last "+str(
B_PER_SEC_INTERVAL)+" seconds.")
print_throttle_msg = False
continue
print_throttle_msg = True
try:
throttled_msg = self.irc.throttleQ.get(block=False)
except Queue.Empty:
try:
throttled_msg = self.irc.obQ.get(block=False)
except Queue.Empty:
#this code *should* be unreachable.
continue
try:
self.irc.sock.sendall(throttled_msg+'\r\n')
last_msg_time = time.time()
self.msg_buffer.append((throttled_msg, last_msg_time))
except:
log.error("failed to send on socket")
try:
self.irc.fd.close()
except: pass
break
self.irc.lockthrottle.wait()
self.irc.lockthrottle.release()
log.debug("Ended throttling thread.") def clientConnectionLost(self, connector, reason):
log.info('IRC connection lost: ' + str(reason))
if not self.wrapper.give_up:
log.info('Attempting to reconnect...')
reactor.callLater(self.wrapper.reconnect_interval,
connector.connect())
class PingThread(threading.Thread): def clientConnectionFailed(self, connector, reason):
log.info('IRC connection failed: ' + reason)
def __init__(self, irc): class IRCMessageChannel(MessageChannel):
threading.Thread.__init__(self, name='PingThread')
self.daemon = True
self.irc = irc
def __init__(self,
configdata,
username='username',
realname='realname',
password=None,
daemon=None):
MessageChannel.__init__(self, daemon=daemon)
self.give_up = True
self.serverport = (configdata['host'], configdata['port'])
#default hostid for use with miniircd which doesnt send NETWORK
self.hostid = configdata['host'] + str(configdata['port'])
self.socks5 = configdata["socks5"]
self.usessl = configdata["usessl"]
self.socks5_host = configdata["socks5_host"]
self.socks5_port = int(configdata["socks5_port"])
self.channel = get_config_irc_channel(configdata["channel"],
configdata["btcnet"])
self.userrealname = (username, realname)
if password and len(password) == 0:
password = None
self.password = password
self.tx_irc_client = None
#TODO can be configuration var, how long between reconnect attempts:
self.reconnect_interval = 10
#implementation of abstract base class methods;
#these are mostly but not exclusively acting as pass through
#to the wrapped twisted IRC client protocol
def run(self): def run(self):
log.debug('starting ping thread') self.give_up = False
while not self.irc.give_up: self.build_irc()
time.sleep(PING_INTERVAL)
try: def shutdown(self):
self.irc.ping_reply = False self.tx_irc_client.quit()
# maybe use this to calculate the lag one day self.give_up = True
self.irc.lockcond.acquire()
self.irc.send_raw('PING LAG' + str(int(time.time() * 1000))) def _pubmsg(self, msg):
self.irc.lockcond.wait(PING_TIMEOUT) self.tx_irc_client._pubmsg(msg)
self.irc.lockcond.release()
if not self.irc.ping_reply: def _privmsg(self, nick, cmd, msg):
log.warn('irc ping timed out') self.tx_irc_client._privmsg(nick, cmd, msg)
try:
self.irc.close() def change_nick(self, new_nick):
except: self.tx_irc_client.setNick(new_nick)
pass
def _announce_orders(self, offerlist):
self.tx_irc_client._announce_orders(offerlist)
#end ABC impl.
def set_tx_irc_client(self, txircclt):
self.tx_irc_client = txircclt
def build_irc(self):
"""The main starting method that creates a protocol object
according to the config variables, ready for whenever
the reactor starts running.
"""
wlog('building irc')
if self.tx_irc_client:
raise Exception('irc already built')
if self.usessl.lower() == 'true':
factory = TxIRCFactory(self)
ctx = ClientContextFactory()
reactor.connectSSL(self.serverport[0], self.serverport[1],
factory, ctx)
elif self.socks5.lower() == 'true':
#TODO not yet tested! to say it needs to be is a slight understatement.
factory = TxIRCFactory(self)
torEndpoint = TCP4ClientEndpoint(reactor, self.socks5_host,
self.socks5_port)
ircEndpoint = SOCKS5ClientEndpoint(self.serverport[0],
self.serverport[1], torEndpoint)
if self.usessl:
ctx = ClientContextFactory()
tlsEndpoint = TLSWrapClientEndpoint(ctx, ircEndpoint)
tlsEndpoint.connect(factory)
else:
ircEndpoint.connect(factory)
else:
try: try:
self.irc.fd.close() factory = TxIRCFactory(self)
except: wlog('build_irc: ', self.serverport[0], self.serverport[1],
self.channel)
self.tcp_connector = reactor.connectTCP(
self.serverport[0], self.serverport[1], factory)
except Exception as e:
wlog('error in buildirc: ' + repr(e))
class txIRC_Client(irc.IRCClient, object):
"""
lineRate is a class variable in the superclass used to limit
messages / second. heartbeat is what you'd think
TODO check this handles throttling as necessary, should do.
"""
lineRate = 0.5
heartbeatinterval = 60
def __init__(self, wrapper):
self.wrapper = wrapper
self.channel = self.wrapper.channel
self.nickname = self.wrapper.nick
self.password = self.wrapper.password
self.hostname = self.wrapper.serverport[0]
self.built_privmsg = {}
# todo: build pong timeout watchdot
def irc_unknown(self, prefix, command, params):
pass pass
try:
self.irc.sock.shutdown(socket.SHUT_RDWR) def irc_PONG(self, *args, **kwargs):
self.irc.sock.close() # todo: pong called getattr() style. use for health
except:
pass pass
except IOError as e:
log.debug('ping thread: ' + repr(e))
log.debug('ended ping thread')
def connectionMade(self):
return irc.IRCClient.connectionMade(self)
# handle one channel at a time def connectionLost(self, reason=protocol.connectionDone):
class IRCMessageChannel(MessageChannel): wlog('connectionLost:')
# close implies it will attempt to reconnect if self.wrapper.on_disconnect:
def close(self): reactor.callLater(0.0, self.wrapper.on_disconnect, self.wrapper)
try: return irc.IRCClient.connectionLost(self, reason)
self.sock.sendall("QUIT\r\n")
except IOError as e:
log.info('errored while trying to quit: ' + repr(e))
def shutdown(self): def send(self, send_to, msg):
self.close() # todo: use proper twisted IRC support (encoding + sendCommand)
self.give_up = True omsg = 'PRIVMSG %s :' % (send_to,) + msg
self.sendLine(omsg.encode('ascii'))
def _pubmsg(self, message):
self.send(self.channel, message)
def _privmsg(self, nick, cmd, message):
header = "PRIVMSG " + nick + " :"
max_chunk_len = MAX_PRIVMSG_LEN - len(header) - len(cmd) - 4
# 1 for command prefix 1 for space 2 for trailer
if len(message) > max_chunk_len:
message_chunks = chunks(message, max_chunk_len)
else:
message_chunks = [message]
for m in message_chunks:
trailer = ' ~' if m == message_chunks[-1] else ' ;'
if m == message_chunks[0]:
m = COMMAND_PREFIX + cmd + ' ' + m
self.send(nick, m + trailer)
# Maker callbacks def _announce_orders(self, offerlist):
def _announce_orders(self, orderlist):
"""This publishes orders to the pit and to """This publishes orders to the pit and to
counterparties. Note that it does *not* use chunking. counterparties. Note that it does *not* use chunking.
So, it tries to optimise space usage thusly: So, it tries to optimise space usage thusly:
@ -192,287 +228,153 @@ class IRCMessageChannel(MessageChannel):
using chunking, no longer using this function. using chunking, no longer using this function.
""" """
header = 'PRIVMSG ' + self.channel + ' :' header = 'PRIVMSG ' + self.channel + ' :'
orderlines = [] offerlines = []
for i, order in enumerate(orderlist): for i, offer in enumerate(offerlist):
orderlines.append(order) offerlines.append(offer)
line = header + ''.join(orderlines) + ' ~' line = header + ''.join(offerlines) + ' ~'
if len(line) > MAX_PRIVMSG_LEN or i == len(orderlist) - 1: if len(line) > MAX_PRIVMSG_LEN or i == len(offerlist) - 1:
if i < len(orderlist) - 1: if i < len(offerlist) - 1:
line = header + ''.join(orderlines[:-1]) + ' ~' line = header + ''.join(offerlines[:-1]) + ' ~'
self.send_raw(line) self.sendLine(line)
orderlines = [orderlines[-1]] offerlines = [offerlines[-1]]
# ---------------------------------------------
def _pubmsg(self, message): # general callbacks from superclass
line = "PRIVMSG " + self.channel + " :" + message # ---------------------------------------------
assert len(line) <= MAX_PRIVMSG_LEN
ob = False def signedOn(self):
if any([x in line for x in offername_list]): wlog('signedOn:')
ob = True self.join(self.factory.channel)
self.send_raw(line, ob)
def joined(self, channel):
def _privmsg(self, nick, cmd, message): wlog('joined: ', channel)
"""Send a privmsg to an irc counterparty, #Use as trigger for start to mcc:
using chunking as appropriate for long messages. reactor.callLater(0.0, self.wrapper.on_welcome, self.wrapper)
"""
ob = True if cmd in offername_list else False def privmsg(self, userIn, channel, msg):
header = "PRIVMSG " + nick + " :" reactor.callLater(0.0, self.handle_privmsg,
max_chunk_len = MAX_PRIVMSG_LEN - len(header) - len(cmd) - 4 userIn, channel, msg)
# 1 for command prefix 1 for space 2 for trailer
if len(message) > max_chunk_len: def __on_privmsg(self, nick, msg):
message_chunks = chunks(message, max_chunk_len) self.wrapper.on_privmsg(nick, msg)
else:
message_chunks = [message] def __on_pubmsg(self, nick, msg):
for m in message_chunks: self.wrapper.on_pubmsg(nick, msg)
trailer = ' ~' if m == message_chunks[-1] else ' ;'
if m == message_chunks[0]: def handle_privmsg(self, sent_from, sent_to, message):
m = COMMAND_PREFIX + cmd + ' ' + m try:
self.send_raw(header + m + trailer, ob) nick = get_irc_nick(sent_from)
# todo: kludge - we need this elsewhere. rearchitect!!
def change_nick(self, new_nick): self.from_to = (nick, sent_to)
self.nick = new_nick if sent_to == self.wrapper.nick:
self.send_raw('NICK ' + self.nick) if nick not in self.built_privmsg:
if message[0] != COMMAND_PREFIX:
def send_raw(self, line, ob=False): wlog('bad command ', message[0])
# Messages are queued and prioritised.
# This is an addressing of github #300
if line.startswith("PING") or line.startswith("PONG"):
self.pingQ.put(line)
elif ob:
self.obQ.put(line)
else:
self.throttleQ.put(line)
self.lockthrottle.acquire()
self.lockthrottle.notify()
self.lockthrottle.release()
def __handle_privmsg(self, source, target, message):
nick = get_irc_nick(source)
#ensure return value 'parsed' is length > 2
if len(message) < 4:
return
if target == self.nick:
if message[0] == '\x01':
endindex = message[1:].find('\x01')
if endindex == -1:
return
ctcp = message[1:endindex + 1]
if ctcp.upper() == 'VERSION':
self.send_raw('PRIVMSG ' + nick +
' :\x01VERSION xchat 2.8.8 Ubuntu\x01')
return return
if nick not in self.built_privmsg: # new message starting
self.built_privmsg[nick] = message[:-2] cmd_string = message[1:].split(' ')[0]
self.built_privmsg[nick] = [cmd_string, message[:-2]]
else: else:
self.built_privmsg[nick] += message[:-2] self.built_privmsg[nick][1] += message[:-2]
if message[-1] == '~': if message[-1] == ';':
parsed = self.built_privmsg[nick] pass
elif message[-1] == '~':
parsed = self.built_privmsg[nick][1]
# wipe the message buffer waiting for the next one # wipe the message buffer waiting for the next one
del self.built_privmsg[nick] del self.built_privmsg[nick]
log.debug("<<privmsg on %s: " % self.__on_privmsg(nick, parsed)
(self.hostid) + "nick=%s message=%s" % (nick, parsed)) else:
self.on_privmsg(nick, parsed)
elif message[-1] != ';':
# drop the bad nick # drop the bad nick
del self.built_privmsg[nick] del self.built_privmsg[nick]
elif target == self.channel: elif sent_to == self.channel:
log.info("<<pubmsg on %s: " % self.__on_pubmsg(nick, message)
(self.hostid) + "nick=%s message=%s" %
(nick, message))
self.on_pubmsg(nick, message)
else: else:
log.debug("what is this? privmsg on %s: " % wlog('what is this?: ', sent_from, sent_to, message[:80])
(self.hostid) + "src=%s target=%s message=%s;" % except:
(source, target, message)) wlog('unable to parse privmsg, msg: ', message)
def __handle_line(self, line):
line = line.rstrip()
# log.debug('<< ' + line)
if line.startswith('PING '):
self.send_raw(line.replace('PING', 'PONG'))
return
_chunks = line.split(' ')
if _chunks[1] == 'QUIT':
nick = get_irc_nick(_chunks[0])
if nick == self.nick:
raise IOError('we quit')
else:
if self.on_nick_leave:
self.on_nick_leave(nick, self)
elif _chunks[1] == '433': # nick in use
# helps keep identity constant if just _ added
#request new nick on *all* channels via callback
if self.on_nick_change:
self.on_nick_change(self.nick + '_')
if self.password:
if _chunks[1] == 'CAP':
if _chunks[3] != 'ACK':
log.warn("server %s " %
(self.hostid) + "does not support SASL, quitting")
self.shutdown()
self.send_raw('AUTHENTICATE PLAIN')
elif _chunks[0] == 'AUTHENTICATE':
self.send_raw('AUTHENTICATE ' + base64.b64encode(
self.nick + '\x00' + self.nick + '\x00' + self.password))
elif _chunks[1] == '903':
log.info("Successfully authenticated on %s" %
(self.hostid))
self.password = None
self.send_raw('CAP END')
elif _chunks[1] == '904':
log.warn("Failed authentication %s " %
(self.hostid) + ", wrong password")
self.shutdown()
return
if _chunks[1] == 'PRIVMSG': def action(self, user, channel, msg):
self.__handle_privmsg(_chunks[0], _chunks[2], get_irc_text(line)) wlog('unhandled action: ', user, channel, msg)
if _chunks[1] == 'PONG':
self.ping_reply = True
self.lockcond.acquire()
self.lockcond.notify()
self.lockcond.release()
elif _chunks[1] == '376': # end of motd
self.built_privmsg = {}
if self.on_connect:
self.on_connect(self)
if self.hostid == 'agora-irc':
self.send_raw('PART #AGORA')
self.send_raw('JOIN ' + self.channel)
self.send_raw(
'MODE ' + self.nick + ' +B') # marks as bots on unreal
self.send_raw(
'MODE ' + self.nick + ' -R') # allows unreg'd private messages
elif _chunks[1] == '366': # end of names list
log.info("Connected to IRC and joined channel on %s " %
(self.hostid))
if self.on_welcome:
self.on_welcome(self) #informs mc-collection that we are ready for use
elif _chunks[1] == '332' or _chunks[1] == 'TOPIC': # channel topic
topic = get_irc_text(line)
self.on_set_topic(topic)
elif _chunks[1] == 'KICK':
target = _chunks[3]
if target == self.nick:
self.give_up = True
fmt = '{} has kicked us from the irc channel! Reason= {}'.format
raise IOError(fmt(get_irc_nick(_chunks[0]), get_irc_text(line)))
else:
if self.on_nick_leave:
self.on_nick_leave(target, self)
elif _chunks[1] == 'PART':
nick = get_irc_nick(_chunks[0])
if self.on_nick_leave:
self.on_nick_leave(nick, self)
elif _chunks[1] == '005':
'''
:port80b.se.quakenet.org 005 J5BzJGGfyw5GaPc MAXNICKLEN=15
TOPICLEN=250 AWAYLEN=160 KICKLEN=250 CHANNELLEN=200
MAXCHANNELLEN=200 CHANTYPES=#& PREFIX=(ov)@+ STATUSMSG=@+
CHANMODES=b,k,l,imnpstrDducCNMT CASEMAPPING=rfc1459
NETWORK=QuakeNet :are supported by this server
'''
for chu in _chunks[3:]:
if chu[0] == ':':
break
if chu.lower().startswith('network='):
self.hostid = chu[8:]
log.debug('found network name: ' + self.hostid + ';')
def __init__(self, def alterCollidedNick(self, nickname):
configdata,
username='username',
realname='realname',
password=None,
daemon=None):
MessageChannel.__init__(self, daemon=daemon)
self.give_up = True
self.serverport = (configdata['host'], configdata['port'])
#default hostid for use with miniircd which doesnt send NETWORK
self.hostid = configdata['host'] + str(configdata['port'])
self.socks5 = configdata["socks5"]
self.usessl = configdata["usessl"]
self.socks5_host = configdata["socks5_host"]
self.socks5_port = int(configdata["socks5_port"])
self.channel = get_config_irc_channel(configdata["channel"],
configdata["btcnet"])
self.userrealname = (username, realname)
if password and len(password) == 0:
password = None
self.given_password = password
self.pingQ = Queue.Queue()
self.throttleQ = Queue.Queue()
self.obQ = Queue.Queue()
self.reconnect_interval = 30
def set_reconnect_interval(self, interval):
"""For testing reconnection functions.
""" """
self.reconnect_interval = interval Generate an altered version of a nickname that caused a collision in an
effort to create an unused related name for subsequent registration.
def run(self): :param nickname:
self.give_up = False """
self.ping_reply = True newnick = nickname + '_'
self.lockcond = threading.Condition() wlog('nickname collision, changed to ', newnick)
self.lockthrottle = threading.Condition() return newnick
PingThread(self).start()
ThrottleThread(self).start() def modeChanged(self, user, channel, _set, modes, args):
wlog('(unhandled) modeChanged: ', user, channel, _set, modes, args)
while not self.give_up:
try: def pong(self, user, secs):
log.info("connecting to host %s" % wlog('pong: ', user, secs)
(self.hostid))
if self.socks5.lower() == 'true': def userJoined(self, user, channel):
log.debug("Using socks5 proxy %s:%d" % wlog('user joined: ', user, channel)
(self.socks5_host, self.socks5_port))
setdefaultproxy(PROXY_TYPE_SOCKS5, def userKicked(self, kickee, channel, kicker, message):
self.socks5_host, self.socks5_port, wlog('kicked: ', kickee, channel, kicker, message)
True) if self.wrapper.on_nick_leave:
self.sock = socksocket() reactor.callLater(0.0, self.wrapper.on_nick_leave, kickee, self.wrapper)
else:
self.sock = socket.socket(socket.AF_INET, def userLeft(self, user, channel):
socket.SOCK_STREAM) wlog('left: ', user, channel)
self.sock.connect(self.serverport) if self.wrapper.on_nick_leave:
if self.usessl.lower() == 'true': reactor.callLater(0.0, self.wrapper.on_nick_leave, user, self.wrapper)
self.sock = ssl.wrap_socket(self.sock)
self.fd = self.sock.makefile() def userRenamed(self, oldname, newname):
self.password = None wlog('rename: ', oldname, newname)
if self.given_password: #TODO nick change handling
self.password = self.given_password
self.send_raw('CAP REQ :sasl') def userQuit(self, user, quitMessage):
self.send_raw('USER %s b c :%s' % self.userrealname) wlog('userQuit: ', user, quitMessage)
self.nick = self.given_nick if self.wrapper.on_nick_leave:
self.send_raw('NICK ' + self.nick) reactor.callLater(0.0, self.wrapper.on_nick_leave, user, self.wrapper)
while 1:
try: def topicUpdated(self, user, channel, newTopic):
line = self.fd.readline() wlog('topicUpdated: ', user, channel, newTopic)
except AttributeError as e: if self.wrapper.on_set_topic:
raise IOError(repr(e)) reactor.callLater(0.0, self.wrapper.on_set_topic, newTopic)
if line is None:
log.debug("line returned null from %s" % def receivedMOTD(self, motd):
(self.hostid)) wlog('motd: ', motd)
break
if len(line) == 0: def created(self, when):
log.debug("line was zero length from %s" % wlog('(unhandled) created: ', when)
(self.hostid))
break def yourHost(self, info):
self.__handle_line(line) wlog('(unhandled) yourhost: ', info)
except IOError as e:
import traceback def isupport(self, options):
log.debug("logging traceback from %s: \n" % """Used to set the name of the IRC *network*
(self.hostid) + traceback.format_exc()) (as distinct from the individual server), used
finally: for signature replay defence (see signing code in message_channel.py).
If this option ("NETWORK") is not found, we fallback to the default
hostid = servername+port as shown in IRCMessageChannel (should only
happen in testing).
"""
for o in options:
try: try:
self.fd.close() k, v = o.split('=')
self.sock.close() if k == 'NETWORK':
self.wrapper.hostid = v
except Exception as e: except Exception as e:
pass wlog('failed to parse isupport option, ignoring')
if self.on_disconnect:
self.on_disconnect(self) def myInfo(self, servername, version, umodes, cmodes):
log.info("disconnected from irc host %s" % wlog('(unhandled) myInfo: ', servername, version, umodes, cmodes)
(self.hostid))
if not self.give_up: def luserChannels(self, channels):
time.sleep(self.reconnect_interval) wlog('(unhandled) luserChannels: ', channels)
log.info('ending irc')
self.give_up = True def bounce(self, info):
wlog('(unhandled) bounce: ', info)
def left(self, channel):
wlog('(unhandled) left: ', channel)
def noticed(self, user, channel, message):
wlog('(unhandled) noticed: ', user, channel, message)

32
jmdaemon/jmdaemon/message_channel.py

@ -167,36 +167,8 @@ class MessageChannelCollection(object):
self.nicks_seen[mc] = self.nicks_seen[mc].difference(set([nick])) self.nicks_seen[mc] = self.nicks_seen[mc].difference(set([nick]))
def run(self, failures=None): def run(self, failures=None):
"""At the moment this is effectively a
do-nothing main loop. May be suboptimal.
For now it allows us to receive the
shutdown() signal for all message channels
and propagate it.
Additionally, for testing, a parameter 'failures'
may be passed, a tuple (type, message channel index, count)
which will perform a connection shutdown of type type
after iteration count count on message channel
self.mchannels[channel index].
"""
for mc in self.mchannels: for mc in self.mchannels:
MChannelThread(mc).start() mc.run()
i = 0
while True:
time.sleep(1)
i += 1
if self.give_up:
log.info("Shutting down all connections")
break
#feature only used for testing:
#deliberately shutdown a connection at a certain time.
#TODO may not be sufficiently deterministic.
if failures and i == failures[2]:
if failures[0] == 'break':
self.mchannels[failures[1]].close()
elif failures[0] == 'shutdown':
self.mchannels[failures[1]].shutdown()
else:
raise NotImplementedError("Failure injection type unknown")
#UNCONDITIONAL PUBLIC/BROADCAST: use all message #UNCONDITIONAL PUBLIC/BROADCAST: use all message
#channels for these functions. #channels for these functions.
@ -696,7 +668,7 @@ class MessageChannel(object):
"""Send a message to a specific counterparty""" """Send a message to a specific counterparty"""
@abc.abstractmethod @abc.abstractmethod
def _announce_orders(self, orderlist, nick): def _announce_orders(self, offerlist, nick):
"""Send orders defined in list orderlist either """Send orders defined in list orderlist either
to the shared public channel (pit), if nick=None, to the shared public channel (pit), if nick=None,
or to an individual counterparty nick. Note that or to an individual counterparty nick. Note that

1
jmdaemon/jmdaemon/orderbookwatch.py

@ -13,7 +13,6 @@ from decimal import InvalidOperation, Decimal
from jmdaemon.protocol import JM_VERSION from jmdaemon.protocol import JM_VERSION
from jmbase.support import get_log, joinmarket_alert, DUST_THRESHOLD from jmbase.support import get_log, joinmarket_alert, DUST_THRESHOLD
from jmdaemon.irc import B_PER_SEC
log = get_log() log = get_log()

2
jmdaemon/setup.py

@ -9,5 +9,5 @@ setup(name='joinmarketdaemon',
author_email='ekaggata@gmail.com', author_email='ekaggata@gmail.com',
license='GPL', license='GPL',
packages=['jmdaemon'], packages=['jmdaemon'],
install_requires=['libnacl', 'joinmarketbase'], install_requires=['txsocksx', 'pyopenssl', 'libnacl', 'joinmarketbase'],
zip_safe=False) zip_safe=False)

26
jmdaemon/test/test_daemon_protocol.py

@ -27,10 +27,17 @@ import json
import time import time
import base64 import base64
from dummy_mc import DummyMessageChannel from dummy_mc import DummyMessageChannel
from test_message_channel import make_valid_nick
test_completed = False test_completed = False
end_early = False end_early = False
jlog = get_log() jlog = get_log()
class DummyMC(DummyMessageChannel):
#override run() for twisted compatibility
def run(self):
if self.on_welcome:
reactor.callLater(1, self.on_welcome, self)
class JMProtocolError(Exception): class JMProtocolError(Exception):
pass pass
@ -215,7 +222,7 @@ class JMDaemonTestServerProtocol(JMDaemonServerProtocol):
maker_timeout_sec): maker_timeout_sec):
self.maker_timeout_sec = int(maker_timeout_sec) self.maker_timeout_sec = int(maker_timeout_sec)
self.minmakers = int(minmakers) self.minmakers = int(minmakers)
mcs = [DummyMessageChannel(None)] mcs = [DummyMC(None)]
self.mcc = MessageChannelCollection(mcs) self.mcc = MessageChannelCollection(mcs)
#The following is a hack to get the counterparties marked seen/active; #The following is a hack to get the counterparties marked seen/active;
#note it must happen before callign set_msgchan for OrderbookWatch #note it must happen before callign set_msgchan for OrderbookWatch
@ -274,6 +281,16 @@ class JMDaemonTestServerProtocolFactory(ServerFactory):
return JMDaemonTestServerProtocol(self) return JMDaemonTestServerProtocol(self)
class JMDaemonTest2ServerProtocol(JMDaemonServerProtocol):
#override here to avoid actually instantiating IRCMessageChannels
def init_connections(self, nick):
self.mc_shutdown()
class JMDaemonTest2ServerProtocolFactory(ServerFactory):
protocol = JMDaemonTest2ServerProtocol
def buildProtocol(self, addr):
return JMDaemonTest2ServerProtocol(self)
class TrialTestJMDaemonProto(unittest.TestCase): class TrialTestJMDaemonProto(unittest.TestCase):
def setUp(self): def setUp(self):
@ -284,10 +301,8 @@ class TrialTestJMDaemonProto(unittest.TestCase):
clientconn = reactor.connectTCP("localhost", 27184, clientconn = reactor.connectTCP("localhost", 27184,
JMTestClientProtocolFactory()) JMTestClientProtocolFactory())
self.addCleanup(clientconn.disconnect) self.addCleanup(clientconn.disconnect)
print("Got here")
def test_waiter(self): def test_waiter(self):
print("test_main()")
return task.deferLater(reactor, 12, self._called_by_deffered) return task.deferLater(reactor, 12, self._called_by_deffered)
def _called_by_deffered(self): def _called_by_deffered(self):
@ -299,18 +314,15 @@ class TestJMDaemonProtoInit(unittest.TestCase):
def setUp(self): def setUp(self):
global end_early global end_early
end_early = True end_early = True
print("setUp()")
load_program_config() load_program_config()
jm_single().maker_timeout_sec = 1 jm_single().maker_timeout_sec = 1
self.port = reactor.listenTCP(27184, JMDaemonServerProtocolFactory()) self.port = reactor.listenTCP(27184, JMDaemonTest2ServerProtocolFactory())
self.addCleanup(self.port.stopListening) self.addCleanup(self.port.stopListening)
clientconn = reactor.connectTCP("localhost", 27184, clientconn = reactor.connectTCP("localhost", 27184,
JMTestClientProtocolFactory()) JMTestClientProtocolFactory())
self.addCleanup(clientconn.disconnect) self.addCleanup(clientconn.disconnect)
print("Got here")
def test_waiter(self): def test_waiter(self):
print("test_main()")
return task.deferLater(reactor, 5, self._called_by_deffered) return task.deferLater(reactor, 5, self._called_by_deffered)
def _called_by_deffered(self): def _called_by_deffered(self):

140
jmdaemon/test/test_irc_messaging.py

@ -9,18 +9,18 @@ import pytest
import time import time
import threading import threading
import hashlib import hashlib
from twisted.trial import unittest
from twisted.internet import reactor, task, defer
import jmbitcoin as btc import jmbitcoin as btc
from jmdaemon import (JOINMARKET_NICK_HEADER, NICK_HASH_LENGTH, from jmdaemon import (JOINMARKET_NICK_HEADER, NICK_HASH_LENGTH,
NICK_MAX_ENCODED, IRCMessageChannel) NICK_MAX_ENCODED, IRCMessageChannel,
MessageChannelCollection)
from jmdaemon.message_channel import CJPeerError from jmdaemon.message_channel import CJPeerError
import jmdaemon import jmdaemon
#needed for test framework #needed for test framework
from jmclient import (load_program_config, get_irc_mchannels, jm_single) from jmclient import (load_program_config, get_irc_mchannels, jm_single)
python_cmd = "python2" si = 1
yg_cmd = "yield-generator-basic.py"
yg_name = "ygtest"
si = 3
class DummyDaemon(object): class DummyDaemon(object):
def request_signature_verify(self, a, b, c, d, e, def request_signature_verify(self, a, b, c, d, e,
f, g, h): f, g, h):
@ -29,28 +29,19 @@ class DummyDaemon(object):
class DummyMC(IRCMessageChannel): class DummyMC(IRCMessageChannel):
def __init__(self, configdata, nick, daemon): def __init__(self, configdata, nick, daemon):
super(DummyMC, self).__init__(configdata, daemon=daemon) super(DummyMC, self).__init__(configdata, daemon=daemon)
"""
#hacked in here to allow auth without mc-collection
nick_priv = hashlib.sha256(os.urandom(16)).hexdigest() + '01'
nick_pubkey = btc.privtopub(nick_priv)
nick_pkh_raw = hashlib.sha256(nick_pubkey).digest()[
:NICK_HASH_LENGTH]
nick_pkh = btc.changebase(nick_pkh_raw, 256, 58)
#right pad to maximum possible; b58 is not fixed length.
#Use 'O' as one of the 4 not included chars in base58.
nick_pkh += 'O' * (NICK_MAX_ENCODED - len(nick_pkh))
#The constructed length will be 1 + 1 + NICK_MAX_ENCODED
nick = JOINMARKET_NICK_HEADER + str(
jm_single().JM_VERSION) + nick_pkh
jm_single().nickname = nick
"""
self.daemon = daemon self.daemon = daemon
self.set_nick(nick) self.set_nick(nick)
def on_connect(x): def on_connect(x):
print('simulated on-connect') print('simulated on-connect')
def on_welcome(x): def on_welcome(mc):
print('simulated on-welcome') print('simulated on-welcome')
if mc.nick == "irc_publisher":
d = task.deferLater(reactor, 3.0, junk_pubmsgs, mc)
d.addCallback(junk_longmsgs)
d.addCallback(junk_announce)
d.addCallback(junk_fill)
def on_disconnect(x): def on_disconnect(x):
print('simulated on-disconnect') print('simulated on-disconnect')
@ -62,40 +53,9 @@ def on_order_seen(dummy, counterparty, oid, ordertype, minsize,
def on_pubkey(pubkey): def on_pubkey(pubkey):
print "received pubkey: " + pubkey print "received pubkey: " + pubkey
class RawIRCThread(threading.Thread): def junk_pubmsgs(mc):
def __init__(self, ircmsgchan):
threading.Thread.__init__(self, name='RawIRCThread')
self.daemon = True
self.ircmsgchan = ircmsgchan
def run(self):
self.ircmsgchan.run()
def test_junk_messages(setup_messaging):
#start a yg bot just to receive messages
"""
wallets = make_wallets(1,
wallet_structures=[[1,0,0,0,0]],
mean_amt=1)
wallet = wallets[0]['wallet']
ygp = local_command([python_cmd, yg_cmd,\
str(wallets[0]['seed'])], bg=True)
"""
#time.sleep(90)
#start a raw IRCMessageChannel instance in a thread; #start a raw IRCMessageChannel instance in a thread;
#then call send_* on it with various errant messages #then call send_* on it with various errant messages
dm = DummyDaemon()
mc = DummyMC(get_irc_mchannels()[0], "irc_ping_test", dm)
mc.register_orderbookwatch_callbacks(on_order_seen=on_order_seen)
mc.register_taker_callbacks(on_pubkey=on_pubkey)
mc.on_connect = on_connect
mc.on_disconnect = on_disconnect
mc.on_welcome = on_welcome
RawIRCThread(mc).start()
#start up a fake counterparty
mc2 = DummyMC(get_irc_mchannels()[0], yg_name, dm)
RawIRCThread(mc2).start()
time.sleep(si) time.sleep(si)
mc.request_orderbook() mc.request_orderbook()
time.sleep(si) time.sleep(si)
@ -104,50 +64,84 @@ def test_junk_messages(setup_messaging):
time.sleep(si) time.sleep(si)
#should be ignored; can we check? #should be ignored; can we check?
mc.pubmsg("!orderbook!orderbook") mc.pubmsg("!orderbook!orderbook")
time.sleep(si) return mc
def junk_longmsgs(mc):
#assuming MAX_PRIVMSG_LEN is not something crazy #assuming MAX_PRIVMSG_LEN is not something crazy
#big like 550, this should fail #big like 550, this should fail
with pytest.raises(AssertionError) as e_info: #with pytest.raises(AssertionError) as e_info:
mc.pubmsg("junk and crap"*40) mc.pubmsg("junk and crap"*40)
time.sleep(si) time.sleep(si)
#assuming MAX_PRIVMSG_LEN is not something crazy #assuming MAX_PRIVMSG_LEN is not something crazy
#small like 180, this should succeed #small like 180, this should succeed
mc.pubmsg("junk and crap"*15) mc.pubmsg("junk and crap"*15)
time.sleep(si) time.sleep(si)
return mc
def junk_announce(mc):
#try a long order announcement in public #try a long order announcement in public
#because we don't want to build a real orderbook, #because we don't want to build a real orderbook,
#call the underlying IRC announce function. #call the underlying IRC announce function.
#TODO: how to test that the sent format was correct? #TODO: how to test that the sent format was correct?
print('got here')
mc._announce_orders(["!abc def gh 0001"]*30) mc._announce_orders(["!abc def gh 0001"]*30)
time.sleep(si) time.sleep(si)
return mc
def junk_fill(mc):
cpname = "irc_receiver"
#send a fill with an invalid pubkey to the existing yg; #send a fill with an invalid pubkey to the existing yg;
#this should trigger a NaclError but should NOT kill it. #this should trigger a NaclError but should NOT kill it.
mc._privmsg(yg_name, "fill", "0 10000000 abcdef") mc._privmsg(cpname, "fill", "0 10000000 abcdef")
#Test that null privmsg does not cause crash; TODO check maker log?
mc.send_raw("PRIVMSG " + yg_name + " :")
time.sleep(si)
#Try with ob flag #Try with ob flag
mc._pubmsg("!reloffer stuff") mc._pubmsg("!reloffer stuff")
time.sleep(si) time.sleep(si)
#Trigger throttling with large messages #Trigger throttling with large messages
mc._privmsg(yg_name, "tx", "aa"*5000) mc._privmsg(cpname, "tx", "aa"*5000)
time.sleep(si) time.sleep(si)
#with pytest.raises(CJPeerError) as e_info: #with pytest.raises(CJPeerError) as e_info:
mc.send_error(yg_name, "fly you fools!") mc.send_error(cpname, "fly you fools!")
time.sleep(si) time.sleep(si)
#Test the effect of shutting down the connection return mc
mc.set_reconnect_interval(si-1)
mc.close() def getmc(nick):
mc._announce_orders(["!abc def gh 0001"]*30) dm = DummyDaemon()
time.sleep(si+2) mc = DummyMC(get_irc_mchannels()[0], nick, dm)
#kill the connection at socket level mc.register_orderbookwatch_callbacks(on_order_seen=on_order_seen)
mc.shutdown() mc.register_taker_callbacks(on_pubkey=on_pubkey)
mc.on_connect = on_connect
@pytest.fixture(scope="module") mc.on_disconnect = on_disconnect
def setup_messaging(): mc.on_welcome = on_welcome
#Trigger PING LAG sending artificially mcc = MessageChannelCollection([mc])
jmdaemon.irc.PING_INTERVAL = 3 return dm, mc, mcc
class TrialIRC(unittest.TestCase):
def setUp(self):
load_program_config() load_program_config()
print(get_irc_mchannels()[0])
jm_single().maker_timeout_sec = 1
dm, mc, mcc = getmc("irc_publisher")
dm2, mc2, mcc2 = getmc("irc_receiver")
mcc.run()
mcc2.run()
def cb(m):
#don't try to reconnect
m.give_up = True
m.tcp_connector.disconnect()
self.addCleanup(cb, mc)
self.addCleanup(cb, mc2)
#test_junk_messages()
print("Got here")
def test_waiter(self):
print("test_main()")
#reactor.callLater(1.0, junk_messages, self.mcc)
return task.deferLater(reactor, 32, self._called_by_deffered)
def _called_by_deffered(self):
pass

19
jmdaemon/test/test_message_channel.py

@ -22,7 +22,7 @@ import traceback
import threading import threading
import jmbitcoin as bitcoin import jmbitcoin as bitcoin
from dummy_mc import DummyMessageChannel from dummy_mc import DummyMessageChannel
from twisted.internet import reactor
jlog = get_log() jlog = get_log()
def make_valid_nick(i=0): def make_valid_nick(i=0):
@ -350,25 +350,10 @@ def test_mc_run(failuretype, mcindex, wait):
ob.set_msgchan(mcc) ob.set_msgchan(mcc)
dummydaemon = DaemonForSigns(mcc) dummydaemon = DaemonForSigns(mcc)
mcc.set_daemon(dummydaemon) mcc.set_daemon(dummydaemon)
#to externally trigger give up condition, start mcc itself in a thread
MChannelThread(mcc).start()
time.sleep(0.2)
mcc.give_up = True
time.sleep(1.2)
#wipe state, this time use failure injections
mcc = MessageChannelCollection(dmcs)
#to test exception raise on bad failure inject, don't use thread:
if failuretype == "bad":
with pytest.raises(NotImplementedError) as e_info:
mcc.run(failures=[failuretype, mcindex, wait])
else:
#need to override thread run() #need to override thread run()
class FIThread(MChannelThread): class FIThread(MChannelThread):
def run(self): def run(self):
self.mc.run(failures=self.failures) self.mc.run()
fi = FIThread(mcc) fi = FIThread(mcc)
fi.failures = [failuretype, mcindex, wait]
fi.start() fi.start()
time.sleep(wait+0.5) time.sleep(wait+0.5)
Loading…
Cancel
Save