from __future__ import annotations import json import math import os from datetime import datetime from types import SimpleNamespace from typing import Dict, Tuple from PyQt6.QtCore import QTimer from PyQt6.QtWidgets import QDoubleSpinBox, QFileDialog, QMessageBox, QSpinBox try: from test_seqgen.seqgen_STEAM import seqgen_STEAM _SEQGEN_IMPORT_ERROR = "" except Exception as _e: seqgen_STEAM = None _SEQGEN_IMPORT_ERROR = str(_e) class SteamTabMixin: def _steam_gradient_limits( self, g_amp_max_mtm: float, g_slew_max_tms: float, grad_raster_time_us: float, ) -> Tuple[float, float, float, float, float]: gamma = 42.576e6 grad_raster_time = max(grad_raster_time_us * 1e-6, 1e-12) g_amp_max = g_amp_max_mtm * 1e-3 * gamma g_slew_max = max(g_slew_max_tms * gamma, 1e-12) tau_max = math.ceil((g_amp_max / g_slew_max) / grad_raster_time) * grad_raster_time return gamma, g_amp_max, g_slew_max, grad_raster_time, tau_max def _init_steam_tab(self) -> None: if self.steam is None: return self._cache_steam_default_ranges() self._cache_steam_label_bases() self._wire_steam_param_inputs() if not self.steam.leSteamOutput.text().strip(): self.steam.leSteamOutput.setText(f"STEAM_{datetime.now().strftime('%d%m%y_%H%M')}") if seqgen_STEAM is None: self.steam.lblSteamStatus.setText("seqgen_STEAM import failed. Check dependencies.") return QTimer.singleShot(0, self.steam_set_limits) def _cache_steam_default_ranges(self) -> None: if self.steam is None: return widgets = ( self.steam.sbSteamTExMs, self.steam.sbSteamTau1Ms, self.steam.sbSteamTau2Ms, self.steam.sbSteamTR, ) for w in widgets: self._steam_default_ranges[w.objectName()] = (float(w.minimum()), float(w.maximum())) def _cache_steam_label_bases(self) -> None: if self.steam is None: return labels = (self.steam.lblTau1, self.steam.lblTau2, self.steam.lblTR, self.steam.lblTEx) for lbl in labels: self._steam_label_bases[lbl.objectName()] = lbl.text() def _wire_steam_param_inputs(self) -> None: if self.steam is None: return for sb in self.steam.gbSeqParams.findChildren(QDoubleSpinBox): sb.valueChanged.connect(self._on_steam_param_changed) for sb in self.steam.gbSeqParams.findChildren(QSpinBox): sb.valueChanged.connect(self._on_steam_param_changed) def _on_steam_param_changed(self, *_args) -> None: if self.steam is None: return if self._steam_internal_update: return self._steam_params_dirty = True self.steam_set_limits(update_status=False, write_log=False) self._update_steam_seq_params_highlight() self.steam.lblSteamStatus.setText("Parameters changed. SeqGen not run yet.") def _update_steam_seq_params_highlight(self) -> None: if self.steam is None: return if self._steam_params_dirty: self.steam.gbSeqParams.setStyleSheet( "QGroupBox#gbSeqParams { background-color: #ABA8A7;} " "QGroupBox#gbSeqParams QLabel { color: #000000; } " ) else: self.steam.gbSeqParams.setStyleSheet("") def _recalculate_steam_param_ranges(self) -> None: if self.steam is None: return g_amp_max_mtm = float(self.steam.sbSteamGAmpMax.value()) g_slew_max_tms = float(self.steam.sbSteamGSlewMax.value()) grad_raster_time_us = float(self.steam.sbSteamGradRasterUs.value()) bw_per_point = float(self.steam.sbSteamBWPerPoint.value()) tau1_ms = float(self.steam.sbSteamTau1Ms.value()) tau2_ms = float(self.steam.sbSteamTau2Ms.value()) tr_s = float(self.steam.sbSteamTR.value()) t_ex_ms = float(self.steam.sbSteamTExMs.value()) _, _, _, _, tau_max = self._steam_gradient_limits( g_amp_max_mtm=g_amp_max_mtm, g_slew_max_tms=g_slew_max_tms, grad_raster_time_us=grad_raster_time_us, ) tau1_s = tau1_ms * 1e-3 tau2_s = tau2_ms * 1e-3 t_ex_s = t_ex_ms * 1e-3 grad_block_s = t_ex_s + 2.0 * tau_max adc_duration_s = 0.5 / max(bw_per_point, 1e-12) # Timing constraints from seqgen_STEAM: # tau1 >= grad_block, tau2 >= grad_block, TR >= 2*tau1 + tau2 + adc + 0.5*grad_block tau1_min_ms = grad_block_s * 1e3 tau2_min_ms = grad_block_s * 1e3 tr_min_s = 2.0 * tau1_s + tau2_s + adc_duration_s + 0.5 * grad_block_s # Upper bounds driven by current TR and paired delays. tau1_max_from_tr_ms = ((tr_s - tau2_s - adc_duration_s - 0.5 * grad_block_s) / 2.0) * 1e3 tau2_max_from_tr_ms = (tr_s - 2.0 * tau1_s - adc_duration_s - 0.5 * grad_block_s) * 1e3 tau1_def_min, tau1_def_max = self._steam_default_ranges.get( self.steam.sbSteamTau1Ms.objectName(), (float(self.steam.sbSteamTau1Ms.minimum()), float(self.steam.sbSteamTau1Ms.maximum())), ) tau2_def_min, tau2_def_max = self._steam_default_ranges.get( self.steam.sbSteamTau2Ms.objectName(), (float(self.steam.sbSteamTau2Ms.minimum()), float(self.steam.sbSteamTau2Ms.maximum())), ) tr_def_min, tr_def_max = self._steam_default_ranges.get( self.steam.sbSteamTR.objectName(), (float(self.steam.sbSteamTR.minimum()), float(self.steam.sbSteamTR.maximum())), ) tex_def_min, tex_def_max = self._steam_default_ranges.get( self.steam.sbSteamTExMs.objectName(), (float(self.steam.sbSteamTExMs.minimum()), float(self.steam.sbSteamTExMs.maximum())), ) tau1_min = max(tau1_def_min, tau1_min_ms) tau2_min = max(tau2_def_min, tau2_min_ms) tr_min = max(tr_def_min, tr_min_s) tau1_max = min(tau1_def_max, max(tau1_min, tau1_max_from_tr_ms)) tau2_max = min(tau2_def_max, max(tau2_min, tau2_max_from_tr_ms)) tex_max_from_taus_ms = min(tau1_s, tau2_s) * 1e3 - (2.0 * tau_max * 1e3) tex_min = tex_def_min tex_max = min(tex_def_max, max(tex_min, tex_max_from_taus_ms)) self._steam_internal_update = True try: self.steam.sbSteamTau1Ms.setMinimum(tau1_min) self.steam.sbSteamTau1Ms.setMaximum(tau1_max) self.steam.sbSteamTau2Ms.setMinimum(tau2_min) self.steam.sbSteamTau2Ms.setMaximum(tau2_max) self.steam.sbSteamTR.setMinimum(tr_min) self.steam.sbSteamTR.setMaximum(tr_def_max) self.steam.sbSteamTExMs.setMinimum(tex_min) self.steam.sbSteamTExMs.setMaximum(tex_max) self.steam.sbSteamTau1Ms.setToolTip(f"Range: {tau1_min:.6f} .. {tau1_max:.6f} ms") self.steam.sbSteamTau2Ms.setToolTip(f"Range: {tau2_min:.6f} .. {tau2_max:.6f} ms") self.steam.sbSteamTR.setToolTip(f"Range: {tr_min:.6f} .. {tr_def_max:.6f} s") self.steam.sbSteamTExMs.setToolTip(f"Range: {tex_min:.6f} .. {tex_max:.6f} ms") self.steam.lblTau1.setText( f"{self._steam_label_bases.get('lblTau1', 'tau1, ms')} [{tau1_min:.3f} .. {tau1_max:.3f}]" ) self.steam.lblTau2.setText( f"{self._steam_label_bases.get('lblTau2', 'tau2, ms')} [{tau2_min:.3f} .. {tau2_max:.3f}]" ) self.steam.lblTR.setText( f"{self._steam_label_bases.get('lblTR', 'TR, s')} [{tr_min:.3f} .. {tr_def_max:.3f}]" ) self.steam.lblTEx.setText( f"{self._steam_label_bases.get('lblTEx', 't_ex, ms')} [{tex_min:.3f} .. {tex_max:.3f}]" ) finally: self._steam_internal_update = False def steam_set_limits(self, update_status: bool = True, write_log: bool = True) -> None: if self.steam is None: return self._recalculate_steam_param_ranges() average = float(self.steam.sbAverage.value()) g_amp_max_mtm = float(self.steam.sbSteamGAmpMax.value()) g_slew_max_tms = float(self.steam.sbSteamGSlewMax.value()) grad_raster_time_us = float(self.steam.sbSteamGradRasterUs.value()) rf_raster_time_us = float(self.steam.sbSteamRfRasterUs.value()) rf_ringdown_us = float(self.steam.sbSteamRfRingdownUs.value()) rf_dead_us = float(self.steam.sbSteamRfDeadUs.value()) adc_dead_us = float(self.steam.sbSteamAdcDeadUs.value()) voxel_thkn_mm = float(self.steam.sbSteamVoxelThknMm.value()) bw_per_point = float(self.steam.sbSteamBWPerPoint.value()) n_point = int(self.steam.sbSteamNPoint.value()) tau1_ms = float(self.steam.sbSteamTau1Ms.value()) tau2_ms = float(self.steam.sbSteamTau2Ms.value()) tr = float(self.steam.sbSteamTR.value()) t_ex_ms = float(self.steam.sbSteamTExMs.value()) t_bw_product_ex = float(self.steam.sbSteamTBW.value()) flip_angle = float(self.steam.sbSteamFlipAngle.value()) apodization = float(self.steam.sbSteamApodization.value()) gamma, g_amp_max, g_slew_max, grad_raster_time, tau_max = self._steam_gradient_limits( g_amp_max_mtm=g_amp_max_mtm, g_slew_max_tms=g_slew_max_tms, grad_raster_time_us=grad_raster_time_us, ) rf_raster_time = rf_raster_time_us * 1e-6 rf_ringdown_time = rf_ringdown_us * 1e-6 rf_dead_time = rf_dead_us * 1e-6 adc_dead_time = adc_dead_us * 1e-6 voxel_thkn = voxel_thkn_mm * 1e-3 tau1 = tau1_ms * 1e-3 tau2 = tau2_ms * 1e-3 t_ex = t_ex_ms * 1e-3 bw_ex_pulse = t_bw_product_ex / t_ex self._steam_params = { "G_amp_max": g_amp_max, "G_slew_max": g_slew_max, "gamma": gamma, "grad_raster_time": grad_raster_time, "rf_raster_time": rf_raster_time, "t_ex": t_ex, "t_BW_product_ex": t_bw_product_ex, "FA": flip_angle, "apodization": apodization, "BW_ex_pulse": bw_ex_pulse, "rf_ringdown_time": rf_ringdown_time, "rf_dead_time": rf_dead_time, "adc_dead_time": adc_dead_time, "voxel_thkn": voxel_thkn, "N_point": n_point, "BW_per_point": bw_per_point, "dG": tau_max, "tau1": tau1, "tau2": tau2, "TR": tr, "average": average, } if update_status: self.steam.lblSteamStatus.setText("STEAM parameters updated.") if write_log: self.log("[STEAM] Parameters updated.") def steam_save_sequence(self) -> None: self._steam_save_sequence(save_as=False) def steam_save_sequence_as(self) -> None: self._steam_save_sequence(save_as=True) def _steam_default_sequences_dir(self) -> str: project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) return os.path.join(project_root, "data", "sequences") def _steam_save_sequence(self, save_as: bool) -> None: if self.steam is None: return if seqgen_STEAM is None: msg = "Cannot import test_seqgen.seqgen_STEAM." if _SEQGEN_IMPORT_ERROR: msg = f"{msg}\n\nDetails: {_SEQGEN_IMPORT_ERROR}" QMessageBox.warning(self, "STEAM unavailable", msg) return self.steam_set_limits() default_base = self.steam.leSteamOutput.text().strip() or f"STEAM_{datetime.now().strftime('%d%m%y_%H%M')}" default_seq = f"{default_base}.seq" seq_dir = self._steam_default_sequences_dir() os.makedirs(seq_dir, exist_ok=True) seq_path = os.path.join(seq_dir, default_seq) if save_as: seq_path, _ = QFileDialog.getSaveFileName( self, "Save STEAM sequence as", seq_path, "Sequence (*.seq);;All files (*)", ) if not seq_path: return if not seq_path.lower().endswith(".seq"): seq_path += ".seq" json_path = os.path.splitext(seq_path)[0] + ".json" try: params = SimpleNamespace(**self._steam_params) seq = seqgen_STEAM(params) seq.write(seq_path) with open(json_path, "w", encoding="utf-8") as f: json.dump(self._steam_params, f, ensure_ascii=False, indent=2) self._steam_params_dirty = False self._update_steam_seq_params_highlight() self._sequence_file_path = seq_path self._refresh_current_sequence_ui() self.steam.leSteamOutput.setText(os.path.splitext(os.path.basename(seq_path))[0]) self.steam.lblSteamStatus.setText(f"Saved: {seq_path}") self.log(f"[STEAM] Saved sequence: {seq_path}") self.log(f"[STEAM] Saved params: {json_path}") except Exception as e: self.steam.lblSteamStatus.setText("Save failed.") QMessageBox.critical(self, "STEAM save error", str(e))