7 changed files with 362 additions and 354 deletions
@ -0,0 +1,349 @@ |
|||||||
|
#! /usr/bin/env python |
||||||
|
from __future__ import print_function |
||||||
|
|
||||||
|
from .message_channel import MessageChannelCollection |
||||||
|
from .orderbookwatch import OrderbookWatch |
||||||
|
from .enc_wrapper import (as_init_encryption, init_keypair, init_pubkey, |
||||||
|
NaclError) |
||||||
|
from .protocol import (COMMAND_PREFIX, ORDER_KEYS, NICK_HASH_LENGTH, |
||||||
|
NICK_MAX_ENCODED, JM_VERSION, JOINMARKET_NICK_HEADER) |
||||||
|
from .irc import IRCMessageChannel |
||||||
|
|
||||||
|
from jmbase.commands import * |
||||||
|
from twisted.protocols import amp |
||||||
|
from twisted.internet import reactor |
||||||
|
from twisted.internet.protocol import ServerFactory |
||||||
|
from twisted.internet.error import (ConnectionLost, ConnectionAborted, |
||||||
|
ConnectionClosed, ConnectionDone) |
||||||
|
from twisted.python import failure, log |
||||||
|
import json |
||||||
|
import time |
||||||
|
import threading |
||||||
|
|
||||||
|
|
||||||
|
"""Joinmarket application protocol control flow. |
||||||
|
For documentation on protocol (formats, message sequence) see |
||||||
|
https://github.com/JoinMarket-Org/JoinMarket-Docs/blob/master/ |
||||||
|
Joinmarket-messaging-protocol.md |
||||||
|
""" |
||||||
|
""" |
||||||
|
*** |
||||||
|
API |
||||||
|
*** |
||||||
|
The client-daemon two-way communication is documented in jmbase.commands.py |
||||||
|
""" |
||||||
|
|
||||||
|
|
||||||
|
class MCThread(threading.Thread): |
||||||
|
|
||||||
|
def __init__(self, mc): |
||||||
|
threading.Thread.__init__(self, name='MCThread') |
||||||
|
self.mc = mc |
||||||
|
self.daemon = True |
||||||
|
|
||||||
|
def run(self): |
||||||
|
self.mc.run() |
||||||
|
|
||||||
|
|
||||||
|
class JMProtocolError(Exception): |
||||||
|
pass |
||||||
|
|
||||||
|
class JMDaemonServerProtocol(amp.AMP, OrderbookWatch): |
||||||
|
|
||||||
|
def __init__(self, factory): |
||||||
|
self.factory = factory |
||||||
|
self.jm_state = 0 |
||||||
|
self.restart_mc_required = False |
||||||
|
self.irc_configs = None |
||||||
|
self.mcc = None |
||||||
|
self.sig_lock = threading.Lock() |
||||||
|
|
||||||
|
def checkClientResponse(self, response): |
||||||
|
"""A generic check of client acceptance; any failure |
||||||
|
is considered criticial. |
||||||
|
""" |
||||||
|
if 'accepted' not in response or not response['accepted']: |
||||||
|
reactor.stop() |
||||||
|
|
||||||
|
def defaultErrback(self, failure): |
||||||
|
failure.trap(ConnectionAborted, ConnectionClosed, ConnectionDone, ConnectionLost) |
||||||
|
|
||||||
|
def defaultCallbacks(self, d): |
||||||
|
d.addCallback(self.checkClientResponse) |
||||||
|
d.addErrback(self.defaultErrback) |
||||||
|
|
||||||
|
@JMInit.responder |
||||||
|
def on_JM_INIT(self, bcsource, network, irc_configs, minmakers, |
||||||
|
maker_timeout_sec): |
||||||
|
"""Reads in required configuration from client for a new |
||||||
|
session; feeds back joinmarket messaging protocol constants |
||||||
|
(required for nick creation). |
||||||
|
If a new message channel configuration is required, the current |
||||||
|
one is shutdown in preparation. |
||||||
|
""" |
||||||
|
self.maker_timeout_sec = int(maker_timeout_sec) |
||||||
|
self.minmakers = int(minmakers) |
||||||
|
irc_configs = json.loads(irc_configs) |
||||||
|
#(bitcoin) network only referenced in channel name construction |
||||||
|
self.network = network |
||||||
|
if irc_configs == self.irc_configs: |
||||||
|
self.restart_mc_required = False |
||||||
|
log.msg("New init received did not require a new message channel" |
||||||
|
" setup.") |
||||||
|
else: |
||||||
|
if self.irc_configs: |
||||||
|
#close the existing connections |
||||||
|
self.mc_shutdown() |
||||||
|
self.irc_configs = irc_configs |
||||||
|
self.restart_mc_required = True |
||||||
|
mcs = [IRCMessageChannel(c, |
||||||
|
daemon=self, |
||||||
|
realname='btcint=' + bcsource) |
||||||
|
for c in self.irc_configs] |
||||||
|
self.mcc = MessageChannelCollection(mcs) |
||||||
|
OrderbookWatch.set_msgchan(self, self.mcc) |
||||||
|
#register taker-specific msgchan callbacks here |
||||||
|
self.mcc.register_taker_callbacks(self.on_error, self.on_pubkey, |
||||||
|
self.on_ioauth, self.on_sig) |
||||||
|
self.mcc.set_daemon(self) |
||||||
|
d = self.callRemote(JMInitProto, |
||||||
|
nick_hash_length=NICK_HASH_LENGTH, |
||||||
|
nick_max_encoded=NICK_MAX_ENCODED, |
||||||
|
joinmarket_nick_header=JOINMARKET_NICK_HEADER, |
||||||
|
joinmarket_version=JM_VERSION) |
||||||
|
self.defaultCallbacks(d) |
||||||
|
return {'accepted': True} |
||||||
|
|
||||||
|
@JMStartMC.responder |
||||||
|
def on_JM_START_MC(self, nick): |
||||||
|
"""Starts message channel threads, if we are working with |
||||||
|
a new message channel configuration. Sets new nick if required. |
||||||
|
JM_UP will be called when the welcome messages are received. |
||||||
|
""" |
||||||
|
self.init_connections(nick) |
||||||
|
return {'accepted': True} |
||||||
|
|
||||||
|
def init_connections(self, nick): |
||||||
|
"""Sets up message channel connections |
||||||
|
if they are not already up; re-sets joinmarket state to 0 |
||||||
|
for a new transaction; effectively means any previous |
||||||
|
incomplete transaction is wiped. |
||||||
|
""" |
||||||
|
self.jm_state = 0 #uninited |
||||||
|
if self.restart_mc_required: |
||||||
|
MCThread(self.mcc).start() |
||||||
|
self.restart_mc_required = False |
||||||
|
else: |
||||||
|
#if we are not restarting the MC, |
||||||
|
#we must simulate the on_welcome message: |
||||||
|
self.on_welcome() |
||||||
|
self.mcc.set_nick(nick) |
||||||
|
|
||||||
|
|
||||||
|
def on_welcome(self): |
||||||
|
"""Fired when channel indicated state readiness |
||||||
|
""" |
||||||
|
d = self.callRemote(JMUp) |
||||||
|
self.defaultCallbacks(d) |
||||||
|
|
||||||
|
@JMSetup.responder |
||||||
|
def on_JM_SETUP(self, role, n_counterparties): |
||||||
|
assert self.jm_state == 0 |
||||||
|
assert n_counterparties > 1 |
||||||
|
#TODO consider MAKER role implementation here |
||||||
|
assert role == "TAKER" |
||||||
|
self.requested_counterparties = n_counterparties |
||||||
|
self.crypto_boxes = {} |
||||||
|
self.kp = init_keypair() |
||||||
|
print("Received setup command") |
||||||
|
d = self.callRemote(JMSetupDone) |
||||||
|
self.defaultCallbacks(d) |
||||||
|
#Request orderbook here, on explicit setup request from client, |
||||||
|
#assumes messagechannels are in "up" state. Orders are read |
||||||
|
#in the callback on_order_seen in OrderbookWatch. |
||||||
|
#TODO: pubmsg should not (usually?) fire if already up from previous run. |
||||||
|
self.mcc.pubmsg(COMMAND_PREFIX + "orderbook") |
||||||
|
self.jm_state = 1 |
||||||
|
return {'accepted': True} |
||||||
|
|
||||||
|
@JMRequestOffers.responder |
||||||
|
def on_JM_REQUEST_OFFERS(self): |
||||||
|
"""Reports the current state of the orderbook. |
||||||
|
This call is stateless.""" |
||||||
|
rows = self.db.execute('SELECT * FROM orderbook;').fetchall() |
||||||
|
self.orderbook = [dict([(k, o[k]) for k in ORDER_KEYS]) for o in rows] |
||||||
|
log.msg("About to send orderbook of size: " + str(len(self.orderbook))) |
||||||
|
string_orderbook = json.dumps(self.orderbook) |
||||||
|
d = self.callRemote(JMOffers, |
||||||
|
orderbook=string_orderbook) |
||||||
|
self.defaultCallbacks(d) |
||||||
|
return {'accepted': True} |
||||||
|
|
||||||
|
@JMFill.responder |
||||||
|
def on_JM_FILL(self, amount, commitment, revelation, filled_offers): |
||||||
|
if not (self.jm_state == 1 and isinstance(amount, int) and amount >=0): |
||||||
|
return {'accepted': False} |
||||||
|
self.cjamount = amount |
||||||
|
self.commitment = commitment |
||||||
|
self.revelation = revelation |
||||||
|
#Reset utxo data to null for this new transaction |
||||||
|
self.ioauth_data = {} |
||||||
|
self.active_orders = json.loads(filled_offers) |
||||||
|
for nick, offer_dict in self.active_orders.iteritems(): |
||||||
|
offer_fill_msg = " ".join([str(offer_dict["oid"]), str(amount), str( |
||||||
|
self.kp.hex_pk()), str(commitment)]) |
||||||
|
self.mcc.prepare_privmsg(nick, "fill", offer_fill_msg) |
||||||
|
reactor.callLater(self.maker_timeout_sec, self.completeStage1) |
||||||
|
self.jm_state = 2 |
||||||
|
return {'accepted': True} |
||||||
|
|
||||||
|
def on_pubkey(self, nick, maker_pk): |
||||||
|
"""This is handled locally in the daemon; set up e2e |
||||||
|
encrypted messaging with this counterparty |
||||||
|
""" |
||||||
|
if nick not in self.active_orders.keys(): |
||||||
|
log.msg("Counterparty not part of this transaction. Ignoring") |
||||||
|
return |
||||||
|
try: |
||||||
|
self.crypto_boxes[nick] = [maker_pk, as_init_encryption( |
||||||
|
self.kp, init_pubkey(maker_pk))] |
||||||
|
except NaclError as e: |
||||||
|
print("Unable to setup crypto box with " + nick + ": " + repr(e)) |
||||||
|
self.mcc.send_error(nick, "invalid nacl pubkey: " + maker_pk) |
||||||
|
return |
||||||
|
self.mcc.prepare_privmsg(nick, "auth", str(self.revelation)) |
||||||
|
|
||||||
|
def on_ioauth(self, nick, utxo_list, auth_pub, cj_addr, change_addr, |
||||||
|
btc_sig): |
||||||
|
"""Passes through to Taker the information from counterparties once |
||||||
|
they've all been received; note that we must also pass back the maker_pk |
||||||
|
so it can be verified against the btc-sigs for anti-MITM |
||||||
|
""" |
||||||
|
if nick not in self.active_orders.keys(): |
||||||
|
print("Got an unexpected ioauth from nick: " + str(nick)) |
||||||
|
return |
||||||
|
self.ioauth_data[nick] = [utxo_list, auth_pub, cj_addr, change_addr, |
||||||
|
btc_sig, self.crypto_boxes[nick][0]] |
||||||
|
if self.ioauth_data.keys() == self.active_orders.keys(): |
||||||
|
#Finish early if we got all |
||||||
|
self.respondToIoauths(True) |
||||||
|
|
||||||
|
def respondToIoauths(self, accepted): |
||||||
|
if self.jm_state != 2: |
||||||
|
#this can be called a second time on timeout, in which case we |
||||||
|
#do nothing |
||||||
|
return |
||||||
|
d = self.callRemote(JMFillResponse, |
||||||
|
success=accepted, |
||||||
|
ioauth_data = json.dumps(self.ioauth_data)) |
||||||
|
if not accepted: |
||||||
|
#Client simply accepts failure TODO |
||||||
|
self.defaultCallbacks(d) |
||||||
|
else: |
||||||
|
#Act differently if *we* provided utxos, but |
||||||
|
#client does not accept for some reason |
||||||
|
d.addCallback(self.checkUtxosAccepted) |
||||||
|
d.addErrback(self.defaultErrback) |
||||||
|
|
||||||
|
def completeStage1(self): |
||||||
|
"""Timeout of stage 1 requests; |
||||||
|
either send success + ioauth data if enough makers, |
||||||
|
else send failure to client. |
||||||
|
""" |
||||||
|
response = True if len(self.ioauth_data.keys()) >= self.minmakers else False |
||||||
|
self.respondToIoauths(response) |
||||||
|
|
||||||
|
def checkUtxosAccepted(self, accepted): |
||||||
|
if not accepted: |
||||||
|
log.msg("Taker rejected utxos provided; resetting.") |
||||||
|
#TODO create re-set function to start again |
||||||
|
else: |
||||||
|
#only update state if client accepted |
||||||
|
self.jm_state = 3 |
||||||
|
|
||||||
|
@JMMakeTx.responder |
||||||
|
def on_JM_MAKE_TX(self, nick_list, txhex): |
||||||
|
if not self.jm_state == 3: |
||||||
|
log.msg("Make tx was called in wrong state, rejecting") |
||||||
|
return {'accepted': False} |
||||||
|
nick_list = json.loads(nick_list) |
||||||
|
self.mcc.send_tx(nick_list, txhex) |
||||||
|
return {'accepted': True} |
||||||
|
|
||||||
|
def on_sig(self, nick, sig): |
||||||
|
"""Pass signature through to Taker. |
||||||
|
""" |
||||||
|
d = self.callRemote(JMSigReceived, |
||||||
|
nick=nick, |
||||||
|
sig=sig) |
||||||
|
self.defaultCallbacks(d) |
||||||
|
|
||||||
|
"""The following functions handle requests and responses |
||||||
|
from client for messaging signing and verifying. |
||||||
|
""" |
||||||
|
def request_signed_message(self, nick, cmd, msg, msg_to_be_signed, hostid): |
||||||
|
"""The daemon passes the nick and cmd fields |
||||||
|
to the client so it can be echoed back to the privmsg |
||||||
|
after return (with signature); note that the cmd is already |
||||||
|
inside "msg" after having been parsed in MessageChannel; this |
||||||
|
duplication is so that the client does not need to know the |
||||||
|
message syntax. |
||||||
|
""" |
||||||
|
with self.sig_lock: |
||||||
|
d = self.callRemote(JMRequestMsgSig, |
||||||
|
nick=str(nick), |
||||||
|
cmd=str(cmd), |
||||||
|
msg=str(msg), |
||||||
|
msg_to_be_signed=str(msg_to_be_signed), |
||||||
|
hostid=str(hostid)) |
||||||
|
self.defaultCallbacks(d) |
||||||
|
|
||||||
|
def request_signature_verify(self, msg, fullmsg, sig, pubkey, nick, hashlen, |
||||||
|
max_encoded, hostid): |
||||||
|
with self.sig_lock: |
||||||
|
d = self.callRemote(JMRequestMsgSigVerify, |
||||||
|
msg=msg, |
||||||
|
fullmsg=fullmsg, |
||||||
|
sig=sig, |
||||||
|
pubkey=pubkey, |
||||||
|
nick=nick, |
||||||
|
hashlen=hashlen, |
||||||
|
max_encoded=max_encoded, |
||||||
|
hostid=hostid) |
||||||
|
self.defaultCallbacks(d) |
||||||
|
|
||||||
|
@JMMsgSignature.responder |
||||||
|
def on_JM_MSGSIGNATURE(self, nick, cmd, msg_to_return, hostid): |
||||||
|
self.mcc.privmsg(nick, cmd, msg_to_return, mc=hostid) |
||||||
|
return {'accepted': True} |
||||||
|
|
||||||
|
@JMMsgSignatureVerify.responder |
||||||
|
def on_JM_MSGSIGNATURE_VERIFY(self, verif_result, nick, fullmsg, hostid): |
||||||
|
if not verif_result: |
||||||
|
log.msg("Verification failed for nick: " + str(nick)) |
||||||
|
else: |
||||||
|
self.mcc.on_verified_privmsg(nick, fullmsg, hostid) |
||||||
|
return {'accepted': True} |
||||||
|
|
||||||
|
def get_crypto_box_from_nick(self, nick): |
||||||
|
if nick in self.crypto_boxes and self.crypto_boxes[nick] != None: |
||||||
|
return self.crypto_boxes[nick][1] # libsodium encryption object |
||||||
|
else: |
||||||
|
log.msg('something wrong, no crypto object, nick=' + nick + |
||||||
|
', message will be dropped') |
||||||
|
return None |
||||||
|
|
||||||
|
def on_error(self): |
||||||
|
log.msg("Unimplemented on_error") |
||||||
|
|
||||||
|
def mc_shutdown(self): |
||||||
|
log.msg("Message channels being shutdown by daemon") |
||||||
|
if self.mcc: |
||||||
|
self.mcc.shutdown() |
||||||
|
|
||||||
|
|
||||||
|
class JMDaemonServerProtocolFactory(ServerFactory): |
||||||
|
protocol = JMDaemonServerProtocol |
||||||
|
|
||||||
|
def buildProtocol(self, addr): |
||||||
|
return JMDaemonServerProtocol(self) |
||||||
Loading…
Reference in new issue