from __future__ import annotations import json import math import os from typing import Any, Callable, Dict, Optional, Tuple import requests from PyQt6.QtCore import QRectF, Qt from PyQt6.QtGui import QPainter, QPixmap from PyQt6.QtWidgets import ( QFileDialog, QGridLayout, QGroupBox, QHBoxLayout, QLabel, QLineEdit, QMessageBox, QPushButton, QPlainTextEdit, QVBoxLayout, QWidget, ) from config import HTTP_TIMEOUT_SEC from mainui.clients import LoggedAnalysisApi from mainui.widgets import InteractiveImageView from mainui.worker import run_in_thread class AnalysisTabMixin: def _analysis_base_url(self) -> str: return (self.analysis.base_url or "").rstrip("/") def _analysis_api(self) -> LoggedAnalysisApi: base = self._analysis_base_url() self.analysis.base_url = base return LoggedAnalysisApi(base_url=base, timeout=HTTP_TIMEOUT_SEC) def _init_analysis_panel(self) -> None: root_layout = self.spec.layout() if root_layout is None: root_layout = QVBoxLayout(self.spec) self.spec.setLayout(root_layout) gb = QGroupBox("Analysis service (REST)") gl = QGridLayout(gb) self.leAnalysisFile = QLineEdit("") self.leAnalysisFile.setPlaceholderText("Select CSV to upload...") btnBrowse = QPushButton("Browse...") btnBrowse.clicked.connect(self.analysis_pick_file) gl.addWidget(QLabel("File:"), 0, 0) gl.addWidget(self.leAnalysisFile, 0, 1, 1, 4) gl.addWidget(btnBrowse, 0, 5) self.leSessionId = QLineEdit("") self.leSessionId.setReadOnly(True) self.leSessionId.setPlaceholderText("auto (after upload)") btnUpload = QPushButton("Upload") btnUpload.clicked.connect(self.analysis_upload) gl.addWidget(QLabel("Session:"), 1, 0) gl.addWidget(self.leSessionId, 1, 1, 1, 4) gl.addWidget(btnUpload, 1, 5) # filter params # Defaults aligned with test_spectrum/test.py self.leDt = QLineEdit("1.125e-7") self.leCenter = QLineEdit("2.97e6") self.leLo = QLineEdit("2.92e6") self.leHi = QLineEdit("3.02e6") self.leLow = QLineEdit("600e3") gl.addWidget(QLabel("dt (s)"), 2, 0) gl.addWidget(self.leDt, 2, 1) gl.addWidget(QLabel("center (Hz)"), 2, 2) gl.addWidget(self.leCenter, 2, 3) gl.addWidget(QLabel("lo/hi/low (Hz)"), 2, 4) w_f = QWidget() hl = QHBoxLayout() hl.setContentsMargins(0, 0, 0, 0) for le in (self.leLo, self.leHi, self.leLow): le.setFixedWidth(90) hl.addWidget(le) w_f.setLayout(hl) gl.addWidget(w_f, 2, 5) btnFilter = QPushButton("Filter") btnFilter.clicked.connect(self.analysis_filter) gl.addWidget(btnFilter, 3, 5) # fft params self.leC1 = QLineEdit("") self.leC2 = QLineEdit("") self.leC3 = QLineEdit("") self.leC4 = QLineEdit("") self.leC1.setPlaceholderText("10") self.leC2.setPlaceholderText("2") self.leC3.setPlaceholderText("1") self.leC4.setPlaceholderText("1") gl.addWidget(QLabel("FFT dec coefs c1..c4"), 3, 0) w_fft = QWidget() hl2 = QHBoxLayout() hl2.setContentsMargins(0, 0, 0, 0) for le in (self.leC1, self.leC2, self.leC3, self.leC4): le.setFixedWidth(55) hl2.addWidget(le) w_fft.setLayout(hl2) gl.addWidget(w_fft, 3, 1, 1, 4) btnFFT = QPushButton("FFT") btnFFT.clicked.connect(self.analysis_fft) gl.addWidget(btnFFT, 4, 5) btnResult = QPushButton("Get result") btnPlotRaw = QPushButton("Plot raw") btnExport = QPushButton("Export plots") btnResult.clicked.connect(self.analysis_result) btnPlotRaw.clicked.connect(self.analysis_plot_raw) btnExport.clicked.connect(self.analysis_export) gl.addWidget(btnResult, 4, 0) gl.addWidget(btnPlotRaw, 4, 1) gl.addWidget(btnExport, 4, 2) self.pteAnalysisLog = QPlainTextEdit() self.pteAnalysisLog.setReadOnly(True) self.pteAnalysisLog.setMaximumBlockCount(2000) gl.addWidget(self.pteAnalysisLog, 5, 0, 1, 6) root_layout.insertWidget(0, gb) self.lblAnalysisPing = QLabel("* Analysis service: unknown") self.lblAnalysisPing.setStyleSheet("color: #808080;") root_layout.addWidget(self.lblAnalysisPing) self.lblRawImg = self._ensure_image_view(self.spec.timePlotContainer, "RAW") self.lblSpecImg = self._ensure_image_view(self.spec.spectrumPlotContainer, "SPEC") self.lblExtraImg = self._ensure_image_view(self.spec.demodPlotContainer, "EXTRA") if hasattr(self.spec, "gbTimePlot"): self.spec.gbTimePlot.setMinimumHeight(460) if hasattr(self.spec, "gbSpectrumPlot"): self.spec.gbSpectrumPlot.setMinimumHeight(460) if hasattr(self.spec, "gbDemodPlot"): self.spec.gbDemodPlot.setMinimumHeight(380) if hasattr(self.spec, "gbTimeDomain"): self.spec.gbTimeDomain.setVisible(False) if hasattr(self.spec, "gbDemod"): self.spec.gbDemod.setVisible(False) if hasattr(self.spec, "btnPlotOrigSpec"): self.spec.btnPlotOrigSpec.setVisible(False) if hasattr(self.spec, "btnPlotDemodSpec"): self.spec.btnPlotDemodSpec.setVisible(False) if hasattr(self.spec, "gbDemodPlot"): self.spec.gbDemodPlot.setVisible(False) if hasattr(self.spec, "btnDemodulate"): self.spec.btnDemodulate.clicked.connect(self._show_demod_plot) if hasattr(self.spec, "vlFreq"): row1 = QHBoxLayout() row2 = QHBoxLayout() self.btnExportRawData = QPushButton("Raw data") self.btnExportFilterData = QPushButton("Filter data") self.btnExportDecDemData = QPushButton("DecDem data") self.btnPeakFreq = QPushButton("Peak freq") self.btnFwhm = QPushButton("FWHM") self.btnMaxAmp = QPushButton("Max amp") self.btnExportDecDemData.setEnabled(False) self.btnPeakFreq.setEnabled(False) self.btnFwhm.setEnabled(False) self.btnMaxAmp.setEnabled(False) self.btnExportDecDemData.setToolTip("Disabled: endpoint is broken in current spectrum.py backend") self.btnPeakFreq.setToolTip("Disabled: depends on broken /export-decdem-data/") self.btnFwhm.setToolTip("Disabled: depends on broken /export-decdem-data/") self.btnMaxAmp.setToolTip("Disabled: depends on broken /export-decdem-data/") row1.addWidget(self.btnExportRawData) row1.addWidget(self.btnExportFilterData) row1.addWidget(self.btnExportDecDemData) row2.addWidget(self.btnPeakFreq) row2.addWidget(self.btnFwhm) row2.addWidget(self.btnMaxAmp) self.spec.vlFreq.addLayout(row1) self.spec.vlFreq.addLayout(row2) self.btnExportRawData.clicked.connect(self.analysis_export_raw_data) self.btnExportFilterData.clicked.connect(self.analysis_export_filter_data) self.btnExportDecDemData.clicked.connect(self.analysis_export_decdem_data) self.btnPeakFreq.clicked.connect(self.analysis_export_position_freq) self.btnFwhm.clicked.connect(self.analysis_export_fwhm) self.btnMaxAmp.clicked.connect(self.analysis_export_max_amplitude_freq) self.lblSpectrumMetrics = QLabel("Spectrum metrics: -") self.lblSpectrumMetrics.setStyleSheet("color: #505050;") root_layout.addWidget(self.lblSpectrumMetrics) self.spec_log(f"[ANALYSIS] ready. base={self.analysis.base_url}") self.analysis_ping_timer.start() self._ping_analysis_service() def _ensure_image_view(self, container: QWidget, title: str) -> InteractiveImageView: lay = container.layout() if lay is None: lay = QVBoxLayout(container) container.setLayout(lay) view = InteractiveImageView(title=title, parent=container) lay.addWidget(view) return view def _show_demod_plot(self) -> None: if hasattr(self.spec, "gbDemodPlot"): self.spec.gbDemodPlot.setVisible(True) def _to_float_list(self, value: Any) -> list[float]: if isinstance(value, (list, tuple)): out: list[float] = [] for v in value: try: fv = float(v) if math.isfinite(fv): out.append(fv) except Exception: continue return out try: fv = float(value) return [fv] if math.isfinite(fv) else [] except Exception: return [] def _render_line_plot( self, view: InteractiveImageView, title: str, y_data: list[float], x_data: Optional[list[float]] = None, ) -> None: if not y_data: view.set_placeholder(f"{title}: no numeric data") return n = len(y_data) if x_data is None or len(x_data) != n: x_data = [float(i) for i in range(n)] max_points = 4000 if n > max_points: step = int(math.ceil(n / max_points)) y_data = y_data[::step] x_data = x_data[::step] xmin, xmax = min(x_data), max(x_data) ymin, ymax = min(y_data), max(y_data) if xmax <= xmin: xmax = xmin + 1.0 if ymax <= ymin: ymax = ymin + 1.0 w, h, m = 1200, 420, 32 pix = QPixmap(w, h) pix.fill(Qt.GlobalColor.white) p = QPainter(pix) p.setRenderHint(QPainter.RenderHint.Antialiasing, True) p.setPen(Qt.GlobalColor.black) p.drawRect(m, m, w - 2 * m, h - 2 * m) plot_w = float(w - 2 * m) plot_h = float(h - 2 * m) last = None p.setPen(Qt.GlobalColor.blue) for x, y in zip(x_data, y_data): px = m + (x - xmin) * plot_w / (xmax - xmin) py = m + (ymax - y) * plot_h / (ymax - ymin) if last is not None: p.drawLine(int(last[0]), int(last[1]), int(px), int(py)) last = (px, py) p.setPen(Qt.GlobalColor.darkGray) p.drawText(QRectF(m, 4, w - 2 * m, 22), title) p.end() view.set_pixmap(pix) def _set_spectrum_metrics(self, text: str) -> None: if hasattr(self, "lblSpectrumMetrics"): self.lblSpectrumMetrics.setText(f"Spectrum metrics: {text}") def _set_analysis_ping(self, ok: Optional[bool], msg: str = "") -> None: if not hasattr(self, "lblAnalysisPing"): return base = self._analysis_base_url() if ok is None: self.lblAnalysisPing.setText(f"* Analysis service: checking ({base})") self.lblAnalysisPing.setStyleSheet("color: #808080;") elif ok: self.lblAnalysisPing.setText(f"* Analysis service: online ({base})") self.lblAnalysisPing.setStyleSheet("color: #2e7d32;") else: tail = f" - {msg}" if msg else "" self.lblAnalysisPing.setText(f"* Analysis service: offline ({base}){tail}") self.lblAnalysisPing.setStyleSheet("color: #c62828;") def _ping_analysis_service(self) -> None: self._set_analysis_ping(None) base = self._analysis_base_url() def job() -> Tuple[bool, str]: try: r = requests.get(base, timeout=2.0) return True, f"HTTP {r.status_code}" except Exception as e: return False, str(e) def ok(result: Tuple[bool, str]) -> None: state, msg = result self._set_analysis_ping(state, msg if not state else "") def err(msg: str) -> None: self._set_analysis_ping(False, msg) run_in_thread(self, job, ok, err) def analysis_pick_file(self) -> None: path, _ = QFileDialog.getOpenFileName( self, "Select file for analysis", "", "Data (*.csv *.json);;CSV (*.csv);;JSON (*.json);;All files (*)", ) if not path: return self.leAnalysisFile.setText(path) self.analysis.last_uploaded_path = path self.spec_log(f"[ANALYSIS] file selected: {path}") def _get_analysis_upload_path(self) -> Optional[str]: path = self.leAnalysisFile.text().strip() or self.analysis.last_uploaded_path if not path or not os.path.exists(path): QMessageBox.warning(self, "Analysis", "Select a valid file to upload.") return None if os.path.getsize(path) == 0: QMessageBox.warning(self, "Analysis", "Selected file is empty.") return None if path.lower().endswith(".json"): try: with open(path, "r", encoding="utf-8") as f: json.load(f) except Exception as e: QMessageBox.warning(self, "Analysis", f"Invalid JSON file: {e}") return None return path def _ensure_session_id(self, on_ready: Callable[[str], None]) -> None: sid = (self.analysis.session_id or self.leSessionId.text().strip()) if sid: on_ready(str(sid)) return path = self._get_analysis_upload_path() if not path: return def job() -> str: api = self._analysis_api() return api.upload(path) def ok(session_id: str) -> None: self.analysis.session_id = session_id self.leSessionId.setText(session_id) self.spec_log(f"[ANALYSIS] uploaded. session_id={session_id}") on_ready(session_id) def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] upload error: {msg}") self.spec_log("[ANALYSIS] session_id missing, uploading automatically...") run_in_thread(self, job, ok, err) def analysis_upload(self) -> None: path = self._get_analysis_upload_path() if not path: return def job() -> str: api = self._analysis_api() return api.upload(path) def ok(session_id: str) -> None: self.analysis.session_id = session_id self.leSessionId.setText(session_id) self.spec_log(f"[ANALYSIS] uploaded. session_id={session_id}") def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] upload error: {msg}") self.spec_log("[ANALYSIS] uploading...") run_in_thread(self, job, ok, err) def analysis_filter(self) -> None: try: dt = float(self.leDt.text().strip()) center = float(self.leCenter.text().strip()) lo = float(self.leLo.text().strip()) hi = float(self.leHi.text().strip()) low = float(self.leLow.text().strip()) except Exception: QMessageBox.warning(self, "Analysis", "Invalid filter parameters.") return def proceed(sid: str) -> None: def job() -> str: api = self._analysis_api() api.filter(session_id=sid, dt=dt, center=center, lo=lo, hi=hi, low=low) return "ok" def ok(_res: str) -> None: self.spec_log("[ANALYSIS] filter done.") def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] filter error: {msg}") self.spec_log("[ANALYSIS] filtering...") run_in_thread(self, job, ok, err) self._ensure_session_id(proceed) def analysis_fft(self) -> None: c1s = self.leC1.text().strip() c2s = self.leC2.text().strip() c3s = self.leC3.text().strip() c4s = self.leC4.text().strip() c1: Optional[int] = None c2: Optional[int] = None c3: Optional[int] = None c4: Optional[int] = None if any((c1s, c2s, c3s, c4s)): if not all((c1s, c2s, c3s, c4s)): QMessageBox.warning(self, "Analysis", "Fill all FFT coefficients or leave all empty.") return try: c1 = int(c1s) c2 = int(c2s) c3 = int(c3s) c4 = int(c4s) except Exception: QMessageBox.warning(self, "Analysis", "Invalid FFT coefficients.") return def proceed(sid: str) -> None: def job() -> str: api = self._analysis_api() api.fft(session_id=sid, c1=c1, c2=c2, c3=c3, c4=c4) return "ok" def ok(_res: str) -> None: self.spec_log("[ANALYSIS] fft done.") def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] fft error: {msg}") self.spec_log("[ANALYSIS] fft...") run_in_thread(self, job, ok, err) self._ensure_session_id(proceed) def analysis_result(self) -> None: def proceed(sid: str) -> None: def job() -> Dict[str, Any]: api = self._analysis_api() return api.result(session_id=sid) def ok(data: Dict[str, Any]) -> None: self.spec_log("[ANALYSIS] result:") self.spec_log(json.dumps(data, ensure_ascii=False, indent=2)[:5000]) def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] result error: {msg}") run_in_thread(self, job, ok, err) self._ensure_session_id(proceed) def analysis_export(self) -> None: def proceed(sid: str) -> None: def job() -> Dict[str, Any]: api = self._analysis_api() return api.export_plots(session_id=sid) def ok(data: Dict[str, Any]) -> None: self.spec_log("[ANALYSIS] export response received.") self.spec_log(json.dumps(data, ensure_ascii=False, indent=2)[:5000]) self._try_render_images_from_response(data) def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] export error: {msg}") run_in_thread(self, job, ok, err) self._ensure_session_id(proceed) def analysis_plot_raw(self) -> None: def proceed(sid: str) -> None: def job() -> Dict[str, Any]: api = self._analysis_api() return api.plot_raw(session_id=sid) def ok(data: Dict[str, Any]) -> None: self.spec_log("[ANALYSIS] plot-raw response received.") self.spec_log(json.dumps(data, ensure_ascii=False, indent=2)[:5000]) self._try_render_images_from_response(data) def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] plot-raw error: {msg}") run_in_thread(self, job, ok, err) self._ensure_session_id(proceed) def _run_filter_fft_and_then(self, sid: str, on_ready: Callable[[], None]) -> None: try: dt = float(self.leDt.text().strip()) center = float(self.leCenter.text().strip()) lo = float(self.leLo.text().strip()) hi = float(self.leHi.text().strip()) low = float(self.leLow.text().strip()) except Exception: QMessageBox.warning(self, "Analysis", "Invalid filter parameters.") return c1s = self.leC1.text().strip() c2s = self.leC2.text().strip() c3s = self.leC3.text().strip() c4s = self.leC4.text().strip() c1 = c2 = c3 = c4 = None if any([c1s, c2s, c3s, c4s]): if not all([c1s, c2s, c3s, c4s]): QMessageBox.warning(self, "Analysis", "Fill all FFT coefficients or leave all empty.") return try: c1, c2, c3, c4 = int(c1s), int(c2s), int(c3s), int(c4s) except Exception: QMessageBox.warning(self, "Analysis", "Invalid FFT coefficients.") return def job() -> None: api = self._analysis_api() api.filter(session_id=sid, dt=dt, center=center, lo=lo, hi=hi, low=low) api.fft(session_id=sid, c1=c1, c2=c2, c3=c3, c4=c4) def ok(_: Any) -> None: self.spec_log("[ANALYSIS] filter+fft ready.") on_ready() def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] filter+fft error: {msg}") self.spec_log("[ANALYSIS] running filter+fft before metric...") run_in_thread(self, job, ok, err) def analysis_export_raw_data(self) -> None: def proceed(sid: str) -> None: def job() -> Dict[str, Any]: api = self._analysis_api() return api.export_raw_data(session_id=sid) def ok(data: Dict[str, Any]) -> None: y = self._to_float_list(data.get("data")) self._render_line_plot(self.lblRawImg, "Raw signal", y) self.spec_log(f"[ANALYSIS] export-raw-data done. points={len(y)}") def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] export-raw-data error: {msg}") run_in_thread(self, job, ok, err) self._ensure_session_id(proceed) def analysis_export_filter_data(self) -> None: def proceed(sid: str) -> None: try: center = float(self.leCenter.text().strip()) lo = float(self.leLo.text().strip()) hi = float(self.leHi.text().strip()) low = float(self.leLow.text().strip()) except Exception: QMessageBox.warning(self, "Analysis", "Invalid filter frequencies.") return def job() -> Dict[str, Any]: api = self._analysis_api() return api.export_filter_data( session_id=sid, center_freq=center, lower_freq=lo, higher_freq=hi, low_freq=low, ) def ok(data: Dict[str, Any]) -> None: y = self._to_float_list(data.get("signal_real")) x = self._to_float_list(data.get("time_data_signal")) x_use = x if len(x) == len(y) else None self._render_line_plot(self.lblRawImg, "Filtered signal (real)", y, x_use) self.spec_log(f"[ANALYSIS] export-filter-data done. points={len(y)}") def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] export-filter-data error: {msg}") run_in_thread(self, job, ok, err) self._ensure_session_id(proceed) def analysis_export_decdem_data(self) -> None: def proceed(sid: str) -> None: def job() -> Dict[str, Any]: api = self._analysis_api() return api.export_decdem_data(session_id=sid) def ok(data: Dict[str, Any]) -> None: y = self._to_float_list(data.get("signal_real")) self._render_line_plot(self.lblExtraImg, "Dec/Dem signal (real)", y) self._show_demod_plot() self.spec_log(f"[ANALYSIS] export-decdem-data done. points={len(y)}") def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] export-decdem-data error: {msg}") run_in_thread(self, job, ok, err) self._ensure_session_id(proceed) def analysis_export_position_freq(self) -> None: def proceed(sid: str) -> None: self._run_metric_export( sid=sid, api_call=lambda api: api.export_position_freq(session_id=sid), value_key="peak max amplitude in freq", metric_prefix="peak freq", log_name="export-position-freq", ) self._ensure_session_id(proceed) def analysis_export_fwhm(self) -> None: def proceed(sid: str) -> None: self._run_metric_export( sid=sid, api_call=lambda api: api.export_fwhm(session_id=sid), value_key="width at half height", metric_prefix="FWHM", log_name="export-FWHM", ) self._ensure_session_id(proceed) def analysis_export_max_amplitude_freq(self) -> None: def proceed(sid: str) -> None: self._run_metric_export( sid=sid, api_call=lambda api: api.export_max_amplitude_freq(session_id=sid), value_key="max amplitude", metric_prefix="max amp", log_name="export-max-amplitude-freq", ) self._ensure_session_id(proceed) def _run_metric_export( self, *, sid: str, api_call: Callable[[LoggedAnalysisApi], Dict[str, Any]], value_key: str, metric_prefix: str, log_name: str, ) -> None: def run_metric() -> None: def job() -> Dict[str, Any]: return api_call(self._analysis_api()) def ok(data: Dict[str, Any]) -> None: val = data.get(value_key, "n/a") self._set_spectrum_metrics(f"{metric_prefix}={val}") self.spec_log(f"[ANALYSIS] {log_name}: {val}") def err(msg: str) -> None: self.spec_log(f"[ANALYSIS] {log_name} error: {msg}") run_in_thread(self, job, ok, err) self._run_filter_fft_and_then(sid, run_metric) def _try_render_images_from_response(self, data: Dict[str, Any]) -> None: if not isinstance(data, dict): return urls = [] for k, v in data.items(): if isinstance(v, str) and v.startswith("/"): urls.append((k, v)) elif isinstance(v, dict): for k2, v2 in v.items(): if isinstance(v2, str) and v2.startswith("/"): urls.append((f"{k}.{k2}", v2)) if not urls: self.spec_log("[ANALYSIS] No image urls found in response.") return targets = [ (self.lblRawImg, "RAW"), (self.lblSpecImg, "SPEC"), (self.lblExtraImg, "EXTRA"), ] for (lbl, tag), (name, url) in zip(targets, urls[:3]): self.spec_log(f"[ANALYSIS] rendering {name}: {url}") self._download_and_show_image(url, lbl, tag) def _download_and_show_image(self, rel_url: str, target_lbl: InteractiveImageView, tag: str) -> None: def job() -> bytes: api = self._analysis_api() return api.download_bytes(rel_url) def ok(content: bytes) -> None: target_lbl.set_image_bytes(content) def err(msg: str) -> None: target_lbl.set_placeholder(f"{tag}: download error") self.spec_log(f"[ANALYSIS] download error: {msg}") run_in_thread(self, job, ok, err)