import time
import os
import io
import zipfile
from pathlib import Path
from typing import List, Tuple
from PIL import Image
import streamlit as st
st.set_page_config(
page_title="MRI physics based augmentation",
page_icon="🧠",
layout="wide"
)
# ---------- Simple router in session_state ----------
if "page" not in st.session_state:
st.session_state.page = "home"
# storage for generated phantom (appears after progress completes)
if "phantom_blob" not in st.session_state:
st.session_state.phantom_blob = None
if "phantom_name" not in st.session_state:
st.session_state.phantom_name = None
def nav_to(p: str):
st.session_state.page = p
st.title("MRI physics based augmentation")
# ---------- Theme-aware logo (top-right, base64-embedded) ----------
import base64
def _b64_img(path: str) -> str | None:
try:
with open(path, "rb") as f:
return base64.b64encode(f.read()).decode("ascii")
except Exception:
return None
def render_theme_logo(light_path: str = "logos/NEW_PHYSTECH_for_light.png",
dark_path: str = "logos/NEW_PHYSTECH_for_dark.png",
size_px: int = 250):
"""
Рисует фиксированный логотип справа сверху, меняющийся по теме.
Картинки вшиваются через base64, чтобы избежать проблем с путями.
"""
light_b64 = _b64_img(light_path)
dark_b64 = _b64_img(dark_path)
# Если нет файлов, просто ничего не рисуем (и даём подсказку внизу страницы)
if not light_b64 and not dark_b64:
return
light_src = f"data:image/png;base64,{light_b64}" if light_b64 else ""
dark_src = f"data:image/png;base64,{dark_b64}" if dark_b64 else light_src
html = f"""
"""
st.markdown(html, unsafe_allow_html=True)
render_theme_logo()
# ---------- Image helpers ----------
SUPPORTED_EXTS = (".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff")
SUPPORTED_EXTS_PHANTOM = (".dcm", ".nii", ".nii.gz", ".nrrd", ".npy", ".png", ".jpg", ".jpeg")
MAX_VALUE_DATASET = 100000
def center_crop_to_square(img: Image.Image) -> Image.Image:
"""Center-crop PIL image to a square based on the smaller side."""
w, h = img.size
s = min(w, h)
left = (w - s) // 2
top = (h - s) // 2
return img.crop((left, top, left + s, top + s))
def load_and_prepare_assets(asset_dir: str = "assets", count: int = 3, size: Tuple[int, int] = (320, 320)) -> List[Tuple[Image.Image, str]]:
"""Load up to `count` images from asset_dir, center-crop to square, resize to `size`."""
results = []
if not os.path.isdir(asset_dir):
return results
files = sorted([f for f in os.listdir(asset_dir) if os.path.splitext(f.lower())[1] in SUPPORTED_EXTS])
for fname in files[:count]:
path = os.path.join(asset_dir, fname)
try:
img = Image.open(path).convert("RGB")
img = center_crop_to_square(img)
img = img.resize(size, Image.LANCZOS)
results.append((img, fname))
except Exception:
continue
return results
def run_job_stub(status_placeholder, progress_placeholder, steps=None, delay=0.9):
"""Simulate a long-running job with progress and 3-line status stream.
Returns True when finished."""
if steps is None:
steps = [
"Инициализация пайплайна...",
"Обработка входных данных...",
"Генерация синтетических изображений...",
"Постобработка результатов...",
"Готово!",
]
progress = 0
last3 = []
progress_placeholder.progress(progress, text="Waiting to start...")
status_placeholder.markdown("")
for i, msg in enumerate(steps, 1):
last3.append(msg)
last3 = last3[-3:] # keep only the last 3
# Newest at the top for the "falls down" feel:
lines = []
for idx, line in enumerate(reversed(last3)):
if idx == 0:
lines.append(f"- **{line}**")
elif idx == 1:
lines.append(f"- {line}")
else:
lines.append(f"- {line}")
status_placeholder.markdown("
".join(lines), unsafe_allow_html=True)
progress = int(i * 100 / len(steps))
progress_placeholder.progress(progress, text=f"Progress: {progress}%")
time.sleep(delay)
return True
def make_demo_zip() -> bytes:
"""Create a demo ZIP to download as 'phantom' result."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
zf.writestr("phantom/README.txt", "Demo phantom result. Replace with real generated files.")
buf.seek(0)
return buf.getvalue()
# ---------- Pages ----------
def page_home():
st.subheader("Информация о контрастах и генерации фантомов")
st.write(
"Здесь можно разместить описание ИП (T1/T2/PD), принципы формирования данных и примеры сканов. "
"Картинки ниже подгружаются из папки `assets/`, **автоматически центр-обрезаются в квадрат** и "
"**приводятся к одинаковому размеру** (320×320)."
)
# Load 3 prepared images from assets
images = load_and_prepare_assets("assets", count=3, size=(320, 320))
if images:
cols = st.columns(len(images))
for (img, name), col in zip(images, cols):
with col:
st.image(img, caption=name, use_container_width=False)
else:
st.info("Положите 1–3 изображения в папку `assets/` (png/jpg/tif), и они появятся здесь одинакового размера.")
st.markdown("---")
c1, c2 = st.columns(2)
with c1:
with st.container(border=True):
st.markdown("#### 🧠 Phantom generation")
st.write("Upload T1/T2/PD images and begin the generation.")
st.button("Move to the phantom", type="primary", use_container_width=True, on_click=nav_to, args=("phantom",))
with c2:
with st.container(border=True):
st.markdown("#### 📦 Dataset generation")
st.write("Generation of a dataset based on pulse sequence parameters.")
st.button("Move to the dataset", type="primary", use_container_width=True, on_click=nav_to, args=("dataset",))
def page_phantom():
st.button("← Homepage", on_click=nav_to, args=("home",))
st.subheader("Generate the phantom")
st.caption("Please upload T1/T2/PD images of a brain scan")
c1, c2, c3 = st.columns(3)
with c1:
t1_file = st.file_uploader("T1", type=SUPPORTED_EXTS_PHANTOM)
with c2:
t2_file = st.file_uploader("T2", type=SUPPORTED_EXTS_PHANTOM)
with c3:
pd_file = st.file_uploader("PD", type=SUPPORTED_EXTS_PHANTOM)
start_btn = st.button("Begin generation", type="primary")
progress_ph = st.empty()
statuses_ph = st.empty()
if start_btn:
if not (t1_file and t2_file and pd_file):
st.error("Нужно загрузить все три файла: T1, T2, PD.")
else:
done = run_job_stub(statuses_ph, progress_ph)
if done:
# Save demo result into session_state for download button
st.session_state.phantom_blob = make_demo_zip()
st.session_state.phantom_name = "phantom_demo.zip"
st.success("Готово! Можно скачать результат.")
# The download button appears only when phantom_blob is present (after job completes)
if st.session_state.phantom_blob:
st.download_button(
"Download phantom",
data=st.session_state.phantom_blob,
file_name=st.session_state.phantom_name or "phantom_demo.zip",
mime="application/zip",
use_container_width=False,
type="primary",
)
def page_dataset():
st.button("← Homepage", on_click=nav_to, args=("home",))
st.subheader("Generate the dataset")
st.caption("Dataset generation. Please choose parameters for the pulse sequence.")
c1, c2, c3 = st.columns(3)
with c1:
t1d_file = st.file_uploader("T1", key="t1d", type=SUPPORTED_EXTS_PHANTOM)
with c2:
t2d_file = st.file_uploader("T2", key="t2d", type=SUPPORTED_EXTS_PHANTOM)
with c3:
pdd_file = st.file_uploader("PD", key="pdd", type=SUPPORTED_EXTS_PHANTOM)
count = st.number_input("Сколько примеров сгенерировать", min_value=1, max_value=MAX_VALUE_DATASET, value=50, step=1)
start_ds = st.button("Начать генерацию датасета", type="primary")
progress2 = st.empty()
status2 = st.empty()
if start_ds:
if not (t1d_file and t2d_file and pdd_file):
st.error("Нужно загрузить все три файла: T1, T2, PD.")
else:
run_job_stub(status2, progress2)
st.success(f"Датасет сформирован (демо). Количество: {int(count)}")
# ---------- Router ----------
if st.session_state.page == "home":
page_home()
elif st.session_state.page == "phantom":
page_phantom()
elif st.session_state.page == "dataset":
page_dataset()
else:
st.session_state.page = "home"
page_home()