| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- """
- nmr_processor.py - чистый вычислительный модуль NMR-обработки сигнала.
- Пайплайн (из spec_combined.py, исправлено: fs берётся из JSON):
- raw int16 (base64)
- -> масштабирование (int16 * voltage_range / 32768)
- -> bandpass filter (Butterworth, sosfiltfilt zero-phase)
- -> IQ demodulation (x sin + j | cos при fc)
- -> LPF (Butterworth, sosfiltfilt)
- -> decimation
- -> zero-padding
- -> FFT + fftshift
- -> поиск пика (в отрицательных частотах), FWHM
- Нет зависимостей на FastAPI или GUI - только numpy/scipy.
- """
- from __future__ import annotations
- import base64
- import json
- from dataclasses import dataclass, field
- import numpy as np
- from scipy.signal import butter, sosfiltfilt, decimate
- _MAX_TRANSPORT_PTS = 8000 # максимум точек в JSON-ответе
- # -- Параметры ----------------------------------------------------------------
- @dataclass
- class NMRParams:
- """Параметры обработки NMR-сигнала."""
- center_freq: float = 2.95e6 # Гц - несущая (Larmor frequency)
- bandpass_bw: float = 0.1e6 # Гц - ширина полосового фильтра
- lp_cutoff: float = 0.6e6 # Гц - частота среза НЧФ
- dec_factor: int = 10 # коэффициент децимации
- zero_pad: int = 500_001 # количество нулей для zero-padding
- butter_order: int = 4 # порядок фильтра Баттерворта
- voltage_range: float = 5.0 # В - диапазон ADC (int16 -> V)
- averaging_num: int = 0 # averaging_num в JSON
- data_num: int = 0 # data_num в JSON
- channel_num: int = 1 # channel_num в JSON
- # -- Результат ----------------------------------------------------------------
- @dataclass
- class NMRResult:
- """Полный результат обработки одного измерения."""
- # Метаданные
- n_samples: int = 0
- sample_rate_hz: float = 0.0
- duration_ms: float = 0.0
- dec_factor_actual: int = 1
- # Сырой сигнал (до обработки)
- raw_t_ms: list = field(default_factory=list)
- raw_real: list = field(default_factory=list)
- raw_amplitude: list = field(default_factory=list)
- raw_freq_hz: list = field(default_factory=list)
- raw_magnitude: list = field(default_factory=list)
- raw_phase_rad: list = field(default_factory=list)
- # Демодулированный FID (после BPF -> demod -> LPF -> децимация)
- demod_t_ms: list = field(default_factory=list)
- demod_real: list = field(default_factory=list)
- demod_imag: list = field(default_factory=list)
- demod_amplitude: list = field(default_factory=list)
- # NMR-спектр (zero-pad + FFT + fftshift)
- freq_khz: list = field(default_factory=list)
- magnitude: list = field(default_factory=list)
- phase_rad: list = field(default_factory=list)
- # Пиковые метрики
- peak_freq_khz: float = 0.0
- peak_amplitude: float = 0.0
- fwhm_khz: float = 0.0
- freq_step_khz: float = 0.0
- # -- Загрузка hardware JSON ----------------------------------------------------
- def get_channel_value(value, channel_num: int, field_name: str):
- """Возвращает скалярное значение поля или значение для выбранного канала."""
- if isinstance(value, (list, tuple)):
- if channel_num < 0 or channel_num >= len(value):
- raise ValueError(
- f"Поле {field_name} не содержит значение для channel_num={channel_num}"
- )
- return value[channel_num]
- return value
- def decode_hardware_json(
- json_data: list | dict,
- params: NMRParams,
- ) -> tuple[np.ndarray, float]:
- """
- Извлекает сигнал и sample_rate из hardware JSON.
- Поддерживаемые форматы:
- * flat-list (fast-api-spectroscopy / spectrometer_service):
- [{averaging_num: 0, data_num: 0, measurement_rate: ...,
- channel_data: [{channel_num: 1, channel_data: "BASE64"}]}]
- * nested-dict (spec_combined.py):
- {averaging_num_0: [{data_num: 0, measurement_rate: ..., ...}]}
- sample_rate берётся из поля measurement_rate (ошибка в spec_combined.py
- была в том, что dt=100e-9 захардкожен; здесь используем реальный rate).
- Масштабирование: arr_int16 * voltage_range / 32768 -> float64 [V]
- """
- # Нормализуем к flat-list
- if isinstance(json_data, dict):
- key = f"averaging_num_{params.averaging_num}"
- items = json_data.get(key, [])
- else:
- items = json_data
- # Найти нужную запись
- measurement = None
- for item in items:
- avg = item.get("averaging_num", 0)
- dn = item.get("data_num", 0)
- if avg == params.averaging_num and dn == params.data_num:
- measurement = item
- break
- if measurement is None:
- raise ValueError(
- f"Измерение averaging_num={params.averaging_num}, "
- f"data_num={params.data_num} не найдено в JSON"
- )
- # Найти канал
- raw_b64 = None
- for ch in measurement.get("channel_data", []):
- if ch.get("channel_num") == params.channel_num:
- raw_b64 = ch.get("channel_data")
- break
- if raw_b64 is None:
- raise ValueError(f"Канал channel_num={params.channel_num} не найден")
- # Декодирование: base64 -> int16 -> float64
- points = get_channel_value(
- measurement.get("measurement_points"),
- params.channel_num,
- "measurement_points",
- )
- decoded = base64.b64decode(raw_b64.encode("utf-8"))
- arr = np.frombuffer(decoded, dtype=np.int16)
- if points is not None:
- arr = arr[:int(points)]
- signal = arr.astype(np.float64) * params.voltage_range / 32768.0
- # Частота дискретизации из JSON
- sample_rate = float(
- get_channel_value(
- measurement.get("measurement_rate", 8e6),
- params.channel_num,
- "measurement_rate",
- )
- )
- return signal, sample_rate
- # -- Вспомогательные функции ---------------------------------------------------
- def compute_fwhm(freq: np.ndarray, spectrum: np.ndarray, peak_idx: int) -> float:
- """Ширина пика на полувысоте (kHz или в единицах freq)."""
- half_max = spectrum[peak_idx] / 2.0
- left = peak_idx
- while left > 0 and spectrum[left] > half_max:
- left -= 1
- right = peak_idx
- while right < len(spectrum) - 1 and spectrum[right] > half_max:
- right += 1
- return float(freq[right] - freq[left])
- def _downsample(arr: np.ndarray, max_pts: int = _MAX_TRANSPORT_PTS) -> list:
- """Даунсэмплирует массив до max_pts точек и конвертирует в list[float]."""
- a = np.asarray(arr, dtype=float).flatten()
- if len(a) > max_pts:
- idx = np.linspace(0, len(a) - 1, max_pts, dtype=int)
- a = a[idx]
- return [float(x) for x in a]
- # -- Основной пайплайн ---------------------------------------------------------
- def process_nmr(
- signal: np.ndarray,
- sample_rate: float,
- params: NMRParams,
- ) -> NMRResult:
- """
- Полный NMR-пайплайн (аналог spec_combined.process_signal_2,
- с исправлениями: fs из аргумента, sosfiltfilt вместо sosfilt).
- signal - сигнал в [В], 1D float64
- sample_rate - частота дискретизации [Гц]
- params - параметры обработки
- Возвращает NMRResult со всеми массивами, даунсэмплированными до
- _MAX_TRANSPORT_PTS точек.
- """
- result = NMRResult()
- n_orig = len(signal)
- dt = 1.0 / sample_rate
- result.n_samples = n_orig
- result.sample_rate_hz = sample_rate
- result.duration_ms = round(n_orig * dt * 1e3, 4)
- # -- 1. Сырой сигнал --------------------------------------------------
- t_raw_ms = np.arange(n_orig) * dt * 1e3
- result.raw_t_ms = _downsample(t_raw_ms)
- result.raw_real = _downsample(signal)
- result.raw_amplitude = _downsample(np.abs(signal))
- raw_centered = signal - np.mean(signal)
- raw_n = len(raw_centered)
- if raw_n > 0:
- raw_fft = np.fft.rfft(raw_centered) * 2 / raw_n
- raw_freq_hz = np.fft.rfftfreq(raw_n, d=dt)
- result.raw_freq_hz = _downsample(raw_freq_hz)
- result.raw_magnitude = _downsample(np.abs(raw_fft))
- result.raw_phase_rad = _downsample(np.angle(raw_fft))
- # -- 2. Разбивка на точки_на_запись / n_записей -----------------------
- # Если сигнал содержит несколько повторений, разбиваем на матрицу
- # (аналогично spec_combined.py). Если одна запись - матрица (1, N).
- points_num = n_orig # всё как одна запись (стандартный случай)
- n_reps = int(n_orig / points_num)
- cutoff = n_reps * points_num
- ch = signal[:cutoff]
- ch = np.vstack(np.split(ch, n_reps)) # shape: (n_reps, points_num)
- # -- 3. Полосовой фильтр (BPF) ----------------------------------------
- fc = params.center_freq
- bw = params.bandpass_bw
- f_low = max(fc - bw / 2, 1.0)
- f_high = min(fc + bw / 2, sample_rate / 2 - 1.0)
- sos_bp = butter(params.butter_order, [f_low, f_high],
- btype="bandpass", fs=sample_rate, output="sos")
- ch_bp = sosfiltfilt(sos_bp, ch, axis=1)
- # -- 4. IQ-демодуляция ------------------------------------------------
- t_pts = np.arange(points_num) * dt
- demod_i = np.sin(2 * np.pi * fc * t_pts) # I - вещественная часть
- demod_q = np.cos(2 * np.pi * fc * t_pts) # Q - мнимая часть
- ch_real = ch_bp * demod_i
- ch_imag = ch_bp * demod_q
- # -- 5. НЧФ (LPF) -----------------------------------------------------
- lp_cut = min(params.lp_cutoff, sample_rate / 2 - 1.0)
- sos_lp = butter(params.butter_order, lp_cut,
- btype="lowpass", fs=sample_rate, output="sos")
- ch_real_f = sosfiltfilt(sos_lp, ch_real, axis=1)
- ch_imag_f = sosfiltfilt(sos_lp, ch_imag, axis=1)
- # -- 6. Децимация -----------------------------------------------------
- dec = int(params.dec_factor)
- if dec > 1:
- ch_real_d = decimate(ch_real_f, dec, axis=1)
- ch_imag_d = decimate(ch_imag_f, dec, axis=1)
- else:
- ch_real_d = ch_real_f
- ch_imag_d = ch_imag_f
- dec_actual = round(points_num / ch_real_d.shape[1])
- Td_new = dt * dec_actual
- result.dec_factor_actual = int(dec_actual)
- # -- 7. Комплексный FID (первая запись) -------------------------------
- complex_fid = ch_real_d[0] + 1j * ch_imag_d[0]
- n_fid = len(complex_fid)
- t_fid_ms = np.arange(n_fid) * Td_new * 1e3
- result.demod_t_ms = _downsample(t_fid_ms)
- result.demod_real = _downsample(complex_fid.real)
- result.demod_imag = _downsample(complex_fid.imag)
- result.demod_amplitude = _downsample(np.abs(complex_fid))
- # -- 8. Zero-padding --------------------------------------------------
- padded = np.pad(complex_fid, (0, max(0, params.zero_pad)), mode="constant")
- # -- 9. FFT + fftshift ------------------------------------------------
- spectrum = np.abs(np.fft.fftshift(np.fft.fft(padded)))
- n_sp = len(padded)
- BW = 1.0 / Td_new # полоса после децимации
- dff = BW / (n_sp - 1)
- freq_hz = np.arange(-BW / 2, BW / 2 + dff, dff)[:n_sp]
- freq_khz = freq_hz / 1e3
- phase = np.angle(np.fft.fftshift(np.fft.fft(padded)))
- result.freq_khz = _downsample(freq_khz)
- result.magnitude = _downsample(spectrum)
- result.phase_rad = _downsample(phase)
- result.freq_step_khz = float(dff / 1e3)
- # -- 10. Поиск пика (в отрицательных частотах) ------------------------
- neg_mask = freq_khz < 0
- if neg_mask.any():
- neg_spec = spectrum.copy()
- neg_spec[~neg_mask] = 0.0
- peak_idx = int(np.argmax(neg_spec))
- else:
- peak_idx = int(np.argmax(spectrum))
- result.peak_freq_khz = float(freq_khz[peak_idx])
- result.peak_amplitude = float(spectrum[peak_idx])
- result.fwhm_khz = compute_fwhm(freq_khz, spectrum, peak_idx)
- return result
- # -- Утилита для пакетной обработки -------------------------------------------
- def process_json_file(
- path: str,
- params: NMRParams,
- ) -> dict:
- """
- Загружает JSON-файл, декодирует и обрабатывает.
- Возвращает dict с полями NMRResult + 'filename', 'status', 'error'.
- """
- import os
- filename = os.path.basename(path)
- try:
- with open(path, encoding="utf-8") as fh:
- raw = json.load(fh)
- signal, fs = decode_hardware_json(raw, params)
- res = process_nmr(signal, fs, params)
- return {
- "filename": filename,
- "status": "ok",
- "error": None,
- "peak_freq_khz": res.peak_freq_khz,
- "fwhm_khz": res.fwhm_khz,
- "peak_amplitude": res.peak_amplitude,
- "freq_step_khz": res.freq_step_khz,
- }
- except Exception as exc:
- return {
- "filename": filename,
- "status": "error",
- "error": f"{type(exc).__name__}: {exc}",
- "peak_freq_khz": None,
- "fwhm_khz": None,
- "peak_amplitude": None,
- "freq_step_khz": None,
- }
|