| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- from __future__ import annotations
- from dataclasses import dataclass
- from typing import Any, Dict, Optional
- import requests
- from config import ANALYSIS_BASE_URL, HTTP_TIMEOUT_SEC, get_endpoint, join_url
- @dataclass(frozen=True)
- class AnalysisExport:
- """Опциональный тип для результатов export/plot API (зависит от бэкенда)."""
- payload: Dict[str, Any]
- class AnalysisService:
- """Client for analysis microservice.
- Переносит функциональность прежнего analysis_api.py.
- Все PATH берём из config.ENDPOINTS["analysis"].
- """
- def __init__(self, base_url: str | None = None, timeout: float | None = None) -> None:
- self.base = (base_url or ANALYSIS_BASE_URL).rstrip("/")
- self.timeout = float(timeout if timeout is not None else HTTP_TIMEOUT_SEC)
- self.session = requests.Session()
- def _url(self, endpoint_key: str) -> str:
- path = get_endpoint("analysis", endpoint_key)
- return join_url(self.base, path)
- def _post_with_session_fallback(
- self,
- endpoint_key: str,
- session_id: str,
- extra_form: Optional[Dict[str, Any]] = None,
- ) -> Dict[str, Any]:
- """POST endpoint with session_id.
- Some backend versions expect session_id as form field, others as query param.
- Try form first; if validation fails (422), retry with query params.
- """
- url = self._url(endpoint_key)
- payload: Dict[str, Any] = {"session_id": session_id}
- if extra_form:
- payload.update(extra_form)
- r = self.session.post(url, data=payload, timeout=self.timeout)
- if r.status_code == 422:
- r = self.session.post(url, params=payload, timeout=self.timeout)
- r.raise_for_status()
- return r.json() if r.content else {}
- def upload(self, file_path: str) -> str:
- url = self._url("upload")
- with open(file_path, "rb") as f:
- r = self.session.post(url, files={"file": f}, timeout=self.timeout)
- r.raise_for_status()
- j = r.json() if r.content else {}
- if "session_id" not in j:
- raise ValueError(f"analysis.upload(): missing 'session_id' in response: {j}")
- return str(j["session_id"])
- def filter(
- self,
- session_id: str,
- dt: float,
- center: float,
- lo: float,
- hi: float,
- low: float,
- ) -> None:
- url = self._url("filter")
- data = {
- "session_id": session_id,
- "dt": dt,
- "center_freq": center,
- "lower_freq": lo,
- "higher_freq": hi,
- "low_freq": low,
- }
- r = self.session.post(url, data=data, timeout=self.timeout)
- r.raise_for_status()
- def fft(
- self,
- session_id: str,
- c1: Optional[int] = None,
- c2: Optional[int] = None,
- c3: Optional[int] = None,
- c4: Optional[int] = None,
- ) -> None:
- url = self._url("fft")
- data = {"session_id": session_id}
- if c1 is not None and c2 is not None and c3 is not None and c4 is not None:
- data.update(
- {
- "coef_dec_1": c1,
- "coef_dec_2": c2,
- "coef_dec_3": c3,
- "coef_dec_4": c4,
- }
- )
- r = self.session.post(url, data=data, timeout=self.timeout)
- r.raise_for_status()
- def result(self, session_id: str) -> Dict[str, Any]:
- url = self._url("result")
- r = self.session.get(url, params={"session_id": session_id}, timeout=self.timeout)
- r.raise_for_status()
- return r.json() if r.content else {}
- def export_plots(self, session_id: str) -> Dict[str, Any]:
- url = self._url("export")
- r = self.session.post(url, data={"session_id": session_id}, timeout=self.timeout)
- r.raise_for_status()
- return r.json() if r.content else {}
- def plot_raw(self, session_id: str) -> Dict[str, Any]:
- url = self._url("plot_raw")
- r = self.session.post(url, data={"session_id": session_id}, timeout=self.timeout)
- r.raise_for_status()
- return r.json() if r.content else {}
- def export_raw_data(self, session_id: str) -> Dict[str, Any]:
- return self._post_with_session_fallback("export_raw_data", session_id)
- def export_filter_data(
- self,
- session_id: str,
- center_freq: float,
- lower_freq: float,
- higher_freq: float,
- low_freq: float,
- ) -> Dict[str, Any]:
- return self._post_with_session_fallback(
- "export_filter_data",
- session_id,
- extra_form={
- "center_freq": center_freq,
- "lower_freq": lower_freq,
- "higher_freq": higher_freq,
- "low_freq": low_freq,
- },
- )
- def export_decdem_data(self, session_id: str) -> Dict[str, Any]:
- return self._post_with_session_fallback("export_decdem_data", session_id)
- def export_position_freq(self, session_id: str) -> Dict[str, Any]:
- return self._post_with_session_fallback("export_position_freq", session_id)
- def export_fwhm(self, session_id: str) -> Dict[str, Any]:
- return self._post_with_session_fallback("export_fwhm", session_id)
- def export_max_amplitude_freq(self, session_id: str) -> Dict[str, Any]:
- return self._post_with_session_fallback("export_max_amplitude_freq", session_id)
- def download_bytes(self, rel_or_abs_url: str) -> bytes:
- s = (rel_or_abs_url or "").strip()
- if not s:
- raise ValueError("download_bytes(): empty url")
- # Если бэкенд вернул абсолютный URL — используем как есть.
- if s.startswith("http://") or s.startswith("https://"):
- url = s
- else:
- # иначе считаем относительным к base
- if not s.startswith("/"):
- s = "/" + s
- url = self.base + s
- r = self.session.get(url, timeout=self.timeout)
- r.raise_for_status()
- return r.content
|