| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196 |
- """
- ScanningTab - clinical MRI scanner UI simulation with real scan initiation.
- Panels:
- - Left: scrollable protocol queue (ProtocolListWidget)
- - Centre: 3 MRI image viewers (MriViewerWidget x 3)
- - Bottom: QTabWidget with parameter tabs; "Геометрия" tab holds rotation controls
- Rotation matrix (3x3, ZYX Euler) is merged into the seq_info dict and sent
- to the orchestrator's start_measurement step as "rotation_matrix".
- """
- from __future__ import annotations
- import math
- import json
- import os
- import numpy as np
- from PySide6.QtCore import Qt, QThread, QTimer, Signal
- from PySide6.QtGui import (
- QColor, QFont, QImage, QLinearGradient, QPainter, QPen, QPolygonF,
- )
- from PySide6.QtCore import QPointF
- from PySide6.QtWidgets import (
- QButtonGroup, QComboBox, QDoubleSpinBox, QFileDialog, QFormLayout, QFrame,
- QGridLayout, QGroupBox, QHBoxLayout, QLabel, QListWidget, QMessageBox,
- QPushButton, QSplitter, QTabWidget, QTextEdit, QVBoxLayout, QWidget,
- )
- try:
- import httpx as _httpx
- _HAS_HTTPX = True
- except ImportError:
- _HAS_HTTPX = False
- from src import i18n
- # -- colour palette -------------------------------------------------------------
- _BG_DARK = "#1a1a2e"
- _PANEL_BG = "#2a2a2a"
- _IMAGE_BG = "#000000"
- _ACCENT = "#f0c040"
- _ACCENT_DIM = "#555533"
- _ORANGE_SEL = "#e65100"
- _BTN_BG = "#252535"
- # -- animation -----------------------------------------------------------------
- _SCAN_LINE_INTERVAL_MS = 40 # 25 fps
- _SCAN_SWEEP_DURATION = 120 # ticks per vertical sweep
- # -- mouse interaction ---------------------------------------------------------
- _ROT_SENSITIVITY = 0.4 # degrees per pixel for Ctrl+drag rotation
- # -- default orientations (Rx, Ry, Rz in degrees) ------------------------------
- _PRESETS = {
- "Axial": (0.0, 0.0, 0.0),
- "Coronal": (90.0, 0.0, 0.0),
- "Sagittal": (0.0, 90.0, 0.0),
- }
- _PROTOCOLS = [
- "FID", "SE", "TSE"
- ]
- # Physical axes for each viewer plane: (horizontal_axis, vertical_axis)
- # X=0, Y=1, Z=2 in physical space; Y is flipped to screen coords in paintEvent
- _VIEWER_AXES: dict[str, tuple[int, int]] = {
- "Axial": (0, 1),
- "Cor": (0, 2),
- "Sag": (1, 2),
- }
- # ==============================================================================
- def _euler_to_matrix(rx_deg: float, ry_deg: float, rz_deg: float) -> list:
- """ZYX Euler angles (degrees) -> 3x3 rotation matrix as list-of-lists."""
- rx = math.radians(rx_deg)
- ry = math.radians(ry_deg)
- rz = math.radians(rz_deg)
- cx, sx = math.cos(rx), math.sin(rx)
- cy, sy = math.cos(ry), math.sin(ry)
- cz, sz = math.cos(rz), math.sin(rz)
- Rx = np.array([[1, 0, 0 ],
- [0, cx, -sx],
- [0, sx, cx]])
- Ry = np.array([[ cy, 0, sy],
- [ 0, 1, 0],
- [-sy, 0, cy]])
- Rz = np.array([[cz, -sz, 0],
- [sz, cz, 0],
- [0, 0, 1]])
- R = Rz @ Ry @ Rx
- return [[round(float(v), 6) for v in row] for row in R]
- def _project_slice_quad(R: list, viewer_label: str, size: float = 0.42) -> list:
- """
- Project the scan-plane square onto a viewer's 2D projection plane.
- The scan square lies in the (freq-encode, phase-encode) plane.
- freq direction = R column 0, phase direction = R column 1.
- Viewer planes:
- "Axial" -> XY (physical axes 0, 1)
- "Cor" -> XZ (physical axes 0, 2)
- "Sag" -> YZ (physical axes 1, 2)
- Returns list of 4 (u, v) tuples in [-1, 1] space, Y-up convention.
- """
- ax0, ax1 = _VIEWER_AXES.get(viewer_label, (0, 1))
- # freq and phase unit vectors in physical space
- freq = [R[i][0] for i in range(3)]
- phase = [R[i][1] for i in range(3)]
- corners = []
- for sf, sp in ((size, size), (-size, size), (-size, -size), (size, -size)):
- pt3 = [freq[i] * sf + phase[i] * sp for i in range(3)]
- corners.append((pt3[ax0], pt3[ax1]))
- return corners
- def _generate_noise_image(w: int = 256, h: int = 256) -> QImage:
- rng = np.random.default_rng(42)
- data = rng.integers(0, 70, (h, w), dtype=np.uint8)
- cy, cx = h / 2, w / 2
- Y, X = np.ogrid[:h, :w]
- dist = np.sqrt(((X - cx) / cx) ** 2 + ((Y - cy) / cy) ** 2)
- mask = np.clip(1.0 - dist * 0.85, 0.05, 1.0)
- data = (data * mask).astype(np.uint8)
- alpha = np.full((h, w), 200, dtype=np.uint8)
- rgba = np.stack([data, data, data, alpha], axis=-1)
- img = QImage(rgba.tobytes(), w, h, 4 * w, QImage.Format_RGBA8888)
- return img.copy()
- # ==============================================================================
- class MriViewerWidget(QWidget):
- """
- Dark MRI image viewer - pure QPainter.
- Ctrl + left-drag moves the slice square within the viewer plane.
- The widget emits slice_offset_changed(ax0, ax1, d0, d1) so ScanningTab
- can accumulate a shared 3-D offset vector and push it back to all viewers.
- """
- # slice_offset_changed(physical_axis_u, physical_axis_v, delta_u, delta_v)
- slice_offset_changed = Signal(int, int, float, float)
- # rotation_delta(drx_deg, dry_deg) - Ctrl+drag gamedev-style rotation
- rotation_delta = Signal(float, float)
- _IDENTITY = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
- def __init__(self, label: str = "Axial", parent: QWidget | None = None) -> None:
- super().__init__(parent)
- self._label = label
- self._scan_y = 0
- self._active_scan = False
- self._rot_matrix = [row[:] for row in self._IDENTITY]
- self._slice_offset = [0.0, 0.0, 0.0] # 3-D offset in logical [-1, 1] space
- self._drag_pos = None # QPointF while dragging
- self._drag_mode = None # 'move' | 'rotate'
- self._noise = _generate_noise_image()
- self.setMinimumSize(120, 120)
- self.setMouseTracking(True) # needed for cursor updates without button
- # -- public API ---------------------------------------------------------
- def set_scanning(self, active: bool) -> None:
- self._active_scan = active
- self.update()
- def set_rotation_matrix(self, R: list) -> None:
- self._rot_matrix = R
- self.update()
- def set_slice_offset(self, offset: list) -> None:
- self._slice_offset = list(offset)
- self.update()
- def advance_scanline(self, phase: int) -> None:
- if self.height() == 0:
- return
- self._scan_y = int(phase / _SCAN_SWEEP_DURATION * self.height())
- self.update()
- # -- mouse interaction --------------------------------------------------
- # LMB drag -> move slice (translate)
- # Ctrl + LMB drag -> rotate slice (gamedev orbit: dx=Ry yaw, dy=Rx pitch)
- def _pixel_scale(self) -> float:
- return min(self.width(), self.height()) * 0.46
- def _hover_cursor(self, modifiers) -> Qt.CursorShape:
- return Qt.OpenHandCursor if (modifiers & Qt.ControlModifier) else Qt.SizeAllCursor
- def mousePressEvent(self, event) -> None: # noqa: N802
- if event.button() == Qt.LeftButton:
- self._drag_pos = event.position()
- self._drag_mode = "rotate" if (event.modifiers() & Qt.ControlModifier) else "move"
- self.setCursor(Qt.ClosedHandCursor if self._drag_mode == "rotate" else Qt.SizeAllCursor)
- event.accept()
- else:
- super().mousePressEvent(event)
- def mouseMoveEvent(self, event) -> None: # noqa: N802
- if not (event.buttons() & Qt.LeftButton):
- self.setCursor(self._hover_cursor(event.modifiers()))
- if self._drag_pos is not None and (event.buttons() & Qt.LeftButton):
- pos = event.position()
- dx = pos.x() - self._drag_pos.x()
- dy = pos.y() - self._drag_pos.y()
- if self._drag_mode == "move":
- scale = self._pixel_scale()
- if scale > 0:
- ax0, ax1 = _VIEWER_AXES[self._label]
- self.slice_offset_changed.emit(ax0, ax1, dx / scale, -dy / scale)
- else: # rotate - gamedev FPS orbit
- # horizontal drag -> yaw (Ry)
- # vertical drag -> pitch (Rx); screen-down = positive pitch
- self.rotation_delta.emit(
- dy * _ROT_SENSITIVITY, # drx
- dx * _ROT_SENSITIVITY, # dry
- )
- self._drag_pos = pos
- event.accept()
- else:
- super().mouseMoveEvent(event)
- def mouseReleaseEvent(self, event) -> None: # noqa: N802
- if event.button() == Qt.LeftButton and self._drag_pos is not None:
- self._drag_pos = None
- self._drag_mode = None
- self.setCursor(self._hover_cursor(event.modifiers()))
- event.accept()
- else:
- super().mouseReleaseEvent(event)
- # -- painting -----------------------------------------------------------
- def paintEvent(self, event) -> None: # noqa: N802
- p = QPainter(self)
- rc = self.rect()
- w, h = rc.width(), rc.height()
- # 1 - background
- p.fillRect(rc, QColor(_IMAGE_BG))
- # 2 - noise texture
- p.setOpacity(0.9)
- p.drawImage(rc, self._noise)
- p.setOpacity(1.0)
- # 3 - crosshair
- cx, cy = w // 2, h // 2
- p.setPen(QPen(QColor("#444444"), 1))
- p.drawLine(cx - int(w * 0.30), cy, cx + int(w * 0.30), cy)
- p.drawLine(cx, cy - int(h * 0.30), cx, cy + int(h * 0.30))
- # 4 - scale ticks (3 per edge at 25/50/75 %)
- p.setPen(QPen(QColor("#888888"), 1))
- tick = 8
- for frac in (0.25, 0.50, 0.75):
- tx, ty = int(w * frac), int(h * frac)
- p.drawLine(tx, 0, tx, tick)
- p.drawLine(tx, h - tick, tx, h)
- p.drawLine(0, ty, tick, ty)
- p.drawLine(w - tick, ty, w, ty)
- # 5 - dim outer border
- p.setPen(QPen(QColor(_ACCENT_DIM), 1))
- p.drawRect(rc.adjusted(2, 2, -2, -2))
- # 6 - projected slice square (with 3-D offset applied)
- scale = self._pixel_scale()
- ax0, ax1 = _VIEWER_AXES[self._label]
- off_u = self._slice_offset[ax0]
- off_v = self._slice_offset[ax1]
- origin_x = cx + off_u * scale
- origin_y = cy - off_v * scale # Y-up -> Y-down
- corners_uv = _project_slice_quad(self._rot_matrix, self._label)
- pts = QPolygonF([
- QPointF(origin_x + u * scale, origin_y - v * scale)
- for u, v in corners_uv
- ])
- # semi-transparent fill
- p.setPen(Qt.NoPen)
- p.setBrush(QColor(240, 192, 64, 45))
- p.drawPolygon(pts)
- # solid border
- pen = QPen(QColor(_ACCENT), 2)
- pen.setJoinStyle(Qt.MiterJoin)
- p.setPen(pen)
- p.setBrush(Qt.NoBrush)
- p.drawPolygon(pts)
- # 7 - scan sweep stripe
- if self._active_scan:
- sy = self._scan_y
- grad = QLinearGradient(0, sy, w, sy)
- grad.setColorAt(0.0, QColor(0, 0, 0, 0))
- grad.setColorAt(0.5, QColor(255, 255, 255, 96))
- grad.setColorAt(1.0, QColor(0, 0, 0, 0))
- p.fillRect(0, max(0, sy - 2), w, 5, grad)
- # 8 - orientation label
- font = QFont("Arial", 11, QFont.Bold)
- p.setFont(font)
- p.setPen(Qt.white)
- p.drawText(rc.adjusted(8, 4, 0, 0), Qt.AlignTop | Qt.AlignLeft, self._label)
- p.end()
- # ==============================================================================
- class ProtocolListWidget(QWidget):
- protocol_selected = Signal(str)
- def __init__(self, parent: QWidget | None = None) -> None:
- super().__init__(parent)
- self.setStyleSheet(f"background: {_PANEL_BG};")
- lay = QVBoxLayout(self)
- lay.setContentsMargins(4, 8, 4, 4)
- lay.setSpacing(4)
- self._header_lbl = QLabel(i18n.tr("protocols_header"))
- self._header_lbl.setStyleSheet(
- "color: #aaaaaa; font-weight: bold; font-size: 11px; background: transparent;"
- )
- lay.addWidget(self._header_lbl)
- self._list = QListWidget()
- self._list.setStyleSheet(f"""
- QListWidget {{
- background: {_PANEL_BG};
- color: #ffffff;
- border: 1px solid #3a3a3a;
- font-size: 12px;
- outline: 0;
- }}
- QListWidget::item {{
- padding: 6px 8px;
- border-bottom: 1px solid #333333;
- }}
- QListWidget::item:selected {{
- background: {_ORANGE_SEL};
- color: #ffffff;
- }}
- QListWidget::item:hover:!selected {{
- background: #3a3a3a;
- }}
- """)
- for name in _PROTOCOLS:
- self._list.addItem(name)
- self._list.setCurrentRow(0)
- self._list.currentTextChanged.connect(self.protocol_selected)
- lay.addWidget(self._list, stretch=1)
- def retranslate_ui(self) -> None:
- self._header_lbl.setText(i18n.tr("protocols_header"))
- # ==============================================================================
- class _ScanPipelineWorker(QThread):
- """
- Full pipeline: health-check → load scenario (with seq_file / seq_info) →
- run_all → poll → extract raw data → emit raw_data_ready.
- The orchestrator is responsible for:
- - Forwarding the .seq file to seq-interp for interpretation
- - Running the full measurement scenario (spectrometer, reconstructor, …)
- The GUI never calls seq-interp or any other microservice directly.
- """
- progress = Signal(str)
- job_started = Signal(str)
- finished = Signal(str)
- error = Signal(str)
- raw_data_ready = Signal(str) # absolute path to temp JSON file
- def __init__(
- self,
- seq_file_path: str | None,
- seq_info: dict | None,
- orchestrator_url: str,
- scenario_id: str = "full_pipeline",
- protocol: str = "",
- parent=None,
- ) -> None:
- super().__init__(parent)
- self._seq_file = seq_file_path
- self._seq_info = dict(seq_info) if seq_info else {}
- self._scenario_id = scenario_id
- self._protocol = protocol
- from src.clients.orchestrator_client import OrchestratorClient
- self._orch = OrchestratorClient(orchestrator_url)
- def run(self) -> None:
- try:
- self._run_pipeline()
- except Exception as exc:
- if not self.isInterruptionRequested():
- self.error.emit(str(exc))
- def _run_pipeline(self) -> None:
- self.progress.emit("Проверка оркестратора...")
- if not self._orch.healthcheck():
- raise RuntimeError("Оркестратор недоступен — проверьте, что сервис запущен")
- self.progress.emit(
- f"Отправка задания в оркестратор "
- f"[сценарий: {self._scenario_id}, ИП: {self._protocol or '—'}]…"
- )
- job_id = self._orch.scan(
- seq_file_path=self._seq_file or None,
- seq_info=self._seq_info or None,
- scenario_id=self._scenario_id,
- protocol=self._protocol,
- )
- self.job_started.emit(job_id)
- self.progress.emit(f" Job {job_id[:8]}… запущен, ожидание результата…")
- def _on_status(status: str) -> None:
- self.progress.emit(f" [{status}]")
- final = self._orch.poll_scan(
- job_id,
- timeout=300.0,
- poll_interval=2.0,
- progress_cb=_on_status,
- interrupted_fn=self.isInterruptionRequested,
- )
- steps = final.get("steps", [])
- meas_id = None
- raw_data = None
- for step in steps:
- name = step.get("name")
- res = step.get("result") or {}
- if name == "start_measurement":
- meas_id = res.get("measurement_id")
- if name == "fetch_data":
- raw_data = res.get("data")
- self.progress.emit(f" Сканирование завершено (meas_id={meas_id})")
- is_stub = str(meas_id) in ("", "None", "meas_stub") or meas_id is None
- if raw_data and not is_stub:
- raw_path = self._save_raw(raw_data, meas_id)
- if raw_path:
- self.raw_data_ready.emit(raw_path)
- self.finished.emit(f"job завершён (meas_id={meas_id})")
- def _save_raw(self, data, meas_id) -> str | None:
- import tempfile, time as _t
- fname = f"scan_raw_{meas_id}_{int(_t.time())}.json"
- fpath = os.path.join(tempfile.gettempdir(), fname)
- try:
- with open(fpath, "w", encoding="utf-8") as fh:
- json.dump(data, fh)
- self.progress.emit(f" Данные сохранены: {fname}")
- return fpath
- except Exception as exc:
- self.progress.emit(f" Не удалось сохранить данные: {exc}")
- return None
- # ==============================================================================
- class _ScanWorker(QThread):
- """Fire-and-forget: load scenario and run_all via orchestrator REST."""
- finished = Signal(str) # job_id
- error = Signal(str)
- def __init__(self, url: str, info: dict, parent=None) -> None:
- super().__init__(parent)
- self._url = url.rstrip("/")
- self._info = info
- def run(self) -> None:
- try:
- if not _HAS_HTTPX:
- import urllib.request, urllib.error
- self._run_urllib()
- else:
- self._run_httpx()
- except Exception as exc:
- self.error.emit(str(exc))
- def _run_httpx(self) -> None:
- payload = {"param_overrides": {"start_measurement": {"info": self._info}}}
- with _httpx.Client(timeout=15) as client:
- r = client.post(
- f"{self._url}/scenario/load/full_pipeline",
- json=payload,
- )
- r.raise_for_status()
- job_id = r.json().get("job_id", "?")
- r2 = client.post(f"{self._url}/scenario/{job_id}/run_all")
- r2.raise_for_status()
- self.finished.emit(job_id)
- def _run_urllib(self) -> None:
- import urllib.request
- payload = {"param_overrides": {"start_measurement": {"info": self._info}}}
- data = json.dumps(payload).encode()
- headers = {"Content-Type": "application/json"}
- req = urllib.request.Request(
- f"{self._url}/scenario/load/full_pipeline",
- data=data, headers=headers, method="POST",
- )
- with urllib.request.urlopen(req, timeout=15) as resp:
- body = json.loads(resp.read())
- job_id = body.get("job_id", "?")
- req2 = urllib.request.Request(
- f"{self._url}/scenario/{job_id}/run_all",
- data=b"{}", headers=headers, method="POST",
- )
- with urllib.request.urlopen(req2, timeout=15):
- pass
- self.finished.emit(job_id)
- # ==============================================================================
- class ScanningTab(QWidget):
- """
- Operator-facing scanning tab.
- Bottom QTabWidget tabs match Siemens syngo layout:
- Основные | Контраст | Разрешение | Геометрия | Система
- "Геометрия" holds orientation presets + Rx/Ry/Rz spinboxes + live 3x3 matrix.
- """
- scan_job_started = Signal(str) # job_id — emitted once the orchestrator accepts the job
- raw_data_ready = Signal(str) # absolute path to temp JSON file
- def __init__(self, parent: QWidget | None = None) -> None:
- super().__init__(parent)
- self.setStyleSheet(f"background: {_BG_DARK};")
- self._viewers: list[MriViewerWidget] = []
- self._scan_tick: int = 0
- self._seq_info: dict | None = None
- self._seq_file_path: str | None = None
- self._orchestrator_url: str = "http://localhost:1717"
- self._scan_worker: _ScanPipelineWorker | None = None
- self._active_protocol: str = _PROTOCOLS[0]
- self._scenario_id: str = "full_pipeline"
- self._slice_offset: list[float] = [0.0, 0.0, 0.0]
- root = QVBoxLayout(self)
- root.setContentsMargins(0, 0, 0, 0)
- root.setSpacing(0)
- vsplit = QSplitter(Qt.Vertical)
- vsplit.setStyleSheet("QSplitter::handle { background: #333344; height: 3px; }")
- vsplit.addWidget(self._build_upper_area())
- vsplit.addWidget(self._build_bottom_panel())
- vsplit.setSizes([700, 140])
- vsplit.setChildrenCollapsible(False)
- root.addWidget(vsplit, stretch=1)
- self._setup_animation()
- self._update_matrix_display()
- self._update_slice_display()
- self._update_scan_ready_state()
- # -- public API ---------------------------------------------------------
- def apply_seq_info(self, info_dict: dict) -> None:
- """Receive exported sequence info from SeqInterpTab and auto-start scan."""
- self._seq_info = dict(info_dict)
- self._seq_file_path = None
- self._update_scan_ready_state()
- label = info_dict.get("infostr") or "sequence"
- self._log(f"Получены параметры последовательности: {label}", "INFO")
- if not self._btn_scan.isChecked():
- self._btn_scan.setChecked(True)
- def set_orchestrator_url(self, url: str) -> None:
- self._orchestrator_url = url
- def retranslate_ui(self) -> None:
- self._protocol_list.retranslate_ui()
- # param tabs: 0-3 = placeholders, 4 = geometry
- _keys = ["tab_main", "tab_contrast", "tab_resolution", "tab_system", "tab_geometry"]
- for idx, key in enumerate(_keys):
- self._param_tabs.setTabText(idx, i18n.tr(key))
- self._preset_group.setTitle(i18n.tr("grp_orientation"))
- self._rot_group.setTitle(i18n.tr("grp_rotation"))
- self._matrix_group.setTitle(i18n.tr("grp_rot_matrix"))
- # scan button: keep text in sync with current scan state
- if self._scan_timer.isActive():
- self._btn_scan.setText(i18n.tr("btn_stop_scan"))
- else:
- self._btn_scan.setText(i18n.tr("btn_scan"))
- self._update_scan_ready_state()
- # -- layout builders ----------------------------------------------------
- def _build_upper_area(self) -> QWidget:
- container = QWidget()
- container.setStyleSheet(f"background: {_BG_DARK};")
- lay = QVBoxLayout(container)
- lay.setContentsMargins(0, 0, 0, 0)
- lay.setSpacing(0)
- hsplit = QSplitter(Qt.Horizontal)
- hsplit.setStyleSheet("QSplitter::handle { background: #333344; width: 3px; }")
- hsplit.addWidget(self._build_protocol_panel())
- hsplit.addWidget(self._build_image_grid())
- hsplit.setSizes([220, 980])
- hsplit.setChildrenCollapsible(False)
- lay.addWidget(hsplit)
- return container
- def _build_protocol_panel(self) -> QWidget:
- container = QWidget()
- container.setStyleSheet(f"background: {_PANEL_BG};")
- lay = QVBoxLayout(container)
- lay.setContentsMargins(0, 0, 0, 0)
- lay.setSpacing(0)
- self._protocol_list = ProtocolListWidget()
- self._protocol_list.protocol_selected.connect(self._on_protocol_selected)
- lay.addWidget(self._protocol_list, stretch=1)
- sep = QFrame()
- sep.setFrameShape(QFrame.HLine)
- sep.setFixedHeight(1)
- sep.setStyleSheet("background: #3a3a4a; border: none;")
- lay.addWidget(sep)
- # Scenario selector
- sc_container = QWidget()
- sc_container.setStyleSheet(f"background: {_PANEL_BG};")
- sc_lay = QVBoxLayout(sc_container)
- sc_lay.setContentsMargins(6, 6, 6, 4)
- sc_lay.setSpacing(4)
- sc_header = QLabel("Сценарий оркестратора")
- sc_header.setStyleSheet(
- "color: #7777aa; font-size: 10px; font-weight: bold; background: transparent;"
- )
- sc_lay.addWidget(sc_header)
- sc_row = QHBoxLayout()
- sc_row.setSpacing(4)
- _combo_style = (
- "QComboBox {"
- f" background: {_BTN_BG}; color: #ccccee;"
- " border: 1px solid #444466; border-radius: 3px;"
- " font-size: 11px; padding: 3px 6px;"
- "}"
- "QComboBox::drop-down { border: none; width: 16px; }"
- "QComboBox QAbstractItemView {"
- f" background: {_BTN_BG}; color: #ccccee; border: 1px solid #444466;"
- " selection-background-color: #e65100;"
- "}"
- )
- self._scenario_combo = QComboBox()
- self._scenario_combo.setStyleSheet(_combo_style)
- self._scenario_combo.setToolTip("Тип сценария, который будет запущен в оркестраторе")
- self._scenario_combo.addItem("full_pipeline")
- self._scenario_combo.currentTextChanged.connect(self._on_scenario_selected)
- sc_row.addWidget(self._scenario_combo, stretch=1)
- btn_refresh_sc = QPushButton("↻")
- btn_refresh_sc.setFixedSize(24, 24)
- btn_refresh_sc.setToolTip("Получить список сценариев из оркестратора")
- btn_refresh_sc.setStyleSheet(
- f"QPushButton {{ background: {_BTN_BG}; color: #7777aa;"
- " border: 1px solid #444466; border-radius: 3px; font-size: 13px; }"
- "QPushButton:hover { color: #ffffff; background: #303050; }"
- )
- btn_refresh_sc.clicked.connect(self._on_refresh_scenarios)
- sc_row.addWidget(btn_refresh_sc)
- sc_lay.addLayout(sc_row)
- lay.addWidget(sc_container)
- sep2 = QFrame()
- sep2.setFrameShape(QFrame.HLine)
- sep2.setFixedHeight(1)
- sep2.setStyleSheet("background: #3a3a4a; border: none;")
- lay.addWidget(sep2)
- # .seq file loader
- btn_load = QPushButton("Загрузить .seq…")
- btn_load.setToolTip("Выбрать готовый .seq файл для запуска полного пайплайна")
- btn_load.setStyleSheet(
- "QPushButton {"
- f" background: {_BTN_BG}; color: #aaaacc;"
- " border: 1px solid #444466; border-radius: 3px;"
- " font-size: 11px; padding: 5px 8px; margin: 6px 6px 2px 6px;"
- "}"
- "QPushButton:hover { background: #303050; color: #ffffff; }"
- )
- btn_load.clicked.connect(self._on_load_seq_clicked)
- lay.addWidget(btn_load)
- self._lbl_seq_file = QLabel("Файл не выбран")
- self._lbl_seq_file.setWordWrap(True)
- self._lbl_seq_file.setStyleSheet(
- "color: #555577; font-size: 10px; background: transparent;"
- "padding: 0 8px 6px 8px;"
- )
- lay.addWidget(self._lbl_seq_file)
- return container
- def _build_image_grid(self) -> QWidget:
- container = QWidget()
- container.setStyleSheet(f"background: {_BG_DARK};")
- grid = QGridLayout(container)
- grid.setContentsMargins(6, 6, 6, 6)
- grid.setSpacing(6)
- for col, lbl in enumerate(("Axial", "Cor", "Sag")):
- viewer = MriViewerWidget(label=lbl)
- viewer.slice_offset_changed.connect(self._on_slice_offset_changed)
- viewer.rotation_delta.connect(self._on_rotation_delta)
- grid.addWidget(viewer, 0, col)
- grid.setColumnStretch(col, 1)
- self._viewers.append(viewer)
- grid.setRowStretch(0, 1)
- return container
- def _build_bottom_panel(self) -> QWidget:
- panel = QWidget()
- panel.setStyleSheet("background: #16162a; border-top: 1px solid #333355;")
- outer = QVBoxLayout(panel)
- outer.setContentsMargins(0, 0, 0, 0)
- outer.setSpacing(0)
- # -- parameter tabs -------------------------------------------------
- self._param_tabs = QTabWidget()
- self._param_tabs.setStyleSheet("""
- QTabWidget::pane { border: none; background: #16162a; }
- QTabBar::tab {
- background: #1e1e38; color: #aaaacc;
- padding: 5px 14px; border: 1px solid #333355;
- border-bottom: none; margin-right: 2px; font-size: 11px;
- }
- QTabBar::tab:selected { background: #252545; color: #ffffff; }
- QTabBar::tab:hover:!selected { background: #222240; }
- """)
- _placeholder_keys = ["tab_main", "tab_contrast", "tab_resolution", "tab_system"]
- for key in _placeholder_keys:
- w = QLabel(f"[ {i18n.tr(key)} — TODO ]")
- w.setAlignment(Qt.AlignCenter)
- w.setStyleSheet("color: #555577; background: #16162a;")
- self._param_tabs.addTab(w, i18n.tr(key))
- geo_tab = self._build_geometry_tab()
- self._param_tabs.addTab(geo_tab, i18n.tr("tab_geometry"))
- log_tab = self._build_log_tab()
- self._param_tabs.addTab(log_tab, "Лог")
- self._param_tabs.setCurrentWidget(geo_tab)
- outer.addWidget(self._param_tabs, stretch=1)
- # -- bottom action bar ----------------------------------------------
- action_bar = QWidget()
- action_bar.setStyleSheet("background: #16162a; border-top: 1px solid #2a2a4a;")
- action_lay = QHBoxLayout(action_bar)
- action_lay.setContentsMargins(12, 6, 12, 6)
- action_lay.setSpacing(12)
- self._status_label = QLabel(i18n.tr("no_data"))
- self._status_label.setStyleSheet("color: #666688; font-size: 11px;")
- action_lay.addWidget(self._status_label)
- action_lay.addStretch()
- self._btn_scan = QPushButton(i18n.tr("btn_scan"))
- self._btn_scan.setCheckable(True)
- self._btn_scan.setMinimumWidth(140)
- self._btn_scan.setStyleSheet(
- "QPushButton {"
- " background: #1a3a1a; color: #88ee88;"
- " border: 1px solid #336633; border-radius: 4px;"
- " font-size: 12px; font-weight: bold; padding: 5px 16px;"
- "}"
- "QPushButton:checked {"
- " background: #3a1a1a; color: #ee8888; border-color: #663333;"
- "}"
- "QPushButton:hover:!checked { background: #1e4a1e; }"
- )
- self._btn_scan.toggled.connect(self._on_scan_toggled)
- action_lay.addWidget(self._btn_scan)
- outer.addWidget(action_bar)
- return panel
- def _build_geometry_tab(self) -> QWidget:
- w = QWidget()
- w.setStyleSheet("background: #16162a;")
- lay = QHBoxLayout(w)
- lay.setContentsMargins(12, 8, 12, 8)
- lay.setSpacing(16)
- # -- orientation presets --------------------------------------------
- self._preset_group = QGroupBox(i18n.tr("grp_orientation"))
- preset_group = self._preset_group
- preset_group.setStyleSheet(self._group_style())
- preset_lay = QVBoxLayout(preset_group)
- preset_lay.setSpacing(4)
- self._btn_group = QButtonGroup(self)
- self._btn_group.setExclusive(True)
- self._preset_buttons: dict[str, QPushButton] = {}
- for name in ("Axial", "Coronal", "Sagittal"):
- btn = QPushButton(name)
- btn.setCheckable(True)
- btn.setStyleSheet(self._preset_btn_style())
- preset_lay.addWidget(btn)
- self._btn_group.addButton(btn)
- self._preset_buttons[name] = btn
- btn.clicked.connect(lambda checked, n=name: self._on_preset(n))
- self._preset_buttons["Axial"].setChecked(True)
- lay.addWidget(preset_group)
- # -- rotation angles ------------------------------------------------
- self._rot_group = QGroupBox(i18n.tr("grp_rotation"))
- rot_group = self._rot_group
- rot_group.setStyleSheet(self._group_style())
- form = QFormLayout(rot_group)
- form.setSpacing(6)
- form.setContentsMargins(8, 4, 8, 4)
- spin_style = (
- "QDoubleSpinBox {"
- " background: #252540; color: #ddddff;"
- " border: 1px solid #445; border-radius: 3px; padding: 2px 4px;"
- "}"
- "QDoubleSpinBox:focus { border-color: #f0c040; }"
- )
- lbl_style = "color: #aaaacc; font-size: 11px;"
- self._spin_rx = QDoubleSpinBox()
- self._spin_ry = QDoubleSpinBox()
- self._spin_rz = QDoubleSpinBox()
- for spin in (self._spin_rx, self._spin_ry, self._spin_rz):
- spin.setRange(-180.0, 180.0)
- spin.setSingleStep(1.0)
- spin.setDecimals(1)
- spin.setSuffix(" deg")
- spin.setStyleSheet(spin_style)
- spin.setMinimumWidth(90)
- spin.valueChanged.connect(self._on_rotation_changed)
- for lbl_text, spin in (("Rx", self._spin_rx), ("Ry", self._spin_ry), ("Rz", self._spin_rz)):
- lbl = QLabel(lbl_text)
- lbl.setStyleSheet(lbl_style)
- form.addRow(lbl, spin)
- lay.addWidget(rot_group)
- # -- rotation matrix display ----------------------------------------
- self._matrix_group = QGroupBox(i18n.tr("grp_rot_matrix"))
- matrix_group = self._matrix_group
- matrix_group.setStyleSheet(self._group_style())
- matrix_lay = QVBoxLayout(matrix_group)
- matrix_lay.setContentsMargins(8, 4, 8, 4)
- self._matrix_label = QLabel()
- self._matrix_label.setFont(QFont("Courier New", 10))
- self._matrix_label.setStyleSheet("color: #99ccff; background: transparent;")
- self._matrix_label.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
- matrix_lay.addWidget(self._matrix_label)
- lay.addWidget(matrix_group)
- lay.addStretch()
- return w
- def _build_log_tab(self) -> QWidget:
- w = QWidget()
- w.setStyleSheet("background: #16162a;")
- lay = QVBoxLayout(w)
- lay.setContentsMargins(6, 6, 6, 4)
- lay.setSpacing(4)
- self._log_view = QTextEdit()
- self._log_view.setReadOnly(True)
- self._log_view.setFont(QFont("Courier New", 9))
- self._log_view.setStyleSheet(
- "QTextEdit {"
- " background: #0e0e1c; color: #aaaacc;"
- " border: 1px solid #2a2a4a; border-radius: 3px;"
- "}"
- )
- lay.addWidget(self._log_view, stretch=1)
- btn_clear = QPushButton("Очистить")
- btn_clear.setFixedWidth(90)
- btn_clear.setStyleSheet(
- "QPushButton {"
- f" background: {_BTN_BG}; color: #777799;"
- " border: 1px solid #333355; border-radius: 3px;"
- " font-size: 10px; padding: 3px 8px;"
- "}"
- "QPushButton:hover { color: #aaaacc; }"
- )
- btn_clear.clicked.connect(self._log_view.clear)
- lay.addWidget(btn_clear, alignment=Qt.AlignRight)
- return w
- _LOG_COLORS = {"INFO": "#aaaacc", "WARN": "#e6a817", "ERR": "#ee4444"}
- def _log(self, msg: str, level: str = "INFO") -> None:
- if not hasattr(self, "_log_view"):
- return
- from datetime import datetime
- ts = datetime.now().strftime("%H:%M:%S")
- color = self._LOG_COLORS.get(level, "#aaaacc")
- line = (
- f"<span style='color:#555577'>{ts}</span>"
- f" <span style='color:{color}'>[{level}]</span>"
- f" {msg}"
- )
- self._log_view.append(line)
- sb = self._log_view.verticalScrollBar()
- sb.setValue(sb.maximum())
- @staticmethod
- def _group_style() -> str:
- return (
- "QGroupBox {"
- " color: #8888aa; font-size: 10px; font-weight: bold;"
- " border: 1px solid #333355; border-radius: 4px; margin-top: 6px;"
- "}"
- "QGroupBox::title { subcontrol-origin: margin; left: 6px; top: -1px; }"
- )
- @staticmethod
- def _preset_btn_style() -> str:
- return (
- "QPushButton {"
- f" background: {_BTN_BG}; color: #ccccee;"
- " border: 1px solid #444466; border-radius: 3px;"
- " font-size: 11px; padding: 4px 10px;"
- "}"
- "QPushButton:checked { background: #e65100; color: #ffffff; border-color: #ff7733; }"
- "QPushButton:hover:!checked { background: #303050; }"
- )
- # -- animation ----------------------------------------------------------
- def _setup_animation(self) -> None:
- self._scan_timer = QTimer(self)
- self._scan_timer.setInterval(_SCAN_LINE_INTERVAL_MS)
- self._scan_timer.timeout.connect(self._on_tick)
- def _on_tick(self) -> None:
- self._scan_tick += 1
- offset = _SCAN_SWEEP_DURATION // max(len(self._viewers), 1)
- for i, viewer in enumerate(self._viewers):
- phase = (self._scan_tick + i * offset) % _SCAN_SWEEP_DURATION
- viewer.advance_scanline(phase)
- # -- rotation logic -----------------------------------------------------
- def _on_preset(self, name: str) -> None:
- rx, ry, rz = _PRESETS[name]
- for spin in (self._spin_rx, self._spin_ry, self._spin_rz):
- spin.blockSignals(True)
- self._spin_rx.setValue(rx)
- self._spin_ry.setValue(ry)
- self._spin_rz.setValue(rz)
- for spin in (self._spin_rx, self._spin_ry, self._spin_rz):
- spin.blockSignals(False)
- self._on_rotation_changed()
- def _on_rotation_changed(self) -> None:
- self._update_matrix_display()
- self._update_slice_display()
- self._sync_preset_buttons()
- def _compute_rotation_matrix(self) -> list:
- return _euler_to_matrix(
- self._spin_rx.value(),
- self._spin_ry.value(),
- self._spin_rz.value(),
- )
- def _update_matrix_display(self) -> None:
- R = self._compute_rotation_matrix()
- lines = []
- for row in R:
- lines.append(" ".join(f"{v:+.3f}" for v in row))
- self._matrix_label.setText("\n".join(lines))
- def _update_slice_display(self) -> None:
- """Push current rotation matrix and offset to all viewers."""
- R = self._compute_rotation_matrix()
- for v in self._viewers:
- v.set_rotation_matrix(R)
- v.set_slice_offset(self._slice_offset)
- def _on_slice_offset_changed(self, ax0: int, ax1: int, d0: float, d1: float) -> None:
- """Accumulate drag delta from any viewer into the shared 3-D offset."""
- self._slice_offset[ax0] = max(-0.95, min(0.95, self._slice_offset[ax0] + d0))
- self._slice_offset[ax1] = max(-0.95, min(0.95, self._slice_offset[ax1] + d1))
- R = self._compute_rotation_matrix()
- for v in self._viewers:
- v.set_slice_offset(self._slice_offset)
- v.set_rotation_matrix(R)
- def _on_rotation_delta(self, drx: float, dry: float) -> None:
- """Gamedev-style Ctrl+drag: apply incremental Rx/Ry rotation from any viewer."""
- def _wrap(val: float) -> float:
- while val > 180.0: val -= 360.0
- while val < -180.0: val += 360.0
- return val
- for spin, delta in ((self._spin_rx, drx), (self._spin_ry, dry)):
- spin.blockSignals(True)
- spin.setValue(_wrap(spin.value() + delta))
- spin.blockSignals(False)
- self._on_rotation_changed()
- def _sync_preset_buttons(self) -> None:
- """Check if current angles match a known preset; highlight button if so."""
- rx = round(self._spin_rx.value(), 1)
- ry = round(self._spin_ry.value(), 1)
- rz = round(self._spin_rz.value(), 1)
- matched = None
- for name, (prx, pry, prz) in _PRESETS.items():
- if (rx, ry, rz) == (prx, pry, prz):
- matched = name
- break
- for name, btn in self._preset_buttons.items():
- btn.blockSignals(True)
- btn.setChecked(name == matched)
- btn.blockSignals(False)
- # -- .seq file loading --------------------------------------------------
- def _on_load_seq_clicked(self) -> None:
- path, _ = QFileDialog.getOpenFileName(
- self,
- "Выбрать .seq файл",
- os.path.join(os.path.dirname(__file__), os.pardir, os.pardir),
- "Pulseq файлы (*.seq);;Все файлы (*)",
- )
- if not path:
- return
- self._seq_file_path = path
- self._seq_info = None
- fname = os.path.basename(path)
- self._lbl_seq_file.setText(fname)
- self._lbl_seq_file.setStyleSheet(
- "color: #e65100; font-size: 10px; background: transparent;"
- "padding: 0 8px 6px 8px;"
- )
- self._log(f"Загружен файл: {fname}", "INFO")
- self._update_scan_ready_state()
- # -- scan initiation ----------------------------------------------------
- def _on_scan_toggled(self, checked: bool) -> None:
- if checked:
- if self._scan_worker is not None and self._scan_worker.isRunning():
- return
- if self._seq_info is None and self._seq_file_path is None:
- QMessageBox.warning(
- self, i18n.tr("dlg_no_data_title"),
- i18n.tr("dlg_no_seq_msg"),
- )
- self._btn_scan.setChecked(False)
- return
- info = dict(self._seq_info) if self._seq_info else {}
- if info:
- info["rotation_matrix"] = self._compute_rotation_matrix()
- info["slice_position"] = list(self._slice_offset)
- self._scan_worker = _ScanPipelineWorker(
- seq_file_path=self._seq_file_path,
- seq_info=info if info else None,
- orchestrator_url=self._orchestrator_url,
- scenario_id=self._scenario_id,
- protocol=self._active_protocol,
- parent=self,
- )
- self._scan_worker.job_started.connect(self.scan_job_started)
- self._scan_worker.finished.connect(self._on_scan_done)
- self._scan_worker.error.connect(self._on_scan_error)
- self._scan_worker.progress.connect(self._on_scan_progress)
- self._scan_worker.raw_data_ready.connect(self._on_raw_data_ready_log)
- self._scan_worker.raw_data_ready.connect(self.raw_data_ready)
- self._scan_worker.start()
- self._scan_timer.start()
- self._btn_scan.setText(i18n.tr("btn_stop_scan"))
- self._status_label.setText("Инициализация пайплайна…")
- self._status_label.setStyleSheet("color: #88ee88; font-size: 11px;")
- self._log("--- Запуск пайплайна сканирования ---", "INFO")
- for v in self._viewers:
- v.set_scanning(True)
- # Switch to Log tab so the user can follow progress
- log_idx = self._param_tabs.indexOf(self._log_view.parent())
- if log_idx >= 0:
- self._param_tabs.setCurrentIndex(log_idx)
- else:
- self._scan_timer.stop()
- if self._scan_worker and self._scan_worker.isRunning():
- self._scan_worker.requestInterruption()
- self._btn_scan.setText(i18n.tr("btn_scan"))
- for v in self._viewers:
- v.set_scanning(False)
- self._update_scan_ready_state()
- def _on_raw_data_ready_log(self, path: str) -> None:
- self._log(f"Данные отправлены во вкладку Spectroscopy: {os.path.basename(path)}", "INFO")
- def _on_scan_progress(self, msg: str) -> None:
- self._status_label.setText(msg[:80])
- self._status_label.setStyleSheet("color: #88ee88; font-size: 11px;")
- self._log(msg, "INFO")
- def _on_scan_done(self, msg: str) -> None:
- self._scan_timer.stop()
- self._status_label.setText("Готово")
- self._status_label.setStyleSheet("color: #66ccff; font-size: 11px;")
- self._log(msg, "INFO")
- for v in self._viewers:
- v.set_scanning(False)
- self._btn_scan.setChecked(False)
- def _on_scan_error(self, err: str) -> None:
- self._scan_timer.stop()
- self._status_label.setText(f"{i18n.tr('error_prefix')}: {err[:70]}")
- self._status_label.setStyleSheet("color: #ee4444; font-size: 11px;")
- self._log(err, "ERR")
- for v in self._viewers:
- v.set_scanning(False)
- self._btn_scan.setChecked(False)
- def _update_scan_ready_state(self) -> None:
- if self._seq_file_path:
- fname = os.path.basename(self._seq_file_path)
- self._status_label.setText(f"Файл: {fname}")
- self._status_label.setStyleSheet("color: #e65100; font-size: 11px;")
- elif self._seq_info is not None:
- self._status_label.setText(i18n.tr("ready_to_scan"))
- self._status_label.setStyleSheet("color: #e65100; font-size: 11px;")
- else:
- self._status_label.setText(i18n.tr("no_data"))
- self._status_label.setStyleSheet("color: #666688; font-size: 11px;")
- # -- scenario selection -------------------------------------------------
- def _on_scenario_selected(self, name: str) -> None:
- if name:
- self._scenario_id = name
- def _on_refresh_scenarios(self) -> None:
- import httpx
- try:
- r = httpx.get(
- f"{self._orchestrator_url}/scenario/list",
- timeout=5.0,
- )
- if r.is_success:
- scenarios: list[str] = r.json().get("scenarios", [])
- if scenarios:
- current = self._scenario_combo.currentText()
- self._scenario_combo.blockSignals(True)
- self._scenario_combo.clear()
- for s in scenarios:
- self._scenario_combo.addItem(s)
- idx = self._scenario_combo.findText(current)
- self._scenario_combo.setCurrentIndex(max(idx, 0))
- self._scenario_combo.blockSignals(False)
- self._scenario_id = self._scenario_combo.currentText()
- self._log(f"Сценарии загружены: {scenarios}", "INFO")
- else:
- self._log("Оркестратор вернул пустой список сценариев", "WARN")
- else:
- self._log(f"Ошибка загрузки сценариев: HTTP {r.status_code}", "WARN")
- except Exception as exc:
- self._log(f"Не удалось подключиться к оркестратору: {exc}", "WARN")
- # -- protocol selection -------------------------------------------------
- def _on_protocol_selected(self, name: str) -> None:
- self._active_protocol = name
|