import asyncio import time import urllib import re from decimal import Decimal, InvalidOperation from enum import IntEnum from typing import NamedTuple, Optional, Callable, List, TYPE_CHECKING, Tuple from . import bitcoin from .contacts import AliasNotFoundException from .i18n import _ from .invoices import Invoice from .logging import Logger from .util import parse_max_spend, format_satoshis_plain, InvoiceError from .util import get_asyncio_loop, log_exceptions from .transaction import PartialTxOutput from .lnurl import decode_lnurl, request_lnurl, callback_lnurl, LNURLError, lightning_address_to_url from .bitcoin import COIN, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC, opcodes, construct_script from .lnaddr import lndecode, LnDecodeException, LnInvoiceException from .lnutil import IncompatibleOrInsaneFeatures from .bip21 import parse_bip21_URI, InvalidBitcoinURI, LIGHTNING_URI_SCHEME, BITCOIN_BIP21_URI_SCHEME if TYPE_CHECKING: from .wallet import Abstract_Wallet from .transaction import Transaction def maybe_extract_lightning_payment_identifier(data: str) -> Optional[str]: data = data.strip() # whitespaces data = data.lower() if data.startswith(LIGHTNING_URI_SCHEME + ':ln'): cut_prefix = LIGHTNING_URI_SCHEME + ':' data = data[len(cut_prefix):] if data.startswith('ln'): return data return None def is_uri(data: str) -> bool: data = data.lower() if (data.startswith(LIGHTNING_URI_SCHEME + ":") or data.startswith(BITCOIN_BIP21_URI_SCHEME + ':')): return True return False RE_ALIAS = r'(.*?)\s*\<([0-9A-Za-z]{1,})\>' RE_EMAIL = r'\b[A-Za-z0-9._%+-]+@([A-Za-z0-9-]+\.)+[A-Z|a-z]{2,7}\b' RE_DOMAIN = r'\b([A-Za-z0-9-]+\.)+[A-Z|a-z]{2,7}\b' class PaymentIdentifierState(IntEnum): EMPTY = 0 # Initial state. INVALID = 1 # Unrecognized PI AVAILABLE = 2 # PI contains a payable destination # payable means there's enough addressing information to submit to one # of the channels Electrum supports (on-chain, lightning) NEED_RESOLVE = 3 # PI contains a recognized destination format, but needs an online resolve step LNURLP_FINALIZE = 4 # PI contains a resolved LNURLp, but needs amount and comment to resolve to a bolt11 MERCHANT_NOTIFY = 5 # PI contains a valid payment request and on-chain destination. It should notify # the merchant payment processor of the tx after on-chain broadcast, # and supply a refund address (bip70) MERCHANT_ACK = 6 # PI notified merchant. nothing to be done. ERROR = 50 # generic error NOT_FOUND = 51 # PI contains a recognized destination format, but resolve step was unsuccesful MERCHANT_ERROR = 52 # PI failed notifying the merchant after broadcasting onchain TX INVALID_AMOUNT = 53 # Specified amount not accepted class PaymentIdentifierType(IntEnum): UNKNOWN = 0 SPK = 1 BIP21 = 2 BIP70 = 3 MULTILINE = 4 BOLT11 = 5 LNURLP = 6 EMAILLIKE = 7 OPENALIAS = 8 LNADDR = 9 DOMAINLIKE = 10 class FieldsForGUI(NamedTuple): recipient: Optional[str] amount: Optional[int] description: Optional[str] validated: Optional[bool] comment: Optional[int] amount_range: Optional[Tuple[int, int]] class PaymentIdentifier(Logger): """ Takes: * bitcoin addresses or script * paytomany csv * openalias * bip21 URI * lightning-URI (containing bolt11 or lnurl) * bolt11 invoice * lnurl * lightning address """ def __init__(self, wallet: 'Abstract_Wallet', text: str): Logger.__init__(self) self._state = PaymentIdentifierState.EMPTY self.wallet = wallet self.contacts = wallet.contacts if wallet is not None else None self.config = wallet.config if wallet is not None else None self.text = text.strip() self._type = PaymentIdentifierType.UNKNOWN self.error = None # if set, GUI should show error and stop self.warning = None # if set, GUI should ask user if they want to proceed # more than one of those may be set self.multiline_outputs = None self._is_max = False self.bolt11 = None self.bip21 = None self.spk = None # self.emaillike = None self.domainlike = None self.openalias_data = None # self.bip70 = None self.bip70_data = None self.merchant_ack_status = None self.merchant_ack_message = None # self.lnurl = None self.lnurl_data = None self.parse(text) @property def type(self): return self._type def set_state(self, state: 'PaymentIdentifierState'): self.logger.debug(f'PI state {self._state.name} -> {state.name}') self._state = state @property def state(self): return self._state def need_resolve(self): return self._state == PaymentIdentifierState.NEED_RESOLVE def need_finalize(self): return self._state == PaymentIdentifierState.LNURLP_FINALIZE def need_merchant_notify(self): return self._state == PaymentIdentifierState.MERCHANT_NOTIFY def is_valid(self): return self._state not in [PaymentIdentifierState.INVALID, PaymentIdentifierState.EMPTY] def is_available(self): return self._state in [PaymentIdentifierState.AVAILABLE] def is_lightning(self): return bool(self.lnurl) or bool(self.bolt11) def is_onchain(self): if self._type in [PaymentIdentifierType.SPK, PaymentIdentifierType.MULTILINE, PaymentIdentifierType.BIP70, PaymentIdentifierType.OPENALIAS]: return True if self._type in [PaymentIdentifierType.LNURLP, PaymentIdentifierType.BOLT11, PaymentIdentifierType.LNADDR]: return bool(self.bolt11) and bool(self.bolt11.get_address()) def is_multiline(self): return bool(self.multiline_outputs) def is_multiline_max(self): return self.is_multiline() and self._is_max def is_amount_locked(self): if self._type == PaymentIdentifierType.BIP21: return bool(self.bip21.get('amount')) elif self._type == PaymentIdentifierType.BIP70: return not self.need_resolve() # always fixed after resolve? elif self._type == PaymentIdentifierType.BOLT11: return bool(self.bolt11.get_amount_sat()) elif self._type in [PaymentIdentifierType.LNURLP, PaymentIdentifierType.LNADDR]: # amount limits known after resolve, might be specific amount or locked to range if self.need_resolve(): return False if self.need_finalize(): self.logger.debug(f'lnurl f {self.lnurl_data.min_sendable_sat}-{self.lnurl_data.max_sendable_sat}') return not (self.lnurl_data.min_sendable_sat < self.lnurl_data.max_sendable_sat) return True elif self._type == PaymentIdentifierType.MULTILINE: return True else: return False def is_error(self) -> bool: return self._state >= PaymentIdentifierState.ERROR def get_error(self) -> str: return self.error def parse(self, text: str): # parse text, set self._type and self.error text = text.strip() if not text: return if outputs := self._parse_as_multiline(text): self._type = PaymentIdentifierType.MULTILINE self.multiline_outputs = outputs if self.error: self.set_state(PaymentIdentifierState.INVALID) else: self.set_state(PaymentIdentifierState.AVAILABLE) elif invoice_or_lnurl := maybe_extract_lightning_payment_identifier(text): if invoice_or_lnurl.startswith('lnurl'): self._type = PaymentIdentifierType.LNURLP try: self.lnurl = decode_lnurl(invoice_or_lnurl) self.set_state(PaymentIdentifierState.NEED_RESOLVE) except Exception as e: self.error = _("Error parsing LNURL") + f":\n{e}" self.set_state(PaymentIdentifierState.INVALID) return else: self._type = PaymentIdentifierType.BOLT11 try: self.bolt11 = Invoice.from_bech32(invoice_or_lnurl) except InvoiceError as e: self.error = self._get_error_from_invoiceerror(e) self.set_state(PaymentIdentifierState.INVALID) self.logger.debug(f'Exception cause {e.args!r}') return self.set_state(PaymentIdentifierState.AVAILABLE) elif text.lower().startswith(BITCOIN_BIP21_URI_SCHEME + ':'): try: out = parse_bip21_URI(text) except InvalidBitcoinURI as e: self.error = _("Error parsing URI") + f":\n{e}" self.set_state(PaymentIdentifierState.INVALID) return self.bip21 = out self.bip70 = out.get('r') if self.bip70: self._type = PaymentIdentifierType.BIP70 self.set_state(PaymentIdentifierState.NEED_RESOLVE) else: self._type = PaymentIdentifierType.BIP21 # check optional lightning in bip21, set self.bolt11 if valid bolt11 = out.get('lightning') if bolt11: try: lndecode(bolt11) # if we get here, we have a usable bolt11 self.bolt11 = bolt11 except LnInvoiceException as e: self.logger.debug(_("Error parsing Lightning invoice") + f":\n{e}") except IncompatibleOrInsaneFeatures as e: self.logger.debug(_("Invoice requires unknown or incompatible Lightning feature") + f":\n{e!r}") self.set_state(PaymentIdentifierState.AVAILABLE) elif scriptpubkey := self.parse_output(text): self._type = PaymentIdentifierType.SPK self.spk = scriptpubkey self.set_state(PaymentIdentifierState.AVAILABLE) elif contact := self.contacts.by_name(text): if contact['type'] == 'address': self._type = PaymentIdentifierType.BIP21 self.bip21 = { 'address': contact['address'], 'label': contact['name'] } self.set_state(PaymentIdentifierState.AVAILABLE) elif contact['type'] == 'openalias': self._type = PaymentIdentifierType.EMAILLIKE self.emaillike = contact['address'] self.set_state(PaymentIdentifierState.NEED_RESOLVE) elif re.match(RE_EMAIL, text): self._type = PaymentIdentifierType.EMAILLIKE self.emaillike = text self.set_state(PaymentIdentifierState.NEED_RESOLVE) elif re.match(RE_DOMAIN, text): self._type = PaymentIdentifierType.DOMAINLIKE self.domainlike = text self.set_state(PaymentIdentifierState.NEED_RESOLVE) elif self.error is None: truncated_text = f"{text[:100]}..." if len(text) > 100 else text self.error = f"Unknown payment identifier:\n{truncated_text}" self.set_state(PaymentIdentifierState.INVALID) def resolve(self, *, on_finished: Callable[['PaymentIdentifier'], None]) -> None: assert self._state == PaymentIdentifierState.NEED_RESOLVE coro = self._do_resolve(on_finished=on_finished) asyncio.run_coroutine_threadsafe(coro, get_asyncio_loop()) @log_exceptions async def _do_resolve(self, *, on_finished: Callable[['PaymentIdentifier'], None] = None): try: if self.emaillike or self.domainlike: # TODO: parallel lookup? data = await self.resolve_openalias() if data: self.openalias_data = data self.logger.debug(f'OA: {data!r}') name = data.get('name') address = data.get('address') key = self.emaillike if self.emaillike else self.domainlike self.contacts[key] = ('openalias', name) if not data.get('validated'): self.warning = _( 'WARNING: the alias "{}" could not be validated via an additional ' 'security check, DNSSEC, and thus may not be correct.').format(key) try: assert bitcoin.is_address(address) scriptpubkey = bytes.fromhex(bitcoin.address_to_script(address)) self._type = PaymentIdentifierType.OPENALIAS self.spk = scriptpubkey self.set_state(PaymentIdentifierState.AVAILABLE) except Exception as e: self.error = str(e) self.set_state(PaymentIdentifierState.NOT_FOUND) elif self.emaillike: lnurl = lightning_address_to_url(self.emaillike) try: data = await request_lnurl(lnurl) self._type = PaymentIdentifierType.LNADDR self.lnurl = lnurl self.lnurl_data = data self.set_state(PaymentIdentifierState.LNURLP_FINALIZE) except LNURLError as e: self.set_state(PaymentIdentifierState.NOT_FOUND) except Exception as e: # NOTE: any other exception is swallowed here (e.g. DNS error) # as the user may be typing and we have an incomplete emaillike self.set_state(PaymentIdentifierState.NOT_FOUND) else: self.set_state(PaymentIdentifierState.NOT_FOUND) elif self.bip70: from . import paymentrequest pr = await paymentrequest.get_payment_request(self.bip70) if pr.verify(): self.bip70_data = pr self.set_state(PaymentIdentifierState.MERCHANT_NOTIFY) else: self.error = pr.error self.set_state(PaymentIdentifierState.ERROR) elif self.lnurl: data = await request_lnurl(self.lnurl) self.lnurl_data = data self.set_state(PaymentIdentifierState.LNURLP_FINALIZE) self.logger.debug(f'LNURL data: {data!r}') else: self.set_state(PaymentIdentifierState.ERROR) return except Exception as e: self.error = str(e) self.logger.error(f"_do_resolve() got error: {e!r}") self.set_state(PaymentIdentifierState.ERROR) finally: if on_finished: on_finished(self) def finalize( self, *, amount_sat: int = 0, comment: str = None, on_finished: Callable[['PaymentIdentifier'], None] = None, ): assert self._state == PaymentIdentifierState.LNURLP_FINALIZE coro = self._do_finalize(amount_sat=amount_sat, comment=comment, on_finished=on_finished) asyncio.run_coroutine_threadsafe(coro, get_asyncio_loop()) @log_exceptions async def _do_finalize( self, *, amount_sat: int = None, comment: str = None, on_finished: Callable[['PaymentIdentifier'], None] = None, ): from .invoices import Invoice try: if not self.lnurl_data: raise Exception("Unexpected missing LNURL data") if not (self.lnurl_data.min_sendable_sat <= amount_sat <= self.lnurl_data.max_sendable_sat): self.error = _('Amount must be between %d and %d sat.') \ % (self.lnurl_data.min_sendable_sat, self.lnurl_data.max_sendable_sat) self.set_state(PaymentIdentifierState.INVALID_AMOUNT) return if self.lnurl_data.comment_allowed == 0: comment = None params = {'amount': amount_sat * 1000} if comment: params['comment'] = comment try: invoice_data = await callback_lnurl(self.lnurl_data.callback_url, params=params) except LNURLError as e: self.error = f"LNURL request encountered error: {e}" self.set_state(PaymentIdentifierState.ERROR) return bolt11_invoice = invoice_data.get('pr') invoice = Invoice.from_bech32(bolt11_invoice) if invoice.get_amount_sat() != amount_sat: raise Exception("lnurl returned invoice with wrong amount") # this will change what is returned by get_fields_for_GUI self.bolt11 = bolt11_invoice self.set_state(PaymentIdentifierState.AVAILABLE) except Exception as e: self.error = str(e) self.logger.error(f"_do_finalize() got error: {e!r}") self.set_state(PaymentIdentifierState.ERROR) finally: if on_finished: on_finished(self) def notify_merchant( self, *, tx: 'Transaction', refund_address: str, on_finished: Callable[['PaymentIdentifier'], None] = None, ): assert self._state == PaymentIdentifierState.MERCHANT_NOTIFY assert tx assert refund_address coro = self._do_notify_merchant(tx, refund_address, on_finished=on_finished) asyncio.run_coroutine_threadsafe(coro, get_asyncio_loop()) @log_exceptions async def _do_notify_merchant( self, tx: 'Transaction', refund_address: str, *, on_finished: Callable[['PaymentIdentifier'], None] = None, ): try: if not self.bip70_data: self.set_state(PaymentIdentifierState.ERROR) return ack_status, ack_msg = await self.bip70_data.send_payment_and_receive_paymentack(tx.serialize(), refund_address) self.logger.info(f"Payment ACK: {ack_status}. Ack message: {ack_msg}") self.merchant_ack_status = ack_status self.merchant_ack_message = ack_msg self.set_state(PaymentIdentifierState.MERCHANT_ACK) except Exception as e: self.error = str(e) self.logger.error(f"_do_notify_merchant() got error: {e!r}") self.set_state(PaymentIdentifierState.MERCHANT_ERROR) finally: if on_finished: on_finished(self) def get_onchain_outputs(self, amount): if self.bip70: return self.bip70_data.get_outputs() elif self.multiline_outputs: return self.multiline_outputs elif self.spk: return [PartialTxOutput(scriptpubkey=self.spk, value=amount)] elif self.bip21: address = self.bip21.get('address') scriptpubkey = self.parse_output(address) return [PartialTxOutput(scriptpubkey=scriptpubkey, value=amount)] else: raise Exception('not onchain') def _parse_as_multiline(self, text: str): # filter out empty lines lines = text.split('\n') lines = [i for i in lines if i] is_multiline = len(lines) > 1 outputs = [] # type: List[PartialTxOutput] errors = '' total = 0 self._is_max = False for i, line in enumerate(lines): try: output = self.parse_address_and_amount(line) outputs.append(output) if parse_max_spend(output.value): self._is_max = True else: total += output.value except Exception as e: errors = f'{errors}line #{i}: {str(e)}\n' continue if is_multiline and errors: self.error = errors.strip() if errors else None self.logger.debug(f'multiline: {outputs!r}, {self.error}') return outputs def parse_address_and_amount(self, line: str) -> 'PartialTxOutput': try: x, y = line.split(',') except ValueError: raise Exception("expected two comma-separated values: (address, amount)") from None scriptpubkey = self.parse_output(x) if not scriptpubkey: raise Exception('Invalid address') amount = self.parse_amount(y) return PartialTxOutput(scriptpubkey=scriptpubkey, value=amount) def parse_output(self, x: str) -> bytes: try: address = self.parse_address(x) return bytes.fromhex(bitcoin.address_to_script(address)) except Exception as e: pass try: script = self.parse_script(x) return bytes.fromhex(script) except Exception as e: pass # raise Exception("Invalid address or script.") def parse_script(self, x: str): script = '' for word in x.split(): if word[0:3] == 'OP_': opcode_int = opcodes[word] script += construct_script([opcode_int]) else: bytes.fromhex(word) # to test it is hex data script += construct_script([word]) return script def parse_amount(self, x: str): x = x.strip() if not x: raise Exception("Amount is empty") if parse_max_spend(x): return x p = pow(10, self.config.get_decimal_point()) try: return int(p * Decimal(x)) except InvalidOperation: raise Exception("Invalid amount") def parse_address(self, line: str): r = line.strip() m = re.match('^' + RE_ALIAS + '$', r) address = str(m.group(2) if m else r) assert bitcoin.is_address(address) return address def _get_error_from_invoiceerror(self, e: 'InvoiceError') -> str: error = _("Error parsing Lightning invoice") + f":\n{e!r}" if e.args and len(e.args): arg = e.args[0] if isinstance(arg, LnInvoiceException): error = _("Error parsing Lightning invoice") + f":\n{e}" elif isinstance(arg, IncompatibleOrInsaneFeatures): error = _("Invoice requires unknown or incompatible Lightning feature") + f":\n{e!r}" return error def get_fields_for_GUI(self) -> FieldsForGUI: recipient = None amount = None description = None validated = None comment = None amount_range = None if (self.emaillike or self.domainlike) and self.openalias_data: key = self.emaillike if self.emaillike else self.domainlike address = self.openalias_data.get('address') name = self.openalias_data.get('name') description = name recipient = key + ' <' + address + '>' validated = self.openalias_data.get('validated') if not validated: self.warning = _('WARNING: the alias "{}" could not be validated via an additional ' 'security check, DNSSEC, and thus may not be correct.').format(key) elif self.bolt11: recipient, amount, description = self._get_bolt11_fields() elif self.lnurl and self.lnurl_data: domain = urllib.parse.urlparse(self.lnurl).netloc recipient = f"{self.lnurl_data.metadata_plaintext} <{domain}>" description = self.lnurl_data.metadata_plaintext if self.lnurl_data.comment_allowed: comment = self.lnurl_data.comment_allowed if self.lnurl_data.min_sendable_sat: amount = self.lnurl_data.min_sendable_sat if self.lnurl_data.min_sendable_sat != self.lnurl_data.max_sendable_sat: amount_range = (self.lnurl_data.min_sendable_sat, self.lnurl_data.max_sendable_sat) elif self.bip70 and self.bip70_data: pr = self.bip70_data if pr.error: self.error = pr.error else: recipient = pr.get_requestor() amount = pr.get_amount() description = pr.get_memo() validated = not pr.has_expired() elif self.spk: pass elif self.multiline_outputs: pass elif self.bip21: label = self.bip21.get('label') address = self.bip21.get('address') recipient = f'{label} <{address}>' if label else address amount = self.bip21.get('amount') description = self.bip21.get('message') # TODO: use label as description? (not BIP21 compliant) # if label and not description: # description = label return FieldsForGUI(recipient=recipient, amount=amount, description=description, comment=comment, validated=validated, amount_range=amount_range) def _get_bolt11_fields(self): lnaddr = self.bolt11._lnaddr # TODO: improve access to lnaddr pubkey = lnaddr.pubkey.serialize().hex() for k, v in lnaddr.tags: if k == 'd': description = v break else: description = '' amount = lnaddr.get_amount_sat() return pubkey, amount, description async def resolve_openalias(self) -> Optional[dict]: key = self.emaillike if self.emaillike else self.domainlike # TODO: below check needed? we already matched RE_EMAIL/RE_DOMAIN # if not (('.' in key) and ('<' not in key) and (' ' not in key)): # return None parts = key.split(sep=',') # assuming single line if parts and len(parts) > 0 and bitcoin.is_address(parts[0]): return None try: data = self.contacts.resolve(key) # TODO: don't use contacts as delegate to resolve openalias, separate. return data except AliasNotFoundException as e: self.logger.info(f'OpenAlias not found: {repr(e)}') return None except Exception as e: self.logger.info(f'error resolving address/alias: {repr(e)}') return None def has_expired(self): if self.bip70: return self.bip70_data.has_expired() elif self.bolt11: return self.bolt11.has_expired() elif self.bip21: expires = self.bip21.get('exp') + self.bip21.get('time') if self.bip21.get('exp') else 0 return bool(expires) and expires < time.time() return False def invoice_from_payment_identifier( pi: 'PaymentIdentifier', wallet: 'Abstract_Wallet', amount_sat: int, message: str = None ): # FIXME: this should not be a PI method # ideally, PI should not have a reference to wallet. if pi.is_lightning(): invoice = pi.bolt11 if not invoice: return if invoice.amount_msat is None: invoice.amount_msat = int(amount_sat * 1000) return invoice else: outputs = pi.get_onchain_outputs(amount_sat) message = pi.bip21.get('message') if pi.bip21 else message bip70_data = self.bip70_data if self.bip70 else None return wallet.create_invoice( outputs=outputs, message=message, pr=bip70_data, URI=pi.bip21)