from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, Optional

import requests

from config import ANALYSIS_BASE_URL, HTTP_TIMEOUT_SEC, get_endpoint, join_url


@dataclass(frozen=True)
class AnalysisExport:
    """Optional type for export/plot API results (depends on the backend)."""

    payload: Dict[str, Any]


class AnalysisService:
    """Client for the analysis microservice.

    Ports the functionality of the former analysis_api.py.
    All paths are taken from config.ENDPOINTS["analysis"].

    The instance owns a ``requests.Session``; call :meth:`close` (or use the
    instance as a context manager) to release it when done.
    """

    def __init__(self, base_url: str | None = None, timeout: float | None = None) -> None:
        """Create a client.

        Args:
            base_url: Service root URL; falls back to ``ANALYSIS_BASE_URL``
                when falsy. Trailing slashes are stripped.
            timeout: Per-request timeout passed to ``requests``; falls back
                to ``HTTP_TIMEOUT_SEC`` when ``None``.
        """
        self.base = (base_url or ANALYSIS_BASE_URL).rstrip("/")
        self.timeout = float(timeout if timeout is not None else HTTP_TIMEOUT_SEC)
        self.session = requests.Session()

    # -- lifecycle ---------------------------------------------------------

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> AnalysisService:
        return self

    def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        self.close()

    # -- internal helpers --------------------------------------------------

    def _url(self, endpoint_key: str) -> str:
        """Return the full URL for the ``analysis`` endpoint named *endpoint_key*."""
        path = get_endpoint("analysis", endpoint_key)
        return join_url(self.base, path)

    @staticmethod
    def _json_or_empty(response: requests.Response) -> Any:
        """Decode the JSON body of *response*, or return ``{}`` for an empty body."""
        return response.json() if response.content else {}

    def _post_with_session_fallback(
        self,
        endpoint_key: str,
        session_id: str,
        extra_form: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """POST to an endpoint with ``session_id``.

        Some backend versions expect session_id as a form field, others as a
        query param. Try form first; if validation fails (422), retry with
        query params.

        Args:
            endpoint_key: Key of the endpoint in the ``analysis`` group.
            session_id: Session identifier sent with the request.
            extra_form: Additional fields merged into the payload.

        Returns:
            The decoded JSON body, or ``{}`` when the body is empty.

        Raises:
            requests.HTTPError: If the final response has an error status.
        """
        url = self._url(endpoint_key)
        payload: Dict[str, Any] = {"session_id": session_id}
        if extra_form:
            payload.update(extra_form)

        r = self.session.post(url, data=payload, timeout=self.timeout)
        if r.status_code == 422:
            # Same fields, but sent in the query string instead of the form.
            r = self.session.post(url, params=payload, timeout=self.timeout)
        r.raise_for_status()
        return self._json_or_empty(r)

    # -- API calls ---------------------------------------------------------

    def upload(self, file_path: str) -> str:
        """Upload the file at *file_path* and return the session id.

        Raises:
            requests.HTTPError: If the response has an error status.
            ValueError: If the response body has no ``session_id`` key.
        """
        url = self._url("upload")
        with open(file_path, "rb") as f:
            r = self.session.post(url, files={"file": f}, timeout=self.timeout)
        r.raise_for_status()

        j = self._json_or_empty(r)
        # A non-object body (e.g. JSON ``null``) cannot carry a session id;
        # report it the same way as a missing key instead of a TypeError.
        if not isinstance(j, dict) or "session_id" not in j:
            raise ValueError(f"analysis.upload(): missing 'session_id' in response: {j}")
        return str(j["session_id"])

    def filter(
        self,
        session_id: str,
        dt: float,
        center: float,
        lo: float,
        hi: float,
        low: float,
    ) -> None:
        """POST filter parameters for a session.

        The arguments are sent as the form fields ``dt``, ``center_freq``
        (*center*), ``lower_freq`` (*lo*), ``higher_freq`` (*hi*) and
        ``low_freq`` (*low*).

        Raises:
            requests.HTTPError: If the response has an error status.
        """
        url = self._url("filter")
        data = {
            "session_id": session_id,
            "dt": dt,
            "center_freq": center,
            "lower_freq": lo,
            "higher_freq": hi,
            "low_freq": low,
        }
        r = self.session.post(url, data=data, timeout=self.timeout)
        r.raise_for_status()

    def fft(
        self,
        session_id: str,
        c1: Optional[int] = None,
        c2: Optional[int] = None,
        c3: Optional[int] = None,
        c4: Optional[int] = None,
    ) -> None:
        """POST to the ``fft`` endpoint for a session.

        The coefficients are sent (as ``coef_dec_1`` .. ``coef_dec_4``) only
        when all four are given; a partial set is not sent at all.

        Raises:
            requests.HTTPError: If the response has an error status.
        """
        url = self._url("fft")
        data: Dict[str, Any] = {"session_id": session_id}
        if c1 is not None and c2 is not None and c3 is not None and c4 is not None:
            data.update(
                {
                    "coef_dec_1": c1,
                    "coef_dec_2": c2,
                    "coef_dec_3": c3,
                    "coef_dec_4": c4,
                }
            )
        r = self.session.post(url, data=data, timeout=self.timeout)
        r.raise_for_status()

    def result(self, session_id: str) -> Dict[str, Any]:
        """GET the ``result`` endpoint; return its JSON body (``{}`` if empty)."""
        url = self._url("result")
        r = self.session.get(url, params={"session_id": session_id}, timeout=self.timeout)
        r.raise_for_status()
        return self._json_or_empty(r)

    def export_plots(self, session_id: str) -> Dict[str, Any]:
        """POST to the ``export`` endpoint; return its JSON body (``{}`` if empty)."""
        url = self._url("export")
        r = self.session.post(url, data={"session_id": session_id}, timeout=self.timeout)
        r.raise_for_status()
        return self._json_or_empty(r)

    def plot_raw(self, session_id: str) -> Dict[str, Any]:
        """POST to the ``plot_raw`` endpoint; return its JSON body (``{}`` if empty)."""
        url = self._url("plot_raw")
        r = self.session.post(url, data={"session_id": session_id}, timeout=self.timeout)
        r.raise_for_status()
        return self._json_or_empty(r)

    def export_raw_data(self, session_id: str) -> Dict[str, Any]:
        """POST to ``export_raw_data`` (form first, query params on 422)."""
        return self._post_with_session_fallback("export_raw_data", session_id)

    def export_filter_data(
        self,
        session_id: str,
        center_freq: float,
        lower_freq: float,
        higher_freq: float,
        low_freq: float,
    ) -> Dict[str, Any]:
        """POST to ``export_filter_data`` with the given frequency fields."""
        return self._post_with_session_fallback(
            "export_filter_data",
            session_id,
            extra_form={
                "center_freq": center_freq,
                "lower_freq": lower_freq,
                "higher_freq": higher_freq,
                "low_freq": low_freq,
            },
        )

    def export_decdem_data(self, session_id: str) -> Dict[str, Any]:
        """POST to ``export_decdem_data`` (form first, query params on 422)."""
        return self._post_with_session_fallback("export_decdem_data", session_id)

    def export_position_freq(self, session_id: str) -> Dict[str, Any]:
        """POST to ``export_position_freq`` (form first, query params on 422)."""
        return self._post_with_session_fallback("export_position_freq", session_id)

    def export_fwhm(self, session_id: str) -> Dict[str, Any]:
        """POST to ``export_fwhm`` (form first, query params on 422)."""
        return self._post_with_session_fallback("export_fwhm", session_id)

    def export_max_amplitude_freq(self, session_id: str) -> Dict[str, Any]:
        """POST to ``export_max_amplitude_freq`` (form first, query params on 422)."""
        return self._post_with_session_fallback("export_max_amplitude_freq", session_id)

    def download_bytes(self, rel_or_abs_url: str) -> bytes:
        """Download a resource and return its raw bytes.

        Args:
            rel_or_abs_url: Absolute ``http(s)://`` URL, or a path that is
                appended to the service base URL.

        Raises:
            ValueError: If the URL is empty or whitespace-only.
            requests.HTTPError: If the response has an error status.
        """
        s = (rel_or_abs_url or "").strip()
        if not s:
            raise ValueError("download_bytes(): empty url")

        if s.startswith(("http://", "https://")):
            # The backend returned an absolute URL: use it as is.
            url = s
        else:
            # Otherwise treat it as relative to the base URL.
            url = self.base + (s if s.startswith("/") else "/" + s)

        r = self.session.get(url, timeout=self.timeout)
        r.raise_for_status()
        return r.content