"""Entry points that turn raw MRI acquisition files into reconstructed images.

Two front ends are provided: ``ReconstructionApp`` reads the raw data through
``ReconstructionService``/``SequenceDataFrame``; ``ReconstructionH5`` reads an
HDF5 file directly.  Both read acquisition constants from a JSON file and hand
the sorted k-space to ``save_ffft`` / ``save_ffft_3d``.
"""
import json

import h5py
import numpy as np
from prompt_toolkit.key_binding.bindings.named_commands import self_insert

from reco import *
from seqData import SequenceDataFrame
from service import ReconstructionService
from traj import *

# Sequences whose k-space line order has to be loaded from the order JSON.
_ORDERED_SEQUENCES = ('nonlinear_decart(tse)', 'radial_propeller')


class _ReconstructionError(RuntimeError):
    """RuntimeError raised by ``reconstruct_*`` that already carries the prefix.

    ``start_reconstruction`` re-raises it untouched so that the user-facing
    message is not prefixed twice.
    """


class _ReconstructionBase:
    """State and JSON helpers shared by both reconstruction front ends."""

    def __init__(self, name, digit, shift):
        super().__init__()
        self.name = name  # type seq (linear|non|epi|radial)
        self.digit = digit  # '2d' | '3d'
        self.phase_shift = shift  # bool (whether the shift has to be applied)

    def read_order_from_json(self, path_order_json):
        """Return the ``"k_space_order"`` entry of the JSON file, or ``[]``."""
        with open(path_order_json, 'r') as f:
            data = json.load(f)
        return data.get("k_space_order", [])

    def get_value_or_default(self, json_obj, key, default_value=1):
        """Return ``json_obj[key]`` (or ``default_value``) converted to ``int``."""
        return int(json_obj.get(key, default_value))

    def read_consts(self, path):
        """Read the acquisition constants used by the 2D reconstruction.

        ``Np``, ``Nf`` and ``sl_nb`` are mandatory keys (``KeyError`` if absent);
        the remaining keys fall back to defaults.

        Returns:
            ``(Np, Nf, sl_nb, d_scans, nContrast, spoil, ETL, N_TE, phi_wing,
            N_wings)``, all ``int``.
        """
        with open(path, 'r') as file:
            data = json.load(file)
        d_scans = self.get_value_or_default(data, "D_scans", 1)
        Np = int(data['Np'])
        Nf = int(data['Nf'])
        sl_nb = int(data['sl_nb'])
        nContrast = self.get_value_or_default(data, "contrasts", 1)
        spoil = self.get_value_or_default(data, "RF_spoil", 0)
        ETL = self.get_value_or_default(data, 'ETL', 0)
        N_TE = self.get_value_or_default(data, 'N_TE', 0)
        # NOTE(review): phi_wing goes through int(); a fractional value in the
        # JSON would be truncated -- confirm this is intended.
        phi_wing = self.get_value_or_default(data, "phi_wing", 0)
        N_wings = self.get_value_or_default(data, "N_wings", 0)
        return Np, Nf, sl_nb, d_scans, nContrast, spoil, ETL, N_TE, phi_wing, N_wings

    def read_consts_3d(self, path):
        """Read the acquisition constants used by the 3D reconstruction.

        The slice count comes from ``Nss_image`` and is decremented by one when
        it is odd.

        Returns:
            ``(Np, Nf, sl_nb, d_scans, nContrast, spoil)``, all ``int``.
        """
        with open(path, 'r') as file:
            data = json.load(file)
        d_scans = self.get_value_or_default(data, "D_scans", 1)
        Np = int(data['Np'])
        Nf = int(data['Nf'])
        sl_nb = int(data['Nss_image'])  # slices in 3D
        if sl_nb % 2 == 1:
            sl_nb -= 1
        nContrast = self.get_value_or_default(data, "contrasts", 1)
        spoil = self.get_value_or_default(data, "RF_spoil", 0)
        return Np, Nf, sl_nb, d_scans, nContrast, spoil

    def _require_input_paths(self, path_raw_data, path_np_data_json):
        """Raise ``ValueError`` when either mandatory input path is empty."""
        if not path_raw_data or not path_np_data_json:
            raise ValueError("Пожалуйста, выберите файлы и введите данные перед началом реконструкции.")

    def _load_k_space_order(self, path_order_json):
        """Store ``self.k_space_order`` for sequences that need a line order."""
        if self.name in _ORDERED_SEQUENCES:
            self.k_space_order = self.read_order_from_json(path_order_json)

    def _effective_spoil(self, spoil):
        """Return ``0`` when the phase shift is switched off, else ``spoil``."""
        # `== False` is kept on purpose: unlike `not ...` it leaves `spoil`
        # untouched when `phase_shift` is None, as the code always did.
        return 0 if self.phase_shift == False else spoil  # noqa: E712


