# worker.py from __future__ import annotations from typing import Any, Callable from PyQt6.QtCore import QObject, QThread, pyqtSignal, pyqtSlot class Worker(QObject): finished = pyqtSignal(object) failed = pyqtSignal(str) def __init__(self, func: Callable[[], Any]) -> None: super().__init__() self._func = func @pyqtSlot() def run(self) -> None: try: result = self._func() self.finished.emit(result) except Exception as e: self.failed.emit(str(e)) def run_in_thread(parent: QObject, func: Callable[[], Any], on_ok, on_err) -> None: """ Запустить func() в отдельном QThread. on_ok(result) вызовется в GUI-потоке (через сигнал). """ thread = QThread(parent) worker = Worker(func) worker.moveToThread(thread) # Keep references alive until thread finishes. # Without this, local vars may be garbage-collected early and the job never starts. active = getattr(parent, "_active_threads", None) if active is None: active = set() setattr(parent, "_active_threads", active) active.add((thread, worker)) thread.started.connect(worker.run) worker.finished.connect(on_ok) worker.failed.connect(on_err) worker.finished.connect(thread.quit) worker.failed.connect(thread.quit) worker.finished.connect(worker.deleteLater) worker.failed.connect(worker.deleteLater) def _cleanup() -> None: try: active.discard((thread, worker)) except Exception: pass thread.finished.connect(_cleanup) thread.finished.connect(thread.deleteLater) thread.start()