|
|
|
|
@ -4,7 +4,7 @@ import unittest
|
|
|
|
|
from functools import wraps, partial |
|
|
|
|
from unittest import SkipTest |
|
|
|
|
|
|
|
|
|
from PyQt6.QtCore import QCoreApplication, QTimer, QMetaObject, Qt, pyqtSlot, QObject |
|
|
|
|
from PyQt6.QtCore import QCoreApplication, QMetaObject, Qt, pyqtSlot, QObject |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestQCoreApplication(QCoreApplication): |
|
|
|
|
@ -33,42 +33,48 @@ class QEventReceiver(QObject):
|
|
|
|
|
self.received.clear() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class QETestCase(unittest.IsolatedAsyncioTestCase): |
|
|
|
|
class QETestCase(unittest.TestCase): |
|
|
|
|
|
|
|
|
|
def setUp(self): |
|
|
|
|
super().setUp() |
|
|
|
|
self.app = None |
|
|
|
|
self._e = None |
|
|
|
|
self._event = threading.Event() |
|
|
|
|
self._testcase_event = threading.Event() |
|
|
|
|
self._app_ready_event = threading.Event() |
|
|
|
|
|
|
|
|
|
def start_qt_task(): |
|
|
|
|
self.app = TestQCoreApplication([]) |
|
|
|
|
|
|
|
|
|
# self.timer = QTimer(self.app) |
|
|
|
|
# self.timer.setSingleShot(False) |
|
|
|
|
# self.timer.setInterval(500) # msec |
|
|
|
|
# self.timer.timeout.connect(lambda: None) # periodically enter python scope |
|
|
|
|
|
|
|
|
|
self.app.exec() |
|
|
|
|
self.app = None |
|
|
|
|
|
|
|
|
|
self.qt_thread = threading.Thread(target=start_qt_task) |
|
|
|
|
self.qt_thread.start() |
|
|
|
|
try: |
|
|
|
|
assert self.app is None |
|
|
|
|
self.app = TestQCoreApplication([]) |
|
|
|
|
self._app_ready_event.set() |
|
|
|
|
self.app.exec() |
|
|
|
|
self.app = None |
|
|
|
|
except Exception as e: |
|
|
|
|
print(f'Problem starting QCoreApplication: {str(e)}') |
|
|
|
|
|
|
|
|
|
self._qt_thread = threading.Thread(target=start_qt_task) |
|
|
|
|
self._qt_thread.start() |
|
|
|
|
|
|
|
|
|
def tearDown(self): |
|
|
|
|
self.app.exit() |
|
|
|
|
if self.qt_thread.is_alive(): |
|
|
|
|
self.qt_thread.join() |
|
|
|
|
if self._qt_thread.is_alive(): |
|
|
|
|
self._qt_thread.join() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def qt_test(func): |
|
|
|
|
@wraps(func) |
|
|
|
|
def decorator(self, *args): |
|
|
|
|
if threading.current_thread().name == 'MainThread': |
|
|
|
|
res = self._app_ready_event.wait(3) |
|
|
|
|
if not res: |
|
|
|
|
raise Exception('app not ready in time') |
|
|
|
|
self._testcase_event.clear() |
|
|
|
|
self.app._instance = self |
|
|
|
|
self.app._method = func.__name__ |
|
|
|
|
QMetaObject.invokeMethod(self.app, 'doInvoke', Qt.ConnectionType.QueuedConnection) |
|
|
|
|
self._event.wait(15) |
|
|
|
|
res = self._testcase_event.wait(15) |
|
|
|
|
if not res: |
|
|
|
|
self._e = Exception('testcase timed out') |
|
|
|
|
if self._e: |
|
|
|
|
print("".join(traceback.format_exception(self._e))) |
|
|
|
|
# deallocate stored exception from qt thread otherwise we SEGV garbage collector |
|
|
|
|
@ -88,6 +94,5 @@ def qt_test(func):
|
|
|
|
|
except Exception as e: |
|
|
|
|
self._e = e |
|
|
|
|
finally: |
|
|
|
|
self._event.set() |
|
|
|
|
self._event.clear() |
|
|
|
|
self._testcase_event.set() |
|
|
|
|
return decorator |
|
|
|
|
|