"""HTTP API for uploading a JSON measurement file and processing its signals.

The service decodes two recorded channels, band-pass filters and demodulates
channel B, decimates the result, and exposes the FFT, PNG plots and a few
simple spectral metrics.  Per-session data lives in the project-local
``state`` module (``create_session`` / ``get_session`` / ``update_session``).
"""
import base64
import json
import logging
import os
from math import floor, log10, prod
from uuid import uuid4

import matplotlib.pyplot as plt
import numpy as np
# NOTE: the original file also had ``from scipy.fft import fft``; that binding
# was immediately shadowed by this module alias, which is what the code uses.
import scipy.fft as fft
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from scipy.signal import butter, decimate, sosfilt

from state import create_session, get_session, update_session

try:
    # Unused legacy import.  lib2to3 was removed in Python 3.13, so guard it
    # instead of letting the whole module fail to import there.
    from lib2to3.fixer_util import FromImport  # noqa: F401
except ImportError:
    FromImport = None

logger = logging.getLogger(__name__)

EXPORT_DIR = "exported_plots"
os.makedirs(EXPORT_DIR, exist_ok=True)

# Number of leading decimated samples dropped before the FFT in /fft/.
FID_SKIP_SAMPLES = 2500

app = FastAPI()


class DataDecoder:
    """Reader for a JSON measurement file.

    The file is expected to hold a list of records, each carrying
    ``averaging_num``, ``data_num``, ``measurement_rate``,
    ``measurement_points`` and a ``channel_data`` list whose entries have a
    ``channel_num`` and a base64-encoded ``channel_data`` payload.
    """

    def __init__(self, filename):
        """Load and parse *filename* (JSON text)."""
        with open(filename, 'r', encoding='utf-8') as file:
            self.data = file.read()
        self.structed_data = json.loads(self.data)

    def _find_record(self, averaging_num, data_num):
        """Return the first record matching both indices, or ``None``."""
        for record in self.structed_data:
            if (record['averaging_num'] == averaging_num
                    and record['data_num'] == data_num):
                return record
        return None

    def getRawData(self, averaging_num=0, data_num=0, channel_num=0):
        """Return the base64 payload of the requested channel, or ``None``."""
        for record in self.structed_data:
            if (record['averaging_num'] != averaging_num
                    or record['data_num'] != data_num):
                continue
            for channel in record['channel_data']:
                if channel['channel_num'] == channel_num:
                    return channel['channel_data']
        return None

    def getDataDecoded(self, averaging_num=0, data_num=0, channel_num=0,
                       points=10):
        """Decode the channel payload into the first *points* int16 samples.

        Raises:
            ValueError: if the requested record/channel does not exist, or
                (from NumPy) if the payload holds fewer than *points* samples.
        """
        raw_data = self.getRawData(averaging_num, data_num, channel_num)
        if raw_data is None:
            raise ValueError(
                f"No data for averaging_num={averaging_num}, "
                f"data_num={data_num}, channel_num={channel_num}"
            )
        payload = base64.b64decode(raw_data.encode('utf-8'))
        return np.frombuffer(payload, dtype=np.int16, count=points, offset=0)

    def getDataScaled(self, averaging_num=0, data_num=0, channel_num=0,
                      points=10, range=5.0):
        """Return the decoded samples scaled by ``range / 32768``."""
        decoded = self.getDataDecoded(averaging_num, data_num, channel_num,
                                      points)
        return range * decoded / 32768

    def getDataRate(self, averaging_num=0, data_num=0):
        """Return ``measurement_rate`` of the matching record, or ``None``."""
        record = self._find_record(averaging_num, data_num)
        return None if record is None else record['measurement_rate']

    def getDataSpectrum(self, averaging_num=0, data_num=0, channel_num=0,
                        points=10, range=5.0, zero_fill=0, first_idx=0,
                        last_idx=10):
        """Return the one-sided spectrum of ``samples[first_idx:last_idx]``.

        The segment is padded with *zero_fill* zeros before the real FFT and
        the result is normalised by ``2 / segment_length``.

        Returns:
            dict with ``freqs``, ``phases`` and ``spectrum`` arrays.

        Raises:
            ValueError: if the selected segment is empty.
        """
        scaled = self.getDataScaled(averaging_num, data_num, channel_num,
                                    points, range)
        segment = scaled[first_idx:last_idx]
        # Use the real segment length: the slice may be shorter than
        # last_idx - first_idx when fewer points were decoded.
        seg_len = len(segment)
        if seg_len == 0:
            raise ValueError("Selected segment is empty")
        padded = np.append(segment, np.zeros(zero_fill))
        # Look up the rate of the *requested* record (the original always
        # read record 0/0 regardless of the arguments).
        rate = self.getDataRate(averaging_num, data_num)
        transformed = fft.rfft(padded) * 2 / seg_len
        freqs = fft.rfftfreq(seg_len + zero_fill, 1 / rate)
        return {
            'freqs': freqs,
            'phases': np.angle(transformed),
            'spectrum': np.abs(transformed),
        }

    def getDataPoints(self, averaging_num=0, data_num=0):
        """Return ``measurement_points`` of the matching record, or ``None``."""
        record = self._find_record(averaging_num, data_num)
        return None if record is None else record['measurement_points']


