| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- from yattag import Doc, indent
- class XMLGenerator:
- """
- Генератор XML синхронизации в формате, совместимом с test1_full/srv_interp.py
- """
- def generate(self, sync_data: dict, path: str, hw):
- number_of_blocks = sync_data["number_of_blocks"]
- gate_rf = sync_data["gate_rf"]
- gate_tr_switch = sync_data["gate_tr_switch"]
- gate_adc = sync_data["gate_adc"]
- blocks_duration = sync_data["blocks_duration"]
- synchro_block_timer = sync_data["synchro_block_timer"]
- min_block_time = sync_data["min_block_time"]
- doc, tag, text = Doc().tagtext()
- with tag("root"):
- with tag("ParamCount"):
- text(number_of_blocks + 1)
- adc_times_values = []
- adc_times_starts = []
- # Массивы синхронизатора имеют ведущий seed-элемент на индексе 0
- # (стартовый блок), значимые блоки идут по индексам 1..nb. Seed
- # представлен заголовочным тегом (*1), поэтому в цикле эмитим
- # элементы 1..nb: block_iter k -> индекс массива k + 1.
- # (Совпадает с эталонной логикой srv_interp.synchronization.)
- with tag("RF"):
- with tag("RF1"):
- text(0)
- for rf_iter in range(number_of_blocks):
- with tag("RF" + str(rf_iter + 2)):
- text(gate_rf[rf_iter + 1])
- with tag("SW"):
- with tag("SW1"):
- text(1)
- for sw_iter in range(number_of_blocks):
- with tag("SW" + str(sw_iter + 2)):
- text(gate_tr_switch[sw_iter + 1])
- with tag("ADC"):
- with tag("ADC1"):
- text(0)
- for adc_iter in range(number_of_blocks):
- idx = adc_iter + 1
- # Непрерывный участок HIGH (в т.ч. собранный из разбитых
- # блоков) считается одним событием ADC: засекаем фронт и
- # суммируем длительность всего участка для points-окна.
- is_rising_edge = (
- gate_adc[idx] == 1 and gate_adc[idx - 1] == 0
- )
- if is_rising_edge:
- adc_times_starts.append(sum(blocks_duration[0:idx]))
- run_duration = 0
- run_iter = idx
- while (run_iter < len(gate_adc)
- and gate_adc[run_iter] == 1):
- run_duration += blocks_duration[run_iter]
- run_iter += 1
- adc_times_values.append(run_duration)
- with tag("ADC" + str(adc_iter + 2)):
- text(gate_adc[idx])
- with tag("GR"):
- with tag("GR1"):
- text(1)
- for gr_iter in range(number_of_blocks):
- # Последнее событие импульсной последовательности
- # переводим из LOW в HIGH (включение градиентной системы)
- gr_value = 1 if gr_iter == number_of_blocks - 1 else 0
- with tag("GR" + str(gr_iter + 2)):
- text(gr_value)
- with tag("CL"):
- with tag("CL1"):
- text(int(min_block_time / synchro_block_timer))
- for cl_iter in range(number_of_blocks):
- with tag("CL" + str(cl_iter + 2)):
- text(int(blocks_duration[cl_iter + 1] / synchro_block_timer))
- xml_string = indent(doc.getvalue(), indentation=" " * 4, newline="\r")
- with open(path, "w", encoding="utf-8") as f:
- f.write(xml_string)
- return adc_times_values, adc_times_starts
|