class ReconstructionApp(_ReconstructionBase):
    """Reconstruction driven by ``ReconstructionService`` / ``SequenceDataFrame``."""

    def start_reconstruction(self, path_raw_data, path_np_data_json, path_order_json):
        """Load the raw data and run the 2D or 3D reconstruction.

        Args:
            path_raw_data: path of the raw data file handed to the service.
            path_np_data_json: path of the JSON file with acquisition constants.
            path_order_json: path of the JSON file with ``k_space_order``; only
                read for the sequences listed in ``_ORDERED_SEQUENCES``.

        Raises:
            ValueError: a mandatory path is empty (checked before any file is
                touched), or a ``ValueError`` occurred while loading.
            NotImplementedError: the sequence type is not supported.
            RuntimeError: any other failure, with the original as ``__cause__``.
        """
        self._require_input_paths(path_raw_data, path_np_data_json)
        try:
            service = ReconstructionService()
            nX = self.read_consts(path_np_data_json)[0]
            service.path_raw_data = path_raw_data
            raw_data = service.read_mat_file()
            print(raw_data.shape)

            sequence_data = SequenceDataFrame()
            sequence_data.read_raw_data(raw_data)
            sequence_data.read_dots(int(nX))

            self._load_k_space_order(path_order_json)
            arr_mat = np.array(sequence_data.mat_df['dot'])

            # NOTE(review): any other `digit` value is silently ignored.
            if self.digit == '2d':
                self.reconstruct_2d(self.name, arr_mat, path_np_data_json)
            elif self.digit == '3d':
                self.reconstruct_3d(self.name, arr_mat, path_np_data_json)
        except (ValueError, NotImplementedError, _ReconstructionError):
            # Already meaningful for the caller: keep type, message and traceback.
            raise
        except Exception as e:
            raise RuntimeError(f"Произошла ошибка в процессе реконструкции: {e}") from e

    def read_mat_file(self, path):
        """Return the first top-level dataset of the HDF5 file as an ndarray."""
        with h5py.File(path, 'r') as f:
            key = list(f.keys())[0]
            return np.array(f[key][:])

    def reconstruct_2d(self, sequence_name, mat_data, json_data):
        """Sort ``mat_data`` into k-space for the sequence and call ``save_ffft``.

        Args:
            sequence_name: one of ``"linear_decart"``,
                ``"nonlinear_decart(tse)"``, ``"linear_epi"``,
                ``"radial_propeller"``.
            mat_data: raw samples passed on to the trajectory helpers.
            json_data: path of the JSON file with acquisition constants.

        Raises:
            NotImplementedError: unknown ``sequence_name``.
            RuntimeError: any other failure, with the original as ``__cause__``.
        """
        try:
            nX, nY, nZ, _d_scans, nContr, spoil, _etl, _n_te, phi_wing, N_wings = self.read_consts(json_data)
            spoil = self._effective_spoil(spoil)
            if sequence_name == "linear_decart":
                all_data = gather_data_along_trajectory(mat_data, nX, nY, nZ, nContr)
            elif sequence_name == "nonlinear_decart(tse)":
                all_data = gather_data_along_trajectory_nonlinear(mat_data, self.k_space_order, nX, nY, nZ, nContr)
            elif sequence_name == "linear_epi":
                all_data = gather_data_along_trajectory_linear_epi(mat_data, nX, nY, nZ, nContr)
            elif sequence_name == "radial_propeller":
                all_data = gather_data_along_trajectory_radial(mat_data, self.k_space_order, nX, nY, nZ, phi_wing,
                                                               N_wings)
            else:
                raise NotImplementedError("Для данного типа ИП еще не добавлена функция.")
            save_ffft(all_data, spoil)
        except NotImplementedError:
            # Subclass of RuntimeError, so callers catching RuntimeError still work.
            raise
        except Exception as e:
            raise _ReconstructionError(f"Произошла ошибка в процессе реконструкции: {e}") from e

    def reconstruct_3d(self, sequence_name, mat_data, json_data):
        """Sort ``mat_data`` into k-space and call ``save_ffft_3d``.

        Only ``"linear_decart"`` and ``"nonlinear_decart(tse)"`` are supported.

        Raises:
            NotImplementedError: unknown ``sequence_name``.
            RuntimeError: any other failure, with the original as ``__cause__``.
        """
        try:
            nX, nY, nZ, _d_scans, nContr, spoil = self.read_consts_3d(json_data)
            spoil = self._effective_spoil(spoil)
            if sequence_name == "linear_decart":
                all_data = gather_data_along_trajectory(mat_data, nX, nY, nZ, nContr)
            elif sequence_name == "nonlinear_decart(tse)":
                all_data = gather_data_along_trajectory_nonlinear(mat_data, self.k_space_order, nX, nY, nZ, nContr)
            else:
                raise NotImplementedError("Для данного типа ИП еще не добавлена функция.")
            save_ffft_3d(all_data, spoil)
        except NotImplementedError:
            raise
        except Exception as e:
            raise _ReconstructionError(f"Произошла ошибка в процессе реконструкции: {e}") from e


