1_block_synchronizer.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. # synchronizer : converts Pulseq (.seq) files into sequences of amplitude, time and synchro sets.
  2. # Output is given by MRI machine
  3. # Babich Nikita, Kozin Roman, Karsakov Grigory
  4. # March 2024
  5. import numpy as np
  6. from matplotlib import pyplot as plt
  7. from pulseq_fixed import sequence_fixed as puls_fix
  8. def output_seq(dict):
  9. """
  10. The interpretation from pypulseq format of sequence to the files needed to analog part of MRI
  11. :param dict: Dictionary of the impulse sequence pypulseq provided
  12. :return: files in "data_output_seq/" directory of every type of amplitudes and time points
  13. """
  14. loc_t_adc = dict['t_adc']
  15. loc_t_rf = dict['t_rf']
  16. loc_t_rf_centers = dict['t_rf_centers']
  17. loc_t_gx = dict['t_gx']
  18. loc_t_gy = dict['t_gy']
  19. loc_t_gz = dict['t_gz']
  20. loc_adc = dict['adc']
  21. loc_rf = dict['rf']
  22. loc_rf_centers = dict['rf_centers']
  23. loc_gx = dict['gx']
  24. loc_gy = dict['gy']
  25. loc_gz = dict['gz']
  26. with open('data_output_seq/t_adc.txt', 'w') as f:
  27. data = str(tuple(loc_t_adc))
  28. f.write(data)
  29. with open('data_output_seq/t_rf.txt', 'w') as f:
  30. data = str(tuple(loc_t_rf))
  31. f.write(data)
  32. with open('data_output_seq/t_rf_centers.txt', 'w') as f:
  33. data = str(tuple(loc_t_rf_centers))
  34. f.write(data)
  35. with open('data_output_seq/t_gx.txt', 'w') as f:
  36. data = str(tuple(loc_t_gx))
  37. f.write(data)
  38. with open('data_output_seq/t_gy.txt', 'w') as f:
  39. data = str(tuple(loc_t_gy))
  40. f.write(data)
  41. with open('data_output_seq/t_gz.txt', 'w') as f:
  42. data = str(tuple(loc_t_gz))
  43. f.write(data)
  44. with open('data_output_seq/adc.txt', 'w') as f:
  45. data = str(tuple(loc_adc))
  46. f.write(data)
  47. with open('data_output_seq/rf.txt', 'w') as f:
  48. data = str(tuple(loc_rf))
  49. f.write(data)
  50. with open('data_output_seq/rf_centers.txt', 'w') as f:
  51. data = str(tuple(loc_rf_centers))
  52. f.write(data)
  53. with open('data_output_seq/gx.txt', 'w') as f:
  54. data = str(tuple(loc_gx))
  55. f.write(data)
  56. with open('data_output_seq/gy.txt', 'w') as f:
  57. data = str(tuple(loc_gy))
  58. f.write(data)
  59. with open('data_output_seq/gz.txt', 'w') as f:
  60. data = str(tuple(loc_gz))
  61. f.write(data)
  62. def adc_correction():
  63. """
  64. Helper function that rise times for correction of ADC events
  65. :return: rise_time: float, stores in pulseq, related to exact type of gradient events
  66. fall_time: float, same as rise_time
  67. """
  68. rise_time, fall_time = None, None
  69. is_adc_inside = False
  70. for j in range(blocks_number - 1):
  71. iterable_block = seq_input.get_block(block_index=j + 1)
  72. if iterable_block.adc is not None:
  73. is_adc_inside = True
  74. rise_time = iterable_block.gx.rise_time
  75. fall_time = iterable_block.gx.fall_time
  76. if not is_adc_inside:
  77. raise Exception("No ADC event found inside sequence")
  78. return rise_time, fall_time
  79. def adc_event_edges(local_gate_adc):
  80. """
  81. Helper function that rise numbers of blocks of border correction of ADC events
  82. :return: num_begin_l: int, number of time block when adc event starts
  83. num_finish_l: int, same but ends
  84. """
  85. num_begin_l = 0
  86. flag_begin = False
  87. flag_finish = False
  88. num_finish_l = 1
  89. for k in range(len(local_gate_adc) - 1):
  90. if local_gate_adc[k] != 0 and not flag_begin:
  91. num_begin_l = k
  92. flag_begin = True
  93. if local_gate_adc[k] != 0 and local_gate_adc[k + 1] == 0 and not flag_finish:
  94. num_finish_l = k
  95. flag_finish = True
  96. return num_begin_l, num_finish_l
  97. def synchronization(N_samples):
  98. ### MAIN LOOP ###
  99. for i in range(N_samples):
  100. # delaying of RF event for time period of local delay
  101. if RF_assintant[0] - RF_raster < time_sample[i] < RF_assintant[0] + RF_raster:
  102. RF_stop = int(RF_assintant[1] / time_step)
  103. gate_rf[i:RF_stop] = 1.0
  104. var = 1
  105. # mandatory disabling of RF gate due to ADC work same time
  106. gate_rf_2 = map(lambda x: time_sample[i] - ADC_raster < x < time_sample[i] + ADC_raster and 1 or 0,
  107. seq_output_dict['t_adc'])
  108. if np.any(np.array(list(gate_rf_2)) > 0):
  109. gate_rf[i] = 0.0
  110. # TR switch with own delay before ADC turning
  111. gate_tr_1 = map(lambda x: time_sample[i] - ADC_raster < x < time_sample[i] + ADC_raster and 1 or 0,
  112. seq_output_dict['t_adc'])
  113. if np.any(np.array(list(gate_tr_1)) > 0):
  114. block_delay_tr = int(local_delay_tr / time_step)
  115. gate_tr_switch[i - block_delay_tr:i + 1] = 0.0
  116. # first step of ADC gate - enabling
  117. gate_adc_1 = map(lambda x: time_sample[i] - ADC_raster < x < time_sample[i] + ADC_raster and 1 or 0,
  118. seq_output_dict['t_adc'])
  119. if np.any(np.array(list(gate_adc_1)) > 0):
  120. gate_adc[i] = 1.0
  121. # adc correction sue to rise and fall time of gradient
  122. # defining time that ADC need to be disabled during of
  123. rise_time_loc, fall_time_loc = adc_correction()
  124. num_beg, num_fin = adc_event_edges(gate_adc)
  125. rise_time_tick = int(rise_time_loc / time_step)
  126. fall_time_tick = int(rise_time_loc / time_step)
  127. gate_adc[num_beg:num_beg + rise_time_tick] = 0.0
  128. gate_adc[num_fin - fall_time_tick:num_fin + 1] = 0.0
  129. gates_release = {"adc": gate_adc,
  130. "rf": gate_rf,
  131. "tr_switch": gate_tr_switch,
  132. "gx": gate_gx,
  133. "gy": gate_gy,
  134. "gz": gate_gz}
  135. # gates_output(gates_release)
  136. if __name__ == '__main__':
  137. print('')
  138. seq_file = "seq_store/SE_rfdeath_5000.seq"
  139. seq_input = puls_fix.Sequence()
  140. seq_input.read(file_path=seq_file)
  141. seq_output_dict = seq_input.waveforms_export(time_range=(0, 3))
  142. # artificial delays due to construction of the MRI
  143. RF_dtime = 100 * 1e-6
  144. TR_dtime = 100 * 1e-6
  145. time_info = seq_input.duration()
  146. blocks_number = time_info[1]
  147. time_dur = time_info[0]
  148. time_step = 20 * 1e-9
  149. N_samples = int(time_dur / time_step)
  150. time_sample = np.linspace(0, time_dur, N_samples)
  151. # output interpretation. all formats of files defined in method
  152. output_seq(seq_output_dict)
  153. # defining constants of the sequence
  154. local_definitions = seq_input.definitions
  155. ADC_raster = local_definitions['AdcRasterTime']
  156. RF_raster = local_definitions['RadiofrequencyRasterTime']
  157. gate_adc = np.zeros(N_samples)
  158. gate_rf = np.zeros(N_samples)
  159. gate_tr_switch = np.ones(N_samples)
  160. gate_gx = np.zeros(N_samples)
  161. gate_gy = np.zeros(N_samples)
  162. gate_gz = np.zeros(N_samples)
  163. local_delay_rf = RF_dtime
  164. local_delay_tr = TR_dtime
  165. local_raster_time = time_step
  166. RF_assintant = [seq_output_dict['t_rf'][0] - RF_dtime, seq_output_dict['t_rf'][-1]]
  167. synchronization(N_samples)
  168. # testing plots for synchronization
  169. plt.plot(seq_output_dict['t_gx'][:int(N_samples)], seq_output_dict['gx'][:int(N_samples)])
  170. plt.plot(seq_output_dict['t_gy'][:int(N_samples)], seq_output_dict['gy'][:int(N_samples)])
  171. plt.plot(seq_output_dict['t_gz'][:int(N_samples)], seq_output_dict['gz'][:int(N_samples)])
  172. plt.savefig("plots_output/gradients.png")
  173. plt.show()
  174. plt.plot(seq_output_dict['t_gx'][:int(N_samples)], seq_output_dict['gx'][:int(N_samples)] / 720)
  175. plt.plot(time_sample[:int(N_samples)], gate_adc[:int(N_samples)], label='ADC gate')
  176. plt.plot(time_sample[:int(N_samples)], gate_tr_switch[:int(N_samples)], label='TR switch')
  177. plt.plot(seq_output_dict['t_rf'], seq_output_dict['rf'] / 210, label='RF signal')
  178. plt.plot(time_sample[:int(N_samples)], gate_rf[:int(N_samples)], label='RF gate')
  179. plt.legend()
  180. plt.savefig("plots_output/synchro_pulse.png")
  181. plt.show()