| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- from __future__ import annotations
- import json
- import math
- import os
- from datetime import datetime
- from types import SimpleNamespace
- from typing import Dict, Tuple
- from PyQt6.QtCore import QTimer
- from PyQt6.QtWidgets import QDoubleSpinBox, QFileDialog, QMessageBox, QSpinBox
- try:
- from test_seqgen.seqgen_STEAM import seqgen_STEAM
- _SEQGEN_IMPORT_ERROR = ""
- except Exception as _e:
- seqgen_STEAM = None
- _SEQGEN_IMPORT_ERROR = str(_e)
- class SteamTabMixin:
- def _steam_gradient_limits(
- self,
- g_amp_max_mtm: float,
- g_slew_max_tms: float,
- grad_raster_time_us: float,
- ) -> Tuple[float, float, float, float, float]:
- gamma = 42.576e6
- grad_raster_time = max(grad_raster_time_us * 1e-6, 1e-12)
- g_amp_max = g_amp_max_mtm * 1e-3 * gamma
- g_slew_max = max(g_slew_max_tms * gamma, 1e-12)
- tau_max = math.ceil((g_amp_max / g_slew_max) / grad_raster_time) * grad_raster_time
- return gamma, g_amp_max, g_slew_max, grad_raster_time, tau_max
- def _init_steam_tab(self) -> None:
- if self.steam is None:
- return
- self._cache_steam_default_ranges()
- self._cache_steam_label_bases()
- self._wire_steam_param_inputs()
- if not self.steam.leSteamOutput.text().strip():
- self.steam.leSteamOutput.setText(f"STEAM_{datetime.now().strftime('%d%m%y_%H%M')}")
- if seqgen_STEAM is None:
- self.steam.lblSteamStatus.setText("seqgen_STEAM import failed. Check dependencies.")
- return
- QTimer.singleShot(0, self.steam_set_limits)
- def _cache_steam_default_ranges(self) -> None:
- if self.steam is None:
- return
- widgets = (
- self.steam.sbSteamTExMs,
- self.steam.sbSteamTau1Ms,
- self.steam.sbSteamTau2Ms,
- self.steam.sbSteamTR,
- )
- for w in widgets:
- self._steam_default_ranges[w.objectName()] = (float(w.minimum()), float(w.maximum()))
- def _cache_steam_label_bases(self) -> None:
- if self.steam is None:
- return
- labels = (self.steam.lblTau1, self.steam.lblTau2, self.steam.lblTR, self.steam.lblTEx)
- for lbl in labels:
- self._steam_label_bases[lbl.objectName()] = lbl.text()
- def _wire_steam_param_inputs(self) -> None:
- if self.steam is None:
- return
- for sb in self.steam.gbSeqParams.findChildren(QDoubleSpinBox):
- sb.valueChanged.connect(self._on_steam_param_changed)
- for sb in self.steam.gbSeqParams.findChildren(QSpinBox):
- sb.valueChanged.connect(self._on_steam_param_changed)
- def _on_steam_param_changed(self, *_args) -> None:
- if self.steam is None:
- return
- if self._steam_internal_update:
- return
- self._steam_params_dirty = True
- self.steam_set_limits(update_status=False, write_log=False)
- self._update_steam_seq_params_highlight()
- self.steam.lblSteamStatus.setText("Parameters changed. SeqGen not run yet.")
- def _update_steam_seq_params_highlight(self) -> None:
- if self.steam is None:
- return
- if self._steam_params_dirty:
- self.steam.gbSeqParams.setStyleSheet(
- "QGroupBox#gbSeqParams { background-color: #ABA8A7;} "
- "QGroupBox#gbSeqParams QLabel { color: #000000; } "
- )
- else:
- self.steam.gbSeqParams.setStyleSheet("")
- def _recalculate_steam_param_ranges(self) -> None:
- if self.steam is None:
- return
- g_amp_max_mtm = float(self.steam.sbSteamGAmpMax.value())
- g_slew_max_tms = float(self.steam.sbSteamGSlewMax.value())
- grad_raster_time_us = float(self.steam.sbSteamGradRasterUs.value())
- bw_per_point = float(self.steam.sbSteamBWPerPoint.value())
- tau1_ms = float(self.steam.sbSteamTau1Ms.value())
- tau2_ms = float(self.steam.sbSteamTau2Ms.value())
- tr_s = float(self.steam.sbSteamTR.value())
- t_ex_ms = float(self.steam.sbSteamTExMs.value())
- _, _, _, _, tau_max = self._steam_gradient_limits(
- g_amp_max_mtm=g_amp_max_mtm,
- g_slew_max_tms=g_slew_max_tms,
- grad_raster_time_us=grad_raster_time_us,
- )
- tau1_s = tau1_ms * 1e-3
- tau2_s = tau2_ms * 1e-3
- t_ex_s = t_ex_ms * 1e-3
- grad_block_s = t_ex_s + 2.0 * tau_max
- adc_duration_s = 0.5 / max(bw_per_point, 1e-12)
- # Timing constraints from seqgen_STEAM:
- # tau1 >= grad_block, tau2 >= grad_block, TR >= 2*tau1 + tau2 + adc + 0.5*grad_block
- tau1_min_ms = grad_block_s * 1e3
- tau2_min_ms = grad_block_s * 1e3
- tr_min_s = 2.0 * tau1_s + tau2_s + adc_duration_s + 0.5 * grad_block_s
- # Upper bounds driven by current TR and paired delays.
- tau1_max_from_tr_ms = ((tr_s - tau2_s - adc_duration_s - 0.5 * grad_block_s) / 2.0) * 1e3
- tau2_max_from_tr_ms = (tr_s - 2.0 * tau1_s - adc_duration_s - 0.5 * grad_block_s) * 1e3
- tau1_def_min, tau1_def_max = self._steam_default_ranges.get(
- self.steam.sbSteamTau1Ms.objectName(),
- (float(self.steam.sbSteamTau1Ms.minimum()), float(self.steam.sbSteamTau1Ms.maximum())),
- )
- tau2_def_min, tau2_def_max = self._steam_default_ranges.get(
- self.steam.sbSteamTau2Ms.objectName(),
- (float(self.steam.sbSteamTau2Ms.minimum()), float(self.steam.sbSteamTau2Ms.maximum())),
- )
- tr_def_min, tr_def_max = self._steam_default_ranges.get(
- self.steam.sbSteamTR.objectName(),
- (float(self.steam.sbSteamTR.minimum()), float(self.steam.sbSteamTR.maximum())),
- )
- tex_def_min, tex_def_max = self._steam_default_ranges.get(
- self.steam.sbSteamTExMs.objectName(),
- (float(self.steam.sbSteamTExMs.minimum()), float(self.steam.sbSteamTExMs.maximum())),
- )
- tau1_min = max(tau1_def_min, tau1_min_ms)
- tau2_min = max(tau2_def_min, tau2_min_ms)
- tr_min = max(tr_def_min, tr_min_s)
- tau1_max = min(tau1_def_max, max(tau1_min, tau1_max_from_tr_ms))
- tau2_max = min(tau2_def_max, max(tau2_min, tau2_max_from_tr_ms))
- tex_max_from_taus_ms = min(tau1_s, tau2_s) * 1e3 - (2.0 * tau_max * 1e3)
- tex_min = tex_def_min
- tex_max = min(tex_def_max, max(tex_min, tex_max_from_taus_ms))
- self._steam_internal_update = True
- try:
- self.steam.sbSteamTau1Ms.setMinimum(tau1_min)
- self.steam.sbSteamTau1Ms.setMaximum(tau1_max)
- self.steam.sbSteamTau2Ms.setMinimum(tau2_min)
- self.steam.sbSteamTau2Ms.setMaximum(tau2_max)
- self.steam.sbSteamTR.setMinimum(tr_min)
- self.steam.sbSteamTR.setMaximum(tr_def_max)
- self.steam.sbSteamTExMs.setMinimum(tex_min)
- self.steam.sbSteamTExMs.setMaximum(tex_max)
- self.steam.sbSteamTau1Ms.setToolTip(f"Range: {tau1_min:.6f} .. {tau1_max:.6f} ms")
- self.steam.sbSteamTau2Ms.setToolTip(f"Range: {tau2_min:.6f} .. {tau2_max:.6f} ms")
- self.steam.sbSteamTR.setToolTip(f"Range: {tr_min:.6f} .. {tr_def_max:.6f} s")
- self.steam.sbSteamTExMs.setToolTip(f"Range: {tex_min:.6f} .. {tex_max:.6f} ms")
- self.steam.lblTau1.setText(
- f"{self._steam_label_bases.get('lblTau1', 'tau1, ms')} [{tau1_min:.3f} .. {tau1_max:.3f}]"
- )
- self.steam.lblTau2.setText(
- f"{self._steam_label_bases.get('lblTau2', 'tau2, ms')} [{tau2_min:.3f} .. {tau2_max:.3f}]"
- )
- self.steam.lblTR.setText(
- f"{self._steam_label_bases.get('lblTR', 'TR, s')} [{tr_min:.3f} .. {tr_def_max:.3f}]"
- )
- self.steam.lblTEx.setText(
- f"{self._steam_label_bases.get('lblTEx', 't_ex, ms')} [{tex_min:.3f} .. {tex_max:.3f}]"
- )
- finally:
- self._steam_internal_update = False
- def steam_set_limits(self, update_status: bool = True, write_log: bool = True) -> None:
- if self.steam is None:
- return
- self._recalculate_steam_param_ranges()
- average = float(self.steam.sbAverage.value())
- g_amp_max_mtm = float(self.steam.sbSteamGAmpMax.value())
- g_slew_max_tms = float(self.steam.sbSteamGSlewMax.value())
- grad_raster_time_us = float(self.steam.sbSteamGradRasterUs.value())
- rf_raster_time_us = float(self.steam.sbSteamRfRasterUs.value())
- rf_ringdown_us = float(self.steam.sbSteamRfRingdownUs.value())
- rf_dead_us = float(self.steam.sbSteamRfDeadUs.value())
- adc_dead_us = float(self.steam.sbSteamAdcDeadUs.value())
- voxel_thkn_mm = float(self.steam.sbSteamVoxelThknMm.value())
- bw_per_point = float(self.steam.sbSteamBWPerPoint.value())
- n_point = int(self.steam.sbSteamNPoint.value())
- tau1_ms = float(self.steam.sbSteamTau1Ms.value())
- tau2_ms = float(self.steam.sbSteamTau2Ms.value())
- tr = float(self.steam.sbSteamTR.value())
- t_ex_ms = float(self.steam.sbSteamTExMs.value())
- t_bw_product_ex = float(self.steam.sbSteamTBW.value())
- flip_angle = float(self.steam.sbSteamFlipAngle.value())
- apodization = float(self.steam.sbSteamApodization.value())
- gamma, g_amp_max, g_slew_max, grad_raster_time, tau_max = self._steam_gradient_limits(
- g_amp_max_mtm=g_amp_max_mtm,
- g_slew_max_tms=g_slew_max_tms,
- grad_raster_time_us=grad_raster_time_us,
- )
- rf_raster_time = rf_raster_time_us * 1e-6
- rf_ringdown_time = rf_ringdown_us * 1e-6
- rf_dead_time = rf_dead_us * 1e-6
- adc_dead_time = adc_dead_us * 1e-6
- voxel_thkn = voxel_thkn_mm * 1e-3
- tau1 = tau1_ms * 1e-3
- tau2 = tau2_ms * 1e-3
- t_ex = t_ex_ms * 1e-3
- bw_ex_pulse = t_bw_product_ex / t_ex
- self._steam_params = {
- "G_amp_max": g_amp_max,
- "G_slew_max": g_slew_max,
- "gamma": gamma,
- "grad_raster_time": grad_raster_time,
- "rf_raster_time": rf_raster_time,
- "t_ex": t_ex,
- "t_BW_product_ex": t_bw_product_ex,
- "FA": flip_angle,
- "apodization": apodization,
- "BW_ex_pulse": bw_ex_pulse,
- "rf_ringdown_time": rf_ringdown_time,
- "rf_dead_time": rf_dead_time,
- "adc_dead_time": adc_dead_time,
- "voxel_thkn": voxel_thkn,
- "N_point": n_point,
- "BW_per_point": bw_per_point,
- "dG": tau_max,
- "tau1": tau1,
- "tau2": tau2,
- "TR": tr,
- "average": average,
- }
- if update_status:
- self.steam.lblSteamStatus.setText("STEAM parameters updated.")
- if write_log:
- self.log("[STEAM] Parameters updated.")
- def steam_save_sequence(self) -> None:
- self._steam_save_sequence(save_as=False)
- def steam_save_sequence_as(self) -> None:
- self._steam_save_sequence(save_as=True)
- def _steam_default_sequences_dir(self) -> str:
- project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
- return os.path.join(project_root, "data", "sequences")
- def _steam_save_sequence(self, save_as: bool) -> None:
- if self.steam is None:
- return
- if seqgen_STEAM is None:
- msg = "Cannot import test_seqgen.seqgen_STEAM."
- if _SEQGEN_IMPORT_ERROR:
- msg = f"{msg}\n\nDetails: {_SEQGEN_IMPORT_ERROR}"
- QMessageBox.warning(self, "STEAM unavailable", msg)
- return
- self.steam_set_limits()
- default_base = self.steam.leSteamOutput.text().strip() or f"STEAM_{datetime.now().strftime('%d%m%y_%H%M')}"
- default_seq = f"{default_base}.seq"
- seq_dir = self._steam_default_sequences_dir()
- os.makedirs(seq_dir, exist_ok=True)
- seq_path = os.path.join(seq_dir, default_seq)
- if save_as:
- seq_path, _ = QFileDialog.getSaveFileName(
- self,
- "Save STEAM sequence as",
- seq_path,
- "Sequence (*.seq);;All files (*)",
- )
- if not seq_path:
- return
- if not seq_path.lower().endswith(".seq"):
- seq_path += ".seq"
- json_path = os.path.splitext(seq_path)[0] + ".json"
- try:
- params = SimpleNamespace(**self._steam_params)
- seq = seqgen_STEAM(params)
- seq.write(seq_path)
- with open(json_path, "w", encoding="utf-8") as f:
- json.dump(self._steam_params, f, ensure_ascii=False, indent=2)
- self._steam_params_dirty = False
- self._update_steam_seq_params_highlight()
- self._sequence_file_path = seq_path
- self._refresh_current_sequence_ui()
- self.steam.leSteamOutput.setText(os.path.splitext(os.path.basename(seq_path))[0])
- self.steam.lblSteamStatus.setText(f"Saved: {seq_path}")
- self.log(f"[STEAM] Saved sequence: {seq_path}")
- self.log(f"[STEAM] Saved params: {json_path}")
- except Exception as e:
- self.steam.lblSteamStatus.setText("Save failed.")
- QMessageBox.critical(self, "STEAM save error", str(e))
|