""" nmr_processor.py - чистый вычислительный модуль NMR-обработки сигнала. Пайплайн (из spec_combined.py, исправлено: fs берётся из JSON): raw int16 (base64) -> масштабирование (int16 * voltage_range / 32768) -> bandpass filter (Butterworth, sosfiltfilt zero-phase) -> IQ demodulation (x sin + j | cos при fc) -> LPF (Butterworth, sosfiltfilt) -> decimation -> zero-padding -> FFT + fftshift -> поиск пика (в отрицательных частотах), FWHM Нет зависимостей на FastAPI или GUI - только numpy/scipy. """ from __future__ import annotations import base64 import json from dataclasses import dataclass, field import numpy as np from scipy.signal import butter, sosfiltfilt, decimate _MAX_TRANSPORT_PTS = 8000 # максимум точек в JSON-ответе # -- Параметры ---------------------------------------------------------------- @dataclass class NMRParams: """Параметры обработки NMR-сигнала.""" center_freq: float = 2.95e6 # Гц - несущая (Larmor frequency) bandpass_bw: float = 0.1e6 # Гц - ширина полосового фильтра lp_cutoff: float = 0.6e6 # Гц - частота среза НЧФ dec_factor: int = 10 # коэффициент децимации zero_pad: int = 500_001 # количество нулей для zero-padding butter_order: int = 4 # порядок фильтра Баттерворта voltage_range: float = 5.0 # В - диапазон ADC (int16 -> V) averaging_num: int = 0 # averaging_num в JSON data_num: int = 0 # data_num в JSON channel_num: int = 1 # channel_num в JSON # -- Результат ---------------------------------------------------------------- @dataclass class NMRResult: """Полный результат обработки одного измерения.""" # Метаданные n_samples: int = 0 sample_rate_hz: float = 0.0 duration_ms: float = 0.0 dec_factor_actual: int = 1 # Сырой сигнал (до обработки) raw_t_ms: list = field(default_factory=list) raw_real: list = field(default_factory=list) raw_amplitude: list = field(default_factory=list) raw_freq_hz: list = field(default_factory=list) raw_magnitude: list = field(default_factory=list) raw_phase_rad: list = field(default_factory=list) # Демодулированный FID (после BPF -> demod -> LPF -> децимация) demod_t_ms: list = field(default_factory=list) demod_real: list = field(default_factory=list) demod_imag: list = field(default_factory=list) demod_amplitude: list = field(default_factory=list) # NMR-спектр (zero-pad + FFT + fftshift) freq_khz: list = field(default_factory=list) magnitude: list = field(default_factory=list) phase_rad: list = field(default_factory=list) # Пиковые метрики peak_freq_khz: float = 0.0 peak_amplitude: float = 0.0 fwhm_khz: float = 0.0 freq_step_khz: float = 0.0 # -- Загрузка hardware JSON ---------------------------------------------------- def get_channel_value(value, channel_num: int, field_name: str): """Возвращает скалярное значение поля или значение для выбранного канала.""" if isinstance(value, (list, tuple)): if channel_num < 0 or channel_num >= len(value): raise ValueError( f"Поле {field_name} не содержит значение для channel_num={channel_num}" ) return value[channel_num] return value def decode_hardware_json( json_data: list | dict, params: NMRParams, ) -> tuple[np.ndarray, float]: """ Извлекает сигнал и sample_rate из hardware JSON. Поддерживаемые форматы: * flat-list (fast-api-spectroscopy / spectrometer_service): [{averaging_num: 0, data_num: 0, measurement_rate: ..., channel_data: [{channel_num: 1, channel_data: "BASE64"}]}] * nested-dict (spec_combined.py): {averaging_num_0: [{data_num: 0, measurement_rate: ..., ...}]} sample_rate берётся из поля measurement_rate (ошибка в spec_combined.py была в том, что dt=100e-9 захардкожен; здесь используем реальный rate). Масштабирование: arr_int16 * voltage_range / 32768 -> float64 [V] """ # Нормализуем к flat-list if isinstance(json_data, dict): key = f"averaging_num_{params.averaging_num}" items = json_data.get(key, []) else: items = json_data # Найти нужную запись measurement = None for item in items: avg = item.get("averaging_num", 0) dn = item.get("data_num", 0) if avg == params.averaging_num and dn == params.data_num: measurement = item break if measurement is None: raise ValueError( f"Измерение averaging_num={params.averaging_num}, " f"data_num={params.data_num} не найдено в JSON" ) # Найти канал raw_b64 = None for ch in measurement.get("channel_data", []): if ch.get("channel_num") == params.channel_num: raw_b64 = ch.get("channel_data") break if raw_b64 is None: raise ValueError(f"Канал channel_num={params.channel_num} не найден") # Декодирование: base64 -> int16 -> float64 points = get_channel_value( measurement.get("measurement_points"), params.channel_num, "measurement_points", ) decoded = base64.b64decode(raw_b64.encode("utf-8")) arr = np.frombuffer(decoded, dtype=np.int16) if points is not None: arr = arr[:int(points)] signal = arr.astype(np.float64) * params.voltage_range / 32768.0 # Частота дискретизации из JSON sample_rate = float( get_channel_value( measurement.get("measurement_rate", 8e6), params.channel_num, "measurement_rate", ) ) return signal, sample_rate # -- Вспомогательные функции --------------------------------------------------- def compute_fwhm(freq: np.ndarray, spectrum: np.ndarray, peak_idx: int) -> float: """Ширина пика на полувысоте (kHz или в единицах freq).""" half_max = spectrum[peak_idx] / 2.0 left = peak_idx while left > 0 and spectrum[left] > half_max: left -= 1 right = peak_idx while right < len(spectrum) - 1 and spectrum[right] > half_max: right += 1 return float(freq[right] - freq[left]) def _downsample(arr: np.ndarray, max_pts: int = _MAX_TRANSPORT_PTS) -> list: """Даунсэмплирует массив до max_pts точек и конвертирует в list[float].""" a = np.asarray(arr, dtype=float).flatten() if len(a) > max_pts: idx = np.linspace(0, len(a) - 1, max_pts, dtype=int) a = a[idx] return [float(x) for x in a] # -- Основной пайплайн --------------------------------------------------------- def process_nmr( signal: np.ndarray, sample_rate: float, params: NMRParams, ) -> NMRResult: """ Полный NMR-пайплайн (аналог spec_combined.process_signal_2, с исправлениями: fs из аргумента, sosfiltfilt вместо sosfilt). signal - сигнал в [В], 1D float64 sample_rate - частота дискретизации [Гц] params - параметры обработки Возвращает NMRResult со всеми массивами, даунсэмплированными до _MAX_TRANSPORT_PTS точек. """ result = NMRResult() n_orig = len(signal) dt = 1.0 / sample_rate result.n_samples = n_orig result.sample_rate_hz = sample_rate result.duration_ms = round(n_orig * dt * 1e3, 4) # -- 1. Сырой сигнал -------------------------------------------------- t_raw_ms = np.arange(n_orig) * dt * 1e3 result.raw_t_ms = _downsample(t_raw_ms) result.raw_real = _downsample(signal) result.raw_amplitude = _downsample(np.abs(signal)) raw_centered = signal - np.mean(signal) raw_n = len(raw_centered) if raw_n > 0: raw_fft = np.fft.rfft(raw_centered) * 2 / raw_n raw_freq_hz = np.fft.rfftfreq(raw_n, d=dt) result.raw_freq_hz = _downsample(raw_freq_hz) result.raw_magnitude = _downsample(np.abs(raw_fft)) result.raw_phase_rad = _downsample(np.angle(raw_fft)) # -- 2. Разбивка на точки_на_запись / n_записей ----------------------- # Если сигнал содержит несколько повторений, разбиваем на матрицу # (аналогично spec_combined.py). Если одна запись - матрица (1, N). points_num = n_orig # всё как одна запись (стандартный случай) n_reps = int(n_orig / points_num) cutoff = n_reps * points_num ch = signal[:cutoff] ch = np.vstack(np.split(ch, n_reps)) # shape: (n_reps, points_num) # -- 3. Полосовой фильтр (BPF) ---------------------------------------- fc = params.center_freq bw = params.bandpass_bw f_low = max(fc - bw / 2, 1.0) f_high = min(fc + bw / 2, sample_rate / 2 - 1.0) sos_bp = butter(params.butter_order, [f_low, f_high], btype="bandpass", fs=sample_rate, output="sos") ch_bp = sosfiltfilt(sos_bp, ch, axis=1) # -- 4. IQ-демодуляция ------------------------------------------------ t_pts = np.arange(points_num) * dt demod_i = np.sin(2 * np.pi * fc * t_pts) # I - вещественная часть demod_q = np.cos(2 * np.pi * fc * t_pts) # Q - мнимая часть ch_real = ch_bp * demod_i ch_imag = ch_bp * demod_q # -- 5. НЧФ (LPF) ----------------------------------------------------- lp_cut = min(params.lp_cutoff, sample_rate / 2 - 1.0) sos_lp = butter(params.butter_order, lp_cut, btype="lowpass", fs=sample_rate, output="sos") ch_real_f = sosfiltfilt(sos_lp, ch_real, axis=1) ch_imag_f = sosfiltfilt(sos_lp, ch_imag, axis=1) # -- 6. Децимация ----------------------------------------------------- dec = int(params.dec_factor) if dec > 1: ch_real_d = decimate(ch_real_f, dec, axis=1) ch_imag_d = decimate(ch_imag_f, dec, axis=1) else: ch_real_d = ch_real_f ch_imag_d = ch_imag_f dec_actual = round(points_num / ch_real_d.shape[1]) Td_new = dt * dec_actual result.dec_factor_actual = int(dec_actual) # -- 7. Комплексный FID (первая запись) ------------------------------- complex_fid = ch_real_d[0] + 1j * ch_imag_d[0] n_fid = len(complex_fid) t_fid_ms = np.arange(n_fid) * Td_new * 1e3 result.demod_t_ms = _downsample(t_fid_ms) result.demod_real = _downsample(complex_fid.real) result.demod_imag = _downsample(complex_fid.imag) result.demod_amplitude = _downsample(np.abs(complex_fid)) # -- 8. Zero-padding -------------------------------------------------- padded = np.pad(complex_fid, (0, max(0, params.zero_pad)), mode="constant") # -- 9. FFT + fftshift ------------------------------------------------ spectrum = np.abs(np.fft.fftshift(np.fft.fft(padded))) n_sp = len(padded) BW = 1.0 / Td_new # полоса после децимации dff = BW / (n_sp - 1) freq_hz = np.arange(-BW / 2, BW / 2 + dff, dff)[:n_sp] freq_khz = freq_hz / 1e3 phase = np.angle(np.fft.fftshift(np.fft.fft(padded))) result.freq_khz = _downsample(freq_khz) result.magnitude = _downsample(spectrum) result.phase_rad = _downsample(phase) result.freq_step_khz = float(dff / 1e3) # -- 10. Поиск пика (в отрицательных частотах) ------------------------ neg_mask = freq_khz < 0 if neg_mask.any(): neg_spec = spectrum.copy() neg_spec[~neg_mask] = 0.0 peak_idx = int(np.argmax(neg_spec)) else: peak_idx = int(np.argmax(spectrum)) result.peak_freq_khz = float(freq_khz[peak_idx]) result.peak_amplitude = float(spectrum[peak_idx]) result.fwhm_khz = compute_fwhm(freq_khz, spectrum, peak_idx) return result # -- Утилита для пакетной обработки ------------------------------------------- def process_json_file( path: str, params: NMRParams, ) -> dict: """ Загружает JSON-файл, декодирует и обрабатывает. Возвращает dict с полями NMRResult + 'filename', 'status', 'error'. """ import os filename = os.path.basename(path) try: with open(path, encoding="utf-8") as fh: raw = json.load(fh) signal, fs = decode_hardware_json(raw, params) res = process_nmr(signal, fs, params) return { "filename": filename, "status": "ok", "error": None, "peak_freq_khz": res.peak_freq_khz, "fwhm_khz": res.fwhm_khz, "peak_amplitude": res.peak_amplitude, "freq_step_khz": res.freq_step_khz, } except Exception as exc: return { "filename": filename, "status": "error", "error": f"{type(exc).__name__}: {exc}", "peak_freq_khz": None, "fwhm_khz": None, "peak_amplitude": None, "freq_step_khz": None, }