@ -7,6 +7,7 @@ import numbers
import random
from binascii import hexlify , unhexlify
from datetime import datetime
from calendar import timegm
from copy import deepcopy
from mnemonic import Mnemonic as MnemonicParent
from hashlib import sha256
@ -20,7 +21,8 @@ from .blockchaininterface import INF_HEIGHT
from . support import select_gradual , select_greedy , select_greediest , \
select
from . cryptoengine import TYPE_P2PKH , TYPE_P2SH_P2WPKH , \
TYPE_P2WPKH , ENGINES
TYPE_P2WPKH , TYPE_TIMELOCK_P2WSH , TYPE_SEGWIT_LEGACY_WALLET_FIDELITY_BONDS , \
ENGINES
from . support import get_random_bytes
from . import mn_encode , mn_decode
import jmbitcoin as btc
@ -559,7 +561,7 @@ class BaseWallet(object):
returns :
tuple ( mixdepth , type , index )
type is one of 0 , 1 , ' imported '
type is one of 0 , 1 , ' imported ' , 2 , 3
"""
raise NotImplementedError ( )
@ -976,27 +978,28 @@ class BaseWallet(object):
def rewind_wallet_indices ( self , used_indices , saved_indices ) :
for md in used_indices :
for address_type in ( self . ADDRESS_TYPE_EXTERNAL ,
self . ADDRESS_TYPE_INTERNAL ) :
for address_type in range ( min ( len ( used_indices [ md ] ) , len ( saved_indices [ md ] ) ) ) :
index = max ( used_indices [ md ] [ address_type ] ,
saved_indices [ md ] [ address_type ] )
self . set_next_index ( md , address_type , index , force = True )
def _get_default_used_indices ( self ) :
return { x : [ 0 , 0 ] for x in range ( self . max_mixdepth + 1 ) }
def get_used_indices ( self , addr_gen ) :
""" Returns a dict of max used indices for each branch in
the wallet , from the given addresses addr_gen , assuming
that they are known to the wallet .
"""
indices = { x : [ 0 , 0 ] for x in range ( self . max_mixdepth + 1 ) }
indices = self . _get_default_used_indices ( )
for addr in addr_gen :
if not self . is_known_addr ( addr ) :
continue
md , address_type , index = self . get_details (
self . addr_to_path ( addr ) )
if address_type not in ( self . ADDRESS_TYPE_EXTERNAL ,
self . ADDRESS_TYPE_INTERNAL ) :
if address_type not in ( self . BIP32_EXT_ID , self . BIP32_INT_ID ,
FidelityBondMixin . BIP32_TIMELOCK_ID , FidelityBondMixin . BIP32_BURN_ID ) :
assert address_type == ' imported '
continue
indices [ md ] [ address_type ] = max ( indices [ md ] [ address_type ] , index + 1 )
@ -1379,6 +1382,10 @@ class BIP32Wallet(BaseWallet):
def _derive_bip32_master_key ( cls , seed ) :
return cls . _ENGINE . derive_bip32_master_key ( seed )
@classmethod
def _get_supported_address_types ( cls ) :
return ( cls . BIP32_EXT_ID , cls . BIP32_INT_ID )
def get_script_from_path ( self , path ) :
if not self . _is_my_bip32_path ( path ) :
raise WalletError ( " unable to get script for unknown key path " )
@ -1387,11 +1394,14 @@ class BIP32Wallet(BaseWallet):
if not 0 < = md < = self . max_mixdepth :
raise WalletError ( " Mixdepth outside of wallet ' s range. " )
assert address_type in ( self . BIP32_EXT_ID , self . BIP32_INT_ID )
assert address_type in self . _get_supported_address_types ( )
current_index = self . _index_cache [ md ] [ address_type ]
if index == current_index :
if index == current_index \
and address_type != FidelityBondMixin . BIP32_TIMELOCK_ID :
#special case for timelocked addresses because for them the
#concept of a "next address" cant be used
return self . get_new_script_override_disable ( md , address_type )
priv , engine = self . _get_priv_from_path ( path )
@ -1481,9 +1491,16 @@ class BIP32Wallet(BaseWallet):
def get_new_script_override_disable ( self , mixdepth , address_type ) :
# This is called by get_script_from_path and calls back there. We need to
# ensure all conditions match to avoid endless recursion.
index = self . get_index_cache_and_increment ( mixdepth , address_type )
return self . get_script_and_update_map ( mixdepth , address_type , index )
def get_index_cache_and_increment ( self , mixdepth , address_type ) :
index = self . _index_cache [ mixdepth ] [ address_type ]
self . _index_cache [ mixdepth ] [ address_type ] + = 1
path = self . get_path ( mixdepth , address_type , index )
return index
def get_script_and_update_map ( self , * args ) :
path = self . get_path ( * args )
script = self . get_script_from_path ( path )
self . _script_map [ script ] = path
return script
@ -1571,8 +1588,6 @@ class LegacyWallet(ImportWalletMixin, BIP32Wallet):
def _get_bip32_base_path ( self ) :
return self . _key_ident , 0
class BIP32PurposedWallet ( BIP32Wallet ) :
""" A class to encapsulate cases like
BIP44 , 49 and 84 , all of which are derivatives
@ -1595,6 +1610,174 @@ class BIP32PurposedWallet(BIP32Wallet):
return path [ len ( self . _get_bip32_base_path ( ) ) ] - 2 * * 31
class FidelityBondMixin ( object ) :
BIP32_TIMELOCK_ID = 2
BIP32_BURN_ID = 3
"""
Explaination of time number
incrementing time numbers ( 0 , 1 , 2 , 3 , 4. . .
will produce datetimes
suitable for timelocking ( 1 st january , 1 st april , 1 st july . . . .
this greatly reduces the number of possible timelock values
and is helpful for recovery of funds because the wallet can search
the only addresses corresponding to timenumbers which are far fewer
For example , if TIMENUMBER_UNIT = 2 ( i . e . every time number is two months )
then there are 6 timelocks per year so just 600 possible
addresses per century per pubkey . Easily searchable when recovering a
wallet from seed phrase . Therefore the user doesn ' t need to store any
dates , the seed phrase is sufficent for recovery .
"""
#should be a factor of 12, the number of months in a year
TIMENUMBER_UNIT = 1
# all timelocks are 1st of the month at midnight
TIMELOCK_DAY_AND_SHORTER = ( 1 , 0 , 0 , 0 , 0 )
TIMELOCK_EPOCH_YEAR = 2020
TIMELOCK_EPOCH_MONTH = 1 #january
MONTHS_IN_YEAR = 12
TIMELOCK_ERA_YEARS = 30
TIMENUMBERS_PER_PUBKEY = TIMELOCK_ERA_YEARS * MONTHS_IN_YEAR / / TIMENUMBER_UNIT
"""
As each pubkey corresponds to hundreds of addresses , to reduce load the
given gap limit will be reduced by this factor . Also these timelocked
addresses are never handed out to takers so there wont be a problem of
having many used addresses with no transactions on them .
"""
TIMELOCK_GAP_LIMIT_REDUCTION_FACTOR = 6
#only one mixdepth will have fidelity bonds in it
FIDELITY_BOND_MIXDEPTH = 0
@classmethod
def _time_number_to_timestamp ( cls , timenumber ) :
"""
converts a time number to a unix timestamp
"""
year = cls . TIMELOCK_EPOCH_YEAR + ( timenumber * cls . TIMENUMBER_UNIT ) / / cls . MONTHS_IN_YEAR
month = cls . TIMELOCK_EPOCH_MONTH + ( timenumber * cls . TIMENUMBER_UNIT ) % cls . MONTHS_IN_YEAR
return timegm ( datetime ( year , month , * cls . TIMELOCK_DAY_AND_SHORTER ) . timetuple ( ) )
@classmethod
def timestamp_to_time_number ( cls , timestamp ) :
"""
converts a datetime object to a time number
"""
dt = datetime . utcfromtimestamp ( timestamp )
if ( dt . month - cls . TIMELOCK_EPOCH_MONTH ) % cls . TIMENUMBER_UNIT != 0 :
raise ValueError ( )
day_and_shorter_tuple = ( dt . day , dt . hour , dt . minute , dt . second , dt . microsecond )
if day_and_shorter_tuple != cls . TIMELOCK_DAY_AND_SHORTER :
raise ValueError ( )
timenumber = ( dt . year - cls . TIMELOCK_EPOCH_YEAR ) * ( cls . MONTHS_IN_YEAR / /
cls . TIMENUMBER_UNIT ) + ( ( dt . month - cls . TIMELOCK_EPOCH_MONTH ) / / cls . TIMENUMBER_UNIT )
if timenumber < 0 or timenumber > cls . TIMENUMBERS_PER_PUBKEY :
raise ValueError ( " datetime out of range " )
return timenumber
@classmethod
def _is_timelocked_path ( cls , path ) :
return len ( path ) > 4 and path [ 4 ] == cls . BIP32_TIMELOCK_ID
def get_xpub_from_fidelity_bond_master_pub_key ( cls , mpk ) :
if mpk . startswith ( cls . _BIP32_PUBKEY_PREFIX ) :
return mpk [ len ( cls . _BIP32_PUBKEY_PREFIX ) : ]
else :
return False
def _get_key_ident ( self ) :
first_path = self . get_path ( 0 , 0 )
priv , engine = self . _get_priv_from_path ( first_path )
pub = engine . privkey_to_pubkey ( priv )
return sha256 ( sha256 ( pub ) . digest ( ) ) . digest ( ) [ : 3 ]
def _populate_script_map ( self ) :
super ( FidelityBondMixin , self ) . _populate_script_map ( )
for md in self . _index_cache :
address_type = self . BIP32_TIMELOCK_ID
for i in range ( self . _index_cache [ md ] [ address_type ] ) :
for timenumber in range ( self . TIMENUMBERS_PER_PUBKEY ) :
path = self . get_path ( md , address_type , i , timenumber )
script = self . get_script_from_path ( path )
self . _script_map [ script ] = path
@classmethod
def _get_supported_address_types ( cls ) :
return ( cls . BIP32_EXT_ID , cls . BIP32_INT_ID , cls . BIP32_TIMELOCK_ID , cls . BIP32_BURN_ID )
def _get_priv_from_path ( self , path ) :
if self . _is_timelocked_path ( path ) :
key_path = path [ : - 1 ]
locktime = path [ - 1 ]
engine = ENGINES [ TYPE_TIMELOCK_P2WSH ]
privkey = engine . derive_bip32_privkey ( self . _master_key , key_path )
return ( privkey , locktime ) , engine
else :
return super ( FidelityBondMixin , self ) . _get_priv_from_path ( path )
def get_path ( self , mixdepth = None , address_type = None , index = None , timenumber = None ) :
if address_type == None or address_type in ( self . BIP32_EXT_ID , self . BIP32_INT_ID ,
self . BIP32_BURN_ID ) or index == None :
return super ( FidelityBondMixin , self ) . get_path ( mixdepth , address_type , index )
elif address_type == self . BIP32_TIMELOCK_ID :
assert timenumber != None
timestamp = self . _time_number_to_timestamp ( timenumber )
return tuple ( chain ( self . _get_bip32_export_path ( mixdepth , address_type ) ,
( index , timestamp ) ) )
else :
assert 0
"""
We define a new serialization of the bip32 path to include bip65 timelock addresses
Previously it was m / 44 ' /1 ' / 0 ' /3/0
For a timelocked address it will be m / 44 ' /1 ' / 0 ' /3/0:13245432
The timelock will be in unix format and added to the end with a colon " : " character
refering to the pubkey plus the timelock value which together are needed to create the address
"""
def get_path_repr ( self , path ) :
if self . _is_timelocked_path ( path ) and len ( path ) == 7 :
return super ( FidelityBondMixin , self ) . get_path_repr ( path [ : - 1 ] ) + \
" : " + str ( path [ - 1 ] )
else :
return super ( FidelityBondMixin , self ) . get_path_repr ( path )
def path_repr_to_path ( self , pathstr ) :
if pathstr . find ( " : " ) == - 1 :
return super ( FidelityBondMixin , self ) . path_repr_to_path ( pathstr )
else :
colon_chunks = pathstr . split ( " : " )
if len ( colon_chunks ) != 2 :
raise WalletError ( " Not a valid wallet timelock path: {} " . format ( pathstr ) )
return tuple ( chain (
super ( FidelityBondMixin , self ) . path_repr_to_path ( colon_chunks [ 0 ] ) ,
( int ( colon_chunks [ 1 ] ) , )
) )
def get_details ( self , path ) :
if self . _is_timelocked_path ( path ) :
return self . _get_mixdepth_from_path ( path ) , path [ - 3 ] , path [ - 2 ]
else :
return super ( FidelityBondMixin , self ) . get_details ( path )
def _get_default_used_indices ( self ) :
return { x : [ 0 , 0 , 0 , 0 ] for x in range ( self . max_mixdepth + 1 ) }
def get_script ( self , mixdepth , address_type , index , timenumber = None ) :
path = self . get_path ( mixdepth , address_type , index , timenumber )
return self . get_script_from_path ( path )
def get_addr ( self , mixdepth , address_type , index , timenumber = None ) :
script = self . get_script ( mixdepth , address_type , index , timenumber )
return self . script_to_addr ( script )
#class FidelityBondWatchonlyWallet(ImportWalletMixin, BIP39WalletMixin, FidelityBondMixin):
class BIP49Wallet ( BIP32PurposedWallet ) :
_PURPOSE = 2 * * 31 + 49
_ENGINE = ENGINES [ TYPE_P2SH_P2WPKH ]
@ -1609,8 +1792,12 @@ class SegwitLegacyWallet(ImportWalletMixin, BIP39WalletMixin, BIP49Wallet):
class SegwitWallet ( ImportWalletMixin , BIP39WalletMixin , BIP84Wallet ) :
TYPE = TYPE_P2WPKH
class SegwitLegacyWalletFidelityBonds ( FidelityBondMixin , SegwitLegacyWallet ) :
TYPE = TYPE_SEGWIT_LEGACY_WALLET_FIDELITY_BONDS
WALLET_IMPLEMENTATIONS = {
LegacyWallet . TYPE : LegacyWallet ,
SegwitLegacyWallet . TYPE : SegwitLegacyWallet ,
SegwitWallet . TYPE : SegwitWallet
SegwitWallet . TYPE : SegwitWallet ,
SegwitLegacyWalletFidelityBonds . TYPE : SegwitLegacyWalletFidelityBonds
}