test_synchronizer.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. """
  2. Tests for Synchronizer.process — the synchro-sequence builder.
  3. Covers the constraints added for the LF-MRI hardware:
  4. * minimum event duration = 20 clock ticks (real duration, TR preserved);
  5. * maximum block duration = 999999 ticks (long blocks split into equal parts);
  6. * ADC stays a single continuous HIGH level when its block is split;
  7. * the TR/RF/START delay enable flags (no 1/0-tick artifacts);
  8. * parallel gate arrays stay length-balanced.
  9. """
  10. from __future__ import annotations
  11. import pytest
  12. from seq_interp.src.core.synchronizer import Synchronizer
  13. from seq_interp.src.hardware.constraints import HardwareConstraints
  14. TIMER = 20e-9 # MIN_BLOCK_DURATION — one clock tick
  15. MIN_TICKS = 20
  16. MAX_TICKS = 999999
  17. def _hw(**flags) -> HardwareConstraints:
  18. hw = HardwareConstraints()
  19. for k, v in flags.items():
  20. setattr(hw, k, v)
  21. return hw
  22. def _ticks(sync_data) -> list[int]:
  23. t = sync_data["synchro_block_timer"]
  24. return [round(d / t) for d in sync_data["blocks_duration"]]
  25. def _array_lengths(sync_data) -> set[int]:
  26. return {len(sync_data[k]) for k in
  27. ("gate_adc", "gate_rf", "gate_tr_switch", "blocks_duration")}
  28. # --------------------------------------------------------------------------- #
  29. # Array bookkeeping
  30. # --------------------------------------------------------------------------- #
  31. def test_gate_arrays_balanced_and_seed_offset(make_seq):
  32. """All gate arrays share one length == number_of_blocks + 1 (leading seed)."""
  33. seq = make_seq([{"rf": True, "dur": 1e-3},
  34. {"dur": 5e-3},
  35. {"adc": True, "dur": 2e-3}])
  36. d = Synchronizer(_hw()).process(seq)
  37. lengths = _array_lengths(d)
  38. assert len(lengths) == 1
  39. assert lengths.pop() == d["number_of_blocks"] + 1
  40. # --------------------------------------------------------------------------- #
  41. # Minimum event duration (20 ticks), TR preserved
  42. # --------------------------------------------------------------------------- #
  43. def test_min_event_is_20_ticks(make_seq):
  44. """No emitted block is shorter than 20 ticks, none is zero."""
  45. seq = make_seq([{"rf": True, "dur": 1e-3},
  46. {"adc": True, "dur": 2e-3}])
  47. d = Synchronizer(_hw()).process(seq)
  48. assert min(_ticks(d)) >= MIN_TICKS
  49. def test_total_duration_preserved_by_min_clamp(make_seq):
  50. """Stretching short blocks is compensated from a neighbour — TR unchanged."""
  51. seq = make_seq([{"rf": True, "dur": 1e-3},
  52. {"dur": 5e-3},
  53. {"adc": True, "dur": 2e-3}])
  54. hw = _hw()
  55. total_in = sum(seq.block_durations.values()) + max(hw.START_DELAY, hw.RF_DELAY)
  56. d = Synchronizer(hw).process(seq)
  57. assert sum(d["blocks_duration"]) == pytest.approx(total_in)
  58. # --------------------------------------------------------------------------- #
  59. # Maximum block duration (999999 ticks) — splitting
  60. # --------------------------------------------------------------------------- #
  61. def test_long_block_split_under_max(make_seq):
  62. """A 50 ms ADC block (2.5M ticks) is split so every part <= 999999 ticks."""
  63. seq = make_seq([{"adc": True, "dur": 50e-3}])
  64. d = Synchronizer(_hw()).process(seq)
  65. assert max(_ticks(d)) <= MAX_TICKS
  66. def test_split_preserves_total_duration(make_seq):
  67. seq = make_seq([{"adc": True, "dur": 50e-3}])
  68. hw = _hw()
  69. total_in = 50e-3 + max(hw.START_DELAY, hw.RF_DELAY)
  70. d = Synchronizer(hw).process(seq)
  71. assert sum(d["blocks_duration"]) == pytest.approx(total_in)
  72. def test_split_keeps_adc_high_continuous(make_seq):
  73. """Split parts of an ADC block all stay HIGH (level, not per-block pulse)."""
  74. seq = make_seq([{"adc": True, "dur": 50e-3}])
  75. d = Synchronizer(_hw()).process(seq)
  76. high = [i for i, v in enumerate(d["gate_adc"]) if v == 1]
  77. # contiguous run, more than one part
  78. assert len(high) >= 2
  79. assert high == list(range(high[0], high[0] + len(high)))
  80. # --------------------------------------------------------------------------- #
  81. # Delay enable flags — no inserted block when off
  82. # --------------------------------------------------------------------------- #
  83. def test_tr_delay_flag_removes_inserted_block(make_seq):
  84. seq = make_seq([{"adc": True, "dur": 2e-3}])
  85. on = Synchronizer(_hw(TR_DELAY_ENABLED=True)).process(seq)
  86. off = Synchronizer(_hw(TR_DELAY_ENABLED=False)).process(seq)
  87. assert off["number_of_blocks"] == on["number_of_blocks"] - 1
  88. assert min(_ticks(off)) >= MIN_TICKS
  89. def test_rf_delay_flag_removes_inserted_block(make_seq):
  90. seq = make_seq([{"rf": True, "dur": 1e-3}])
  91. on = Synchronizer(_hw(RF_DELAY_ENABLED=True)).process(seq)
  92. off = Synchronizer(_hw(RF_DELAY_ENABLED=False)).process(seq)
  93. assert off["number_of_blocks"] == on["number_of_blocks"] - 1
  94. assert min(_ticks(off)) >= MIN_TICKS
  95. def test_start_delay_flag_uses_minimal_nonzero_start(make_seq):
  96. """START off keeps the (non-emitted) seed block, shrunk to the 20-tick min."""
  97. seq = make_seq([{"adc": True, "dur": 2e-3}])
  98. off = Synchronizer(_hw(START_DELAY_ENABLED=False)).process(seq)
  99. assert off["blocks_duration"][0] == pytest.approx(20 * off["synchro_block_timer"])
  100. assert min(_ticks(off)) >= MIN_TICKS
  101. def test_all_flags_off_no_artifacts(make_seq):
  102. seq = make_seq([{"rf": True, "dur": 1e-3},
  103. {"dur": 5e-3},
  104. {"adc": True, "dur": 2e-3}])
  105. d = Synchronizer(_hw(TR_DELAY_ENABLED=False,
  106. RF_DELAY_ENABLED=False,
  107. START_DELAY_ENABLED=False)).process(seq)
  108. ticks = _ticks(d)
  109. assert min(ticks) >= MIN_TICKS
  110. assert max(ticks) <= MAX_TICKS
  111. assert len(_array_lengths(d)) == 1