""" ScanningTab - clinical MRI scanner UI simulation with real scan initiation. Panels: - Left: scrollable protocol queue (ProtocolListWidget) - Centre: 3 MRI image viewers (MriViewerWidget x 3) - Bottom: QTabWidget with parameter tabs; "Геометрия" tab holds rotation controls Rotation matrix (3x3, ZYX Euler) is merged into the seq_info dict and sent to the orchestrator's start_measurement step as "rotation_matrix". """ from __future__ import annotations import math import json import os import numpy as np from PySide6.QtCore import Qt, QThread, QTimer, Signal from PySide6.QtGui import ( QColor, QFont, QImage, QLinearGradient, QPainter, QPen, QPolygonF, ) from PySide6.QtCore import QPointF from PySide6.QtWidgets import ( QButtonGroup, QComboBox, QDoubleSpinBox, QFileDialog, QFormLayout, QFrame, QGridLayout, QGroupBox, QHBoxLayout, QLabel, QListWidget, QMessageBox, QPushButton, QSplitter, QTabWidget, QTextEdit, QVBoxLayout, QWidget, ) try: import httpx as _httpx _HAS_HTTPX = True except ImportError: _HAS_HTTPX = False from src import i18n # -- colour palette ------------------------------------------------------------- _BG_DARK = "#1a1a2e" _PANEL_BG = "#2a2a2a" _IMAGE_BG = "#000000" _ACCENT = "#f0c040" _ACCENT_DIM = "#555533" _ORANGE_SEL = "#e65100" _BTN_BG = "#252535" # -- animation ----------------------------------------------------------------- _SCAN_LINE_INTERVAL_MS = 40 # 25 fps _SCAN_SWEEP_DURATION = 120 # ticks per vertical sweep # -- mouse interaction --------------------------------------------------------- _ROT_SENSITIVITY = 0.4 # degrees per pixel for Ctrl+drag rotation # -- default orientations (Rx, Ry, Rz in degrees) ------------------------------ _PRESETS = { "Axial": (0.0, 0.0, 0.0), "Coronal": (90.0, 0.0, 0.0), "Sagittal": (0.0, 90.0, 0.0), } _PROTOCOLS = [ "FID", "SE", "TSE" ] # Physical axes for each viewer plane: (horizontal_axis, vertical_axis) # X=0, Y=1, Z=2 in physical space; Y is flipped to screen coords in paintEvent _VIEWER_AXES: dict[str, tuple[int, int]] = { "Axial": (0, 1), "Cor": (0, 2), "Sag": (1, 2), } # ============================================================================== def _euler_to_matrix(rx_deg: float, ry_deg: float, rz_deg: float) -> list: """ZYX Euler angles (degrees) -> 3x3 rotation matrix as list-of-lists.""" rx = math.radians(rx_deg) ry = math.radians(ry_deg) rz = math.radians(rz_deg) cx, sx = math.cos(rx), math.sin(rx) cy, sy = math.cos(ry), math.sin(ry) cz, sz = math.cos(rz), math.sin(rz) Rx = np.array([[1, 0, 0 ], [0, cx, -sx], [0, sx, cx]]) Ry = np.array([[ cy, 0, sy], [ 0, 1, 0], [-sy, 0, cy]]) Rz = np.array([[cz, -sz, 0], [sz, cz, 0], [0, 0, 1]]) R = Rz @ Ry @ Rx return [[round(float(v), 6) for v in row] for row in R] def _project_slice_quad(R: list, viewer_label: str, size: float = 0.42) -> list: """ Project the scan-plane square onto a viewer's 2D projection plane. The scan square lies in the (freq-encode, phase-encode) plane. freq direction = R column 0, phase direction = R column 1. Viewer planes: "Axial" -> XY (physical axes 0, 1) "Cor" -> XZ (physical axes 0, 2) "Sag" -> YZ (physical axes 1, 2) Returns list of 4 (u, v) tuples in [-1, 1] space, Y-up convention. """ ax0, ax1 = _VIEWER_AXES.get(viewer_label, (0, 1)) # freq and phase unit vectors in physical space freq = [R[i][0] for i in range(3)] phase = [R[i][1] for i in range(3)] corners = [] for sf, sp in ((size, size), (-size, size), (-size, -size), (size, -size)): pt3 = [freq[i] * sf + phase[i] * sp for i in range(3)] corners.append((pt3[ax0], pt3[ax1])) return corners def _generate_noise_image(w: int = 256, h: int = 256) -> QImage: rng = np.random.default_rng(42) data = rng.integers(0, 70, (h, w), dtype=np.uint8) cy, cx = h / 2, w / 2 Y, X = np.ogrid[:h, :w] dist = np.sqrt(((X - cx) / cx) ** 2 + ((Y - cy) / cy) ** 2) mask = np.clip(1.0 - dist * 0.85, 0.05, 1.0) data = (data * mask).astype(np.uint8) alpha = np.full((h, w), 200, dtype=np.uint8) rgba = np.stack([data, data, data, alpha], axis=-1) img = QImage(rgba.tobytes(), w, h, 4 * w, QImage.Format_RGBA8888) return img.copy() # ============================================================================== class MriViewerWidget(QWidget): """ Dark MRI image viewer - pure QPainter. Ctrl + left-drag moves the slice square within the viewer plane. The widget emits slice_offset_changed(ax0, ax1, d0, d1) so ScanningTab can accumulate a shared 3-D offset vector and push it back to all viewers. """ # slice_offset_changed(physical_axis_u, physical_axis_v, delta_u, delta_v) slice_offset_changed = Signal(int, int, float, float) # rotation_delta(drx_deg, dry_deg) - Ctrl+drag gamedev-style rotation rotation_delta = Signal(float, float) _IDENTITY = [[1, 0, 0], [0, 1, 0], [0, 0, 1]] def __init__(self, label: str = "Axial", parent: QWidget | None = None) -> None: super().__init__(parent) self._label = label self._scan_y = 0 self._active_scan = False self._rot_matrix = [row[:] for row in self._IDENTITY] self._slice_offset = [0.0, 0.0, 0.0] # 3-D offset in logical [-1, 1] space self._drag_pos = None # QPointF while dragging self._drag_mode = None # 'move' | 'rotate' self._noise = _generate_noise_image() self.setMinimumSize(120, 120) self.setMouseTracking(True) # needed for cursor updates without button # -- public API --------------------------------------------------------- def set_scanning(self, active: bool) -> None: self._active_scan = active self.update() def set_rotation_matrix(self, R: list) -> None: self._rot_matrix = R self.update() def set_slice_offset(self, offset: list) -> None: self._slice_offset = list(offset) self.update() def advance_scanline(self, phase: int) -> None: if self.height() == 0: return self._scan_y = int(phase / _SCAN_SWEEP_DURATION * self.height()) self.update() # -- mouse interaction -------------------------------------------------- # LMB drag -> move slice (translate) # Ctrl + LMB drag -> rotate slice (gamedev orbit: dx=Ry yaw, dy=Rx pitch) def _pixel_scale(self) -> float: return min(self.width(), self.height()) * 0.46 def _hover_cursor(self, modifiers) -> Qt.CursorShape: return Qt.OpenHandCursor if (modifiers & Qt.ControlModifier) else Qt.SizeAllCursor def mousePressEvent(self, event) -> None: # noqa: N802 if event.button() == Qt.LeftButton: self._drag_pos = event.position() self._drag_mode = "rotate" if (event.modifiers() & Qt.ControlModifier) else "move" self.setCursor(Qt.ClosedHandCursor if self._drag_mode == "rotate" else Qt.SizeAllCursor) event.accept() else: super().mousePressEvent(event) def mouseMoveEvent(self, event) -> None: # noqa: N802 if not (event.buttons() & Qt.LeftButton): self.setCursor(self._hover_cursor(event.modifiers())) if self._drag_pos is not None and (event.buttons() & Qt.LeftButton): pos = event.position() dx = pos.x() - self._drag_pos.x() dy = pos.y() - self._drag_pos.y() if self._drag_mode == "move": scale = self._pixel_scale() if scale > 0: ax0, ax1 = _VIEWER_AXES[self._label] self.slice_offset_changed.emit(ax0, ax1, dx / scale, -dy / scale) else: # rotate - gamedev FPS orbit # horizontal drag -> yaw (Ry) # vertical drag -> pitch (Rx); screen-down = positive pitch self.rotation_delta.emit( dy * _ROT_SENSITIVITY, # drx dx * _ROT_SENSITIVITY, # dry ) self._drag_pos = pos event.accept() else: super().mouseMoveEvent(event) def mouseReleaseEvent(self, event) -> None: # noqa: N802 if event.button() == Qt.LeftButton and self._drag_pos is not None: self._drag_pos = None self._drag_mode = None self.setCursor(self._hover_cursor(event.modifiers())) event.accept() else: super().mouseReleaseEvent(event) # -- painting ----------------------------------------------------------- def paintEvent(self, event) -> None: # noqa: N802 p = QPainter(self) rc = self.rect() w, h = rc.width(), rc.height() # 1 - background p.fillRect(rc, QColor(_IMAGE_BG)) # 2 - noise texture p.setOpacity(0.9) p.drawImage(rc, self._noise) p.setOpacity(1.0) # 3 - crosshair cx, cy = w // 2, h // 2 p.setPen(QPen(QColor("#444444"), 1)) p.drawLine(cx - int(w * 0.30), cy, cx + int(w * 0.30), cy) p.drawLine(cx, cy - int(h * 0.30), cx, cy + int(h * 0.30)) # 4 - scale ticks (3 per edge at 25/50/75 %) p.setPen(QPen(QColor("#888888"), 1)) tick = 8 for frac in (0.25, 0.50, 0.75): tx, ty = int(w * frac), int(h * frac) p.drawLine(tx, 0, tx, tick) p.drawLine(tx, h - tick, tx, h) p.drawLine(0, ty, tick, ty) p.drawLine(w - tick, ty, w, ty) # 5 - dim outer border p.setPen(QPen(QColor(_ACCENT_DIM), 1)) p.drawRect(rc.adjusted(2, 2, -2, -2)) # 6 - projected slice square (with 3-D offset applied) scale = self._pixel_scale() ax0, ax1 = _VIEWER_AXES[self._label] off_u = self._slice_offset[ax0] off_v = self._slice_offset[ax1] origin_x = cx + off_u * scale origin_y = cy - off_v * scale # Y-up -> Y-down corners_uv = _project_slice_quad(self._rot_matrix, self._label) pts = QPolygonF([ QPointF(origin_x + u * scale, origin_y - v * scale) for u, v in corners_uv ]) # semi-transparent fill p.setPen(Qt.NoPen) p.setBrush(QColor(240, 192, 64, 45)) p.drawPolygon(pts) # solid border pen = QPen(QColor(_ACCENT), 2) pen.setJoinStyle(Qt.MiterJoin) p.setPen(pen) p.setBrush(Qt.NoBrush) p.drawPolygon(pts) # 7 - scan sweep stripe if self._active_scan: sy = self._scan_y grad = QLinearGradient(0, sy, w, sy) grad.setColorAt(0.0, QColor(0, 0, 0, 0)) grad.setColorAt(0.5, QColor(255, 255, 255, 96)) grad.setColorAt(1.0, QColor(0, 0, 0, 0)) p.fillRect(0, max(0, sy - 2), w, 5, grad) # 8 - orientation label font = QFont("Arial", 11, QFont.Bold) p.setFont(font) p.setPen(Qt.white) p.drawText(rc.adjusted(8, 4, 0, 0), Qt.AlignTop | Qt.AlignLeft, self._label) p.end() # ============================================================================== class ProtocolListWidget(QWidget): protocol_selected = Signal(str) def __init__(self, parent: QWidget | None = None) -> None: super().__init__(parent) self.setStyleSheet(f"background: {_PANEL_BG};") lay = QVBoxLayout(self) lay.setContentsMargins(4, 8, 4, 4) lay.setSpacing(4) self._header_lbl = QLabel(i18n.tr("protocols_header")) self._header_lbl.setStyleSheet( "color: #aaaaaa; font-weight: bold; font-size: 11px; background: transparent;" ) lay.addWidget(self._header_lbl) self._list = QListWidget() self._list.setStyleSheet(f""" QListWidget {{ background: {_PANEL_BG}; color: #ffffff; border: 1px solid #3a3a3a; font-size: 12px; outline: 0; }} QListWidget::item {{ padding: 6px 8px; border-bottom: 1px solid #333333; }} QListWidget::item:selected {{ background: {_ORANGE_SEL}; color: #ffffff; }} QListWidget::item:hover:!selected {{ background: #3a3a3a; }} """) for name in _PROTOCOLS: self._list.addItem(name) self._list.setCurrentRow(0) self._list.currentTextChanged.connect(self.protocol_selected) lay.addWidget(self._list, stretch=1) def retranslate_ui(self) -> None: self._header_lbl.setText(i18n.tr("protocols_header")) # ============================================================================== class _ScanPipelineWorker(QThread): """ Full pipeline: health-check → load scenario (with seq_file / seq_info) → run_all → poll → extract raw data → emit raw_data_ready. The orchestrator is responsible for: - Forwarding the .seq file to seq-interp for interpretation - Running the full measurement scenario (spectrometer, reconstructor, …) The GUI never calls seq-interp or any other microservice directly. """ progress = Signal(str) job_started = Signal(str) finished = Signal(str) error = Signal(str) raw_data_ready = Signal(str) # absolute path to temp JSON file def __init__( self, seq_file_path: str | None, seq_info: dict | None, orchestrator_url: str, scenario_id: str = "full_pipeline", protocol: str = "", parent=None, ) -> None: super().__init__(parent) self._seq_file = seq_file_path self._seq_info = dict(seq_info) if seq_info else {} self._scenario_id = scenario_id self._protocol = protocol from src.clients.orchestrator_client import OrchestratorClient self._orch = OrchestratorClient(orchestrator_url) def run(self) -> None: try: self._run_pipeline() except Exception as exc: if not self.isInterruptionRequested(): self.error.emit(str(exc)) def _run_pipeline(self) -> None: self.progress.emit("Проверка оркестратора...") if not self._orch.healthcheck(): raise RuntimeError("Оркестратор недоступен — проверьте, что сервис запущен") self.progress.emit( f"Отправка задания в оркестратор " f"[сценарий: {self._scenario_id}, ИП: {self._protocol or '—'}]…" ) job_id = self._orch.scan( seq_file_path=self._seq_file or None, seq_info=self._seq_info or None, scenario_id=self._scenario_id, protocol=self._protocol, ) self.job_started.emit(job_id) self.progress.emit(f" Job {job_id[:8]}… запущен, ожидание результата…") def _on_status(status: str) -> None: self.progress.emit(f" [{status}]") final = self._orch.poll_scan( job_id, timeout=300.0, poll_interval=2.0, progress_cb=_on_status, interrupted_fn=self.isInterruptionRequested, ) steps = final.get("steps", []) meas_id = None raw_data = None for step in steps: name = step.get("name") res = step.get("result") or {} if name == "start_measurement": meas_id = res.get("measurement_id") if name == "fetch_data": raw_data = res.get("data") self.progress.emit(f" Сканирование завершено (meas_id={meas_id})") is_stub = str(meas_id) in ("", "None", "meas_stub") or meas_id is None if raw_data and not is_stub: raw_path = self._save_raw(raw_data, meas_id) if raw_path: self.raw_data_ready.emit(raw_path) self.finished.emit(f"job завершён (meas_id={meas_id})") def _save_raw(self, data, meas_id) -> str | None: import tempfile, time as _t fname = f"scan_raw_{meas_id}_{int(_t.time())}.json" fpath = os.path.join(tempfile.gettempdir(), fname) try: with open(fpath, "w", encoding="utf-8") as fh: json.dump(data, fh) self.progress.emit(f" Данные сохранены: {fname}") return fpath except Exception as exc: self.progress.emit(f" Не удалось сохранить данные: {exc}") return None # ============================================================================== class _ScanWorker(QThread): """Fire-and-forget: load scenario and run_all via orchestrator REST.""" finished = Signal(str) # job_id error = Signal(str) def __init__(self, url: str, info: dict, parent=None) -> None: super().__init__(parent) self._url = url.rstrip("/") self._info = info def run(self) -> None: try: if not _HAS_HTTPX: import urllib.request, urllib.error self._run_urllib() else: self._run_httpx() except Exception as exc: self.error.emit(str(exc)) def _run_httpx(self) -> None: payload = {"param_overrides": {"start_measurement": {"info": self._info}}} with _httpx.Client(timeout=15) as client: r = client.post( f"{self._url}/scenario/load/full_pipeline", json=payload, ) r.raise_for_status() job_id = r.json().get("job_id", "?") r2 = client.post(f"{self._url}/scenario/{job_id}/run_all") r2.raise_for_status() self.finished.emit(job_id) def _run_urllib(self) -> None: import urllib.request payload = {"param_overrides": {"start_measurement": {"info": self._info}}} data = json.dumps(payload).encode() headers = {"Content-Type": "application/json"} req = urllib.request.Request( f"{self._url}/scenario/load/full_pipeline", data=data, headers=headers, method="POST", ) with urllib.request.urlopen(req, timeout=15) as resp: body = json.loads(resp.read()) job_id = body.get("job_id", "?") req2 = urllib.request.Request( f"{self._url}/scenario/{job_id}/run_all", data=b"{}", headers=headers, method="POST", ) with urllib.request.urlopen(req2, timeout=15): pass self.finished.emit(job_id) # ============================================================================== class ScanningTab(QWidget): """ Operator-facing scanning tab. Bottom QTabWidget tabs match Siemens syngo layout: Основные | Контраст | Разрешение | Геометрия | Система "Геометрия" holds orientation presets + Rx/Ry/Rz spinboxes + live 3x3 matrix. """ scan_job_started = Signal(str) # job_id — emitted once the orchestrator accepts the job raw_data_ready = Signal(str) # absolute path to temp JSON file def __init__(self, parent: QWidget | None = None) -> None: super().__init__(parent) self.setStyleSheet(f"background: {_BG_DARK};") self._viewers: list[MriViewerWidget] = [] self._scan_tick: int = 0 self._seq_info: dict | None = None self._seq_file_path: str | None = None self._orchestrator_url: str = "http://localhost:1717" self._scan_worker: _ScanPipelineWorker | None = None self._active_protocol: str = _PROTOCOLS[0] self._scenario_id: str = "full_pipeline" self._slice_offset: list[float] = [0.0, 0.0, 0.0] root = QVBoxLayout(self) root.setContentsMargins(0, 0, 0, 0) root.setSpacing(0) vsplit = QSplitter(Qt.Vertical) vsplit.setStyleSheet("QSplitter::handle { background: #333344; height: 3px; }") vsplit.addWidget(self._build_upper_area()) vsplit.addWidget(self._build_bottom_panel()) vsplit.setSizes([700, 140]) vsplit.setChildrenCollapsible(False) root.addWidget(vsplit, stretch=1) self._setup_animation() self._update_matrix_display() self._update_slice_display() self._update_scan_ready_state() # -- public API --------------------------------------------------------- def apply_seq_info(self, info_dict: dict) -> None: """Receive exported sequence info from SeqInterpTab and auto-start scan.""" self._seq_info = dict(info_dict) self._seq_file_path = None self._update_scan_ready_state() label = info_dict.get("infostr") or "sequence" self._log(f"Получены параметры последовательности: {label}", "INFO") if not self._btn_scan.isChecked(): self._btn_scan.setChecked(True) def set_orchestrator_url(self, url: str) -> None: self._orchestrator_url = url def retranslate_ui(self) -> None: self._protocol_list.retranslate_ui() # param tabs: 0-3 = placeholders, 4 = geometry _keys = ["tab_main", "tab_contrast", "tab_resolution", "tab_system", "tab_geometry"] for idx, key in enumerate(_keys): self._param_tabs.setTabText(idx, i18n.tr(key)) self._preset_group.setTitle(i18n.tr("grp_orientation")) self._rot_group.setTitle(i18n.tr("grp_rotation")) self._matrix_group.setTitle(i18n.tr("grp_rot_matrix")) # scan button: keep text in sync with current scan state if self._scan_timer.isActive(): self._btn_scan.setText(i18n.tr("btn_stop_scan")) else: self._btn_scan.setText(i18n.tr("btn_scan")) self._update_scan_ready_state() # -- layout builders ---------------------------------------------------- def _build_upper_area(self) -> QWidget: container = QWidget() container.setStyleSheet(f"background: {_BG_DARK};") lay = QVBoxLayout(container) lay.setContentsMargins(0, 0, 0, 0) lay.setSpacing(0) hsplit = QSplitter(Qt.Horizontal) hsplit.setStyleSheet("QSplitter::handle { background: #333344; width: 3px; }") hsplit.addWidget(self._build_protocol_panel()) hsplit.addWidget(self._build_image_grid()) hsplit.setSizes([220, 980]) hsplit.setChildrenCollapsible(False) lay.addWidget(hsplit) return container def _build_protocol_panel(self) -> QWidget: container = QWidget() container.setStyleSheet(f"background: {_PANEL_BG};") lay = QVBoxLayout(container) lay.setContentsMargins(0, 0, 0, 0) lay.setSpacing(0) self._protocol_list = ProtocolListWidget() self._protocol_list.protocol_selected.connect(self._on_protocol_selected) lay.addWidget(self._protocol_list, stretch=1) sep = QFrame() sep.setFrameShape(QFrame.HLine) sep.setFixedHeight(1) sep.setStyleSheet("background: #3a3a4a; border: none;") lay.addWidget(sep) # Scenario selector sc_container = QWidget() sc_container.setStyleSheet(f"background: {_PANEL_BG};") sc_lay = QVBoxLayout(sc_container) sc_lay.setContentsMargins(6, 6, 6, 4) sc_lay.setSpacing(4) sc_header = QLabel("Сценарий оркестратора") sc_header.setStyleSheet( "color: #7777aa; font-size: 10px; font-weight: bold; background: transparent;" ) sc_lay.addWidget(sc_header) sc_row = QHBoxLayout() sc_row.setSpacing(4) _combo_style = ( "QComboBox {" f" background: {_BTN_BG}; color: #ccccee;" " border: 1px solid #444466; border-radius: 3px;" " font-size: 11px; padding: 3px 6px;" "}" "QComboBox::drop-down { border: none; width: 16px; }" "QComboBox QAbstractItemView {" f" background: {_BTN_BG}; color: #ccccee; border: 1px solid #444466;" " selection-background-color: #e65100;" "}" ) self._scenario_combo = QComboBox() self._scenario_combo.setStyleSheet(_combo_style) self._scenario_combo.setToolTip("Тип сценария, который будет запущен в оркестраторе") self._scenario_combo.addItem("full_pipeline") self._scenario_combo.currentTextChanged.connect(self._on_scenario_selected) sc_row.addWidget(self._scenario_combo, stretch=1) btn_refresh_sc = QPushButton("↻") btn_refresh_sc.setFixedSize(24, 24) btn_refresh_sc.setToolTip("Получить список сценариев из оркестратора") btn_refresh_sc.setStyleSheet( f"QPushButton {{ background: {_BTN_BG}; color: #7777aa;" " border: 1px solid #444466; border-radius: 3px; font-size: 13px; }" "QPushButton:hover { color: #ffffff; background: #303050; }" ) btn_refresh_sc.clicked.connect(self._on_refresh_scenarios) sc_row.addWidget(btn_refresh_sc) sc_lay.addLayout(sc_row) lay.addWidget(sc_container) sep2 = QFrame() sep2.setFrameShape(QFrame.HLine) sep2.setFixedHeight(1) sep2.setStyleSheet("background: #3a3a4a; border: none;") lay.addWidget(sep2) # .seq file loader btn_load = QPushButton("Загрузить .seq…") btn_load.setToolTip("Выбрать готовый .seq файл для запуска полного пайплайна") btn_load.setStyleSheet( "QPushButton {" f" background: {_BTN_BG}; color: #aaaacc;" " border: 1px solid #444466; border-radius: 3px;" " font-size: 11px; padding: 5px 8px; margin: 6px 6px 2px 6px;" "}" "QPushButton:hover { background: #303050; color: #ffffff; }" ) btn_load.clicked.connect(self._on_load_seq_clicked) lay.addWidget(btn_load) self._lbl_seq_file = QLabel("Файл не выбран") self._lbl_seq_file.setWordWrap(True) self._lbl_seq_file.setStyleSheet( "color: #555577; font-size: 10px; background: transparent;" "padding: 0 8px 6px 8px;" ) lay.addWidget(self._lbl_seq_file) return container def _build_image_grid(self) -> QWidget: container = QWidget() container.setStyleSheet(f"background: {_BG_DARK};") grid = QGridLayout(container) grid.setContentsMargins(6, 6, 6, 6) grid.setSpacing(6) for col, lbl in enumerate(("Axial", "Cor", "Sag")): viewer = MriViewerWidget(label=lbl) viewer.slice_offset_changed.connect(self._on_slice_offset_changed) viewer.rotation_delta.connect(self._on_rotation_delta) grid.addWidget(viewer, 0, col) grid.setColumnStretch(col, 1) self._viewers.append(viewer) grid.setRowStretch(0, 1) return container def _build_bottom_panel(self) -> QWidget: panel = QWidget() panel.setStyleSheet("background: #16162a; border-top: 1px solid #333355;") outer = QVBoxLayout(panel) outer.setContentsMargins(0, 0, 0, 0) outer.setSpacing(0) # -- parameter tabs ------------------------------------------------- self._param_tabs = QTabWidget() self._param_tabs.setStyleSheet(""" QTabWidget::pane { border: none; background: #16162a; } QTabBar::tab { background: #1e1e38; color: #aaaacc; padding: 5px 14px; border: 1px solid #333355; border-bottom: none; margin-right: 2px; font-size: 11px; } QTabBar::tab:selected { background: #252545; color: #ffffff; } QTabBar::tab:hover:!selected { background: #222240; } """) _placeholder_keys = ["tab_main", "tab_contrast", "tab_resolution", "tab_system"] for key in _placeholder_keys: w = QLabel(f"[ {i18n.tr(key)} — TODO ]") w.setAlignment(Qt.AlignCenter) w.setStyleSheet("color: #555577; background: #16162a;") self._param_tabs.addTab(w, i18n.tr(key)) geo_tab = self._build_geometry_tab() self._param_tabs.addTab(geo_tab, i18n.tr("tab_geometry")) log_tab = self._build_log_tab() self._param_tabs.addTab(log_tab, "Лог") self._param_tabs.setCurrentWidget(geo_tab) outer.addWidget(self._param_tabs, stretch=1) # -- bottom action bar ---------------------------------------------- action_bar = QWidget() action_bar.setStyleSheet("background: #16162a; border-top: 1px solid #2a2a4a;") action_lay = QHBoxLayout(action_bar) action_lay.setContentsMargins(12, 6, 12, 6) action_lay.setSpacing(12) self._status_label = QLabel(i18n.tr("no_data")) self._status_label.setStyleSheet("color: #666688; font-size: 11px;") action_lay.addWidget(self._status_label) action_lay.addStretch() self._btn_scan = QPushButton(i18n.tr("btn_scan")) self._btn_scan.setCheckable(True) self._btn_scan.setMinimumWidth(140) self._btn_scan.setStyleSheet( "QPushButton {" " background: #1a3a1a; color: #88ee88;" " border: 1px solid #336633; border-radius: 4px;" " font-size: 12px; font-weight: bold; padding: 5px 16px;" "}" "QPushButton:checked {" " background: #3a1a1a; color: #ee8888; border-color: #663333;" "}" "QPushButton:hover:!checked { background: #1e4a1e; }" ) self._btn_scan.toggled.connect(self._on_scan_toggled) action_lay.addWidget(self._btn_scan) outer.addWidget(action_bar) return panel def _build_geometry_tab(self) -> QWidget: w = QWidget() w.setStyleSheet("background: #16162a;") lay = QHBoxLayout(w) lay.setContentsMargins(12, 8, 12, 8) lay.setSpacing(16) # -- orientation presets -------------------------------------------- self._preset_group = QGroupBox(i18n.tr("grp_orientation")) preset_group = self._preset_group preset_group.setStyleSheet(self._group_style()) preset_lay = QVBoxLayout(preset_group) preset_lay.setSpacing(4) self._btn_group = QButtonGroup(self) self._btn_group.setExclusive(True) self._preset_buttons: dict[str, QPushButton] = {} for name in ("Axial", "Coronal", "Sagittal"): btn = QPushButton(name) btn.setCheckable(True) btn.setStyleSheet(self._preset_btn_style()) preset_lay.addWidget(btn) self._btn_group.addButton(btn) self._preset_buttons[name] = btn btn.clicked.connect(lambda checked, n=name: self._on_preset(n)) self._preset_buttons["Axial"].setChecked(True) lay.addWidget(preset_group) # -- rotation angles ------------------------------------------------ self._rot_group = QGroupBox(i18n.tr("grp_rotation")) rot_group = self._rot_group rot_group.setStyleSheet(self._group_style()) form = QFormLayout(rot_group) form.setSpacing(6) form.setContentsMargins(8, 4, 8, 4) spin_style = ( "QDoubleSpinBox {" " background: #252540; color: #ddddff;" " border: 1px solid #445; border-radius: 3px; padding: 2px 4px;" "}" "QDoubleSpinBox:focus { border-color: #f0c040; }" ) lbl_style = "color: #aaaacc; font-size: 11px;" self._spin_rx = QDoubleSpinBox() self._spin_ry = QDoubleSpinBox() self._spin_rz = QDoubleSpinBox() for spin in (self._spin_rx, self._spin_ry, self._spin_rz): spin.setRange(-180.0, 180.0) spin.setSingleStep(1.0) spin.setDecimals(1) spin.setSuffix(" deg") spin.setStyleSheet(spin_style) spin.setMinimumWidth(90) spin.valueChanged.connect(self._on_rotation_changed) for lbl_text, spin in (("Rx", self._spin_rx), ("Ry", self._spin_ry), ("Rz", self._spin_rz)): lbl = QLabel(lbl_text) lbl.setStyleSheet(lbl_style) form.addRow(lbl, spin) lay.addWidget(rot_group) # -- rotation matrix display ---------------------------------------- self._matrix_group = QGroupBox(i18n.tr("grp_rot_matrix")) matrix_group = self._matrix_group matrix_group.setStyleSheet(self._group_style()) matrix_lay = QVBoxLayout(matrix_group) matrix_lay.setContentsMargins(8, 4, 8, 4) self._matrix_label = QLabel() self._matrix_label.setFont(QFont("Courier New", 10)) self._matrix_label.setStyleSheet("color: #99ccff; background: transparent;") self._matrix_label.setAlignment(Qt.AlignLeft | Qt.AlignVCenter) matrix_lay.addWidget(self._matrix_label) lay.addWidget(matrix_group) lay.addStretch() return w def _build_log_tab(self) -> QWidget: w = QWidget() w.setStyleSheet("background: #16162a;") lay = QVBoxLayout(w) lay.setContentsMargins(6, 6, 6, 4) lay.setSpacing(4) self._log_view = QTextEdit() self._log_view.setReadOnly(True) self._log_view.setFont(QFont("Courier New", 9)) self._log_view.setStyleSheet( "QTextEdit {" " background: #0e0e1c; color: #aaaacc;" " border: 1px solid #2a2a4a; border-radius: 3px;" "}" ) lay.addWidget(self._log_view, stretch=1) btn_clear = QPushButton("Очистить") btn_clear.setFixedWidth(90) btn_clear.setStyleSheet( "QPushButton {" f" background: {_BTN_BG}; color: #777799;" " border: 1px solid #333355; border-radius: 3px;" " font-size: 10px; padding: 3px 8px;" "}" "QPushButton:hover { color: #aaaacc; }" ) btn_clear.clicked.connect(self._log_view.clear) lay.addWidget(btn_clear, alignment=Qt.AlignRight) return w _LOG_COLORS = {"INFO": "#aaaacc", "WARN": "#e6a817", "ERR": "#ee4444"} def _log(self, msg: str, level: str = "INFO") -> None: if not hasattr(self, "_log_view"): return from datetime import datetime ts = datetime.now().strftime("%H:%M:%S") color = self._LOG_COLORS.get(level, "#aaaacc") line = ( f"{ts}" f" [{level}]" f" {msg}" ) self._log_view.append(line) sb = self._log_view.verticalScrollBar() sb.setValue(sb.maximum()) @staticmethod def _group_style() -> str: return ( "QGroupBox {" " color: #8888aa; font-size: 10px; font-weight: bold;" " border: 1px solid #333355; border-radius: 4px; margin-top: 6px;" "}" "QGroupBox::title { subcontrol-origin: margin; left: 6px; top: -1px; }" ) @staticmethod def _preset_btn_style() -> str: return ( "QPushButton {" f" background: {_BTN_BG}; color: #ccccee;" " border: 1px solid #444466; border-radius: 3px;" " font-size: 11px; padding: 4px 10px;" "}" "QPushButton:checked { background: #e65100; color: #ffffff; border-color: #ff7733; }" "QPushButton:hover:!checked { background: #303050; }" ) # -- animation ---------------------------------------------------------- def _setup_animation(self) -> None: self._scan_timer = QTimer(self) self._scan_timer.setInterval(_SCAN_LINE_INTERVAL_MS) self._scan_timer.timeout.connect(self._on_tick) def _on_tick(self) -> None: self._scan_tick += 1 offset = _SCAN_SWEEP_DURATION // max(len(self._viewers), 1) for i, viewer in enumerate(self._viewers): phase = (self._scan_tick + i * offset) % _SCAN_SWEEP_DURATION viewer.advance_scanline(phase) # -- rotation logic ----------------------------------------------------- def _on_preset(self, name: str) -> None: rx, ry, rz = _PRESETS[name] for spin in (self._spin_rx, self._spin_ry, self._spin_rz): spin.blockSignals(True) self._spin_rx.setValue(rx) self._spin_ry.setValue(ry) self._spin_rz.setValue(rz) for spin in (self._spin_rx, self._spin_ry, self._spin_rz): spin.blockSignals(False) self._on_rotation_changed() def _on_rotation_changed(self) -> None: self._update_matrix_display() self._update_slice_display() self._sync_preset_buttons() def _compute_rotation_matrix(self) -> list: return _euler_to_matrix( self._spin_rx.value(), self._spin_ry.value(), self._spin_rz.value(), ) def _update_matrix_display(self) -> None: R = self._compute_rotation_matrix() lines = [] for row in R: lines.append(" ".join(f"{v:+.3f}" for v in row)) self._matrix_label.setText("\n".join(lines)) def _update_slice_display(self) -> None: """Push current rotation matrix and offset to all viewers.""" R = self._compute_rotation_matrix() for v in self._viewers: v.set_rotation_matrix(R) v.set_slice_offset(self._slice_offset) def _on_slice_offset_changed(self, ax0: int, ax1: int, d0: float, d1: float) -> None: """Accumulate drag delta from any viewer into the shared 3-D offset.""" self._slice_offset[ax0] = max(-0.95, min(0.95, self._slice_offset[ax0] + d0)) self._slice_offset[ax1] = max(-0.95, min(0.95, self._slice_offset[ax1] + d1)) R = self._compute_rotation_matrix() for v in self._viewers: v.set_slice_offset(self._slice_offset) v.set_rotation_matrix(R) def _on_rotation_delta(self, drx: float, dry: float) -> None: """Gamedev-style Ctrl+drag: apply incremental Rx/Ry rotation from any viewer.""" def _wrap(val: float) -> float: while val > 180.0: val -= 360.0 while val < -180.0: val += 360.0 return val for spin, delta in ((self._spin_rx, drx), (self._spin_ry, dry)): spin.blockSignals(True) spin.setValue(_wrap(spin.value() + delta)) spin.blockSignals(False) self._on_rotation_changed() def _sync_preset_buttons(self) -> None: """Check if current angles match a known preset; highlight button if so.""" rx = round(self._spin_rx.value(), 1) ry = round(self._spin_ry.value(), 1) rz = round(self._spin_rz.value(), 1) matched = None for name, (prx, pry, prz) in _PRESETS.items(): if (rx, ry, rz) == (prx, pry, prz): matched = name break for name, btn in self._preset_buttons.items(): btn.blockSignals(True) btn.setChecked(name == matched) btn.blockSignals(False) # -- .seq file loading -------------------------------------------------- def _on_load_seq_clicked(self) -> None: path, _ = QFileDialog.getOpenFileName( self, "Выбрать .seq файл", os.path.join(os.path.dirname(__file__), os.pardir, os.pardir), "Pulseq файлы (*.seq);;Все файлы (*)", ) if not path: return self._seq_file_path = path self._seq_info = None fname = os.path.basename(path) self._lbl_seq_file.setText(fname) self._lbl_seq_file.setStyleSheet( "color: #e65100; font-size: 10px; background: transparent;" "padding: 0 8px 6px 8px;" ) self._log(f"Загружен файл: {fname}", "INFO") self._update_scan_ready_state() # -- scan initiation ---------------------------------------------------- def _on_scan_toggled(self, checked: bool) -> None: if checked: if self._scan_worker is not None and self._scan_worker.isRunning(): return if self._seq_info is None and self._seq_file_path is None: QMessageBox.warning( self, i18n.tr("dlg_no_data_title"), i18n.tr("dlg_no_seq_msg"), ) self._btn_scan.setChecked(False) return info = dict(self._seq_info) if self._seq_info else {} if info: info["rotation_matrix"] = self._compute_rotation_matrix() info["slice_position"] = list(self._slice_offset) self._scan_worker = _ScanPipelineWorker( seq_file_path=self._seq_file_path, seq_info=info if info else None, orchestrator_url=self._orchestrator_url, scenario_id=self._scenario_id, protocol=self._active_protocol, parent=self, ) self._scan_worker.job_started.connect(self.scan_job_started) self._scan_worker.finished.connect(self._on_scan_done) self._scan_worker.error.connect(self._on_scan_error) self._scan_worker.progress.connect(self._on_scan_progress) self._scan_worker.raw_data_ready.connect(self._on_raw_data_ready_log) self._scan_worker.raw_data_ready.connect(self.raw_data_ready) self._scan_worker.start() self._scan_timer.start() self._btn_scan.setText(i18n.tr("btn_stop_scan")) self._status_label.setText("Инициализация пайплайна…") self._status_label.setStyleSheet("color: #88ee88; font-size: 11px;") self._log("--- Запуск пайплайна сканирования ---", "INFO") for v in self._viewers: v.set_scanning(True) # Switch to Log tab so the user can follow progress log_idx = self._param_tabs.indexOf(self._log_view.parent()) if log_idx >= 0: self._param_tabs.setCurrentIndex(log_idx) else: self._scan_timer.stop() if self._scan_worker and self._scan_worker.isRunning(): self._scan_worker.requestInterruption() self._btn_scan.setText(i18n.tr("btn_scan")) for v in self._viewers: v.set_scanning(False) self._update_scan_ready_state() def _on_raw_data_ready_log(self, path: str) -> None: self._log(f"Данные отправлены во вкладку Spectroscopy: {os.path.basename(path)}", "INFO") def _on_scan_progress(self, msg: str) -> None: self._status_label.setText(msg[:80]) self._status_label.setStyleSheet("color: #88ee88; font-size: 11px;") self._log(msg, "INFO") def _on_scan_done(self, msg: str) -> None: self._scan_timer.stop() self._status_label.setText("Готово") self._status_label.setStyleSheet("color: #66ccff; font-size: 11px;") self._log(msg, "INFO") for v in self._viewers: v.set_scanning(False) self._btn_scan.setChecked(False) def _on_scan_error(self, err: str) -> None: self._scan_timer.stop() self._status_label.setText(f"{i18n.tr('error_prefix')}: {err[:70]}") self._status_label.setStyleSheet("color: #ee4444; font-size: 11px;") self._log(err, "ERR") for v in self._viewers: v.set_scanning(False) self._btn_scan.setChecked(False) def _update_scan_ready_state(self) -> None: if self._seq_file_path: fname = os.path.basename(self._seq_file_path) self._status_label.setText(f"Файл: {fname}") self._status_label.setStyleSheet("color: #e65100; font-size: 11px;") elif self._seq_info is not None: self._status_label.setText(i18n.tr("ready_to_scan")) self._status_label.setStyleSheet("color: #e65100; font-size: 11px;") else: self._status_label.setText(i18n.tr("no_data")) self._status_label.setStyleSheet("color: #666688; font-size: 11px;") # -- scenario selection ------------------------------------------------- def _on_scenario_selected(self, name: str) -> None: if name: self._scenario_id = name def _on_refresh_scenarios(self) -> None: import httpx try: r = httpx.get( f"{self._orchestrator_url}/scenario/list", timeout=5.0, ) if r.is_success: scenarios: list[str] = r.json().get("scenarios", []) if scenarios: current = self._scenario_combo.currentText() self._scenario_combo.blockSignals(True) self._scenario_combo.clear() for s in scenarios: self._scenario_combo.addItem(s) idx = self._scenario_combo.findText(current) self._scenario_combo.setCurrentIndex(max(idx, 0)) self._scenario_combo.blockSignals(False) self._scenario_id = self._scenario_combo.currentText() self._log(f"Сценарии загружены: {scenarios}", "INFO") else: self._log("Оркестратор вернул пустой список сценариев", "WARN") else: self._log(f"Ошибка загрузки сценариев: HTTP {r.status_code}", "WARN") except Exception as exc: self._log(f"Не удалось подключиться к оркестратору: {exc}", "WARN") # -- protocol selection ------------------------------------------------- def _on_protocol_selected(self, name: str) -> None: self._active_protocol = name