You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

355 lines
12 KiB

# -*- coding: utf-8 -*-
import attr
import logging
import re
from enum import IntEnum
import electrum_ecc as ecc
from electrum import constants
from electrum.bitcoin import sha256d, address_to_script, is_address, opcodes
from electrum.descriptor import (PubkeyProvider, PKHDescriptor, WPKHDescriptor,
SHDescriptor)
from electrum.crypto import hash_160
from electrum.transaction import (PartialTransaction, Transaction,
get_script_type_from_output_script)
from electrum.util import to_bytes
TXID_PATTERN = re.compile('([0123456789ABCDEFabcdef]{64})')
ADDR_PATTERN = re.compile(
'([123456789ABCDEFGHJKLMNPQRSTUVWXYZ'
'abcdefghijkmnopqrstuvwxyz]{20,80})')
FILTERED_TXID = '<filtered txid>'
FILTERED_ADDR = '<filtered address>'
# secp256k1 prime
prime = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
def decompress_secp256k1_pubkey(pk):
x = int.from_bytes(pk[1:33], byteorder='big')
y_sq = (pow(x, 3, prime) + 7) % prime
y = pow(y_sq, (prime + 1) // 4, prime)
if y % 2 != pk[0] % 2:
y = prime - y
y = y.to_bytes(32, byteorder='big')
return b'\x04' + pk[1:33] + y
class UnknownAddressForLabel(Exception):
def __init__(self, addr: str):
super().__init__(f"Unknown address for this wallet: {addr}.")
def verify_signature(pubkey: bytes, sig: bytes, h: bytes) -> bool:
return ecc.ECPubkey(pubkey).ecdsa_verify(sig, h)
def filter_log_line(line):
'''Filter out txids/addresses from log lines'''
pos = 0
output_line = ''
while pos < len(line):
m = TXID_PATTERN.search(line, pos)
if m:
output_line += line[pos:m.start()]
output_line += FILTERED_TXID
pos = m.end()
continue
m = ADDR_PATTERN.search(line, pos)
if m:
addr = m.group()
if is_address(addr, net=constants.net):
output_line += line[pos:m.start()]
output_line += FILTERED_ADDR
pos = m.end()
continue
output_line += line[pos:]
break
return output_line
def guess_address_script_type(addr):
net = constants.net
if not is_address(addr, net=net):
return 'invalid bitcoin address'
try:
script = address_to_script(addr, net=net)
return get_script_type_from_output_script(to_bytes(script))
except Exception:
return 'unknown address type'
def add_txin_sig(jmman, tx, txin_idx, txin_prevtx, sigmsg):
if not isinstance(tx, Transaction):
tx = Transaction(tx)
inputs = tx.inputs()
if len(inputs) - 1 < txin_idx:
jmman.logger.info(f'add_txin_sig: txin_idx {txin_idx} too big')
return
txin = inputs[txin_idx]
if not isinstance(txin_prevtx, Transaction):
txin_prevtx = Transaction(txin_prevtx)
if txin.prevout.txid.hex() != txin_prevtx.txid():
jmman.logger.info('add_txin_sig: wrong txin_prevtx')
return
prevout_o = txin_prevtx.outputs()[txin.prevout.out_idx]
scriptPubKey = prevout_o.scriptpubkey
script_type = get_script_type_from_output_script(scriptPubKey)
sig = None
if script_type in ['p2pkh', 'p2wpkh', 'p2sh']:
sig_len = sigmsg[0]
sig = sigmsg[1:1+sig_len]
pubk = sigmsg[2+sig_len:]
else:
jmman.logger.info(f'add_txin_sig: not implemented'
f' scripttype {script_type}')
return
if not sig:
jmman.logger.info(f'add_txin_sig: no sig found'
f' for {script_type}')
return
if sig_len not in [70, 71, 72]: # DER 70, 71, 72
jmman.logger.info(f'add_txin_sig: scriptsig wrong'
f' DER length: {sig_len}')
return
add_txin_descriptor(jmman, tx, txin_idx, txin_prevtx, pubk.hex())
tx.add_signature_to_txin(txin_idx=txin_idx, signing_pubkey=pubk, sig=sig)
tx.inputs()[txin_idx].finalize()
def add_txin_descriptor(jmman, tx, txin_idx, txin_prevtx, pubk_hex):
if not isinstance(tx, Transaction):
tx = Transaction(tx)
inputs = tx.inputs()
if len(inputs) - 1 < txin_idx:
jmman.logger.debug(f'add_txin_descriptor: txin_idx {txin_idx} too big')
return
txin = inputs[txin_idx]
if not isinstance(txin_prevtx, Transaction):
txin_prevtx = Transaction(txin_prevtx)
if txin.prevout.txid.hex() != txin_prevtx.txid():
jmman.logger.debug('add_txin_descriptor: wrong txin_prevtx')
return
prevout_o = txin_prevtx.outputs()[txin.prevout.out_idx]
scriptPubKey = prevout_o.scriptpubkey
script_type = get_script_type_from_output_script(scriptPubKey)
pubk_prov = PubkeyProvider(None, pubk_hex, None)
if script_type == 'p2pkh':
d = PKHDescriptor(pubk_prov)
txin.script_descriptor = d
elif script_type == 'p2wpkh':
d = WPKHDescriptor(pubk_prov)
txin.script_descriptor = d
txin.witness_utxo = prevout_o
elif script_type == 'p2sh':
sub_d = WPKHDescriptor(pubk_prov)
d = SHDescriptor(sub_d)
if d.expand().output_script != scriptPubKey:
jmman.logger.debug(f'add_txin_descriptor: prevout p2sh sriptPubKey'
f' seems not p2sh-p2wpkh script for {pubk_hex}')
return
txin.script_descriptor = d
txin.witness_utxo = prevout_o
else:
jmman.logger.debug(f'add_txin_descriptor: not implemented'
f' scripttype {script_type}')
return
def verify_txin_sig(jmman, tx, txin_idx, txin_prevtx):
if not isinstance(tx, Transaction):
tx = Transaction(tx)
inputs = tx.inputs()
if len(inputs) - 1 < txin_idx:
jmman.logger.debug(f'verify_txin_sig: txin_idx {txin_idx} too big')
return False
txin = inputs[txin_idx]
if not isinstance(txin_prevtx, Transaction):
txin_prevtx = Transaction(txin_prevtx)
if txin.prevout.txid.hex() != txin_prevtx.txid():
jmman.logger.debug('verify_txin_sig: wrong txin_prevtx')
return False
prevout_o = txin_prevtx.outputs()[txin.prevout.out_idx]
scriptPubKey = prevout_o.scriptpubkey
witness = txin.witness
scriptSig = txin.script_sig
script_type = get_script_type_from_output_script(scriptPubKey)
if script_type != 'p2wpkh' and not scriptSig:
jmman.logger.debug(f'verify_txin_sig: empty scriptSig'
f' for {script_type}')
return False
sig = None
if witness and script_type in ['p2wpkh', 'p2sh']:
c_elements = witness[0]
if c_elements != 2:
jmman.logger.debug(f'verify_txin_sig: wrong count of {script_type}'
f' withess elements: {c_elements}')
sig_len = witness[1]
sig = witness[2:2+sig_len]
pubk = witness[3+sig_len:]
elif script_type == 'p2pkh':
sig_len = scriptSig[0]
sig = scriptSig[1:1+sig_len]
pubk = scriptSig[2+sig_len:]
if not sig:
jmman.logger.debug(f'verify_txin_sig: no sig found for {script_type}')
return False
if sig_len not in [71, 72, 73]: # DER 70, 71, 72 + sighash type byte
jmman.logger.debug(f'verify_txin_sig: scriptsig wrong'
f' DER length: {sig_len}, sig={sig.hex()}')
return False
sighash = sig[-1]
if sighash not in [1, 2, 3, 0x80]:
jmman.logger.debug(f'verify_txin_sig: wrong SIGHASH {sighash}')
return False
if not pubk:
jmman.logger.debug(f'verify_txin_sig: no pubk found for {script_type}')
return False
pubk_len = len(pubk)
if pubk_len != 33:
jmman.logger.debug(f'verify_txin_sig: wrong pubk lenght {pubk_len}')
return False
pubk_prov = PubkeyProvider(None, pubk.hex(), None)
part_tx = PartialTransaction.from_tx(tx)
part_txin = part_tx.inputs()[txin_idx]
part_txin.sighash = sighash
if script_type == 'p2pkh':
pk_hash160 = scriptPubKey[3:-2]
if pk_hash160 != hash_160(pubk):
jmman.logger.debug('verify_txin_sig: hash160 differ for p2pkh')
d = PKHDescriptor(pubk_prov)
part_txin.script_descriptor = d
elif witness and script_type == 'p2wpkh':
pk_hash160 = scriptPubKey[2:]
if pk_hash160 != hash_160(pubk):
jmman.logger.debug('verify_txin_sig: hash160 differ for p2wpkh')
d = WPKHDescriptor(pubk_prov)
part_txin.script_descriptor = d
part_txin.witness_utxo = prevout_o
elif witness and script_type == 'p2sh':
if (len(scriptPubKey) != 23
or scriptPubKey[0] != opcodes.OP_HASH160
or scriptPubKey[1] != 20
or scriptPubKey[-1] != opcodes.OP_EQUAL):
jmman.logger.debug('verify_txin_sig: invalid scriptPubKey'
' for p2wpkh-psh')
return False
if (len(scriptSig) != 23
or scriptSig[0] != 22 # length of scriptSig
or scriptSig[1] != 0 # version byte
or scriptSig[2] != 20): # witness program
jmman.logger.debug('verify_txin_sig: invalid scriptSig'
' for p2wpkh-psh')
return False
script_hash = scriptPubKey[2:-1]
if script_hash != hash_160(scriptSig[1:]):
jmman.logger.debug('verify_txin_sig: hash160 differ'
' for p2wpkh-p2sh')
sub_d = WPKHDescriptor(pubk_prov)
d = SHDescriptor(sub_d)
part_txin.script_descriptor = d
part_txin.witness_utxo = prevout_o
else:
jmman.logger.debug(f'verify_txin_sig: unknown script_type'
f' {script_type}')
return False
pre_hash = sha256d(part_tx.serialize_preimage(txin_idx))
sig = ecc.ecdsa_sig64_from_der_sig(sig[:-1])
return verify_signature(pubk, sig, pre_hash)
class JMStates(IntEnum):
'''JMManager states'''
Unsupported = 0 # JM is unsupported on this wallet
Disabled = 1 # JM is disabled yet
Ready = 3 # Ready to mixing
Mixing = 4 # Mixing is running
class KPStates(IntEnum):
'''Keypairs cache states'''
Empty = 0
Ready = 1
class JMGUILogHandler(logging.Handler):
'''Write log to maxsize limited queue'''
def __init__(self, jmman):
super(JMGUILogHandler, self).__init__()
self.shortcut = jmman.LOGGING_SHORTCUT
self.jmman = jmman
self.jmman_id = id(jmman)
self.head = 0
self.tail = 0
self.log = dict()
jmman.logger.addHandler(self)
self.setLevel(logging.DEBUG)
self.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
self.notify = False
def handle(self, record):
if (not hasattr(record, 'jmman_id')
or record.jmman_id != self.jmman_id):
return False
self.log[self.tail] = record
self.tail += 1
if self.tail - self.head > 1000:
self.clear_log(100)
if self.notify:
self.jmman.postpone_notification('jm_log_changes', self.jmman)
return True
def clear_log(self, count=0):
head = self.head
if not count:
count = self.tail - head
for i in range(head, head+count):
self.log.pop(i, None)
self.head = head + count
if self.notify:
self.jmman.postpone_notification('jm_log_changes', self.jmman)
@attr.s
class JMAddress:
mixdepth = attr.ib(type=int)
branch = attr.ib(type=int)
index = attr.ib(type=tuple)
@attr.s
class JMUtxo:
addr = attr.ib(type=str)
value = attr.ib(type=int)
mixdepth = attr.ib(type=int)