@ -30,7 +30,7 @@ import random
import select
import traceback
from collections import defaultdict , deque
from threading import Lock
import threading
import socks
import socket
@ -204,7 +204,17 @@ class Network(util.DaemonThread):
util . DaemonThread . __init__ ( self )
self . config = SimpleConfig ( config ) if type ( config ) == type ( { } ) else config
self . num_server = 8 if not self . config . get ( ' oneserver ' ) else 0
self . blockchain = Blockchain ( self . config , self )
self . blockchains = { 0 : Blockchain ( self . config , 0 ) }
for x in os . listdir ( self . config . path ) :
if x . startswith ( ' blockchain_fork_ ' ) :
n = int ( x [ 16 : ] )
b = Blockchain ( self . config , n )
self . blockchains [ n ] = b
self . print_error ( " blockchains " , self . blockchains . keys ( ) )
self . blockchain_index = config . get ( ' blockchain_index ' , 0 )
if self . blockchain_index not in self . blockchains . keys ( ) :
self . blockchain_index = 0
# Server for addresses and transactions
self . default_server = self . config . get ( ' server ' )
# Sanitize default server
@ -215,13 +225,12 @@ class Network(util.DaemonThread):
if not self . default_server :
self . default_server = pick_random_server ( )
self . lock = Lock ( )
self . lock = threading . Lock ( )
self . pending_sends = [ ]
self . message_id = 0
self . debug = False
self . irc_servers = { } # returned by interface (list from irc)
self . recent_servers = self . read_recent_servers ( )
self . catch_up = None # interface catching up
self . banner = ' '
self . donation_address = ' '
@ -493,18 +502,15 @@ class Network(util.DaemonThread):
if servers :
self . switch_to_interface ( random . choice ( servers ) )
def switch_lagging_interface ( self , suggestion = None ) :
def switch_lagging_interface ( self ) :
''' If auto_connect and lagging, switch interface '''
if self . server_is_lagging ( ) and self . auto_connect :
if suggestion and self . protocol == deserialize_server ( suggestion ) [ 2 ] :
self . switch_to_interface ( suggestion )
else :
# switch to one that has the correct header (not height)
header = self . get_header ( self . get_local_height ( ) )
filtered = map ( lambda x : x [ 0 ] , filter ( lambda x : x [ 1 ] == header , self . headers . items ( ) ) )
if filtered :
choice = random . choice ( filtered )
self . switch_to_interface ( choice )
# switch to one that has the correct header (not height)
header = self . blockchain ( ) . read_header ( self . get_local_height ( ) )
filtered = map ( lambda x : x [ 0 ] , filter ( lambda x : x [ 1 ] == header , self . headers . items ( ) ) )
if filtered :
choice = random . choice ( filtered )
self . switch_to_interface ( choice )
def switch_to_interface ( self , server ) :
''' Switch to server as our interface. If no connection exists nor
@ -688,15 +694,31 @@ class Network(util.DaemonThread):
self . close_interface ( self . interfaces [ server ] )
self . headers . pop ( server , None )
self . notify ( ' interfaces ' )
if server == self . catch_up :
self . catch_up = None
for b in self . blockchains . values ( ) :
if b . catch_up == server :
b . catch_up = None
def get_checkpoint ( self ) :
return max ( self . blockchains . keys ( ) )
def get_blockchain ( self , header ) :
from blockchain import hash_header
if type ( header ) is not dict :
return False
header_hash = hash_header ( header )
height = header . get ( ' block_height ' )
for b in self . blockchains . values ( ) :
if header_hash == b . get_hash ( height ) :
return b
return False
def new_interface ( self , server , socket ) :
self . add_recent_server ( server )
interface = Interface ( server , socket )
interface . blockchain = None
interface . mode = ' checkpoint '
self . interfaces [ server ] = interface
self . request_header ( interface , self . blockchain . checkpoint_height )
self . request_header ( interface , self . get_checkpoint ( ) )
if server == self . default_server :
self . switch_to_interface ( server )
self . notify ( ' interfaces ' )
@ -758,26 +780,27 @@ class Network(util.DaemonThread):
index = response [ ' params ' ] [ 0 ]
if interface . request != index :
return
connect = self . blockchain . connect_chunk ( index , response [ ' result ' ] )
connect = interface . blockchain . connect_chunk ( index , response [ ' result ' ] )
# If not finished, get the next chunk
if not connect :
return
if self . get_local_ height( ) < interface . tip :
if interface . blockchain . height ( ) < interface . tip :
self . request_chunk ( interface , index + 1 )
else :
interface . request = None
interface . mode = ' default '
interface . print_error ( ' catch up done ' )
interface . blockchain . catch_up = None
self . notify ( ' updated ' )
def request_header ( self , interface , height ) :
interface . print_error ( " requesting header %d " % height )
#interface.print_error("requesting header %d" % height )
self . queue_request ( ' blockchain.block.get_header ' , [ height ] , interface )
interface . request = height
interface . req_time = time . time ( )
def on_get_header ( self , interface , response ) :
''' Handle receiving a single block header '''
if self . blockchain . downloading_headers :
return
header = response . get ( ' result ' )
if not header :
interface . print_error ( response )
@ -789,20 +812,27 @@ class Network(util.DaemonThread):
self . connection_down ( interface . server )
return
self . on_header ( interface , header )
def can_connect ( self , header ) :
for blockchain in self . blockchains . values ( ) :
if blockchain . can_connect ( header ) :
return blockchain
def on_header ( self , interface , header ) :
height = header . get ( ' block_height ' )
if interface . mode == ' checkpoint ' :
if self . blockchain . pass_checkpoint ( header ) :
b = self . get_blockchain ( header )
if b :
interface . mode = ' default '
interface . blockchain = b
#interface.print_error('passed checkpoint', b.filename)
self . queue_request ( ' blockchain.headers.subscribe ' , [ ] , interface )
else :
if interface != self . interface or self . auto_connect :
interface . print_error ( " checkpoint failed " )
self . connection_down ( interface . server )
interface . print_error ( " checkpoint failed " )
self . connection_down ( interface . server )
interface . request = None
return
can_connect = self . blockchain . can_connect ( header )
can_connect = self . can_connect ( header )
if interface . mode == ' backward ' :
if can_connect :
interface . good = height
@ -821,36 +851,56 @@ class Network(util.DaemonThread):
interface . good = height
else :
interface . bad = height
if interface . good == interface . bad - 1 :
interface . print_error ( " catching up from %d " % interface . good )
interface . mode = ' default '
next_height = interface . good
else :
if interface . bad != interface . good + 1 :
next_height = ( interface . bad + interface . good ) / / 2
elif interface . mode == ' default ' :
else :
interface . print_error ( " found connection at %d " % interface . good )
delta1 = interface . blockchain . height ( ) - interface . good
delta2 = interface . tip - interface . good
if delta1 > 10 and delta2 > 10 :
interface . print_error ( " chain split detected: %d ( %d %d ) " % ( interface . good , delta1 , delta2 ) )
interface . blockchain . fork ( interface . bad )
interface . blockchain = Blockchain ( self . config , interface . bad )
self . blockchains [ interface . bad ] = interface . blockchain
if interface . blockchain . catch_up is None :
interface . blockchain . catch_up = interface . server
interface . print_error ( " catching up " )
interface . mode = ' catch_up '
next_height = interface . good
else :
# todo: if current catch_up is too slow, queue others
next_height = None
elif interface . mode == ' catch_up ' :
if can_connect :
self . blockchain . save_header ( header )
interface . blockchain . save_header ( header )
self . notify ( ' updated ' )
next_height = height + 1 if height < interface . tip else None
else :
interface . print_error ( " cannot connect %d " % height )
interface . mode = ' backward '
interface . bad = height
next_height = height - 1
next_height = None
if next_height is None :
# exit catch_up state
interface . request = None
interface . mode = ' default '
interface . print_error ( ' catch up done ' , interface . blockchain . catch_up )
interface . blockchain . catch_up = None
elif interface . mode == ' default ' :
assert not can_connect
interface . print_error ( " cannot connect %d " % height )
interface . mode = ' backward '
interface . bad = height
# save height where we failed
interface . blockchain_height = interface . blockchain . height ( )
next_height = height - 1
else :
raise BaseException ( interface . mode )
# If not finished, get the next header
if next_height :
if interface . mode != ' default ' :
self . request_header ( interface , next_height )
if interface . mode == ' catch_up ' and interface . tip > next_height + 50 :
self . request_chunk ( interface , next_height / / 2016 )
else :
if interface . tip > next_height + 50 :
self . request_chunk ( interface , next_height / / 2016 )
else :
self . request_header ( interface , next_height )
else :
interface . request = None
self . catch_up = None
self . request_header ( interface , next_height )
def maintain_requests ( self ) :
for interface in self . interfaces . values ( ) :
@ -879,8 +929,33 @@ class Network(util.DaemonThread):
for interface in rout :
self . process_responses ( interface )
def init_headers_file ( self ) :
filename = self . blockchains [ 0 ] . path ( )
if os . path . exists ( filename ) :
self . downloading_headers = False
return
def download_thread ( ) :
try :
import urllib , socket
socket . setdefaulttimeout ( 30 )
self . print_error ( " downloading " , bitcoin . HEADERS_URL )
urllib . urlretrieve ( bitcoin . HEADERS_URL , filename + ' .tmp ' )
os . rename ( filename + ' .tmp ' , filename )
self . print_error ( " done. " )
except Exception :
self . print_error ( " download failed. creating file " , filename )
open ( filename , ' wb+ ' ) . close ( )
self . downloading_headers = False
self . blockchains [ 0 ] . set_local_height ( )
self . downloading_headers = True
t = threading . Thread ( target = download_thread )
t . daemon = True
t . start ( )
def run ( self ) :
self . blockchain . init ( )
self . init_headers_file ( )
while self . is_running ( ) and self . downloading_headers :
time . sleep ( 1 )
while self . is_running ( ) :
self . maintain_sockets ( )
self . wait_on_sockets ( )
@ -890,35 +965,51 @@ class Network(util.DaemonThread):
self . stop_network ( )
self . on_stop ( )
def on_notify_header ( self , i , header ) :
def on_notify_header ( self , interface , header ) :
height = header . get ( ' block_height ' )
if not height :
return
self . headers [ i . server ] = header
i . tip = height
local_height = self . get_local_height ( )
if i . tip > local_height :
i . print_error ( " better height " , height )
# if I can connect, do it right away
if self . blockchain . can_connect ( header ) :
self . blockchain . save_header ( header )
self . headers [ interface . server ] = header
interface . tip = height
local_height = interface . blockchain . height ( )
if interface . mode != ' default ' :
return
if interface . tip > local_height + 1 :
if interface . blockchain . catch_up is None :
interface . blockchain . catch_up = interface . server
interface . mode = ' catch_up ' # must transition to search if it does not connect
self . request_header ( interface , local_height + 1 )
else :
# another interface is catching up
pass
elif interface . tip == local_height + 1 :
if interface . blockchain . can_connect ( header ) :
interface . blockchain . save_header ( header )
self . notify ( ' updated ' )
# otherwise trigger a search
elif self . catch_up is None :
self . catch_up = i . server
self . on_header ( i , header )
if i == self . interface :
else :
interface . mode = ' backward '
interface . bad = height
self . request_header ( interface , local_height )
else :
if not interface . blockchain . can_connect ( header ) :
interface . mode = ' backward '
interface . bad = height
self . request_header ( interface , height - 1 )
else :
pass
if interface == self . interface :
self . switch_lagging_interface ( )
self . notify ( ' updated ' )
def blockchain ( self ) :
if self . interface and self . interface . blockchain is not None :
self . blockchain_index = self . interface . blockchain . checkpoint
self . config . set_key ( ' blockchain_index ' , self . blockchain_index )
def get_header ( self , tx_height ) :
return self . blockchain . read_header ( tx_height )
return self . blockchains [ self . blockchain_index ]
def get_local_height ( self ) :
return self . blockchain . height ( )
return self . blockchain ( ) . height ( )
def synchronous_get ( self , request , timeout = 30 ) :
queue = Queue . Queue ( )