def _round_sig(x, sig=4):
    """Round *x* to *sig* significant digits and return a builtin float."""
    if x == 0:
        return 0.0
    return float(round(x, sig - int(floor(log10(abs(x)))) - 1))


def _demodulate_channel_b(session_id, state, center_freq, lower_freq,
                          higher_freq, low_freq):
    """Band-pass, quadrature-demodulate and low-pass channel B of a session.

    Stores ``dt``, ``center_freq``, the ``real``/``imag`` carriers and the
    complex result ``ch1_f`` in the session.

    Returns:
        (dt, ch1_complex): the rounded sample interval and the demodulated
        signal shaped ``(n_traces, points_per_trace)``.

    Raises:
        HTTPException(400): if the raw data is too short/inconsistent or the
            cut-off frequencies are rejected by the filter design.
    """
    raw = state["raw"]
    time = raw["time"]
    ch0 = np.asarray(raw["ch0"])
    ch1 = np.asarray(raw["ch1"])

    if len(time) < 2 or len(ch0) == 0:
        raise HTTPException(status_code=400,
                            detail="At least two samples are required")
    dt = _round_sig(time[1] - time[0], 4)
    if dt <= 0:
        raise HTTPException(status_code=400,
                            detail="Sample interval must be positive")
    fs = 1 / dt

    # Channel A's length defines one trace; channel B is cut into whole traces.
    points_num = len(ch0)
    n_traces = len(ch1) // points_num
    if n_traces == 0:
        raise HTTPException(status_code=400,
                            detail="Channel B is shorter than channel A")
    ch1 = ch1[:n_traces * points_num].reshape(n_traces, points_num)

    try:
        bp = butter(4, [lower_freq, higher_freq], 'bp', fs=fs, output='sos')
        lp = butter(4, low_freq, 'lowpass', fs=fs, output='sos')
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid filter frequencies: {exc}",
        ) from exc

    ch1_f = sosfilt(bp, ch1, axis=1)

    # Integer arange times dt gives exactly points_num samples; a float step
    # in np.arange can yield one sample too many and break broadcasting.
    phase = 2 * np.pi * center_freq * (np.arange(points_num) * dt)
    real_part = np.sin(phase)
    imag_part = np.cos(phase)

    ch1_real_f = sosfilt(lp, ch1_f * real_part, axis=1)
    ch1_imag_f = sosfilt(lp, ch1_f * imag_part, axis=1)
    ch1_complex = ch1_real_f + 1j * ch1_imag_f

    update_session(session_id, "dt", dt)
    update_session(session_id, "center_freq", center_freq)
    update_session(session_id, "real", real_part)
    update_session(session_id, "imag", imag_part)
    update_session(session_id, "ch1_f", ch1_complex)
    return dt, ch1_complex


