""" Scanner Control tab - communicates exclusively with the lf_orchestration server. All microservice switching (Spectrometer, Reconstructor, etc.) is handled by the orchestrator; this tab never connects to those services directly. """ from __future__ import annotations import json from PySide6.QtCore import Qt, QTimer from PySide6.QtGui import QFont, QColor from PySide6.QtWidgets import ( QWidget, QSplitter, QVBoxLayout, QHBoxLayout, QGroupBox, QLabel, QPushButton, QProgressBar, QTextEdit, QScrollArea, QSizePolicy, QTabWidget, QComboBox, QLineEdit, QTableWidget, QTableWidgetItem, QHeaderView, QAbstractItemView, ) from src.clients.orchestrator_client import OrchestratorClient, OrchestratorError from src.gui.workers import OrchestratorWorker from src import i18n _STATUS_COLORS = { "pending": "#9e9e9e", "running": "#e65100", "done": "#2e7d32", "failed": "#c62828", } _POLL_INTERVAL_MS = 1500 class ScannerTab(QWidget): """Orchestrator-based scanner control panel.""" def __init__( self, hw_config_path: str | None = None, orchestrator_url: str = "http://localhost:1717", parent: QWidget | None = None, ) -> None: super().__init__(parent) self._hw_config_path = hw_config_path self._client = OrchestratorClient(orchestrator_url) self._job_id: str | None = None self._seq_info: dict | None = None self._conn_state: str = "offline" # Active workers - kept alive while running self._run_worker: OrchestratorWorker | None = None self._poll_worker: OrchestratorWorker | None = None self._poll_timer = QTimer(self) self._poll_timer.setInterval(_POLL_INTERVAL_MS) self._poll_timer.timeout.connect(self._poll_status) self._build_layout() # ================================================================== # # Public API # # ================================================================== # def set_hw_config(self, path: str) -> None: self._hw_config_path = path self._append_log(f"HW config: {path}") def apply_seq_info(self, info_dict: dict) -> None: """Receive sequence info from SeqInterpTab after export.""" self._seq_info = info_dict summary_lines = [] if "infostr" in info_dict: summary_lines.append(info_dict["infostr"]) if "time" in info_dict: summary_lines.append(info_dict["time"]) adc = info_dict.get("iadc", {}) if "points" in adc: summary_lines.append(f"ADC windows: {len(adc['points'])}") self._seq_info_label.setText("\n".join(summary_lines) if summary_lines else "-") self._append_log("Sequence info received from Sequence tab.") def retranslate_ui(self) -> None: self._url_label.setText(i18n.tr("url_label")) self._btn_connect.setText(i18n.tr("btn_connect")) self._status_label.setText(i18n.tr(self._conn_state)) self._scenario_grp.setTitle(i18n.tr("grp_scenario")) self._btn_refresh.setText(i18n.tr("btn_refresh")) self._seq_grp.setTitle(i18n.tr("grp_seq_info")) self._job_grp.setTitle(i18n.tr("grp_job")) self._btn_load.setText(i18n.tr("btn_load_scenario")) self._btn_run_all.setText(i18n.tr("btn_run_all")) self._btn_next.setText(i18n.tr("btn_next_step")) self._btn_abort.setText(i18n.tr("btn_abort")) self._steps_table.setHorizontalHeaderLabels([ i18n.tr("col_step"), i18n.tr("col_status"), i18n.tr("col_result") ]) self._bottom_tabs.setTabText(0, i18n.tr("tab_step_result")) self._bottom_tabs.setTabText(1, i18n.tr("tab_seq_info_view")) self._bottom_tabs.setTabText(2, i18n.tr("tab_log")) # ================================================================== # # Layout builders # # ================================================================== # def _build_layout(self) -> None: root = QVBoxLayout(self) root.setContentsMargins(6, 6, 6, 6) root.setSpacing(6) root.addWidget(self._build_connection_bar()) split = QSplitter(Qt.Horizontal) split.addWidget(self._build_left_panel()) split.addWidget(self._build_right_panel()) split.setSizes([280, 720]) root.addWidget(split, stretch=1) def _build_connection_bar(self) -> QWidget: bar = QWidget() lay = QHBoxLayout(bar) lay.setContentsMargins(0, 0, 0, 0) self._url_label = QLabel(i18n.tr("url_label")) lay.addWidget(self._url_label) self._url_edit = QLineEdit(self._client.base_url) self._url_edit.setMaximumWidth(260) lay.addWidget(self._url_edit) self._btn_connect = QPushButton(i18n.tr("btn_connect")) self._btn_connect.setFixedWidth(80) self._btn_connect.clicked.connect(self._on_connect) lay.addWidget(self._btn_connect) self._status_label = QLabel(i18n.tr("offline")) self._status_label.setStyleSheet("color: #9e9e9e; font-weight: bold;") lay.addWidget(self._status_label) self._conn_progress = QProgressBar() self._conn_progress.setRange(0, 0) self._conn_progress.setFixedWidth(80) self._conn_progress.setVisible(False) lay.addWidget(self._conn_progress) lay.addStretch() return bar def _build_left_panel(self) -> QWidget: container = QWidget() container.setMinimumWidth(200) container.setMaximumWidth(320) lay = QVBoxLayout(container) lay.setContentsMargins(4, 4, 4, 4) lay.setSpacing(8) # Scenario selector self._scenario_grp = QGroupBox(i18n.tr("grp_scenario")) sg_lay = QVBoxLayout(self._scenario_grp) self._scenario_combo = QComboBox() self._scenario_combo.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed) sg_lay.addWidget(self._scenario_combo) self._btn_refresh = QPushButton(i18n.tr("btn_refresh")) self._btn_refresh.clicked.connect(self._on_refresh_scenarios) sg_lay.addWidget(self._btn_refresh) lay.addWidget(self._scenario_grp) # Sequence summary self._seq_grp = QGroupBox(i18n.tr("grp_seq_info")) seq_lay = QVBoxLayout(self._seq_grp) self._seq_info_label = QLabel(i18n.tr("no_seq_loaded")) self._seq_info_label.setWordWrap(True) self._seq_info_label.setStyleSheet("color: palette(mid); font-style: italic;") self._seq_info_label.setFont(QFont("Courier New", 8)) seq_lay.addWidget(self._seq_info_label) lay.addWidget(self._seq_grp) # Job info self._job_grp = QGroupBox(i18n.tr("grp_job")) job_lay = QVBoxLayout(self._job_grp) self._job_label = QLabel(i18n.tr("no_job")) self._job_label.setFont(QFont("Courier New", 8)) self._job_label.setWordWrap(True) self._job_label.setStyleSheet("color: palette(mid);") job_lay.addWidget(self._job_label) lay.addWidget(self._job_grp) # Control buttons self._btn_load = QPushButton(i18n.tr("btn_load_scenario")) self._btn_load.setMinimumHeight(30) self._btn_load.clicked.connect(self._on_load_scenario) lay.addWidget(self._btn_load) self._btn_run_all = QPushButton(i18n.tr("btn_run_all")) self._btn_run_all.setMinimumHeight(30) self._btn_run_all.setEnabled(False) self._btn_run_all.clicked.connect(self._on_run_all) lay.addWidget(self._btn_run_all) self._btn_next = QPushButton(i18n.tr("btn_next_step")) self._btn_next.setMinimumHeight(30) self._btn_next.setEnabled(False) self._btn_next.clicked.connect(self._on_next_step) lay.addWidget(self._btn_next) self._btn_abort = QPushButton(i18n.tr("btn_abort")) self._btn_abort.setMinimumHeight(30) self._btn_abort.setEnabled(False) self._btn_abort.clicked.connect(self._on_abort) lay.addWidget(self._btn_abort) lay.addStretch() scroll = QScrollArea() scroll.setWidget(container) scroll.setWidgetResizable(True) scroll.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) return scroll def _build_right_panel(self) -> QWidget: panel = QWidget() lay = QVBoxLayout(panel) lay.setContentsMargins(4, 4, 4, 4) lay.setSpacing(6) # Steps table self._steps_table = QTableWidget(0, 3) self._steps_table.setHorizontalHeaderLabels([ i18n.tr("col_step"), i18n.tr("col_status"), i18n.tr("col_result") ]) self._steps_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch) self._steps_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeToContents) self._steps_table.horizontalHeader().setSectionResizeMode(2, QHeaderView.Stretch) self._steps_table.verticalHeader().setVisible(False) self._steps_table.setEditTriggers(QAbstractItemView.NoEditTriggers) self._steps_table.setSelectionBehavior(QAbstractItemView.SelectRows) self._steps_table.setFont(QFont("Courier New", 9)) self._steps_table.currentCellChanged.connect( lambda row, *_: self._on_step_selected(row) ) lay.addWidget(self._steps_table, stretch=1) # Bottom tabs bottom_tabs = QTabWidget() self._step_result_view = QTextEdit() self._step_result_view.setReadOnly(True) self._step_result_view.setFont(QFont("Courier New", 9)) self._bottom_tabs = bottom_tabs bottom_tabs.addTab(self._step_result_view, i18n.tr("tab_step_result")) self._seq_info_view = QTextEdit() self._seq_info_view.setReadOnly(True) self._seq_info_view.setFont(QFont("Courier New", 9)) bottom_tabs.addTab(self._seq_info_view, i18n.tr("tab_seq_info_view")) self._log_view = QTextEdit() self._log_view.setReadOnly(True) self._log_view.setFont(QFont("Courier New", 9)) bottom_tabs.addTab(self._log_view, i18n.tr("tab_log")) lay.addWidget(bottom_tabs, stretch=1) return panel # ================================================================== # # Connection # # ================================================================== # def _on_connect(self) -> None: url = self._url_edit.text().strip() if url: self._client = OrchestratorClient(url) self._btn_connect.setEnabled(False) self._conn_progress.setVisible(True) worker = OrchestratorWorker(self._client.healthcheck) worker.finished.connect(self._on_healthcheck_done) worker.error.connect(self._on_healthcheck_error) worker.start() self._hc_worker = worker # keep alive def _on_healthcheck_done(self, ok: object) -> None: self._btn_connect.setEnabled(True) self._conn_progress.setVisible(False) if ok: self._conn_state = "online" self._status_label.setText(i18n.tr("online")) self._status_label.setStyleSheet("color: #2e7d32; font-weight: bold;") self._append_log(f"Connected to orchestrator: {self._client.base_url}") self._on_refresh_scenarios() else: self._conn_state = "offline" self._status_label.setText(i18n.tr("offline")) self._status_label.setStyleSheet("color: #9e9e9e; font-weight: bold;") self._append_log("Orchestrator not reachable.") def _on_healthcheck_error(self, msg: str) -> None: self._btn_connect.setEnabled(True) self._conn_progress.setVisible(False) self._conn_state = "conn_error" self._status_label.setText(i18n.tr("conn_error")) self._status_label.setStyleSheet("color: #c62828; font-weight: bold;") self._append_log(f"Connect error: {msg}") # ================================================================== # # Scenario listing # # ================================================================== # def _on_refresh_scenarios(self) -> None: worker = OrchestratorWorker(self._client.list_scenarios) worker.finished.connect(self._on_scenarios_loaded) worker.error.connect(lambda msg: self._append_log(f"List error: {msg}")) worker.start() self._list_worker = worker def _on_scenarios_loaded(self, scenarios: object) -> None: self._scenario_combo.clear() for s in (scenarios or []): self._scenario_combo.addItem(s) self._append_log(f"Scenarios: {list(scenarios or [])}") # ================================================================== # # Job control # # ================================================================== # def _on_load_scenario(self) -> None: scenario_id = self._scenario_combo.currentText() if not scenario_id: self._append_log("No scenario selected.") return param_overrides = None if self._seq_info: param_overrides = {"start_measurement": {"info": self._seq_info}} self._append_log(f"Loading scenario '{scenario_id}'...") worker = OrchestratorWorker( self._client.load_scenario, scenario_id, param_overrides ) worker.finished.connect(self._on_scenario_loaded) worker.error.connect(lambda msg: self._append_log(f"Load error: {msg}")) worker.start() self._load_worker = worker def _on_scenario_loaded(self, job_id: object) -> None: self._job_id = str(job_id) self._job_label.setText(self._job_id[:24] + "..." if len(self._job_id) > 24 else self._job_id) self._append_log(f"Job created: {self._job_id}") self._btn_run_all.setEnabled(True) self._btn_next.setEnabled(True) self._btn_abort.setEnabled(False) self._steps_table.setRowCount(0) # Fetch initial step list self._fetch_status_once() def _on_run_all(self) -> None: if not self._job_id: return self._append_log("Running all steps...") self._btn_run_all.setEnabled(False) self._btn_next.setEnabled(False) self._btn_abort.setEnabled(True) self._run_worker = OrchestratorWorker(self._client.run_all, self._job_id) self._run_worker.finished.connect(self._on_run_all_done) self._run_worker.error.connect(self._on_worker_error) self._run_worker.start() self._poll_timer.start() def _on_run_all_done(self, result: object) -> None: self._poll_timer.stop() self._btn_abort.setEnabled(False) self._append_log("Run all complete.") if isinstance(result, dict) and "steps" in result: self._update_steps_table(result["steps"]) def _on_next_step(self) -> None: if not self._job_id: return worker = OrchestratorWorker(self._client.next_step, self._job_id) worker.finished.connect(self._on_next_done) worker.error.connect(self._on_worker_error) worker.start() self._next_worker = worker def _on_next_done(self, result: object) -> None: self._append_log("Step executed.") self._fetch_status_once() def _on_abort(self) -> None: self._poll_timer.stop() if self._run_worker and self._run_worker.isRunning(): self._run_worker.terminate() self._btn_run_all.setEnabled(True) self._btn_next.setEnabled(True) self._btn_abort.setEnabled(False) self._append_log("Aborted.") def _on_worker_error(self, msg: str) -> None: self._poll_timer.stop() self._btn_run_all.setEnabled(True) self._btn_next.setEnabled(True) self._btn_abort.setEnabled(False) self._append_log(f"Error: {msg}") # ================================================================== # # Polling # # ================================================================== # def _fetch_status_once(self) -> None: if not self._job_id: return worker = OrchestratorWorker(self._client.get_status, self._job_id) worker.finished.connect(self._on_status_received) worker.error.connect(lambda msg: self._append_log(f"Poll error: {msg}")) worker.start() self._status_worker = worker def _poll_status(self) -> None: if self._poll_worker and self._poll_worker.isRunning(): return # previous poll still in flight self._poll_worker = OrchestratorWorker(self._client.get_status, self._job_id) self._poll_worker.finished.connect(self._on_status_received) self._poll_worker.error.connect(lambda msg: None) # silently ignore poll errors self._poll_worker.start() def _on_status_received(self, status: object) -> None: if not isinstance(status, dict): return steps = status.get("steps", []) self._update_steps_table(steps) # ================================================================== # # Steps table # # ================================================================== # def _update_steps_table(self, steps: list) -> None: current_row = self._steps_table.currentRow() self._steps_table.setRowCount(len(steps)) for row, step in enumerate(steps): name = step.get("name", "") status = step.get("status", "pending").lower() result = step.get("result", "") result_str = json.dumps(result, default=str)[:80] if result else "" name_item = QTableWidgetItem(name) status_item = QTableWidgetItem(status) result_item = QTableWidgetItem(result_str) color = QColor(_STATUS_COLORS.get(status, "#9e9e9e")) for item in (name_item, status_item, result_item): item.setForeground(color) self._steps_table.setItem(row, 0, name_item) self._steps_table.setItem(row, 1, status_item) self._steps_table.setItem(row, 2, result_item) if current_row >= 0: self._steps_table.setCurrentCell(current_row, 0) # Store full step data for detail view self._steps_data = steps def _on_step_selected(self, row: int) -> None: if not hasattr(self, "_steps_data") or row < 0 or row >= len(self._steps_data): return step = self._steps_data[row] result = step.get("result", None) self._step_result_view.setPlainText( json.dumps(result, indent=2, default=str) if result is not None else "" ) # ================================================================== # # Log # # ================================================================== # def _append_log(self, msg: str) -> None: self._log_view.append(msg) if self._seq_info is not None: # Also refresh seq info view with latest raw dict self._seq_info_view.setPlainText( json.dumps(self._seq_info, indent=2, default=str) )