@ -1,6 +1,7 @@
#! /usr/bin/env python
#! /usr/bin/env python
import collections
import collections
import itertools
import time
import time
import sys
import sys
from decimal import Decimal
from decimal import Decimal
@ -83,9 +84,9 @@ class WalletService(Service):
# transactions we are actively monitoring,
# transactions we are actively monitoring,
# i.e. they are not new but we want to track:
# i.e. they are not new but we want to track:
self . active_txid s = [ ]
self . active_txs = { }
# to ensure transactions are only processed once:
# to ensure transactions are only processed once:
self . processed_txids = [ ]
self . processed_txids = set ( )
self . set_autofreeze_warning_cb ( )
self . set_autofreeze_warning_cb ( )
@ -285,6 +286,23 @@ class WalletService(Service):
self . autofreeze_warning_cb ( utxo )
self . autofreeze_warning_cb ( utxo )
self . disable_utxo ( * utxo )
self . disable_utxo ( * utxo )
def _yield_new_transactions ( self ) :
""" Constrains the sequence generated by bci._yield_transactions so
that it stops just before it would yield the newest transaction
previously yielded by _yield_new_transactions .
"""
since_txid = self . last_seen_txid
last = True
for tx in self . bci . _yield_transactions ( ) :
if ' txid ' in tx :
txid = tx [ ' txid ' ]
if txid == since_txid :
return
if last :
self . last_seen_txid = txid
last = False
yield tx
def transaction_monitor ( self ) :
def transaction_monitor ( self ) :
""" Keeps track of any changes in the wallet (new transactions).
""" Keeps track of any changes in the wallet (new transactions).
Intended to be run as a twisted task . LoopingCall so that this
Intended to be run as a twisted task . LoopingCall so that this
@ -294,33 +312,19 @@ class WalletService(Service):
if not self . update_blockheight ( ) :
if not self . update_blockheight ( ) :
return
return
txlist = self . bci . list_transactions ( 100 )
if not txlist :
return
new_txs = [ ]
for x in txlist :
# process either (a) a completely new tx or
# (b) a tx that reached unconf status but we are still
# waiting for conf (active_txids)
if " txid " not in x :
continue
if x [ ' txid ' ] in self . active_txids or x [ ' txid ' ] not in self . old_txs :
new_txs . append ( x )
# reset for next polling event:
self . old_txs = set ( x [ ' txid ' ] for x in txlist if " txid " in x )
# for this invocation of transaction_monitor, we *don't* want
# for this invocation of transaction_monitor, we *don't* want
# to call `gettransaction` more than once per txid, even if the
# to call `gettransaction` more than once per txid, even if the
# `listtransactions` result has multiple instances for different
# `listtransactions` result has multiple instances for different
# wallet labels; so we use a temporary variable to cache.
# wallet labels; so we use a temporary variable to cache.
gettx_results = { }
seen_txids = set ( )
for tx in new_txs :
wallet_name = self . get_wallet_name ( )
txid = tx [ " txid " ]
for txid in itertools . chain ( list ( self . active_txs ) , reversed (
if txid not in gettx_results :
[ tx [ ' txid ' ] for tx in self . _yield_new_transactions ( )
res = self . bci . get_transaction ( hextobin ( txid ) )
if ' txid ' in tx ] ) ) :
gettx_results [ txid ] = res
if txid in seen_txids :
else :
continue
res = gettx_results [ txid ]
seen_txids . add ( txid )
res = self . bci . get_transaction ( hextobin ( txid ) )
if not res :
if not res :
continue
continue
confs = res [ " confirmations " ]
confs = res [ " confirmations " ]
@ -330,13 +334,18 @@ class WalletService(Service):
if confs < 0 :
if confs < 0 :
jlog . info (
jlog . info (
" Transaction: " + txid + " has a conflict, abandoning. " )
" Transaction: " + txid + " has a conflict, abandoning. " )
self . active_txs . pop ( txid , None )
continue
continue
if confs == 0 :
if confs == 0 :
if txid in self . active_txs :
# avoid processing an unconfirmed tx we've already seen
continue
height = None
height = None
else :
else :
height = self . current_blockheight - confs + 1
height = self . current_blockheight - confs + 1
txd = self . bci . get_deser_from_gettransaction ( res )
txd = self . active_txs . get ( txid ) or \
self . bci . get_deser_from_gettransaction ( res )
if txd is None :
if txd is None :
continue
continue
removed_utxos , added_utxos = self . wallet . process_new_tx ( txd , height )
removed_utxos , added_utxos = self . wallet . process_new_tx ( txd , height )
@ -347,14 +356,16 @@ class WalletService(Service):
# is absurdly fast, this is considered acceptable compared with
# is absurdly fast, this is considered acceptable compared with
# additional complexity.
# additional complexity.
self . log_new_tx ( removed_utxos , added_utxos , txid )
self . log_new_tx ( removed_utxos , added_utxos , txid )
self . processed_txids . appen d ( txid )
self . processed_txids . ad d ( txid )
# first fire 'all' type callbacks, irrespective of if the
# first fire 'all' type callbacks, irrespective of if the
# transaction pertains to anything known (but must
# transaction pertains to anything known (but must
# have correct label per above); filter on this Joinmarket wallet label,
# have correct label per above); filter on this Joinmarket wallet label,
# or the external monitoring label:
# or the external monitoring label:
if ( self . bci . is_address_labeled ( tx , self . get_wallet_name ( ) ) or
if ( any ( self . bci . is_address_labeled ( addr , wallet_name ) or
self . bci . is_address_labeled ( tx , self . EXTERNAL_WALLET_LABEL ) ) :
self . bci . is_address_labeled ( addr ,
self . EXTERNAL_WALLET_LABEL )
for addr in res [ " details " ] ) ) :
for f in self . callbacks [ " all " ] :
for f in self . callbacks [ " all " ] :
# note we need no return value as we will never
# note we need no return value as we will never
# remove these from the list
# remove these from the list
@ -373,10 +384,10 @@ class WalletService(Service):
# Note also that it's entirely possible that there are only removals,
# Note also that it's entirely possible that there are only removals,
# not additions, to the utxo set, specifically in sweeps to external
# not additions, to the utxo set, specifically in sweeps to external
# addresses. In this case, since removal can by definition only
# addresses. In this case, since removal can by definition only
# happen once, we must allow entries in self.active_txid s through the
# happen once, we must allow txids in self.active_tx s through the
# filter.
# filter.
if len ( added_utxos ) > 0 or len ( removed_utxos ) > 0 \
if len ( added_utxos ) > 0 or len ( removed_utxos ) > 0 \
or txid in self . active_txid s :
or txid in self . active_txs :
if confs == 0 :
if confs == 0 :
for k in possible_keys :
for k in possible_keys :
if k in self . callbacks [ " unconfirmed " ] :
if k in self . callbacks [ " unconfirmed " ] :
@ -385,7 +396,7 @@ class WalletService(Service):
if f ( txd , txid ) :
if f ( txd , txid ) :
self . callbacks [ " unconfirmed " ] [ k ] . remove ( f )
self . callbacks [ " unconfirmed " ] [ k ] . remove ( f )
# keep monitoring for conf > 0:
# keep monitoring for conf > 0:
self . active_txids . append ( txid )
self . active_txs [ txid ] = txd
elif confs > 0 :
elif confs > 0 :
for k in possible_keys :
for k in possible_keys :
if k in self . callbacks [ " confirmed " ] :
if k in self . callbacks [ " confirmed " ] :
@ -393,7 +404,7 @@ class WalletService(Service):
if f ( txd , txid , confs ) :
if f ( txd , txid , confs ) :
self . callbacks [ " confirmed " ] [ k ] . remove ( f )
self . callbacks [ " confirmed " ] [ k ] . remove ( f )
if txid in self . active_txids :
if txid in self . active_txids :
self . active_txids . remove ( txid )
self . active_txs . pop ( txid , None )
def check_callback_called ( self , txinfo , callback , cbtype , msg ) :
def check_callback_called ( self , txinfo , callback , cbtype , msg ) :
""" Intended to be a deferred Task to be scheduled some
""" Intended to be a deferred Task to be scheduled some
@ -406,7 +417,7 @@ class WalletService(Service):
if callback in self . callbacks [ cbtype ] [ txinfo ] :
if callback in self . callbacks [ cbtype ] [ txinfo ] :
# the callback was not called, drop it and warn
# the callback was not called, drop it and warn
self . callbacks [ cbtype ] [ txinfo ] . remove ( callback )
self . callbacks [ cbtype ] [ txinfo ] . remove ( callback )
# TODO - dangling txids in self.active_txid s will
# TODO - dangling txids in self.active_txs will
# be caused by this, but could also happen for
# be caused by this, but could also happen for
# other reasons; possibly add logic to ensure that
# other reasons; possibly add logic to ensure that
# this never occurs, although their presence should
# this never occurs, although their presence should
@ -454,8 +465,9 @@ class WalletService(Service):
self . sync_unspent ( )
self . sync_unspent ( )
# Don't attempt updates on transactions that existed
# Don't attempt updates on transactions that existed
# before startup
# before startup
self . old_txs = [ x [ ' txid ' ] for x in self . bci . list_transactions ( 100 )
self . last_seen_txid = next (
if " txid " in x ]
( tx [ ' txid ' ] for tx in self . bci . _yield_transactions ( )
if ' txid ' in tx ) , None )
if isinstance ( self . bci , BitcoinCoreNoHistoryInterface ) :
if isinstance ( self . bci , BitcoinCoreNoHistoryInterface ) :
self . bci . set_wallet_no_history ( self . wallet )
self . bci . set_wallet_no_history ( self . wallet )
return self . synced
return self . synced
@ -668,13 +680,13 @@ class WalletService(Service):
to this wallet , as a list .
to this wallet , as a list .
"""
"""
res = [ ]
res = [ ]
processed_txids = [ ]
processed_txids = set ( )
for r in self . bci . _yield_transactions ( ) :
for r in self . bci . _yield_transactions ( ) :
txid = r [ " txid " ]
txid = r [ " txid " ]
if txid not in processed_txids :
if txid not in processed_txids :
tx = self . bci . get_transaction ( hextobin ( txid ) )
tx = self . bci . get_transaction ( hextobin ( txid ) )
res . append ( self . bci . get_deser_from_gettransaction ( tx ) )
res . append ( self . bci . get_deser_from_gettransaction ( tx ) )
processed_txids . appen d ( txid )
processed_txids . ad d ( txid )
return res
return res
def get_transaction ( self , txid ) :
def get_transaction ( self , txid ) :