| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838 |
- from __future__ import annotations
- import json
- import os
- from typing import Any, Callable, Dict, Optional, Tuple
- import requests
- from PyQt6.QtWidgets import QFileDialog, QLabel, QMessageBox
- from config import ENDPOINTS
- from mainui.clients import ApiResult
- from mainui.utils import short_json, short_path
- from mainui.worker import run_in_thread
- class ScanTabMixin:
- def _init_scan_measurement_ui(self) -> None:
- if not hasattr(self, "scan") or self.scan is None:
- return
- if not hasattr(self.scan, "pteMeasurementPayload"):
- return
- t = self._measurement_payload_template()
- if hasattr(self.scan, "leMeasurementEngine"):
- self.scan.leMeasurementEngine.setText(str(t["engine"]))
- if hasattr(self.scan, "leMeasurementInfo"):
- self.scan.leMeasurementInfo.setText(str(t["infostr"]))
- osc = t["oscilloscope"]
- if hasattr(self.scan, "cbOscEnabled"):
- self.scan.cbOscEnabled.setChecked(bool(osc["enabled"]))
- if hasattr(self.scan, "leOscDeviceModel"):
- self.scan.leOscDeviceModel.setText(str(osc["device_model"]))
- if hasattr(self.scan, "sbOscSampleRate"):
- self.scan.sbOscSampleRate.setValue(int(osc["sample_rate"]))
- if hasattr(self.scan, "leOscPoints"):
- self.scan.leOscPoints.setText(",".join(str(x) for x in osc["points"]))
- if hasattr(self.scan, "leOscChannels"):
- self.scan.leOscChannels.setText(",".join(str(x) for x in osc["channels"]))
- if hasattr(self.scan, "leOscRanges"):
- self.scan.leOscRanges.setText(",".join(str(x) for x in osc["ranges"]))
- if hasattr(self.scan, "sbOscNTriggers"):
- self.scan.sbOscNTriggers.setValue(int(osc["ntriggers"]))
- if hasattr(self.scan, "sbOscAveraging"):
- self.scan.sbOscAveraging.setValue(int(osc["averaging"]))
- if hasattr(self.scan, "sbOscTriggerChannel"):
- self.scan.sbOscTriggerChannel.setValue(int(osc["trigger_channel"]))
- if hasattr(self.scan, "sbOscThreshold"):
- self.scan.sbOscThreshold.setValue(int(osc["threshold"]))
- if hasattr(self.scan, "sbOscDirection"):
- self.scan.sbOscDirection.setValue(int(osc["direction"]))
- if hasattr(self.scan, "sbOscAutomeasureTime"):
- self.scan.sbOscAutomeasureTime.setValue(int(osc["automeasure_time"]))
- sync = t["synchronizer"]
- if hasattr(self.scan, "cbSyncEnabled"):
- self.scan.cbSyncEnabled.setChecked(bool(sync["enabled"]))
- if hasattr(self.scan, "leSyncDeviceModel"):
- self.scan.leSyncDeviceModel.setText(str(sync["device_model"]))
- if hasattr(self.scan, "leSyncFile"):
- self.scan.leSyncFile.setText(str(sync["file"]))
- if hasattr(self.scan, "leSyncPort"):
- self.scan.leSyncPort.setText(str(sync["port"]))
- sdr = t["sdr"]
- if hasattr(self.scan, "cbSdrEnabled"):
- self.scan.cbSdrEnabled.setChecked(bool(sdr["enabled"]))
- if hasattr(self.scan, "leSdrDeviceModel"):
- self.scan.leSdrDeviceModel.setText(str(sdr["device_model"]))
- if hasattr(self.scan, "sbSdrSampleRate"):
- self.scan.sbSdrSampleRate.setValue(int(sdr["sample_rate"]))
- if hasattr(self.scan, "dsbSdrFreq"):
- self.scan.dsbSdrFreq.setValue(float(sdr["freq"]))
- if hasattr(self.scan, "dsbSdrAmpl"):
- self.scan.dsbSdrAmpl.setValue(float(sdr["ampl"]))
- if hasattr(self.scan, "dsbSdrGain"):
- self.scan.dsbSdrGain.setValue(float(sdr["gain"]))
- if hasattr(self.scan, "leSdrFile"):
- self.scan.leSdrFile.setText(str(sdr["file"]))
- gru = t["gru"]
- if hasattr(self.scan, "cbGruEnabled"):
- self.scan.cbGruEnabled.setChecked(bool(gru["enabled"]))
- if hasattr(self.scan, "leGruDeviceModel"):
- self.scan.leGruDeviceModel.setText(str(gru["device_model"]))
- if hasattr(self.scan, "leGruIp"):
- self.scan.leGruIp.setText(str(gru["ip"]))
- if hasattr(self.scan, "leGruFile"):
- self.scan.leGruFile.setText(str(gru["file"]))
- self._wire_measurement_payload_preview()
- self._refresh_measurement_payload_preview()
- if hasattr(self.scan, "lblMeasurementStatus"):
- self.scan.lblMeasurementStatus.setText("Measurement status: ready")
- def _measurement_payload_template(self) -> Dict[str, Any]:
- return {
- "oscilloscope": {
- "device_model": "",
- "sample_rate": 8000000,
- "points": [1024],
- "channels": [0],
- "ranges": [5],
- "ntriggers": 1,
- "averaging": 1,
- "trigger_channel": 0,
- "threshold": 0,
- "direction": 2,
- "automeasure_time": 0,
- "enabled": True,
- },
- "synchronizer": {"device_model": "", "file": "", "port": "", "enabled": False},
- "sdr": {
- "device_model": "",
- "sample_rate": 0,
- "freq": 0,
- "ampl": 0,
- "gain": 0,
- "file": "",
- "enabled": False,
- },
- "gru": {"device_model": "", "ip": "", "file": "", "enabled": False},
- "engine": "default",
- "infostr": "mainui",
- }
- def _build_measurement_payload(self) -> Optional[Dict[str, Any]]:
- payload = self._measurement_payload_template()
- if hasattr(self.scan, "leMeasurementEngine"):
- payload["engine"] = self.scan.leMeasurementEngine.text().strip() or "default"
- if hasattr(self.scan, "leMeasurementInfo"):
- payload["infostr"] = self.scan.leMeasurementInfo.text().strip()
- osc = payload["oscilloscope"]
- if hasattr(self.scan, "cbOscEnabled"):
- osc["enabled"] = bool(self.scan.cbOscEnabled.isChecked())
- if hasattr(self.scan, "leOscDeviceModel"):
- osc["device_model"] = self.scan.leOscDeviceModel.text().strip()
- if hasattr(self.scan, "sbOscSampleRate"):
- osc["sample_rate"] = int(self.scan.sbOscSampleRate.value())
- if hasattr(self.scan, "leOscPoints"):
- parsed = self._parse_number_list(self.scan.leOscPoints.text(), "points", int)
- if parsed is None:
- return None
- osc["points"] = parsed
- if hasattr(self.scan, "leOscChannels"):
- parsed = self._parse_number_list(self.scan.leOscChannels.text(), "channels", int)
- if parsed is None:
- return None
- osc["channels"] = parsed
- if hasattr(self.scan, "leOscRanges"):
- parsed = self._parse_number_list(self.scan.leOscRanges.text(), "ranges", float)
- if parsed is None:
- return None
- osc["ranges"] = parsed
- if hasattr(self.scan, "sbOscNTriggers"):
- osc["ntriggers"] = int(self.scan.sbOscNTriggers.value())
- if hasattr(self.scan, "sbOscAveraging"):
- osc["averaging"] = int(self.scan.sbOscAveraging.value())
- if hasattr(self.scan, "sbOscTriggerChannel"):
- osc["trigger_channel"] = int(self.scan.sbOscTriggerChannel.value())
- if hasattr(self.scan, "sbOscThreshold"):
- osc["threshold"] = int(self.scan.sbOscThreshold.value())
- if hasattr(self.scan, "sbOscDirection"):
- osc["direction"] = int(self.scan.sbOscDirection.value())
- if hasattr(self.scan, "sbOscAutomeasureTime"):
- osc["automeasure_time"] = int(self.scan.sbOscAutomeasureTime.value())
- sync = payload["synchronizer"]
- if hasattr(self.scan, "cbSyncEnabled"):
- sync["enabled"] = bool(self.scan.cbSyncEnabled.isChecked())
- if hasattr(self.scan, "leSyncDeviceModel"):
- sync["device_model"] = self.scan.leSyncDeviceModel.text().strip()
- if hasattr(self.scan, "leSyncFile"):
- sync["file"] = self.scan.leSyncFile.text().strip()
- if hasattr(self.scan, "leSyncPort"):
- sync["port"] = self.scan.leSyncPort.text().strip()
- sdr = payload["sdr"]
- if hasattr(self.scan, "cbSdrEnabled"):
- sdr["enabled"] = bool(self.scan.cbSdrEnabled.isChecked())
- if hasattr(self.scan, "leSdrDeviceModel"):
- sdr["device_model"] = self.scan.leSdrDeviceModel.text().strip()
- if hasattr(self.scan, "sbSdrSampleRate"):
- sdr["sample_rate"] = int(self.scan.sbSdrSampleRate.value())
- if hasattr(self.scan, "dsbSdrFreq"):
- sdr["freq"] = float(self.scan.dsbSdrFreq.value())
- if hasattr(self.scan, "dsbSdrAmpl"):
- sdr["ampl"] = float(self.scan.dsbSdrAmpl.value())
- if hasattr(self.scan, "dsbSdrGain"):
- sdr["gain"] = float(self.scan.dsbSdrGain.value())
- if hasattr(self.scan, "leSdrFile"):
- sdr["file"] = self.scan.leSdrFile.text().strip()
- gru = payload["gru"]
- if hasattr(self.scan, "cbGruEnabled"):
- gru["enabled"] = bool(self.scan.cbGruEnabled.isChecked())
- if hasattr(self.scan, "leGruDeviceModel"):
- gru["device_model"] = self.scan.leGruDeviceModel.text().strip()
- if hasattr(self.scan, "leGruIp"):
- gru["ip"] = self.scan.leGruIp.text().strip()
- if hasattr(self.scan, "leGruFile"):
- gru["file"] = self.scan.leGruFile.text().strip()
- seq_path = (self._sequence_file_path or "").strip()
- if seq_path and os.path.isfile(seq_path) and not sync.get("file"):
- sync["file"] = seq_path
- return payload
- def _parse_number_list(
- self,
- raw_text: str,
- field_name: str,
- cast: Callable[[str], Any],
- ) -> Optional[list]:
- text = (raw_text or "").strip()
- if not text:
- return []
- parts = [p.strip() for p in text.replace(";", ",").split(",") if p.strip()]
- out: list = []
- try:
- for p in parts:
- out.append(cast(p))
- except Exception:
- QMessageBox.warning(self, "Measurement payload", f"Invalid {field_name} list.")
- return None
- return out
- def _wire_measurement_payload_preview(self) -> None:
- if not hasattr(self, "scan") or self.scan is None:
- return
- names = (
- "leMeasurementEngine",
- "leMeasurementInfo",
- "cbOscEnabled",
- "leOscDeviceModel",
- "sbOscSampleRate",
- "leOscPoints",
- "leOscChannels",
- "leOscRanges",
- "sbOscNTriggers",
- "sbOscAveraging",
- "sbOscTriggerChannel",
- "sbOscThreshold",
- "sbOscDirection",
- "sbOscAutomeasureTime",
- "cbSyncEnabled",
- "leSyncDeviceModel",
- "leSyncFile",
- "leSyncPort",
- "cbSdrEnabled",
- "leSdrDeviceModel",
- "sbSdrSampleRate",
- "dsbSdrFreq",
- "dsbSdrAmpl",
- "dsbSdrGain",
- "leSdrFile",
- "cbGruEnabled",
- "leGruDeviceModel",
- "leGruIp",
- "leGruFile",
- )
- for name in names:
- if not hasattr(self.scan, name):
- continue
- w = getattr(self.scan, name)
- sig = getattr(w, "textChanged", None)
- if sig is not None:
- sig.connect(self._refresh_measurement_payload_preview)
- continue
- sig = getattr(w, "valueChanged", None)
- if sig is not None:
- sig.connect(self._refresh_measurement_payload_preview)
- continue
- sig = getattr(w, "stateChanged", None)
- if sig is not None:
- sig.connect(self._refresh_measurement_payload_preview)
- def _refresh_measurement_payload_preview(self, *_args) -> None:
- if not hasattr(self.scan, "pteMeasurementPayload"):
- return
- payload = self._build_measurement_payload()
- if payload is None:
- return
- self.scan.pteMeasurementPayload.setPlainText(json.dumps(payload, ensure_ascii=False, indent=2))
- def _extract_scan_id(self, data: Any) -> str:
- if isinstance(data, dict):
- for key in ("measurement_id", "id", "scan_id"):
- val = data.get(key)
- if val is not None and str(val).strip():
- return str(val)
- return ""
- def scan_fetch_measurement_data(self) -> None:
- if not self._scan_id:
- QMessageBox.warning(self, "Measurement data", "No measurement id. Start scan first.")
- return
- ep_tpl = ENDPOINTS["scan"].get("data_by_id", "")
- if not ep_tpl:
- self.log("[SCAN] data endpoint is not configured.")
- return
- ep = ep_tpl.format(scan_id=self._scan_id)
- params: Dict[str, Any] = {}
- if hasattr(self.scan, "leDataNum"):
- data_num = self.scan.leDataNum.text().strip()
- if data_num:
- try:
- params["data_num"] = int(data_num)
- except Exception:
- QMessageBox.warning(self, "Measurement data", "data_num must be integer.")
- return
- if hasattr(self.scan, "leAveragingNum"):
- averaging_num = self.scan.leAveragingNum.text().strip()
- if averaging_num:
- try:
- params["averaging_num"] = int(averaging_num)
- except Exception:
- QMessageBox.warning(self, "Measurement data", "averaging_num must be integer.")
- return
- def call() -> ApiResult:
- return self.hardware_api.get_json(ep, params=params or None)
- def on_success(res: ApiResult) -> None:
- size = 0
- if isinstance(res.data, dict):
- size = len(json.dumps(res.data, ensure_ascii=False))
- self.log(f"[SCAN] measurement data loaded. bytes~{size}")
- if hasattr(self.scan, "lblMeasurementStatus"):
- self.scan.lblMeasurementStatus.setText("Measurement status: data loaded")
- self._run_api_result(
- call,
- pre_log=f"[SCAN] requesting measurement data: id={self._scan_id}",
- on_success=on_success,
- error_msg="[SCAN] measurement data error: {error}",
- thread_error_msg="[SCAN] measurement data thread error: {error}",
- )
- def _current_sequence_params(self) -> dict:
- return self._steam_params if self._steam_params else {}
- def _service_base_url(self, client: Any) -> str:
- return (getattr(client, "base_url", "") or "").rstrip("/")
- def _set_service_ping_label(
- self,
- *,
- label_attr: str,
- service_name: str,
- base_url: str,
- ok: Optional[bool],
- msg: str = "",
- include_error_tail: bool = False,
- ) -> None:
- if not hasattr(self.scan, label_attr):
- return
- label = getattr(self.scan, label_attr)
- if ok is None:
- label.setText(f"* {service_name}: checking ({base_url})")
- label.setStyleSheet("color: #808080;")
- return
- if ok:
- label.setText(f"* {service_name}: online ({base_url})")
- label.setStyleSheet("color: #2e7d32;")
- return
- tail = f" - {msg}" if include_error_tail and msg else ""
- label.setText(f"* {service_name}: offline ({base_url}){tail}")
- label.setStyleSheet("color: #c62828;")
- def _ping_service(
- self,
- *,
- client: Any,
- set_label_state: Callable[[Optional[bool], str], None],
- ) -> None:
- set_label_state(None, "")
- base = self._service_base_url(client)
- def job() -> Tuple[bool, str]:
- try:
- r = requests.get(base, timeout=2.0)
- return True, f"HTTP {r.status_code}"
- except Exception as e:
- return False, str(e)
- def ok(result: Tuple[bool, str]) -> None:
- state, msg = result
- set_label_state(state, msg if not state else "")
- def err(msg: str) -> None:
- set_label_state(False, msg)
- self._run_async(job, ok, err)
- def _run_async(
- self,
- job: Callable[[], Any],
- ok: Callable[[Any], None],
- err: Callable[[str], None],
- pre_log: Optional[str] = None,
- ) -> None:
- if pre_log:
- self.log(pre_log)
- run_in_thread(self, job, ok, err)
- def _run_api_result(
- self,
- call: Callable[[], ApiResult],
- *,
- pre_log: Optional[str] = None,
- on_success: Optional[Callable[[ApiResult], None]] = None,
- on_error: Optional[Callable[[ApiResult], None]] = None,
- on_thread_error: Optional[Callable[[str], None]] = None,
- success_msg: Optional[str] = None,
- error_msg: Optional[str] = None,
- thread_error_msg: Optional[str] = None,
- success_msg_fn: Optional[Callable[[ApiResult], str]] = None,
- error_msg_fn: Optional[Callable[[ApiResult], str]] = None,
- thread_error_msg_fn: Optional[Callable[[str], str]] = None,
- ) -> None:
- def ok(res: ApiResult) -> None:
- if res.ok:
- if success_msg_fn is not None:
- self.log(success_msg_fn(res))
- elif success_msg:
- self.log(success_msg.format(status_code=res.status_code))
- if on_success is not None:
- on_success(res)
- else:
- if error_msg_fn is not None:
- self.log(error_msg_fn(res))
- elif error_msg:
- self.log(error_msg.format(error=res.error, status_code=res.status_code))
- if on_error is not None:
- on_error(res)
- def err(msg: str) -> None:
- if thread_error_msg_fn is not None:
- self.log(thread_error_msg_fn(msg))
- elif thread_error_msg:
- self.log(thread_error_msg.format(error=msg))
- if on_thread_error is not None:
- on_thread_error(msg)
- self._run_async(call, ok, err, pre_log=pre_log)
- def backend_connect(self) -> None:
- self._ping_backend_service()
- self._ping_interp_service()
- self.check_system_health("gradient")
- self.check_system_health("rf")
- self.check_system_health("adc")
- def backend_disconnect(self) -> None:
- self._set_backend_ping(False, "disconnected")
- self._set_interp_ping(False, "disconnected")
- self.log("[BACKEND] Disconnected (UI).")
- self._set_led("gradient", None)
- self._set_led("rf", None)
- self._set_led("adc", None)
- def _set_backend_ping(self, ok: Optional[bool], msg: str = "") -> None:
- self._set_service_ping_label(
- label_attr="lblBackendPing",
- service_name="srv-hardware",
- base_url=self._service_base_url(self.hardware_api),
- ok=ok,
- msg=msg,
- include_error_tail=False,
- )
- def _ping_backend_service(self) -> None:
- self._ping_service(
- client=self.hardware_api,
- set_label_state=self._set_backend_ping,
- )
- def _set_interp_ping(self, ok: Optional[bool], msg: str = "") -> None:
- self._set_service_ping_label(
- label_attr="lblInterpPing",
- service_name="srv-interp",
- base_url=self._service_base_url(self.interp_api),
- ok=ok,
- msg=msg,
- include_error_tail=False,
- )
- def _ping_interp_service(self) -> None:
- self._ping_service(
- client=self.interp_api,
- set_label_state=self._set_interp_ping,
- )
- def _set_led(self, system: str, ok: Optional[bool]) -> None:
- def set_color(lbl: QLabel, color: str):
- lbl.setText("*")
- lbl.setStyleSheet(f"color: {color}; font-size: 18px;")
- if system == "gradient":
- lbl_name = "lblGradLed"
- elif system == "rf":
- lbl_name = "lblRFLed"
- elif system == "adc":
- lbl_name = "lblADCLed"
- else:
- return
- if not hasattr(self.scan, lbl_name):
- return
- lbl = getattr(self.scan, lbl_name)
- if ok is None:
- set_color(lbl, "#808080")
- elif ok:
- set_color(lbl, "#2e7d32")
- else:
- set_color(lbl, "#c62828")
- def check_system_health(self, system: str) -> None:
- ep = ENDPOINTS["systems"][system]["health"]
- def job() -> Tuple[str, ApiResult]:
- return system, self.hardware_api.get_json(ep)
- def ok(result: Tuple[str, ApiResult]) -> None:
- sys_name, res = result
- if res.ok:
- ok_flag = True
- msg = "OK"
- if isinstance(res.data, dict):
- ok_flag = bool(res.data.get("ok", True))
- msg = str(res.data.get("message", "OK"))
- self._set_led(sys_name, ok_flag)
- self.log(f"[{sys_name.upper()}] health: {msg}")
- else:
- self._set_led(sys_name, False)
- self.log(f"[{sys_name.upper()}] health error: {res.error}")
- def err(msg: str) -> None:
- self._set_led(system, False)
- self.log(f"[{system.upper()}] health thread error: {msg}")
- self._run_async(job, ok, err)
- def system_load(self, system: str) -> None:
- path, _ = QFileDialog.getOpenFileName(self, f"Select config for {system}", "", "All files (*)")
- if not path:
- return
- ep = ENDPOINTS["systems"][system]["upload_config"]
- def call() -> ApiResult:
- return self.hardware_api.post_file(ep, path)
- self._run_system_action(
- system=system,
- call=call,
- pre_log=f"[{system.upper()}] uploading config...",
- success_msg=f"[{system.upper()}] config uploaded (HTTP {{status_code}})",
- error_msg=f"[{system.upper()}] upload error: {{error}}",
- thread_error_msg=f"[{system.upper()}] upload thread error: {{error}}",
- )
- def system_start(self, system: str) -> None:
- ep = ENDPOINTS["systems"][system]["start"]
- def call() -> ApiResult:
- return self.hardware_api.post_json(ep, payload={})
- self._run_system_action(
- system=system,
- call=call,
- pre_log=f"[{system.upper()}] starting...",
- success_msg=f"[{system.upper()}] started (HTTP {{status_code}})",
- error_msg=f"[{system.upper()}] start error: {{error}}",
- thread_error_msg=f"[{system.upper()}] start thread error: {{error}}",
- )
- def _run_system_action(
- self,
- *,
- system: str,
- call: Callable[[], ApiResult],
- pre_log: str,
- success_msg: str,
- error_msg: str,
- thread_error_msg: str,
- ) -> None:
- def success_hook(_res: ApiResult) -> None:
- self.check_system_health(system)
- def error_hook(_res: ApiResult) -> None:
- self._set_led(system, False)
- def thread_error_hook(_msg: str) -> None:
- self._set_led(system, False)
- self._run_api_result(
- call,
- pre_log=pre_log,
- success_msg=success_msg,
- error_msg=error_msg,
- thread_error_msg=thread_error_msg,
- on_success=success_hook,
- on_error=error_hook,
- on_thread_error=thread_error_hook,
- )
- def scan_prepare(self) -> None:
- self.log("[SCAN] prepare requested.")
- if hasattr(self.scan, "pteMeasurementPayload"):
- payload = self._build_measurement_payload()
- if payload is None:
- return
- self.log(f"[SCAN] measurement payload is valid. keys={len(payload)}")
- if hasattr(self.scan, "lblMeasurementStatus"):
- self.scan.lblMeasurementStatus.setText("Measurement status: payload validated")
- return
- loaded_path = (self._sequence_file_path or "").lower()
- if loaded_path.endswith(".seq"):
- self.log("[SCAN] .seq loaded: skipping JSON params upload.")
- else:
- if self._current_sequence_params():
- self.send_sequence_params_to_backend()
- else:
- self.log("[SEQ] params upload skipped: no sequence params in UI.")
- self.send_sequence_interpret_to_backend()
- def scan_load_to_scanner(self) -> None:
- default_path = (self._sequence_file_path or "").strip()
- default_dir = os.path.dirname(default_path) if default_path else ""
- file_path, _ = QFileDialog.getOpenFileName(
- self,
- "Select .seq file to load to scanner",
- default_dir,
- "Sequence (*.seq);;All files (*)",
- )
- if not file_path:
- self.log("[SEQ] load to scanner canceled.")
- return
- self._sequence_file_path = file_path
- self._refresh_current_sequence_ui()
- self.send_sequence_interpret_to_backend(file_path=file_path)
- def send_sequence_params_to_backend(self) -> None:
- payload = self._current_sequence_params()
- ep = ENDPOINTS["sequence_params"]
- def call() -> ApiResult:
- return self.hardware_api.post_json(ep, payload)
- self._run_api_result(
- call,
- pre_log="[SEQ] sending params to backend...",
- success_msg="[SEQ] params sent (HTTP {status_code})",
- error_msg="[SEQ] send error: {error}",
- thread_error_msg="[SEQ] send thread error: {error}",
- )
- def send_sequence_interpret_to_backend(self, file_path: Optional[str] = None) -> None:
- ep = ENDPOINTS["sequence_interpret"]["interpret"]
- ep_status = ENDPOINTS["sequence_interpret"]["status"]
- file_path = (file_path or self._sequence_file_path or "").strip()
- if not file_path or not os.path.isfile(file_path):
- file_path, _ = QFileDialog.getOpenFileName(
- self,
- "Select .seq file for interpretation",
- "",
- "Sequence (*.seq);;All files (*)",
- )
- if not file_path:
- self.log("[SEQ] interpretation skipped: .seq file was not selected.")
- return
- self._sequence_file_path = file_path
- self._refresh_current_sequence_ui()
- def job() -> Tuple[ApiResult, ApiResult]:
- upload_res = self.interp_api.post_file(ep, file_path, field_name="file")
- status_res = self.interp_api.get_json(ep_status) if upload_res.ok else ApiResult(ok=False, status_code=0, error="")
- return upload_res, status_res
- def ok(result: Tuple[ApiResult, ApiResult]) -> None:
- upload_res, status_res = result
- if upload_res.ok:
- self.log(f"[SEQ] interpret upload sent (HTTP {upload_res.status_code})")
- if status_res.ok and isinstance(status_res.data, dict):
- tasks = status_res.data.get("tasks")
- self.log(f"[SEQ] interpret status: {short_json(tasks if tasks is not None else status_res.data)}")
- else:
- self.log(f"[SEQ] interpretation error: {upload_res.error}")
- def err(msg: str) -> None:
- self.log(f"[SEQ] interpretation thread error: {msg}")
- self._run_async(job, ok, err, pre_log=f"[SEQ] sending interpretation file: {short_path(file_path)}")
- def _refresh_current_sequence_ui(self) -> None:
- if not hasattr(self, "scan") or self.scan is None:
- return
- line = getattr(self.scan, "leCurrentSequence", None)
- if line is None:
- return
- path = (self._sequence_file_path or "").strip()
- if path and path.lower().endswith(".seq"):
- line.setText(path)
- else:
- line.clear()
- def scan_interpreter_status(self) -> None:
- ep_status = ENDPOINTS["sequence_interpret"]["status"]
- def call() -> ApiResult:
- return self.interp_api.get_json(ep_status)
- self._run_api_result(
- call,
- pre_log="[SEQ] requesting interpreter status...",
- success_msg_fn=lambda res: f"[SEQ] interpreter status: {short_json(res.data)}",
- error_msg="[SEQ] interpreter status error: {error}",
- thread_error_msg="[SEQ] interpreter status thread error: {error}",
- )
- def scan_start(self) -> None:
- self.scan.pbScanProgress.setRange(0, 100)
- self.scan.pbScanProgress.setValue(0)
- payload = self._build_measurement_payload()
- if payload is None:
- return
- ep = ENDPOINTS["scan"]["start"]
- ep_fallback_get = ENDPOINTS["scan"].get("start_fallback_get", ep)
- def call() -> ApiResult:
- res = self.hardware_api.post_json(ep, payload)
- if res.ok:
- return res
- # Some measurement services start acquisition via GET /api/measurement
- return self.hardware_api.get_json(ep_fallback_get)
- def success_hook(res: ApiResult) -> None:
- self._scan_id = self._extract_scan_id(res.data)
- self.log(f"[SCAN] started. scan_id={self._scan_id}")
- if hasattr(self.scan, "lblMeasurementStatus"):
- sid = self._scan_id if self._scan_id else "unknown"
- self.scan.lblMeasurementStatus.setText(f"Measurement status: running (id={sid})")
- self.scan_timer.start()
- self._run_api_result(
- call,
- pre_log="[SCAN] starting...",
- on_success=success_hook,
- error_msg="[SCAN] start error: {error}",
- thread_error_msg="[SCAN] start thread error: {error}",
- )
- def scan_abort(self) -> None:
- self.log("[SCAN] abort requested.")
- self.scan_stop()
- def scan_stop(self) -> None:
- self.scan_timer.stop()
- ep = ENDPOINTS["scan"].get("stop", "")
- if not ep:
- self.log("[SCAN] stopped (local timer only).")
- return
- def call() -> ApiResult:
- return self.hardware_api.post_json(ep, payload={"scan_id": self._scan_id})
- self._run_api_result(
- call,
- success_msg="[SCAN] stopped.",
- error_msg="[SCAN] stop endpoint unavailable: {error}",
- thread_error_msg="[SCAN] stop thread error: {error}",
- )
- def poll_scan_progress(self) -> None:
- state_by_id_tpl = ENDPOINTS["scan"].get("state_by_id", "")
- ep_state_legacy = ENDPOINTS["scan"].get("state", "")
- ep_progress_legacy = ENDPOINTS["scan"].get("progress", "")
- def call() -> ApiResult:
- if self._scan_id and state_by_id_tpl:
- ep_state = state_by_id_tpl.format(scan_id=self._scan_id)
- res = self.hardware_api.get_json(ep_state)
- if res.ok:
- return res
- if ep_state_legacy:
- res = self.hardware_api.get_json(ep_state_legacy)
- if res.ok:
- return res
- if ep_progress_legacy:
- return self.hardware_api.get_json(ep_progress_legacy, params={"scan_id": self._scan_id})
- return ApiResult(ok=False, status_code=0, error="No scan state/progress endpoint configured")
- def on_success(res: ApiResult) -> None:
- progress = None
- message = ""
- done = False
- if isinstance(res.data, dict):
- progress = res.data.get("progress", None)
- message = str(res.data.get("message", ""))
- done = bool(res.data.get("done", False))
- if not message:
- status = res.data.get("status")
- if status is not None:
- message = str(status)
- if not done:
- done = bool(res.data.get("data_ready", False))
- if progress is None:
- if done:
- progress = 100
- elif message:
- s = message.lower()
- if any(x in s for x in ("run", "progress", "acquir", "measure")):
- progress = 50
- if isinstance(progress, (int, float)):
- self.scan.pbScanProgress.setRange(0, 100)
- self.scan.pbScanProgress.setValue(int(progress))
- if message:
- self.statusbar.showMessage(message, 1500)
- if hasattr(self.scan, "lblMeasurementStatus"):
- self.scan.lblMeasurementStatus.setText(f"Measurement status: {message}")
- if done or (isinstance(progress, (int, float)) and progress >= 100):
- self.scan_timer.stop()
- self.log("[SCAN] finished.")
- if hasattr(self.scan, "lblMeasurementStatus"):
- self.scan.lblMeasurementStatus.setText("Measurement status: data_ready")
- self._run_api_result(
- call,
- on_success=on_success,
- error_msg="[SCAN] progress error: {error}",
- thread_error_msg="[SCAN] progress thread error: {error}",
- )
|