"""
ScanningTab - clinical MRI scanner UI simulation with real scan initiation.
Panels:
- Left: scrollable protocol queue (ProtocolListWidget)
- Centre: 3 MRI image viewers (MriViewerWidget x 3)
- Bottom: QTabWidget with parameter tabs; "Геометрия" tab holds rotation controls
Rotation matrix (3x3, ZYX Euler) is merged into the seq_info dict and sent
to the orchestrator's start_measurement step as "rotation_matrix".
"""
from __future__ import annotations
import math
import json
import os
import numpy as np
from PySide6.QtCore import Qt, QThread, QTimer, Signal
from PySide6.QtGui import (
QColor, QFont, QImage, QLinearGradient, QPainter, QPen, QPolygonF,
)
from PySide6.QtCore import QPointF
from PySide6.QtWidgets import (
QButtonGroup, QComboBox, QDoubleSpinBox, QFileDialog, QFormLayout, QFrame,
QGridLayout, QGroupBox, QHBoxLayout, QLabel, QListWidget, QMessageBox,
QPushButton, QSplitter, QTabWidget, QTextEdit, QVBoxLayout, QWidget,
)
try:
import httpx as _httpx
_HAS_HTTPX = True
except ImportError:
_HAS_HTTPX = False
from src import i18n
# -- colour palette -------------------------------------------------------------
_BG_DARK = "#1a1a2e"
_PANEL_BG = "#2a2a2a"
_IMAGE_BG = "#000000"
_ACCENT = "#f0c040"
_ACCENT_DIM = "#555533"
_ORANGE_SEL = "#e65100"
_BTN_BG = "#252535"
# -- animation -----------------------------------------------------------------
_SCAN_LINE_INTERVAL_MS = 40 # 25 fps
_SCAN_SWEEP_DURATION = 120 # ticks per vertical sweep
# -- mouse interaction ---------------------------------------------------------
_ROT_SENSITIVITY = 0.4 # degrees per pixel for Ctrl+drag rotation
# -- default orientations (Rx, Ry, Rz in degrees) ------------------------------
_PRESETS = {
"Axial": (0.0, 0.0, 0.0),
"Coronal": (90.0, 0.0, 0.0),
"Sagittal": (0.0, 90.0, 0.0),
}
_PROTOCOLS = [
"FID", "SE", "TSE"
]
# Physical axes for each viewer plane: (horizontal_axis, vertical_axis)
# X=0, Y=1, Z=2 in physical space; Y is flipped to screen coords in paintEvent
_VIEWER_AXES: dict[str, tuple[int, int]] = {
"Axial": (0, 1),
"Cor": (0, 2),
"Sag": (1, 2),
}
# ==============================================================================
def _euler_to_matrix(rx_deg: float, ry_deg: float, rz_deg: float) -> list:
"""ZYX Euler angles (degrees) -> 3x3 rotation matrix as list-of-lists."""
rx = math.radians(rx_deg)
ry = math.radians(ry_deg)
rz = math.radians(rz_deg)
cx, sx = math.cos(rx), math.sin(rx)
cy, sy = math.cos(ry), math.sin(ry)
cz, sz = math.cos(rz), math.sin(rz)
Rx = np.array([[1, 0, 0 ],
[0, cx, -sx],
[0, sx, cx]])
Ry = np.array([[ cy, 0, sy],
[ 0, 1, 0],
[-sy, 0, cy]])
Rz = np.array([[cz, -sz, 0],
[sz, cz, 0],
[0, 0, 1]])
R = Rz @ Ry @ Rx
return [[round(float(v), 6) for v in row] for row in R]
def _project_slice_quad(R: list, viewer_label: str, size: float = 0.42) -> list:
"""
Project the scan-plane square onto a viewer's 2D projection plane.
The scan square lies in the (freq-encode, phase-encode) plane.
freq direction = R column 0, phase direction = R column 1.
Viewer planes:
"Axial" -> XY (physical axes 0, 1)
"Cor" -> XZ (physical axes 0, 2)
"Sag" -> YZ (physical axes 1, 2)
Returns list of 4 (u, v) tuples in [-1, 1] space, Y-up convention.
"""
ax0, ax1 = _VIEWER_AXES.get(viewer_label, (0, 1))
# freq and phase unit vectors in physical space
freq = [R[i][0] for i in range(3)]
phase = [R[i][1] for i in range(3)]
corners = []
for sf, sp in ((size, size), (-size, size), (-size, -size), (size, -size)):
pt3 = [freq[i] * sf + phase[i] * sp for i in range(3)]
corners.append((pt3[ax0], pt3[ax1]))
return corners
def _generate_noise_image(w: int = 256, h: int = 256) -> QImage:
rng = np.random.default_rng(42)
data = rng.integers(0, 70, (h, w), dtype=np.uint8)
cy, cx = h / 2, w / 2
Y, X = np.ogrid[:h, :w]
dist = np.sqrt(((X - cx) / cx) ** 2 + ((Y - cy) / cy) ** 2)
mask = np.clip(1.0 - dist * 0.85, 0.05, 1.0)
data = (data * mask).astype(np.uint8)
alpha = np.full((h, w), 200, dtype=np.uint8)
rgba = np.stack([data, data, data, alpha], axis=-1)
img = QImage(rgba.tobytes(), w, h, 4 * w, QImage.Format_RGBA8888)
return img.copy()
# ==============================================================================
class MriViewerWidget(QWidget):
"""
Dark MRI image viewer - pure QPainter.
Ctrl + left-drag moves the slice square within the viewer plane.
The widget emits slice_offset_changed(ax0, ax1, d0, d1) so ScanningTab
can accumulate a shared 3-D offset vector and push it back to all viewers.
"""
# slice_offset_changed(physical_axis_u, physical_axis_v, delta_u, delta_v)
slice_offset_changed = Signal(int, int, float, float)
# rotation_delta(drx_deg, dry_deg) - Ctrl+drag gamedev-style rotation
rotation_delta = Signal(float, float)
_IDENTITY = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
def __init__(self, label: str = "Axial", parent: QWidget | None = None) -> None:
super().__init__(parent)
self._label = label
self._scan_y = 0
self._active_scan = False
self._rot_matrix = [row[:] for row in self._IDENTITY]
self._slice_offset = [0.0, 0.0, 0.0] # 3-D offset in logical [-1, 1] space
self._drag_pos = None # QPointF while dragging
self._drag_mode = None # 'move' | 'rotate'
self._noise = _generate_noise_image()
self.setMinimumSize(120, 120)
self.setMouseTracking(True) # needed for cursor updates without button
# -- public API ---------------------------------------------------------
def set_scanning(self, active: bool) -> None:
self._active_scan = active
self.update()
def set_rotation_matrix(self, R: list) -> None:
self._rot_matrix = R
self.update()
def set_slice_offset(self, offset: list) -> None:
self._slice_offset = list(offset)
self.update()
def advance_scanline(self, phase: int) -> None:
if self.height() == 0:
return
self._scan_y = int(phase / _SCAN_SWEEP_DURATION * self.height())
self.update()
# -- mouse interaction --------------------------------------------------
# LMB drag -> move slice (translate)
# Ctrl + LMB drag -> rotate slice (gamedev orbit: dx=Ry yaw, dy=Rx pitch)
def _pixel_scale(self) -> float:
return min(self.width(), self.height()) * 0.46
def _hover_cursor(self, modifiers) -> Qt.CursorShape:
return Qt.OpenHandCursor if (modifiers & Qt.ControlModifier) else Qt.SizeAllCursor
def mousePressEvent(self, event) -> None: # noqa: N802
if event.button() == Qt.LeftButton:
self._drag_pos = event.position()
self._drag_mode = "rotate" if (event.modifiers() & Qt.ControlModifier) else "move"
self.setCursor(Qt.ClosedHandCursor if self._drag_mode == "rotate" else Qt.SizeAllCursor)
event.accept()
else:
super().mousePressEvent(event)
def mouseMoveEvent(self, event) -> None: # noqa: N802
if not (event.buttons() & Qt.LeftButton):
self.setCursor(self._hover_cursor(event.modifiers()))
if self._drag_pos is not None and (event.buttons() & Qt.LeftButton):
pos = event.position()
dx = pos.x() - self._drag_pos.x()
dy = pos.y() - self._drag_pos.y()
if self._drag_mode == "move":
scale = self._pixel_scale()
if scale > 0:
ax0, ax1 = _VIEWER_AXES[self._label]
self.slice_offset_changed.emit(ax0, ax1, dx / scale, -dy / scale)
else: # rotate - gamedev FPS orbit
# horizontal drag -> yaw (Ry)
# vertical drag -> pitch (Rx); screen-down = positive pitch
self.rotation_delta.emit(
dy * _ROT_SENSITIVITY, # drx
dx * _ROT_SENSITIVITY, # dry
)
self._drag_pos = pos
event.accept()
else:
super().mouseMoveEvent(event)
def mouseReleaseEvent(self, event) -> None: # noqa: N802
if event.button() == Qt.LeftButton and self._drag_pos is not None:
self._drag_pos = None
self._drag_mode = None
self.setCursor(self._hover_cursor(event.modifiers()))
event.accept()
else:
super().mouseReleaseEvent(event)
# -- painting -----------------------------------------------------------
def paintEvent(self, event) -> None: # noqa: N802
p = QPainter(self)
rc = self.rect()
w, h = rc.width(), rc.height()
# 1 - background
p.fillRect(rc, QColor(_IMAGE_BG))
# 2 - noise texture
p.setOpacity(0.9)
p.drawImage(rc, self._noise)
p.setOpacity(1.0)
# 3 - crosshair
cx, cy = w // 2, h // 2
p.setPen(QPen(QColor("#444444"), 1))
p.drawLine(cx - int(w * 0.30), cy, cx + int(w * 0.30), cy)
p.drawLine(cx, cy - int(h * 0.30), cx, cy + int(h * 0.30))
# 4 - scale ticks (3 per edge at 25/50/75 %)
p.setPen(QPen(QColor("#888888"), 1))
tick = 8
for frac in (0.25, 0.50, 0.75):
tx, ty = int(w * frac), int(h * frac)
p.drawLine(tx, 0, tx, tick)
p.drawLine(tx, h - tick, tx, h)
p.drawLine(0, ty, tick, ty)
p.drawLine(w - tick, ty, w, ty)
# 5 - dim outer border
p.setPen(QPen(QColor(_ACCENT_DIM), 1))
p.drawRect(rc.adjusted(2, 2, -2, -2))
# 6 - projected slice square (with 3-D offset applied)
scale = self._pixel_scale()
ax0, ax1 = _VIEWER_AXES[self._label]
off_u = self._slice_offset[ax0]
off_v = self._slice_offset[ax1]
origin_x = cx + off_u * scale
origin_y = cy - off_v * scale # Y-up -> Y-down
corners_uv = _project_slice_quad(self._rot_matrix, self._label)
pts = QPolygonF([
QPointF(origin_x + u * scale, origin_y - v * scale)
for u, v in corners_uv
])
# semi-transparent fill
p.setPen(Qt.NoPen)
p.setBrush(QColor(240, 192, 64, 45))
p.drawPolygon(pts)
# solid border
pen = QPen(QColor(_ACCENT), 2)
pen.setJoinStyle(Qt.MiterJoin)
p.setPen(pen)
p.setBrush(Qt.NoBrush)
p.drawPolygon(pts)
# 7 - scan sweep stripe
if self._active_scan:
sy = self._scan_y
grad = QLinearGradient(0, sy, w, sy)
grad.setColorAt(0.0, QColor(0, 0, 0, 0))
grad.setColorAt(0.5, QColor(255, 255, 255, 96))
grad.setColorAt(1.0, QColor(0, 0, 0, 0))
p.fillRect(0, max(0, sy - 2), w, 5, grad)
# 8 - orientation label
font = QFont("Arial", 11, QFont.Bold)
p.setFont(font)
p.setPen(Qt.white)
p.drawText(rc.adjusted(8, 4, 0, 0), Qt.AlignTop | Qt.AlignLeft, self._label)
p.end()
# ==============================================================================
class ProtocolListWidget(QWidget):
protocol_selected = Signal(str)
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setStyleSheet(f"background: {_PANEL_BG};")
lay = QVBoxLayout(self)
lay.setContentsMargins(4, 8, 4, 4)
lay.setSpacing(4)
self._header_lbl = QLabel(i18n.tr("protocols_header"))
self._header_lbl.setStyleSheet(
"color: #aaaaaa; font-weight: bold; font-size: 11px; background: transparent;"
)
lay.addWidget(self._header_lbl)
self._list = QListWidget()
self._list.setStyleSheet(f"""
QListWidget {{
background: {_PANEL_BG};
color: #ffffff;
border: 1px solid #3a3a3a;
font-size: 12px;
outline: 0;
}}
QListWidget::item {{
padding: 6px 8px;
border-bottom: 1px solid #333333;
}}
QListWidget::item:selected {{
background: {_ORANGE_SEL};
color: #ffffff;
}}
QListWidget::item:hover:!selected {{
background: #3a3a3a;
}}
""")
for name in _PROTOCOLS:
self._list.addItem(name)
self._list.setCurrentRow(0)
self._list.currentTextChanged.connect(self.protocol_selected)
lay.addWidget(self._list, stretch=1)
def retranslate_ui(self) -> None:
self._header_lbl.setText(i18n.tr("protocols_header"))
# ==============================================================================
class _ScanPipelineWorker(QThread):
"""
Full pipeline: health-check → load scenario (with seq_file / seq_info) →
run_all → poll → extract raw data → emit raw_data_ready.
The orchestrator is responsible for:
- Forwarding the .seq file to seq-interp for interpretation
- Running the full measurement scenario (spectrometer, reconstructor, …)
The GUI never calls seq-interp or any other microservice directly.
"""
progress = Signal(str)
job_started = Signal(str)
finished = Signal(str)
error = Signal(str)
raw_data_ready = Signal(str) # absolute path to temp JSON file
def __init__(
self,
seq_file_path: str | None,
seq_info: dict | None,
orchestrator_url: str,
scenario_id: str = "full_pipeline",
protocol: str = "",
parent=None,
) -> None:
super().__init__(parent)
self._seq_file = seq_file_path
self._seq_info = dict(seq_info) if seq_info else {}
self._scenario_id = scenario_id
self._protocol = protocol
from src.clients.orchestrator_client import OrchestratorClient
self._orch = OrchestratorClient(orchestrator_url)
def run(self) -> None:
try:
self._run_pipeline()
except Exception as exc:
if not self.isInterruptionRequested():
self.error.emit(str(exc))
def _run_pipeline(self) -> None:
self.progress.emit("Проверка оркестратора...")
if not self._orch.healthcheck():
raise RuntimeError("Оркестратор недоступен — проверьте, что сервис запущен")
self.progress.emit(
f"Отправка задания в оркестратор "
f"[сценарий: {self._scenario_id}, ИП: {self._protocol or '—'}]…"
)
job_id = self._orch.scan(
seq_file_path=self._seq_file or None,
seq_info=self._seq_info or None,
scenario_id=self._scenario_id,
protocol=self._protocol,
)
self.job_started.emit(job_id)
self.progress.emit(f" Job {job_id[:8]}… запущен, ожидание результата…")
def _on_status(status: str) -> None:
self.progress.emit(f" [{status}]")
final = self._orch.poll_scan(
job_id,
timeout=300.0,
poll_interval=2.0,
progress_cb=_on_status,
interrupted_fn=self.isInterruptionRequested,
)
steps = final.get("steps", [])
meas_id = None
raw_data = None
for step in steps:
name = step.get("name")
res = step.get("result") or {}
if name == "start_measurement":
meas_id = res.get("measurement_id")
if name == "fetch_data":
raw_data = res.get("data")
self.progress.emit(f" Сканирование завершено (meas_id={meas_id})")
is_stub = str(meas_id) in ("", "None", "meas_stub") or meas_id is None
if raw_data and not is_stub:
raw_path = self._save_raw(raw_data, meas_id)
if raw_path:
self.raw_data_ready.emit(raw_path)
self.finished.emit(f"job завершён (meas_id={meas_id})")
def _save_raw(self, data, meas_id) -> str | None:
import tempfile, time as _t
fname = f"scan_raw_{meas_id}_{int(_t.time())}.json"
fpath = os.path.join(tempfile.gettempdir(), fname)
try:
with open(fpath, "w", encoding="utf-8") as fh:
json.dump(data, fh)
self.progress.emit(f" Данные сохранены: {fname}")
return fpath
except Exception as exc:
self.progress.emit(f" Не удалось сохранить данные: {exc}")
return None
# ==============================================================================
class _ScanWorker(QThread):
"""Fire-and-forget: load scenario and run_all via orchestrator REST."""
finished = Signal(str) # job_id
error = Signal(str)
def __init__(self, url: str, info: dict, parent=None) -> None:
super().__init__(parent)
self._url = url.rstrip("/")
self._info = info
def run(self) -> None:
try:
if not _HAS_HTTPX:
import urllib.request, urllib.error
self._run_urllib()
else:
self._run_httpx()
except Exception as exc:
self.error.emit(str(exc))
def _run_httpx(self) -> None:
payload = {"param_overrides": {"start_measurement": {"info": self._info}}}
with _httpx.Client(timeout=15) as client:
r = client.post(
f"{self._url}/scenario/load/full_pipeline",
json=payload,
)
r.raise_for_status()
job_id = r.json().get("job_id", "?")
r2 = client.post(f"{self._url}/scenario/{job_id}/run_all")
r2.raise_for_status()
self.finished.emit(job_id)
def _run_urllib(self) -> None:
import urllib.request
payload = {"param_overrides": {"start_measurement": {"info": self._info}}}
data = json.dumps(payload).encode()
headers = {"Content-Type": "application/json"}
req = urllib.request.Request(
f"{self._url}/scenario/load/full_pipeline",
data=data, headers=headers, method="POST",
)
with urllib.request.urlopen(req, timeout=15) as resp:
body = json.loads(resp.read())
job_id = body.get("job_id", "?")
req2 = urllib.request.Request(
f"{self._url}/scenario/{job_id}/run_all",
data=b"{}", headers=headers, method="POST",
)
with urllib.request.urlopen(req2, timeout=15):
pass
self.finished.emit(job_id)
# ==============================================================================
class ScanningTab(QWidget):
"""
Operator-facing scanning tab.
Bottom QTabWidget tabs match Siemens syngo layout:
Основные | Контраст | Разрешение | Геометрия | Система
"Геометрия" holds orientation presets + Rx/Ry/Rz spinboxes + live 3x3 matrix.
"""
scan_job_started = Signal(str) # job_id — emitted once the orchestrator accepts the job
raw_data_ready = Signal(str) # absolute path to temp JSON file
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setStyleSheet(f"background: {_BG_DARK};")
self._viewers: list[MriViewerWidget] = []
self._scan_tick: int = 0
self._seq_info: dict | None = None
self._seq_file_path: str | None = None
self._orchestrator_url: str = "http://localhost:1717"
self._scan_worker: _ScanPipelineWorker | None = None
self._active_protocol: str = _PROTOCOLS[0]
self._scenario_id: str = "full_pipeline"
self._slice_offset: list[float] = [0.0, 0.0, 0.0]
root = QVBoxLayout(self)
root.setContentsMargins(0, 0, 0, 0)
root.setSpacing(0)
vsplit = QSplitter(Qt.Vertical)
vsplit.setStyleSheet("QSplitter::handle { background: #333344; height: 3px; }")
vsplit.addWidget(self._build_upper_area())
vsplit.addWidget(self._build_bottom_panel())
vsplit.setSizes([700, 140])
vsplit.setChildrenCollapsible(False)
root.addWidget(vsplit, stretch=1)
self._setup_animation()
self._update_matrix_display()
self._update_slice_display()
self._update_scan_ready_state()
# -- public API ---------------------------------------------------------
def apply_seq_info(self, info_dict: dict) -> None:
"""Receive exported sequence info from SeqInterpTab and auto-start scan."""
self._seq_info = dict(info_dict)
self._seq_file_path = None
self._update_scan_ready_state()
label = info_dict.get("infostr") or "sequence"
self._log(f"Получены параметры последовательности: {label}", "INFO")
if not self._btn_scan.isChecked():
self._btn_scan.setChecked(True)
def set_orchestrator_url(self, url: str) -> None:
self._orchestrator_url = url
def retranslate_ui(self) -> None:
self._protocol_list.retranslate_ui()
# param tabs: 0-3 = placeholders, 4 = geometry
_keys = ["tab_main", "tab_contrast", "tab_resolution", "tab_system", "tab_geometry"]
for idx, key in enumerate(_keys):
self._param_tabs.setTabText(idx, i18n.tr(key))
self._preset_group.setTitle(i18n.tr("grp_orientation"))
self._rot_group.setTitle(i18n.tr("grp_rotation"))
self._matrix_group.setTitle(i18n.tr("grp_rot_matrix"))
# scan button: keep text in sync with current scan state
if self._scan_timer.isActive():
self._btn_scan.setText(i18n.tr("btn_stop_scan"))
else:
self._btn_scan.setText(i18n.tr("btn_scan"))
self._update_scan_ready_state()
# -- layout builders ----------------------------------------------------
def _build_upper_area(self) -> QWidget:
container = QWidget()
container.setStyleSheet(f"background: {_BG_DARK};")
lay = QVBoxLayout(container)
lay.setContentsMargins(0, 0, 0, 0)
lay.setSpacing(0)
hsplit = QSplitter(Qt.Horizontal)
hsplit.setStyleSheet("QSplitter::handle { background: #333344; width: 3px; }")
hsplit.addWidget(self._build_protocol_panel())
hsplit.addWidget(self._build_image_grid())
hsplit.setSizes([220, 980])
hsplit.setChildrenCollapsible(False)
lay.addWidget(hsplit)
return container
def _build_protocol_panel(self) -> QWidget:
container = QWidget()
container.setStyleSheet(f"background: {_PANEL_BG};")
lay = QVBoxLayout(container)
lay.setContentsMargins(0, 0, 0, 0)
lay.setSpacing(0)
self._protocol_list = ProtocolListWidget()
self._protocol_list.protocol_selected.connect(self._on_protocol_selected)
lay.addWidget(self._protocol_list, stretch=1)
sep = QFrame()
sep.setFrameShape(QFrame.HLine)
sep.setFixedHeight(1)
sep.setStyleSheet("background: #3a3a4a; border: none;")
lay.addWidget(sep)
# Scenario selector
sc_container = QWidget()
sc_container.setStyleSheet(f"background: {_PANEL_BG};")
sc_lay = QVBoxLayout(sc_container)
sc_lay.setContentsMargins(6, 6, 6, 4)
sc_lay.setSpacing(4)
sc_header = QLabel("Сценарий оркестратора")
sc_header.setStyleSheet(
"color: #7777aa; font-size: 10px; font-weight: bold; background: transparent;"
)
sc_lay.addWidget(sc_header)
sc_row = QHBoxLayout()
sc_row.setSpacing(4)
_combo_style = (
"QComboBox {"
f" background: {_BTN_BG}; color: #ccccee;"
" border: 1px solid #444466; border-radius: 3px;"
" font-size: 11px; padding: 3px 6px;"
"}"
"QComboBox::drop-down { border: none; width: 16px; }"
"QComboBox QAbstractItemView {"
f" background: {_BTN_BG}; color: #ccccee; border: 1px solid #444466;"
" selection-background-color: #e65100;"
"}"
)
self._scenario_combo = QComboBox()
self._scenario_combo.setStyleSheet(_combo_style)
self._scenario_combo.setToolTip("Тип сценария, который будет запущен в оркестраторе")
self._scenario_combo.addItem("full_pipeline")
self._scenario_combo.currentTextChanged.connect(self._on_scenario_selected)
sc_row.addWidget(self._scenario_combo, stretch=1)
btn_refresh_sc = QPushButton("↻")
btn_refresh_sc.setFixedSize(24, 24)
btn_refresh_sc.setToolTip("Получить список сценариев из оркестратора")
btn_refresh_sc.setStyleSheet(
f"QPushButton {{ background: {_BTN_BG}; color: #7777aa;"
" border: 1px solid #444466; border-radius: 3px; font-size: 13px; }"
"QPushButton:hover { color: #ffffff; background: #303050; }"
)
btn_refresh_sc.clicked.connect(self._on_refresh_scenarios)
sc_row.addWidget(btn_refresh_sc)
sc_lay.addLayout(sc_row)
lay.addWidget(sc_container)
sep2 = QFrame()
sep2.setFrameShape(QFrame.HLine)
sep2.setFixedHeight(1)
sep2.setStyleSheet("background: #3a3a4a; border: none;")
lay.addWidget(sep2)
# .seq file loader
btn_load = QPushButton("Загрузить .seq…")
btn_load.setToolTip("Выбрать готовый .seq файл для запуска полного пайплайна")
btn_load.setStyleSheet(
"QPushButton {"
f" background: {_BTN_BG}; color: #aaaacc;"
" border: 1px solid #444466; border-radius: 3px;"
" font-size: 11px; padding: 5px 8px; margin: 6px 6px 2px 6px;"
"}"
"QPushButton:hover { background: #303050; color: #ffffff; }"
)
btn_load.clicked.connect(self._on_load_seq_clicked)
lay.addWidget(btn_load)
self._lbl_seq_file = QLabel("Файл не выбран")
self._lbl_seq_file.setWordWrap(True)
self._lbl_seq_file.setStyleSheet(
"color: #555577; font-size: 10px; background: transparent;"
"padding: 0 8px 6px 8px;"
)
lay.addWidget(self._lbl_seq_file)
return container
def _build_image_grid(self) -> QWidget:
container = QWidget()
container.setStyleSheet(f"background: {_BG_DARK};")
grid = QGridLayout(container)
grid.setContentsMargins(6, 6, 6, 6)
grid.setSpacing(6)
for col, lbl in enumerate(("Axial", "Cor", "Sag")):
viewer = MriViewerWidget(label=lbl)
viewer.slice_offset_changed.connect(self._on_slice_offset_changed)
viewer.rotation_delta.connect(self._on_rotation_delta)
grid.addWidget(viewer, 0, col)
grid.setColumnStretch(col, 1)
self._viewers.append(viewer)
grid.setRowStretch(0, 1)
return container
def _build_bottom_panel(self) -> QWidget:
panel = QWidget()
panel.setStyleSheet("background: #16162a; border-top: 1px solid #333355;")
outer = QVBoxLayout(panel)
outer.setContentsMargins(0, 0, 0, 0)
outer.setSpacing(0)
# -- parameter tabs -------------------------------------------------
self._param_tabs = QTabWidget()
self._param_tabs.setStyleSheet("""
QTabWidget::pane { border: none; background: #16162a; }
QTabBar::tab {
background: #1e1e38; color: #aaaacc;
padding: 5px 14px; border: 1px solid #333355;
border-bottom: none; margin-right: 2px; font-size: 11px;
}
QTabBar::tab:selected { background: #252545; color: #ffffff; }
QTabBar::tab:hover:!selected { background: #222240; }
""")
_placeholder_keys = ["tab_main", "tab_contrast", "tab_resolution", "tab_system"]
for key in _placeholder_keys:
w = QLabel(f"[ {i18n.tr(key)} — TODO ]")
w.setAlignment(Qt.AlignCenter)
w.setStyleSheet("color: #555577; background: #16162a;")
self._param_tabs.addTab(w, i18n.tr(key))
geo_tab = self._build_geometry_tab()
self._param_tabs.addTab(geo_tab, i18n.tr("tab_geometry"))
log_tab = self._build_log_tab()
self._param_tabs.addTab(log_tab, "Лог")
self._param_tabs.setCurrentWidget(geo_tab)
outer.addWidget(self._param_tabs, stretch=1)
# -- bottom action bar ----------------------------------------------
action_bar = QWidget()
action_bar.setStyleSheet("background: #16162a; border-top: 1px solid #2a2a4a;")
action_lay = QHBoxLayout(action_bar)
action_lay.setContentsMargins(12, 6, 12, 6)
action_lay.setSpacing(12)
self._status_label = QLabel(i18n.tr("no_data"))
self._status_label.setStyleSheet("color: #666688; font-size: 11px;")
action_lay.addWidget(self._status_label)
action_lay.addStretch()
self._btn_scan = QPushButton(i18n.tr("btn_scan"))
self._btn_scan.setCheckable(True)
self._btn_scan.setMinimumWidth(140)
self._btn_scan.setStyleSheet(
"QPushButton {"
" background: #1a3a1a; color: #88ee88;"
" border: 1px solid #336633; border-radius: 4px;"
" font-size: 12px; font-weight: bold; padding: 5px 16px;"
"}"
"QPushButton:checked {"
" background: #3a1a1a; color: #ee8888; border-color: #663333;"
"}"
"QPushButton:hover:!checked { background: #1e4a1e; }"
)
self._btn_scan.toggled.connect(self._on_scan_toggled)
action_lay.addWidget(self._btn_scan)
outer.addWidget(action_bar)
return panel
def _build_geometry_tab(self) -> QWidget:
w = QWidget()
w.setStyleSheet("background: #16162a;")
lay = QHBoxLayout(w)
lay.setContentsMargins(12, 8, 12, 8)
lay.setSpacing(16)
# -- orientation presets --------------------------------------------
self._preset_group = QGroupBox(i18n.tr("grp_orientation"))
preset_group = self._preset_group
preset_group.setStyleSheet(self._group_style())
preset_lay = QVBoxLayout(preset_group)
preset_lay.setSpacing(4)
self._btn_group = QButtonGroup(self)
self._btn_group.setExclusive(True)
self._preset_buttons: dict[str, QPushButton] = {}
for name in ("Axial", "Coronal", "Sagittal"):
btn = QPushButton(name)
btn.setCheckable(True)
btn.setStyleSheet(self._preset_btn_style())
preset_lay.addWidget(btn)
self._btn_group.addButton(btn)
self._preset_buttons[name] = btn
btn.clicked.connect(lambda checked, n=name: self._on_preset(n))
self._preset_buttons["Axial"].setChecked(True)
lay.addWidget(preset_group)
# -- rotation angles ------------------------------------------------
self._rot_group = QGroupBox(i18n.tr("grp_rotation"))
rot_group = self._rot_group
rot_group.setStyleSheet(self._group_style())
form = QFormLayout(rot_group)
form.setSpacing(6)
form.setContentsMargins(8, 4, 8, 4)
spin_style = (
"QDoubleSpinBox {"
" background: #252540; color: #ddddff;"
" border: 1px solid #445; border-radius: 3px; padding: 2px 4px;"
"}"
"QDoubleSpinBox:focus { border-color: #f0c040; }"
)
lbl_style = "color: #aaaacc; font-size: 11px;"
self._spin_rx = QDoubleSpinBox()
self._spin_ry = QDoubleSpinBox()
self._spin_rz = QDoubleSpinBox()
for spin in (self._spin_rx, self._spin_ry, self._spin_rz):
spin.setRange(-180.0, 180.0)
spin.setSingleStep(1.0)
spin.setDecimals(1)
spin.setSuffix(" deg")
spin.setStyleSheet(spin_style)
spin.setMinimumWidth(90)
spin.valueChanged.connect(self._on_rotation_changed)
for lbl_text, spin in (("Rx", self._spin_rx), ("Ry", self._spin_ry), ("Rz", self._spin_rz)):
lbl = QLabel(lbl_text)
lbl.setStyleSheet(lbl_style)
form.addRow(lbl, spin)
lay.addWidget(rot_group)
# -- rotation matrix display ----------------------------------------
self._matrix_group = QGroupBox(i18n.tr("grp_rot_matrix"))
matrix_group = self._matrix_group
matrix_group.setStyleSheet(self._group_style())
matrix_lay = QVBoxLayout(matrix_group)
matrix_lay.setContentsMargins(8, 4, 8, 4)
self._matrix_label = QLabel()
self._matrix_label.setFont(QFont("Courier New", 10))
self._matrix_label.setStyleSheet("color: #99ccff; background: transparent;")
self._matrix_label.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
matrix_lay.addWidget(self._matrix_label)
lay.addWidget(matrix_group)
lay.addStretch()
return w
def _build_log_tab(self) -> QWidget:
w = QWidget()
w.setStyleSheet("background: #16162a;")
lay = QVBoxLayout(w)
lay.setContentsMargins(6, 6, 6, 4)
lay.setSpacing(4)
self._log_view = QTextEdit()
self._log_view.setReadOnly(True)
self._log_view.setFont(QFont("Courier New", 9))
self._log_view.setStyleSheet(
"QTextEdit {"
" background: #0e0e1c; color: #aaaacc;"
" border: 1px solid #2a2a4a; border-radius: 3px;"
"}"
)
lay.addWidget(self._log_view, stretch=1)
btn_clear = QPushButton("Очистить")
btn_clear.setFixedWidth(90)
btn_clear.setStyleSheet(
"QPushButton {"
f" background: {_BTN_BG}; color: #777799;"
" border: 1px solid #333355; border-radius: 3px;"
" font-size: 10px; padding: 3px 8px;"
"}"
"QPushButton:hover { color: #aaaacc; }"
)
btn_clear.clicked.connect(self._log_view.clear)
lay.addWidget(btn_clear, alignment=Qt.AlignRight)
return w
_LOG_COLORS = {"INFO": "#aaaacc", "WARN": "#e6a817", "ERR": "#ee4444"}
def _log(self, msg: str, level: str = "INFO") -> None:
if not hasattr(self, "_log_view"):
return
from datetime import datetime
ts = datetime.now().strftime("%H:%M:%S")
color = self._LOG_COLORS.get(level, "#aaaacc")
line = (
f"{ts}"
f" [{level}]"
f" {msg}"
)
self._log_view.append(line)
sb = self._log_view.verticalScrollBar()
sb.setValue(sb.maximum())
@staticmethod
def _group_style() -> str:
return (
"QGroupBox {"
" color: #8888aa; font-size: 10px; font-weight: bold;"
" border: 1px solid #333355; border-radius: 4px; margin-top: 6px;"
"}"
"QGroupBox::title { subcontrol-origin: margin; left: 6px; top: -1px; }"
)
@staticmethod
def _preset_btn_style() -> str:
return (
"QPushButton {"
f" background: {_BTN_BG}; color: #ccccee;"
" border: 1px solid #444466; border-radius: 3px;"
" font-size: 11px; padding: 4px 10px;"
"}"
"QPushButton:checked { background: #e65100; color: #ffffff; border-color: #ff7733; }"
"QPushButton:hover:!checked { background: #303050; }"
)
# -- animation ----------------------------------------------------------
def _setup_animation(self) -> None:
self._scan_timer = QTimer(self)
self._scan_timer.setInterval(_SCAN_LINE_INTERVAL_MS)
self._scan_timer.timeout.connect(self._on_tick)
def _on_tick(self) -> None:
self._scan_tick += 1
offset = _SCAN_SWEEP_DURATION // max(len(self._viewers), 1)
for i, viewer in enumerate(self._viewers):
phase = (self._scan_tick + i * offset) % _SCAN_SWEEP_DURATION
viewer.advance_scanline(phase)
# -- rotation logic -----------------------------------------------------
def _on_preset(self, name: str) -> None:
rx, ry, rz = _PRESETS[name]
for spin in (self._spin_rx, self._spin_ry, self._spin_rz):
spin.blockSignals(True)
self._spin_rx.setValue(rx)
self._spin_ry.setValue(ry)
self._spin_rz.setValue(rz)
for spin in (self._spin_rx, self._spin_ry, self._spin_rz):
spin.blockSignals(False)
self._on_rotation_changed()
def _on_rotation_changed(self) -> None:
self._update_matrix_display()
self._update_slice_display()
self._sync_preset_buttons()
def _compute_rotation_matrix(self) -> list:
return _euler_to_matrix(
self._spin_rx.value(),
self._spin_ry.value(),
self._spin_rz.value(),
)
def _update_matrix_display(self) -> None:
R = self._compute_rotation_matrix()
lines = []
for row in R:
lines.append(" ".join(f"{v:+.3f}" for v in row))
self._matrix_label.setText("\n".join(lines))
def _update_slice_display(self) -> None:
"""Push current rotation matrix and offset to all viewers."""
R = self._compute_rotation_matrix()
for v in self._viewers:
v.set_rotation_matrix(R)
v.set_slice_offset(self._slice_offset)
def _on_slice_offset_changed(self, ax0: int, ax1: int, d0: float, d1: float) -> None:
"""Accumulate drag delta from any viewer into the shared 3-D offset."""
self._slice_offset[ax0] = max(-0.95, min(0.95, self._slice_offset[ax0] + d0))
self._slice_offset[ax1] = max(-0.95, min(0.95, self._slice_offset[ax1] + d1))
R = self._compute_rotation_matrix()
for v in self._viewers:
v.set_slice_offset(self._slice_offset)
v.set_rotation_matrix(R)
def _on_rotation_delta(self, drx: float, dry: float) -> None:
"""Gamedev-style Ctrl+drag: apply incremental Rx/Ry rotation from any viewer."""
def _wrap(val: float) -> float:
while val > 180.0: val -= 360.0
while val < -180.0: val += 360.0
return val
for spin, delta in ((self._spin_rx, drx), (self._spin_ry, dry)):
spin.blockSignals(True)
spin.setValue(_wrap(spin.value() + delta))
spin.blockSignals(False)
self._on_rotation_changed()
def _sync_preset_buttons(self) -> None:
"""Check if current angles match a known preset; highlight button if so."""
rx = round(self._spin_rx.value(), 1)
ry = round(self._spin_ry.value(), 1)
rz = round(self._spin_rz.value(), 1)
matched = None
for name, (prx, pry, prz) in _PRESETS.items():
if (rx, ry, rz) == (prx, pry, prz):
matched = name
break
for name, btn in self._preset_buttons.items():
btn.blockSignals(True)
btn.setChecked(name == matched)
btn.blockSignals(False)
# -- .seq file loading --------------------------------------------------
def _on_load_seq_clicked(self) -> None:
path, _ = QFileDialog.getOpenFileName(
self,
"Выбрать .seq файл",
os.path.join(os.path.dirname(__file__), os.pardir, os.pardir),
"Pulseq файлы (*.seq);;Все файлы (*)",
)
if not path:
return
self._seq_file_path = path
self._seq_info = None
fname = os.path.basename(path)
self._lbl_seq_file.setText(fname)
self._lbl_seq_file.setStyleSheet(
"color: #e65100; font-size: 10px; background: transparent;"
"padding: 0 8px 6px 8px;"
)
self._log(f"Загружен файл: {fname}", "INFO")
self._update_scan_ready_state()
# -- scan initiation ----------------------------------------------------
def _on_scan_toggled(self, checked: bool) -> None:
if checked:
if self._scan_worker is not None and self._scan_worker.isRunning():
return
if self._seq_info is None and self._seq_file_path is None:
QMessageBox.warning(
self, i18n.tr("dlg_no_data_title"),
i18n.tr("dlg_no_seq_msg"),
)
self._btn_scan.setChecked(False)
return
info = dict(self._seq_info) if self._seq_info else {}
if info:
info["rotation_matrix"] = self._compute_rotation_matrix()
info["slice_position"] = list(self._slice_offset)
self._scan_worker = _ScanPipelineWorker(
seq_file_path=self._seq_file_path,
seq_info=info if info else None,
orchestrator_url=self._orchestrator_url,
scenario_id=self._scenario_id,
protocol=self._active_protocol,
parent=self,
)
self._scan_worker.job_started.connect(self.scan_job_started)
self._scan_worker.finished.connect(self._on_scan_done)
self._scan_worker.error.connect(self._on_scan_error)
self._scan_worker.progress.connect(self._on_scan_progress)
self._scan_worker.raw_data_ready.connect(self._on_raw_data_ready_log)
self._scan_worker.raw_data_ready.connect(self.raw_data_ready)
self._scan_worker.start()
self._scan_timer.start()
self._btn_scan.setText(i18n.tr("btn_stop_scan"))
self._status_label.setText("Инициализация пайплайна…")
self._status_label.setStyleSheet("color: #88ee88; font-size: 11px;")
self._log("--- Запуск пайплайна сканирования ---", "INFO")
for v in self._viewers:
v.set_scanning(True)
# Switch to Log tab so the user can follow progress
log_idx = self._param_tabs.indexOf(self._log_view.parent())
if log_idx >= 0:
self._param_tabs.setCurrentIndex(log_idx)
else:
self._scan_timer.stop()
if self._scan_worker and self._scan_worker.isRunning():
self._scan_worker.requestInterruption()
self._btn_scan.setText(i18n.tr("btn_scan"))
for v in self._viewers:
v.set_scanning(False)
self._update_scan_ready_state()
def _on_raw_data_ready_log(self, path: str) -> None:
self._log(f"Данные отправлены во вкладку Spectroscopy: {os.path.basename(path)}", "INFO")
def _on_scan_progress(self, msg: str) -> None:
self._status_label.setText(msg[:80])
self._status_label.setStyleSheet("color: #88ee88; font-size: 11px;")
self._log(msg, "INFO")
def _on_scan_done(self, msg: str) -> None:
self._scan_timer.stop()
self._status_label.setText("Готово")
self._status_label.setStyleSheet("color: #66ccff; font-size: 11px;")
self._log(msg, "INFO")
for v in self._viewers:
v.set_scanning(False)
self._btn_scan.setChecked(False)
def _on_scan_error(self, err: str) -> None:
self._scan_timer.stop()
self._status_label.setText(f"{i18n.tr('error_prefix')}: {err[:70]}")
self._status_label.setStyleSheet("color: #ee4444; font-size: 11px;")
self._log(err, "ERR")
for v in self._viewers:
v.set_scanning(False)
self._btn_scan.setChecked(False)
def _update_scan_ready_state(self) -> None:
if self._seq_file_path:
fname = os.path.basename(self._seq_file_path)
self._status_label.setText(f"Файл: {fname}")
self._status_label.setStyleSheet("color: #e65100; font-size: 11px;")
elif self._seq_info is not None:
self._status_label.setText(i18n.tr("ready_to_scan"))
self._status_label.setStyleSheet("color: #e65100; font-size: 11px;")
else:
self._status_label.setText(i18n.tr("no_data"))
self._status_label.setStyleSheet("color: #666688; font-size: 11px;")
# -- scenario selection -------------------------------------------------
def _on_scenario_selected(self, name: str) -> None:
if name:
self._scenario_id = name
def _on_refresh_scenarios(self) -> None:
import httpx
try:
r = httpx.get(
f"{self._orchestrator_url}/scenario/list",
timeout=5.0,
)
if r.is_success:
scenarios: list[str] = r.json().get("scenarios", [])
if scenarios:
current = self._scenario_combo.currentText()
self._scenario_combo.blockSignals(True)
self._scenario_combo.clear()
for s in scenarios:
self._scenario_combo.addItem(s)
idx = self._scenario_combo.findText(current)
self._scenario_combo.setCurrentIndex(max(idx, 0))
self._scenario_combo.blockSignals(False)
self._scenario_id = self._scenario_combo.currentText()
self._log(f"Сценарии загружены: {scenarios}", "INFO")
else:
self._log("Оркестратор вернул пустой список сценариев", "WARN")
else:
self._log(f"Ошибка загрузки сценариев: HTTP {r.status_code}", "WARN")
except Exception as exc:
self._log(f"Не удалось подключиться к оркестратору: {exc}", "WARN")
# -- protocol selection -------------------------------------------------
def _on_protocol_selected(self, name: str) -> None:
self._active_protocol = name