| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- import os
- from fastapi import FastAPI, Form
- from fastapi import HTTPException
- from fastapi.responses import FileResponse
- from fastapi.responses import JSONResponse
- from lib2to3.fixer_util import FromImport
- from scipy.fft import fft
- from scipy.signal import butter, sosfilt, decimate
- from state import create_session, get_session, update_session
- EXPORT_DIR = "exported_plots"
- os.makedirs(EXPORT_DIR, exist_ok=True)
- app = FastAPI()
- from fastapi import UploadFile, File
- from uuid import uuid4
- import json
- import base64
- import numpy as np
- import scipy.fft as fft
- import matplotlib.pyplot as plt
- class DataDecoder:
- def __init__(self, filename):
- self.data = ''
- with open(filename, 'r') as file:
- self.data = file.read()
- self.structed_data = json.loads(self.data)
- def getRawData(self, averaging_num=0, data_num=0, channel_num=0):
- for items in self.structed_data:
- if (items['averaging_num'] == averaging_num and items['data_num'] == data_num):
- for channel_data in items['channel_data']:
- if (channel_data['channel_num'] == channel_num):
- return channel_data['channel_data']
- return None
- def getDataDecoded(self, averaging_num=0, data_num=0, channel_num=0, points=10):
- rawData = self.getRawData(averaging_num, data_num, channel_num)
- edata = base64.b64decode(rawData.encode('utf-8'))
- arr = np.frombuffer(edata, dtype=np.int16, count=points, offset=0)
- return arr
- def getDataScaled(self, averaging_num=0, data_num=0, channel_num=0, points=10, range=5.0):
- decodedData = self.getDataDecoded(averaging_num, data_num, channel_num, points)
- return range * decodedData / 32768
- def getDataRate(self, averaging_num=0, data_num=0):
- for items in self.structed_data:
- if (items['averaging_num'] == averaging_num and items['data_num'] == data_num):
- return items['measurement_rate']
- def getDataSpectrum(self, averaging_num=0, data_num=0, channel_num=0, points=10, range=5.0, zero_fill=0,
- first_idx=0, last_idx=10):
- scaledData = self.getDataScaled(averaging_num, data_num, channel_num, points, range)
- zerofilledData = np.append(scaledData[first_idx:last_idx], np.zeros(zero_fill))
- rate = self.getDataRate()
- transferedData = fft.rfft(zerofilledData) * 2 / (last_idx - first_idx)
- freqs = fft.rfftfreq((last_idx - first_idx) + zero_fill, 1 / rate)
- spectrum = np.abs(transferedData)
- phases = np.angle(transferedData)
- spect_dict = {'freqs': freqs,
- 'phases': phases,
- 'spectrum': spectrum}
- return spect_dict
- def getDataPoints(self, averaging_num=0, data_num=0):
- for items in self.structed_data:
- if (items['averaging_num'] == averaging_num and items['data_num'] == data_num):
- return items['measurement_points']
- @app.post("/upload/")
- async def upload_json(file: UploadFile = File(...)):
- try:
- contents = await file.read()
- session_id = str(uuid4())
- filename = f"uploaded_{session_id}.json"
- path = os.path.join(EXPORT_DIR, filename)
- with open(path, "wb") as f:
- f.write(contents)
- decoder = DataDecoder(path)
- points = decoder.getDataPoints()
- if not points:
- raise HTTPException(status_code=400, detail="measurement_points not found")
- points = points[0]
- rate = decoder.getDataRate()
- if not rate:
- raise HTTPException(status_code=400, detail="measurement_rate not found")
- ch0 = decoder.getDataScaled(channel_num=0, points=points)
- ch1 = decoder.getDataScaled(channel_num=1, points=points)
- time = np.arange(len(ch0)) / rate
- session_id=create_session({
- "decoder_path": path,
- "ch0": ch0,
- "ch1": ch1,
- "time": time,
- "dt": 1 / rate,
- "rate": rate
- })
- return {"session_id": session_id}
- except Exception as e:
- import traceback
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Upload failed: {type(e).__name__}: {str(e)}")
- @app.post("/filter/")
- def apply_filters(session_id: str = Form(...), dt: float = Form(...),
- center_freq: float = Form(...),
- lower_freq: float = Form(...),
- higher_freq: float = Form(...),
- low_freq: float = Form(...)):
-
- state = get_session(session_id)
- if not state:
- raise HTTPException(status_code=404, detail="Session not found")
- # df = state["raw"]
- def round_sig(x, sig=4):
- if x == 0:
- return 0
- from math import log10, floor
- return round(x, sig - int(floor(log10(abs(x)))) - 1)
- # dt = round_sig((df['Время'][1] - df['Время'][0]) / (1000), 4)
- # # dt=df['Время'][1]-df['Время'][0]
- # fs = 1 / dt
- # target = df['Канал A'].to_numpy()
- # ch1 = df['Канал B'].to_numpy()
- time = state["raw"]["time"]
- ch0 = np.array(state["raw"]["ch0"])
- ch1 = np.array(state["raw"]["ch1"])
- dt = round_sig(time[1] - time[0], 4)
- target = ch0
- fs = 1 / dt
-
- points_num = len(target)
- n = int(len(ch1) / points_num)
- ch1 = np.vstack(np.split(ch1[:n * points_num], n))
- target = np.vstack(np.split(target[:n * points_num], n))
- bp = butter(4, [lower_freq, higher_freq], 'bp', fs=fs, output='sos')
- lp = butter(4, low_freq, 'lowpass', fs=fs, output='sos')
- ch1_f = sosfilt(bp, ch1, axis=1)
- target_f = sosfilt(bp, target, axis=1)
- update_session(session_id, "dt", dt)
- update_session(session_id, "center_freq", center_freq)
- update_session(session_id, "real", np.sin(2 * np.pi * center_freq * np.arange(0, points_num * dt, dt)))
- update_session(session_id, "imag", np.cos(2 * np.pi * center_freq * np.arange(0, points_num * dt, dt)))
- real_part = np.sin(2 * np.pi * center_freq * np.arange(0, ch1.shape[1] * dt, dt))
- imag_part = np.cos(2 * np.pi * center_freq * np.arange(0, ch1.shape[1] * dt, dt))
- ch1_real = ch1_f * real_part
- ch1_imag = ch1_f * imag_part
- ch1_real_f = sosfilt(lp, ch1_real, axis=1)
- ch1_imag_f = sosfilt(lp, ch1_imag, axis=1)
- ch1_complex = ch1_real_f + 1j * ch1_imag_f
- update_session(session_id, "ch1_f", ch1_complex)
- update_session(session_id, "dt", dt)
- return {"status": "filtered"}
- @app.post("/fft/")
- def compute_fft(session_id: str = Form(...),
- coef_dec_1: int = Form(10),
- coef_dec_2: int = Form(2),
- coef_dec_3: int = Form(1),
- coef_dec_4: int = Form(1)):
-
- state = get_session(session_id)
- if not state or "ch1_f" not in state:
- raise HTTPException(status_code=400, detail="Not filtered yet")
- try:
- signal = state["ch1_f"]
- decimated = decimate(signal, coef_dec_1, axis=1)
- decimated = decimate(decimated, coef_dec_2, axis=1)
- decimated = decimate(decimated, coef_dec_3, axis=1)
- decimated = decimate(decimated, coef_dec_4, axis=1)
-
- print(coef_dec_1)
-
- # decimated = decimate(decimated, 2, axis=1)
- slice_ = decimated[0][2500:]
- Td_new = state["dt"] * 10
- sp_row = np.abs(fft.fftshift(fft.fft(slice_)))
- BW = 1 / Td_new
- dff = BW / (len(slice_) - 1)
- freq = np.arange(-BW / 2, BW / 2 + dff, dff)[:len(sp_row)] / 1000
-
- update_session(session_id, "fft", {
- "fid": slice_.real.tolist(),
- "spectrum": sp_row.tolist(),
- "frequency": freq.tolist()
- })
- # print(len(decimated[0]))
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Bad coeff: {type(e).__name__}: {str(e)}")
- return {"status": "fft done"}
- @app.get("/result/")
- def get_result(session_id: str):
- state = get_session(session_id)
- if not state or "fft" not in state:
- raise HTTPException(status_code=400, detail="FFT not yet computed")
- return JSONResponse(state["fft"])
- @app.post("/export/")
- def export_plots(session_id: str = Form(...)):
- state = get_session(session_id)
- if not state or "fft" not in state:
- raise HTTPException(status_code=400, detail="FFT not computed")
- fid = state["fft"]["fid"]
- spectrum = state["fft"]["spectrum"]
- freq = state["fft"]["frequency"]
- fid_path = os.path.join(EXPORT_DIR, f"{session_id}_fid.png")
- spectrum_path = os.path.join(EXPORT_DIR, f"{session_id}_spectrum.png")
- # print(session_id)
- # print("DONE")
- # ========== FID plot ==========
- plt.figure(figsize=(8, 4))
- plt.plot(fid)
- plt.title("FID")
- plt.xlabel("Samples")
- plt.ylabel("Amplitude")
- plt.grid()
- plt.tight_layout()
- plt.savefig(fid_path)
- plt.close()
- # ========== Spectrum plot =========
- plt.figure(figsize=(8, 4))
- plt.plot(freq, spectrum)
- plt.title("Spectrum")
- plt.xlabel("Frequency (kHz)")
- plt.ylabel("Magnitude")
- plt.grid()
- plt.tight_layout()
- plt.savefig(spectrum_path)
- plt.close()
- return {
- "fid_plot": f"/download/{session_id}_fid.png",
- "spectrum_plot": f"/download/{session_id}_spectrum.png"
- }
- @app.post("/plot-raw/")
- def plot_raw(session_id: str = Form(...)):
- state = get_session(session_id)
- if not state:
- raise HTTPException(status_code=404, detail="Session not found")
- ch0 = state["raw"]["ch0"]
- ch1 = state["raw"]["ch1"]
- fig, axs = plt.subplots(2, 1, figsize=(10, 5), sharex=True)
- axs[0].plot(ch0)
- axs[0].set_title("Канал A (весь сигнал)")
- axs[0].grid()
- axs[1].plot(ch1)
- axs[1].set_title("Канал B (весь сигнал)")
- axs[1].grid()
- plt.tight_layout()
- plot_path = os.path.join(EXPORT_DIR, f"{session_id}_raw_channels.png")
- plt.savefig(plot_path)
- plt.close()
- return {
- "raw_plot": f"/download/{session_id}_raw_channels.png"
- }
- @app.get("/download/{filename}")
- def download_file(filename: str):
- file_path = os.path.join(EXPORT_DIR, filename)
- if not os.path.exists(file_path):
- raise HTTPException(status_code=404, detail="File not found")
- return FileResponse(file_path, media_type="image/png", filename=filename)
- @app.post("/export-raw-data/")
- def export_raw_data(session_id: str = Form(...)):
- state = get_session(session_id)
- if not state:
- raise HTTPException(status_code=404, detail="Session not found")
-
- signal = state["raw"]["ch1"] # сырые данные без демодуляции и децимации
- print(signal)
- if isinstance(signal, np.ndarray):
- signal = signal.tolist()
- return JSONResponse(content={
- "status": "raw signal",
- "data": signal,
- "path": session_id
- })
- @app.post("/export-filter-data/")
- def export_filter_data(session_id: str = Form(...),
- center_freq: float = Form(...),
- lower_freq: float = Form(...),
- higher_freq: float = Form(...),
- low_freq: float = Form(...)):
- state = get_session(session_id)
- if not state:
- raise HTTPException(status_code=404, detail="Session not found")
-
- def round_sig(x, sig=4):
- if x == 0:
- return 0
- from math import log10, floor
- return round(x, sig - int(floor(log10(abs(x)))) - 1)
- time = state["raw"]["time"]
- ch0 = np.array(state["raw"]["ch0"])
- ch1 = np.array(state["raw"]["ch1"])
- dt = round_sig(time[1] - time[0], 4)
- target = ch0
- fs = 1 / dt
- points_num = len(target)
- n = int(len(ch1) / points_num)
- ch1 = np.vstack(np.split(ch1[:n * points_num], n))
- target = np.vstack(np.split(target[:n * points_num], n))
- bp = butter(4, [lower_freq, higher_freq], 'bp', fs=fs, output='sos')
- lp = butter(4, low_freq, 'lowpass', fs=fs, output='sos')
- ch1_f = sosfilt(bp, ch1, axis=1)
- target_f = sosfilt(bp, target, axis=1)
- update_session(session_id, "dt", dt)
- update_session(session_id, "center_freq", center_freq)
- update_session(session_id, "real", np.sin(2 * np.pi * center_freq * np.arange(0, points_num * dt, dt)))
- update_session(session_id, "imag", np.cos(2 * np.pi * center_freq * np.arange(0, points_num * dt, dt)))
- real_part = np.sin(2 * np.pi * center_freq * np.arange(0, ch1.shape[1] * dt, dt))
- imag_part = np.cos(2 * np.pi * center_freq * np.arange(0, ch1.shape[1] * dt, dt))
- ch1_real = ch1_f * real_part
- ch1_imag = ch1_f * imag_part
- ch1_real_f = sosfilt(lp, ch1_real, axis=1)
- ch1_imag_f = sosfilt(lp, ch1_imag, axis=1)
- ch1_complex = ch1_real_f + 1j * ch1_imag_f
- update_session(session_id, "ch1_f", ch1_complex)
- update_session(session_id, "dt", dt)
-
- # return JSONResponse(state["ch1_f"], state["dt"])
- return JSONResponse(content={
- "status": "filtered signal",
- "signal_real": np.array(ch1_complex).real.tolist(),
- "signal_imag": np.array(ch1_complex).imag.tolist(),
- "time_data_signal":dt.tolist(),
- "path": session_id
- })
- @app.post("/export-decdem-data/")
- def export_filter_data(session_id: str = Form(...)):
- state = get_session(session_id)
- if not state:
- raise HTTPException(status_code=400, detail="Session not found")
-
- if not state["ch1_f"]:
- raise HTTPException(status_code=404, detail="Do filter before")
-
- signal = state["ch1_f"]
- decimated = decimate(signal, 10, axis=1)
- decimated = decimate(decimated, 2, axis=1)
- update_session(session_id, "decimated", decimated)
-
- # return JSONResponse(decimated)
- return JSONResponse(content={
- "status": "dec and dem signal",
- "signal_real": np.array(decimated).real.tolist(),
- "signal_imag": np.array(decimated).imag.tolist(),
- "path": session_id
- })
- @app.post("/export-position-freq/")
- def position_frequency_axis(session_id: str = Form(...)):
- state = get_session(session_id)
- if not state:
- raise HTTPException(status_code=400, detail="Session not found")
- if not state["decimated"]:
- raise HTTPException(status_code=404, detail="Do decimate before")
-
- signal = state["decimated"]
-
- N = len(signal)
- fft_signal = np.fft.fft(signal)
-
- amplitude = np.abs(fft_signal)[:N//2] # только положительные частоты
- # индекс максимальной амплитуды
- peak_index = np.argmax(amplitude)
-
- peak_frequency = peak_index / N
-
- return JSONResponse(content={
- "status": "peak position",
- "peak max amplitude in freq": peak_frequency,
- "path": session_id
- })
- @app.post("/export-FWHM/")
- def find_FWHM(session_id: str = Form(...)):
- state = get_session(session_id)
- if not state:
- raise HTTPException(status_code=400, detail="Session not found")
- if not state["decimated"]:
- raise HTTPException(status_code=404, detail="Do decimate before")
-
- signal = state["decimated"]
- N = len(signal)
- fft_signal = np.fft.fft(signal)
- amplitude = np.abs(fft_signal)[:N//2] # положительные частоты
- max_amplitude = np.max(amplitude)
- half_max = max_amplitude / 2
- indices_above_half_max = np.where(amplitude >= half_max)[0]
- if len(indices_above_half_max) < 2:
- raise HTTPException(status_code=200, detail="Bad signal, not found FWHM")
- FWHM_index_range = indices_above_half_max[-1] - indices_above_half_max[0]
- FWHM_normalized = FWHM_index_range / N
- return JSONResponse(content={
- "status": "FWHM",
- "width at half height": FWHM_normalized,
- "path": session_id
- })
- @app.post("/export-max-amplitude-freq/")
- def max_amplitude_time_domain(session_id: str = Form(...)):
- state = get_session(session_id)
- if not state:
- raise HTTPException(status_code=400, detail="Session not found")
- if not state["decimated"]:
- raise HTTPException(status_code=404, detail="Do decimate before")
- signal = state["decimated"]
- max_amplitude = np.max(np.abs(signal))
- return JSONResponse(content={
- "status": "max amplitude freq",
- "max amplitude": max_amplitude,
- "path": session_id
- })
|