Kaynağa Gözat

minor changes

Nikita Babich 7 ay önce
ebeveyn
işleme
10ae5da2da
1 değiştirilmiş dosya ile 127 ekleme ve 130 silme
  1. 127 130
      1_block_synchronizer.py

+ 127 - 130
1_block_synchronizer.py

@@ -1,15 +1,22 @@
+# synchronizer : converts Pulseq (.seq) files into sequences of amplitude, time and synchro sets.
+# Output is given by MRI machine
+# Babich Nikita, Kozin Roman, Karsakov  Grigory
+# March 2024
+
 import numpy as np
-import json
 from matplotlib import pyplot as plt
 from pulseq_fixed import sequence_fixed as puls_fix
 
-seq_file = "seq_store/SE_rfdeath_5000.seq"
-seq_input = puls_fix.Sequence()
-seq_input.read(file_path=seq_file)
-seq_output_dict = seq_input.waveforms_export(time_range=(0, 3))
-
 
 def output_seq(dict):
+    """
+    The interpretation from pypulseq format of sequence to the files needed to analog part of MRI
+
+    :param dict: Dictionary of the impulse sequence pypulseq provided
+
+    :return: files in "data_output_seq/" directory of every type of amplitudes and time points
+
+    """
     loc_t_adc = dict['t_adc']
     loc_t_rf = dict['t_rf']
     loc_t_rf_centers = dict['t_rf_centers']
@@ -60,77 +67,13 @@ def output_seq(dict):
         f.write(data)
 
 
-output_seq(seq_output_dict)
-
-# added type check in Sequence.block, read does not make an empty variable with a type
-# is there the other way to do it?
-# print(seq_output_dict['gx'])
-# Engage what exactly every array means
-# print(seq_input.waveforms_and_times())
-# plt.plot()
-# plt.show()
-
-# print(seq_output_dict)
-# t_adc t_rf t_rf_centers t_gx t_gy t_gz adc rf rf_centers gx gy gz
-# seq_input.plot()
-
-# plt.plot(seq_output_dict['t_rf'], seq_output_dict['rf'])
-# plt.show()
-# plt.plot(seq_output_dict['t_adc'], seq_output_dict['adc'])
-# plt.show()
-
-
-local_definitions = seq_input.definitions
-ADC_raster = local_definitions['AdcRasterTime']
-RF_raster = local_definitions['RadiofrequencyRasterTime']
-
-RF_dtime = 100 * 1e-6
-TR_dtime = 100 * 1e-6
-# artificial delays
-
-time_info = seq_input.duration()
-blocks_number = time_info[1]
-time_dur = time_info[0]
-time_step = 20 * 1e-9
-N_samples = int(time_dur / time_step)
-# TODO: why two times bigger? what effort on output
-time_sample = np.linspace(0, time_dur, N_samples)
-
-gate_adc = np.zeros(N_samples)
-gate_rf = np.zeros(N_samples)
-gate_tr_switch = np.ones(N_samples)
-gate_gx = np.zeros(N_samples)
-gate_gy = np.zeros(N_samples)
-gate_gz = np.zeros(N_samples)
-
-local_delay_rf = RF_dtime
-local_delay_tr = TR_dtime
-local_raster_time = time_step
-
-# TODO: function defining beginning and ending of the RF events
-RF_assintant = [seq_output_dict['t_rf'][0] - RF_dtime, seq_output_dict['t_rf'][-1]]
-
-
-def gates_output(gates, synchro_impulse=20 * 1e-9):
-    for i_loc in range(len(gates['gx'])):
-        a = 1
-    with open('data_output/tr_switch.txt', 'w') as f:
-        data = str(tuple(gates['tr_switch']))
-        f.write(data)
-    with open('data_output/rf.txt', 'w') as f:
-        data = str(tuple(gates['rf']))
-        f.write(data)
-    with open('data_output/adc.txt', 'w') as f:
-        data = str(tuple(gates['adc']))
-        f.write(data)
-    data = {'gate_gx': tuple(gates['gx']),
-            'gate_gy': tuple(gates['gy']),
-            'gate_gz': tuple(gates['gz'])}
-    with open('data_output/gradient_gates.json', 'w') as outfile:
-        json.dump(data, outfile)
-
-
 def adc_correction():
+    """
+    Helper function that rise times for correction of ADC events
+
+    :return:    rise_time: float, stores in pulseq, related to exact type of gradient events
+                fall_time: float, same as rise_time
+    """
     rise_time, fall_time = None, None
     is_adc_inside = False
     for j in range(blocks_number - 1):