def _first_trace_spectrum(signal):
    """Return ``(amplitude, n)`` for the first trace of *signal*.

    ``amplitude`` is the FFT magnitude over the first ``n // 2`` bins
    (positive frequencies) and ``n`` is the number of samples in the trace.
    The first row is used, matching what ``/fft/`` does with ``decimated[0]``.
    """
    trace = np.atleast_2d(np.asarray(signal))[0]
    n = len(trace)
    amplitude = np.abs(np.fft.fft(trace))[:n // 2] if n else np.empty(0)
    return amplitude, n


def _save_line_plot(path, y, x=None, *, title, xlabel, ylabel):
    """Render one line plot to *path*, always releasing the figure."""
    fig, ax = plt.subplots(figsize=(8, 4))
    try:
        if x is None:
            ax.plot(y)
        else:
            ax.plot(x, y)
        ax.set_title(title)
        ax.set_xlabel(xlabel)
        ax.set_ylabel(ylabel)
        ax.grid()
        fig.tight_layout()
        fig.savefig(path)
    finally:
        # Close even when saving fails so figures do not pile up in memory.
        plt.close(fig)


@app.post("/upload/")
async def upload_json(file: UploadFile = File(...)):
    """Store an uploaded JSON measurement file and create a session for it.

    Returns:
        ``{"session_id": ...}`` as produced by ``create_session``.

    Raises:
        HTTPException(400): measurement points or rate missing in the file.
        HTTPException(500): any other failure while reading/decoding.
    """
    try:
        contents = await file.read()
        upload_id = str(uuid4())
        path = os.path.join(EXPORT_DIR, f"uploaded_{upload_id}.json")
        with open(path, "wb") as f:
            f.write(contents)

        decoder = DataDecoder(path)
        points = decoder.getDataPoints()
        if not points:
            raise HTTPException(status_code=400,
                                detail="measurement_points not found")
        # NOTE(review): measurement_points is indexed here, so it is assumed
        # to be a sequence whose first entry is the sample count - confirm.
        points = points[0]
        rate = decoder.getDataRate()
        if not rate:
            raise HTTPException(status_code=400,
                                detail="measurement_rate not found")

        ch0 = decoder.getDataScaled(channel_num=0, points=points)
        ch1 = decoder.getDataScaled(channel_num=1, points=points)
        time = np.arange(len(ch0)) / rate

        # NOTE(review): other endpoints read state["raw"][...]; presumably
        # create_session stores this dict under "raw" - verify in state.py.
        session_id = create_session({
            "decoder_path": path,
            "ch0": ch0,
            "ch1": ch1,
            "time": time,
            "dt": 1 / rate,
            "rate": rate
        })
        return {"session_id": session_id}
    except HTTPException:
        # Deliberate 4xx responses must not be rewrapped as a 500 below.
        raise
    except Exception as e:
        logger.exception("Upload failed")
        raise HTTPException(
            status_code=500,
            detail=f"Upload failed: {type(e).__name__}: {str(e)}",
        ) from e


@app.post("/filter/")
def apply_filters(session_id: str = Form(...), dt: float = Form(...),
                  center_freq: float = Form(...),
                  lower_freq: float = Form(...),
                  higher_freq: float = Form(...),
                  low_freq: float = Form(...)):
    """Filter and demodulate channel B, storing the result in the session.

    The ``dt`` form field is accepted for compatibility but ignored: the
    sample interval is derived from the session's time axis.

    Raises:
        HTTPException(404): unknown session.
        HTTPException(400): unusable raw data or filter frequencies.
    """
    state = get_session(session_id)
    if not state:
        raise HTTPException(status_code=404, detail="Session not found")
    _demodulate_channel_b(session_id, state, center_freq, lower_freq,
                          higher_freq, low_freq)
    return {"status": "filtered"}


@app.post("/fft/")
def compute_fft(session_id: str = Form(...), coef_dec_1: int = Form(10),
                coef_dec_2: int = Form(2), coef_dec_3: int = Form(1),
                coef_dec_4: int = Form(1)):
    """Decimate the demodulated signal in four stages and store its FFT.

    Stores ``fft`` in the session with ``fid`` (real part of the time
    signal), ``spectrum`` (shifted magnitude) and ``frequency`` (kHz axis).

    Raises:
        HTTPException(400): session missing or not filtered yet.
        HTTPException(500): decimation/FFT failed (e.g. bad coefficients).
    """
    state = get_session(session_id)
    if not state or "ch1_f" not in state:
        raise HTTPException(status_code=400, detail="Not filtered yet")
    try:
        factors = (coef_dec_1, coef_dec_2, coef_dec_3, coef_dec_4)
        decimated = state["ch1_f"]
        for factor in factors:
            decimated = decimate(decimated, factor, axis=1)

        slice_ = decimated[0][FID_SKIP_SAMPLES:]
        # Sample interval after decimation scales with the product of all
        # stages (the original hard-coded 10 regardless of the coefficients).
        Td_new = state["dt"] * prod(factors)
        sp_row = np.abs(fft.fftshift(fft.fft(slice_)))
        BW = 1 / Td_new
        # linspace reproduces the inclusive [-BW/2, BW/2] axis without the
        # float-step arange and without dividing by (len - 1).
        freq = np.linspace(-BW / 2, BW / 2, len(sp_row)) / 1000

        update_session(session_id, "fft", {
            "fid": slice_.real.tolist(),
            "spectrum": sp_row.tolist(),
            "frequency": freq.tolist()
        })
    except Exception as e:
        logger.exception("FFT computation failed")
        raise HTTPException(
            status_code=500,
            detail=f"Bad coeff: {type(e).__name__}: {str(e)}",
        ) from e
    return {"status": "fft done"}


@app.get("/result/")
def get_result(session_id: str):
    """Return the stored FFT result of a session as JSON."""
    state = get_session(session_id)
    if not state or "fft" not in state:
        raise HTTPException(status_code=400, detail="FFT not yet computed")
    return JSONResponse(state["fft"])


@app.post("/export/")
def export_plots(session_id: str = Form(...)):
    """Render the FID and spectrum plots to PNG and return download URLs."""
    state = get_session(session_id)
    if not state or "fft" not in state:
        raise HTTPException(status_code=400, detail="FFT not computed")

    result = state["fft"]
    fid_path = os.path.join(EXPORT_DIR, f"{session_id}_fid.png")
    spectrum_path = os.path.join(EXPORT_DIR, f"{session_id}_spectrum.png")

    _save_line_plot(fid_path, result["fid"],
                    title="FID", xlabel="Samples", ylabel="Amplitude")
    _save_line_plot(spectrum_path, result["spectrum"], result["frequency"],
                    title="Spectrum", xlabel="Frequency (kHz)",
                    ylabel="Magnitude")

    return {
        "fid_plot": f"/download/{session_id}_fid.png",
        "spectrum_plot": f"/download/{session_id}_spectrum.png"
    }


@app.post("/plot-raw/")
def plot_raw(session_id: str = Form(...)):
    """Plot both raw channels to one PNG and return its download URL."""
    state = get_session(session_id)
    if not state:
        raise HTTPException(status_code=404, detail="Session not found")

    ch0 = state["raw"]["ch0"]
    ch1 = state["raw"]["ch1"]
    plot_path = os.path.join(EXPORT_DIR, f"{session_id}_raw_channels.png")

    fig, axs = plt.subplots(2, 1, figsize=(10, 5), sharex=True)
    try:
        axs[0].plot(ch0)
        axs[0].set_title("Канал A (весь сигнал)")
        axs[0].grid()
        axs[1].plot(ch1)
        axs[1].set_title("Канал B (весь сигнал)")
        axs[1].grid()
        fig.tight_layout()
        fig.savefig(plot_path)
    finally:
        plt.close(fig)

    return {
        "raw_plot": f"/download/{session_id}_raw_channels.png"
    }


@app.get("/download/{filename}")
def download_file(filename: str):
    """Serve a file from ``EXPORT_DIR`` by its bare name.

    Raises:
        HTTPException(404): the name contains a path component or the file
            does not exist.
    """
    # The name comes straight from the URL: refuse anything that is not a
    # plain file name so the request cannot escape EXPORT_DIR.
    if filename in ("", ".", "..") or os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="File not found")
    file_path = os.path.join(EXPORT_DIR, filename)
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path, media_type="image/png", filename=filename)


