Browse Source

qml: add typing declarations to qechanneldetails

master
Sander van Grieken 2 years ago
parent
commit
19d9467b93
No known key found for this signature in database
GPG Key ID: 9BCF8209EA402EBA
  1. 71
      electrum/gui/qml/qechanneldetails.py

71
electrum/gui/qml/qechanneldetails.py

@ -8,7 +8,7 @@ from electrum.i18n import _
from electrum.gui import messages from electrum.gui import messages
from electrum.logging import get_logger from electrum.logging import get_logger
from electrum.lnutil import LOCAL, REMOTE from electrum.lnutil import LOCAL, REMOTE
from electrum.lnchannel import ChanCloseOption, ChannelState, AbstractChannel, Channel from electrum.lnchannel import ChanCloseOption, ChannelState, AbstractChannel, Channel, ChannelBackup
from electrum.util import format_short_id from electrum.util import format_short_id
from .auth import AuthMixin, auth_protect from .auth import AuthMixin, auth_protect
@ -16,6 +16,9 @@ from .qewallet import QEWallet
from .qetypes import QEAmount from .qetypes import QEAmount
from .util import QtEventListener, event_listener from .util import QtEventListener, event_listener
if TYPE_CHECKING:
from electrum.wallet import Abstract_Wallet
class QEChannelDetails(AuthMixin, QObject, QtEventListener): class QEChannelDetails(AuthMixin, QObject, QtEventListener):
_logger = get_logger(__name__) _logger = get_logger(__name__)
@ -49,7 +52,7 @@ class QEChannelDetails(AuthMixin, QObject, QtEventListener):
self.destroyed.connect(lambda: self.on_destroy()) self.destroyed.connect(lambda: self.on_destroy())
@event_listener @event_listener
def on_event_channel(self, wallet, channel): def on_event_channel(self, wallet: 'Abstract_Wallet', channel: 'AbstractChannel'):
if wallet == self._wallet.wallet and self._channelid == channel.channel_id.hex(): if wallet == self._wallet.wallet and self._channelid == channel.channel_id.hex():
self.channelChanged.emit() self.channelChanged.emit()
@ -58,7 +61,7 @@ class QEChannelDetails(AuthMixin, QObject, QtEventListener):
walletChanged = pyqtSignal() walletChanged = pyqtSignal()
@pyqtProperty(QEWallet, notify=walletChanged) @pyqtProperty(QEWallet, notify=walletChanged)
def wallet(self): def wallet(self) -> QEWallet:
return self._wallet return self._wallet
@wallet.setter @wallet.setter
@ -69,7 +72,7 @@ class QEChannelDetails(AuthMixin, QObject, QtEventListener):
channelidChanged = pyqtSignal() channelidChanged = pyqtSignal()
@pyqtProperty(str, notify=channelidChanged) @pyqtProperty(str, notify=channelidChanged)
def channelid(self): def channelid(self) -> str:
return self._channelid return self._channelid
@channelid.setter @channelid.setter
@ -88,52 +91,53 @@ class QEChannelDetails(AuthMixin, QObject, QtEventListener):
self.channelChanged.emit() self.channelChanged.emit()
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
def name(self): def name(self) -> str:
if not self._channel: if not self._channel:
return return ''
return self._wallet.wallet.lnworker.get_node_alias(self._channel.node_id) or '' return self._wallet.wallet.lnworker.get_node_alias(self._channel.node_id) or ''
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
def pubkey(self): def pubkey(self) -> str:
return self._channel.node_id.hex() return self._channel.node_id.hex()
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
def shortCid(self): def shortCid(self) -> str:
return self._channel.short_id_for_GUI() return self._channel.short_id_for_GUI()
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
def localScidAlias(self): def localScidAlias(self) -> str:
lsa = self._channel.get_local_scid_alias() lsa = self._channel.get_local_scid_alias()
return format_short_id(lsa) if lsa else '' return format_short_id(lsa) if lsa else ''
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
def remoteScidAlias(self): def remoteScidAlias(self) -> str:
rsa = self._channel.get_remote_scid_alias() rsa = self._channel.get_remote_scid_alias()
return format_short_id(rsa) if rsa else '' return format_short_id(rsa) if rsa else ''
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
def currentFeerate(self): def currentFeerate(self) -> str:
if self._channel.is_backup(): if self._channel.is_backup():
return '' return ''
assert isinstance(self._channel, Channel) assert isinstance(self._channel, Channel)
return self._wallet.wallet.config.format_fee_rate(4 * self._channel.get_latest_feerate(LOCAL)) return self._wallet.wallet.config.format_fee_rate(4 * self._channel.get_latest_feerate(LOCAL))
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
def state(self): def state(self) -> str:
return self._channel.get_state_for_GUI() return self._channel.get_state_for_GUI()
@pyqtProperty(int, notify=channelChanged) @pyqtProperty(int, notify=channelChanged)
def stateCode(self): def stateCode(self) -> ChannelState:
return self._channel.get_state() return self._channel.get_state()
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
def initiator(self): def initiator(self) -> str:
if self._channel.is_backup(): if self._channel.is_backup():
return '' return ''
assert isinstance(self._channel, Channel)
return 'Local' if self._channel.constraints.is_initiator else 'Remote' return 'Local' if self._channel.constraints.is_initiator else 'Remote'
@pyqtProperty('QVariantMap', notify=channelChanged) @pyqtProperty('QVariantMap', notify=channelChanged)
def fundingOutpoint(self): def fundingOutpoint(self) -> dict:
outpoint = self._channel.funding_outpoint outpoint = self._channel.funding_outpoint
return { return {
'txid': outpoint.txid, 'txid': outpoint.txid,
@ -141,7 +145,7 @@ class QEChannelDetails(AuthMixin, QObject, QtEventListener):
} }
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
def closingTxid(self): def closingTxid(self) -> str:
if not self._channel.is_closed(): if not self._channel.is_closed():
return '' return ''
item = self._channel.get_closing_height() item = self._channel.get_closing_height()
@ -152,72 +156,72 @@ class QEChannelDetails(AuthMixin, QObject, QtEventListener):
return '' return ''
@pyqtProperty(QEAmount, notify=channelChanged) @pyqtProperty(QEAmount, notify=channelChanged)
def capacity(self): def capacity(self) -> QEAmount:
self._capacity.copyFrom(QEAmount(amount_sat=self._channel.get_capacity())) self._capacity.copyFrom(QEAmount(amount_sat=self._channel.get_capacity()))
return self._capacity return self._capacity
@pyqtProperty(QEAmount, notify=channelChanged) @pyqtProperty(QEAmount, notify=channelChanged)
def localCapacity(self): def localCapacity(self) -> QEAmount:
if not self._channel.is_backup(): if not self._channel.is_backup():
self._local_capacity.copyFrom(QEAmount(amount_msat=self._channel.balance(LOCAL))) self._local_capacity.copyFrom(QEAmount(amount_msat=self._channel.balance(LOCAL)))
return self._local_capacity return self._local_capacity
@pyqtProperty(QEAmount, notify=channelChanged) @pyqtProperty(QEAmount, notify=channelChanged)
def remoteCapacity(self): def remoteCapacity(self) -> QEAmount:
if not self._channel.is_backup(): if not self._channel.is_backup():
self._remote_capacity.copyFrom(QEAmount(amount_msat=self._channel.balance(REMOTE))) self._remote_capacity.copyFrom(QEAmount(amount_msat=self._channel.balance(REMOTE)))
return self._remote_capacity return self._remote_capacity
@pyqtProperty(QEAmount, notify=channelChanged) @pyqtProperty(QEAmount, notify=channelChanged)
def canSend(self): def canSend(self) -> QEAmount:
if not self._channel.is_backup(): if not self._channel.is_backup():
self._can_send.copyFrom(QEAmount(amount_msat=self._channel.available_to_spend(LOCAL))) self._can_send.copyFrom(QEAmount(amount_msat=self._channel.available_to_spend(LOCAL)))
return self._can_send return self._can_send
@pyqtProperty(QEAmount, notify=channelChanged) @pyqtProperty(QEAmount, notify=channelChanged)
def canReceive(self): def canReceive(self) -> QEAmount:
if not self._channel.is_backup(): if not self._channel.is_backup():
self._can_receive.copyFrom(QEAmount(amount_msat=self._channel.available_to_spend(REMOTE))) self._can_receive.copyFrom(QEAmount(amount_msat=self._channel.available_to_spend(REMOTE)))
return self._can_receive return self._can_receive
@pyqtProperty(bool, notify=channelChanged) @pyqtProperty(bool, notify=channelChanged)
def frozenForSending(self): def frozenForSending(self) -> bool:
return self._channel.is_frozen_for_sending() return self._channel.is_frozen_for_sending()
@pyqtProperty(bool, notify=channelChanged) @pyqtProperty(bool, notify=channelChanged)
def frozenForReceiving(self): def frozenForReceiving(self) -> bool:
return self._channel.is_frozen_for_receiving() return self._channel.is_frozen_for_receiving()
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
def channelType(self): def channelType(self) -> str:
return self._channel.storage['channel_type'].name_minimal if 'channel_type' in self._channel.storage else 'Channel Backup' return self._channel.storage['channel_type'].name_minimal if 'channel_type' in self._channel.storage else 'Channel Backup'
@pyqtProperty(bool, notify=channelChanged) @pyqtProperty(bool, notify=channelChanged)
def isOpen(self): def isOpen(self) -> bool:
return self._channel.is_open() return self._channel.is_open()
@pyqtProperty(bool, notify=channelChanged) @pyqtProperty(bool, notify=channelChanged)
def canClose(self): def canClose(self) -> bool:
return self.canCoopClose or self.canLocalForceClose or self.canRequestForceClose return self.canCoopClose or self.canLocalForceClose or self.canRequestForceClose
@pyqtProperty(bool, notify=channelChanged) @pyqtProperty(bool, notify=channelChanged)
def canCoopClose(self): def canCoopClose(self) -> bool:
return ChanCloseOption.COOP_CLOSE in self._channel.get_close_options() return ChanCloseOption.COOP_CLOSE in self._channel.get_close_options()
@pyqtProperty(bool, notify=channelChanged) @pyqtProperty(bool, notify=channelChanged)
def canLocalForceClose(self): def canLocalForceClose(self) -> bool:
return ChanCloseOption.LOCAL_FCLOSE in self._channel.get_close_options() return ChanCloseOption.LOCAL_FCLOSE in self._channel.get_close_options()
@pyqtProperty(bool, notify=channelChanged) @pyqtProperty(bool, notify=channelChanged)
def canRequestForceClose(self): def canRequestForceClose(self) -> bool:
return ChanCloseOption.REQUEST_REMOTE_FCLOSE in self._channel.get_close_options() return ChanCloseOption.REQUEST_REMOTE_FCLOSE in self._channel.get_close_options()
@pyqtProperty(bool, notify=channelChanged) @pyqtProperty(bool, notify=channelChanged)
def canDelete(self): def canDelete(self) -> bool:
return self._channel.can_be_deleted() return self._channel.can_be_deleted()
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
def messageForceClose(self): def messageForceClose(self) -> str:
return messages.MSG_REQUEST_FORCE_CLOSE.strip() return messages.MSG_REQUEST_FORCE_CLOSE.strip()
@pyqtProperty(str, notify=channelChanged) @pyqtProperty(str, notify=channelChanged)
@ -237,6 +241,7 @@ class QEChannelDetails(AuthMixin, QObject, QtEventListener):
def backupType(self): def backupType(self):
if not self.isBackup: if not self.isBackup:
return '' return ''
assert isinstance(self._channel, ChannelBackup)
return 'imported' if self._channel.is_imported else 'on-chain' return 'imported' if self._channel.is_imported else 'on-chain'
@pyqtProperty(int, notify=channelChanged) @pyqtProperty(int, notify=channelChanged)
@ -251,6 +256,7 @@ class QEChannelDetails(AuthMixin, QObject, QtEventListener):
@pyqtSlot() @pyqtSlot()
def freezeForSending(self): def freezeForSending(self):
assert isinstance(self._channel, Channel)
lnworker = self._channel.lnworker lnworker = self._channel.lnworker
if lnworker.channel_db or lnworker.is_trampoline_peer(self._channel.node_id): if lnworker.channel_db or lnworker.is_trampoline_peer(self._channel.node_id):
self._channel.set_frozen_for_sending(not self.frozenForSending) self._channel.set_frozen_for_sending(not self.frozenForSending)
@ -261,6 +267,7 @@ class QEChannelDetails(AuthMixin, QObject, QtEventListener):
@pyqtSlot() @pyqtSlot()
def freezeForReceiving(self): def freezeForReceiving(self):
assert isinstance(self._channel, Channel)
lnworker = self._channel.lnworker lnworker = self._channel.lnworker
if lnworker.channel_db or lnworker.is_trampoline_peer(self._channel.node_id): if lnworker.channel_db or lnworker.is_trampoline_peer(self._channel.node_id):
self._channel.set_frozen_for_receiving(not self.frozenForReceiving) self._channel.set_frozen_for_receiving(not self.frozenForReceiving)
@ -273,7 +280,7 @@ class QEChannelDetails(AuthMixin, QObject, QtEventListener):
self.do_close_channel(closetype) self.do_close_channel(closetype)
@auth_protect(message=_('Close Lightning channel?')) @auth_protect(message=_('Close Lightning channel?'))
def do_close_channel(self, closetype): def do_close_channel(self, closetype: str):
channel_id = self._channel.channel_id channel_id = self._channel.channel_id
def handle_result(success: bool, msg: str = ''): def handle_result(success: bool, msg: str = ''):

Loading…
Cancel
Save