diff --git a/electrum/tests/test_qml_types.py b/electrum/tests/test_qml_types.py new file mode 100644 index 000000000..d7d543af6 --- /dev/null +++ b/electrum/tests/test_qml_types.py @@ -0,0 +1,85 @@ +import shutil +import tempfile + +from electrum import SimpleConfig +from electrum.gui.qml.qetypes import QEAmount +from electrum.tests.test_qt_base import QETestCase, QEventReceiver, qt_test + + +class WalletMock: + def __init__(self, electrum_path): + self.config = SimpleConfig({ + 'electrum_path': electrum_path, + 'decimal_point': 5 + }) + self.contacts = None + + +class QETestTypes(QETestCase): + + def setUp(self): + super().setUp() + self.electrum_path = tempfile.mkdtemp() + self.wallet = WalletMock(self.electrum_path) + + def tearDown(self): + super().tearDown() + shutil.rmtree(self.electrum_path) + + @qt_test + def test_qeamount(self): + a = QEAmount() + self.assertTrue(a.isEmpty) + a_er = QEventReceiver(a.valueChanged) + a.satsInt = 1 + self.assertTrue(bool(a_er.received)) + self.assertFalse(a.isEmpty) + + a_er.clear() + a.clear() + self.assertTrue(a.isEmpty) + self.assertTrue(bool(a_er.received)) + + a_er.clear() + a.isMax = True + self.assertTrue(a.isMax) + self.assertFalse(a.isEmpty) + self.assertTrue(bool(a_er.received)) + + @qt_test + def test_qeamount_copy(self): + a = QEAmount() + b = QEAmount() + b.satsInt = 1 + c = QEAmount() + c.msatsInt = 1 + d = QEAmount() + d.isMax = True + + t = QEAmount() + t_er = QEventReceiver(t.valueChanged) + + t.copyFrom(a) + self.assertTrue(t.isEmpty) + self.assertEqual(0, len(t_er.received)) + + t.clear() + t_er.clear() + t.copyFrom(b) + self.assertFalse(t.isEmpty) + self.assertEqual(t.satsInt, 1) + self.assertEqual(1, len(t_er.received)) + + t.clear() + t_er.clear() + t.copyFrom(c) + self.assertFalse(t.isEmpty) + self.assertEqual(t.msatsInt, 1) + self.assertEqual(1, len(t_er.received)) + + t.clear() + t_er.clear() + t.copyFrom(d) + self.assertFalse(t.isEmpty) + self.assertTrue(t.isMax) + self.assertEqual(1, len(t_er.received)) diff --git a/electrum/tests/test_qt_base.py b/electrum/tests/test_qt_base.py new file mode 100644 index 000000000..3ea6a8a24 --- /dev/null +++ b/electrum/tests/test_qt_base.py @@ -0,0 +1,92 @@ +import threading +import unittest +from functools import wraps, partial +from unittest import SkipTest + +from PyQt6.QtCore import QCoreApplication, QTimer, QMetaObject, Qt, pyqtSlot, QObject + + +class TestQCoreApplication(QCoreApplication): + @pyqtSlot() + def doInvoke(self): + getattr(self._instance, self._method)() + + +class QEventReceiver(QObject): + def __init__(self, *signals): + super().__init__() + self.received = [] + self.signals = [] + for signal in signals: + self.signals.append(signal) + signal.connect(partial(self.doReceive, signal)) + + # intentionally no pyqtSlot decorator, to catch all + def doReceive(self, signal, *args): + self.received.append((signal, args)) + + def receivedForSignal(self, signal): + return list(filter(lambda x: x[0] == signal, self.received)) + + def clear(self): + self.received.clear() + + +class QETestCase(unittest.IsolatedAsyncioTestCase): + + def setUp(self): + super().setUp() + self.app = None + self._e = None + self._event = threading.Event() + + def start_qt_task(): + self.app = TestQCoreApplication([]) + + self.timer = QTimer(self.app) + self.timer.setSingleShot(False) + self.timer.setInterval(500) # msec + self.timer.timeout.connect(lambda: None) # periodically enter python scope + + self.app.exec() + self.app = None + + self.qt_thread = threading.Thread(target=start_qt_task) + self.qt_thread.start() + + def tearDown(self): + self.app.exit() + if self.qt_thread.is_alive(): + self.qt_thread.join() + + +def qt_test(func): + @wraps(func) + def decorator(self, *args): + if threading.current_thread().name == 'MainThread': + self.app._instance = self + self.app._method = func.__name__ + QMetaObject.invokeMethod(self.app, 'doInvoke', Qt.ConnectionType.QueuedConnection) + self._event.wait(15) + if self._e: + print(f'raising ex: {self._e!r}') + # deallocate stored exception from qt thread otherwise we SEGV garbage collector + # instead, re-create using the exception message, special casing AssertionError and SkipTest + e = None + if isinstance(self._e, AssertionError): + e = AssertionError(str(self._e)) + elif isinstance(self._e, SkipTest): + e = SkipTest(str(self._e)) + else: + e = Exception(str(self._e)) + self._e = None + raise e + return + try: + func(self, *args) + except Exception as e: + self._e = e + finally: + self._event.set() + self._event.clear() + return decorator