| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- # -*- coding: utf-8 -*-
- """
- Created on 05/09/2024
- @author: spacexer
- """
- from LF_scanner import pypulseq as pp
- import numpy as np
- from types import SimpleNamespace
- import json
- from yattag import Doc, indent
- def seq_file_input(seq_file_name="empty.seq"):
- seq_input = pp.Sequence()
- seq_input.read(file_path=seq_file_name)
- seq_output_dict = seq_input.waveforms_export()
- return seq_input, seq_output_dict
- def output_seq(dict, param, path='test1/'):
- """
- The interpretation from pypulseq format of sequence to the files needed to analog part of MRI
- :param dict: Dictionary of the impulse sequence pypulseq provided
- :return: files in "grad_output/" directory of every type of amplitudes and time points
- """
- '''
- Gradient
- '''
- loc_t_gx = gradient_time_convertation(param, dict['t_gx'])
- loc_t_gy = gradient_time_convertation(param, dict['t_gy'])
- loc_t_gz = gradient_time_convertation(param, dict['t_gz'])
- loc_gx = gradient_ampl_convertation(param, dict['gx'])
- loc_gy = gradient_ampl_convertation(param, dict['gy'])
- loc_gz = gradient_ampl_convertation(param, dict['gz'])
- gx_out = duplicates_delete(np.transpose([loc_t_gx, loc_gx]))
- gy_out = duplicates_delete(np.transpose([loc_t_gy, loc_gy]))
- gz_out = duplicates_delete(np.transpose([loc_t_gz, loc_gz]))
- np.savetxt(path + 'gx.txt', gx_out, fmt='%10.0f')
- np.savetxt(path + 'gy.txt', gy_out, fmt='%10.0f')
- np.savetxt(path + 'gz.txt', gz_out, fmt='%10.0f')
- '''
- Radio
- '''
- rf_raster_local = param['rf_raster_time']
- rf_out = radio_ampl_convertation(dict["rf"], rf_raster=rf_raster_local)
- file_rf = open(path + 'rf_' + str(rf_raster_local) + '_raster.bin', "wb")
- for byte in rf_out:
- file_rf.write(byte.to_bytes(1, byteorder='big', signed=1))
- file_rf.close()
- '''
- for radiofreq tests
- '''
- # np.savetxt(path + 'rf_time.txt', np.transpose(dict["t_rf"]))
- # np.savetxt(path + 'rf_ampl.txt', np.transpose(dict["rf"]))
- # plt.plot(dict["t_rf"][0:2000], np.real(dict["rf"][0:2000]), label="real")
- # plt.plot(dict["t_rf"][0:2000], np.imag(dict["rf"][0:2000]), label="image")
- # plt.legend()
- # plt.show()
- def radio_ampl_convertation(rf_ampl, rf_raster=1e-6):
- #TODO: sampling resize to raster different with seqgen
- out_rf_list = []
- rf_ampl_raster = 127
- rf_ampl_maximum = np.abs(max(rf_ampl))
- proportional_cf_rf = rf_ampl_raster / rf_ampl_maximum
- for rf_iter in range(len(rf_ampl)):
- out_rf_list.append(round(rf_ampl[rf_iter].real * proportional_cf_rf))
- out_rf_list.append(round(rf_ampl[rf_iter].imag * proportional_cf_rf))
- return out_rf_list
- def duplicates_delete(loc_list):
- new_list = [[0] * 2]
- for i in range(len(loc_list)):
- if loc_list[i][0] not in np.transpose(new_list)[0]:
- new_list.append(loc_list[i])
- return new_list
- def gradient_time_convertation(param_loc, time_sample):
- g_raster_time = param_loc['grad_raster_time']
- time_sample /= g_raster_time
- return time_sample
- def gradient_ampl_convertation(param, gradient_herz):
- """
- Helper function that convert amplitudes to dimensionless format for machine
- 1 bit for sign, 15 bits of numbers
- :param gradient_herz: 2D array of amplitude and time points in Hz/m
- :return: gradient_dimless: 2D array of dimensionless points
- """
- # amplitude raster is 32768
- # maximum grad = 10 mT/m
- # artificial gap is 1 mT/m so 9 mT/m is now should be split in parts
- amplitude_max = param['G_amp_max']
- amplitude_raster = 32767
- step_Hz_m = amplitude_max / amplitude_raster # Hz/m step gradient
- gradient_dimless = gradient_herz / step_Hz_m * 1000
- # assert abs(any(gradient_dimless)) > 32768, 'Amplitude is higher than expected, check the rate number'
- return gradient_dimless
- def adc_correction(blocks_number_loc, seq_input_loc):
- """
- Helper function that rise times for correction of ADC events
- Вспомогательная функция получения времён для коррекции АЦП событий
- :return: rise_time: float, stores in pulseq, related to exact type of gradient events
- хранится в pulseq, связан с конкретным типом градиентного события
- fall_time: float, same as rise_time
- аналогично rise_time
- """
- rise_time, fall_time = None, None
- is_adc_inside = False
- for j in range(blocks_number_loc - 1):
- iterable_block = seq_input_loc.get_block(block_index=j + 1)
- if iterable_block.adc is not None:
- is_adc_inside = True
- rise_time = iterable_block.gx.rise_time
- fall_time = iterable_block.gx.fall_time
- if not is_adc_inside:
- raise Exception("No ADC event found inside sequence")
- return rise_time, fall_time
- def adc_event_edges(local_gate_adc):
- """
- Helper function that rise numbers of blocks of border correction of ADC events
- Вспомогательная функция для получения номеров блоков границ коррекции АЦП событий
- :return: num_begin_l: int, number of time block when adc event starts
- номер временного блока начала АЦП события
- num_finish_l: int, same but ends
- то же, но для окончания
- """
- num_begin_l = 0
- flag_begin = False
- flag_finish = False
- num_finish_l = 1
- for k in range(len(local_gate_adc) - 1):
- if local_gate_adc[k] != 0 and not flag_begin:
- num_begin_l = k
- flag_begin = True
- if local_gate_adc[k] != 0 and local_gate_adc[k + 1] == 0 and not flag_finish:
- num_finish_l = k
- flag_finish = True
- return num_begin_l, num_finish_l
- def synchronization(sync_sequence, synchro_block_timer=20e-9, path='test1/', TR_DELAY_L=800e-9, RF_DELAY_L=800e-9,
- START_DELAY_L=800e-9):
- ### MAIN LOOP ###
- ### ОСНОВНОЙ ЦИКЛ###
- MIN_BLOCK_TIME = 400e-9
- assert START_DELAY_L >= RF_DELAY_L
- assert TR_DELAY_L >= synchro_block_timer
- assert RF_DELAY_L >= synchro_block_timer
- number_of_blocks = len(sync_sequence.block_events)
- gate_adc = [0]
- gate_rf = [0] * CONST_HACK_RF_DELAY
- gate_tr_switch = [1]
- blocks_duration = [START_DELAY_L]
- adc_times_values = []
- adc_times_starts = []
- '''
- ID RF GX GY GZ ADC EXT
- 0 1 2 3 4 5 6
- '''
- added_blocks = 0
- for block_counter in range(number_of_blocks):
- is_not_adc_block = True
- if sync_sequence.block_events[block_counter + 1][5]:
- is_not_adc_block = False
- gate_adc.append(0)
- gate_rf.append(gate_rf[-1])
- blocks_duration[-1] -= TR_DELAY_L
- blocks_duration.append(TR_DELAY_L)
- gate_tr_switch.append(0)
- added_blocks += 1
- gate_adc.append(1)
- gate_tr_switch.append(0)
- else:
- gate_tr_switch.append(1)
- gate_adc.append(0)
- if sync_sequence.block_events[block_counter + 1][1] and is_not_adc_block:
- gate_rf.append(1)
- gate_adc.append(gate_adc[-1])
- blocks_duration[-1] -= RF_DELAY_L
- blocks_duration.append(RF_DELAY_L)
- gate_tr_switch.append(gate_tr_switch[-1])
- added_blocks += 1
- gate_rf.append(1)
- else:
- gate_rf.append(0)
- current_block_dur = sync_sequence.block_durations[block_counter + 1]
- blocks_duration.append(current_block_dur)
- number_of_blocks += added_blocks
- # gate_gx = [1] * number_of_blocks
- # gate_gy = [1] * number_of_blocks
- # gate_gz = [1] * number_of_blocks
- '''
- test1 swap
- '''
- # assert any(block_times) < MIN_BLOCK_TIME, "ERROR: events in the current sequence are less than 400 ns"
- doc, tag, text = Doc().tagtext()
- with tag('root'):
- with tag('ParamCount'):
- text(number_of_blocks)
- with tag('RF'):
- for RF_iter in range(number_of_blocks):
- with tag('RF' + str(RF_iter + 1)):
- text(gate_rf[RF_iter])
- with tag('SW'):
- for SW_iter in range(number_of_blocks):
- with tag('SW' + str(SW_iter + 1)):
- text(gate_tr_switch[SW_iter])
- with tag('ADC'):
- for ADC_iter in range(number_of_blocks):
- if gate_adc[ADC_iter] == 1:
- adc_times_values.append(blocks_duration[ADC_iter])
- adc_times_starts.append(sum(blocks_duration[0:ADC_iter]))
- with tag('ADC' + str(ADC_iter + 1)):
- text(gate_adc[ADC_iter])
- with tag('GR'):
- with tag('GR1'):
- text(1)
- for GX_iter in range(1, number_of_blocks):
- with tag('GR'+ str(GX_iter + 1)):
- text(0)
- with tag('CL'):
- with tag('CL' + str(1)):
- text(int(MIN_BLOCK_TIME / synchro_block_timer))
- for CL_iter in range(1, number_of_blocks):
- with tag('CL' + str(CL_iter + 1)):
- text(int(blocks_duration[CL_iter] / synchro_block_timer))
- result = indent(
- doc.getvalue(),
- indentation=' ' * 4,
- newline='\r'
- )
- sync_file = open(path + "sync_v2.xml", "w")
- sync_file.write(result)
- sync_file.close()
- picoscope_set(adc_times_values, adc_times_starts)
- def picoscope_set(adc_val, adc_start, number_of_channels_l=8, sampling_freq_l=4e7, path='test1/'):
- # sampling rate = 40 MHz = 4e7 1/s
- # adc_val in seconds
- adc_out_timings = []
- for i in adc_val:
- adc_out_timings.append(int(i * sampling_freq_l))
- doc, tag, text, line = Doc().ttl()
- with tag('root'):
- with tag('points'):
- with tag('title'):
- text("Points")
- with tag('value'):
- text(str(adc_out_timings))
- with tag('num_of_channels'):
- with tag('title'):
- text("Number of Channels")
- with tag('value'):
- text(number_of_channels_l)
- with tag('times'):
- with tag('title'):
- text("Times")
- with tag('value'):
- text(str(adc_start).format('%.e'))
- with tag('sample_freq'):
- with tag('title'):
- text("Sample Frequency")
- with tag('value'):
- text(sampling_freq_l)
- result = indent(
- doc.getvalue(),
- indentation=' ' * 4,
- newline='\r'
- )
- sync_file = open(path + "picoscope_params.xml", "w")
- sync_file.write(result)
- sync_file.close()
- if __name__ == "__main__":
- CONST_HACK_RF_DELAY = 17 * 2 * 2
- SEQ_INPUT, SEQ_DICT = seq_file_input(seq_file_name='sequences/turbo_FLASH_060924_0444.seq')
- # SEQ_INPUT, SEQ_DICT = seq_file_input(seq_file_name='sequences/test1_full.seq')
- params_path = 'sequences/'
- params_filename = "turbo_FLASH_060924_0444"
- # params_filename = "test1_full"
- file = open(params_path + params_filename + ".json", 'r')
- SEQ_PARAM = json.load(file)
- file.close()
- '''
- integartion of srv_seq_gen
- '''
- # SEQ_PARAM = set_limits()
- # SEQ_INPUT = save_param()
- # SEQ_DICT = SEQ_INPUT.waveforms_export()
- '''
- simulation of inputing the JSON and SEQ
- '''
- # artificial delays due to construction of the MRI
- # искусственные задержки из-за тех. особенностей МРТ
- # RF_dtime = 10 * 1e-6
- # TR_dtime = 10 * 1e-6
- time_info = SEQ_INPUT.duration()
- blocks_number = time_info[1]
- time_dur = time_info[0]
- # output interpretation. all formats of files defined in method
- # интерпретация выхода. Все форматы файлов определены в методе
- output_seq(SEQ_DICT, SEQ_PARAM)
- # defining constants of the sequence
- # определение констант последовательности
- local_definitions = SEQ_INPUT.definitions
- ADC_raster = local_definitions['AdcRasterTime']
- RF_raster = local_definitions['RadiofrequencyRasterTime']
- synchronization(SEQ_INPUT)
|