import sys import numpy as np import string from PySide6.QtWidgets import ( QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QPushButton, QLabel, QLineEdit, QFileDialog, QListWidget, QListWidgetItem, QGroupBox, QMessageBox, QCheckBox ) from PySide6.QtCore import Qt # Для PySide6 используем: from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas # Если бы использовали PyQt5: from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas # Если PySide6 + matplotlib>=3.5, часто тоже 'qt5agg' работает, либо 'qt6agg'. from matplotlib.figure import Figure class MplCanvas(FigureCanvas): """ Класс-обёртка для холста matplotlib, чтобы легко встраивать в PySide/PyQt. """ def __init__(self, parent=None, width=5, height=4, dpi=100): self.fig = Figure(figsize=(width, height), dpi=dpi) self.ax = self.fig.add_subplot(111) super(MplCanvas, self).__init__(self.fig) self.setParent(parent) self.fig.tight_layout() class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("Пример GUI: сигналы, спектр, демодуляция + фильтрация") # Основной виджет и лейаут central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout() central_widget.setLayout(main_layout) # ========== Блок кнопок и полей ввода ========== controls_layout = QGridLayout() # Кнопка "Загрузить CSV" self.load_btn = QPushButton("Загрузить CSV") self.load_btn.clicked.connect(self.load_csv) controls_layout.addWidget(self.load_btn, 0, 0, 1, 3) # Список доступных каналов (столбцов) self.channels_list = QListWidget() self.channels_list.setSelectionMode(QListWidget.MultiSelection) self.channels_list.setFixedHeight(100) controls_layout.addWidget(QLabel("Выберите каналы:"), 1, 0) controls_layout.addWidget(self.channels_list, 2, 0, 1, 3) # Поле ввода центральной частоты (МГц) self.fc_label = QLabel("Центральная частота (МГц):") self.fc_input = QLineEdit("0") controls_layout.addWidget(self.fc_label, 3, 0) controls_layout.addWidget(self.fc_input, 3, 1) # Два поля: начальное и конечное время (нс) self.start_time_label = QLabel("Начальное время (нс):") self.start_time_input = QLineEdit("0") controls_layout.addWidget(self.start_time_label, 4, 0) controls_layout.addWidget(self.start_time_input, 4, 1) self.end_time_label = QLabel("Конечное время (нс):") self.end_time_input = QLineEdit("1000") controls_layout.addWidget(self.end_time_label, 5, 0) controls_layout.addWidget(self.end_time_input, 5, 1) # Кнопки для построения (исходный сигнал) self.plot_time_btn = QPushButton("Построить временной график") self.plot_time_btn.clicked.connect(self.plot_time_domain) controls_layout.addWidget(self.plot_time_btn, 6, 0) self.plot_spectrum_btn = QPushButton("Построить спектр") self.plot_spectrum_btn.clicked.connect(self.plot_spectrum) controls_layout.addWidget(self.plot_spectrum_btn, 6, 1) # Новые элементы: Коэффициент децимации + кнопка "Демодуляция и децимация" self.dec_factor_label = QLabel("Коэффициент децимации:") self.dec_factor_input = QLineEdit("4") # По умолчанию 4 controls_layout.addWidget(self.dec_factor_label, 7, 0) controls_layout.addWidget(self.dec_factor_input, 7, 1) self.demod_dec_btn = QPushButton("Демодуляция и децимация") self.demod_dec_btn.clicked.connect(self.demodulate_and_decimate) controls_layout.addWidget(self.demod_dec_btn, 7, 2) # ====== Чекбокс для скользящего среднего ====== self.use_moving_average_checkbox = QCheckBox("Применять фильтрацию (скользящее среднее)") controls_layout.addWidget(self.use_moving_average_checkbox, 8, 0, 1, 3) # ====== Чекбокс для полосового фильтра + поля ввода ====== self.use_bandpass_checkbox = QCheckBox("Применять полосовой фильтр") controls_layout.addWidget(self.use_bandpass_checkbox, 9, 0, 1, 3) self.bandpass_low_label = QLabel("Нижняя частота (МГц):") self.bandpass_low_input = QLineEdit("10") controls_layout.addWidget(self.bandpass_low_label, 10, 0) controls_layout.addWidget(self.bandpass_low_input, 10, 1) self.bandpass_high_label = QLabel("Верхняя частота (МГц):") self.bandpass_high_input = QLineEdit("50") controls_layout.addWidget(self.bandpass_high_label, 11, 0) controls_layout.addWidget(self.bandpass_high_input, 11, 1) main_layout.addLayout(controls_layout) # ========== Блок для отображения графиков ========== plots_layout = QHBoxLayout() # Левый блок (временной график - с учётом фильтра, если включен) self.time_group = QGroupBox("Временная зависимость") time_layout = QVBoxLayout() self.time_canvas = MplCanvas(self, width=5, height=4) time_layout.addWidget(self.time_canvas) self.time_group.setLayout(time_layout) plots_layout.addWidget(self.time_group, stretch=1) # Правый блок (спектр - с учётом фильтра, если включен) self.freq_group = QGroupBox("Спектр") freq_layout = QVBoxLayout() self.freq_canvas = MplCanvas(self, width=5, height=4) freq_layout.addWidget(self.freq_canvas) self.freq_group.setLayout(freq_layout) plots_layout.addWidget(self.freq_group, stretch=1) main_layout.addLayout(plots_layout) # ======== Второй ряд для демодулированных данных ======== demod_layout = QHBoxLayout() # График во временной области после демодуляции/децимации self.demod_time_group = QGroupBox("Временная зависимость (после демодуляции и децимации)") demod_time_layout = QVBoxLayout() self.demod_time_canvas = MplCanvas(self, width=5, height=4) demod_time_layout.addWidget(self.demod_time_canvas) self.demod_time_group.setLayout(demod_time_layout) demod_layout.addWidget(self.demod_time_group, stretch=1) main_layout.addLayout(demod_layout) # Инициализируем переменные для хранения данных self.csv_data = None # Исходные данные из CSV (numpy-массив) self.demod_data = None # Данные после демодуляции/децимации self.sampling_rate_ns = 13 # Шаг во времени 13 нс self.loaded_file_path = None # Показываем окно self.resize(1400, 800) self.show() def load_csv(self): """ Загрузка CSV-файла и заполнение списка каналов. Учтён случай, когда в конце строк есть лишняя запятая. """ file_dialog = QFileDialog(self, "Выберите CSV-файл", ".", "CSV Files (*.csv)") if file_dialog.exec(): file_path = file_dialog.selectedFiles()[0] else: return if not file_path: return try: # Открываем файл и обрезаем лишнюю запятую в конце строки with open(file_path, 'r', encoding='utf-8') as f: cleaned_lines = [] for line in f: line = line.strip() # Убираем пробелы и \n if line.endswith(','): # Если строка заканчивается запятой line = line[:-1] # Удаляем её cleaned_lines.append(line) # Передаём полученные «чистые» строки в loadtxt loaded_array = np.loadtxt(cleaned_lines, delimiter=',') # Если массив одномерный (ndim=1), превращаем в (N, 1) if loaded_array.ndim == 1: loaded_array = loaded_array.reshape(-1, 1) # Сохраняем в self.csv_data self.csv_data = loaded_array # Определяем число столбцов num_cols = self.csv_data.shape[1] # Заполняем список каналов self.channels_list.clear() for i in range(num_cols): channel = string.ascii_uppercase[i] item = QListWidgetItem(f"Канал {channel}") # Пусть у первого канала будет Checked по умолчанию item.setCheckState(Qt.Checked if i == 0 else Qt.Unchecked) self.channels_list.addItem(item) self.loaded_file_path = file_path QMessageBox.information(self, "Загрузка CSV", f"Файл {file_path} успешно загружен.") except Exception as e: QMessageBox.warning(self, "Ошибка", f"Не удалось загрузить файл: {e}") self.csv_data = None def get_time_slice_indices(self): """ Вспомогательный метод: получить start_idx, end_idx из полей ввода, с учётом sampling_rate_ns и размеров массива csv_data. """ if self.csv_data is None: return None, None try: start_ns = float(self.start_time_input.text()) end_ns = float(self.end_time_input.text()) if end_ns <= start_ns: raise ValueError("end_ns must be > start_ns") except ValueError: return None, None dt = self.sampling_rate_ns start_idx = int(round(start_ns / dt)) end_idx = int(round(end_ns / dt)) if start_idx < 0: start_idx = 0 if end_idx > self.csv_data.shape[0]: end_idx = self.csv_data.shape[0] if start_idx >= end_idx: return None, None return start_idx, end_idx def get_selected_channels(self): """ Вспомогательный метод: получить индексы выбранных каналов. """ selected_channels = [] for i in range(self.channels_list.count()): item = self.channels_list.item(i) if item.checkState() == Qt.Checked: selected_channels.append(i) return selected_channels # ---------------------------------------------------------------------------- # Вспомогательные методы фильтрации # ---------------------------------------------------------------------------- def moving_average_filter(self, data, window_size=25): """ Фильтр скользящего среднего. data.shape = (N, num_channels) Возвращаем копию data (float) с отфильтрованными значениями. """ filtered = np.zeros_like(data, dtype=float) kernel = np.ones(window_size) / window_size for ch in range(data.shape[1]): y = data[:, ch] y_filt = np.convolve(y, kernel, mode='same') filtered[:, ch] = y_filt return filtered def bandpass_filter_fft(self, data, dt_s, f_low_hz, f_high_hz): """ Пример полосового фильтра в частотной области: обнуляем все компоненты спектра, которые лежат вне диапазона [f_low_hz, f_high_hz]. data.shape = (N, num_channels) dt_s — шаг по времени, с f_low_hz, f_high_hz — границы полосы (Гц) Возвращаем отфильтрованный сигнал (float). """ N = data.shape[0] num_ch = data.shape[1] filtered = np.zeros_like(data, dtype=float) # Частоты для FFT freqs = np.fft.fftfreq(N, d=dt_s) for ch in range(num_ch): y = data[:, ch] Y = np.fft.fft(y) # Создаём маску, где частоты внутри [f_low_hz, f_high_hz] (по модулю) mask = (np.abs(freqs) >= f_low_hz) & (np.abs(freqs) <= f_high_hz) # Обнулим всё, что вне маски Y[~mask] = 0.0 # Обратное преобразование y_filt = np.fft.ifft(Y) filtered[:, ch] = np.real(y_filt) # сигнал считаем вещественным return filtered def apply_filters_if_needed(self, data, dt_s): """ Последовательно применяем фильтры, если соответствующие чекбоксы включены: 1) Полосовой фильтр (bandpass) 2) Фильтр скользящего среднего (moving average) """ # data.shape = (N, num_channels) out = data.copy() # 1) Проверяем, включён ли чекбокс "Применять полосовой фильтр" if self.use_bandpass_checkbox.isChecked(): try: f_low_mhz = float(self.bandpass_low_input.text()) f_high_mhz = float(self.bandpass_high_input.text()) if f_high_mhz <= f_low_mhz: raise ValueError except ValueError: QMessageBox.warning(self, "Ошибка", "Некорректные границы полосового фильтра.") # Если ошибка, не делаем полосовой фильтр else: f_low_hz = f_low_mhz * 1e6 f_high_hz = f_high_mhz * 1e6 out = self.bandpass_filter_fft(out, dt_s, f_low_hz, f_high_hz) # 2) Проверяем, включён ли чекбокс "Применять фильтрацию (скользящее среднее)" if self.use_moving_average_checkbox.isChecked(): out = self.moving_average_filter(out, window_size=25) return out # ---------------------------------------------------------------------------- # Основные методы: построение во времени, в спектре, демодуляция # ---------------------------------------------------------------------------- def plot_time_domain(self): """ Построение временной зависимости выбранных каналов (с учётом фильтрации, если включена). """ if self.csv_data is None: QMessageBox.warning(self, "Ошибка", "Сперва загрузите CSV-файл.") return start_idx, end_idx = self.get_time_slice_indices() if start_idx is None or end_idx is None: QMessageBox.warning(self, "Ошибка", "Некорректный интервал времени.") return selected_channels = self.get_selected_channels() if not selected_channels: QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один канал для построения.") return # Срезаем нужные данные raw_cropped = self.csv_data[start_idx:end_idx, :] # Нормируем к вольтам raw_cropped_v = raw_cropped / 32767.0 * 5.0 # Применяем фильтры (если чекбоксы включены) dt_s = self.sampling_rate_ns * 1e-9 data_for_plot = self.apply_filters_if_needed(raw_cropped_v, dt_s) # Очищаем ось self.time_canvas.ax.clear() dt = self.sampling_rate_ns time_axis_ns = np.arange(start_idx, end_idx) * dt # в нс for ch in selected_channels: y = data_for_plot[:, ch] self.time_canvas.ax.plot(time_axis_ns, y, label=f"Канал {ch+1}") self.time_canvas.ax.set_xlabel("Время (нс)") self.time_canvas.ax.set_ylabel("Амплитуда (В)") self.time_canvas.ax.legend() self.time_canvas.ax.grid(True) self.time_canvas.draw() def plot_spectrum(self): """ Построение спектра для выбранных каналов (с учётом фильтрации). """ if self.csv_data is None: QMessageBox.warning(self, "Ошибка", "Сперва загрузите CSV-файл.") return # Считываем центральную частоту (МГц) — чтобы отметить её на графике try: fc_mhz = float(self.fc_input.text()) except ValueError: fc_mhz = 0.0 start_idx, end_idx = self.get_time_slice_indices() if start_idx is None or end_idx is None: QMessageBox.warning(self, "Ошибка", "Некорректный интервал времени.") return selected_channels = self.get_selected_channels() if not selected_channels: QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один канал для построения.") return # Срезаем нужные данные raw_cropped = self.csv_data[start_idx:end_idx, :] # Нормируем к вольтам raw_cropped_v = raw_cropped / 32767.0 * 5.0 # Применяем фильтры (если нужно) dt_s = self.sampling_rate_ns * 1e-9 data_for_fft = self.apply_filters_if_needed(raw_cropped_v, dt_s) # Очищаем ось self.freq_canvas.ax.clear() # Параметры для FFT n_win = data_for_fft.shape[0] # число точек freqs = np.fft.fftfreq(n_win, d=dt_s) # массив частот (в Гц) freqs_mhz = freqs / 1e6 # переведём в МГц for ch in selected_channels: y = data_for_fft[:, ch] # FFT Y = np.fft.fft(y) spectrum = np.abs(Y) self.freq_canvas.ax.plot(freqs_mhz, spectrum, label=f"Канал {ch+1}") # Добавим вертикальную линию на центральной частоте (если нужно) if fc_mhz != 0.0: self.freq_canvas.ax.axvline(x=fc_mhz, color='red', linestyle='--', label="Центр") self.freq_canvas.ax.set_xlabel("Частота (МГц)") self.freq_canvas.ax.set_ylabel("Амплитуда спектра (отн. ед.)") self.freq_canvas.ax.legend() self.freq_canvas.ax.grid(True) self.freq_canvas.draw() def demodulate_and_decimate(self): """ Демодуляция (IQ-смещение) на центральную частоту fc_mhz и децимация для выбранных каналов, исходя из заданного интервала [start_ns, end_ns]. При этом берём либо исходный, либо уже отфильтрованный сигнал. Результат во временной области (I-компонента) выводим на self.demod_time_canvas. """ if self.csv_data is None: QMessageBox.warning(self, "Ошибка", "Сперва загрузите CSV-файл.") return # Считываем центральную частоту (МГц) try: fc_mhz = float(self.fc_input.text()) except ValueError: fc_mhz = 0.0 # Считываем коэффициент децимации try: dec_factor = int(self.dec_factor_input.text()) if dec_factor < 1: raise ValueError except ValueError: QMessageBox.warning(self, "Ошибка", "Некорректный коэффициент децимации.") return start_idx, end_idx = self.get_time_slice_indices() if start_idx is None or end_idx is None: QMessageBox.warning(self, "Ошибка", "Некорректный интервал времени.") return selected_channels = self.get_selected_channels() if not selected_channels: QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один канал для обработки.") return # Берём нужный срез по времени, нормируем к вольтам cropped_data = self.csv_data[start_idx:end_idx, :] cropped_data_v = cropped_data / 32767.0 * 5.0 # Применяем фильтры (если включены чекбоксы) dt_s = self.sampling_rate_ns * 1e-9 data_for_demod = self.apply_filters_if_needed(cropped_data_v, dt_s) # Рассчитаем временные метки (в секундах) для исходного сигнала n_win = data_for_demod.shape[0] t_vec = np.arange(n_win) * dt_s # Переводим частоту в Гц fc_hz = fc_mhz * 1e6 # Создадим массив для результирующего сигнала (комплексный) self.demod_data = np.zeros((n_win, len(selected_channels)), dtype=np.complex64) # Для каждой выбранной колонки — демодулируем for idx, ch in enumerate(selected_channels): y = data_for_demod[:, ch] # Умножение на e^-j(2π fc t) для смещения mixer = np.exp(-1j * 2 * np.pi * fc_hz * t_vec) y_demod = y * mixer self.demod_data[:, idx] = y_demod # Далее простейшая децимация (прореживание) — без доп. фильтра self.demod_data = self.demod_data[::dec_factor, :] # Очищаем ось для отображения демодулированного сигнала self.demod_time_canvas.ax.clear() # Создадим новую временную ось для децимированного сигнала decimated_dt_s = dt_s * dec_factor dec_time_vec_ns = (np.arange(self.demod_data.shape[0]) * decimated_dt_s) * 1e9 # в нс # Построим только I-компоненту для каждого канала for idx, ch in enumerate(selected_channels): y_i = np.real(self.demod_data[:, idx]) self.demod_time_canvas.ax.plot(dec_time_vec_ns, y_i, label=f"Канал {ch+1} (I)") self.demod_time_canvas.ax.set_xlabel("Время после децимации (нс)") self.demod_time_canvas.ax.set_ylabel("I-компонента (В)") self.demod_time_canvas.ax.legend() self.demod_time_canvas.ax.grid(True) self.demod_time_canvas.draw() QMessageBox.information( self, "Демодуляция и децимация", f"Демодуляция на {fc_mhz} МГц и децимация (factor={dec_factor}) выполнены." ) def main(): app = QApplication(sys.argv) window = MainWindow() sys.exit(app.exec()) if __name__ == "__main__": main()