@app.post("/export-raw-data/")
def export_raw_data(session_id: str = Form(...)):
    """Return raw channel B (no demodulation or decimation) as JSON."""
    state = get_session(session_id)
    if not state:
        raise HTTPException(status_code=404, detail="Session not found")
    signal = state["raw"]["ch1"]
    if isinstance(signal, np.ndarray):
        signal = signal.tolist()
    return JSONResponse(content={
        "status": "raw signal",
        "data": signal,
        "path": session_id
    })


@app.post("/export-filter-data/")
def export_filter_data(session_id: str = Form(...),
                       center_freq: float = Form(...),
                       lower_freq: float = Form(...),
                       higher_freq: float = Form(...),
                       low_freq: float = Form(...)):
    """Filter and demodulate channel B and return the complex result.

    Performs the same processing as ``/filter/`` and additionally returns the
    real/imaginary parts and the sample interval.

    Raises:
        HTTPException(404): unknown session.
        HTTPException(400): unusable raw data or filter frequencies.
    """
    state = get_session(session_id)
    if not state:
        raise HTTPException(status_code=404, detail="Session not found")
    dt, ch1_complex = _demodulate_channel_b(
        session_id, state, center_freq, lower_freq, higher_freq, low_freq)
    return JSONResponse(content={
        "status": "filtered signal",
        "signal_real": ch1_complex.real.tolist(),
        "signal_imag": ch1_complex.imag.tolist(),
        "time_data_signal": dt,
        "path": session_id
    })


