# -*- coding: utf-8 -*- """ Created on 05/09/2024 @author: spacexer """ from LF_scanner import pypulseq as pp import numpy as np from types import SimpleNamespace import json from yattag import Doc, indent def seq_file_input(seq_file_name="empty.seq"): seq_input = pp.Sequence() seq_input.read(file_path=seq_file_name) seq_output_dict = seq_input.waveforms_export() return seq_input, seq_output_dict def output_seq(dict, param, path='test1/'): """ The interpretation from pypulseq format of sequence to the files needed to analog part of MRI :param dict: Dictionary of the impulse sequence pypulseq provided :return: files in "grad_output/" directory of every type of amplitudes and time points """ ''' Gradient ''' loc_t_gx = gradient_time_convertation(param, dict['t_gx']) loc_t_gy = gradient_time_convertation(param, dict['t_gy']) loc_t_gz = gradient_time_convertation(param, dict['t_gz']) loc_gx = gradient_ampl_convertation(param, dict['gx']) loc_gy = gradient_ampl_convertation(param, dict['gy']) loc_gz = gradient_ampl_convertation(param, dict['gz']) gx_out = duplicates_delete(np.transpose([loc_t_gx, loc_gx])) gy_out = duplicates_delete(np.transpose([loc_t_gy, loc_gy])) gz_out = duplicates_delete(np.transpose([loc_t_gz, loc_gz])) np.savetxt(path + 'gx.txt', gx_out, fmt='%10.0f') np.savetxt(path + 'gy.txt', gy_out, fmt='%10.0f') np.savetxt(path + 'gz.txt', gz_out, fmt='%10.0f') ''' Radio ''' rf_raster_local = param['rf_raster_time'] rf_out = radio_ampl_convertation(dict["rf"], rf_raster=rf_raster_local) file_rf = open(path + 'rf_' + str(rf_raster_local) + '_raster.bin', "wb") for byte in rf_out: file_rf.write(byte.to_bytes(1, byteorder='big', signed=1)) file_rf.close() ''' for radiofreq tests ''' # np.savetxt(path + 'rf_time.txt', np.transpose(dict["t_rf"])) # np.savetxt(path + 'rf_ampl.txt', np.transpose(dict["rf"])) # plt.plot(dict["t_rf"][0:2000], np.real(dict["rf"][0:2000]), label="real") # plt.plot(dict["t_rf"][0:2000], np.imag(dict["rf"][0:2000]), label="image") # plt.legend() # plt.show() def radio_ampl_convertation(rf_ampl, rf_raster=1e-6): #TODO: sampling resize to raster different with seqgen out_rf_list = [] rf_ampl_raster = 127 rf_ampl_maximum = np.abs(max(rf_ampl)) proportional_cf_rf = rf_ampl_raster / rf_ampl_maximum for rf_iter in range(len(rf_ampl)): out_rf_list.append(round(rf_ampl[rf_iter].real * proportional_cf_rf)) out_rf_list.append(round(rf_ampl[rf_iter].imag * proportional_cf_rf)) return out_rf_list def duplicates_delete(loc_list): new_list = [[0] * 2] for i in range(len(loc_list)): if loc_list[i][0] not in np.transpose(new_list)[0]: new_list.append(loc_list[i]) return new_list def gradient_time_convertation(param_loc, time_sample): g_raster_time = param_loc['grad_raster_time'] time_sample /= g_raster_time return time_sample def gradient_ampl_convertation(param, gradient_herz): """ Helper function that convert amplitudes to dimensionless format for machine 1 bit for sign, 15 bits of numbers :param gradient_herz: 2D array of amplitude and time points in Hz/m :return: gradient_dimless: 2D array of dimensionless points """ # amplitude raster is 32768 # maximum grad = 10 mT/m # artificial gap is 1 mT/m so 9 mT/m is now should be split in parts amplitude_max = param['G_amp_max'] amplitude_raster = 32767 step_Hz_m = amplitude_max / amplitude_raster # Hz/m step gradient gradient_dimless = gradient_herz / step_Hz_m * 1000 # assert abs(any(gradient_dimless)) > 32768, 'Amplitude is higher than expected, check the rate number' return gradient_dimless def adc_correction(blocks_number_loc, seq_input_loc): """ Helper function that rise times for correction of ADC events Вспомогательная функция получения времён для коррекции АЦП событий :return: rise_time: float, stores in pulseq, related to exact type of gradient events хранится в pulseq, связан с конкретным типом градиентного события fall_time: float, same as rise_time аналогично rise_time """ rise_time, fall_time = None, None is_adc_inside = False for j in range(blocks_number_loc - 1): iterable_block = seq_input_loc.get_block(block_index=j + 1) if iterable_block.adc is not None: is_adc_inside = True rise_time = iterable_block.gx.rise_time fall_time = iterable_block.gx.fall_time if not is_adc_inside: raise Exception("No ADC event found inside sequence") return rise_time, fall_time def adc_event_edges(local_gate_adc): """ Helper function that rise numbers of blocks of border correction of ADC events Вспомогательная функция для получения номеров блоков границ коррекции АЦП событий :return: num_begin_l: int, number of time block when adc event starts номер временного блока начала АЦП события num_finish_l: int, same but ends то же, но для окончания """ num_begin_l = 0 flag_begin = False flag_finish = False num_finish_l = 1 for k in range(len(local_gate_adc) - 1): if local_gate_adc[k] != 0 and not flag_begin: num_begin_l = k flag_begin = True if local_gate_adc[k] != 0 and local_gate_adc[k + 1] == 0 and not flag_finish: num_finish_l = k flag_finish = True return num_begin_l, num_finish_l def synchronization(sync_sequence, synchro_block_timer=20e-9, path='test1/', TR_DELAY_L=800e-9, RF_DELAY_L=800e-9, START_DELAY_L=800e-9): ### MAIN LOOP ### ### ОСНОВНОЙ ЦИКЛ### MIN_BLOCK_TIME = 400e-9 assert START_DELAY_L >= RF_DELAY_L assert TR_DELAY_L >= synchro_block_timer assert RF_DELAY_L >= synchro_block_timer number_of_blocks = len(sync_sequence.block_events) gate_adc = [0] gate_rf = [0] * CONST_HACK_RF_DELAY gate_tr_switch = [1] blocks_duration = [START_DELAY_L] adc_times_values = [] adc_times_starts = [] ''' ID RF GX GY GZ ADC EXT 0 1 2 3 4 5 6 ''' added_blocks = 0 for block_counter in range(number_of_blocks): is_not_adc_block = True if sync_sequence.block_events[block_counter + 1][5]: is_not_adc_block = False gate_adc.append(0) gate_rf.append(gate_rf[-1]) blocks_duration[-1] -= TR_DELAY_L blocks_duration.append(TR_DELAY_L) gate_tr_switch.append(0) added_blocks += 1 gate_adc.append(1) gate_tr_switch.append(0) else: gate_tr_switch.append(1) gate_adc.append(0) if sync_sequence.block_events[block_counter + 1][1] and is_not_adc_block: gate_rf.append(1) gate_adc.append(gate_adc[-1]) blocks_duration[-1] -= RF_DELAY_L blocks_duration.append(RF_DELAY_L) gate_tr_switch.append(gate_tr_switch[-1]) added_blocks += 1 gate_rf.append(1) else: gate_rf.append(0) current_block_dur = sync_sequence.block_durations[block_counter + 1] blocks_duration.append(current_block_dur) number_of_blocks += added_blocks # gate_gx = [1] * number_of_blocks # gate_gy = [1] * number_of_blocks # gate_gz = [1] * number_of_blocks ''' test1 swap ''' # assert any(block_times) < MIN_BLOCK_TIME, "ERROR: events in the current sequence are less than 400 ns" doc, tag, text = Doc().tagtext() with tag('root'): with tag('ParamCount'): text(number_of_blocks) with tag('RF'): for RF_iter in range(number_of_blocks): with tag('RF' + str(RF_iter + 1)): text(gate_rf[RF_iter]) with tag('SW'): for SW_iter in range(number_of_blocks): with tag('SW' + str(SW_iter + 1)): text(gate_tr_switch[SW_iter]) with tag('ADC'): for ADC_iter in range(number_of_blocks): if gate_adc[ADC_iter] == 1: adc_times_values.append(blocks_duration[ADC_iter]) adc_times_starts.append(sum(blocks_duration[0:ADC_iter])) with tag('ADC' + str(ADC_iter + 1)): text(gate_adc[ADC_iter]) with tag('GR'): with tag('GR1'): text(1) for GX_iter in range(1, number_of_blocks): with tag('GR'+ str(GX_iter + 1)): text(0) with tag('CL'): with tag('CL' + str(1)): text(int(MIN_BLOCK_TIME / synchro_block_timer)) for CL_iter in range(1, number_of_blocks): with tag('CL' + str(CL_iter + 1)): text(int(blocks_duration[CL_iter] / synchro_block_timer)) result = indent( doc.getvalue(), indentation=' ' * 4, newline='\r' ) sync_file = open(path + "sync_v2.xml", "w") sync_file.write(result) sync_file.close() picoscope_set(adc_times_values, adc_times_starts) def picoscope_set(adc_val, adc_start, number_of_channels_l=8, sampling_freq_l=4e7, path='test1/'): # sampling rate = 40 MHz = 4e7 1/s # adc_val in seconds adc_out_timings = [] for i in adc_val: adc_out_timings.append(int(i * sampling_freq_l)) doc, tag, text, line = Doc().ttl() with tag('root'): with tag('points'): with tag('title'): text("Points") with tag('value'): text(str(adc_out_timings)) with tag('num_of_channels'): with tag('title'): text("Number of Channels") with tag('value'): text(number_of_channels_l) with tag('times'): with tag('title'): text("Times") with tag('value'): text(str(adc_start).format('%.e')) with tag('sample_freq'): with tag('title'): text("Sample Frequency") with tag('value'): text(sampling_freq_l) result = indent( doc.getvalue(), indentation=' ' * 4, newline='\r' ) sync_file = open(path + "picoscope_params.xml", "w") sync_file.write(result) sync_file.close() if __name__ == "__main__": CONST_HACK_RF_DELAY = 17 * 2 * 2 SEQ_INPUT, SEQ_DICT = seq_file_input(seq_file_name='sequences/turbo_FLASH_060924_0444.seq') # SEQ_INPUT, SEQ_DICT = seq_file_input(seq_file_name='sequences/test1_full.seq') params_path = 'sequences/' params_filename = "turbo_FLASH_060924_0444" # params_filename = "test1_full" file = open(params_path + params_filename + ".json", 'r') SEQ_PARAM = json.load(file) file.close() ''' integartion of srv_seq_gen ''' # SEQ_PARAM = set_limits() # SEQ_INPUT = save_param() # SEQ_DICT = SEQ_INPUT.waveforms_export() ''' simulation of inputing the JSON and SEQ ''' # artificial delays due to construction of the MRI # искусственные задержки из-за тех. особенностей МРТ # RF_dtime = 10 * 1e-6 # TR_dtime = 10 * 1e-6 time_info = SEQ_INPUT.duration() blocks_number = time_info[1] time_dur = time_info[0] # output interpretation. all formats of files defined in method # интерпретация выхода. Все форматы файлов определены в методе output_seq(SEQ_DICT, SEQ_PARAM) # defining constants of the sequence # определение констант последовательности local_definitions = SEQ_INPUT.definitions ADC_raster = local_definitions['AdcRasterTime'] RF_raster = local_definitions['RadiofrequencyRasterTime'] synchronization(SEQ_INPUT)