@@ -145,6 +88,12 @@ def adc_correction():
 
 
 def adc_event_edges(local_gate_adc):
+    """
+    Helper function that rise numbers of blocks of border  correction of ADC events
+
+    :return:    num_begin_l:    int, number of time block when adc event starts
+                num_finish_l:   int, same but ends
+    """
     num_begin_l = 0
     flag_begin = False
     flag_finish = False
@@ -159,59 +108,107 @@ def adc_event_edges(local_gate_adc):
     return num_begin_l, num_finish_l
 
 
-for i in range(N_samples):
-    # delaying of RF event for time period of local delay
-    if RF_assintant[0] - RF_raster < time_sample[i] < RF_assintant[0] + RF_raster:
-        RF_stop = int(RF_assintant[1] / time_step)
-        gate_rf[i:RF_stop] = 1.0
-        var = 1
-    # mandatory disabling of RF gate due to ADC work same time
-    gate_rf_2 = map(lambda x: time_sample[i] - ADC_raster < x < time_sample[i] + ADC_raster and 1 or 0,
-                    seq_output_dict['t_adc'])
-    if np.any(np.array(list(gate_rf_2)) > 0):
-        gate_rf[i] = 0.0
-    # TR switch with own delay before ADC turning
-    gate_tr_1 = map(lambda x: time_sample[i] - ADC_raster < x < time_sample[i] + ADC_raster and 1 or 0,
-                    seq_output_dict['t_adc'])
-    if np.any(np.array(list(gate_tr_1)) > 0):
-        block_delay_tr = int(local_delay_tr / time_step)
-        gate_tr_switch[i - block_delay_tr:i + 1] = 0.0
-    # first step of ADC gate - enabling
-    # TODO: ADC gate feeling gradients form of rise and fall
-    gate_adc_1 = map(lambda x: time_sample[i] - ADC_raster < x < time_sample[i] + ADC_raster and 1 or 0,
-                     seq_output_dict['t_adc'])
-    if np.any(np.array(list(gate_adc_1)) > 0):
-        gate_adc[i] = 1.0
-
-# adc correction sue to rise and fall time of gradient
-# defining time that ADC need to be disabled during of
-rise_time_loc, fall_time_loc = adc_correction()
-num_beg, num_fin = adc_event_edges(gate_adc)
-rise_time_tick = int(rise_time_loc / time_step)
-fall_time_tick = int(rise_time_loc / time_step)
-gate_adc[num_beg:num_beg + rise_time_tick] = 0.0
-gate_adc[num_fin - fall_time_tick:num_fin + 1] = 0.0
-
-gates_release = {"adc": gate_adc,
-                 "rf": gate_rf,
-                 "tr_switch": gate_tr_switch,
-                 "gx": gate_gx,
-                 "gy": gate_gy,
-                 "gz": gate_gz}
-
-plt.plot(seq_output_dict['t_gx'][:int(N_samples)], seq_output_dict['gx'][:int(N_samples)])
-plt.plot(seq_output_dict['t_gy'][:int(N_samples)], seq_output_dict['gy'][:int(N_samples)])
-plt.plot(seq_output_dict['t_gz'][:int(N_samples)], seq_output_dict['gz'][:int(N_samples)])
-plt.savefig("plots_output/gradients.png")
-plt.show()
-
-plt.plot(seq_output_dict['t_gx'][:int(N_samples)], seq_output_dict['gx'][:int(N_samples)] / 720)
-plt.plot(time_sample[:int(N_samples)], gate_adc[:int(N_samples)], label='ADC gate')
-plt.plot(time_sample[:int(N_samples)], gate_tr_switch[:int(N_samples)], label='TR switch')
-plt.plot(seq_output_dict['t_rf'], seq_output_dict['rf'] / 210, label='RF signal')
-plt.plot(time_sample[:int(N_samples)], gate_rf[:int(N_samples)], label='RF gate')
-plt.legend()
-plt.savefig("plots_output/synchro_pulse.png")
-plt.show()
+def synchronization(N_samples):
+    ### MAIN LOOP ###
+    for i in range(N_samples):
+        # delaying of RF event for time period of local delay
+        if RF_assintant[0] - RF_raster < time_sample[i] < RF_assintant[0] + RF_raster:
+            RF_stop = int(RF_assintant[1] / time_step)
+            gate_rf[i:RF_stop] = 1.0
+            var = 1
+
+        # mandatory disabling of RF gate due to ADC work same time
+        gate_rf_2 = map(lambda x: time_sample[i] - ADC_raster < x < time_sample[i] + ADC_raster and 1 or 0,
+                        seq_output_dict['t_adc'])
+        if np.any(np.array(list(gate_rf_2)) > 0):
+            gate_rf[i] = 0.0
+
+        # TR switch with own delay before ADC turning
+        gate_tr_1 = map(lambda x: time_sample[i] - ADC_raster < x < time_sample[i] + ADC_raster and 1 or 0,
+                        seq_output_dict['t_adc'])
+        if np.any(np.array(list(gate_tr_1)) > 0):
+            block_delay_tr = int(local_delay_tr / time_step)
+            gate_tr_switch[i - block_delay_tr:i + 1] = 0.0
+
+        # first step of ADC gate - enabling
+        gate_adc_1 = map(lambda x: time_sample[i] - ADC_raster < x < time_sample[i] + ADC_raster and 1 or 0,
+                         seq_output_dict['t_adc'])
+        if np.any(np.array(list(gate_adc_1)) > 0):
+            gate_adc[i] = 1.0
+
+    # adc correction sue to rise and fall time of gradient
+    # defining time that ADC need to be disabled during of
+    rise_time_loc, fall_time_loc = adc_correction()
+    num_beg, num_fin = adc_event_edges(gate_adc)
+    rise_time_tick = int(rise_time_loc / time_step)
+    fall_time_tick = int(rise_time_loc / time_step)
+    gate_adc[num_beg:num_beg + rise_time_tick] = 0.0
+    gate_adc[num_fin - fall_time_tick:num_fin + 1] = 0.0
+
+    gates_release = {"adc": gate_adc,
+                     "rf": gate_rf,
+                     "tr_switch": gate_tr_switch,
+                     "gx": gate_gx,
+                     "gy": gate_gy,
+                     "gz": gate_gz}
+
 
 # gates_output(gates_release)
