import time import os import io import zipfile from pathlib import Path from typing import List, Tuple from PIL import Image import streamlit as st st.set_page_config( page_title="MRI physics based augmentation", page_icon="🧠", layout="wide" ) # ---------- Simple router in session_state ---------- if "page" not in st.session_state: st.session_state.page = "home" # storage for generated phantom (appears after progress completes) if "phantom_blob" not in st.session_state: st.session_state.phantom_blob = None if "phantom_name" not in st.session_state: st.session_state.phantom_name = None def nav_to(p: str): st.session_state.page = p st.title("MRI physics based augmentation") # ---------- Theme-aware logo (top-right, base64-embedded) ---------- import base64 def _b64_img(path: str) -> str | None: try: with open(path, "rb") as f: return base64.b64encode(f.read()).decode("ascii") except Exception: return None def render_theme_logo(light_path: str = "logos/NEW_PHYSTECH_for_light.png", dark_path: str = "logos/NEW_PHYSTECH_for_dark.png", size_px: int = 250): """ Рисует фиксированный логотип справа сверху, меняющийся по теме. Картинки вшиваются через base64, чтобы избежать проблем с путями. """ light_b64 = _b64_img(light_path) dark_b64 = _b64_img(dark_path) # Если нет файлов, просто ничего не рисуем (и даём подсказку внизу страницы) if not light_b64 and not dark_b64: return light_src = f"data:image/png;base64,{light_b64}" if light_b64 else "" dark_src = f"data:image/png;base64,{dark_b64}" if dark_b64 else light_src html = f"""
logo logo
""" st.markdown(html, unsafe_allow_html=True) render_theme_logo() # ---------- Image helpers ---------- SUPPORTED_EXTS = (".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff") SUPPORTED_EXTS_PHANTOM = (".dcm", ".nii", ".nii.gz", ".nrrd", ".npy", ".png", ".jpg", ".jpeg") MAX_VALUE_DATASET = 100000 def center_crop_to_square(img: Image.Image) -> Image.Image: """Center-crop PIL image to a square based on the smaller side.""" w, h = img.size s = min(w, h) left = (w - s) // 2 top = (h - s) // 2 return img.crop((left, top, left + s, top + s)) def load_and_prepare_assets(asset_dir: str = "assets", count: int = 3, size: Tuple[int, int] = (320, 320)) -> List[Tuple[Image.Image, str]]: """Load up to `count` images from asset_dir, center-crop to square, resize to `size`.""" results = [] if not os.path.isdir(asset_dir): return results files = sorted([f for f in os.listdir(asset_dir) if os.path.splitext(f.lower())[1] in SUPPORTED_EXTS]) for fname in files[:count]: path = os.path.join(asset_dir, fname) try: img = Image.open(path).convert("RGB") img = center_crop_to_square(img) img = img.resize(size, Image.LANCZOS) results.append((img, fname)) except Exception: continue return results def run_job_stub(status_placeholder, progress_placeholder, steps=None, delay=0.9): """Simulate a long-running job with progress and 3-line status stream. Returns True when finished.""" if steps is None: steps = [ "Инициализация пайплайна...", "Обработка входных данных...", "Генерация синтетических изображений...", "Постобработка результатов...", "Готово!", ] progress = 0 last3 = [] progress_placeholder.progress(progress, text="Waiting to start...") status_placeholder.markdown("") for i, msg in enumerate(steps, 1): last3.append(msg) last3 = last3[-3:] # keep only the last 3 # Newest at the top for the "falls down" feel: lines = [] for idx, line in enumerate(reversed(last3)): if idx == 0: lines.append(f"- **{line}**") elif idx == 1: lines.append(f"- {line}") else: lines.append(f"- {line}") status_placeholder.markdown("
".join(lines), unsafe_allow_html=True) progress = int(i * 100 / len(steps)) progress_placeholder.progress(progress, text=f"Progress: {progress}%") time.sleep(delay) return True def make_demo_zip() -> bytes: """Create a demo ZIP to download as 'phantom' result.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr("phantom/README.txt", "Demo phantom result. Replace with real generated files.") buf.seek(0) return buf.getvalue() # ---------- Pages ---------- def page_home(): st.subheader("Информация о контрастах и генерации фантомов") st.write( "Здесь можно разместить описание ИП (T1/T2/PD), принципы формирования данных и примеры сканов. " "Картинки ниже подгружаются из папки `assets/`, **автоматически центр-обрезаются в квадрат** и " "**приводятся к одинаковому размеру** (320×320)." ) # Load 3 prepared images from assets images = load_and_prepare_assets("assets", count=3, size=(320, 320)) if images: cols = st.columns(len(images)) for (img, name), col in zip(images, cols): with col: st.image(img, caption=name, use_container_width=False) else: st.info("Положите 1–3 изображения в папку `assets/` (png/jpg/tif), и они появятся здесь одинакового размера.") st.markdown("---") c1, c2 = st.columns(2) with c1: with st.container(border=True): st.markdown("#### 🧠 Phantom generation") st.write("Upload T1/T2/PD images and begin the generation.") st.button("Move to the phantom", type="primary", use_container_width=True, on_click=nav_to, args=("phantom",)) with c2: with st.container(border=True): st.markdown("#### 📦 Dataset generation") st.write("Generation of a dataset based on pulse sequence parameters.") st.button("Move to the dataset", type="primary", use_container_width=True, on_click=nav_to, args=("dataset",)) def page_phantom(): st.button("← Homepage", on_click=nav_to, args=("home",)) st.subheader("Generate the phantom") st.caption("Please upload T1/T2/PD images of a brain scan") c1, c2, c3 = st.columns(3) with c1: t1_file = st.file_uploader("T1", type=SUPPORTED_EXTS_PHANTOM) with c2: t2_file = st.file_uploader("T2", type=SUPPORTED_EXTS_PHANTOM) with c3: pd_file = st.file_uploader("PD", type=SUPPORTED_EXTS_PHANTOM) start_btn = st.button("Begin generation", type="primary") progress_ph = st.empty() statuses_ph = st.empty() if start_btn: if not (t1_file and t2_file and pd_file): st.error("Нужно загрузить все три файла: T1, T2, PD.") else: done = run_job_stub(statuses_ph, progress_ph) if done: # Save demo result into session_state for download button st.session_state.phantom_blob = make_demo_zip() st.session_state.phantom_name = "phantom_demo.zip" st.success("Готово! Можно скачать результат.") # The download button appears only when phantom_blob is present (after job completes) if st.session_state.phantom_blob: st.download_button( "Download phantom", data=st.session_state.phantom_blob, file_name=st.session_state.phantom_name or "phantom_demo.zip", mime="application/zip", use_container_width=False, type="primary", ) def page_dataset(): st.button("← Homepage", on_click=nav_to, args=("home",)) st.subheader("Generate the dataset") st.caption("Dataset generation. Please choose parameters for the pulse sequence.") c1, c2, c3 = st.columns(3) with c1: t1d_file = st.file_uploader("T1", key="t1d", type=SUPPORTED_EXTS_PHANTOM) with c2: t2d_file = st.file_uploader("T2", key="t2d", type=SUPPORTED_EXTS_PHANTOM) with c3: pdd_file = st.file_uploader("PD", key="pdd", type=SUPPORTED_EXTS_PHANTOM) count = st.number_input("Сколько примеров сгенерировать", min_value=1, max_value=MAX_VALUE_DATASET, value=50, step=1) start_ds = st.button("Начать генерацию датасета", type="primary") progress2 = st.empty() status2 = st.empty() if start_ds: if not (t1d_file and t2d_file and pdd_file): st.error("Нужно загрузить все три файла: T1, T2, PD.") else: run_job_stub(status2, progress2) st.success(f"Датасет сформирован (демо). Количество: {int(count)}") # ---------- Router ---------- if st.session_state.page == "home": page_home() elif st.session_state.page == "phantom": page_phantom() elif st.session_state.page == "dataset": page_dataset() else: st.session_state.page = "home" page_home()