test_xml_generator.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. """
  2. Tests for XMLGenerator.generate — the sync_v2.xml writer.
  3. Covers the block-emission contract:
  4. * blocks are emitted for array indices 1..nb (seed at index 0 is represented
  5. by the header tag), so the last real block is never dropped;
  6. * ADC is reported as a single event per continuous HIGH run, and its window
  7. duration equals the full (un-truncated) intended duration;
  8. * the gradient system goes HIGH on the very last event;
  9. * every emitted CL value is within [20, 999999] ticks.
  10. """
  11. from __future__ import annotations
  12. import xml.etree.ElementTree as ET
  13. from seq_interp.src.core.synchronizer import Synchronizer
  14. from seq_interp.src.hardware.constraints import HardwareConstraints
  15. from seq_interp.src.interfaces.xml_generator import XMLGenerator
  16. MIN_TICKS = 20
  17. MAX_TICKS = 999999
  18. def _hw(**flags) -> HardwareConstraints:
  19. hw = HardwareConstraints()
  20. for k, v in flags.items():
  21. setattr(hw, k, v)
  22. return hw
  23. def _generate(make_seq, specs, tmp_path, **flags):
  24. """Run synchronizer + generator, return (parsed_channels, adc_vals, adc_starts)."""
  25. hw = _hw(**flags)
  26. sync_data = Synchronizer(hw).process(make_seq(specs))
  27. path = tmp_path / "sync_v2.xml"
  28. adc_vals, adc_starts = XMLGenerator().generate(sync_data, str(path), hw)
  29. root = ET.parse(str(path)).getroot()
  30. channels = {}
  31. for ch in ("RF", "SW", "ADC", "GR", "CL"):
  32. node = root.find(ch)
  33. channels[ch] = [int(child.text) for child in node]
  34. channels["ParamCount"] = int(root.find("ParamCount").text)
  35. return channels, adc_vals, adc_starts
  36. # --------------------------------------------------------------------------- #
  37. # Emission contract / off-by-one regression
  38. # --------------------------------------------------------------------------- #
  39. def test_param_count_and_channel_lengths(make_seq, tmp_path):
  40. ch, _, _ = _generate(
  41. make_seq,
  42. [{"rf": True, "dur": 1e-3}, {"adc": True, "dur": 2e-3}],
  43. tmp_path,
  44. )
  45. n = ch["ParamCount"]
  46. for name in ("RF", "SW", "ADC", "GR", "CL"):
  47. assert len(ch[name]) == n, name
  48. def test_trailing_adc_block_is_emitted(make_seq, tmp_path):
  49. """Regression: a sequence whose LAST block is ADC must still emit ADC=1."""
  50. ch, adc_vals, _ = _generate(make_seq, [{"adc": True, "dur": 2e-3}], tmp_path)
  51. assert sum(ch["ADC"]) >= 1 # the ADC HIGH is not dropped
  52. assert len(adc_vals) == 1 # exactly one ADC event
  53. # --------------------------------------------------------------------------- #
  54. # ADC single event + exact window
  55. # --------------------------------------------------------------------------- #
  56. def test_split_adc_single_event_full_window(make_seq, tmp_path):
  57. """A 50 ms ADC block split into parts is ONE event spanning the full 50 ms."""
  58. ch, adc_vals, _ = _generate(make_seq, [{"adc": True, "dur": 50e-3}], tmp_path)
  59. assert sum(ch["ADC"]) >= 2 # multiple HIGH columns (was split)
  60. assert len(adc_vals) == 1 # but a single ADC event / trigger
  61. assert adc_vals[0] == 50e-3 # full window, not a truncated part
  62. def test_adc_window_not_truncated_by_min_clamp(make_seq, tmp_path):
  63. """Min-clamp compensation must not steal duration from the ADC block."""
  64. _, adc_vals, _ = _generate(
  65. make_seq,
  66. [{"rf": True, "dur": 1e-3}, {"adc": True, "dur": 2e-3}],
  67. tmp_path,
  68. TR_DELAY_ENABLED=True,
  69. )
  70. assert len(adc_vals) == 1
  71. assert adc_vals[0] == 2e-3
  72. # --------------------------------------------------------------------------- #
  73. # Gradient system: last event HIGH
  74. # --------------------------------------------------------------------------- #
  75. def test_gradient_high_on_last_event(make_seq, tmp_path):
  76. ch, _, _ = _generate(
  77. make_seq,
  78. [{"rf": True, "dur": 1e-3}, {"dur": 5e-3}, {"adc": True, "dur": 2e-3}],
  79. tmp_path,
  80. )
  81. assert ch["GR"][-1] == 1 # last event HIGH
  82. assert all(v == 0 for v in ch["GR"][1:-1]) # all middle blocks LOW
  83. # --------------------------------------------------------------------------- #
  84. # CL bounds
  85. # --------------------------------------------------------------------------- #
  86. def test_cl_values_within_bounds(make_seq, tmp_path):
  87. ch, _, _ = _generate(
  88. make_seq,
  89. [{"rf": True, "dur": 1e-3}, {"adc": True, "dur": 50e-3}],
  90. tmp_path,
  91. )
  92. assert min(ch["CL"]) >= MIN_TICKS
  93. assert max(ch["CL"]) <= MAX_TICKS