@ -7,243 +7,352 @@ import random
import socket
import threading
import time
import sys
import ssl
from twisted . python . log import startLogging
from twisted . internet . protocol import ClientFactory , Protocol
from twisted . internet . ssl import ClientContextFactory
from twisted . protocols . basic import LineReceiver
from twisted . internet import reactor , task , defer
from . blockchaininterface import BlockchainInterface , is_index_ahead_of_cache
from . configure import get_p2pk_vbyte
from . configure import get_p2sh _vbyte
from . support import get_log
from . electrum_data import ( get_default_ports , get_default_servers ,
set_electrum_testnet , DEFAULT_PROTO )
log = get_log ( )
# Default server list from electrum client
# https://github.com/spesmilo/electrum/blob/753a28b452dca1023fbde548469c36a34555dc95/lib/network.py
DEFAULT_ELECTRUM_SERVER_LIST = [
' erbium1.sytes.net:50001 ' ,
' ecdsa.net:50001 ' ,
' electrum0.electricnewyear.net:50001 ' ,
' VPS.hsmiths.com:50001 ' ,
' ELECTRUM.jdubya.info:50001 ' ,
' electrum.no-ip.org:50001 ' ,
' us.electrum.be:50001 ' ,
' bitcoins.sk:50001 ' ,
' electrum.petrkr.net:50001 ' ,
' electrum.dragonzone.net:50001 ' ,
' Electrum.hsmiths.com:8080 ' ,
' electrum3.hachre.de:50001 ' ,
' elec.luggs.co:80 ' ,
' btc.smsys.me:110 ' ,
' electrum.online:50001 ' ,
]
class ElectrumConnectionError ( Exception ) :
pass
class ElectrumInterface ( BlockchainInterface ) :
class TxElectrumClientProtocol ( LineReceiver ) :
#map deferreds to msgids to correctly link response with request
deferreds = { }
delimiter = " \n "
def __init__ ( self , factory ) :
self . factory = factory
def connectionMade ( self ) :
log . debug ( ' connection to Electrum succesful ' )
self . msg_id = 0
self . factory . bci . sync_addresses ( self . factory . bci . wallet )
self . start_ping ( )
self . call_server_method ( ' blockchain.numblocks.subscribe ' )
def start_ping ( self ) :
pingloop = task . LoopingCall ( self . ping )
pingloop . start ( 60.0 )
def ping ( self ) :
#We dont bother tracking response to this;
#just for keeping connection active
self . call_server_method ( ' server.version ' )
def send_json ( self , json_data ) :
data = json . dumps ( json_data ) . encode ( )
self . sendLine ( data )
def call_server_method ( self , method , params = [ ] ) :
self . msg_id = self . msg_id + 1
current_id = self . msg_id
self . deferreds [ current_id ] = defer . Deferred ( )
method_dict = {
' id ' : current_id ,
' method ' : method ,
' params ' : params
}
self . send_json ( method_dict )
return self . deferreds [ current_id ]
def lineReceived ( self , line ) :
try :
parsed = json . loads ( line )
msgid = parsed [ ' id ' ]
linked_deferred = self . deferreds [ msgid ]
except :
log . debug ( " Ignored response from Electrum server: " + str ( line ) )
return
linked_deferred . callback ( parsed )
class TxElectrumClientProtocolFactory ( ClientFactory ) :
def __init__ ( self , bci ) :
self . bci = bci
def buildProtocol ( self , addr ) :
self . client = TxElectrumClientProtocol ( self )
return self . client
def clientConnectionLost ( self , connector , reason ) :
log . debug ( ' Electrum connection lost, reason: ' + str ( reason ) )
self . bci . start_electrum_proto ( None )
class ElectrumConn ( threading . Thread ) :
def __init__ ( self , electrum_server ) :
threading . Thread . __init__ ( self )
self . daemon = True
self . msg_id = 0
self . RetQueue = Queue . Queue ( )
try :
self . s = socket . create_connection ( ( electrum_server . split ( ' : ' ) [ 0 ] ,
int ( electrum_server . split ( ' : ' ) [ 1 ] ) ) )
except Exception as e :
log . error ( " Error connecting to electrum server. "
" Try again to connect to a random server or set a "
" server in the config. " )
def clientConnectionFailed ( self , connector , reason ) :
print ( ' connection failed ' )
self . bci . start_electrum_proto ( None )
class ElectrumConn ( threading . Thread ) :
def __init__ ( self , server , port , proto ) :
threading . Thread . __init__ ( self )
self . daemon = True
self . msg_id = 0
self . RetQueue = Queue . Queue ( )
try :
if proto == ' t ' :
self . s = socket . create_connection ( ( server , int ( port ) ) )
elif proto == ' s ' :
self . raw_socket = socket . socket ( socket . AF_INET , socket . SOCK_STREAM )
#reads are sometimes quite slow, so conservative, but we must
#time out a completely hanging connection.
self . raw_socket . settimeout ( 60 )
self . raw_socket . connect ( ( server , int ( port ) ) )
self . s = ssl . wrap_socket ( self . raw_socket )
else :
#Wrong proto is not accepted for restarts
log . error ( " Failure to connect to Electrum, "
" protocol must be TCP or SSL. " )
os . _exit ( 1 )
self . ping ( )
except Exception as e :
log . error ( " Error connecting to electrum server; trying again. " )
raise ElectrumConnectionError
self . ping ( )
def run ( self ) :
while True :
all_data = None
while True :
data = self . s . recv ( 1024 )
if data is None :
continue
if all_data is None :
all_data = data
else :
all_data = all_data + data
if ' \n ' in all_data :
break
data_json = json . loads ( all_data [ : - 1 ] . decode ( ) )
self . RetQueue . put ( data_json )
def ping ( self ) :
log . debug ( ' sending server ping ' )
self . send_json ( { ' id ' : 0 , ' method ' : ' server.version ' , ' params ' : [ ] } )
t = threading . Timer ( 60 , self . ping )
t . daemon = True
t . start ( )
def send_json ( self , json_data ) :
data = json . dumps ( json_data ) . encode ( )
self . s . send ( data + b ' \n ' )
def call_server_method ( self , method , params = [ ] ) :
self . msg_id = self . msg_id + 1
current_id = self . msg_id
method_dict = {
' id ' : current_id ,
' method ' : method ,
' params ' : params
}
self . send_json ( method_dict )
def run ( self ) :
while True :
all_data = None
while True :
ret_data = self . RetQueue . get ( )
if ret_data . get ( ' id ' , None ) == current_id :
return ret_data
data = self . s . recv ( 1024 )
if data is None :
continue
if all_data is None :
all_data = data
else :
log . debug ( json . dumps ( ret_data ) )
all_data = all_data + data
if ' \n ' in all_data :
break
data_json = json . loads ( all_data [ : - 1 ] . decode ( ) )
self . RetQueue . put ( data_json )
def __init__ ( self , testnet = False , electrum_server = None ) :
super ( ElectrumInterface , self ) . __init__ ( )
def ping ( self ) :
log . debug ( ' Sending Electrum server ping ' )
self . send_json ( { ' id ' : 0 , ' method ' : ' server.version ' , ' params ' : [ ] } )
t = threading . Timer ( 60 , self . ping )
t . daemon = True
t . start ( )
def send_json ( self , json_data ) :
data = json . dumps ( json_data ) . encode ( )
self . s . send ( data + b ' \n ' )
def call_server_method ( self , method , params = [ ] ) :
self . msg_id = self . msg_id + 1
current_id = self . msg_id
method_dict = {
' id ' : current_id ,
' method ' : method ,
' params ' : params
}
self . send_json ( method_dict )
while True :
ret_data = self . RetQueue . get ( )
if ret_data . get ( ' id ' , None ) == current_id :
return ret_data
else :
log . debug ( json . dumps ( ret_data ) )
class ElectrumInterface ( BlockchainInterface ) :
BATCH_SIZE = 8
def __init__ ( self , testnet = False , electrum_server = None ) :
self . synctype = " sync-only "
if testnet :
raise Exception ( NotImplemented )
if electrum_server is None :
electrum_server = random . choice ( DEFAULT_ELECTRUM_SERVER_LIST )
self . server_domain = electrum_server . split ( ' : ' ) [ 0 ]
self . last_sync_unspent = 0
self . electrum_conn = self . ElectrumConn ( electrum_server )
set_electrum_testnet ( )
self . start_electrum_proto ( )
self . electrum_conn = None
self . start_connection_thread ( )
#task.LoopingCall objects that track transactions, keyed by txids.
#Format: {"txid": (loop, unconfirmed true/false, confirmed true/false,
#spent true/false), ..}
self . tx_watcher_loops = { }
self . wallet_synced = False
def start_electrum_proto ( self , electrum_server = None ) :
self . server , self . port = self . get_server ( electrum_server )
self . factory = TxElectrumClientProtocolFactory ( self )
if DEFAULT_PROTO == ' s ' :
ctx = ClientContextFactory ( )
reactor . connectSSL ( self . server , self . port , self . factory , ctx )
elif DEFAULT_PROTO == ' t ' :
reactor . connectTCP ( self . server , self . port , self . factory )
else :
raise Exception ( " Unrecognized connection protocol to Electrum, "
" should be one of ' t ' or ' s ' (TCP or SSL), "
" critical error, quitting. " )
def start_connection_thread ( self ) :
""" Initiate a thread that serves blocking, single
calls to an Electrum server . This won ' t usually be the
same server that ' s used to do sync (which, confusingly,
is asynchronous ) .
"""
try :
s , p = self . get_server ( None )
self . electrum_conn = ElectrumConn ( s , p , DEFAULT_PROTO )
except ElectrumConnectionError :
reactor . callLater ( 1.0 , self . start_connection_thread )
return
self . electrum_conn . start ( )
# used to hold open server conn
#used to hold open server conn
self . electrum_conn . call_server_method ( ' blockchain.numblocks.subscribe ' )
def get_from_electrum ( self , method , params = [ ] ) :
def sync_wallet ( self , wallet , restart_cb = False ) :
self . wallet = wallet
#wipe the temporary cache of address histories
self . temp_addr_history = { }
if self . synctype == " sync-only " :
reactor . run ( )
def get_server ( self , electrum_server ) :
if not electrum_server :
while True :
electrum_server = random . choice ( get_default_servers ( ) . keys ( ) )
if DEFAULT_PROTO in get_default_servers ( ) [ electrum_server ] :
break
s = electrum_server
p = int ( get_default_servers ( ) [ electrum_server ] [ DEFAULT_PROTO ] )
log . debug ( ' Trying to connect to Electrum server: ' + str ( electrum_server ) )
return ( s , p )
def get_from_electrum ( self , method , params = [ ] , blocking = False ) :
params = [ params ] if type ( params ) is not list else params
return self . electrum_conn . call_server_method ( method , params )
if blocking :
return self . electrum_conn . call_server_method ( method , params )
else :
return self . factory . client . call_server_method ( method , params )
def sync_addresses ( self , wallet ) :
log . debug ( " downloading wallet history from electrum server " )
for mix_depth in range ( wallet . max_mix_depth ) :
def sync_addresses ( self , wallet , restart_cb = None ) :
if not self . electrum_conn :
#wait until we have some connection up before starting
reactor . callLater ( 0.2 , self . sync_addresses , wallet , restart_cb )
return
log . debug ( " downloading wallet history from Electrum server ... " )
for mixdepth in range ( wallet . max_mix_depth ) :
for forchange in [ 0 , 1 ] :
unused_addr_count = 0
last_used_addr = ' '
while ( unused_addr_count < wallet . gaplimit or not is_index_ahead_of_cache ( wallet , mix_depth , forchange ) ) :
addr = wallet . get_new_addr ( mix_depth , forchange )
addr_hist_info = self . get_from_electrum ( ' blockchain.address.get_history ' , addr )
if len ( addr_hist_info [ ' result ' ] ) > 0 :
last_used_addr = addr
unused_addr_count = 0
else :
unused_addr_count + = 1
if last_used_addr == ' ' :
wallet . index [ mix_depth ] [ forchange ] = 0
self . synchronize_batch ( wallet , mixdepth , forchange , 0 )
def synchronize_batch ( self , wallet , mixdepth , forchange , start_index ) :
#for debugging only:
#log.debug("Syncing address batch, m, fc, i: " + ",".join(
# [str(x) for x in [mixdepth, forchange, start_index]]))
if mixdepth not in self . temp_addr_history :
self . temp_addr_history [ mixdepth ] = { }
if forchange not in self . temp_addr_history [ mixdepth ] :
self . temp_addr_history [ mixdepth ] [ forchange ] = { " finished " : False }
for i in range ( start_index , start_index + self . BATCH_SIZE ) :
#get_new_addr is OK here, as guaranteed to be sequential *on this branch*
a = wallet . get_new_addr ( mixdepth , forchange )
d = self . get_from_electrum ( ' blockchain.address.get_history ' , a )
d . addCallback ( self . process_address_history , wallet ,
mixdepth , forchange , i , a , start_index )
#makes sure entries in temporary address history are ready
#to be accessed.
if i not in self . temp_addr_history [ mixdepth ] [ forchange ] :
self . temp_addr_history [ mixdepth ] [ forchange ] [ i ] = { ' synced ' : False ,
' addr ' : a ,
' used ' : False }
def process_address_history ( self , history , wallet , mixdepth , forchange , i ,
addr , start_index ) :
""" Given the history data for an address from Electrum, update the current view
of the wallet ' s usage at mixdepth mixdepth and account forchange, address addr at
index i . Once all addresses from index start_index to start_index + self . BATCH_SIZE
have been thus updated , trigger either continuation to the next batch , or , if
conditions are fulfilled , end syncing for this ( mixdepth , forchange ) branch , and
if all such branches are finished , proceed to the sync_unspent step .
"""
tah = self . temp_addr_history [ mixdepth ] [ forchange ]
if len ( history [ ' result ' ] ) > 0 :
tah [ i ] [ ' used ' ] = True
tah [ i ] [ ' synced ' ] = True
#Having updated this specific record, check if the entire batch from start_index
#has been synchronized
if all ( [ tah [ i ] [ ' synced ' ] for i in range ( start_index , start_index + self . BATCH_SIZE ) ] ) :
#check if unused goes back as much as gaplimit *and* we are ahead of any
#existing index_cache from the wallet file; if both true, end, else, continue
#to next batch
if all ( [ tah [ i ] [ ' used ' ] is False for i in range (
start_index + self . BATCH_SIZE - wallet . gaplimit ,
start_index + self . BATCH_SIZE ) ] ) and is_index_ahead_of_cache (
wallet , mixdepth , forchange ) :
last_used_addr = None
#to find last used, note that it may be in the *previous* batch;
#may as well just search from the start, since it takes no time.
for i in range ( start_index + self . BATCH_SIZE ) :
if tah [ i ] [ ' used ' ] :
last_used_addr = tah [ i ] [ ' addr ' ]
if last_used_addr :
wallet . index [ mixdepth ] [ forchange ] = wallet . addr_cache [ last_used_addr ] [ 2 ] + 1
else :
wallet . index [ mix_depth ] [ forchange ] = wallet . addr_cache [ last_used_addr ] [ 2 ] + 1
wallet . index [ mixdepth ] [ forchange ] = 0
#account for index_cache
if not is_index_ahead_of_cache ( wallet , mixdepth , forchange ) :
wallet . index [ mixdepth ] [ forchange ] = wallet . index_cache [ mixdepth ] [ forchange ]
tah [ " finished " ] = True
#check if all branches are finished to trigger next stage of sync.
addr_sync_complete = True
for m in range ( wallet . max_mix_depth ) :
for fc in [ 0 , 1 ] :
if not self . temp_addr_history [ m ] [ fc ] [ " finished " ] :
addr_sync_complete = False
if addr_sync_complete :
self . sync_unspent ( wallet )
else :
#continue search forwards on this branch
self . synchronize_batch ( wallet , mixdepth , forchange , start_index + self . BATCH_SIZE )
def sync_unspent ( self , wallet ) :
# finds utxos in the wallet
st = time . time ( )
# dont refresh unspent dict more often than 5 minutes
rate_limit_time = 5 * 60
if st - self . last_sync_unspent < rate_limit_time :
log . debug ( ' electrum sync_unspent() happened too recently ( %d sec), skipping ' % ( st - self . last_sync_unspent ) )
return
wallet . unspent = { }
addrs = wallet . addr_cache . keys ( )
#Prepare list of all used addresses
addrs = [ ]
for m in range ( wallet . max_mix_depth ) :
for fc in [ 0 , 1 ] :
branch_list = [ ]
for k , v in self . temp_addr_history [ m ] [ fc ] . iteritems ( ) :
if k == " finished " :
continue
if v [ " used " ] :
branch_list . append ( v [ " addr " ] )
addrs . extend ( branch_list )
if len ( addrs ) == 0 :
log . debug ( ' no tx used ' )
if self . synctype == ' sync-only ' :
reactor . stop ( )
return
#make sure to add any addresses during the run (a subset of those
#added to the address cache)
addrs = list ( set ( self . wallet . addr_cache . keys ( ) ) . union ( set ( addrs ) ) )
self . listunspent_calls = 0
for a in addrs :
unspent_info = self . get_from_electrum ( ' blockchain.address.listunspent ' , a )
res = unspent_info [ ' result ' ]
for u in res :
wallet . unspent [ str ( u [ ' tx_hash ' ] ) + ' : ' + str ( u [ ' tx_pos ' ] ) ] = { ' address ' : a , ' value ' : int ( u [ ' value ' ] ) }
for u in wallet . spent_utxos :
wallet . unspent . pop ( u , None )
self . last_sync_unspent = time . time ( )
log . debug ( ' electrum sync_unspent took ' + str ( ( self . last_sync_unspent - st ) ) + ' sec ' )
def add_tx_notify ( self , txd , unconfirmfun , confirmfun , notifyaddr ) :
unconfirm_timeout = 10 * 60 # seconds
unconfirm_poll_period = 5
confirm_timeout = 2 * 60 * 60
confirm_poll_period = 5 * 60
class NotifyThread ( threading . Thread ) :
def __init__ ( self , blockchaininterface , txd , unconfirmfun , confirmfun ) :
threading . Thread . __init__ ( self )
self . daemon = True
self . blockchaininterface = blockchaininterface
self . unconfirmfun = unconfirmfun
self . confirmfun = confirmfun
self . tx_output_set = set ( [ ( sv [ ' script ' ] , sv [ ' value ' ] ) for sv in txd [ ' outs ' ] ] )
self . output_addresses = [ btc . script_to_address ( scrval [ 0 ] , get_p2pk_vbyte ( ) ) for scrval in self . tx_output_set ]
log . debug ( ' txoutset= ' + pprint . pformat ( self . tx_output_set ) )
log . debug ( ' outaddrs= ' + ' , ' . join ( self . output_addresses ) )
def run ( self ) :
st = int ( time . time ( ) )
unconfirmed_txid = None
unconfirmed_txhex = None
while not unconfirmed_txid :
time . sleep ( unconfirm_poll_period )
if int ( time . time ( ) ) - st > unconfirm_timeout :
log . debug ( ' checking for unconfirmed tx timed out ' )
return
shared_txid = None
for a in self . output_addresses :
unconftx = self . blockchaininterface . get_from_electrum ( ' blockchain.address.get_mempool ' , a ) . get ( ' result ' )
unconftxs = set ( [ str ( t [ ' tx_hash ' ] ) for t in unconftx ] )
if not shared_txid :
shared_txid = unconftxs
else :
shared_txid = shared_txid . intersection ( unconftxs )
log . debug ( ' sharedtxid = ' + str ( shared_txid ) )
if len ( shared_txid ) == 0 :
continue
data = [ ]
for txid in shared_txid :
txdata = str ( self . blockchaininterface . get_from_electrum ( ' blockchain.transaction.get ' , txid ) . get ( ' result ' ) )
data . append ( { ' hex ' : txdata , ' id ' : txid } )
for txdata in data :
txhex = txdata [ ' hex ' ]
outs = set ( [ ( sv [ ' script ' ] , sv [ ' value ' ] ) for sv in btc . deserialize ( txhex ) [ ' outs ' ] ] )
log . debug ( ' unconfirm query outs = ' + str ( outs ) )
if outs == self . tx_output_set :
unconfirmed_txid = txdata [ ' id ' ]
unconfirmed_txhex = txhex
break
self . unconfirmfun ( btc . deserialize ( unconfirmed_txhex ) , unconfirmed_txid )
st = int ( time . time ( ) )
confirmed_txid = None
confirmed_txhex = None
while not confirmed_txid :
time . sleep ( confirm_poll_period )
if int ( time . time ( ) ) - st > confirm_timeout :
log . debug ( ' checking for confirmed tx timed out ' )
return
shared_txid = None
for a in self . output_addresses :
conftx = self . blockchaininterface . get_from_electrum ( ' blockchain.address.listunspent ' , a ) . get ( ' result ' )
conftxs = set ( [ str ( t [ ' tx_hash ' ] ) for t in conftx ] )
if not shared_txid :
shared_txid = conftxs
else :
shared_txid = shared_txid . intersection ( conftxs )
log . debug ( ' sharedtxid = ' + str ( shared_txid ) )
if len ( shared_txid ) == 0 :
continue
data = [ ]
for txid in shared_txid :
txdata = str ( self . blockchaininterface . get_from_electrum ( ' blockchain.transaction.get ' , txid ) . get ( ' result ' ) )
data . append ( { ' hex ' : txdata , ' id ' : txid } )
for txdata in data :
txhex = txdata [ ' hex ' ]
outs = set ( [ ( sv [ ' script ' ] , sv [ ' value ' ] ) for sv in btc . deserialize ( txhex ) [ ' outs ' ] ] )
log . debug ( ' confirm query outs = ' + str ( outs ) )
if outs == self . tx_output_set :
confirmed_txid = txdata [ ' id ' ]
confirmed_txhex = txhex
break
self . confirmfun ( btc . deserialize ( confirmed_txhex ) , confirmed_txid , 1 )
NotifyThread ( self , txd , unconfirmfun , confirmfun ) . start ( )
d = self . get_from_electrum ( ' blockchain.address.listunspent ' , a )
d . addCallback ( self . process_listunspent_data , wallet , a , len ( addrs ) )
def process_listunspent_data ( self , unspent_info , wallet , address , n ) :
self . listunspent_calls + = 1
res = unspent_info [ ' result ' ]
for u in res :
wallet . unspent [ str ( u [ ' tx_hash ' ] ) + ' : ' + str (
u [ ' tx_pos ' ] ) ] = { ' address ' : address , ' value ' : int ( u [ ' value ' ] ) }
if self . listunspent_calls == n :
for u in wallet . spent_utxos :
wallet . unspent . pop ( u , None )
self . wallet_synced = True
if self . synctype == " sync-only " :
reactor . stop ( )
def pushtx ( self , txhex ) :
brcst_res = self . get_from_electrum ( ' blockchain.transaction.broadcast ' , txhex )
brcst_res = self . get_from_electrum ( ' blockchain.transaction.broadcast ' ,
txhex , blocking = True )
brcst_status = brcst_res [ ' result ' ]
if isinstance ( brcst_status , str ) and len ( brcst_status ) == 64 :
return ( True , brcst_status )
@ -252,39 +361,153 @@ class ElectrumInterface(BlockchainInterface):
def query_utxo_set ( self , txout , includeconf = False ) :
self . current_height = self . get_from_electrum (
" blockchain.numblocks.subscribe " ) [ ' result ' ]
" blockchain.numblocks.subscribe " , blocking = True ) [ ' result ' ]
if not isinstance ( txout , list ) :
txout = [ txout ]
utxos = [ [ t [ : 64 ] , int ( t [ 65 : ] ) ] for t in txout ]
result = [ ]
for ut in utxos :
address = self . get_from_electrum ( " blockchain.utxo.get_address " , ut ) [ ' result ' ]
utxo_info = self . get_from_electrum ( " blockchain.address.listunspent " , address ) [ ' result ' ]
address = self . get_from_electrum ( " blockchain.utxo.get_address " ,
ut , blocking = True ) [ ' result ' ]
utxo_info = self . get_from_electrum ( " blockchain.address.listunspent " ,
address , blocking = True ) [ ' result ' ]
utxo = None
for u in utxo_info :
if u [ ' tx_hash ' ] == ut [ 0 ] and u [ ' tx_pos ' ] == ut [ 1 ] :
utxo = u
if utxo is None :
raise Exception ( " UTXO Not Found " )
r = {
' value ' : utxo [ ' value ' ] ,
' address ' : address ,
' script ' : btc . address_to_script ( address )
}
if includeconf :
if int ( utxo [ ' height ' ] ) in [ 0 , - 1 ] :
#-1 means unconfirmed inputs
r [ ' confirms ' ] = 0
else :
#+1 because if current height = tx height, that's 1 conf
r [ ' confirms ' ] = int ( self . current_height ) - int (
utxo [ ' height ' ] ) + 1
result . append ( r )
result . append ( None )
else :
r = {
' value ' : utxo [ ' value ' ] ,
' address ' : address ,
' script ' : btc . address_to_script ( address )
}
if includeconf :
if int ( utxo [ ' height ' ] ) in [ 0 , - 1 ] :
#-1 means unconfirmed inputs
r [ ' confirms ' ] = 0
else :
#+1 because if current height = tx height, that's 1 conf
r [ ' confirms ' ] = int ( self . current_height ) - int (
utxo [ ' height ' ] ) + 1
result . append ( r )
return result
def estimate_fee_per_kb ( self , N ) :
fee_info = self . get_from_electrum ( ' blockchain.estimatefee ' , N )
print ( " N is: " + str ( N ) )
if super ( ElectrumInterface , self ) . fee_per_kb_has_been_manually_set ( N ) :
return int ( random . uniform ( N * float ( 0.8 ) , N * float ( 1.2 ) ) )
fee_info = self . get_from_electrum ( ' blockchain.estimatefee ' , N , blocking = True )
print ( ' got fee info result: ' + str ( fee_info ) )
fee = fee_info . get ( ' result ' )
fee_per_kb_sat = int ( float ( fee ) * 100000000 )
return fee_per_kb_sat
def outputs_watcher ( self , wallet_name , notifyaddr , tx_output_set ,
unconfirmfun , confirmfun , timeoutfun ) :
""" Given a key for the watcher loop (notifyaddr), a wallet name (account),
a set of outputs , and unconfirm , confirm and timeout callbacks ,
check to see if a transaction matching that output set has appeared in
the wallet . Call the callbacks and update the watcher loop state .
End the loop when the confirmation has been seen ( no spent monitoring here ) .
"""
wl = self . tx_watcher_loops [ notifyaddr ]
print ( ' txoutset= ' + pprint . pformat ( tx_output_set ) )
unconftx = self . get_from_electrum ( ' blockchain.address.get_mempool ' ,
notifyaddr , blocking = True ) . get ( ' result ' )
unconftxs = set ( [ str ( t [ ' tx_hash ' ] ) for t in unconftx ] )
if len ( unconftxs ) :
txdatas = [ ]
for txid in unconftxs :
txdatas . append ( { ' id ' : txid ,
' hex ' : str ( self . get_from_electrum (
' blockchain.transaction.get ' , txid ,
blocking = True ) . get ( ' result ' ) ) } )
unconfirmed_txid = None
for txdata in txdatas :
txhex = txdata [ ' hex ' ]
outs = set ( [ ( sv [ ' script ' ] , sv [ ' value ' ] ) for sv in btc . deserialize (
txhex ) [ ' outs ' ] ] )
print ( ' unconfirm query outs = ' + str ( outs ) )
if outs == tx_output_set :
unconfirmed_txid = txdata [ ' id ' ]
unconfirmed_txhex = txhex
break
#call unconf callback if it was found in the mempool
if unconfirmed_txid and not wl [ 1 ] :
print ( " Tx: " + str ( unconfirmed_txid ) + " seen on network. " )
unconfirmfun ( btc . deserialize ( unconfirmed_txhex ) , unconfirmed_txid )
wl [ 1 ] = True
return
conftx = self . get_from_electrum ( ' blockchain.address.listunspent ' ,
notifyaddr , blocking = True ) . get ( ' result ' )
conftxs = set ( [ str ( t [ ' tx_hash ' ] ) for t in conftx ] )
if len ( conftxs ) :
txdatas = [ ]
for txid in conftxs :
txdata = str ( self . get_from_electrum ( ' blockchain.transaction.get ' ,
txid , blocking = True ) . get ( ' result ' ) )
txdatas . append ( { ' hex ' : txdata , ' id ' : txid } )
confirmed_txid = None
for txdata in txdatas :
txhex = txdata [ ' hex ' ]
outs = set ( [ ( sv [ ' script ' ] , sv [ ' value ' ] ) for sv in btc . deserialize (
txhex ) [ ' outs ' ] ] )
print ( ' confirm query outs = ' + str ( outs ) )
if outs == tx_output_set :
confirmed_txid = txdata [ ' id ' ]
confirmed_txhex = txhex
break
if confirmed_txid and not wl [ 2 ] :
confirmfun ( btc . deserialize ( confirmed_txhex ) , confirmed_txid , 1 )
wl [ 2 ] = True
wl [ 0 ] . stop ( )
return
def tx_watcher ( self , txd , unconfirmfun , confirmfun , spentfun , c , n ) :
""" Called at a polling interval, checks if the given deserialized
transaction ( which must be fully signed ) is ( a ) broadcast , ( b ) confirmed
and ( c ) spent from . ( c , n ignored in electrum version , just supports
registering first confirmation ) .
TODO : There is no handling of conflicts here .
"""
txid = btc . txhash ( btc . serialize ( txd ) )
wl = self . tx_watcher_loops [ txid ]
#first check if in mempool (unconfirmed)
#choose an output address for the query. Filter out
#p2pkh addresses, assume p2sh (thus would fail to find tx on
#some nonstandard script type)
addr = None
for i in range ( len ( txd [ ' outs ' ] ) ) :
if not btc . is_p2pkh_script ( txd [ ' outs ' ] [ i ] [ ' script ' ] ) :
addr = btc . script_to_address ( txd [ ' outs ' ] [ i ] [ ' script ' ] , get_p2sh_vbyte ( ) )
break
if not addr :
log . error ( " Failed to find any p2sh output, cannot be a standard "
" joinmarket transaction, fatal error! " )
reactor . stop ( )
return
unconftxs_res = self . get_from_electrum ( ' blockchain.address.get_mempool ' ,
addr , blocking = True ) . get ( ' result ' )
unconftxs = [ str ( t [ ' tx_hash ' ] ) for t in unconftxs_res ]
if not wl [ 1 ] and txid in unconftxs :
print ( " Tx: " + str ( txid ) + " seen on network. " )
unconfirmfun ( txd , txid )
wl [ 1 ] = True
return
conftx = self . get_from_electrum ( ' blockchain.address.listunspent ' ,
addr , blocking = True ) . get ( ' result ' )
conftxs = [ str ( t [ ' tx_hash ' ] ) for t in conftx ]
if not wl [ 2 ] and len ( conftxs ) and txid in conftxs :
print ( " Tx: " + str ( txid ) + " is confirmed. " )
confirmfun ( txd , txid , 1 )
wl [ 2 ] = True
#Note we do not stop the monitoring loop when
#confirmations occur, since we are also monitoring for spending.
return
if not spentfun or wl [ 3 ] :
return