import sys import numpy as np import string from PySide6.QtWidgets import ( QApplication, QMainWindow, QWidget, QDockWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLabel, QLineEdit, QFileDialog, QListWidget, QListWidgetItem, QGroupBox, QMessageBox, QCheckBox ) from PySide6.QtCore import Qt # Для встроенного тулбара matplotlib from matplotlib.backends.backend_qtagg import NavigationToolbar2QT as NavigationToolbar from scipy.io import savemat # <-- Импорт для .mat сохранения from mpl_canvas import MplCanvas from plotting import ( plot_time_domain, plot_spectrum, plot_demodulated_time_domain ) class MainWindow(QMainWindow): def __init__(self): super().__init__() self.csv_data = None self.demod_data = None self.demod_time_ns = None self.sampling_rate_ns = 13 self.loaded_file_path = None self.init_ui() def init_ui(self): self.setWindowTitle("Demo: Signal / Spectrum / Demodulation plotting by Vladimir Pugovkin") central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) # ------------ Первая строка с кнопками ----------- top_bar_1 = QHBoxLayout() top_bar_1.setSpacing(8) self.load_btn = QPushButton("Load CSV") self.load_btn.setFixedWidth(130) self.load_btn.clicked.connect(self.load_csv) top_bar_1.addWidget(self.load_btn) # Чекбокс для bandpass self.use_bandpass_checkbox = QCheckBox("Bandpass Filter") top_bar_1.addWidget(self.use_bandpass_checkbox) bandpass_label_low = QLabel("Lower freq (MHz):") self.bandpass_low_input = QLineEdit("10") self.bandpass_low_input.setFixedWidth(60) bandpass_label_high = QLabel("Upper freq (MHz):") self.bandpass_high_input = QLineEdit("50") self.bandpass_high_input.setFixedWidth(60) top_bar_1.addWidget(bandpass_label_low) top_bar_1.addWidget(self.bandpass_low_input) top_bar_1.addWidget(bandpass_label_high) top_bar_1.addWidget(self.bandpass_high_input) top_bar_1.setSpacing(15) self.plot_time_btn = QPushButton("Plot Time Domain") self.plot_time_btn.setFixedWidth(180) self.plot_time_btn.clicked.connect(self.plot_time_domain) top_bar_1.addWidget(self.plot_time_btn) # --- Дополнительные поля для спектра --- freq_range_label = QLabel("Spectrum range (MHz):") self.freq_min_input = QLineEdit("-100") self.freq_min_input.setFixedWidth(60) self.freq_max_input = QLineEdit("100") self.freq_max_input.setFixedWidth(60) top_bar_1.addWidget(freq_range_label) top_bar_1.addWidget(self.freq_min_input) top_bar_1.addWidget(self.freq_max_input) # Убрали чекбокс "Shift spectrum to 0?" так как он не нужен # Изменили текст чекбокса "Plot demod data?" -> "Plot demod data" self.plot_demod_checkbox = QCheckBox("Plot demod data") top_bar_1.addWidget(self.plot_demod_checkbox) self.plot_spectrum_btn = QPushButton("Plot Spectrum") self.plot_spectrum_btn.setFixedWidth(180) self.plot_spectrum_btn.clicked.connect(self.plot_spectrum) top_bar_1.addWidget(self.plot_spectrum_btn) # ---------------- Кнопка экспорта в .mat ------------- self.export_btn = QPushButton("Export to .mat") self.export_btn.setFixedWidth(180) self.export_btn.clicked.connect(self.export_data_to_mat) top_bar_1.addWidget(self.export_btn) top_bar_1.addStretch() main_layout.addLayout(top_bar_1) # ------------ Вторая строка с элементами ----------- top_bar_2 = QHBoxLayout() top_bar_2.setSpacing(15) label_fc = QLabel("Central freq (MHz):") self.fc_input = QLineEdit("0") self.fc_input.setFixedWidth(60) label_st = QLabel("Start time (ns):") self.start_time_input = QLineEdit("0") self.start_time_input.setFixedWidth(70) label_end = QLabel("End time (ns):") self.end_time_input = QLineEdit("1000") self.end_time_input.setFixedWidth(70) label_dec = QLabel("Decimation factor:") self.decimation_factor_input = QLineEdit("4") self.decimation_factor_input.setFixedWidth(60) top_bar_2.addWidget(label_fc) top_bar_2.addWidget(self.fc_input) top_bar_2.addSpacing(5) top_bar_2.addWidget(label_st) top_bar_2.addWidget(self.start_time_input) top_bar_2.addWidget(label_end) top_bar_2.addWidget(self.end_time_input) top_bar_2.addSpacing(5) top_bar_2.addWidget(label_dec) top_bar_2.addWidget(self.decimation_factor_input) self.demod_dec_btn = QPushButton("Demodulation + Decimation") self.demod_dec_btn.setFixedWidth(300) self.demod_dec_btn.clicked.connect(self.demodulate_and_decimate) top_bar_2.addWidget(self.demod_dec_btn) top_bar_2.addStretch() main_layout.addLayout(top_bar_2) # ============ Блоки с графиками (Time, Spectrum, Demod) ============= plot_row_1 = QHBoxLayout() # --- Группа "Time Domain" --- self.time_group = QGroupBox("Time Domain") time_layout = QVBoxLayout(self.time_group) self.time_canvas = MplCanvas(self, width=5, height=4) time_layout.addWidget(self.time_canvas) # Добавляем тулбар: self.time_toolbar = NavigationToolbar(self.time_canvas, self.time_group) time_layout.addWidget(self.time_toolbar) # --- Группа "Spectrum" --- self.freq_group = QGroupBox("Spectrum") freq_layout = QVBoxLayout(self.freq_group) self.freq_canvas = MplCanvas(self, width=5, height=4) freq_layout.addWidget(self.freq_canvas) self.freq_toolbar = NavigationToolbar(self.freq_canvas, self.freq_group) freq_layout.addWidget(self.freq_toolbar) plot_row_1.addWidget(self.time_group, stretch=1) plot_row_1.addWidget(self.freq_group, stretch=1) main_layout.addLayout(plot_row_1) # --- Вторая строка с графиком демодулированного сигнала --- plot_row_2 = QHBoxLayout() self.demod_group = QGroupBox("Demodulated Signal") demod_layout = QVBoxLayout(self.demod_group) self.demod_time_canvas = MplCanvas(self, width=5, height=3) demod_layout.addWidget(self.demod_time_canvas) self.demod_toolbar = NavigationToolbar(self.demod_time_canvas, self.demod_group) demod_layout.addWidget(self.demod_toolbar) plot_row_2.addWidget(self.demod_group, stretch=1) main_layout.addLayout(plot_row_2) self.demod_group.raise_() # ============ DockWidget для выбора каналов ============ dock = QDockWidget("Channels", self) dock.setFeatures( QDockWidget.DockWidgetFloatable | QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetVerticalTitleBar ) self.channels_list = QListWidget() self.channels_list.setSelectionMode(QListWidget.MultiSelection) self.channels_list.setMaximumWidth(250) dock_widget = QWidget() vbox = QVBoxLayout(dock_widget) label_ch = QLabel("Select channels:") vbox.addWidget(label_ch) vbox.addWidget(self.channels_list) dock_widget.setLayout(vbox) dock.setWidget(dock_widget) self.addDockWidget(Qt.RightDockWidgetArea, dock) # ================== Стили =================== self.setStyleSheet(""" QMainWindow { background-color: #FAFAFA; } QDockWidget { background-color: #F4F7F9; } QDockWidget::title { background-color: #34495E; color: white; padding: 8px; font-size: 16px; } QGroupBox { border: 2px solid #3498db; border-radius: 9px; margin-top: 30px; font-weight: bold; font-size: 20px; } QGroupBox::title { subcontrol-origin: margin; subcontrol-position: top center; padding: -5px 25px; font-size: 20px; } QPushButton { background-color: #3DAEE9; color: white; border-radius: 9px; padding: 6px 16px; font-size: 16px; } QPushButton:hover { background-color: #3592CC; } QPushButton:pressed { background-color: #2A7098; } QCheckBox { font-size: 18px; } QLineEdit { padding: 3px; font-size: 18px; } QLabel { font-size: 18px; } """) self.showMaximized() # ========== Загрузка CSV ========== def load_csv(self): file_dialog = QFileDialog(self, "Select .csv file", ".", "CSV Files (*.csv)") if file_dialog.exec(): file_path = file_dialog.selectedFiles()[0] else: return if not file_path: return try: with open(file_path, 'r', encoding='utf-8') as f: cleaned_lines = [] for line in f: line = line.strip() if line.endswith(','): line = line[:-1] cleaned_lines.append(line) loaded_array = np.loadtxt(cleaned_lines, delimiter=',') if loaded_array.ndim == 1: loaded_array = loaded_array.reshape(-1, 1) self.csv_data = loaded_array num_cols = self.csv_data.shape[1] self.channels_list.clear() for i in range(num_cols): channel_name = string.ascii_uppercase[i] if i < 26 else f"CH{i}" item = QListWidgetItem(f"Channel {channel_name}") if i == 0: item.setCheckState(Qt.Checked) else: item.setCheckState(Qt.Unchecked) self.channels_list.addItem(item) self.loaded_file_path = file_path QMessageBox.information( self, "File loaded", f"CSV file loaded:\n{file_path}" ) except Exception as e: QMessageBox.warning(self, "Error loading", f"Could not load file:\n{e}") self.csv_data = None # ========== Определение интервала времени ========== def get_time_slice_indices(self): if self.csv_data is None: return None, None try: start_ns = float(self.start_time_input.text()) end_ns = float(self.end_time_input.text()) if end_ns <= start_ns: raise ValueError("end <= start") except ValueError: return None, None dt_ns = self.sampling_rate_ns start_idx = int(round(start_ns / dt_ns)) end_idx = int(round(end_ns / dt_ns)) if start_idx < 0: start_idx = 0 if end_idx > self.csv_data.shape[0]: end_idx = self.csv_data.shape[0] if start_idx >= end_idx: return None, None return start_idx, end_idx # ========== Выбор каналов ========== def get_selected_channels(self): indices = [] for i in range(self.channels_list.count()): item = self.channels_list.item(i) if item.checkState() == Qt.Checked: indices.append(i) return indices # ========== Полосовой фильтр через FFT ========== def bandpass_filter_fft(self, data, dt_s, f_low_hz, f_high_hz): N = data.shape[0] num_ch = data.shape[1] filtered = np.zeros_like(data, dtype=float) freqs = np.fft.fftfreq(N, d=dt_s) for ch in range(num_ch): y = data[:, ch] Y = np.fft.fft(y) # Создаём маску в нужном диапазоне mask = (np.abs(freqs) >= f_low_hz) & (np.abs(freqs) <= f_high_hz) Y[~mask] = 0.0 y_filt = np.fft.ifft(Y) filtered[:, ch] = np.real(y_filt) return filtered # ========== Применить фильтр при необходимости ========== def apply_bandpass_if_needed(self, data, dt_s): out = data.copy() if self.use_bandpass_checkbox.isChecked(): try: f_low_mhz = float(self.bandpass_low_input.text()) f_high_mhz = float(self.bandpass_high_input.text()) if f_high_mhz <= f_low_mhz: raise ValueError except ValueError: QMessageBox.warning(self, "Bandpass Error", "Incorrect bandpass frequencies.") else: f_low_hz = f_low_mhz * 1e6 f_high_hz = f_high_mhz * 1e6 out = self.bandpass_filter_fft(out, dt_s, f_low_hz, f_high_hz) return out # ========== Построение временной области ========== def plot_time_domain(self): if self.csv_data is None: QMessageBox.warning(self, "Error", "No CSV loaded.") return start_idx, end_idx = self.get_time_slice_indices() if start_idx is None or end_idx is None: QMessageBox.warning(self, "Error", "Invalid time interval.") return selected_channels = self.get_selected_channels() if not selected_channels: QMessageBox.warning(self, "Error", "No channels selected.") return raw_cropped = self.csv_data[start_idx:end_idx, :] # Перевод в вольты raw_cropped_v = raw_cropped / 32767.0 * 5.0 dt_s = self.sampling_rate_ns * 1e-9 data_for_plot = self.apply_bandpass_if_needed(raw_cropped_v, dt_s) self.time_canvas.ax.clear() time_axis_ns = np.arange(start_idx, end_idx) * self.sampling_rate_ns channel_labels = [ f"Channel {string.ascii_uppercase[ch] if ch<26 else ch}" for ch in selected_channels ] plot_time_domain( axis=self.time_canvas.ax, time_axis_ns=time_axis_ns, data=data_for_plot[:, selected_channels], channels=channel_labels ) self.time_canvas.draw() # ========== Построение спектра ========== def plot_spectrum(self): if self.csv_data is None: QMessageBox.warning(self, "Error", "No CSV loaded.") return try: frequency_mhz = float(self.fc_input.text()) except ValueError: frequency_mhz = 0.0 try: freq_min = float(self.freq_min_input.text()) freq_max = float(self.freq_max_input.text()) except ValueError: freq_min, freq_max = None, None start_idx, end_idx = self.get_time_slice_indices() if start_idx is None or end_idx is None: QMessageBox.warning(self, "Error", "Invalid time interval.") return selected_channels = self.get_selected_channels() if not selected_channels: QMessageBox.warning(self, "Error", "No channels selected.") return # Если включён режим построения спектра демодулированных данных if self.plot_demod_checkbox.isChecked(): if self.demod_data is None or self.demod_time_ns is None: QMessageBox.warning(self, "Error", "No demodulated data. Please demodulate first.") return decimation_factor = self.get_decimation_factor() if not decimation_factor: return dt_s = (self.sampling_rate_ns * 1e-9) * decimation_factor data_for_fft = self.demod_data[:, selected_channels] n_win = data_for_fft.shape[0] freq_axis_mhz = np.fft.fftfreq(n_win, d=dt_s) / 1e6 self.freq_canvas.ax.clear() channel_labels = [ f"Channel {string.ascii_uppercase[ch] if ch<26 else ch}" for ch in selected_channels ] plot_spectrum( axis=self.freq_canvas.ax, frequency=freq_axis_mhz, data=data_for_fft, channels=channel_labels, middle_frequency=0.0 # демодулированные данные находятся в базовой полосе ) if freq_min is not None and freq_max is not None and freq_max > freq_min: self.freq_canvas.ax.set_xlim([freq_min, freq_max]) self.freq_canvas.draw() else: raw_cropped = self.csv_data[start_idx:end_idx, :] raw_cropped_v = raw_cropped / 32767.0 * 5.0 dt_s = self.sampling_rate_ns * 1e-9 data_for_fft = self.apply_bandpass_if_needed(raw_cropped_v, dt_s) n_win = data_for_fft.shape[0] freq_axis_mhz = np.fft.fftfreq(n_win, d=dt_s) / 1e6 self.freq_canvas.ax.clear() channel_labels = [ f"Channel {string.ascii_uppercase[ch] if ch<26 else ch}" for ch in selected_channels ] plot_spectrum( axis=self.freq_canvas.ax, frequency=freq_axis_mhz, data=data_for_fft[:, selected_channels], channels=channel_labels, middle_frequency=frequency_mhz # центральная частота берётся из поля ) if freq_min is not None and freq_max is not None and freq_max > freq_min: self.freq_canvas.ax.set_xlim([freq_min, freq_max]) self.freq_canvas.draw() def get_decimation_factor(self): try: decimation_factor = int(self.decimation_factor_input.text()) if decimation_factor < 1: raise ValueError return decimation_factor except ValueError: QMessageBox.warning(self, "Error", "Invalid decimation factor.") return None # ========== Демодуляция и децимация ========== def demodulate_and_decimate(self): if self.csv_data is None: QMessageBox.warning(self, "Error", "No CSV loaded.") return try: frequency_mhz = float(self.fc_input.text()) except ValueError: frequency_mhz = 0.0 decimation_factor = self.get_decimation_factor() if not decimation_factor: return start_idx, end_idx = self.get_time_slice_indices() if start_idx is None or end_idx is None: QMessageBox.warning(self, "Error", "Invalid time interval.") return selected_channels = self.get_selected_channels() if not selected_channels: QMessageBox.warning(self, "Error", "Select at least one channel.") return cropped_data = self.csv_data[start_idx:end_idx, :] cropped_data_v = cropped_data / 32767.0 * 5.0 dt_s = self.sampling_rate_ns * 1e-9 data_for_demod = self.apply_bandpass_if_needed(cropped_data_v, dt_s) n_win = data_for_demod.shape[0] t_vec = np.arange(n_win) * dt_s fc_hz = frequency_mhz * 1e6 # Демодуляция: перенос на нулевую частоту self.demod_data = np.zeros((n_win, len(selected_channels)), dtype=np.complex64) for idx, ch in enumerate(selected_channels): y = data_for_demod[:, ch] mixer = np.exp(-1j * 2 * np.pi * fc_hz * t_vec) self.demod_data[:, idx] = y * mixer # Децимация self.demod_data = self.demod_data[::decimation_factor, :] decimated_dt_s = dt_s * decimation_factor self.demod_time_ns = np.arange(self.demod_data.shape[0]) * decimated_dt_s * 1e9 # Рисуем демодулированный сигнал (I-компонент) self.demod_time_canvas.ax.clear() channel_labels = [ f"Channel {string.ascii_uppercase[ch] if ch<26 else ch}" for ch in selected_channels ] plot_demodulated_time_domain( ax=self.demod_time_canvas.ax, time_ns=self.demod_time_ns, demod_data=self.demod_data, selected_channels=channel_labels ) self.demod_time_canvas.draw() QMessageBox.information( self, "Demodulation", f"Demod on {frequency_mhz} MHz + decim x{decimation_factor} done!" ) # ========== Экспорт в .mat ========== def export_data_to_mat(self): if self.demod_data is None or self.demod_time_ns is None: QMessageBox.warning(self, "Error", "No demodulated data.") return file_dialog = QFileDialog(self, "Save as .mat", ".", "MAT files (*.mat)") file_dialog.setAcceptMode(QFileDialog.AcceptSave) file_dialog.setDefaultSuffix("mat") if file_dialog.exec(): save_path = file_dialog.selectedFiles()[0] else: return if not save_path: return try: savemat(save_path, { 'time_ns': self.demod_time_ns, 'demod_data': self.demod_data }) QMessageBox.information(self, "Export", f"Saved:\n{save_path}") except Exception as e: QMessageBox.warning(self, "Error saving", f"Could not save:\n{e}") def main(): app = QApplication(sys.argv) window = MainWindow() window.show() sys.exit(app.exec()) main()