from __future__ import annotations import json import os from typing import Any, Callable, Dict, Optional, Tuple import requests from PyQt6.QtWidgets import QFileDialog, QLabel, QMessageBox from config import ENDPOINTS from mainui.clients import ApiResult from mainui.utils import short_json, short_path from mainui.worker import run_in_thread class ScanTabMixin: def _init_scan_measurement_ui(self) -> None: if not hasattr(self, "scan") or self.scan is None: return if not hasattr(self.scan, "pteMeasurementPayload"): return t = self._measurement_payload_template() if hasattr(self.scan, "leMeasurementEngine"): self.scan.leMeasurementEngine.setText(str(t["engine"])) if hasattr(self.scan, "leMeasurementInfo"): self.scan.leMeasurementInfo.setText(str(t["infostr"])) osc = t["oscilloscope"] if hasattr(self.scan, "cbOscEnabled"): self.scan.cbOscEnabled.setChecked(bool(osc["enabled"])) if hasattr(self.scan, "leOscDeviceModel"): self.scan.leOscDeviceModel.setText(str(osc["device_model"])) if hasattr(self.scan, "sbOscSampleRate"): self.scan.sbOscSampleRate.setValue(int(osc["sample_rate"])) if hasattr(self.scan, "leOscPoints"): self.scan.leOscPoints.setText(",".join(str(x) for x in osc["points"])) if hasattr(self.scan, "leOscChannels"): self.scan.leOscChannels.setText(",".join(str(x) for x in osc["channels"])) if hasattr(self.scan, "leOscRanges"): self.scan.leOscRanges.setText(",".join(str(x) for x in osc["ranges"])) if hasattr(self.scan, "sbOscNTriggers"): self.scan.sbOscNTriggers.setValue(int(osc["ntriggers"])) if hasattr(self.scan, "sbOscAveraging"): self.scan.sbOscAveraging.setValue(int(osc["averaging"])) if hasattr(self.scan, "sbOscTriggerChannel"): self.scan.sbOscTriggerChannel.setValue(int(osc["trigger_channel"])) if hasattr(self.scan, "sbOscThreshold"): self.scan.sbOscThreshold.setValue(int(osc["threshold"])) if hasattr(self.scan, "sbOscDirection"): self.scan.sbOscDirection.setValue(int(osc["direction"])) if hasattr(self.scan, "sbOscAutomeasureTime"): self.scan.sbOscAutomeasureTime.setValue(int(osc["automeasure_time"])) sync = t["synchronizer"] if hasattr(self.scan, "cbSyncEnabled"): self.scan.cbSyncEnabled.setChecked(bool(sync["enabled"])) if hasattr(self.scan, "leSyncDeviceModel"): self.scan.leSyncDeviceModel.setText(str(sync["device_model"])) if hasattr(self.scan, "leSyncFile"): self.scan.leSyncFile.setText(str(sync["file"])) if hasattr(self.scan, "leSyncPort"): self.scan.leSyncPort.setText(str(sync["port"])) sdr = t["sdr"] if hasattr(self.scan, "cbSdrEnabled"): self.scan.cbSdrEnabled.setChecked(bool(sdr["enabled"])) if hasattr(self.scan, "leSdrDeviceModel"): self.scan.leSdrDeviceModel.setText(str(sdr["device_model"])) if hasattr(self.scan, "sbSdrSampleRate"): self.scan.sbSdrSampleRate.setValue(int(sdr["sample_rate"])) if hasattr(self.scan, "dsbSdrFreq"): self.scan.dsbSdrFreq.setValue(float(sdr["freq"])) if hasattr(self.scan, "dsbSdrAmpl"): self.scan.dsbSdrAmpl.setValue(float(sdr["ampl"])) if hasattr(self.scan, "dsbSdrGain"): self.scan.dsbSdrGain.setValue(float(sdr["gain"])) if hasattr(self.scan, "leSdrFile"): self.scan.leSdrFile.setText(str(sdr["file"])) gru = t["gru"] if hasattr(self.scan, "cbGruEnabled"): self.scan.cbGruEnabled.setChecked(bool(gru["enabled"])) if hasattr(self.scan, "leGruDeviceModel"): self.scan.leGruDeviceModel.setText(str(gru["device_model"])) if hasattr(self.scan, "leGruIp"): self.scan.leGruIp.setText(str(gru["ip"])) if hasattr(self.scan, "leGruFile"): self.scan.leGruFile.setText(str(gru["file"])) self._wire_measurement_payload_preview() self._refresh_measurement_payload_preview() if hasattr(self.scan, "lblMeasurementStatus"): self.scan.lblMeasurementStatus.setText("Measurement status: ready") def _measurement_payload_template(self) -> Dict[str, Any]: return { "oscilloscope": { "device_model": "", "sample_rate": 8000000, "points": [1024], "channels": [0], "ranges": [5], "ntriggers": 1, "averaging": 1, "trigger_channel": 0, "threshold": 0, "direction": 2, "automeasure_time": 0, "enabled": True, }, "synchronizer": {"device_model": "", "file": "", "port": "", "enabled": False}, "sdr": { "device_model": "", "sample_rate": 0, "freq": 0, "ampl": 0, "gain": 0, "file": "", "enabled": False, }, "gru": {"device_model": "", "ip": "", "file": "", "enabled": False}, "engine": "default", "infostr": "mainui", } def _build_measurement_payload(self) -> Optional[Dict[str, Any]]: payload = self._measurement_payload_template() if hasattr(self.scan, "leMeasurementEngine"): payload["engine"] = self.scan.leMeasurementEngine.text().strip() or "default" if hasattr(self.scan, "leMeasurementInfo"): payload["infostr"] = self.scan.leMeasurementInfo.text().strip() osc = payload["oscilloscope"] if hasattr(self.scan, "cbOscEnabled"): osc["enabled"] = bool(self.scan.cbOscEnabled.isChecked()) if hasattr(self.scan, "leOscDeviceModel"): osc["device_model"] = self.scan.leOscDeviceModel.text().strip() if hasattr(self.scan, "sbOscSampleRate"): osc["sample_rate"] = int(self.scan.sbOscSampleRate.value()) if hasattr(self.scan, "leOscPoints"): parsed = self._parse_number_list(self.scan.leOscPoints.text(), "points", int) if parsed is None: return None osc["points"] = parsed if hasattr(self.scan, "leOscChannels"): parsed = self._parse_number_list(self.scan.leOscChannels.text(), "channels", int) if parsed is None: return None osc["channels"] = parsed if hasattr(self.scan, "leOscRanges"): parsed = self._parse_number_list(self.scan.leOscRanges.text(), "ranges", float) if parsed is None: return None osc["ranges"] = parsed if hasattr(self.scan, "sbOscNTriggers"): osc["ntriggers"] = int(self.scan.sbOscNTriggers.value()) if hasattr(self.scan, "sbOscAveraging"): osc["averaging"] = int(self.scan.sbOscAveraging.value()) if hasattr(self.scan, "sbOscTriggerChannel"): osc["trigger_channel"] = int(self.scan.sbOscTriggerChannel.value()) if hasattr(self.scan, "sbOscThreshold"): osc["threshold"] = int(self.scan.sbOscThreshold.value()) if hasattr(self.scan, "sbOscDirection"): osc["direction"] = int(self.scan.sbOscDirection.value()) if hasattr(self.scan, "sbOscAutomeasureTime"): osc["automeasure_time"] = int(self.scan.sbOscAutomeasureTime.value()) sync = payload["synchronizer"] if hasattr(self.scan, "cbSyncEnabled"): sync["enabled"] = bool(self.scan.cbSyncEnabled.isChecked()) if hasattr(self.scan, "leSyncDeviceModel"): sync["device_model"] = self.scan.leSyncDeviceModel.text().strip() if hasattr(self.scan, "leSyncFile"): sync["file"] = self.scan.leSyncFile.text().strip() if hasattr(self.scan, "leSyncPort"): sync["port"] = self.scan.leSyncPort.text().strip() sdr = payload["sdr"] if hasattr(self.scan, "cbSdrEnabled"): sdr["enabled"] = bool(self.scan.cbSdrEnabled.isChecked()) if hasattr(self.scan, "leSdrDeviceModel"): sdr["device_model"] = self.scan.leSdrDeviceModel.text().strip() if hasattr(self.scan, "sbSdrSampleRate"): sdr["sample_rate"] = int(self.scan.sbSdrSampleRate.value()) if hasattr(self.scan, "dsbSdrFreq"): sdr["freq"] = float(self.scan.dsbSdrFreq.value()) if hasattr(self.scan, "dsbSdrAmpl"): sdr["ampl"] = float(self.scan.dsbSdrAmpl.value()) if hasattr(self.scan, "dsbSdrGain"): sdr["gain"] = float(self.scan.dsbSdrGain.value()) if hasattr(self.scan, "leSdrFile"): sdr["file"] = self.scan.leSdrFile.text().strip() gru = payload["gru"] if hasattr(self.scan, "cbGruEnabled"): gru["enabled"] = bool(self.scan.cbGruEnabled.isChecked()) if hasattr(self.scan, "leGruDeviceModel"): gru["device_model"] = self.scan.leGruDeviceModel.text().strip() if hasattr(self.scan, "leGruIp"): gru["ip"] = self.scan.leGruIp.text().strip() if hasattr(self.scan, "leGruFile"): gru["file"] = self.scan.leGruFile.text().strip() seq_path = (self._sequence_file_path or "").strip() if seq_path and os.path.isfile(seq_path) and not sync.get("file"): sync["file"] = seq_path return payload def _parse_number_list( self, raw_text: str, field_name: str, cast: Callable[[str], Any], ) -> Optional[list]: text = (raw_text or "").strip() if not text: return [] parts = [p.strip() for p in text.replace(";", ",").split(",") if p.strip()] out: list = [] try: for p in parts: out.append(cast(p)) except Exception: QMessageBox.warning(self, "Measurement payload", f"Invalid {field_name} list.") return None return out def _wire_measurement_payload_preview(self) -> None: if not hasattr(self, "scan") or self.scan is None: return names = ( "leMeasurementEngine", "leMeasurementInfo", "cbOscEnabled", "leOscDeviceModel", "sbOscSampleRate", "leOscPoints", "leOscChannels", "leOscRanges", "sbOscNTriggers", "sbOscAveraging", "sbOscTriggerChannel", "sbOscThreshold", "sbOscDirection", "sbOscAutomeasureTime", "cbSyncEnabled", "leSyncDeviceModel", "leSyncFile", "leSyncPort", "cbSdrEnabled", "leSdrDeviceModel", "sbSdrSampleRate", "dsbSdrFreq", "dsbSdrAmpl", "dsbSdrGain", "leSdrFile", "cbGruEnabled", "leGruDeviceModel", "leGruIp", "leGruFile", ) for name in names: if not hasattr(self.scan, name): continue w = getattr(self.scan, name) sig = getattr(w, "textChanged", None) if sig is not None: sig.connect(self._refresh_measurement_payload_preview) continue sig = getattr(w, "valueChanged", None) if sig is not None: sig.connect(self._refresh_measurement_payload_preview) continue sig = getattr(w, "stateChanged", None) if sig is not None: sig.connect(self._refresh_measurement_payload_preview) def _refresh_measurement_payload_preview(self, *_args) -> None: if not hasattr(self.scan, "pteMeasurementPayload"): return payload = self._build_measurement_payload() if payload is None: return self.scan.pteMeasurementPayload.setPlainText(json.dumps(payload, ensure_ascii=False, indent=2)) def _extract_scan_id(self, data: Any) -> str: if isinstance(data, dict): for key in ("measurement_id", "id", "scan_id"): val = data.get(key) if val is not None and str(val).strip(): return str(val) return "" def scan_fetch_measurement_data(self) -> None: if not self._scan_id: QMessageBox.warning(self, "Measurement data", "No measurement id. Start scan first.") return ep_tpl = ENDPOINTS["scan"].get("data_by_id", "") if not ep_tpl: self.log("[SCAN] data endpoint is not configured.") return ep = ep_tpl.format(scan_id=self._scan_id) params: Dict[str, Any] = {} if hasattr(self.scan, "leDataNum"): data_num = self.scan.leDataNum.text().strip() if data_num: try: params["data_num"] = int(data_num) except Exception: QMessageBox.warning(self, "Measurement data", "data_num must be integer.") return if hasattr(self.scan, "leAveragingNum"): averaging_num = self.scan.leAveragingNum.text().strip() if averaging_num: try: params["averaging_num"] = int(averaging_num) except Exception: QMessageBox.warning(self, "Measurement data", "averaging_num must be integer.") return def call() -> ApiResult: return self.hardware_api.get_json(ep, params=params or None) def on_success(res: ApiResult) -> None: size = 0 if isinstance(res.data, dict): size = len(json.dumps(res.data, ensure_ascii=False)) self.log(f"[SCAN] measurement data loaded. bytes~{size}") if hasattr(self.scan, "lblMeasurementStatus"): self.scan.lblMeasurementStatus.setText("Measurement status: data loaded") self._run_api_result( call, pre_log=f"[SCAN] requesting measurement data: id={self._scan_id}", on_success=on_success, error_msg="[SCAN] measurement data error: {error}", thread_error_msg="[SCAN] measurement data thread error: {error}", ) def _current_sequence_params(self) -> dict: return self._steam_params if self._steam_params else {} def _service_base_url(self, client: Any) -> str: return (getattr(client, "base_url", "") or "").rstrip("/") def _set_service_ping_label( self, *, label_attr: str, service_name: str, base_url: str, ok: Optional[bool], msg: str = "", include_error_tail: bool = False, ) -> None: if not hasattr(self.scan, label_attr): return label = getattr(self.scan, label_attr) if ok is None: label.setText(f"* {service_name}: checking ({base_url})") label.setStyleSheet("color: #808080;") return if ok: label.setText(f"* {service_name}: online ({base_url})") label.setStyleSheet("color: #2e7d32;") return tail = f" - {msg}" if include_error_tail and msg else "" label.setText(f"* {service_name}: offline ({base_url}){tail}") label.setStyleSheet("color: #c62828;") def _ping_service( self, *, client: Any, set_label_state: Callable[[Optional[bool], str], None], ) -> None: set_label_state(None, "") base = self._service_base_url(client) def job() -> Tuple[bool, str]: try: r = requests.get(base, timeout=2.0) return True, f"HTTP {r.status_code}" except Exception as e: return False, str(e) def ok(result: Tuple[bool, str]) -> None: state, msg = result set_label_state(state, msg if not state else "") def err(msg: str) -> None: set_label_state(False, msg) self._run_async(job, ok, err) def _run_async( self, job: Callable[[], Any], ok: Callable[[Any], None], err: Callable[[str], None], pre_log: Optional[str] = None, ) -> None: if pre_log: self.log(pre_log) run_in_thread(self, job, ok, err) def _run_api_result( self, call: Callable[[], ApiResult], *, pre_log: Optional[str] = None, on_success: Optional[Callable[[ApiResult], None]] = None, on_error: Optional[Callable[[ApiResult], None]] = None, on_thread_error: Optional[Callable[[str], None]] = None, success_msg: Optional[str] = None, error_msg: Optional[str] = None, thread_error_msg: Optional[str] = None, success_msg_fn: Optional[Callable[[ApiResult], str]] = None, error_msg_fn: Optional[Callable[[ApiResult], str]] = None, thread_error_msg_fn: Optional[Callable[[str], str]] = None, ) -> None: def ok(res: ApiResult) -> None: if res.ok: if success_msg_fn is not None: self.log(success_msg_fn(res)) elif success_msg: self.log(success_msg.format(status_code=res.status_code)) if on_success is not None: on_success(res) else: if error_msg_fn is not None: self.log(error_msg_fn(res)) elif error_msg: self.log(error_msg.format(error=res.error, status_code=res.status_code)) if on_error is not None: on_error(res) def err(msg: str) -> None: if thread_error_msg_fn is not None: self.log(thread_error_msg_fn(msg)) elif thread_error_msg: self.log(thread_error_msg.format(error=msg)) if on_thread_error is not None: on_thread_error(msg) self._run_async(call, ok, err, pre_log=pre_log) def backend_connect(self) -> None: self._ping_backend_service() self._ping_interp_service() self.check_system_health("gradient") self.check_system_health("rf") self.check_system_health("adc") def backend_disconnect(self) -> None: self._set_backend_ping(False, "disconnected") self._set_interp_ping(False, "disconnected") self.log("[BACKEND] Disconnected (UI).") self._set_led("gradient", None) self._set_led("rf", None) self._set_led("adc", None) def _set_backend_ping(self, ok: Optional[bool], msg: str = "") -> None: self._set_service_ping_label( label_attr="lblBackendPing", service_name="srv-hardware", base_url=self._service_base_url(self.hardware_api), ok=ok, msg=msg, include_error_tail=False, ) def _ping_backend_service(self) -> None: self._ping_service( client=self.hardware_api, set_label_state=self._set_backend_ping, ) def _set_interp_ping(self, ok: Optional[bool], msg: str = "") -> None: self._set_service_ping_label( label_attr="lblInterpPing", service_name="srv-interp", base_url=self._service_base_url(self.interp_api), ok=ok, msg=msg, include_error_tail=False, ) def _ping_interp_service(self) -> None: self._ping_service( client=self.interp_api, set_label_state=self._set_interp_ping, ) def _set_led(self, system: str, ok: Optional[bool]) -> None: def set_color(lbl: QLabel, color: str): lbl.setText("*") lbl.setStyleSheet(f"color: {color}; font-size: 18px;") if system == "gradient": lbl_name = "lblGradLed" elif system == "rf": lbl_name = "lblRFLed" elif system == "adc": lbl_name = "lblADCLed" else: return if not hasattr(self.scan, lbl_name): return lbl = getattr(self.scan, lbl_name) if ok is None: set_color(lbl, "#808080") elif ok: set_color(lbl, "#2e7d32") else: set_color(lbl, "#c62828") def check_system_health(self, system: str) -> None: ep = ENDPOINTS["systems"][system]["health"] def job() -> Tuple[str, ApiResult]: return system, self.hardware_api.get_json(ep) def ok(result: Tuple[str, ApiResult]) -> None: sys_name, res = result if res.ok: ok_flag = True msg = "OK" if isinstance(res.data, dict): ok_flag = bool(res.data.get("ok", True)) msg = str(res.data.get("message", "OK")) self._set_led(sys_name, ok_flag) self.log(f"[{sys_name.upper()}] health: {msg}") else: self._set_led(sys_name, False) self.log(f"[{sys_name.upper()}] health error: {res.error}") def err(msg: str) -> None: self._set_led(system, False) self.log(f"[{system.upper()}] health thread error: {msg}") self._run_async(job, ok, err) def system_load(self, system: str) -> None: path, _ = QFileDialog.getOpenFileName(self, f"Select config for {system}", "", "All files (*)") if not path: return ep = ENDPOINTS["systems"][system]["upload_config"] def call() -> ApiResult: return self.hardware_api.post_file(ep, path) self._run_system_action( system=system, call=call, pre_log=f"[{system.upper()}] uploading config...", success_msg=f"[{system.upper()}] config uploaded (HTTP {{status_code}})", error_msg=f"[{system.upper()}] upload error: {{error}}", thread_error_msg=f"[{system.upper()}] upload thread error: {{error}}", ) def system_start(self, system: str) -> None: ep = ENDPOINTS["systems"][system]["start"] def call() -> ApiResult: return self.hardware_api.post_json(ep, payload={}) self._run_system_action( system=system, call=call, pre_log=f"[{system.upper()}] starting...", success_msg=f"[{system.upper()}] started (HTTP {{status_code}})", error_msg=f"[{system.upper()}] start error: {{error}}", thread_error_msg=f"[{system.upper()}] start thread error: {{error}}", ) def _run_system_action( self, *, system: str, call: Callable[[], ApiResult], pre_log: str, success_msg: str, error_msg: str, thread_error_msg: str, ) -> None: def success_hook(_res: ApiResult) -> None: self.check_system_health(system) def error_hook(_res: ApiResult) -> None: self._set_led(system, False) def thread_error_hook(_msg: str) -> None: self._set_led(system, False) self._run_api_result( call, pre_log=pre_log, success_msg=success_msg, error_msg=error_msg, thread_error_msg=thread_error_msg, on_success=success_hook, on_error=error_hook, on_thread_error=thread_error_hook, ) def scan_prepare(self) -> None: self.log("[SCAN] prepare requested.") if hasattr(self.scan, "pteMeasurementPayload"): payload = self._build_measurement_payload() if payload is None: return self.log(f"[SCAN] measurement payload is valid. keys={len(payload)}") if hasattr(self.scan, "lblMeasurementStatus"): self.scan.lblMeasurementStatus.setText("Measurement status: payload validated") return loaded_path = (self._sequence_file_path or "").lower() if loaded_path.endswith(".seq"): self.log("[SCAN] .seq loaded: skipping JSON params upload.") else: if self._current_sequence_params(): self.send_sequence_params_to_backend() else: self.log("[SEQ] params upload skipped: no sequence params in UI.") self.send_sequence_interpret_to_backend() def scan_load_to_scanner(self) -> None: default_path = (self._sequence_file_path or "").strip() default_dir = os.path.dirname(default_path) if default_path else "" file_path, _ = QFileDialog.getOpenFileName( self, "Select .seq file to load to scanner", default_dir, "Sequence (*.seq);;All files (*)", ) if not file_path: self.log("[SEQ] load to scanner canceled.") return self._sequence_file_path = file_path self._refresh_current_sequence_ui() self.send_sequence_interpret_to_backend(file_path=file_path) def send_sequence_params_to_backend(self) -> None: payload = self._current_sequence_params() ep = ENDPOINTS["sequence_params"] def call() -> ApiResult: return self.hardware_api.post_json(ep, payload) self._run_api_result( call, pre_log="[SEQ] sending params to backend...", success_msg="[SEQ] params sent (HTTP {status_code})", error_msg="[SEQ] send error: {error}", thread_error_msg="[SEQ] send thread error: {error}", ) def send_sequence_interpret_to_backend(self, file_path: Optional[str] = None) -> None: ep = ENDPOINTS["sequence_interpret"]["interpret"] ep_status = ENDPOINTS["sequence_interpret"]["status"] file_path = (file_path or self._sequence_file_path or "").strip() if not file_path or not os.path.isfile(file_path): file_path, _ = QFileDialog.getOpenFileName( self, "Select .seq file for interpretation", "", "Sequence (*.seq);;All files (*)", ) if not file_path: self.log("[SEQ] interpretation skipped: .seq file was not selected.") return self._sequence_file_path = file_path self._refresh_current_sequence_ui() def job() -> Tuple[ApiResult, ApiResult]: upload_res = self.interp_api.post_file(ep, file_path, field_name="file") status_res = self.interp_api.get_json(ep_status) if upload_res.ok else ApiResult(ok=False, status_code=0, error="") return upload_res, status_res def ok(result: Tuple[ApiResult, ApiResult]) -> None: upload_res, status_res = result if upload_res.ok: self.log(f"[SEQ] interpret upload sent (HTTP {upload_res.status_code})") if status_res.ok and isinstance(status_res.data, dict): tasks = status_res.data.get("tasks") self.log(f"[SEQ] interpret status: {short_json(tasks if tasks is not None else status_res.data)}") else: self.log(f"[SEQ] interpretation error: {upload_res.error}") def err(msg: str) -> None: self.log(f"[SEQ] interpretation thread error: {msg}") self._run_async(job, ok, err, pre_log=f"[SEQ] sending interpretation file: {short_path(file_path)}") def _refresh_current_sequence_ui(self) -> None: if not hasattr(self, "scan") or self.scan is None: return line = getattr(self.scan, "leCurrentSequence", None) if line is None: return path = (self._sequence_file_path or "").strip() if path and path.lower().endswith(".seq"): line.setText(path) else: line.clear() def scan_interpreter_status(self) -> None: ep_status = ENDPOINTS["sequence_interpret"]["status"] def call() -> ApiResult: return self.interp_api.get_json(ep_status) self._run_api_result( call, pre_log="[SEQ] requesting interpreter status...", success_msg_fn=lambda res: f"[SEQ] interpreter status: {short_json(res.data)}", error_msg="[SEQ] interpreter status error: {error}", thread_error_msg="[SEQ] interpreter status thread error: {error}", ) def scan_start(self) -> None: self.scan.pbScanProgress.setRange(0, 100) self.scan.pbScanProgress.setValue(0) payload = self._build_measurement_payload() if payload is None: return ep = ENDPOINTS["scan"]["start"] ep_fallback_get = ENDPOINTS["scan"].get("start_fallback_get", ep) def call() -> ApiResult: res = self.hardware_api.post_json(ep, payload) if res.ok: return res # Some measurement services start acquisition via GET /api/measurement return self.hardware_api.get_json(ep_fallback_get) def success_hook(res: ApiResult) -> None: self._scan_id = self._extract_scan_id(res.data) self.log(f"[SCAN] started. scan_id={self._scan_id}") if hasattr(self.scan, "lblMeasurementStatus"): sid = self._scan_id if self._scan_id else "unknown" self.scan.lblMeasurementStatus.setText(f"Measurement status: running (id={sid})") self.scan_timer.start() self._run_api_result( call, pre_log="[SCAN] starting...", on_success=success_hook, error_msg="[SCAN] start error: {error}", thread_error_msg="[SCAN] start thread error: {error}", ) def scan_abort(self) -> None: self.log("[SCAN] abort requested.") self.scan_stop() def scan_stop(self) -> None: self.scan_timer.stop() ep = ENDPOINTS["scan"].get("stop", "") if not ep: self.log("[SCAN] stopped (local timer only).") return def call() -> ApiResult: return self.hardware_api.post_json(ep, payload={"scan_id": self._scan_id}) self._run_api_result( call, success_msg="[SCAN] stopped.", error_msg="[SCAN] stop endpoint unavailable: {error}", thread_error_msg="[SCAN] stop thread error: {error}", ) def poll_scan_progress(self) -> None: state_by_id_tpl = ENDPOINTS["scan"].get("state_by_id", "") ep_state_legacy = ENDPOINTS["scan"].get("state", "") ep_progress_legacy = ENDPOINTS["scan"].get("progress", "") def call() -> ApiResult: if self._scan_id and state_by_id_tpl: ep_state = state_by_id_tpl.format(scan_id=self._scan_id) res = self.hardware_api.get_json(ep_state) if res.ok: return res if ep_state_legacy: res = self.hardware_api.get_json(ep_state_legacy) if res.ok: return res if ep_progress_legacy: return self.hardware_api.get_json(ep_progress_legacy, params={"scan_id": self._scan_id}) return ApiResult(ok=False, status_code=0, error="No scan state/progress endpoint configured") def on_success(res: ApiResult) -> None: progress = None message = "" done = False if isinstance(res.data, dict): progress = res.data.get("progress", None) message = str(res.data.get("message", "")) done = bool(res.data.get("done", False)) if not message: status = res.data.get("status") if status is not None: message = str(status) if not done: done = bool(res.data.get("data_ready", False)) if progress is None: if done: progress = 100 elif message: s = message.lower() if any(x in s for x in ("run", "progress", "acquir", "measure")): progress = 50 if isinstance(progress, (int, float)): self.scan.pbScanProgress.setRange(0, 100) self.scan.pbScanProgress.setValue(int(progress)) if message: self.statusbar.showMessage(message, 1500) if hasattr(self.scan, "lblMeasurementStatus"): self.scan.lblMeasurementStatus.setText(f"Measurement status: {message}") if done or (isinstance(progress, (int, float)) and progress >= 100): self.scan_timer.stop() self.log("[SCAN] finished.") if hasattr(self.scan, "lblMeasurementStatus"): self.scan.lblMeasurementStatus.setText("Measurement status: data_ready") self._run_api_result( call, on_success=on_success, error_msg="[SCAN] progress error: {error}", thread_error_msg="[SCAN] progress thread error: {error}", )