from __future__ import annotations import os import sys from typing import Any, Dict, Optional, Tuple from PyQt6 import uic from PyQt6.QtCore import QTimer from PyQt6.QtWidgets import QApplication, QMainWindow from config import ANALYSIS_BASE_URL, HARDWARE_BASE_URL, HTTP_TIMEOUT_SEC, INTERP_BASE_URL from mainui.clients import AnalysisState, LoggedApiClient from mainui.tabs.analysis_tab import AnalysisTabMixin from mainui.tabs.core import MainWindowCoreMixin from mainui.tabs.scan_tab import ScanTabMixin from mainui.tabs.services_tab import ServicesTabMixin from mainui.tabs.steam_tab import SteamTabMixin class MainWindow( MainWindowCoreMixin, ServicesTabMixin, SteamTabMixin, ScanTabMixin, AnalysisTabMixin, QMainWindow, ): """ Works with the "thin" mainwindow.ui: - tabWidgetMain with containers: vlServicesContainer vlSteamSeqgenContainer vlScanControlContainer vlSpectrumToolContainer Each tab is loaded from its own *.ui into the corresponding container. """ def __init__(self) -> None: super().__init__() base_dir = os.path.dirname(os.path.dirname(__file__)) self.ui_dir = os.path.join(base_dir, "ui") uic.loadUi(os.path.join(self.ui_dir, "mainwindow.ui"), self) # Dedicated clients per service self.hardware_api = LoggedApiClient(HARDWARE_BASE_URL, timeout_sec=HTTP_TIMEOUT_SEC) self.interp_api = LoggedApiClient(INTERP_BASE_URL, timeout_sec=HTTP_TIMEOUT_SEC) self.analysis = AnalysisState(base_url=ANALYSIS_BASE_URL) self._scan_id: Optional[str] = None self._steam_params: Dict[str, Any] = {} self._sequence_file_path: str = "" self._steam_params_dirty: bool = False self._steam_internal_update: bool = False self._steam_default_ranges: Dict[str, Tuple[float, float]] = {} self._steam_label_bases: Dict[str, str] = {} # ---- load tab UIs self.services = None if hasattr(self, "vlServicesContainer"): self.services = self._load_tab_into_container( os.path.join(self.ui_dir, "tab_services.ui"), self.vlServicesContainer, ) self.steam = None if hasattr(self, "vlSteamSeqgenContainer"): self.steam = self._load_tab_into_container( os.path.join(self.ui_dir, "tab_seqgen_steam.ui"), self.vlSteamSeqgenContainer, ) self.scan = self._load_tab_into_container( os.path.join(self.ui_dir, "tab_scan_control.ui"), self.vlScanControlContainer, ) self.spec = self._load_tab_into_container( os.path.join(self.ui_dir, "tab_spectrum_tool.ui"), self.vlSpectrumToolContainer, ) # ---- timers self.scan_timer = QTimer(self) self.scan_timer.setInterval(500) self.scan_timer.timeout.connect(self.poll_scan_progress) self.backend_ping_timer = QTimer(self) self.backend_ping_timer.setInterval(3000) self.backend_ping_timer.timeout.connect(self._ping_backend_service) self.interp_ping_timer = QTimer(self) self.interp_ping_timer.setInterval(3000) self.interp_ping_timer.timeout.connect(self._ping_interp_service) self.analysis_ping_timer = QTimer(self) self.analysis_ping_timer.setInterval(3000) self.analysis_ping_timer.timeout.connect(self._ping_analysis_service) # ---- wire SeqGen tab if self.steam is not None: self.steam.btnSteamSet.clicked.connect(self.steam_set_limits) self.steam.btnSteamSave.clicked.connect(self.steam_save_sequence) if hasattr(self.steam, "btnSteamSaveAs"): self.steam.btnSteamSaveAs.clicked.connect(self.steam_save_sequence_as) # ---- wire tab 2 system_button_map = { "gradient": ("btnGradLoad", "btnGradStart"), "rf": ("btnRFLoad", "btnRFStart"), "adc": ("btnADCLoad", "btnADCStart"), } for system, (load_btn_name, start_btn_name) in system_button_map.items(): if hasattr(self.scan, load_btn_name): getattr(self.scan, load_btn_name).clicked.connect( lambda _checked=False, s=system: self.system_load(s) ) if hasattr(self.scan, start_btn_name): getattr(self.scan, start_btn_name).clicked.connect( lambda _checked=False, s=system: self.system_start(s) ) self.scan.btnPrepareScan.clicked.connect(self.scan_prepare) if hasattr(self.scan, "btnLoadToScanner"): self.scan.btnLoadToScanner.clicked.connect(self.scan_load_to_scanner) if hasattr(self.scan, "btnInterpreterStatus"): self.scan.btnInterpreterStatus.clicked.connect(self.scan_interpreter_status) if hasattr(self.scan, "btnFetchMeasurementData"): self.scan.btnFetchMeasurementData.clicked.connect(self.scan_fetch_measurement_data) self.scan.btnStartScan.clicked.connect(self.scan_start) self.scan.btnAbortScan.clicked.connect(self.scan_abort) # ---- init scan UI self.scan.pbScanProgress.setValue(0) if hasattr(self.scan, "lblBackendPing"): self.scan.lblBackendPing.setStyleSheet("color: #808080;") if hasattr(self.scan, "lblInterpPing"): self.scan.lblInterpPing.setStyleSheet("color: #808080;") self._set_led("gradient", None) self._set_led("rf", None) self._set_led("adc", None) self._refresh_current_sequence_ui() self._init_scan_measurement_ui() # ---- init services tab self._init_services_tab() self._init_steam_tab() # ---- analysis panel self._init_analysis_panel() self.backend_ping_timer.start() self.interp_ping_timer.start() self._ping_backend_service() self._ping_interp_service() self.statusbar.showMessage("Ready", 2000) def main() -> None: app = QApplication(sys.argv) w = MainWindow() w.show() sys.exit(app.exec())