import time import os import io import zipfile import shutil import json from typing import List, Tuple from PIL import Image import streamlit as st from matplotlib import pyplot as plt import base64 import yaml from easydict import EasyDict import numpy as np import h5py from datetime import datetime from Project_Koma.koma_adapter import ( is_koma_available, list_h5_files, list_seq_files, ) # Интеграция с koma_scan для автозапуска контейнера и вызова сканирования from utils.koma_scan import ( run_container, stop_container, scan_once, scan_and_reconstruct, ) # Прямая реконструкция (with/without k-space sort) from Project_Koma.reconstruction import ( process_hdf5_with_sort, process_hdf5_without_sort, ) from flow_model.main import flow_model from utils.page_tse_nirsii import page_tse_nirsii, make_phantom_zip # ---------- Theme-aware logo (top-right, base64-embedded) ---------- def _b64_img(path: str) -> str | None: try: with open(path, "rb") as f: return base64.b64encode(f.read()).decode("ascii") except Exception: return None def nav_to(p: str): st.session_state.page = p def header_with_theme_logo(title: str, light_path: str = "logos/NEW_PHYSTECH_for_light.png", dark_path: str = "logos/NEW_PHYSTECH_for_dark.png", size_px: int = 100): light_b64 = _b64_img(light_path) dark_b64 = _b64_img(dark_path) if not (light_b64 or dark_b64): # если нет логотипов — просто выводим заголовок st.markdown(f"## {title}") return light_src = f"data:image/png;base64,{light_b64}" if light_b64 else "" dark_src = f"data:image/png;base64,{dark_b64}" if dark_b64 else light_src html = f"""

{title}

