123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 |
- import sys
- import numpy as np
- import string
- from PySide6.QtWidgets import (
- QApplication, QMainWindow, QWidget,
- QVBoxLayout, QHBoxLayout, QGridLayout,
- QPushButton, QLabel, QLineEdit, QFileDialog,
- QListWidget, QListWidgetItem, QGroupBox, QMessageBox,
- QCheckBox
- )
- from PySide6.QtCore import Qt
- # Для PySide6 используем:
- from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
- # Если бы использовали PyQt5: from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
- # Если PySide6 + matplotlib>=3.5, часто тоже 'qt5agg' работает, либо 'qt6agg'.
- from matplotlib.figure import Figure
- class MplCanvas(FigureCanvas):
- """
- Класс-обёртка для холста matplotlib, чтобы легко встраивать в PySide/PyQt.
- """
- def __init__(self, parent=None, width=5, height=4, dpi=100):
- self.fig = Figure(figsize=(width, height), dpi=dpi)
- self.ax = self.fig.add_subplot(111)
- super(MplCanvas, self).__init__(self.fig)
- self.setParent(parent)
- self.fig.tight_layout()
- class MainWindow(QMainWindow):
-
- def __init__(self):
- super().__init__()
- self.setWindowTitle("Пример GUI: сигналы, спектр, демодуляция + фильтрация")
- # Основной виджет и лейаут
- central_widget = QWidget()
- self.setCentralWidget(central_widget)
- main_layout = QVBoxLayout()
- central_widget.setLayout(main_layout)
- # ========== Блок кнопок и полей ввода ==========
- controls_layout = QGridLayout()
- # Кнопка "Загрузить CSV"
- self.load_btn = QPushButton("Загрузить CSV")
- self.load_btn.clicked.connect(self.load_csv)
- controls_layout.addWidget(self.load_btn, 0, 0, 1, 3)
- # Список доступных каналов (столбцов)
- self.channels_list = QListWidget()
- self.channels_list.setSelectionMode(QListWidget.MultiSelection)
- self.channels_list.setFixedHeight(100)
- controls_layout.addWidget(QLabel("Выберите каналы:"), 1, 0)
- controls_layout.addWidget(self.channels_list, 2, 0, 1, 3)
- # Поле ввода центральной частоты (МГц)
- self.fc_label = QLabel("Центральная частота (МГц):")
- self.fc_input = QLineEdit("0")
- controls_layout.addWidget(self.fc_label, 3, 0)
- controls_layout.addWidget(self.fc_input, 3, 1)
- # Два поля: начальное и конечное время (нс)
- self.start_time_label = QLabel("Начальное время (нс):")
- self.start_time_input = QLineEdit("0")
- controls_layout.addWidget(self.start_time_label, 4, 0)
- controls_layout.addWidget(self.start_time_input, 4, 1)
- self.end_time_label = QLabel("Конечное время (нс):")
- self.end_time_input = QLineEdit("1000")
- controls_layout.addWidget(self.end_time_label, 5, 0)
- controls_layout.addWidget(self.end_time_input, 5, 1)
- # Кнопки для построения (исходный сигнал)
- self.plot_time_btn = QPushButton("Построить временной график")
- self.plot_time_btn.clicked.connect(self.plot_time_domain)
- controls_layout.addWidget(self.plot_time_btn, 6, 0)
- self.plot_spectrum_btn = QPushButton("Построить спектр")
- self.plot_spectrum_btn.clicked.connect(self.plot_spectrum)
- controls_layout.addWidget(self.plot_spectrum_btn, 6, 1)
- # Новые элементы: Коэффициент децимации + кнопка "Демодуляция и децимация"
- self.dec_factor_label = QLabel("Коэффициент децимации:")
- self.dec_factor_input = QLineEdit("4") # По умолчанию 4
- controls_layout.addWidget(self.dec_factor_label, 7, 0)
- controls_layout.addWidget(self.dec_factor_input, 7, 1)
- self.demod_dec_btn = QPushButton("Демодуляция и децимация")
- self.demod_dec_btn.clicked.connect(self.demodulate_and_decimate)
- controls_layout.addWidget(self.demod_dec_btn, 7, 2)
- # ====== Чекбокс для скользящего среднего ======
- self.use_moving_average_checkbox = QCheckBox("Применять фильтрацию (скользящее среднее)")
- controls_layout.addWidget(self.use_moving_average_checkbox, 8, 0, 1, 3)
- # ====== Чекбокс для полосового фильтра + поля ввода ======
- self.use_bandpass_checkbox = QCheckBox("Применять полосовой фильтр")
- controls_layout.addWidget(self.use_bandpass_checkbox, 9, 0, 1, 3)
- self.bandpass_low_label = QLabel("Нижняя частота (МГц):")
- self.bandpass_low_input = QLineEdit("10")
- controls_layout.addWidget(self.bandpass_low_label, 10, 0)
- controls_layout.addWidget(self.bandpass_low_input, 10, 1)
- self.bandpass_high_label = QLabel("Верхняя частота (МГц):")
- self.bandpass_high_input = QLineEdit("50")
- controls_layout.addWidget(self.bandpass_high_label, 11, 0)
- controls_layout.addWidget(self.bandpass_high_input, 11, 1)
- main_layout.addLayout(controls_layout)
- # ========== Блок для отображения графиков ==========
- plots_layout = QHBoxLayout()
- # Левый блок (временной график - с учётом фильтра, если включен)
- self.time_group = QGroupBox("Временная зависимость")
- time_layout = QVBoxLayout()
- self.time_canvas = MplCanvas(self, width=5, height=4)
- time_layout.addWidget(self.time_canvas)
- self.time_group.setLayout(time_layout)
- plots_layout.addWidget(self.time_group, stretch=1)
- # Правый блок (спектр - с учётом фильтра, если включен)
- self.freq_group = QGroupBox("Спектр")
- freq_layout = QVBoxLayout()
- self.freq_canvas = MplCanvas(self, width=5, height=4)
- freq_layout.addWidget(self.freq_canvas)
- self.freq_group.setLayout(freq_layout)
- plots_layout.addWidget(self.freq_group, stretch=1)
- main_layout.addLayout(plots_layout)
- # ======== Второй ряд для демодулированных данных ========
- demod_layout = QHBoxLayout()
- # График во временной области после демодуляции/децимации
- self.demod_time_group = QGroupBox("Временная зависимость (после демодуляции и децимации)")
- demod_time_layout = QVBoxLayout()
- self.demod_time_canvas = MplCanvas(self, width=5, height=4)
- demod_time_layout.addWidget(self.demod_time_canvas)
- self.demod_time_group.setLayout(demod_time_layout)
- demod_layout.addWidget(self.demod_time_group, stretch=1)
- main_layout.addLayout(demod_layout)
- # Инициализируем переменные для хранения данных
- self.csv_data = None # Исходные данные из CSV (numpy-массив)
- self.demod_data = None # Данные после демодуляции/децимации
- self.sampling_rate_ns = 13 # Шаг во времени 13 нс
- self.loaded_file_path = None
- # Показываем окно
- self.resize(1400, 800)
- self.show()
- def load_csv(self):
- """
- Загрузка CSV-файла и заполнение списка каналов.
- Учтён случай, когда в конце строк есть лишняя запятая.
- """
- file_dialog = QFileDialog(self, "Выберите CSV-файл", ".", "CSV Files (*.csv)")
- if file_dialog.exec():
- file_path = file_dialog.selectedFiles()[0]
- else:
- return
- if not file_path:
- return
- try:
- # Открываем файл и обрезаем лишнюю запятую в конце строки
- with open(file_path, 'r', encoding='utf-8') as f:
- cleaned_lines = []
- for line in f:
- line = line.strip() # Убираем пробелы и \n
- if line.endswith(','): # Если строка заканчивается запятой
- line = line[:-1] # Удаляем её
- cleaned_lines.append(line)
- # Передаём полученные «чистые» строки в loadtxt
- loaded_array = np.loadtxt(cleaned_lines, delimiter=',')
-
- # Если массив одномерный (ndim=1), превращаем в (N, 1)
- if loaded_array.ndim == 1:
- loaded_array = loaded_array.reshape(-1, 1)
-
- # Сохраняем в self.csv_data
- self.csv_data = loaded_array
-
- # Определяем число столбцов
- num_cols = self.csv_data.shape[1]
- # Заполняем список каналов
- self.channels_list.clear()
- for i in range(num_cols):
- channel = string.ascii_uppercase[i]
- item = QListWidgetItem(f"Канал {channel}")
- # Пусть у первого канала будет Checked по умолчанию
- item.setCheckState(Qt.Checked if i == 0 else Qt.Unchecked)
- self.channels_list.addItem(item)
- self.loaded_file_path = file_path
- QMessageBox.information(self, "Загрузка CSV", f"Файл {file_path} успешно загружен.")
- except Exception as e:
- QMessageBox.warning(self, "Ошибка", f"Не удалось загрузить файл: {e}")
- self.csv_data = None
- def get_time_slice_indices(self):
- """
- Вспомогательный метод: получить start_idx, end_idx из полей ввода,
- с учётом sampling_rate_ns и размеров массива csv_data.
- """
- if self.csv_data is None:
- return None, None
- try:
- start_ns = float(self.start_time_input.text())
- end_ns = float(self.end_time_input.text())
- if end_ns <= start_ns:
- raise ValueError("end_ns must be > start_ns")
- except ValueError:
- return None, None
- dt = self.sampling_rate_ns
- start_idx = int(round(start_ns / dt))
- end_idx = int(round(end_ns / dt))
- if start_idx < 0:
- start_idx = 0
- if end_idx > self.csv_data.shape[0]:
- end_idx = self.csv_data.shape[0]
- if start_idx >= end_idx:
- return None, None
- return start_idx, end_idx
- def get_selected_channels(self):
- """
- Вспомогательный метод: получить индексы выбранных каналов.
- """
- selected_channels = []
- for i in range(self.channels_list.count()):
- item = self.channels_list.item(i)
- if item.checkState() == Qt.Checked:
- selected_channels.append(i)
- return selected_channels
- # ----------------------------------------------------------------------------
- # Вспомогательные методы фильтрации
- # ----------------------------------------------------------------------------
- def moving_average_filter(self, data, window_size=25):
- """
- Фильтр скользящего среднего.
- data.shape = (N, num_channels)
- Возвращаем копию data (float) с отфильтрованными значениями.
- """
- filtered = np.zeros_like(data, dtype=float)
- kernel = np.ones(window_size) / window_size
- for ch in range(data.shape[1]):
- y = data[:, ch]
- y_filt = np.convolve(y, kernel, mode='same')
- filtered[:, ch] = y_filt
- return filtered
- def bandpass_filter_fft(self, data, dt_s, f_low_hz, f_high_hz):
- """
- Пример полосового фильтра в частотной области:
- обнуляем все компоненты спектра, которые лежат вне диапазона [f_low_hz, f_high_hz].
-
- data.shape = (N, num_channels)
- dt_s — шаг по времени, с
- f_low_hz, f_high_hz — границы полосы (Гц)
- Возвращаем отфильтрованный сигнал (float).
- """
- N = data.shape[0]
- num_ch = data.shape[1]
- filtered = np.zeros_like(data, dtype=float)
- # Частоты для FFT
- freqs = np.fft.fftfreq(N, d=dt_s)
- for ch in range(num_ch):
- y = data[:, ch]
- Y = np.fft.fft(y)
- # Создаём маску, где частоты внутри [f_low_hz, f_high_hz] (по модулю)
- mask = (np.abs(freqs) >= f_low_hz) & (np.abs(freqs) <= f_high_hz)
- # Обнулим всё, что вне маски
- Y[~mask] = 0.0
- # Обратное преобразование
- y_filt = np.fft.ifft(Y)
- filtered[:, ch] = np.real(y_filt) # сигнал считаем вещественным
- return filtered
- def apply_filters_if_needed(self, data, dt_s):
- """
- Последовательно применяем фильтры, если соответствующие чекбоксы включены:
- 1) Полосовой фильтр (bandpass)
- 2) Фильтр скользящего среднего (moving average)
- """
- # data.shape = (N, num_channels)
- out = data.copy()
- # 1) Проверяем, включён ли чекбокс "Применять полосовой фильтр"
- if self.use_bandpass_checkbox.isChecked():
- try:
- f_low_mhz = float(self.bandpass_low_input.text())
- f_high_mhz = float(self.bandpass_high_input.text())
- if f_high_mhz <= f_low_mhz:
- raise ValueError
- except ValueError:
- QMessageBox.warning(self, "Ошибка", "Некорректные границы полосового фильтра.")
- # Если ошибка, не делаем полосовой фильтр
- else:
- f_low_hz = f_low_mhz * 1e6
- f_high_hz = f_high_mhz * 1e6
- out = self.bandpass_filter_fft(out, dt_s, f_low_hz, f_high_hz)
- # 2) Проверяем, включён ли чекбокс "Применять фильтрацию (скользящее среднее)"
- if self.use_moving_average_checkbox.isChecked():
- out = self.moving_average_filter(out, window_size=25)
- return out
- # ----------------------------------------------------------------------------
- # Основные методы: построение во времени, в спектре, демодуляция
- # ----------------------------------------------------------------------------
- def plot_time_domain(self):
- """
- Построение временной зависимости выбранных каналов (с учётом фильтрации, если включена).
- """
- if self.csv_data is None:
- QMessageBox.warning(self, "Ошибка", "Сперва загрузите CSV-файл.")
- return
- start_idx, end_idx = self.get_time_slice_indices()
- if start_idx is None or end_idx is None:
- QMessageBox.warning(self, "Ошибка", "Некорректный интервал времени.")
- return
- selected_channels = self.get_selected_channels()
- if not selected_channels:
- QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один канал для построения.")
- return
- # Срезаем нужные данные
- raw_cropped = self.csv_data[start_idx:end_idx, :]
- # Нормируем к вольтам
- raw_cropped_v = raw_cropped / 32767.0 * 5.0
- # Применяем фильтры (если чекбоксы включены)
- dt_s = self.sampling_rate_ns * 1e-9
- data_for_plot = self.apply_filters_if_needed(raw_cropped_v, dt_s)
- # Очищаем ось
- self.time_canvas.ax.clear()
- dt = self.sampling_rate_ns
- time_axis_ns = np.arange(start_idx, end_idx) * dt # в нс
- for ch in selected_channels:
- y = data_for_plot[:, ch]
- self.time_canvas.ax.plot(time_axis_ns, y, label=f"Канал {ch+1}")
- self.time_canvas.ax.set_xlabel("Время (нс)")
- self.time_canvas.ax.set_ylabel("Амплитуда (В)")
- self.time_canvas.ax.legend()
- self.time_canvas.ax.grid(True)
- self.time_canvas.draw()
- def plot_spectrum(self):
- """
- Построение спектра для выбранных каналов (с учётом фильтрации).
- """
- if self.csv_data is None:
- QMessageBox.warning(self, "Ошибка", "Сперва загрузите CSV-файл.")
- return
- # Считываем центральную частоту (МГц) — чтобы отметить её на графике
- try:
- fc_mhz = float(self.fc_input.text())
- except ValueError:
- fc_mhz = 0.0
- start_idx, end_idx = self.get_time_slice_indices()
- if start_idx is None or end_idx is None:
- QMessageBox.warning(self, "Ошибка", "Некорректный интервал времени.")
- return
- selected_channels = self.get_selected_channels()
- if not selected_channels:
- QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один канал для построения.")
- return
- # Срезаем нужные данные
- raw_cropped = self.csv_data[start_idx:end_idx, :]
- # Нормируем к вольтам
- raw_cropped_v = raw_cropped / 32767.0 * 5.0
- # Применяем фильтры (если нужно)
- dt_s = self.sampling_rate_ns * 1e-9
- data_for_fft = self.apply_filters_if_needed(raw_cropped_v, dt_s)
- # Очищаем ось
- self.freq_canvas.ax.clear()
- # Параметры для FFT
- n_win = data_for_fft.shape[0] # число точек
- freqs = np.fft.fftfreq(n_win, d=dt_s) # массив частот (в Гц)
- freqs_mhz = freqs / 1e6 # переведём в МГц
- for ch in selected_channels:
- y = data_for_fft[:, ch]
- # FFT
- Y = np.fft.fft(y)
- spectrum = np.abs(Y)
- self.freq_canvas.ax.plot(freqs_mhz, spectrum, label=f"Канал {ch+1}")
- # Добавим вертикальную линию на центральной частоте (если нужно)
- if fc_mhz != 0.0:
- self.freq_canvas.ax.axvline(x=fc_mhz, color='red', linestyle='--', label="Центр")
- self.freq_canvas.ax.set_xlabel("Частота (МГц)")
- self.freq_canvas.ax.set_ylabel("Амплитуда спектра (отн. ед.)")
- self.freq_canvas.ax.legend()
- self.freq_canvas.ax.grid(True)
- self.freq_canvas.draw()
- def demodulate_and_decimate(self):
- """
- Демодуляция (IQ-смещение) на центральную частоту fc_mhz и децимация
- для выбранных каналов, исходя из заданного интервала [start_ns, end_ns].
- При этом берём либо исходный, либо уже отфильтрованный сигнал.
- Результат во временной области (I-компонента) выводим на self.demod_time_canvas.
- """
- if self.csv_data is None:
- QMessageBox.warning(self, "Ошибка", "Сперва загрузите CSV-файл.")
- return
- # Считываем центральную частоту (МГц)
- try:
- fc_mhz = float(self.fc_input.text())
- except ValueError:
- fc_mhz = 0.0
- # Считываем коэффициент децимации
- try:
- dec_factor = int(self.dec_factor_input.text())
- if dec_factor < 1:
- raise ValueError
- except ValueError:
- QMessageBox.warning(self, "Ошибка", "Некорректный коэффициент децимации.")
- return
- start_idx, end_idx = self.get_time_slice_indices()
- if start_idx is None or end_idx is None:
- QMessageBox.warning(self, "Ошибка", "Некорректный интервал времени.")
- return
- selected_channels = self.get_selected_channels()
- if not selected_channels:
- QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один канал для обработки.")
- return
- # Берём нужный срез по времени, нормируем к вольтам
- cropped_data = self.csv_data[start_idx:end_idx, :]
- cropped_data_v = cropped_data / 32767.0 * 5.0
- # Применяем фильтры (если включены чекбоксы)
- dt_s = self.sampling_rate_ns * 1e-9
- data_for_demod = self.apply_filters_if_needed(cropped_data_v, dt_s)
- # Рассчитаем временные метки (в секундах) для исходного сигнала
- n_win = data_for_demod.shape[0]
- t_vec = np.arange(n_win) * dt_s
- # Переводим частоту в Гц
- fc_hz = fc_mhz * 1e6
- # Создадим массив для результирующего сигнала (комплексный)
- self.demod_data = np.zeros((n_win, len(selected_channels)), dtype=np.complex64)
- # Для каждой выбранной колонки — демодулируем
- for idx, ch in enumerate(selected_channels):
- y = data_for_demod[:, ch]
- # Умножение на e^-j(2π fc t) для смещения
- mixer = np.exp(-1j * 2 * np.pi * fc_hz * t_vec)
- y_demod = y * mixer
- self.demod_data[:, idx] = y_demod
- # Далее простейшая децимация (прореживание) — без доп. фильтра
- self.demod_data = self.demod_data[::dec_factor, :]
- # Очищаем ось для отображения демодулированного сигнала
- self.demod_time_canvas.ax.clear()
- # Создадим новую временную ось для децимированного сигнала
- decimated_dt_s = dt_s * dec_factor
- dec_time_vec_ns = (np.arange(self.demod_data.shape[0]) * decimated_dt_s) * 1e9 # в нс
- # Построим только I-компоненту для каждого канала
- for idx, ch in enumerate(selected_channels):
- y_i = np.real(self.demod_data[:, idx])
- self.demod_time_canvas.ax.plot(dec_time_vec_ns, y_i, label=f"Канал {ch+1} (I)")
- self.demod_time_canvas.ax.set_xlabel("Время после децимации (нс)")
- self.demod_time_canvas.ax.set_ylabel("I-компонента (В)")
- self.demod_time_canvas.ax.legend()
- self.demod_time_canvas.ax.grid(True)
- self.demod_time_canvas.draw()
- QMessageBox.information(
- self, "Демодуляция и децимация",
- f"Демодуляция на {fc_mhz} МГц и децимация (factor={dec_factor}) выполнены."
- )
- def main():
- app = QApplication(sys.argv)
- window = MainWindow()
- sys.exit(app.exec())
- if __name__ == "__main__":
- main()
|