| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- import asyncio
- import logging
- import os
- from datetime import datetime
- from seq_interp.src.hardware.constraints import HardwareConstraints
- from seq_interp.src.interfaces.pulseq_adapter import PulseqLoader
- from seq_interp.src.core.synchronizer import Synchronizer
- from seq_interp.src.interfaces.xml_generator import XMLGenerator
- from seq_interp.src.interfaces.rf_exporter import RFExporter
- from seq_interp.src.interfaces.gradient_exporter import GradientExporter
- from seq_interp.src.interfaces.picoscope_exporter import PicoScopeExporter
- from seq_interp.src.interfaces.post_request_generator import PostRequestGenerator
- from seq_interp.src.config import config
async def main(sequence_path: str = None, hw_config_path: str = None, output_dir: str = "output") -> None:
    """
    Entry point of the MRI sequence interpreter.

    Loads the sequence at ``sequence_path``, runs it through the synchronizer,
    then writes the XML, the exporter outputs and the POST manifest into
    ``output_dir``. The manifest is additionally sent when the hardware
    configuration contains a non-empty ``hw_service_url``.

    Args:
        sequence_path: Path handed to ``PulseqLoader.load``. Required despite
            the ``None`` default.
        hw_config_path: Passed to ``HardwareConstraints`` as ``json_path``.
        output_dir: Directory receiving the generated files; created if missing.

    Raises:
        ValueError: If ``sequence_path`` is ``None`` or empty.
    """
    # Validate first so that a bad call leaves no log directory, log file or
    # logging configuration behind.
    if not sequence_path:
        raise ValueError("sequence_path is required")

    log_dir = "log"
    os.makedirs(log_dir, exist_ok=True)
    timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    log_filename = os.path.join(log_dir, f"interp_log_{timestamp}.log")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            # Messages contain non-ASCII text; do not depend on the locale encoding.
            logging.FileHandler(log_filename, encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
    logger = logging.getLogger("MRISequenceInterpreter")

    hw = HardwareConstraints(json_path=hw_config_path)
    loader = PulseqLoader(hw)
    seq_data = loader.load(sequence_path)
    os.makedirs(output_dir, exist_ok=True)

    sync_sequence = seq_data["sequence"]
    params = seq_data["params"]
    synchronizer = Synchronizer(hw)
    sync_data = synchronizer.process(sync_sequence)

    xml_generator = XMLGenerator()
    rf_exporter = RFExporter()
    grad_exporter = GradientExporter()
    pico_exporter = PicoScopeExporter()
    post_generator = PostRequestGenerator()
    hw_cfg = config.hw_config  # full dictionary from hw_config.json

    # The XML step runs alone: its return values feed the PicoScope export below.
    xml_path = os.path.join(output_dir, "sync_v2.xml")
    adc_values, adc_starts = await asyncio.to_thread(xml_generator.generate, sync_data, xml_path, hw)

    # The remaining exporters run concurrently in worker threads.
    tasks = [asyncio.to_thread(rf_exporter.export, seq_data, params, output_dir)]
    # Gradients are exported only when all three axes and their time bases are present.
    if all(k in seq_data for k in ("gx", "gy", "gz", "t_gx", "t_gy", "t_gz")):
        tasks.append(asyncio.to_thread(grad_exporter.export, seq_data, params, output_dir))
    iadc_cfg = hw_cfg.get("iadc", {})
    tasks.append(asyncio.to_thread(
        pico_exporter.generate,
        adc_values, adc_starts, output_dir, hw,
        # Fallbacks used when the "iadc" section omits these keys.
        sampling_freq=iadc_cfg.get("srate", 8e6),
        num_channels=iadc_cfg.get("n_channels", 3),
    ))
    await asyncio.gather(*tasks)

    # Build the POST manifest, write it, and (optionally) send it.
    post_payload = post_generator.build(
        seq_data=seq_data,
        adc_values=adc_values,
        sequence_path=sequence_path,
        output_dir=output_dir,
        hw_cfg=hw_cfg,
        rf_raster_time=params["rf_raster_time"],
    )
    post_generator.write(post_payload, output_dir)
    hw_service_url = hw_cfg.get("hw_service_url")
    if hw_service_url:
        await post_generator.send(post_payload, hw_service_url)

    logger.info("Интерпретация завершена, результаты записаны в: %s", output_dir)
if __name__ == "__main__":
    # The project root is two directory levels above the folder holding this script.
    here = os.path.dirname(os.path.abspath(__file__))
    root = os.path.dirname(os.path.dirname(here))
    package_dir = os.path.join(root, "seq_interp")

    # Default input sequence, hardware configuration and output location.
    entry = main(
        sequence_path=os.path.join(package_dir, "data", "input", "rf_adc_only_fixed3 (4).seq"),
        hw_config_path=os.path.join(package_dir, "cfg", "hw_config.json"),
        output_dir=os.path.join(package_dir, "data", "output"),
    )
    asyncio.run(entry)
|