class ReconstructionH5(_ReconstructionBase):
    """Reconstruction that reads the raw data straight from an HDF5 file.

    The gather helpers index the raw array as ``mat_data[x, line, coil]`` and
    take the coil count from ``mat_data.shape[2]``.
    NOTE(review): earlier comments described the layout as
    ``[coil, y*z*contrast, x]``; presumably that is the writer-side order and
    the array arrives with its axes reversed -- confirm against a real file.
    """

    def start_reconstruction(self, path_raw_data, path_np_data_json, path_order_json):
        """Load the HDF5 raw data and run the 2D or 3D reconstruction.

        Args:
            path_raw_data: path of the HDF5 raw data file.
            path_np_data_json: path of the JSON file with acquisition constants.
            path_order_json: path of the JSON file with ``k_space_order``; only
                read for the sequences listed in ``_ORDERED_SEQUENCES``.

        Raises:
            ValueError: a mandatory path is empty (checked before any file is
                touched), or a ``ValueError`` occurred while loading.
            NotImplementedError: the sequence type is not supported.
            RuntimeError: any other failure, with the original as ``__cause__``.
        """
        self._require_input_paths(path_raw_data, path_np_data_json)
        try:
            # NOTE(review): the service object and the constants read here are
            # not used afterwards; both calls are kept so that construction and
            # the early JSON check behave exactly as before.
            service = ReconstructionService()
            self.read_consts(path_np_data_json)
            service.path_raw_data = path_raw_data

            raw_data = self.read_hdf5_file(path_raw_data)
            print(raw_data.shape)

            self._load_k_space_order(path_order_json)

            # NOTE(review): any other `digit` value is silently ignored.
            if self.digit == '2d':
                self.reconstruct_2d_h5(self.name, raw_data, path_np_data_json)
            elif self.digit == '3d':
                self.reconstruct_3d_h5(self.name, raw_data, path_np_data_json)
        except (ValueError, NotImplementedError, _ReconstructionError):
            # Already meaningful for the caller: keep type, message and traceback.
            raise
        except Exception as e:
            raise RuntimeError(f"Произошла ошибка в процессе реконструкции: {e}") from e

    def read_hdf5_file(self, path):
        """Return the first member of the first top-level group as an ndarray."""
        with h5py.File(path, 'r') as f:
            group = f[list(f.keys())[0]]
            member = list(group.keys())[0]
            return np.array(group[member])

    def reconstruct_2d_h5(self, sequence_name, mat_data, json_data):
        """Sort ``mat_data`` into ``[x, y, z, contrast, coil]`` and call ``save_ffft``.

        Args:
            sequence_name: one of ``"linear_decart"``,
                ``"nonlinear_decart(tse)"``, ``"linear_epi"``,
                ``"radial_propeller"``.
            mat_data: raw array as returned by ``read_hdf5_file``.
            json_data: path of the JSON file with acquisition constants.

        Raises:
            NotImplementedError: unknown ``sequence_name``.
            RuntimeError: any other failure, with the original as ``__cause__``.
        """
        try:
            nX, nY, nZ, _d_scans, nContr, spoil, _etl, _n_te, phi_wing, N_wings = self.read_consts(json_data)
            spoil = self._effective_spoil(spoil)
            if sequence_name == "linear_decart":
                all_data = self.gather_data_along_trajectory_h5(mat_data, nX, nY, nZ, nContr)
            elif sequence_name == "nonlinear_decart(tse)":
                all_data = self.gather_data_along_trajectory_nonlinear_h5(mat_data, self.k_space_order, nX, nY, nZ,
                                                                          nContr)
            elif sequence_name == "linear_epi":
                all_data = self.gather_data_along_trajectory_linear_epi_h5(mat_data, nX, nY, nZ, nContr)
            elif sequence_name == "radial_propeller":
                all_data = self.gather_data_along_trajectory_radial_h5(mat_data, self.k_space_order, nX, nY, nZ,
                                                                       phi_wing, N_wings)
            else:
                raise NotImplementedError("Для данного типа ИП еще не добавлена функция.")
            save_ffft(all_data, spoil)
        except NotImplementedError:
            # Subclass of RuntimeError, so callers catching RuntimeError still work.
            raise
        except Exception as e:
            raise _ReconstructionError(f"Произошла ошибка в процессе реконструкции: {e}") from e

    def gather_data_along_trajectory_h5(self, mat_data, nX, nY, nZ, nContr, nCoils=1):
        """Place consecutive readout lines into ``[x, y, z, contrast, coil]``.

        Lines are consumed in ``y`` (outer), ``z``, ``contrast`` (inner) order.
        ``nCoils`` is ignored; the coil count is ``mat_data.shape[2]``.
        """
        nCoils = mat_data.shape[2]
        gathered_data = np.zeros((nX, nY, nZ, nContr, nCoils), dtype=np.complex128)
        # `i` counts readout lines (second axis of mat_data); one line holds
        # all nX samples, so it advances once per (y, z, contrast).
        i = 0
        for y in range(nY):
            for z in range(nZ):
                for contrast in range(nContr):
                    gathered_data[:, y, z, contrast, :] = mat_data[:nX, i, :]
                    i += 1
        return gathered_data

    def gather_data_along_trajectory_nonlinear_h5(self, mat_data, order, nX, nY, nZ, nContr, nCoils=1):
        """Place readout lines at the ``y`` positions listed in ``order``.

        ``order`` is a sequence of sequences of line indices; for every slice
        the lines are consumed in that nested order and written to contrast 0.
        Entries that are ``str`` are skipped.  ``nCoils`` is ignored; the coil
        count is ``mat_data.shape[2]``.

        Returns:
            Array shaped ``[x, y, z, contrast, coil]``.
        """
        nCoils = mat_data.shape[2]
        gathered_data = np.zeros((nX, nY, nZ, nContr, nCoils), dtype=np.complex128)
        print(len(order))
        i = 0
        for z in range(nZ):
            for train in order:
                for y in train:
                    if isinstance(y, str):
                        # NOTE(review): string placeholders are assumed not to
                        # consume a readout line -- confirm against the
                        # acquisition order.
                        continue
                    gathered_data[:, y, z, 0, :] = mat_data[:nX, i, :]
                    i += 1
        return gathered_data

    def gather_data_along_trajectory_linear_epi_h5(self, mat_data, nX, nY, nZ, nContr, nCoils=1):
        """Place consecutive EPI readout lines into the output array.

        Lines are consumed in ``z`` (outer), ``y``, ``contrast`` (inner) order.
        ``nCoils`` is ignored; the coil count is ``mat_data.shape[2]``.

        NOTE(review): two things look unintended and are kept unchanged:
        * the earlier code walked ``x`` backwards for even ``y`` but used the
          same index on both sides of the assignment, so no line is actually
          reversed;
        * the line is written to ``[y, x, ...]`` although the array is
          allocated as ``(nX, nY, ...)``, which only works when ``nY <= nX``
          and transposes the result relative to the other gather helpers.
        """
        nCoils = mat_data.shape[2]
        gathered_data = np.zeros((nX, nY, nZ, nContr, nCoils), dtype=np.complex128)
        i = 0
        for z in range(nZ):
            for y in range(nY):
                for contrast in range(nContr):
                    gathered_data[y, :nX, z, contrast, :] = mat_data[:nX, i, :]
                    i += 1
        return gathered_data

    def gather_data_along_trajectory_radial_h5(self, mat_data, order, nX, nY, nZ, phi_wing, N_wings, nCoils=1):
        """Build the blade k-spaces, rotate them and reorder the axes.

        The unrotated blades come from ``read_raw_data_radial_h5``; they are
        passed to ``rotate_radial``, stacked, given a singleton axis at
        position 1 and transposed with ``(3, 4, 2, 1, 0)``.  ``nCoils`` is
        ignored; the coil count is ``mat_data.shape[2]``.
        """
        nCoils = mat_data.shape[2]
        k_space_not_rotate = self.read_raw_data_radial_h5(mat_data, order, nX, N_wings, nCoils)
        k_space = rotate_radial(k_space_not_rotate, phi_wing, N_wings)  # in traj
        k_spaces_with_coils = np.stack(k_space, axis=0)
        k_spaces_with_contrast = np.expand_dims(k_spaces_with_coils, axis=1)
        k_spaces = np.transpose(k_spaces_with_contrast, (3, 4, 2, 1, 0))
        print(k_spaces.shape)
        return k_spaces

    def read_raw_data_radial_h5(self, mat_data, order, nX, N_wings, nCoils):
        """Sort readout lines into one unrotated k-space per blade.

        Lines are consumed blade by blade, then slice by slice, then in the
        nested order given by ``order``; ``str`` entries are skipped.

        Returns:
            Array shaped ``[coil, slice, blade, row, x]`` with ``nX`` rows and
            ``nX`` columns.
        """
        nSlices = int((mat_data.shape[1]) / N_wings / len(order[0]))
        k_space = np.zeros((nCoils, nSlices, N_wings, nX, nX), dtype=np.complex128)
        # Position along X; the centre does not change, can be treated as Mx.
        center_k_space = nX // 2
        ind = 0
        for blade_number in range(N_wings):
            for iSlice in range(nSlices):
                for arr_line_number in order:
                    half_width = len(arr_line_number) // 2
                    for line_number in arr_line_number:
                        if isinstance(line_number, str):
                            # NOTE(review): assumed not to consume a readout
                            # line -- confirm against the acquisition order.
                            continue
                        radius = center_k_space + line_number - half_width
                        # mat_data[:, ind, :] is (x, coil); the target is (coil, x).
                        k_space[:, iSlice, blade_number, radius, :] = mat_data[:nX, ind, :].T
                        ind += 1
        return k_space

    def reconstruct_3d_h5(self, sequence_name, mat_data, json_data):
        """Sort ``mat_data`` into k-space and call ``save_ffft_3d``.

        Only ``"linear_decart"`` and ``"nonlinear_decart(tse)"`` are supported.

        Raises:
            NotImplementedError: unknown ``sequence_name``.
            RuntimeError: any other failure, with the original as ``__cause__``.
        """
        try:
            nX, nY, nZ, _d_scans, nContr, spoil = self.read_consts_3d(json_data)
            spoil = self._effective_spoil(spoil)
            if sequence_name == "linear_decart":
                all_data = self.gather_data_along_trajectory_h5(mat_data, nX, nY, nZ, nContr)
            elif sequence_name == "nonlinear_decart(tse)":
                all_data = self.gather_data_along_trajectory_nonlinear_h5(mat_data, self.k_space_order, nX, nY, nZ,
                                                                          nContr)
            else:
                raise NotImplementedError("Для данного типа ИП еще не добавлена функция.")
            save_ffft_3d(all_data, spoil)
        except NotImplementedError:
            raise
        except Exception as e:
            raise _ReconstructionError(f"Произошла ошибка в процессе реконструкции: {e}") from e