| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- """
- FastAPI application for remote MRI sequence interpretation.
- Endpoints:
- POST /interpret/ — upload a .seq file, run full pipeline, return task_id
- GET /status/ — return status of all processed tasks
- """
- import asyncio
- import os
- import shutil
- from fastapi import FastAPI, File, UploadFile
- from src.config import config
- from src.hardware.constraints import HardwareConstraints
- from src.interfaces.pulseq_adapter import PulseqLoader
- from src.core.synchronizer import Synchronizer
- from src.interfaces.xml_generator import XMLGenerator
- from src.interfaces.rf_exporter import RFExporter
- from src.interfaces.gradient_exporter import GradientExporter
- from src.interfaces.picoscope_exporter import PicoScopeExporter
- from src.interfaces.post_request_generator import PostRequestGenerator
- app = FastAPI(title="LF-MRI Sequence Interpreter API")
- UPLOAD_DIR = config.get("upload_dir", "uploads")
- OUTPUT_DIR = config.get("output_dir", "output")
- os.makedirs(UPLOAD_DIR, exist_ok=True)
- os.makedirs(OUTPUT_DIR, exist_ok=True)
- _tasks: dict[str, str] = {}
- async def _run_pipeline(seq_path: str, task_id: str) -> None:
- try:
- hw = HardwareConstraints()
- loader = PulseqLoader(hw)
- seq_data = await asyncio.to_thread(loader.load, seq_path)
- synchronizer = Synchronizer(hw)
- sync_data = await asyncio.to_thread(synchronizer.process, seq_data["sequence"])
- out_dir = os.path.join(OUTPUT_DIR, task_id)
- os.makedirs(out_dir, exist_ok=True)
- hw_cfg = config.hw_config
- xml_gen = XMLGenerator()
- adc_values, adc_starts = await asyncio.to_thread(
- xml_gen.generate, sync_data, os.path.join(out_dir, "sync_v2.xml"), hw
- )
- tasks = [asyncio.to_thread(RFExporter().export, seq_data, seq_data.get("params", {}), out_dir)]
- if all(k in seq_data for k in ["gx", "gy", "gz"]):
- tasks.append(asyncio.to_thread(
- GradientExporter().export, seq_data, seq_data.get("params", {}), out_dir
- ))
- iadc = hw_cfg.get("iadc", {})
- tasks.append(asyncio.to_thread(
- PicoScopeExporter().generate,
- adc_values, adc_starts, out_dir, hw,
- sampling_freq=iadc.get("srate", 8e6),
- num_channels=iadc.get("n_channels", 3),
- ))
- await asyncio.gather(*tasks)
- post_gen = PostRequestGenerator()
- post_payload = post_gen.build(
- seq_data=seq_data,
- adc_values=adc_values,
- sequence_path=seq_path,
- output_dir=out_dir,
- hw_cfg=hw_cfg,
- rf_raster_time=seq_data.get("params", {}).get("rf_raster_time", 1e-6),
- )
- post_gen.write(post_payload, out_dir)
- _tasks[task_id] = f"Completed → {out_dir}"
- except Exception as exc:
- _tasks[task_id] = f"Failed: {exc}"
- @app.post("/interpret/")
- async def interpret_endpoint(file: UploadFile = File(...)):
- """Upload a .seq file and run the full interpretation pipeline."""
- file_path = os.path.join(UPLOAD_DIR, file.filename)
- with open(file_path, "wb") as buf:
- shutil.copyfileobj(file.file, buf)
- task_id = os.path.splitext(file.filename)[0]
- _tasks[task_id] = "Processing"
- asyncio.create_task(_run_pipeline(file_path, task_id))
- return {"status": "accepted", "task_id": task_id,
- "message": f"Processing {file.filename}"}
- @app.get("/status/")
- async def status_endpoint():
- """Return the status of all submitted tasks."""
- return {"tasks": _tasks}
|