""" st.markdown(html, unsafe_allow_html=True) st.set_page_config( page_title="MRI physics based augmentation", page_icon="🧠", layout="wide" ) header_with_theme_logo("MRI physics based augmentation") # ---------- Simple router in session_state ---------- if "page" not in st.session_state: st.session_state.page = "home" # storage for generated phantom (appears after progress completes) if "phantom_blob" not in st.session_state: st.session_state.phantom_blob = None # ---------- Helpers: validation of uploaded files ---------- def _validate_seq_bytes(filename: str, data: bytes, max_size_mb: int = 5) -> tuple[bool, str | None]: try: if len(data) == 0: return False, f"{filename}: empty file" if len(data) > max_size_mb * 1024 * 1024: return False, f"{filename}: file is too large (> {max_size_mb} MB)" # try decode as text _ = data.decode("utf-8", errors="ignore") return True, None except Exception as e: return False, f"{filename}: validation error: {e}" def _validate_kso_json_bytes(filename: str, data: bytes) -> tuple[bool, str | None]: try: obj = json.loads(data.decode("utf-8", errors="ignore")) # Accept two schemas observed in repo if isinstance(obj, dict) and "k_space_order" in obj: kso = obj["k_space_order"] if isinstance(kso, dict) and "k_space_order" in kso: kso = kso["k_space_order"] if isinstance(kso, list) and len(kso) > 0: return True, None return False, f"{filename}: missing or invalid 'k_space_order'" except Exception as e: return False, f"{filename}: invalid JSON ({e})" def _validate_phantom_h5_bytes(filename: str, data: bytes) -> tuple[bool, str | None]: try: with h5py.File(io.BytesIO(data), 'r') as hf: # find any dataset with non-zero size has_dataset = False def _walker(name, obj): nonlocal has_dataset try: if isinstance(obj, h5py.Dataset): if obj.size and obj.size > 0: has_dataset = True except Exception: pass hf.visititems(_walker) if not has_dataset: return False, f"{filename}: HDF5 contains no datasets" return True, None except Exception as e: return False, f"{filename}: invalid HDF5 ({e})" if "phantom_name" not in st.session_state: st.session_state.phantom_name = None PHANTOM_OUTPUT_PATH = "./flow_model/phantoms_h5" # ---------- CUDA / GPU helpers ---------- def _cuda_status(): try: import torch available = torch.cuda.is_available() devices = [] if available: try: count = torch.cuda.device_count() for i in range(count): name = torch.cuda.get_device_name(i) cap = torch.cuda.get_device_capability(i) dev = torch.device(f"cuda:{i}") props = torch.cuda.get_device_properties(i) total_mem = getattr(props, 'total_memory', None) devices.append({ "index": i, "name": name, "capability": f"{cap[0]}.{cap[1]}", "total_mem_gb": round((total_mem or 0) / (1024**3), 2), }) except Exception: pass return { "torch_version": getattr(torch, "__version__", "unknown"), "cuda_available": available, "cuda_version": getattr(torch.version, "cuda", None), "devices": devices, } except Exception as e: return {"error": str(e)} def _gpu_self_test(): import time import torch if not torch.cuda.is_available(): return False, "CUDA недоступна (torch.cuda.is_available() == False)" try: dev = torch.device("cuda") a = torch.randn((1024, 1024), device=dev) b = torch.randn((1024, 1024), device=dev) torch.cuda.synchronize() t0 = time.perf_counter() c = a @ b torch.cuda.synchronize() dt = (time.perf_counter() - t0) * 1000.0 _ = c.mean().item() return True, f"Успех: матр. умножение на GPU заняло ~{dt:.1f} мс" except Exception as e: return False, f"Ошибка вычислений на GPU: {e}" # ---------- Image helpers ---------- # SUPPORTED_EXTS = (".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff") # SUPPORTED_EXTS_PHANTOM = (".dcm", ".nii", ".nii.gz", ".nrrd", ".npy", ".png", ".jpg", ".jpeg") SUPPORTED_EXTS = (".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff") # Must be a tuple; previously was a plain string and broke the file_uploader filter SUPPORTED_EXTS_PHANTOM = (".npy",) MAX_VALUE_DATASET = 100000 SEQUENCE_PRESETS = { "Turbo Spin Echo (TSE)": [ {"Параметр": "TR_ms", "Значение": 4000, "Комментарий": "Repetition time"}, {"Параметр": "TE_ms", "Значение": 80, "Комментарий": "Контраст T2"}, {"Параметр": "TurboFactor","Значение": 16, "Комментарий": "Echo train length"}, {"Параметр": "FOV_mm", "Значение": 220, "Комментарий": ""}, {"Параметр": "Slice_mm", "Значение": 3, "Комментарий": ""}, ], "Gradient Echo (GRE)": [ {"Параметр": "TR_ms", "Значение": 30, "Комментарий": "Короткий TR"}, {"Параметр": "TE_ms", "Значение": 5, "Комментарий": ""}, {"Параметр": "Flip_deg", "Значение": 15, "Комментарий": "Угол наклона"}, {"Параметр": "FOV_mm", "Значение": 220, "Комментарий": ""}, {"Параметр": "Slice_mm", "Значение": 3, "Комментарий": ""}, ], "FLAIR": [ {"Параметр": "TR_ms", "Значение": 9000, "Комментарий": "Длинный TR"}, {"Параметр": "TE_ms", "Значение": 100, "Комментарий": ""}, {"Параметр": "TI_ms", "Значение": 2500, "Комментарий": "Инверсия CSF"}, {"Параметр": "FOV_mm", "Значение": 220, "Комментарий": ""}, {"Параметр": "Slice_mm", "Значение": 4, "Комментарий": ""}, ], } # ---- Заглушка: границы параметров по последовательностям ---- # Позже заменишь на import из своего модуля, например: # from my_bounds_provider import fetch_param_bounds def fetch_param_bounds(): # Формат: # { "SEQ_NAME": { "ParamKey": (min, max), ... }, ... } return { "Turbo Spin Echo (TSE)": { "TR_ms": (500, 12000), "TE_ms": (10, 300), "TurboFactor": (2, 64), "FOV_mm": (100, 300), "Slice_mm": (1, 10), # "Matrix" без числовых границ — оставляем строкой }, "Gradient Echo (GRE)": { "TR_ms": (5, 200), "TE_ms": (2, 40), "Flip_deg": (1, 90), "FOV_mm": (100, 300), "Slice_mm": (1, 10), }, "FLAIR": { "TR_ms": (4000, 15000), "TE_ms": (50, 300), "TI_ms": (800, 3500), "FOV_mm": (100, 300), "Slice_mm": (1, 10), }, } def as_str(x): return "" if x is None else str(x) def stringify_columns(rows, cols=("Значение", "Мин", "Макс")): """Преобразует указанные колонки во всех строках в строки (для data_editor).""" out = [] for r in rows: rr = dict(r) for c in cols: if c in rr: rr[c] = as_str(rr[c]) out.append(rr) return out NUMERIC_KEYS = {"TR_ms", "TE_ms", "TI_ms", "FOV_mm", "Slice_mm", "TurboFactor", "Flip_deg"} def to_kv_dict(rows): """Список строк таблицы -> dict {param: value} для генератора.""" out = {} for r in rows: k = str(r.get("Параметр", "")).strip() if not k: continue out[k] = r.get("Значение", None) return out def try_number(x): """Пытаемся привести введённое значение к float (для числовых ключей).""" try: if x is None or x == "": return None return float(x) except Exception: return x # оставить как есть (строка) def markdown_table(rows, columns): """Рендер простой таблицы (без pandas) через Markdown.""" if not rows: st.write("No data.") return header = "| " + " | ".join(columns) + " |" sep = "| " + " | ".join(["---"] * len(columns)) + " |" lines = [header, sep] for r in rows: line = "| " + " | ".join(str(r.get(col, "")) for col in columns) + " |" lines.append(line) st.markdown("\n".join(lines)) def center_crop_to_square(img: Image.Image) -> Image.Image: """Center-crop PIL image to a square based on the smaller side.""" w, h = img.size s = min(w, h) left = (w - s) // 2 top = (h - s) // 2 return img.crop((left, top, left + s, top + s)) def load_and_prepare_assets(asset_dir: str = "assets", count: int = 3, size: Tuple[int, int] = (320, 320)) -> List[Tuple[Image.Image, str]]: """Load up to `count` images from asset_dir, center-crop to square, resize to `size`.""" results = [] if not os.path.isdir(asset_dir): return results files = sorted([f for f in os.listdir(asset_dir) if os.path.splitext(f.lower())[1] in SUPPORTED_EXTS]) for fname in files[:count]: path = os.path.join(asset_dir, fname) try: img = Image.open(path).convert("RGB") img = center_crop_to_square(img) img = img.resize(size, Image.LANCZOS) results.append((img, fname)) except Exception: continue return results def run_job_stub(status_placeholder, progress_placeholder, steps=None, delay=0.9): """Simulate a long-running job with progress and 3-line status stream. Returns True when finished.""" if steps is None: steps = [ "Инициализация пайплайна...", "Обработка входных данных...", "Генерация синтетических изображений...", "Постобработка результатов...", "Готово!", ] progress = 0 last3 = [] progress_placeholder.progress(progress, text="Waiting to start...") status_placeholder.markdown("") for i, msg in enumerate(steps, 1): last3.append(msg) last3 = last3[-3:] # keep only the last 3 # Newest at the top for the "falls down" feel: lines = [] for idx, line in enumerate(reversed(last3)): if idx == 0: lines.append(f"- **{line}**") elif idx == 1: lines.append(f"- {line}") else: lines.append(f"- {line}") status_placeholder.markdown("
".join(lines), unsafe_allow_html=True) progress = int(i * 100 / len(steps)) progress_placeholder.progress(progress, text=f"Progress: {progress}%") time.sleep(delay) return True def make_demo_zip() -> bytes: """Create a demo ZIP to download as 'phantom' result.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr("phantom/README.txt", "Demo phantom result. Replace with real generated files.") buf.seek(0) return buf.getvalue() # ---------- Pages ---------- def page_home(): st.header("What does this app do") st.markdown( """ This is an interactive app for physics‑based MRI data augmentation: - Phantom generation: using uploaded 2D T1/T2/PD maps, the app builds a volumetric phantom and saves it as an .h5 file (folder `flow_model/phantoms_h5`). - Pulse sequence generation (TSE): based on the parameters table, it builds a grid and creates `.seq` files plus auxiliary `.json` files (folder `sequences/`). The "Number of sequences" field shows how many files will be created. - KOMA simulation: the selected phantom and sequences are sent to the KOMA simulator; raw data (`download/rawdata`) and reconstructed images (`download/images`) are saved and available for download. Quick start: 1) Open "Phantom generation", upload T1/T2/PD maps (.npy, 2D 128×128), then click "Begin generation". 2) Go to "Sequence dataset generation", adjust ranges in the table and generate sequences. 3) Choose a phantom and sequences, start the scan and download the results. """ ) # Load 3 prepared images from assets images = load_and_prepare_assets("assets", count=3, size=(320, 320)) if images: cols = st.columns(len(images)) for (img, name), col in zip(images, cols): with col: st.image(img, use_container_width=False) # st.image(img, caption=name, use_container_width=False) else: st.info("Place 1–3 images into the `assets/` folder (png/jpg/tif), and they will appear here with the same size.") st.markdown("---") c1, c2 = st.columns(2) with c1: with st.container(border=True): st.markdown("#### 🧠 Phantom generation") st.write("Upload T1/T2/PD images and begin the generation.") st.button("Move to the phantom", type="primary", use_container_width=True, on_click=nav_to, args=("phantom",)) with c2: with st.container(border=True): st.markdown("#### 📦 Sequence dataset generation") st.write("Generation of a dataset based on pulse sequence parameters.") st.button("Move to the dataset", type="primary", use_container_width=True, on_click=nav_to, args=("dataset",)) def page_phantom(): st.button("← Homepage", on_click=nav_to, args=("home",)) st.subheader("Generate the phantom") st.caption("Please upload 2D T1/T2/PD TSE images of brain") c1, c2, c3 = st.columns(3) with c1: t1_file = st.file_uploader("T1", type=SUPPORTED_EXTS_PHANTOM) # T1 preview if t1_file is not None: try: _t1 = np.load(io.BytesIO(t1_file.getvalue())) if _t1.ndim == 2: # нормализация в [0,1] для корректного отображения arr = _t1.astype(np.float32) if np.any(np.isfinite(arr)): arr = np.nan_to_num(arr, nan=np.nanmin(arr) if np.isnan(arr).any() else 0.0) minv = float(np.min(arr)) maxv = float(np.max(arr)) if maxv > minv: arr = (arr - minv) / (maxv - minv) else: arr = np.zeros_like(arr, dtype=np.float32) else: arr = np.zeros_like(arr, dtype=np.float32) st.caption(f"T1 shape: {tuple(_t1.shape)}") st.image(arr, clamp=True, use_container_width=True) else: st.warning(f"Expected a 2D array for T1, got an array with {_t1.ndim} dimensions.") except Exception as _e: st.warning(f"Failed to preview T1: {_e}") with c2: t2_file = st.file_uploader("T2", type=SUPPORTED_EXTS_PHANTOM) # T2 preview if t2_file is not None: try: _t2 = np.load(io.BytesIO(t2_file.getvalue())) if _t2.ndim == 2: arr = _t2.astype(np.float32) if np.any(np.isfinite(arr)): arr = np.nan_to_num(arr, nan=np.nanmin(arr) if np.isnan(arr).any() else 0.0) minv = float(np.min(arr)) maxv = float(np.max(arr)) if maxv > minv: arr = (arr - minv) / (maxv - minv) else: arr = np.zeros_like(arr, dtype=np.float32) else: arr = np.zeros_like(arr, dtype=np.float32) st.caption(f"T2 shape: {tuple(_t2.shape)}") st.image(arr, clamp=True, use_container_width=True) else: st.warning(f"Expected a 2D array for T2, got an array with {_t2.ndim} dimensions.") except Exception as _e: st.warning(f"Failed to preview T2: {_e}") with c3: pd_file = st.file_uploader("PD", type=SUPPORTED_EXTS_PHANTOM) # PD preview if pd_file is not None: try: _pd = np.load(io.BytesIO(pd_file.getvalue())) if _pd.ndim == 2: arr = _pd.astype(np.float32) if np.any(np.isfinite(arr)): arr = np.nan_to_num(arr, nan=np.nanmin(arr) if np.isnan(arr).any() else 0.0) minv = float(np.min(arr)) maxv = float(np.max(arr)) if maxv > minv: arr = (arr - minv) / (maxv - minv) else: arr = np.zeros_like(arr, dtype=np.float32) else: arr = np.zeros_like(arr, dtype=np.float32) st.caption(f"PD shape: {tuple(_pd.shape)}") st.image(arr, clamp=True, use_container_width=True) else: st.warning(f"Expected a 2D array for PD, got an array with {_pd.ndim} dimensions.") except Exception as _e: st.warning(f"Failed to preview PD: {_e}") start_btn = st.button("Begin generation", type="primary") progress_ph = st.empty() statuses_ph = st.empty() with open('upload/upload_cfg.yaml') as f: upload_cfg = yaml.load(f, Loader=yaml.FullLoader) upload_cfg = EasyDict(upload_cfg) if start_btn: # If user provided all 3 modality maps, combine them into a single weighted input if t1_file and t2_file and pd_file: try: # Read npy from uploaded buffers t1 = np.load(io.BytesIO(t1_file.getvalue())) t2 = np.load(io.BytesIO(t2_file.getvalue())) pd = np.load(io.BytesIO(pd_file.getvalue())) # Basic validation if t1.ndim != 2 or t2.ndim != 2 or pd.ndim != 2: st.error("Each of the T1/T2/PD files must be a 2D array (128x128).") return if not (t1.shape == t2.shape == pd.shape): st.error(f"Shapes do not match: T1 {t1.shape}, T2 {t2.shape}, PD {pd.shape}.") return # Enforce exact expected size if t1.shape != (128, 128): st.error(f"Expected shape (128, 128) for each map, got {t1.shape}.") return # Check numeric/finite values for name, arr in [("T1", t1), ("T2", t2), ("PD", pd)]: if not np.issubdtype(arr.dtype, np.number): st.error(f"{name} must contain numeric values, got dtype {arr.dtype}.") return if not np.isfinite(arr).any(): st.error(f"{name} contains no finite values.") return # Stack into 3-channel weighted image (H, W, 3) weighted_3_ch = np.array([t1, t2, pd]).transpose(1, 2, 0).astype(np.float32) st.info(f"Prepared input (H, W, C) = {weighted_3_ch.shape}") # Save to the expected upload path for the model dataset save_dir = os.path.join('upload', 'weighted') os.makedirs(save_dir, exist_ok=True) # Unique name per upload ts = int(time.time()) save_path = os.path.join(save_dir, f'user_upload_{ts}.npy') np.save(save_path, weighted_3_ch) # Run model in upload mode (rootB fixed in upload_cfg) pref = st.session_state.get("device_pref", "Авто (CUDA если доступна)") pref_map = { "Авто (CUDA если доступна)": None, "CPU": 'cpu', "CUDA": 'cuda', } flow_model(upload_cfg, mode='upload', device_pref=pref_map.get(pref)) done = run_job_stub(statuses_ph, progress_ph) if done: result_dir = PHANTOM_OUTPUT_PATH st.session_state.phantom_blob = make_phantom_zip(result_dir) st.session_state.phantom_name = "phantom_result.zip" st.success("Done! You can download the result.") except Exception as e: st.exception(e) else: # Fallback to pre-uploaded sample if user didn't provide all three files st.error("You must upload all three files: T1, T2, PD.") st.error("A pre-uploaded sample will be used") pref = st.session_state.get("device_pref", "Авто (CUDA если доступна)") pref_map = { "Авто (CUDA если доступна)": None, "CPU": 'cpu', "CUDA": 'cuda', } flow_model(upload_cfg, mode='upload', device_pref=pref_map.get(pref)) done = run_job_stub(statuses_ph, progress_ph) if done: result_dir = PHANTOM_OUTPUT_PATH st.session_state.phantom_blob = make_phantom_zip(result_dir) st.session_state.phantom_name = "phantom_result.zip" st.success("Done! You can download the result.") # The download button appears only when phantom_blob is present (after job completes) if st.session_state.get("phantom_blob"): st.download_button( "Download phantom", data=st.session_state.phantom_blob, file_name=st.session_state.get("phantom_name", "phantom_result.zip"), mime="application/zip", use_container_width=False, type="primary", ) # --- Предпросмотр сгенерированного фантома (.h5) --- def _normalize01(a: np.ndarray) -> np.ndarray: a = a.astype(np.float32) a = np.nan_to_num(a, nan=np.nanmin(a) if np.isnan(a).any() else 0.0) minv = float(np.min(a)) maxv = float(np.max(a)) if maxv > minv: return (a - minv) / (maxv - minv) return np.zeros_like(a, dtype=np.float32) def _to_viridis_rgb(slice2d: np.ndarray) -> np.ndarray: """ 2D массив -> цветное изображение в тех же цветах, что и plt.imshow(slice2d) с colormap='viridis'. Возвращает RGB uint8 (H, W, 3). """ norm = _normalize01(slice2d) # [0,1] cmap = plt.get_cmap() # как в matplotlib по умолчанию rgba = cmap(norm) # (H, W, 4), float32, 0..1 rgb = rgba[..., :3] # отбрасываем альфу rgb_uint8 = (rgb * 255).astype("uint8") # (H, W, 3) uint8 return rgb_uint8 def _extract_phantom_slices(h5_path: str) -> tuple[np.ndarray | None, list[np.ndarray]]: """ Читает HDF5-фантом как в примере: root -> first group -> first dataset -> phantom_data (H, W, Nslice) Возвращает: - полный объем phantom_data (или None при ошибке) - список до 4 нормированных срезов phantom_data[:, :, ph] """ try: with h5py.File(h5_path, "r") as f: keys_lvl1 = list(f.keys()) if not keys_lvl1: return None, [] g = f[keys_lvl1[0]] # если сразу dataset if isinstance(g, h5py.Dataset): phantom_data = np.array(g) else: # предполагаем группу и берём первый dataset внутри keys_lvl2 = list(g.keys()) if not keys_lvl2: return None, [] ds = g[keys_lvl2[0]] phantom_data = np.array(ds) if phantom_data.ndim != 3: # не тот формат return None, [] n_slices = phantom_data.shape[-1] max_show = min(4, n_slices) slices = [] for ph in range(max_show): sl = phantom_data[:, :, ph] slices.append(_normalize01(sl)) return phantom_data, slices except Exception: return None, [] def _list_h5(dir_path: str) -> list[str]: if not os.path.isdir(dir_path): return [] return [os.path.join(dir_path, f) for f in os.listdir(dir_path) if f.lower().endswith('.h5')] def _latest_h5(dir_path: str) -> str | None: files = _list_h5(dir_path) if not files: return None files.sort(key=lambda p: os.path.getmtime(p), reverse=True) return files[0] st.divider() st.markdown("### Phantom preview") def _list_h5(dir_path: str) -> list[str]: if not os.path.isdir(dir_path): return [] return [ os.path.join(dir_path, f) for f in os.listdir(dir_path) if f.lower().endswith(".h5") ] def _latest_h5(dir_path: str) -> str | None: files = _list_h5(dir_path) if not files: return None files.sort(key=lambda p: os.path.getmtime(p), reverse=True) return files[0] # --- Автоматический предпросмотр самого последнего фантома --- latest = _latest_h5(PHANTOM_OUTPUT_PATH) if latest: st.caption(f"Latest phantom: {os.path.basename(latest)}") volume, slices = _extract_phantom_slices(latest) flag = False if slices: # 2x2 сетка с первыми четырьмя срезами st.markdown("#### Phantom reconstruction") cols = st.columns(3) for i, img in enumerate(slices): # slices — это сами срезы phantom_data[:, :, ph] if i==1: flag = True continue if flag: i=i-1 with cols[i]: st.image( _to_viridis_rgb(img), use_container_width=True, ) else: st.info("Cannot extract 3D dataset for preview from selected .h5 phantom.") else: st.info("No phantom .h5 files found. Generate a phantom first to see the preview.") def page_dataset(): page_tse_nirsii() # Выбор последовательностей .seq — из всех подпапок каталога "sequences" seq_dir = "sequences" # Блок управления директориями (последовательности и фантомы) with st.container(border=True): st.markdown("#### Data management") # Инициализация состояния показа панели очистки/загрузки if "show_seq_clear" not in st.session_state: st.session_state.show_seq_clear = False if "show_seq_upload" not in st.session_state: st.session_state.show_seq_upload = False # Делаем две компактные кнопки в правой части строки: Upload и Clear c_left, c_right_u, c_right_c = st.columns([8, 1, 1]) with c_right_u: if st.button("Upload", key="seq_upload_btn", type="secondary", help="Upload files into the 'sequences' folder"): st.session_state.show_seq_upload = True with c_right_c: if st.button("Clear", key="seq_clear_btn", type="secondary", help="Clear the entire 'sequences' folder"): st.session_state.show_seq_clear = True with c_left: st.caption("You can clear the sequences folder. Optionally, download a backup before deletion.") # Панель загрузки файлов в папку sequences if st.session_state.show_seq_upload: with st.expander("Upload sequences — select files", expanded=True): uploaded_seq_files = st.file_uploader( "Choose .seq and related .json files", type=["seq", "json"], accept_multiple_files=True, key="seq_upload_files", ) save_seq_now = st.button("Save files to 'sequences'", key="seq_save_now", type="primary") if save_seq_now: if not uploaded_seq_files: st.warning("No files selected for upload.") else: try: os.makedirs(seq_dir, exist_ok=True) saved, skipped = [], [] for uf in uploaded_seq_files: fname = uf.name data = uf.getbuffer() ok = False reason = None if fname.lower().endswith('.seq'): ok, reason = _validate_seq_bytes(fname, bytes(data)) elif fname.lower().endswith('.json'): ok, reason = _validate_kso_json_bytes(fname, bytes(data)) else: ok, reason = False, f"{fname}: unsupported type" if ok: out_path = os.path.join(seq_dir, fname) with open(out_path, "wb") as f: f.write(data) saved.append(fname) else: skipped.append(reason or fname) if saved: st.success("Saved: " + ", ".join(saved)) if skipped: st.warning("Skipped: " + "; ".join(skipped)) if saved: st.session_state.show_seq_upload = False st.rerun() except Exception as e: st.error(f"Error while saving files: {e}") if st.session_state.show_seq_clear: with st.expander("Clear sequences folder — confirmation", expanded=True): # Ask whether to download a backup archive before clearing need_backup = st.radio( "Download all files from ‘sequences’ before clearing?", options=["Yes", "No"], index=0, horizontal=True, key="seq_clear_backup_choice", ) # Подготовка ZIP при выборе «Да» zip_bytes: bytes | None = None zip_name = None if need_backup == "Yes": try: buf = io.BytesIO() with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: for root, _dirs, files in os.walk(seq_dir): for f in files: full_path = os.path.join(root, f) # относительный путь внутри архива arcname = os.path.relpath(full_path, start=os.path.dirname(seq_dir)) zf.write(full_path, arcname=arcname) buf.seek(0) zip_bytes = buf.read() ts = int(time.time()) zip_name = f"sequences_backup_{ts}.zip" except Exception as e: st.warning(f"Failed to prepare backup archive: {e}") zip_bytes = None if zip_bytes is not None and zip_name is not None: st.download_button( "Download backup (ZIP)", data=zip_bytes, file_name=zip_name, mime="application/zip", use_container_width=True, key="dl_seq_backup_zip", ) confirm = st.checkbox("I understand that all files in ‘sequences’ will be permanently deleted", key="seq_confirm") delete_now = st.button("Delete now", type="primary", key="seq_delete_now") if delete_now: if not confirm: st.error("You must check the confirmation box to proceed with deletion.") else: try: # Удаляем папку целиком и пересоздаём if os.path.isdir(seq_dir): shutil.rmtree(seq_dir, ignore_errors=True) os.makedirs(seq_dir, exist_ok=True) st.success("The ‘sequences’ folder has been cleared.") # Скрыть панель и перезапустить, чтобы обновить список st.session_state.show_seq_clear = False st.rerun() except Exception as e: st.error(f"Error while clearing the folder: {e}") st.markdown("---") # Инициализация состояния показа панели очистки/загрузки фантомов if "show_ph_clear" not in st.session_state: st.session_state.show_ph_clear = False if "show_ph_upload" not in st.session_state: st.session_state.show_ph_upload = False c_left_ph, c_right_ph_u, c_right_ph_c = st.columns([8, 1, 1]) with c_right_ph_u: if st.button("Upload", key="ph_upload_btn", type="secondary", help="Upload phantom .h5 files into the folder"): st.session_state.show_ph_upload = True with c_right_ph_c: if st.button("Clear", key="ph_clear_btn", type="secondary", help="Clear the entire 'phantoms' folder"): st.session_state.show_ph_clear = True with c_left_ph: st.caption("You can clear the phantoms folder. Optionally, download a backup before deletion.") # Панель загрузки фантомов if st.session_state.show_ph_upload: with st.expander("Upload phantoms — select .h5 files", expanded=True): uploaded_ph_files = st.file_uploader( "Choose phantom files (.h5)", type=["h5"], accept_multiple_files=True, key="ph_upload_files", ) save_ph_now = st.button("Save files to 'phantoms'", key="ph_save_now", type="primary") if save_ph_now: if not uploaded_ph_files: st.warning("No files selected for upload.") else: try: os.makedirs(PHANTOM_OUTPUT_PATH, exist_ok=True) saved, skipped = [], [] for uf in uploaded_ph_files: fname = uf.name data = uf.getbuffer() ok, reason = _validate_phantom_h5_bytes(fname, bytes(data)) if ok: out_path = os.path.join(PHANTOM_OUTPUT_PATH, fname) with open(out_path, "wb") as f: f.write(data) saved.append(fname) else: skipped.append(reason or fname) if saved: st.success("Saved: " + ", ".join(saved)) if skipped: st.warning("Skipped: " + "; ".join(skipped)) if saved: st.session_state.show_ph_upload = False st.rerun() except Exception as e: st.error(f"Error while saving phantom files: {e}") if st.session_state.show_ph_clear: with st.expander("Clear phantoms folder — confirmation", expanded=True): need_backup_ph = st.radio( "Download all files from ‘phantoms’ before clearing?", options=["Yes", "No"], index=0, horizontal=True, key="ph_clear_backup_choice", ) ph_zip_bytes: bytes | None = None ph_zip_name = None if need_backup_ph == "Yes": try: buf = io.BytesIO() with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: for root, _dirs, files in os.walk(PHANTOM_OUTPUT_PATH): for f in files: full_path = os.path.join(root, f) arcname = os.path.relpath(full_path, start=os.path.dirname(PHANTOM_OUTPUT_PATH)) zf.write(full_path, arcname=arcname) buf.seek(0) ph_zip_bytes = buf.read() ts = int(time.time()) ph_zip_name = f"phantoms_backup_{ts}.zip" except Exception as e: st.warning(f"Failed to prepare backup archive: {e}") ph_zip_bytes = None if ph_zip_bytes is not None and ph_zip_name is not None: st.download_button( "Download backup (ZIP)", data=ph_zip_bytes, file_name=ph_zip_name, mime="application/zip", use_container_width=True, key="dl_ph_backup_zip", ) ph_confirm = st.checkbox("I understand that all files in ‘phantoms’ will be permanently deleted", key="ph_confirm") ph_delete_now = st.button("Delete now", type="primary", key="ph_delete_now") if ph_delete_now: if not ph_confirm: st.error("You must check the confirmation box to proceed with deletion.") else: try: if os.path.isdir(PHANTOM_OUTPUT_PATH): shutil.rmtree(PHANTOM_OUTPUT_PATH, ignore_errors=True) os.makedirs(PHANTOM_OUTPUT_PATH, exist_ok=True) st.success("The ‘phantoms’ folder has been cleared.") st.session_state.show_ph_clear = False st.rerun() except Exception as e: st.error(f"Error while clearing the folder: {e}") # После возможной очистки — перечитываем список файлов seq_files = list_seq_files(seq_dir) st.markdown("---") st.markdown("### Koma MRI simulator") # Выбор фантома (.h5) для отправки в KOMA phantom_dir = PHANTOM_OUTPUT_PATH phantom_h5_list = list_h5_files(phantom_dir) phantom_label_map = {os.path.basename(p): p for p in phantom_h5_list} if not phantom_h5_list: st.info("Directory with phantoms is empty. Generate phantoms first on the page back.") return phantom_choice = st.selectbox( "Choose phantom to scan", options=list(phantom_label_map.keys()), index=0, key="koma_phantom_choice", ) # Показываем относительные пути, чтобы было видно подпапки и избежать коллизий имен seq_label_map = {os.path.relpath(p, start=seq_dir): p for p in seq_files} if not seq_files: st.info("Sequence directory is empty. Generate them first on this page.") return seq_choices = st.multiselect( "Choose pulse sequences", options=list(seq_label_map.keys()), default=list(seq_label_map.keys())[:1], key="koma_seq_choices", ) # Инициализация persist-состояния для сохранения результатов между перерисовками if "koma_last_results" not in st.session_state: # список элементов: {seq_label, seq_path, ks_order_path, raw_out} st.session_state.koma_last_results = [] if "koma_batch_zip" not in st.session_state: # dict: {bytes, name} st.session_state.koma_batch_zip = None # Флаг запроса остановки текущего сканирования if "koma_stop_requested" not in st.session_state: st.session_state.koma_stop_requested = False # Вспомогательная функция: очистка выходных директорий перед новым сканом def _clear_output_dirs(dirs: list[str], status_cb=None): for d in dirs: try: if os.path.isdir(d): shutil.rmtree(d, ignore_errors=True) os.makedirs(d, exist_ok=True) if status_cb: status_cb.info(f"Cleared folder: {d}") except Exception as e: if status_cb: status_cb.warning(f"Failed to clear {d}: {e}") # Группа кнопок запуска сканирования и статус в одном боксе with st.container(border=True): c_run, c_status = st.columns([2, 3]) with c_run: run_koma = st.button("Start scan", type="primary", use_container_width=True) run_all = st.button("Scan all phantoms", use_container_width=True) stop_pressed = st.button("Stop the scan", use_container_width=True) if stop_pressed: st.session_state.koma_stop_requested = True # Опция: использовать GPU для контейнера KOMA (docker --gpus all) if "koma_use_gpu" not in st.session_state: st.session_state.koma_use_gpu = False st.session_state.koma_use_gpu = st.checkbox( "Use GPU for KOMA container (--gpus all)", value=st.session_state.koma_use_gpu, help="Требуется установленный NVIDIA Container Toolkit и CUDA-совместимый образ KOMA" ) with c_status: status_box = st.empty() # Локальная утилита ожидания готовности сервиса после старта контейнера def _wait_for_koma_ready(status_cb, timeout_sec: float = 60.0, poll_sec: float = 0.5) -> bool: start_t = time.time() last_update = -1 while time.time() - start_t < timeout_sec: if is_koma_available(): status_cb.success("KOMA service is reachable. Proceeding to scan…") return True # обновляем индикатор раз в ~1 секунду elapsed = int(time.time() - start_t) if elapsed != last_update: remaining = int(timeout_sec - (time.time() - start_t)) status_cb.info(f"Waiting for KOMA to become ready… {elapsed}s elapsed (≤ {int(timeout_sec)}s)") last_update = elapsed time.sleep(poll_sec) return False if run_koma: status_box.info("Preparing KOMA simulator and sending data…") phantom_path = phantom_label_map[phantom_choice] # Quick pre-check phantom file sanity try: with h5py.File(phantom_path, 'r') as _hf: pass except Exception as _e: status_box.error(f"Selected phantom is not a valid .h5: {phantom_path}. Error: {_e}") st.stop() if not seq_choices: status_box.warning("Please choose at least one sequence to scan.") st.stop() # Принудительно очищаем выходные папки перед новым запуском одиночного сканирования raw_dir = os.path.join("download", "rawdata") images_dir = os.path.join("download", "images") _clear_output_dirs([raw_dir, images_dir], status_cb=status_box) # Сбрасываем предыдущие результаты, чтобы не отображать превью удалённых файлов st.session_state.koma_last_results = [] try: # Автозапуск контейнера только если сервис недоступен started_by_us = False if not is_koma_available(): status_box.info("KOMA is not reachable — starting container…") rc = run_container(use_gpu=bool(st.session_state.get("koma_use_gpu", False))) if rc != 0: # Возможно, контейнер уже запущен другим процессом — перепроверим доступность if not is_koma_available(): status_box.error("Failed to start KOMA container: check if the Docker engine is running") st.stop() else: started_by_us = True # Дождаться, пока HTTP-сервис поднимется if not _wait_for_koma_ready(status_box): status_box.error("KOMA did not become ready in time after start. Aborting scan.") # Если контейнер запускали мы — останавливаем try: stop_container() except Exception: pass st.stop() else: status_box.info("KOMA is already running — will not start a new container.") # Сканирование по всем выбранным последовательностям в рамках одного запуска контейнера new_results = [] # Директория для сохранения реконструированных изображений os.makedirs(images_dir, exist_ok=True) for i, seq_label in enumerate(seq_choices): # Проверка запроса остановки перед началом следующей последовательности if st.session_state.get("koma_stop_requested"): status_box.warning("Stop requested. Finishing current cycle and shutting down container…") break seq_path = seq_label_map[seq_label] seq_base = os.path.splitext(os.path.basename(seq_path))[0] ph_base = os.path.splitext(os.path.basename(phantom_path))[0] # Добавляем метку времени, чтобы исключить перезапись файлов между запусками ts_str = datetime.now().strftime('%Y%m%d_%H%M%S_%f') raw_name = f"raw_{ph_base}_{seq_base}_{ts_str}.h5" raw_out = os.path.join(raw_dir, raw_name) # Путь к JSON порядка укладки k-space рядом с выбранной последовательностью ks_order_path = os.path.splitext(seq_path)[0] + "_k_space_order_filing.json" status_box.info("Scanning of the set number: "+str(i+1)) ok, err = scan_once(phantom_path, seq_path, raw_out) if not ok: status_box.error(f"Scan error for {seq_base}: {err}") continue status_box.success(f"Raw data saved: {raw_out}") # Реконструкция предпросмотра (with/without sort) if os.path.isfile(ks_order_path): _, img = process_hdf5_with_sort(raw_out, ks_order_path, plot=False) else: img = process_hdf5_without_sort(raw_out, plot=False) # Нормализация пригодится при последующем отображении в постоянном блоке, # но здесь больше не рендерим превью и кнопки, чтобы избежать дублирования # текущих и предыдущих результатов на странице. Отрисовка выполняется # только в блоке «Previous scan results» ниже. _ = float(np.min(img)) # no-op to ensure img was computed without errors # Сохраняем реконструкцию как .npy, чтобы кнопка "Download all images (ZIP)" # включала результаты одиночного сканирования по всем выбранным ИП img_fname = f"{ph_base}_{seq_base}_{ts_str}.npy" try: np.save(os.path.join(images_dir, img_fname), img) except Exception as _save_err: # Не прерываем весь цикл, просто сообщаем статус status_box.warning(f"Failed to save image for {seq_base}: {_save_err}") # Сохраняем результат для повторного показа после rerun/навигации new_results.append({ "seq_label": seq_label, "seq_path": seq_path, "ks_order_path": ks_order_path, "raw_out": raw_out, }) # Перезаписываем «последние результаты» с текущего запуска if new_results: st.session_state.koma_last_results = new_results except Exception as e: status_box.error(f"KOMA error: {e}") finally: # Останавливаем контейнер только если запускали сами try: if 'started_by_us' in locals() and started_by_us: stop_container() # Сбрасываем флаг остановки после завершения if st.session_state.get("koma_stop_requested"): st.session_state.koma_stop_requested = False except Exception: pass # Пакетное сканирование всех фантомов if run_all: status_box.info("Batch: preparing KOMA container and scanning all phantoms…") if not seq_choices: status_box.warning("Please choose at least one sequence to scan.") st.stop() # Принудительно очищаем выходные папки перед пакетным сканированием raw_dir_all = os.path.join("download", "rawdata") images_dir_all = os.path.join("download", "images") _clear_output_dirs([raw_dir_all, images_dir_all], status_cb=status_box) # Также сбросим предыдущие результаты предпросмотра st.session_state.koma_last_results = [] try: # Автозапуск контейнера только если сервис недоступен started_by_us = False if not is_koma_available(): status_box.info("KOMA is not reachable — starting container…") rc = run_container(use_gpu=bool(st.session_state.get("koma_use_gpu", False))) if rc != 0: if not is_koma_available(): status_box.error("Failed to start KOMA container: check if the Docker engine is running") st.stop() else: started_by_us = True if not _wait_for_koma_ready(status_box): status_box.error("KOMA did not become ready in time after start. Aborting batch scan.") try: stop_container() except Exception: pass st.stop() else: status_box.info("KOMA is already running — will not start a new container.") # запуск пакетного сканирования для каждой выбранной последовательности for seq_label in seq_choices: if st.session_state.get("koma_stop_requested"): status_box.warning("Stop requested. Aborting batch after current stage and shutting down container…") break seq_path = seq_label_map[seq_label] seq_dir_abs = os.path.dirname(seq_path) seq_basename = os.path.splitext(os.path.basename(seq_path))[0] scan_and_reconstruct( phantoms_dir=PHANTOM_OUTPUT_PATH, seq_dir=seq_dir_abs, seq_basename=seq_basename, raw_dir=os.path.join("download", "rawdata"), img_dir=os.path.join("download", "images"), ks_dir=os.path.join("download", "k_space"), plot=False, ) status_box.success("Batch scan finished for all selected sequences. Files saved to download/rawdata, download/images and download/k_space") # Подготовка ZIP-архива со всеми результатами. Кнопку скачивания # в этом же рендер-проходе не показываем, чтобы избежать дублирования # с постоянным блоком ниже. Кнопка будет отрисована из session_state. try: buf = io.BytesIO() with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: base_dirs = [ os.path.join("download", "rawdata"), os.path.join("download", "images"), os.path.join("download", "k_space"), ] for bdir in base_dirs: if not os.path.isdir(bdir): continue for root, _dirs, files in os.walk(bdir): for f in files: full_path = os.path.join(root, f) # относительный путь внутри архива — от папки download/ arcname = os.path.relpath(full_path, start="download") zf.write(full_path, arcname=arcname) buf.seek(0) zip_bytes = buf.read() ts = int(time.time()) zip_name = f"koma_batch_results_{ts}.zip" # Сохраняем ZIP в состоянии; кнопка будет выведена в постоянном блоке ниже st.session_state.koma_batch_zip = {"bytes": zip_bytes, "name": zip_name} except Exception as e: st.warning(f"Failed to prepare ZIP with all results: {e}") except Exception as e: status_box.error(f"Batch KOMA error: {e}") finally: try: if 'started_by_us' in locals() and started_by_us: stop_container() if st.session_state.get("koma_stop_requested"): st.session_state.koma_stop_requested = False except Exception: pass # -------- Постоянный рендер результатов (сохраняется между кликами/навигацией) -------- if st.session_state.koma_last_results: st.markdown("---") st.markdown("#### Previous scan results") for res in st.session_state.koma_last_results: seq_label = res.get("seq_label") raw_out = res.get("raw_out") ks_order_path = res.get("ks_order_path") if not raw_out or not os.path.isfile(raw_out): # файл могли удалить — пропускаем continue # Пытаемся построить изображение предпросмотра на лету try: if ks_order_path and os.path.isfile(ks_order_path): _, img = process_hdf5_with_sort(raw_out, ks_order_path, plot=False) else: img = process_hdf5_without_sort(raw_out, plot=False) vmin, vmax = float(np.min(img)), float(np.max(img)) img_disp = (img - vmin) / (vmax - vmin) if vmax > vmin else img except Exception: img_disp = None img = None seq_base = os.path.splitext(os.path.basename(res.get("seq_path", ""))) [0] col_img2, col_ctrl2 = st.columns([1, 2], gap="small") with col_img2: if img_disp is not None: st.image(img_disp, caption=f"Preview: {seq_label}", use_container_width=True, clamp=True) else: st.caption(f"Preview unavailable for {seq_label}") with col_ctrl2: st.download_button( f"Download raw (.h5): {seq_base}", data=open(raw_out, 'rb').read(), file_name=os.path.basename(raw_out), mime="application/octet-stream", key=f"dl_raw_prev_{os.path.splitext(os.path.basename(raw_out))[0]}", use_container_width=True, ) # Дополнительные кнопки скачивания реконструированного изображения рядом с превью # Формируем базовое имя файла изображения из имени raw (убираем префикс raw_) raw_base = os.path.splitext(os.path.basename(raw_out))[0] img_base = raw_base[4:] if raw_base.startswith("raw_") else raw_base # Кнопка скачать .npy (текущее реконструированное изображение) if img is not None: try: npy_buf = io.BytesIO() np.save(npy_buf, img) npy_buf.seek(0) st.download_button( f"Download image (.npy): {seq_base}", data=npy_buf.getvalue(), file_name=f"{img_base}.npy", mime="application/octet-stream", key=f"dl_img_npy_{raw_base}", use_container_width=True, ) except Exception: pass # Кнопка скачать .png (нормализованное изображение 0..255) try: # Безопасная нормализация для PNG vmin2, vmax2 = float(np.min(img)), float(np.max(img)) if vmax2 > vmin2: img_norm = (img - vmin2) / (vmax2 - vmin2) else: img_norm = np.zeros_like(img, dtype=np.float32) img_uint8 = (np.clip(img_norm, 0, 1) * 255).astype(np.uint8) png_img = Image.fromarray(img_uint8) png_buf = io.BytesIO() png_img.save(png_buf, format="PNG") png_buf.seek(0) st.download_button( f"Download image (.png): {seq_base}", data=png_buf.getvalue(), file_name=f"{img_base}.png", mime="image/png", key=f"dl_img_png_{raw_base}", use_container_width=True, ) except Exception: pass # Кнопка «скачать всё» сохранённая после batch-сканирования if st.session_state.koma_batch_zip: st.markdown("---") st.download_button( "Download all results (ZIP)", data=st.session_state.koma_batch_zip["bytes"], file_name=st.session_state.koma_batch_zip["name"], mime="application/zip", use_container_width=True, key="dl_koma_batch_zip_saved", ) # Дополнительно: отдельные кнопки «скачать все изображения» и «скачать весь k_space» # Кнопки доступны независимо от пакетного режима, если в соответствующих папках есть файлы images_dir = os.path.join("download", "images") kspace_dir = os.path.join("download", "rawdata") def _zip_dir_bytes(base_dir: str, arc_base: str) -> tuple[bytes | None, str | None]: try: if not os.path.isdir(base_dir): return None, None has_files = any( f for _r, _d, fs in os.walk(base_dir) for f in fs ) if not has_files: return None, None buf = io.BytesIO() with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: for root, _dirs, files in os.walk(base_dir): for f in files: full_path = os.path.join(root, f) # сохраняем структуру относительно корня download/ arcname = os.path.relpath(full_path, start="download") # Вложим в архив, сохраняя структуру относительно корня download/ zf.write(full_path, arcname=arcname) buf.seek(0) ts = int(time.time()) name = f"{arc_base}_{ts}.zip" return buf.read(), name except Exception: return None, None # Собираем ZIP для изображений img_zip_bytes, img_zip_name = _zip_dir_bytes(images_dir, "koma_images") # Собираем ZIP для k-space ks_zip_bytes, ks_zip_name = _zip_dir_bytes(kspace_dir, "koma_k_space") if img_zip_bytes or ks_zip_bytes: st.markdown("---") cols_zip = st.columns(2) with cols_zip[0]: if img_zip_bytes and img_zip_name: st.download_button( "Download all images (ZIP)", data=img_zip_bytes, file_name=img_zip_name, mime="application/zip", use_container_width=True, key="dl_koma_all_images_zip", ) else: st.caption("No images found in download/images") with cols_zip[1]: if ks_zip_bytes and ks_zip_name: st.download_button( "Download all k-spaces (ZIP)", data=ks_zip_bytes, file_name=ks_zip_name, mime="application/zip", use_container_width=True, key="dl_koma_all_kspace_zip", ) else: st.caption("No k-spaces found in download/k_space") # ---------- Router ---------- if st.session_state.page == "home": page_home() elif st.session_state.page == "phantom": page_phantom() elif st.session_state.page == "dataset": page_dataset() else: st.session_state.page = "home" page_home()