+
+
+if __name__ == '__main__':
+    print('')
+    seq_file = "seq_store/SE_rfdeath_5000.seq"
+    seq_input = puls_fix.Sequence()
+    seq_input.read(file_path=seq_file)
+    seq_output_dict = seq_input.waveforms_export(time_range=(0, 3))
+
+    # artificial delays due to construction of the MRI
+    RF_dtime = 100 * 1e-6
+    TR_dtime = 100 * 1e-6
+
+    time_info = seq_input.duration()
+    blocks_number = time_info[1]
+    time_dur = time_info[0]
+    time_step = 20 * 1e-9
+    N_samples = int(time_dur / time_step)
+    time_sample = np.linspace(0, time_dur, N_samples)
+
+    # output interpretation. all formats of files defined in method
+    output_seq(seq_output_dict)
+
+    # defining constants of the sequence
+    local_definitions = seq_input.definitions
+    ADC_raster = local_definitions['AdcRasterTime']
+    RF_raster = local_definitions['RadiofrequencyRasterTime']
+
+    gate_adc = np.zeros(N_samples)
+    gate_rf = np.zeros(N_samples)
+    gate_tr_switch = np.ones(N_samples)
+    gate_gx = np.zeros(N_samples)
+    gate_gy = np.zeros(N_samples)
+    gate_gz = np.zeros(N_samples)
+
+    local_delay_rf = RF_dtime
+    local_delay_tr = TR_dtime
+    local_raster_time = time_step
+
+    RF_assintant = [seq_output_dict['t_rf'][0] - RF_dtime, seq_output_dict['t_rf'][-1]]
+
+    synchronization(N_samples)
+
+    # testing plots for synchronization
+    plt.plot(seq_output_dict['t_gx'][:int(N_samples)], seq_output_dict['gx'][:int(N_samples)])
+    plt.plot(seq_output_dict['t_gy'][:int(N_samples)], seq_output_dict['gy'][:int(N_samples)])
+    plt.plot(seq_output_dict['t_gz'][:int(N_samples)], seq_output_dict['gz'][:int(N_samples)])
+    plt.savefig("plots_output/gradients.png")
+    plt.show()
+
+    plt.plot(seq_output_dict['t_gx'][:int(N_samples)], seq_output_dict['gx'][:int(N_samples)] / 720)
+    plt.plot(time_sample[:int(N_samples)], gate_adc[:int(N_samples)], label='ADC gate')
+    plt.plot(time_sample[:int(N_samples)], gate_tr_switch[:int(N_samples)], label='TR switch')
+    plt.plot(seq_output_dict['t_rf'], seq_output_dict['rf'] / 210, label='RF signal')
+    plt.plot(time_sample[:int(N_samples)], gate_rf[:int(N_samples)], label='RF gate')
+    plt.legend()
+    plt.savefig("plots_output/synchro_pulse.png")
+    plt.show()