@app.post("/export-decdem-data/")
def export_decdem_data(session_id: str = Form(...)):
    """Decimate the demodulated signal by 10 and then 2 and return it.

    Stores the result in the session as ``decimated``.  (This handler used to
    reuse the name ``export_filter_data``, shadowing the handler above; the
    route is unchanged.)
    """
    state = get_session(session_id)
    # NOTE(review): 400/404 are swapped relative to the other endpoints;
    # kept as-is so existing clients see the same codes.
    if not state:
        raise HTTPException(status_code=400, detail="Session not found")
    # Membership test: truth-testing a multi-element ndarray raises.
    if state.get("ch1_f") is None:
        raise HTTPException(status_code=404, detail="Do filter before")
    decimated = decimate(state["ch1_f"], 10, axis=1)
    decimated = decimate(decimated, 2, axis=1)
    update_session(session_id, "decimated", decimated)
    return JSONResponse(content={
        "status": "dec and dem signal",
        "signal_real": np.array(decimated).real.tolist(),
        "signal_imag": np.array(decimated).imag.tolist(),
        "path": session_id
    })


@app.post("/export-position-freq/")
def position_frequency_axis(session_id: str = Form(...)):
    """Return the normalised frequency (bin / N) of the spectral peak.

    Only positive-frequency bins of the first decimated trace are searched.
    """
    state = get_session(session_id)
    if not state:
        raise HTTPException(status_code=400, detail="Session not found")
    if state.get("decimated") is None:
        raise HTTPException(status_code=404, detail="Do decimate before")
    amplitude, n = _first_trace_spectrum(state["decimated"])
    if amplitude.size == 0:
        raise HTTPException(status_code=400, detail="Signal too short")
    # Index of the largest amplitude, expressed in cycles per sample.
    peak_frequency = float(np.argmax(amplitude) / n)
    return JSONResponse(content={
        "status": "peak position",
        "peak max amplitude in freq": peak_frequency,
        "path": session_id
    })


@app.post("/export-FWHM/")
def find_FWHM(session_id: str = Form(...)):
    """Return the full width at half maximum of the spectrum, as bins / N.

    The width spans the first to the last positive-frequency bin whose
    amplitude is at least half of the maximum (first decimated trace).
    """
    state = get_session(session_id)
    if not state:
        raise HTTPException(status_code=400, detail="Session not found")
    if state.get("decimated") is None:
        raise HTTPException(status_code=404, detail="Do decimate before")
    amplitude, n = _first_trace_spectrum(state["decimated"])
    if amplitude.size:
        above_half = np.where(amplitude >= np.max(amplitude) / 2)[0]
    else:
        above_half = np.empty(0, dtype=int)
    if len(above_half) < 2:
        # NOTE(review): an error reported with status 200 is unusual; kept
        # for compatibility with existing clients.
        raise HTTPException(status_code=200,
                            detail="Bad signal, not found FWHM")
    FWHM_normalized = float((above_half[-1] - above_half[0]) / n)
    return JSONResponse(content={
        "status": "FWHM",
        "width at half height": FWHM_normalized,
        "path": session_id
    })


@app.post("/export-max-amplitude-freq/")
def max_amplitude_time_domain(session_id: str = Form(...)):
    """Return the largest absolute value of the decimated time signal."""
    state = get_session(session_id)
    if not state:
        raise HTTPException(status_code=400, detail="Session not found")
    if state.get("decimated") is None:
        raise HTTPException(status_code=404, detail="Do decimate before")
    max_amplitude = float(np.max(np.abs(state["decimated"])))
    return JSONResponse(content={
        "status": "max amplitude freq",
        "max amplitude": max_amplitude,
        "path": session_id
    })