|
- import sys
- import numpy as np
- import string
- from PySide6.QtWidgets import (
- QApplication, QMainWindow, QWidget, QDockWidget,
- QVBoxLayout, QHBoxLayout,
- QPushButton, QLabel, QLineEdit, QFileDialog,
- QListWidget, QListWidgetItem, QGroupBox, QMessageBox,
- QCheckBox
- )
- from PySide6.QtCore import Qt
- # Для встроенного тулбара matplotlib
- from matplotlib.backends.backend_qtagg import NavigationToolbar2QT as NavigationToolbar
- from scipy.io import savemat # <-- Импорт для .mat сохранения
- from mpl_canvas import MplCanvas
- from plotting import (
- plot_time_domain,
- plot_spectrum,
- plot_demodulated_time_domain
- )
class MainWindow(QMainWindow):
    """Main window of the signal / spectrum / demodulation demo.

    Workflow offered by the UI:

    1. load a comma-separated file (rows are samples, columns are channels);
    2. plot a time window of the selected channels, optionally band-pass
       filtered;
    3. plot the spectrum of either that window or the demodulated data;
    4. mix the window down by the central frequency, decimate it and export
       the result to a ``.mat`` file.
    """

    # Raw CSV samples are scaled as ``value / ADC_FULL_SCALE * ADC_REFERENCE``
    # before any processing (the original code labels this step
    # "convert to volts").
    ADC_FULL_SCALE = 32767.0
    ADC_REFERENCE = 5.0

    def __init__(self):
        """Initialise the empty data state and build the user interface."""
        super().__init__()
        self.csv_data = None
        self.demod_data = None
        self.demod_time_ns = None
        # CSV column index behind each column of ``demod_data``, and the
        # sample period (seconds) of the decimated data.  Both are recorded at
        # demodulation time so that later plots do not depend on the current
        # state of the input widgets.
        self.demod_channels = None
        self.demod_dt_s = None
        # Despite the name this value is used as the sample period in ns.
        self.sampling_rate_ns = 13
        self.loaded_file_path = None
        self.init_ui()

    # ========== UI construction ==========
    def init_ui(self):
        """Create all widgets, lay them out, apply styles and show maximized."""
        self.setWindowTitle("Demo: Signal / Spectrum / Demodulation plotting by Vladimir Pugovkin")
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QVBoxLayout(central_widget)

        main_layout.addLayout(self._build_plot_controls_row())
        main_layout.addLayout(self._build_demod_controls_row())
        self._build_plot_area(main_layout)
        self._build_channels_dock()
        self._apply_styles()
        self.showMaximized()

    def _build_plot_controls_row(self):
        """Build the first control row: load, bandpass, plot and export."""
        row = QHBoxLayout()
        # The original set a spacing of 8 and later overrode it with 15;
        # spacing is a property of the whole layout, so only 15 took effect.
        row.setSpacing(15)

        self.load_btn = QPushButton("Load CSV")
        self.load_btn.setFixedWidth(130)
        self.load_btn.clicked.connect(self.load_csv)
        row.addWidget(self.load_btn)

        # Checkbox enabling the band-pass filter.
        self.use_bandpass_checkbox = QCheckBox("Bandpass Filter")
        row.addWidget(self.use_bandpass_checkbox)

        bandpass_label_low = QLabel("Lower freq (MHz):")
        self.bandpass_low_input = QLineEdit("10")
        self.bandpass_low_input.setFixedWidth(60)
        bandpass_label_high = QLabel("Upper freq (MHz):")
        self.bandpass_high_input = QLineEdit("50")
        self.bandpass_high_input.setFixedWidth(60)
        row.addWidget(bandpass_label_low)
        row.addWidget(self.bandpass_low_input)
        row.addWidget(bandpass_label_high)
        row.addWidget(self.bandpass_high_input)

        self.plot_time_btn = QPushButton("Plot Time Domain")
        self.plot_time_btn.setFixedWidth(180)
        self.plot_time_btn.clicked.connect(self.plot_time_domain)
        row.addWidget(self.plot_time_btn)

        # --- Extra fields for the spectrum plot ---
        freq_range_label = QLabel("Spectrum range (MHz):")
        self.freq_min_input = QLineEdit("-100")
        self.freq_min_input.setFixedWidth(60)
        self.freq_max_input = QLineEdit("100")
        self.freq_max_input.setFixedWidth(60)
        row.addWidget(freq_range_label)
        row.addWidget(self.freq_min_input)
        row.addWidget(self.freq_max_input)

        # When checked, "Plot Spectrum" uses the demodulated data.
        self.plot_demod_checkbox = QCheckBox("Plot demod data")
        row.addWidget(self.plot_demod_checkbox)

        self.plot_spectrum_btn = QPushButton("Plot Spectrum")
        self.plot_spectrum_btn.setFixedWidth(180)
        self.plot_spectrum_btn.clicked.connect(self.plot_spectrum)
        row.addWidget(self.plot_spectrum_btn)

        # ---------------- Export-to-.mat button -------------
        self.export_btn = QPushButton("Export to .mat")
        self.export_btn.setFixedWidth(180)
        self.export_btn.clicked.connect(self.export_data_to_mat)
        row.addWidget(self.export_btn)

        row.addStretch()
        return row

    def _build_demod_controls_row(self):
        """Build the second control row: frequency, time window, decimation."""
        row = QHBoxLayout()
        row.setSpacing(15)

        label_fc = QLabel("Central freq (MHz):")
        self.fc_input = QLineEdit("0")
        self.fc_input.setFixedWidth(60)
        label_st = QLabel("Start time (ns):")
        self.start_time_input = QLineEdit("0")
        self.start_time_input.setFixedWidth(70)
        label_end = QLabel("End time (ns):")
        self.end_time_input = QLineEdit("1000")
        self.end_time_input.setFixedWidth(70)
        label_dec = QLabel("Decimation factor:")
        self.decimation_factor_input = QLineEdit("4")
        self.decimation_factor_input.setFixedWidth(60)

        row.addWidget(label_fc)
        row.addWidget(self.fc_input)
        row.addSpacing(5)
        row.addWidget(label_st)
        row.addWidget(self.start_time_input)
        row.addWidget(label_end)
        row.addWidget(self.end_time_input)
        row.addSpacing(5)
        row.addWidget(label_dec)
        row.addWidget(self.decimation_factor_input)

        self.demod_dec_btn = QPushButton("Demodulation + Decimation")
        self.demod_dec_btn.setFixedWidth(300)
        self.demod_dec_btn.clicked.connect(self.demodulate_and_decimate)
        row.addWidget(self.demod_dec_btn)

        row.addStretch()
        return row

    def _build_plot_group(self, title, height):
        """Return ``(group_box, canvas, toolbar)`` for one titled plot panel."""
        group = QGroupBox(title)
        layout = QVBoxLayout(group)
        canvas = MplCanvas(self, width=5, height=height)
        layout.addWidget(canvas)
        # Built-in matplotlib navigation toolbar below the canvas.
        toolbar = NavigationToolbar(canvas, group)
        layout.addWidget(toolbar)
        return group, canvas, toolbar

    def _build_plot_area(self, main_layout):
        """Add the time/spectrum row and the demodulated-signal row."""
        plot_row_1 = QHBoxLayout()
        self.time_group, self.time_canvas, self.time_toolbar = \
            self._build_plot_group("Time Domain", height=4)
        self.freq_group, self.freq_canvas, self.freq_toolbar = \
            self._build_plot_group("Spectrum", height=4)
        plot_row_1.addWidget(self.time_group, stretch=1)
        plot_row_1.addWidget(self.freq_group, stretch=1)
        main_layout.addLayout(plot_row_1)

        # --- Second row: plot of the demodulated signal ---
        plot_row_2 = QHBoxLayout()
        self.demod_group, self.demod_time_canvas, self.demod_toolbar = \
            self._build_plot_group("Demodulated Signal", height=3)
        plot_row_2.addWidget(self.demod_group, stretch=1)
        main_layout.addLayout(plot_row_2)
        self.demod_group.raise_()

    def _build_channels_dock(self):
        """Create the right-hand dock holding the checkable channel list."""
        dock = QDockWidget("Channels", self)
        dock.setFeatures(
            QDockWidget.DockWidgetFloatable
            | QDockWidget.DockWidgetMovable
            | QDockWidget.DockWidgetVerticalTitleBar
        )
        self.channels_list = QListWidget()
        self.channels_list.setSelectionMode(QListWidget.MultiSelection)
        self.channels_list.setMaximumWidth(250)

        dock_widget = QWidget()
        # Passing the widget to the layout constructor already installs the
        # layout on it, so no separate setLayout() call is needed.
        vbox = QVBoxLayout(dock_widget)
        label_ch = QLabel("Select channels:")
        vbox.addWidget(label_ch)
        vbox.addWidget(self.channels_list)
        dock.setWidget(dock_widget)
        self.addDockWidget(Qt.RightDockWidgetArea, dock)

    def _apply_styles(self):
        """Install the application-wide Qt style sheet."""
        self.setStyleSheet("""
            QMainWindow {
                background-color: #FAFAFA;
            }
            QDockWidget {
                background-color: #F4F7F9;
            }
            QDockWidget::title {
                background-color: #34495E;
                color: white;
                padding: 8px;
                font-size: 16px;
            }
            QGroupBox {
                border: 2px solid #3498db;
                border-radius: 9px;
                margin-top: 30px;
                font-weight: bold;
                font-size: 20px;
            }
            QGroupBox::title {
                subcontrol-origin: margin;
                subcontrol-position: top center;
                padding: -5px 25px;
                font-size: 20px;
            }
            QPushButton {
                background-color: #3DAEE9;
                color: white;
                border-radius: 9px;
                padding: 6px 16px;
                font-size: 16px;
            }
            QPushButton:hover {
                background-color: #3592CC;
            }
            QPushButton:pressed {
                background-color: #2A7098;
            }
            QCheckBox {
                font-size: 18px;
            }
            QLineEdit {
                padding: 3px;
                font-size: 18px;
            }
            QLabel {
                font-size: 18px;
            }
        """)

    # ========== Small shared helpers ==========
    @staticmethod
    def _channel_label(index):
        """Return the display name of CSV column ``index``.

        The first 26 columns are named ``Channel A`` .. ``Channel Z``; further
        columns are named ``Channel CH<index>``.  The channel list and every
        plot legend use this one function so that their names always agree.
        """
        name = string.ascii_uppercase[index] if index < 26 else f"CH{index}"
        return f"Channel {name}"

    def _read_center_frequency_mhz(self):
        """Return the central frequency field in MHz, or 0.0 if it is unusable.

        Unparsable text silently falls back to 0.0 (as in the original code);
        non-finite values (``nan``/``inf``) are treated the same way.
        """
        try:
            value = float(self.fc_input.text())
        except ValueError:
            return 0.0
        return value if np.isfinite(value) else 0.0

    def _validated_selection(self, no_channels_message="No channels selected."):
        """Return ``(start_idx, end_idx, channels)`` or ``None`` after warning.

        Shows a warning box and returns ``None`` when the time window is
        invalid or when no channel is checked.
        """
        start_idx, end_idx = self.get_time_slice_indices()
        if start_idx is None or end_idx is None:
            QMessageBox.warning(self, "Error", "Invalid time interval.")
            return None
        selected_channels = self.get_selected_channels()
        if not selected_channels:
            QMessageBox.warning(self, "Error", no_channels_message)
            return None
        return start_idx, end_idx, selected_channels

    def _prepare_window(self, start_idx, end_idx):
        """Return ``(data, dt_s)`` for rows ``start_idx:end_idx`` of the CSV.

        The rows are scaled by ``ADC_REFERENCE / ADC_FULL_SCALE`` and, if the
        bandpass checkbox is ticked, filtered.  All columns are kept.
        """
        dt_s = self.sampling_rate_ns * 1e-9
        scaled = self.csv_data[start_idx:end_idx, :] / self.ADC_FULL_SCALE * self.ADC_REFERENCE
        return self.apply_bandpass_if_needed(scaled, dt_s), dt_s

    # ========== CSV loading ==========
    def load_csv(self):
        """Ask for a CSV file, load it and repopulate the channel list.

        A single trailing comma on a line is removed before parsing.  The
        loaded array is always 2-D (rows = samples, columns = channels).  On
        success any previously demodulated data is discarded because it
        belongs to the previous file; on failure a warning is shown and the
        loaded data and channel list are cleared.
        """
        file_dialog = QFileDialog(self, "Select .csv file", ".", "CSV Files (*.csv)")
        if not file_dialog.exec():
            return
        file_path = file_dialog.selectedFiles()[0]
        if not file_path:
            return

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                cleaned_lines = []
                for line in f:
                    line = line.strip()
                    if line.endswith(','):
                        line = line[:-1]
                    cleaned_lines.append(line)
            # ndmin=2 keeps a single-row file as 1 sample x N channels and a
            # single-column file as N samples x 1 channel.
            loaded_array = np.loadtxt(cleaned_lines, delimiter=',', ndmin=2)
            if loaded_array.size == 0:
                raise ValueError("file contains no numeric data")
        except Exception as e:
            # Deliberately broad: any failure while reading or parsing is
            # reported to the user instead of escaping from the Qt slot.
            QMessageBox.warning(self, "Error loading", f"Could not load file:\n{e}")
            self.csv_data = None
            self.channels_list.clear()
            return

        self.csv_data = loaded_array
        self.loaded_file_path = file_path
        # Demodulated data from a previous file no longer matches the list.
        self.demod_data = None
        self.demod_time_ns = None
        self.demod_channels = None
        self.demod_dt_s = None

        self.channels_list.clear()
        for i in range(self.csv_data.shape[1]):
            item = QListWidgetItem(self._channel_label(i))
            # Only the first channel is checked by default.
            item.setCheckState(Qt.Checked if i == 0 else Qt.Unchecked)
            self.channels_list.addItem(item)

        QMessageBox.information(
            self, "File loaded", f"CSV file loaded:\n{file_path}"
        )

    # ========== Time window ==========
    def get_time_slice_indices(self):
        """Convert the start/end time fields (ns) to row indices.

        Returns:
            ``(start_idx, end_idx)`` clamped to the loaded data, or
            ``(None, None)`` when no data is loaded, the fields are not finite
            numbers, ``end <= start``, or the clamped window is empty.
        """
        if self.csv_data is None:
            return None, None
        try:
            start_ns = float(self.start_time_input.text())
            end_ns = float(self.end_time_input.text())
        except ValueError:
            return None, None
        # float() accepts "nan"/"inf", which cannot be rounded to an index.
        if not (np.isfinite(start_ns) and np.isfinite(end_ns)):
            return None, None
        if end_ns <= start_ns:
            return None, None

        dt_ns = self.sampling_rate_ns
        start_idx = max(int(round(start_ns / dt_ns)), 0)
        end_idx = min(int(round(end_ns / dt_ns)), self.csv_data.shape[0])
        if start_idx >= end_idx:
            return None, None
        return start_idx, end_idx

    # ========== Channel selection ==========
    def get_selected_channels(self):
        """Return the indices of all channel items whose checkbox is checked."""
        return [
            i for i in range(self.channels_list.count())
            if self.channels_list.item(i).checkState() == Qt.Checked
        ]

    # ========== FFT band-pass filter ==========
    def bandpass_filter_fft(self, data, dt_s, f_low_hz, f_high_hz):
        """Band-pass filter every column of ``data`` by zeroing FFT bins.

        Args:
            data: 2-D array, one column per channel.
            dt_s: sample period in seconds.
            f_low_hz: lower edge of the pass band in Hz (inclusive).
            f_high_hz: upper edge of the pass band in Hz (inclusive).

        Returns:
            Float array of the same shape holding the real part of the inverse
            FFT after all bins with ``|f|`` outside the band were set to zero.
        """
        freqs = np.fft.fftfreq(data.shape[0], d=dt_s)
        # The mask depends only on the frequency axis, so it is built once and
        # applied to all channels at the same time.
        keep = (np.abs(freqs) >= f_low_hz) & (np.abs(freqs) <= f_high_hz)
        spectrum = np.fft.fft(data, axis=0)
        spectrum[~keep, :] = 0.0
        return np.fft.ifft(spectrum, axis=0).real

    # ========== Apply the filter when requested ==========
    def apply_bandpass_if_needed(self, data, dt_s):
        """Return a copy of ``data``, band-pass filtered if the box is ticked.

        If the frequency fields are not finite numbers or ``high <= low``, a
        warning is shown and the unfiltered copy is returned.
        """
        out = data.copy()
        if not self.use_bandpass_checkbox.isChecked():
            return out
        try:
            f_low_mhz = float(self.bandpass_low_input.text())
            f_high_mhz = float(self.bandpass_high_input.text())
            if not (np.isfinite(f_low_mhz) and np.isfinite(f_high_mhz)):
                raise ValueError
            if f_high_mhz <= f_low_mhz:
                raise ValueError
        except ValueError:
            QMessageBox.warning(self, "Bandpass Error", "Incorrect bandpass frequencies.")
            return out
        return self.bandpass_filter_fft(out, dt_s, f_low_mhz * 1e6, f_high_mhz * 1e6)

    # ========== Time-domain plot ==========
    def plot_time_domain(self):
        """Plot the selected channels over the chosen time window."""
        if self.csv_data is None:
            QMessageBox.warning(self, "Error", "No CSV loaded.")
            return
        selection = self._validated_selection()
        if selection is None:
            return
        start_idx, end_idx, selected_channels = selection

        data_for_plot, _ = self._prepare_window(start_idx, end_idx)
        time_axis_ns = np.arange(start_idx, end_idx) * self.sampling_rate_ns

        self.time_canvas.ax.clear()
        # NOTE: the call below resolves to the module-level function imported
        # from ``plotting``, not to this method.
        plot_time_domain(
            axis=self.time_canvas.ax,
            time_axis_ns=time_axis_ns,
            data=data_for_plot[:, selected_channels],
            channels=[self._channel_label(ch) for ch in selected_channels]
        )
        self.time_canvas.draw()

    # ========== Spectrum plot ==========
    def plot_spectrum(self):
        """Plot the spectrum of the raw window or of the demodulated data.

        With "Plot demod data" ticked, the checked channels that were part of
        the last demodulation are plotted, using the sample period recorded at
        that time.  Otherwise the scaled (and optionally filtered) time window
        is plotted around the central frequency field.
        """
        if self.csv_data is None:
            QMessageBox.warning(self, "Error", "No CSV loaded.")
            return
        frequency_mhz = self._read_center_frequency_mhz()
        try:
            freq_min = float(self.freq_min_input.text())
            freq_max = float(self.freq_max_input.text())
        except ValueError:
            freq_min, freq_max = None, None

        selection = self._validated_selection()
        if selection is None:
            return
        start_idx, end_idx, selected_channels = selection

        if self.plot_demod_checkbox.isChecked():
            # Spectrum of the demodulated data.
            if (self.demod_data is None or self.demod_time_ns is None
                    or self.demod_channels is None or self.demod_dt_s is None):
                QMessageBox.warning(self, "Error", "No demodulated data. Please demodulate first.")
                return
            # demod_data columns are ordered as the channels were selected at
            # demodulation time, so CSV column indices must be translated to
            # positions within demod_channels before indexing.
            plotted_channels = [ch for ch in selected_channels if ch in self.demod_channels]
            if not plotted_channels:
                QMessageBox.warning(
                    self, "Error",
                    "Selected channels were not demodulated. Please demodulate them first."
                )
                return
            columns = [self.demod_channels.index(ch) for ch in plotted_channels]
            data_for_fft = self.demod_data[:, columns]
            dt_s = self.demod_dt_s
            middle_frequency = 0.0  # demodulated data sits at baseband
        else:
            window, dt_s = self._prepare_window(start_idx, end_idx)
            plotted_channels = selected_channels
            data_for_fft = window[:, plotted_channels]
            middle_frequency = frequency_mhz  # taken from the input field

        freq_axis_mhz = np.fft.fftfreq(data_for_fft.shape[0], d=dt_s) / 1e6
        self.freq_canvas.ax.clear()
        # NOTE: resolves to the module-level function imported from
        # ``plotting``, not to this method.
        plot_spectrum(
            axis=self.freq_canvas.ax,
            frequency=freq_axis_mhz,
            data=data_for_fft,
            channels=[self._channel_label(ch) for ch in plotted_channels],
            middle_frequency=middle_frequency
        )
        # Apply the requested x-range only when it is a valid finite interval.
        if (freq_min is not None and freq_max is not None
                and np.isfinite(freq_min) and np.isfinite(freq_max)
                and freq_max > freq_min):
            self.freq_canvas.ax.set_xlim([freq_min, freq_max])
        self.freq_canvas.draw()

    def get_decimation_factor(self):
        """Return the decimation factor (int >= 1), or ``None`` after warning."""
        try:
            decimation_factor = int(self.decimation_factor_input.text())
            if decimation_factor < 1:
                raise ValueError
            return decimation_factor
        except ValueError:
            QMessageBox.warning(self, "Error", "Invalid decimation factor.")
            return None

    # ========== Demodulation and decimation ==========
    def demodulate_and_decimate(self):
        """Mix the selected channels down to baseband, decimate and plot them.

        The window is multiplied by ``exp(-j*2*pi*fc*t)`` and every
        ``decimation_factor``-th sample is kept (no additional low-pass filter
        is applied before decimation).  The result is stored in
        ``demod_data`` / ``demod_time_ns`` together with the channel indices
        and decimated sample period it corresponds to.
        """
        if self.csv_data is None:
            QMessageBox.warning(self, "Error", "No CSV loaded.")
            return
        frequency_mhz = self._read_center_frequency_mhz()
        decimation_factor = self.get_decimation_factor()
        if decimation_factor is None:
            return
        selection = self._validated_selection("Select at least one channel.")
        if selection is None:
            return
        start_idx, end_idx, selected_channels = selection

        data_for_demod, dt_s = self._prepare_window(start_idx, end_idx)
        t_vec = np.arange(data_for_demod.shape[0]) * dt_s
        fc_hz = frequency_mhz * 1e6

        # Demodulation: shift the spectrum to zero frequency.  The mixer is
        # identical for all channels, so it is computed once and broadcast.
        mixer = np.exp(-1j * 2 * np.pi * fc_hz * t_vec)
        mixed = data_for_demod[:, selected_channels] * mixer[:, np.newaxis]

        # Decimation: keep every decimation_factor-th sample.
        self.demod_data = mixed[::decimation_factor, :].astype(np.complex64)
        decimated_dt_s = dt_s * decimation_factor
        self.demod_time_ns = np.arange(self.demod_data.shape[0]) * decimated_dt_s * 1e9
        self.demod_channels = list(selected_channels)
        self.demod_dt_s = decimated_dt_s

        # Draw the demodulated signal (the original comment says the I
        # component is drawn; that is decided inside the plotting helper).
        self.demod_time_canvas.ax.clear()
        plot_demodulated_time_domain(
            ax=self.demod_time_canvas.ax,
            time_ns=self.demod_time_ns,
            demod_data=self.demod_data,
            selected_channels=[self._channel_label(ch) for ch in selected_channels]
        )
        self.demod_time_canvas.draw()
        QMessageBox.information(
            self, "Demodulation",
            f"Demod on {frequency_mhz} MHz + decim x{decimation_factor} done!"
        )

    # ========== Export to .mat ==========
    def export_data_to_mat(self):
        """Save ``time_ns`` and ``demod_data`` to a user-chosen ``.mat`` file."""
        if self.demod_data is None or self.demod_time_ns is None:
            QMessageBox.warning(self, "Error", "No demodulated data.")
            return
        file_dialog = QFileDialog(self, "Save as .mat", ".", "MAT files (*.mat)")
        file_dialog.setAcceptMode(QFileDialog.AcceptSave)
        file_dialog.setDefaultSuffix("mat")
        if not file_dialog.exec():
            return
        save_path = file_dialog.selectedFiles()[0]
        if not save_path:
            return
        try:
            savemat(save_path, {
                'time_ns': self.demod_time_ns,
                'demod_data': self.demod_data
            })
        except Exception as e:
            # Deliberately broad: report any write failure to the user.
            QMessageBox.warning(self, "Error saving", f"Could not save:\n{e}")
        else:
            QMessageBox.information(self, "Export", f"Saved:\n{save_path}")
def main():
    """Create the Qt application, show the main window and run the event loop.

    The process exits with the return code of the Qt event loop.
    """
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec())


# Start the GUI only when executed as a script, so that importing this module
# (e.g. from tests or other tools) does not launch the application.
if __name__ == "__main__":
    main()
|