|
@@ -0,0 +1,1708 @@
|
|
|
+import itertools
|
|
|
+import math
|
|
|
+from collections import OrderedDict
|
|
|
+from copy import copy
|
|
|
+from itertools import chain
|
|
|
+from types import SimpleNamespace
|
|
|
+from typing import Tuple, List, Iterable
|
|
|
+from typing import Union
|
|
|
+from warnings import warn
|
|
|
+
|
|
|
+import matplotlib as mpl
|
|
|
+import numpy as np
|
|
|
+from matplotlib import pyplot as plt
|
|
|
+
|
|
|
+from pypulseq import eps
|
|
|
+from pypulseq.Sequence import block, parula
|
|
|
+from pypulseq.Sequence.ext_test_report import ext_test_report
|
|
|
+from pypulseq.Sequence.read_seq import read
|
|
|
+from pypulseq.Sequence.write_seq import write as write_seq
|
|
|
+from pypulseq.calc_rf_center import calc_rf_center
|
|
|
+from pypulseq.check_timing import check_timing as ext_check_timing
|
|
|
+from pypulseq.decompress_shape import decompress_shape
|
|
|
+from pypulseq.event_lib import EventLibrary
|
|
|
+from pypulseq.opts import Opts
|
|
|
+from pypulseq.supported_labels_rf_use import get_supported_labels
|
|
|
+from version import major, minor, revision
|
|
|
+
|
|
|
+
|
|
|
+class Sequence:
|
|
|
+ """
|
|
|
+ Generate sequences and read/write sequence files. This class defines properties and methods to define a complete MR
|
|
|
+ sequence including RF pulses, gradients, ADC events, etc. The class provides an implementation of the open MR
|
|
|
+ sequence format defined by the Pulseq project. See http://pulseq.github.io/.
|
|
|
+
|
|
|
+ See also `demo_read.py`, `demo_write.py`.
|
|
|
+ """
|
|
|
+
|
|
|
+ read_seq = None
|
|
|
+ version_major = major
|
|
|
+ version_minor = minor
|
|
|
+ version_revision = revision
|
|
|
+
|
|
|
+ def __init__(self, system=Opts()):
|
|
|
+ # =========
|
|
|
+ # EVENT LIBRARIES
|
|
|
+ # =========
|
|
|
+ self.adc_library = EventLibrary() # Library of ADC events
|
|
|
+ self.delay_library = EventLibrary() # Library of delay events
|
|
|
+ # Library of extension events. Extension events form single-linked zero-terminated lists
|
|
|
+ self.extensions_library = EventLibrary()
|
|
|
+ self.grad_library = EventLibrary() # Library of gradient events
|
|
|
+ self.label_inc_library = (
|
|
|
+ EventLibrary()
|
|
|
+ ) # Library of Label(inc) events (reference from the extensions library)
|
|
|
+ self.label_set_library = (
|
|
|
+ EventLibrary()
|
|
|
+ ) # Library of Label(set) events (reference from the extensions library)
|
|
|
+ self.rf_library = EventLibrary() # Library of RF events
|
|
|
+ self.shape_library = EventLibrary() # Library of compressed shapes
|
|
|
+ self.trigger_library = EventLibrary() # Library of trigger events
|
|
|
+
|
|
|
+ # =========
|
|
|
+ # OTHER
|
|
|
+ # =========
|
|
|
+ self.system = system
|
|
|
+
|
|
|
+ self.block_events = OrderedDict() # Event table
|
|
|
+ self.definitions = dict() # Optional sequence definitions
|
|
|
+
|
|
|
+ self.rf_raster_time = (
|
|
|
+ self.system.rf_raster_time
|
|
|
+ ) # RF raster time (system dependent)
|
|
|
+ self.grad_raster_time = (
|
|
|
+ self.system.grad_raster_time
|
|
|
+ ) # Gradient raster time (system dependent)
|
|
|
+ self.adc_raster_time = (
|
|
|
+ self.system.adc_raster_time
|
|
|
+ ) # ADC raster time (system dependent)
|
|
|
+ self.block_duration_raster = self.system.block_duration_raster
|
|
|
+ self.set_definition("AdcRasterTime", self.adc_raster_time)
|
|
|
+ self.set_definition("BlockDurationRaster", self.block_duration_raster)
|
|
|
+ self.set_definition("GradientRasterTime", self.grad_raster_time)
|
|
|
+ self.set_definition("RadiofrequencyRasterTime", self.rf_raster_time)
|
|
|
+ self.signature_type = ""
|
|
|
+ self.signature_file = ""
|
|
|
+ self.signature_value = ""
|
|
|
+
|
|
|
+ self.block_durations = [] # Cache of block durations
|
|
|
+ self.extension_numeric_idx = [] # numeric IDs of the used extensions
|
|
|
+ self.extension_string_idx = [] # string IDs of the used extensions
|
|
|
+
|
|
|
+ def __str__(self) -> str:
|
|
|
+ s = "Sequence:"
|
|
|
+ s += "\nshape_library: " + str(self.shape_library)
|
|
|
+ s += "\nrf_library: " + str(self.rf_library)
|
|
|
+ s += "\ngrad_library: " + str(self.grad_library)
|
|
|
+ s += "\nadc_library: " + str(self.adc_library)
|
|
|
+ s += "\ndelay_library: " + str(self.delay_library)
|
|
|
+ s += "\nextensions_library: " + str(
|
|
|
+ self.extensions_library
|
|
|
+ ) # inserted for trigger support by mveldmann
|
|
|
+ s += "\nrf_raster_time: " + str(self.rf_raster_time)
|
|
|
+ s += "\ngrad_raster_time: " + str(self.grad_raster_time)
|
|
|
+ s += "\nblock_events: " + str(len(self.block_events))
|
|
|
+ return s
|
|
|
+
|
|
|
+ def add_block(self, *args: SimpleNamespace) -> None:
|
|
|
+ """
|
|
|
+ Add a new block/multiple events to the sequence. Adds a sequence block with provided as a block structure
|
|
|
+
|
|
|
+ See also:
|
|
|
+ - `pypulseq.Sequence.sequence.Sequence.set_block()`
|
|
|
+ - `pypulseq.make_adc.make_adc()`
|
|
|
+ - `pypulseq.make_trapezoid.make_trapezoid()`
|
|
|
+ - `pypulseq.make_sinc_pulse.make_sinc_pulse()`
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ args : SimpleNamespace
|
|
|
+ Block structure or events to be added as a block to `Sequence`.
|
|
|
+ """
|
|
|
+ block.set_block(self, len(self.block_events) + 1, *args)
|
|
|
+
|
|
|
+ def calculate_kspace(
|
|
|
+ self, trajectory_delay: int = 0
|
|
|
+ ) -> Tuple[np.array, np.array, np.array, np.array, np.array]:
|
|
|
+ """
|
|
|
+ Calculates the k-space trajectory of the entire pulse sequence.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ trajectory_delay : int, default=0
|
|
|
+ Compensation factor in seconds (s) to align ADC and gradients in the reconstruction.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ k_traj_adc : numpy.array
|
|
|
+ K-space trajectory sampled at `t_adc` timepoints.
|
|
|
+ k_traj : numpy.array
|
|
|
+ K-space trajectory of the entire pulse sequence.
|
|
|
+ t_excitation : numpy.array
|
|
|
+ Excitation timepoints.
|
|
|
+ t_refocusing : numpy.array
|
|
|
+ Refocusing timepoints.
|
|
|
+ t_adc : numpy.array
|
|
|
+ Sampling timepoints.
|
|
|
+ """
|
|
|
+ if np.any(np.abs(trajectory_delay) > 100e-6):
|
|
|
+ raise Warning(
|
|
|
+ f"Trajectory delay of {trajectory_delay * 1e6} us is suspiciously high"
|
|
|
+ )
|
|
|
+
|
|
|
+ # Initialise the counters and accumulator objects
|
|
|
+ count_excitation = 0
|
|
|
+ count_refocusing = 0
|
|
|
+ count_adc_samples = 0
|
|
|
+
|
|
|
+ # Loop through the blocks to prepare preallocations
|
|
|
+ for block_counter in range(len(self.block_events)):
|
|
|
+ block = self.get_block(block_counter + 1)
|
|
|
+ if block.rf is not None:
|
|
|
+ if (
|
|
|
+ not hasattr(block.rf, "use")
|
|
|
+ or block.rf.use == "excitation"
|
|
|
+ or block.rf.use == "undefined"
|
|
|
+ ):
|
|
|
+ count_excitation += 1
|
|
|
+ elif block.rf.use == "refocusing":
|
|
|
+ count_refocusing += 1
|
|
|
+
|
|
|
+ if block.adc is not None:
|
|
|
+ count_adc_samples += int(block.adc.num_samples)
|
|
|
+
|
|
|
+ t_excitation = np.zeros(count_excitation)
|
|
|
+ t_refocusing = np.zeros(count_refocusing)
|
|
|
+ k_time = np.zeros(count_adc_samples)
|
|
|
+ current_duration = 0
|
|
|
+ count_excitation = 0
|
|
|
+ count_refocusing = 0
|
|
|
+ kc_outer = 0
|
|
|
+ traj_recon_delay = trajectory_delay
|
|
|
+
|
|
|
+ # Go through the blocks and collect RF and ADC timing data
|
|
|
+ for block_counter in range(len(self.block_events)):
|
|
|
+ block = self.get_block(block_counter + 1)
|
|
|
+
|
|
|
+ if block.rf is not None:
|
|
|
+ rf = block.rf
|
|
|
+ rf_center, _ = calc_rf_center(rf)
|
|
|
+ t = rf.delay + rf_center
|
|
|
+ if (
|
|
|
+ not hasattr(block.rf, "use")
|
|
|
+ or block.rf.use == "excitation"
|
|
|
+ or block.rf.use == "undefined"
|
|
|
+ ):
|
|
|
+ t_excitation[count_excitation] = current_duration + t
|
|
|
+ count_excitation += 1
|
|
|
+ elif block.rf.use == "refocusing":
|
|
|
+ t_refocusing[count_refocusing] = current_duration + t
|
|
|
+ count_refocusing += 1
|
|
|
+
|
|
|
+ if block.adc is not None:
|
|
|
+ _k_time = np.arange(block.adc.num_samples) + 0.5
|
|
|
+ _k_time = (
|
|
|
+ _k_time * block.adc.dwell
|
|
|
+ + block.adc.delay
|
|
|
+ + current_duration
|
|
|
+ + traj_recon_delay
|
|
|
+ )
|
|
|
+ k_time[kc_outer : kc_outer + block.adc.num_samples] = _k_time
|
|
|
+ kc_outer += block.adc.num_samples
|
|
|
+ current_duration += self.block_durations[block_counter]
|
|
|
+
|
|
|
+ # Now calculate the actual k-space trajectory based on the gradient waveforms
|
|
|
+ gw = self.gradient_waveforms()
|
|
|
+ i_excitation = np.round(t_excitation / self.grad_raster_time)
|
|
|
+ i_refocusing = np.round(t_refocusing / self.grad_raster_time)
|
|
|
+ i_periods = np.sort(
|
|
|
+ [1, *(i_excitation + 1), *(i_refocusing + 1), gw.shape[1] + 1]
|
|
|
+ ).astype(np.int32)
|
|
|
+ # i_periods -= 1 # Python is 0-indexed
|
|
|
+ ii_next_excitation = np.min((len(i_excitation), 1))
|
|
|
+ ii_next_refocusing = np.min((len(i_refocusing), 1))
|
|
|
+ k_traj = np.zeros_like(gw)
|
|
|
+ k = np.zeros((3, 1))
|
|
|
+
|
|
|
+ for i in range(len(i_periods) - 1):
|
|
|
+ i_period_end = i_periods[i + 1] - 1
|
|
|
+ k_period = np.concatenate(
|
|
|
+ (k, gw[:, i_periods[i] - 1 : i_period_end] * self.grad_raster_time),
|
|
|
+ axis=1,
|
|
|
+ )
|
|
|
+ k_period = np.cumsum(k_period, axis=1)
|
|
|
+ k_traj[:, i_periods[i] - 1 : i_period_end] = k_period[:, 1:]
|
|
|
+ k = k_period[:, -1]
|
|
|
+
|
|
|
+ if (
|
|
|
+ ii_next_excitation > 0
|
|
|
+ and i_excitation[ii_next_excitation - 1] == i_period_end
|
|
|
+ ):
|
|
|
+ k[:] = 0
|
|
|
+ k_traj[:, i_period_end - 1] = np.nan
|
|
|
+ ii_next_excitation = min(len(i_excitation), ii_next_excitation + 1)
|
|
|
+
|
|
|
+ if (
|
|
|
+ ii_next_refocusing > 0
|
|
|
+ and i_refocusing[ii_next_refocusing - 1] == i_period_end
|
|
|
+ ):
|
|
|
+ k = -k
|
|
|
+ ii_next_refocusing = min(len(i_refocusing), ii_next_refocusing + 1)
|
|
|
+
|
|
|
+ k = k.reshape((-1, 1)) # To be compatible with np.concatenate
|
|
|
+
|
|
|
+ k_traj_adc = []
|
|
|
+ for _k_traj_row in k_traj:
|
|
|
+ result = np.interp(
|
|
|
+ xp=np.array(range(1, k_traj.shape[1] + 1)) * self.grad_raster_time,
|
|
|
+ fp=_k_traj_row,
|
|
|
+ x=k_time,
|
|
|
+ )
|
|
|
+ k_traj_adc.append(result)
|
|
|
+ k_traj_adc = np.stack(k_traj_adc)
|
|
|
+ t_adc = k_time
|
|
|
+
|
|
|
+ return k_traj_adc, k_traj, t_excitation, t_refocusing, t_adc
|
|
|
+
|
|
|
+ def calculate_kspacePP(
|
|
|
+ self,
|
|
|
+ trajectory_delay: Union[int, float, np.ndarray] = 0,
|
|
|
+ gradient_offset: int = 0,
|
|
|
+ ) -> Tuple[np.array, np.array, np.array, np.array, np.array]:
|
|
|
+ """
|
|
|
+ Calculates the k-space trajectory of the entire pulse sequence.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ trajectory_delay : int, default=0
|
|
|
+ Compensation factor in seconds (s) to align ADC and gradients in the reconstruction.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ k_traj_adc : numpy.array
|
|
|
+ K-space trajectory sampled at `t_adc` timepoints.
|
|
|
+ k_traj : numpy.array
|
|
|
+ K-space trajectory of the entire pulse sequence.
|
|
|
+ t_excitation : numpy.array
|
|
|
+ Excitation timepoints.
|
|
|
+ t_refocusing : numpy.array
|
|
|
+ Refocusing timepoints.
|
|
|
+ t_adc : numpy.array
|
|
|
+ Sampling timepoints.
|
|
|
+ """
|
|
|
+ if np.any(np.abs(trajectory_delay) > 100e-6):
|
|
|
+ raise Warning(
|
|
|
+ f"Trajectory delay of {trajectory_delay * 1e6} us is suspiciously high"
|
|
|
+ )
|
|
|
+
|
|
|
+ total_duration = np.sum(self.block_durations)
|
|
|
+
|
|
|
+ gw_data, tfp_excitation, tfp_refocusing, t_adc, _ = self.waveforms_and_times()
|
|
|
+
|
|
|
+ ng = len(gw_data)
|
|
|
+ # Gradient delay handling
|
|
|
+ if isinstance(trajectory_delay, (int, float)):
|
|
|
+ gradient_delays = [trajectory_delay] * ng
|
|
|
+ else:
|
|
|
+ assert (
|
|
|
+ len(trajectory_delay) == ng
|
|
|
+ ) # Need to have same number of gradient channels
|
|
|
+ gradient_delays = [trajectory_delay] * ng
|
|
|
+
|
|
|
+ # Gradient offset handling
|
|
|
+ if isinstance(gradient_offset, (int, float)):
|
|
|
+ gradient_offset = [gradient_offset] * ng
|
|
|
+ else:
|
|
|
+ assert (
|
|
|
+ len(gradient_offset) == ng
|
|
|
+ ) # Need to have same number of gradient channels
|
|
|
+
|
|
|
+ # Convert data to piecewise polynomials
|
|
|
+ gw_pp = []
|
|
|
+ gw_pp_MATLAB = []
|
|
|
+ for j in range(ng):
|
|
|
+ wave_cnt = gw_data[j].shape[1]
|
|
|
+ if wave_cnt == 0:
|
|
|
+ if np.abs(gradient_offset[j]) <= eps:
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ gw = np.array(([0, total_duration], [0, 0]))
|
|
|
+ else:
|
|
|
+ gw = gw_data[j]
|
|
|
+
|
|
|
+ # Now gw contains the waveform from the current axis
|
|
|
+ if np.abs(gradient_delays[j]) > eps:
|
|
|
+ gw[1] = gw[1] - gradient_delays[j] # Anisotropic gradient delay support
|
|
|
+ if not np.all(np.isfinite(gw)):
|
|
|
+ raise Warning("Not all elements of the generated waveform are finite.")
|
|
|
+
|
|
|
+ teps = 1e-12
|
|
|
+ if gw[0, 0] > 0 and gw[1, -1] < total_duration:
|
|
|
+ # teps terms to avoid integration errors over extended periods of time
|
|
|
+ _temp1 = np.array(([-teps, gw[0, 0] - teps], [0, 0]))
|
|
|
+ _temp2 = np.array(([gw[0, -1] + teps, total_duration + teps], [0, 0]))
|
|
|
+ gw = np.hstack((_temp1, gw, _temp2))
|
|
|
+ elif gw[0, 0] > 0:
|
|
|
+ _temp = np.array(([-teps, gw[0, 0] - teps], [0, 0]))
|
|
|
+ gw = np.hstack((_temp, gw))
|
|
|
+ elif gw[0, -1] < total_duration:
|
|
|
+ _temp = np.array(([gw[0, -1] + teps, total_duration + teps], [0, 0]))
|
|
|
+ gw = np.hstack((gw, _temp))
|
|
|
+
|
|
|
+ if np.abs(gradient_offset[j]) > eps:
|
|
|
+ gw[1:] += gradient_offset[j]
|
|
|
+
|
|
|
+ gw[1][gw[1] == -0.0] = 0.0
|
|
|
+ # Specify window to be same as domain prevent numpy from remapping to [-1, 1]
|
|
|
+ polyfit = [
|
|
|
+ np.polynomial.Polynomial.fit(
|
|
|
+ gw[0, i : (i + 2)],
|
|
|
+ gw[1, i : (i + 2)],
|
|
|
+ deg=1,
|
|
|
+ window=gw[0, i : (i + 2)],
|
|
|
+ )
|
|
|
+ for i in range(len(gw[0]) - 1)
|
|
|
+ ]
|
|
|
+ polyfit = np.stack(polyfit)
|
|
|
+ gw_pp.append(polyfit)
|
|
|
+
|
|
|
+ ###
|
|
|
+ """
|
|
|
+ Fix coefs for consistency with MATLAB:
|
|
|
+ 1. MATLAB returns coefficients in descending order whereas Numpy returns coefficients in ascending order.
|
|
|
+ 2. MATLAB returns local coefficients that will NOT match Numpy's outputs. Refer to the equation under the
|
|
|
+ "output arguments" section of `mkpp` MATLAB docs to convert and match coefficient outputs.
|
|
|
+ 3. Finally, MATLAB seems to not store any -1 < x < 1 coefficients, so we zero them out.
|
|
|
+ """
|
|
|
+ polyfit_MATLAB = []
|
|
|
+ for i in range(len(polyfit)):
|
|
|
+ polyfit_MATLAB_i = copy(polyfit[i])
|
|
|
+ lower = polyfit_MATLAB_i.domain[0]
|
|
|
+ co = polyfit_MATLAB_i.coef
|
|
|
+ co = co[::-1] # Reverse
|
|
|
+ a = co[0]
|
|
|
+ b = co[1] + (a * lower)
|
|
|
+ if -1 < a < 1: # to investigate
|
|
|
+ a = 0
|
|
|
+ if -1 < b < 1:
|
|
|
+ b = 0
|
|
|
+ # co = [b, a] # Un-reverse for Numpy
|
|
|
+ co = [a, b]
|
|
|
+ polyfit_MATLAB_i.coef = co
|
|
|
+ polyfit_MATLAB.append(polyfit_MATLAB_i)
|
|
|
+ gw_pp_MATLAB.append(polyfit_MATLAB)
|
|
|
+ ###
|
|
|
+
|
|
|
+ # Calculate slice positions. For now we entirely rely on the excitation -- ignoring complicated interleaved
|
|
|
+ # refocused sequences
|
|
|
+ if len(tfp_excitation) > 0:
|
|
|
+ # Position in x, y, z
|
|
|
+ slice_pos = np.zeros((len(gw_data), tfp_excitation.shape[1]))
|
|
|
+ for j in range(len(gw_data)):
|
|
|
+ if gw_pp[j] is None:
|
|
|
+ slice_pos[j] = np.empty_like((1, slice_pos.shape[1]))
|
|
|
+ else:
|
|
|
+ slice_pos[j] = np.divide(
|
|
|
+ tfp_excitation[1], self.ppval_numpy(gw_pp[j], tfp_excitation[0])
|
|
|
+ )
|
|
|
+ slice_pos[~np.isfinite(slice_pos)] = 0 # Reset undefined to 0
|
|
|
+ t_slice_pos = tfp_excitation[0]
|
|
|
+ else:
|
|
|
+ slice_pos = []
|
|
|
+ t_slice_pos = []
|
|
|
+
|
|
|
+ # FNINT
|
|
|
+ def fnint(arr_poly):
|
|
|
+ pieces = len(arr_poly)
|
|
|
+ breaks = np.stack([pp.domain for pp in arr_poly])
|
|
|
+ breaks = np.append(breaks[:, 0], breaks[-1, 1])
|
|
|
+ coefs = np.stack([pp.coef for pp in arr_poly])
|
|
|
+ order = len(arr_poly[1].coef)
|
|
|
+ dim = 1
|
|
|
+ coefs = coefs / np.tile(range(order, 0, -1), (dim * pieces, 1))
|
|
|
+ xs = np.diff(breaks[:-1])
|
|
|
+ index = np.arange(pieces - 1)
|
|
|
+ vv = xs * coefs[index, 0]
|
|
|
+ for i in range(1, order):
|
|
|
+ vv = xs * (vv + coefs[index, i])
|
|
|
+ last = np.cumsum(np.insert(vv, 0, 0)).reshape((-1, 1))
|
|
|
+
|
|
|
+ coefs = np.hstack((coefs[:, :order], last))
|
|
|
+ arr_poly_integ = []
|
|
|
+ for i in range(pieces):
|
|
|
+ arr_poly_integ.append(
|
|
|
+ np.polynomial.Polynomial(
|
|
|
+ coefs[i],
|
|
|
+ domain=[breaks[i], breaks[i + 1]],
|
|
|
+ window=[breaks[i], breaks[i + 1]],
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ return arr_poly_integ, coefs, breaks
|
|
|
+
|
|
|
+ # =========
|
|
|
+
|
|
|
+ # Integrate waveforms as PPs to produce gradient moments
|
|
|
+ gm_pp = []
|
|
|
+ tc = []
|
|
|
+ for i in range(ng):
|
|
|
+ if gw_pp[i] is None:
|
|
|
+ continue
|
|
|
+ res_fnint, res_coefs, res_breaks = fnint(gw_pp_MATLAB[i])
|
|
|
+ gm_pp.append(res_fnint)
|
|
|
+ tc.append(res_breaks)
|
|
|
+ # "Sample" ramps for display purposes otherwise piecewise-linear display (plot) fails
|
|
|
+ ii = np.nonzero(np.abs(res_coefs[:, 0]) > eps)[0]
|
|
|
+ if len(ii) > 0:
|
|
|
+ tca = []
|
|
|
+ for j in range(len(ii)):
|
|
|
+ res = (
|
|
|
+ np.arange(
|
|
|
+ np.floor(float(res_breaks[ii[j]] / self.grad_raster_time)),
|
|
|
+ np.ceil(
|
|
|
+ (res_breaks[ii[j] + 1] / self.grad_raster_time) + 1
|
|
|
+ ),
|
|
|
+ )
|
|
|
+ * self.grad_raster_time
|
|
|
+ )
|
|
|
+ tca.extend(res)
|
|
|
+ tc.append(tca)
|
|
|
+ tc = np.array(list(chain(*tc)))
|
|
|
+
|
|
|
+ if len(tfp_excitation) == 0:
|
|
|
+ t_excitation = np.array([])
|
|
|
+ else:
|
|
|
+ t_excitation = tfp_excitation[0]
|
|
|
+
|
|
|
+ if len(tfp_refocusing) == 0:
|
|
|
+ t_refocusing = np.array([])
|
|
|
+ else:
|
|
|
+ t_refocusing = tfp_refocusing[0]
|
|
|
+
|
|
|
+ t_acc = 1e-10 # Temporal accuracy
|
|
|
+ t_acc_inv = 1 / t_acc
|
|
|
+ # tc = self.__flatten_jagged_arr(tc)
|
|
|
+ t_ktraj = t_acc * np.unique(
|
|
|
+ np.round(
|
|
|
+ t_acc_inv
|
|
|
+ * np.array(
|
|
|
+ [
|
|
|
+ *tc,
|
|
|
+ 0,
|
|
|
+ *np.array(t_excitation) * 2 - self.rf_raster_time,
|
|
|
+ *np.array(t_excitation) - self.rf_raster_time,
|
|
|
+ *t_excitation,
|
|
|
+ *np.array(t_refocusing) - self.rf_raster_time,
|
|
|
+ *t_refocusing,
|
|
|
+ *t_adc,
|
|
|
+ total_duration,
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ )
|
|
|
+ i_excitation = np.isin(t_ktraj, t_acc * np.round(t_acc_inv * t_excitation))
|
|
|
+ i_excitation = np.nonzero(i_excitation)[0] # Convert boolean array into indices
|
|
|
+ i_refocusing = np.isin(t_ktraj, t_acc * np.round(t_acc_inv * t_refocusing))
|
|
|
+ i_refocusing = np.nonzero(i_refocusing)[0] # Convert boolean array into indices
|
|
|
+ i_adc = np.isin(t_ktraj, t_acc * np.round(t_acc_inv * t_adc))
|
|
|
+ i_adc = np.nonzero(i_adc)[0] # Convert boolean array into indices
|
|
|
+
|
|
|
+ i_periods = np.unique([1, *i_excitation, *i_refocusing, len(t_ktraj) - 1])
|
|
|
+ if len(i_excitation) > 0:
|
|
|
+ ii_next_excitation = 1
|
|
|
+ else:
|
|
|
+ ii_next_excitation = 0
|
|
|
+ if len(i_refocusing) > 0:
|
|
|
+ ii_next_refocusing = 1
|
|
|
+ else:
|
|
|
+ ii_next_refocusing = 0
|
|
|
+
|
|
|
+ k_traj = np.zeros((3, len(t_ktraj)))
|
|
|
+ for i in range(ng):
|
|
|
+ if gw_pp_MATLAB[i] is None:
|
|
|
+ continue
|
|
|
+
|
|
|
+ it = np.logical_and(
|
|
|
+ t_ktraj >= t_acc * np.round(t_acc_inv * res_breaks[0]),
|
|
|
+ t_ktraj <= t_acc * np.round(t_acc_inv * res_breaks[-1]),
|
|
|
+ )
|
|
|
+ k_traj[i, it] = self.ppval_MATLAB(gm_pp[i], t_ktraj[it])
|
|
|
+ if t_ktraj[it[-1]] < t_ktraj[-1]:
|
|
|
+ k_traj[i, it[-1] + 1 :] = k_traj[i, it[-1]]
|
|
|
+
|
|
|
+ # Convert gradient moments to kspace
|
|
|
+ dk = -k_traj[:, 0]
|
|
|
+ for i in range(len(i_periods) - 1):
|
|
|
+ i_period = i_periods[i]
|
|
|
+ i_period_end = i_periods[i + 1]
|
|
|
+ if ii_next_excitation > 0 and i_excitation[ii_next_excitation] == i_period:
|
|
|
+ if np.abs(t_ktraj[i_period] - t_excitation[ii_next_excitation]) > t_acc:
|
|
|
+ raise Warning(
|
|
|
+ f"np.abs(t_ktraj[i_period]-t_excitation[ii_next_excitation]) < {t_acc} failed for ii_next_excitation={ii_next_excitation} error={t_ktraj(i_period) - t_excitation(ii_next_excitation)}"
|
|
|
+ )
|
|
|
+ dk = -k_traj[:, i_period]
|
|
|
+ if i_period > 1:
|
|
|
+ # Use nans to mark the excitation points since they interrupt the plots
|
|
|
+ k_traj[:, i_period - 1] = np.NaN
|
|
|
+ # -1 on len(i_excitation) for 0-based indexing
|
|
|
+ ii_next_excitation = np.minimum(
|
|
|
+ len(i_excitation) - 1, ii_next_excitation + 1
|
|
|
+ )
|
|
|
+ elif (
|
|
|
+ ii_next_refocusing > 0 and i_refocusing[ii_next_refocusing] == i_period
|
|
|
+ ):
|
|
|
+ dk = -k_traj[:, i_period]
|
|
|
+ dk = -2 * k_traj[:, i_period] - dk
|
|
|
+ # -1 on len(i_excitation) for 0-based indexing
|
|
|
+ ii_next_refocusing = np.minimum(
|
|
|
+ len(i_refocusing) - 1, ii_next_refocusing + 1
|
|
|
+ )
|
|
|
+
|
|
|
+ k_traj[:, i_period:i_period_end] = (
|
|
|
+ k_traj[:, i_period:i_period_end] + dk[:, None]
|
|
|
+ )
|
|
|
+
|
|
|
+ k_traj[:, i_period_end] = k_traj[:, i_period_end] + dk
|
|
|
+ k_traj_adc = k_traj[:, i_adc]
|
|
|
+
|
|
|
+ return k_traj_adc, k_traj, t_excitation, t_refocusing, t_adc
|
|
|
+
|
|
|
+ def check_timing(self) -> Tuple[bool, List[str]]:
|
|
|
+ """
|
|
|
+ Checks timing of all blocks and objects in the sequence optionally returns the detailed error log. This
|
|
|
+ function also modifies the sequence object by adding the field "TotalDuration" to sequence definitions.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ is_ok : bool
|
|
|
+ Boolean flag indicating timing errors.
|
|
|
+ error_report : str
|
|
|
+ Error report in case of timing errors.
|
|
|
+ """
|
|
|
+ error_report = []
|
|
|
+ is_ok = True
|
|
|
+ num_blocks = len(self.block_events)
|
|
|
+ total_duration = 0
|
|
|
+
|
|
|
+ for block_counter in range(num_blocks):
|
|
|
+ block = self.get_block(block_counter + 1)
|
|
|
+ events = [e for e in vars(block).values() if e is not None]
|
|
|
+ res, rep, duration = ext_check_timing(self.system, *events)
|
|
|
+ is_ok = is_ok and res
|
|
|
+
|
|
|
+ # Check the stored total block duration
|
|
|
+ if np.abs(duration - self.block_durations[block_counter]) > eps:
|
|
|
+ rep += "Inconsistency between the stored block duration and the duration of the block content"
|
|
|
+ is_ok = False
|
|
|
+ duration = self.block_durations[block_counter]
|
|
|
+
|
|
|
+ # Check RF dead times
|
|
|
+ if block.rf is not None:
|
|
|
+ if block.rf.delay - block.rf.dead_time < -eps:
|
|
|
+ rep += (
|
|
|
+ f"Delay of {block.rf.delay * 1e6} us is smaller than the RF dead time "
|
|
|
+ f"{block.rf.dead_time * 1e6} us"
|
|
|
+ )
|
|
|
+ is_ok = False
|
|
|
+
|
|
|
+ if (
|
|
|
+ block.rf.delay + block.rf.t[-1] + block.rf.ringdown_time - duration
|
|
|
+ > eps
|
|
|
+ ):
|
|
|
+ rep += (
|
|
|
+ f"Time between the end of the RF pulse at {block.rf.delay + block.rf.t[-1]} and the end "
|
|
|
+ f"of the block at {duration * 1e6} us is shorter than rf_ringdown_time"
|
|
|
+ )
|
|
|
+ is_ok = False
|
|
|
+
|
|
|
+ # Check ADC dead times
|
|
|
+ if block.adc is not None:
|
|
|
+ if block.adc.delay - self.system.adc_dead_time < -eps:
|
|
|
+ rep += "adc.delay < system.adc_dead_time"
|
|
|
+ is_ok = False
|
|
|
+
|
|
|
+ if (
|
|
|
+ block.adc.delay
|
|
|
+ + block.adc.num_samples * block.adc.dwell
|
|
|
+ + self.system.adc_dead_time
|
|
|
+ - duration
|
|
|
+ > eps
|
|
|
+ ):
|
|
|
+ rep += "adc: system.adc_dead_time (post-ADC) violation"
|
|
|
+ is_ok = False
|
|
|
+
|
|
|
+ # Update report
|
|
|
+ if len(rep) != 0:
|
|
|
+ error_report.append(f"Event: {block_counter} - {rep}\n")
|
|
|
+ total_duration += duration
|
|
|
+
|
|
|
+ # Check if all the gradients in the last block are ramped down properly
|
|
|
+ if len(events) != 0 and all([isinstance(e, SimpleNamespace) for e in events]):
|
|
|
+ for e in range(len(events)):
|
|
|
+ if not isinstance(events[e], list) and events[e].type == "grad":
|
|
|
+ if events[e].last != 0:
|
|
|
+ error_report.append(
|
|
|
+ f"Event {num_blocks - 1} gradients do not ramp to 0 at the end of the sequence"
|
|
|
+ )
|
|
|
+
|
|
|
+ self.set_definition("TotalDuration", total_duration)
|
|
|
+
|
|
|
+ return is_ok, error_report
|
|
|
+
|
|
|
+ def duration(self) -> Tuple[int, int, np.ndarray]:
|
|
|
+ """
|
|
|
+ Returns the total duration of this sequence, and the total count of blocks and events.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ duration : int
|
|
|
+ Duration of this sequence in seconds (s).
|
|
|
+ num_blocks : int
|
|
|
+ Number of blocks in this sequence.
|
|
|
+ event_count : np.ndarray
|
|
|
+ Number of events in this sequence.
|
|
|
+ """
|
|
|
+ num_blocks = len(self.block_events)
|
|
|
+ event_count = np.zeros(len(self.block_events[1]))
|
|
|
+ duration = 0
|
|
|
+ for block_counter in range(num_blocks):
|
|
|
+ event_count += self.block_events[block_counter + 1] > 0
|
|
|
+ duration += self.block_durations[block_counter]
|
|
|
+
|
|
|
+ return duration, num_blocks, event_count
|
|
|
+
|
|
|
+ def __flatten_jagged_arr(self, arr: np.array) -> np.array:
|
|
|
+ # Sanity check: we don't need to do anything if we have a flat array
|
|
|
+ def __flat_check(arr: np.array) -> bool:
|
|
|
+ return all([not isinstance(x, Iterable) for x in arr])
|
|
|
+
|
|
|
+ if __flat_check(arr):
|
|
|
+ return arr
|
|
|
+
|
|
|
+ # Flatten the array simply -- 1 level deep
|
|
|
+ arr_flattened = list(itertools.chain(*arr))
|
|
|
+
|
|
|
+ # Not flat yet?
|
|
|
+ if __flat_check(arr_flattened):
|
|
|
+ return arr_flattened
|
|
|
+
|
|
|
+ # Flatten the array -- 2 levels deep
|
|
|
+ any_ragged = [isinstance(x, Iterable) for x in arr_flattened]
|
|
|
+
|
|
|
+ if np.any(any_ragged):
|
|
|
+ idx_ragged = np.array(np.where(any_ragged)[0])
|
|
|
+ for i in range(len(idx_ragged)):
|
|
|
+ ii = idx_ragged[i]
|
|
|
+
|
|
|
+ # If we are not at the end of the list, we need to update the indices of the remaining elements
|
|
|
+ # Because once we expand and insert this list element, the indices of the remaining elements
|
|
|
+ # will be shifted by len(this element)
|
|
|
+ if i != len(idx_ragged) - 1:
|
|
|
+ idx_ragged[i + 1 :] += len(arr_flattened[ii])
|
|
|
+
|
|
|
+ arr_flattened = np.insert(arr_flattened, ii, arr_flattened[ii])
|
|
|
+
|
|
|
+ return arr_flattened
|
|
|
+
|
|
|
+ def flip_grad_axis(self, axis: str) -> None:
|
|
|
+ """
|
|
|
+ Invert all gradients along the corresponding axis/channel. The function acts on all gradient objects already
|
|
|
+ added to the sequence object.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ axis : str
|
|
|
+ Gradients to invert or scale. Must be one of 'x', 'y' or 'z'.
|
|
|
+ """
|
|
|
+ self.mod_grad_axis(axis, modifier=-1)
|
|
|
+
|
|
|
+ def get_block(self, block_index: int) -> SimpleNamespace:
|
|
|
+ """
|
|
|
+ Return a block of the sequence specified by the index. The block is created from the sequence data with all
|
|
|
+ events and shapes decompressed.
|
|
|
+
|
|
|
+ See also:
|
|
|
+ - `pypulseq.Sequence.sequence.Sequence.set_block()`.
|
|
|
+ - `pypulseq.Sequence.sequence.Sequence.add_block()`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ block_index : int
|
|
|
+ Index of block to be retrieved from `Sequence`.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ SimpleNamespace
|
|
|
+ Event identified by `block_index`.
|
|
|
+ """
|
|
|
+ return block.get_block(self, block_index)
|
|
|
+
|
|
|
+ def get_definition(self, key: str) -> str:
|
|
|
+ """
|
|
|
+ Return value of the definition specified by the key. These definitions can be added manually or read from the
|
|
|
+ header of a sequence file defined in the sequence header. An empty array is returned if the key is not defined.
|
|
|
+
|
|
|
+ See also `pypulseq.Sequence.sequence.Sequence.set_definition()`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ key : str
|
|
|
+ Key of definition to retrieve.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ str
|
|
|
+ Definition identified by `key` if found, else returns ''.
|
|
|
+ """
|
|
|
+ if key in self.definitions:
|
|
|
+ return self.definitions[key]
|
|
|
+ else:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ def get_extension_type_ID(self, extension_string: str) -> int:
|
|
|
+ """
|
|
|
+ Get numeric extension ID for `extension_string`. Will automatically create a new ID if unknown.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ extension_string : str
|
|
|
+ Given string extension ID.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ extension_id : int
|
|
|
+ Numeric ID for given string extension ID.
|
|
|
+
|
|
|
+ """
|
|
|
+ if extension_string not in self.extension_string_idx:
|
|
|
+ if len(self.extension_numeric_idx) == 0:
|
|
|
+ extension_id = 1
|
|
|
+ else:
|
|
|
+ extension_id = 1 + max(self.extension_numeric_idx)
|
|
|
+
|
|
|
+ self.extension_numeric_idx.append(extension_id)
|
|
|
+ self.extension_string_idx.append(extension_string)
|
|
|
+ assert len(self.extension_numeric_idx) == len(self.extension_string_idx)
|
|
|
+ else:
|
|
|
+ num = self.extension_string_idx.index(extension_string)
|
|
|
+ extension_id = self.extension_numeric_idx[num]
|
|
|
+
|
|
|
+ return extension_id
|
|
|
+
|
|
|
+ def get_extension_type_string(self, extension_id: int) -> str:
|
|
|
+ """
|
|
|
+ Get string extension ID for `extension_id`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ extension_id : int
|
|
|
+ Given numeric extension ID.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ extension_str : str
|
|
|
+ String ID for the given numeric extension ID.
|
|
|
+
|
|
|
+ Raises
|
|
|
+ ------
|
|
|
+ ValueError
|
|
|
+ If given numeric extension ID is unknown.
|
|
|
+ """
|
|
|
+ if extension_id in self.extension_numeric_idx:
|
|
|
+ num = self.extension_numeric_idx.index(extension_id)
|
|
|
+ else:
|
|
|
+ raise ValueError(
|
|
|
+ f"Extension for the given ID - {extension_id} - is unknown."
|
|
|
+ )
|
|
|
+
|
|
|
+ extension_str = self.extension_string_idx[num]
|
|
|
+ return extension_str
|
|
|
+
|
|
|
+ def mod_grad_axis(self, axis: str, modifier: int) -> None:
|
|
|
+ """
|
|
|
+ Invert or scale all gradients along the corresponding axis/channel. The function acts on all gradient objects
|
|
|
+ already added to the sequence object.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ axis : str
|
|
|
+ Gradients to invert or scale. Must be one of 'x', 'y' or 'z'.
|
|
|
+ modifier : int
|
|
|
+ Scaling value.
|
|
|
+
|
|
|
+ Raises
|
|
|
+ ------
|
|
|
+ ValueError
|
|
|
+ If invalid `axis` is passed. Must be one of 'x', 'y','z'.
|
|
|
+ RuntimeError
|
|
|
+ If same gradient event is used on multiple axes.
|
|
|
+ """
|
|
|
+ if axis not in ["x", "y", "z"]:
|
|
|
+ raise ValueError(
|
|
|
+ f"Invalid axis. Must be one of 'x', 'y','z'. Passed: {axis}"
|
|
|
+ )
|
|
|
+
|
|
|
+ channel_num = ["x", "y", "z"].index(axis)
|
|
|
+ other_channels = [0, 1, 2]
|
|
|
+ other_channels.remove(channel_num)
|
|
|
+
|
|
|
+ # Go through all event table entries and list gradient objects in the library
|
|
|
+ all_grad_events = np.array(list(self.block_events.values()))
|
|
|
+ all_grad_events = all_grad_events[:, 2:5]
|
|
|
+
|
|
|
+ selected_events = np.unique(all_grad_events[:, channel_num])
|
|
|
+ selected_events = selected_events[selected_events != 0]
|
|
|
+ other_events = np.unique(all_grad_events[:, other_channels])
|
|
|
+ if len(np.intersect1d(selected_events, other_events)) > 0:
|
|
|
+ raise RuntimeError(
|
|
|
+ "mod_grad_axis does not yet support the same gradient event used on multiple axes."
|
|
|
+ )
|
|
|
+
|
|
|
+ for i in range(len(selected_events)):
|
|
|
+ self.grad_library.data[selected_events[i]][0] *= modifier
|
|
|
+ if (
|
|
|
+ self.grad_library.type[selected_events[i]] == "g"
|
|
|
+ and self.grad_library.lengths[selected_events[i]] == 5
|
|
|
+ ):
|
|
|
+ # Need to update first and last fields
|
|
|
+ self.grad_library.data[selected_events[i]][3] *= modifier
|
|
|
+ self.grad_library.data[selected_events[i]][4] *= modifier
|
|
|
+
|
|
|
+ def plot(
|
|
|
+ self,
|
|
|
+ label: str = str(),
|
|
|
+ show_blocks: bool = False,
|
|
|
+ save: bool = False,
|
|
|
+ time_range=(0, np.inf),
|
|
|
+ time_disp: str = "s",
|
|
|
+ grad_disp: str = "kHz/m",
|
|
|
+ plot_now: bool = True
|
|
|
+ ) -> None:
|
|
|
+ """
|
|
|
+ Plot `Sequence`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ label : str, defualt=str()
|
|
|
+ Plot label values for ADC events: in this example for LIN and REP labels; other valid labes are accepted as
|
|
|
+ a comma-separated list.
|
|
|
+ save : bool, default=False
|
|
|
+ Boolean flag indicating if plots should be saved. The two figures will be saved as JPG with numerical
|
|
|
+ suffixes to the filename 'seq_plot'.
|
|
|
+ show_blocks : bool, default=False
|
|
|
+ Boolean flag to indicate if grid and tick labels at the block boundaries are to be plotted.
|
|
|
+ time_range : iterable, default=(0, np.inf)
|
|
|
+ Time range (x-axis limits) for plotting the sequence. Default is 0 to infinity (entire sequence).
|
|
|
+ time_disp : str, default='s'
|
|
|
+ Time display type, must be one of `s`, `ms` or `us`.
|
|
|
+ grad_disp : str, default='s'
|
|
|
+ Gradient display unit, must be one of `kHz/m` or `mT/m`.
|
|
|
+ plot_now : bool, default=True
|
|
|
+ If true, function immediately shows the plots, blocking the rest of the code until plots are exited.
|
|
|
+ If false, plots are shown when plt.show() is called. Useful if plots are to be modified.
|
|
|
+ plot_type : str, default='Gradient'
|
|
|
+ Gradients display type, must be one of either 'Gradient' or 'Kspace'.
|
|
|
+ """
|
|
|
+ mpl.rcParams["lines.linewidth"] = 0.75 # Set default Matplotlib linewidth
|
|
|
+
|
|
|
+ valid_time_units = ["s", "ms", "us"]
|
|
|
+ valid_grad_units = ["kHz/m", "mT/m"]
|
|
|
+ valid_labels = get_supported_labels()
|
|
|
+ if (
|
|
|
+ not all([isinstance(x, (int, float)) for x in time_range])
|
|
|
+ or len(time_range) != 2
|
|
|
+ ):
|
|
|
+ raise ValueError("Invalid time range")
|
|
|
+ if time_disp not in valid_time_units:
|
|
|
+ raise ValueError("Unsupported time unit")
|
|
|
+
|
|
|
+ if grad_disp not in valid_grad_units:
|
|
|
+ raise ValueError(
|
|
|
+ "Unsupported gradient unit. Supported gradient units are: "
|
|
|
+ + str(valid_grad_units)
|
|
|
+ )
|
|
|
+
|
|
|
+ fig1, fig2 = plt.figure(1), plt.figure(2)
|
|
|
+ sp11 = fig1.add_subplot(311)
|
|
|
+ sp12 = fig1.add_subplot(312, sharex=sp11)
|
|
|
+ sp13 = fig1.add_subplot(313, sharex=sp11)
|
|
|
+ fig2_subplots = [
|
|
|
+ fig2.add_subplot(311, sharex=sp11),
|
|
|
+ fig2.add_subplot(312, sharex=sp11),
|
|
|
+ fig2.add_subplot(313, sharex=sp11),
|
|
|
+ ]
|
|
|
+
|
|
|
+ t_factor_list = [1, 1e3, 1e6]
|
|
|
+ t_factor = t_factor_list[valid_time_units.index(time_disp)]
|
|
|
+
|
|
|
+ g_factor_list = [1e-3, 1e3 / self.system.gamma]
|
|
|
+ g_factor = g_factor_list[valid_grad_units.index(grad_disp)]
|
|
|
+
|
|
|
+ t0 = 0
|
|
|
+ label_defined = False
|
|
|
+ label_idx_to_plot = []
|
|
|
+ label_legend_to_plot = []
|
|
|
+ label_store = dict()
|
|
|
+ for i in range(len(valid_labels)):
|
|
|
+ label_store[valid_labels[i]] = 0
|
|
|
+ if valid_labels[i] in label.upper():
|
|
|
+ label_idx_to_plot.append(i)
|
|
|
+ label_legend_to_plot.append(valid_labels[i])
|
|
|
+
|
|
|
+ if len(label_idx_to_plot) != 0:
|
|
|
+ p = parula.main(len(label_idx_to_plot) + 1)
|
|
|
+ label_colors_to_plot = p(np.arange(len(label_idx_to_plot)))
|
|
|
+ label_colors_to_plot = np.roll(label_colors_to_plot, -1, axis=0).tolist()
|
|
|
+
|
|
|
+ # Block timings
|
|
|
+ block_edges = np.cumsum([0, *self.block_durations])
|
|
|
+ block_edges_in_range = block_edges[
|
|
|
+ (block_edges >= time_range[0]) * (block_edges <= time_range[1])
|
|
|
+ ]
|
|
|
+ if show_blocks:
|
|
|
+ for sp in [sp11, sp12, sp13, *fig2_subplots]:
|
|
|
+ sp.set_xticks(t_factor * block_edges_in_range)
|
|
|
+ sp.set_xticklabels(rotation=90)
|
|
|
+
|
|
|
+ for block_counter in range(len(self.block_events)):
|
|
|
+ block = self.get_block(block_counter + 1)
|
|
|
+ is_valid = time_range[0] <= t0 <= time_range[1]
|
|
|
+ if is_valid:
|
|
|
+ if getattr(block, "label", None) is not None:
|
|
|
+ for i in range(len(block.label)):
|
|
|
+ if block.label[i].type == "labelinc":
|
|
|
+ label_store[block.label[i].label] += block.label[i].value
|
|
|
+ else:
|
|
|
+ label_store[block.label[i].label] = block.label[i].value
|
|
|
+ label_defined = True
|
|
|
+
|
|
|
+ if getattr(block, "adc", None) is not None: # ADC
|
|
|
+ adc = block.adc
|
|
|
+ # From Pulseq: According to the information from Klaus Scheffler and indirectly from Siemens this
|
|
|
+ # is the present convention - the samples are shifted by 0.5 dwell
|
|
|
+ t = adc.delay + (np.arange(int(adc.num_samples)) + 0.5) * adc.dwell
|
|
|
+ sp11.plot(t_factor * (t0 + t), np.zeros(len(t)), "rx")
|
|
|
+ sp13.plot(
|
|
|
+ t_factor * (t0 + t),
|
|
|
+ np.angle(
|
|
|
+ np.exp(1j * adc.phase_offset)
|
|
|
+ * np.exp(1j * 2 * np.pi * t * adc.freq_offset)
|
|
|
+ ),
|
|
|
+ "b.",
|
|
|
+ markersize=0.25,
|
|
|
+ )
|
|
|
+
|
|
|
+ if label_defined and len(label_idx_to_plot) != 0:
|
|
|
+ cycler = mpl.cycler(color=label_colors_to_plot)
|
|
|
+ sp11.set_prop_cycle(cycler)
|
|
|
+ label_colors_to_plot = np.roll(
|
|
|
+ label_colors_to_plot, -1, axis=0
|
|
|
+ ).tolist()
|
|
|
+ arr_label_store = list(label_store.values())
|
|
|
+ lbl_vals = np.take(arr_label_store, label_idx_to_plot)
|
|
|
+ t = t0 + adc.delay + (adc.num_samples - 1) / 2 * adc.dwell
|
|
|
+ _t = [t_factor * t] * len(lbl_vals)
|
|
|
+ # Plot each label individually to retrieve each corresponding Line2D object
|
|
|
+ p = itertools.chain.from_iterable(
|
|
|
+ [
|
|
|
+ sp11.plot(__t, _lbl_vals, ".")
|
|
|
+ for __t, _lbl_vals in zip(_t, lbl_vals)
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ if len(label_legend_to_plot) != 0:
|
|
|
+ sp11.legend(p, label_legend_to_plot, loc="upper left")
|
|
|
+ label_legend_to_plot = []
|
|
|
+
|
|
|
+ if getattr(block, "rf", None) is not None: # RF
|
|
|
+ rf = block.rf
|
|
|
+ tc, ic = calc_rf_center(rf)
|
|
|
+ time = rf.t
|
|
|
+ signal = rf.signal
|
|
|
+ if np.abs(signal[0]) != 0:
|
|
|
+ signal = np.insert(signal, obj=0, values=0)
|
|
|
+ time = np.insert(time, obj=0, values=time[0])
|
|
|
+ ic += 1
|
|
|
+
|
|
|
+ if np.abs(signal[-1]) != 0:
|
|
|
+ signal = np.append(signal, 0)
|
|
|
+ time = np.append(time, time[-1])
|
|
|
+
|
|
|
+ sp12.plot(t_factor * (t0 + time + rf.delay), np.abs(signal))
|
|
|
+ sp13.plot(
|
|
|
+ t_factor * (t0 + time + rf.delay),
|
|
|
+ np.angle(
|
|
|
+ signal
|
|
|
+ * np.exp(1j * rf.phase_offset)
|
|
|
+ * np.exp(1j * 2 * math.pi * time * rf.freq_offset)
|
|
|
+ ),
|
|
|
+ t_factor * (t0 + tc + rf.delay),
|
|
|
+ np.angle(
|
|
|
+ signal[ic]
|
|
|
+ * np.exp(1j * rf.phase_offset)
|
|
|
+ * np.exp(1j * 2 * math.pi * time[ic] * rf.freq_offset)
|
|
|
+ ),
|
|
|
+ "xb",
|
|
|
+ )
|
|
|
+
|
|
|
+ grad_channels = ["gx", "gy", "gz"]
|
|
|
+ for x in range(len(grad_channels)): # Gradients
|
|
|
+ if getattr(block, grad_channels[x], None) is not None:
|
|
|
+ grad = getattr(block, grad_channels[x])
|
|
|
+ if grad.type == "grad":
|
|
|
+ # We extend the shape by adding the first and the last points in an effort of making the
|
|
|
+ # display a bit less confusing...
|
|
|
+ time = grad.delay + [0, *grad.tt, grad.shape_dur]
|
|
|
+ waveform = g_factor * np.array(
|
|
|
+ (grad.first, *grad.waveform, grad.last)
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ time = np.cumsum(
|
|
|
+ [
|
|
|
+ 0,
|
|
|
+ grad.delay,
|
|
|
+ grad.rise_time,
|
|
|
+ grad.flat_time,
|
|
|
+ grad.fall_time,
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ waveform = (
|
|
|
+ g_factor * grad.amplitude * np.array([0, 0, 1, 1, 0])
|
|
|
+ )
|
|
|
+ fig2_subplots[x].plot(t_factor * (t0 + time), waveform)
|
|
|
+ t0 += self.block_durations[block_counter]
|
|
|
+
|
|
|
+ grad_plot_labels = ["x", "y", "z"]
|
|
|
+ sp11.set_ylabel("ADC")
|
|
|
+ sp12.set_ylabel("RF mag (Hz)")
|
|
|
+ sp13.set_ylabel("RF/ADC phase (rad)")
|
|
|
+ sp13.set_xlabel(f"t ({time_disp})")
|
|
|
+ for x in range(3):
|
|
|
+ _label = grad_plot_labels[x]
|
|
|
+ fig2_subplots[x].set_ylabel(f"G{_label} ({grad_disp})")
|
|
|
+ fig2_subplots[-1].set_xlabel(f"t ({time_disp})")
|
|
|
+
|
|
|
+ # Setting display limits
|
|
|
+ disp_range = t_factor * np.array([time_range[0], min(t0, time_range[1])])
|
|
|
+ [x.set_xlim(disp_range) for x in [sp11, sp12, sp13, *fig2_subplots]]
|
|
|
+
|
|
|
+ # Grid on
|
|
|
+ for sp in [sp11, sp12, sp13, *fig2_subplots]:
|
|
|
+ sp.grid()
|
|
|
+
|
|
|
+ fig1.tight_layout()
|
|
|
+ fig2.tight_layout()
|
|
|
+ if save:
|
|
|
+ fig1.savefig("seq_plot1.jpg")
|
|
|
+ fig2.savefig("seq_plot2.jpg")
|
|
|
+
|
|
|
+ if plot_now:
|
|
|
+ plt.show()
|
|
|
+
|
|
|
+ def ppval_numpy(self, arr_np_poly: np.ndarray, xq: np.ndarray):
|
|
|
+ """
|
|
|
+ Perform piece-wise polynomial evaluation on Numpy's polyfit objects.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ==========
|
|
|
+ arr_np_poly : Iterable[np.poly1d]
|
|
|
+ xq : np.array
|
|
|
+
|
|
|
+ Returns
|
|
|
+ =======
|
|
|
+ """
|
|
|
+ result = []
|
|
|
+ result2 = []
|
|
|
+ breaks = np.array([p.domain for p in arr_np_poly])
|
|
|
+ breaks = np.array([*breaks[::2].flatten(), breaks[-1, -1]])
|
|
|
+ last_match = 0
|
|
|
+ for x in xq:
|
|
|
+ # Evaluate polynomial only when we find x's corresponding pp window
|
|
|
+ for i in range(last_match, len(arr_np_poly)):
|
|
|
+ corresponding_pp = x > arr_np_poly[i].domain
|
|
|
+ if corresponding_pp.tolist() == [True, False]:
|
|
|
+ value = np.polynomial.polynomial.polyval(x, arr_np_poly[i].coef)
|
|
|
+ result.append(value)
|
|
|
+ last_match = i
|
|
|
+ break
|
|
|
+
|
|
|
+ corresponding_pp2 = np.nonzero((x < breaks))[0][0]
|
|
|
+ value2 = np.polynomial.polynomial.polyval(
|
|
|
+ x, arr_np_poly[corresponding_pp2].coef
|
|
|
+ )
|
|
|
+ result2.append(value2)
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+ def ppval_MATLAB(self, arr_MATLAB_poly: np.ndarray, xq: np.ndarray) -> np.array:
|
|
|
+ """
|
|
|
+ Perform piece-wise polynomial evaluation on MATLAB's polyfit objects.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ =======
|
|
|
+ """
|
|
|
+
|
|
|
+ def __ppval(c, x, b, i, n):
|
|
|
+ return c[n] * (x - b) ** i + __ppval(c, x, b, i + 1, n - 1) if n >= 0 else 0
|
|
|
+
|
|
|
+ coefs = np.array([p.coef for p in arr_MATLAB_poly])
|
|
|
+ breaks = np.array([p.domain for p in arr_MATLAB_poly])
|
|
|
+ breaks = np.array([*breaks[::2].flatten(), breaks[-1, -1]])
|
|
|
+ result = []
|
|
|
+
|
|
|
+ for x in xq:
|
|
|
+ corresponding_pp = np.nonzero((x < breaks))[0][0]
|
|
|
+ c = coefs[corresponding_pp - 1]
|
|
|
+ b = breaks[corresponding_pp - 1]
|
|
|
+ # c = coefs[2]
|
|
|
+ # res = c[0] * (x - breaks[2]) ** 2 + c[1] * (x - breaks[2]) + c[2]
|
|
|
+ res = __ppval(c, x, b, i=0, n=len(c) - 1)
|
|
|
+ result.append(res)
|
|
|
+
|
|
|
+ return np.array(result)
|
|
|
+
|
|
|
+ def read(self, file_path: str, detect_rf_use: bool = False) -> None:
|
|
|
+ """
|
|
|
+ Read `.seq` file from `file_path`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ detect_rf_use
|
|
|
+ file_path : str
|
|
|
+ Path to `.seq` file to be read.
|
|
|
+ """
|
|
|
+ read(self, path=file_path, detect_rf_use=detect_rf_use)
|
|
|
+
|
|
|
+ def register_adc_event(self, event: EventLibrary) -> int:
|
|
|
+ return block.register_adc_event(self, event)
|
|
|
+
|
|
|
+ def register_grad_event(
|
|
|
+ self, event: SimpleNamespace
|
|
|
+ ) -> Union[int, Tuple[int, int]]:
|
|
|
+ return block.register_grad_event(self, event)
|
|
|
+
|
|
|
+ def register_label_event(self, event: SimpleNamespace) -> int:
|
|
|
+ return block.register_label_event(self, event)
|
|
|
+
|
|
|
+ def register_rf_event(self, event: SimpleNamespace) -> Tuple[int, List[int]]:
|
|
|
+ return block.register_rf_event(self, event)
|
|
|
+
|
|
|
+ def rf_from_lib_data(self, lib_data: list, use: str = str()) -> SimpleNamespace:
|
|
|
+ """
|
|
|
+ Construct RF object from `lib_data`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ lib_data : list
|
|
|
+ RF envelope.
|
|
|
+ use : str, default=str()
|
|
|
+ RF event use.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ rf : SimpleNamespace
|
|
|
+ RF object constructed from `lib_data`.
|
|
|
+ """
|
|
|
+ rf = SimpleNamespace()
|
|
|
+ rf.type = "rf"
|
|
|
+
|
|
|
+ amplitude, mag_shape, phase_shape = lib_data[0], lib_data[1], lib_data[2]
|
|
|
+ shape_data = self.shape_library.data[mag_shape]
|
|
|
+ compressed = SimpleNamespace()
|
|
|
+ compressed.num_samples = shape_data[0]
|
|
|
+ compressed.data = shape_data[1:]
|
|
|
+ mag = decompress_shape(compressed)
|
|
|
+ shape_data = self.shape_library.data[phase_shape]
|
|
|
+ compressed.num_samples = shape_data[0]
|
|
|
+ compressed.data = shape_data[1:]
|
|
|
+ phase = decompress_shape(compressed)
|
|
|
+ rf.signal = amplitude * mag * np.exp(1j * 2 * np.pi * phase)
|
|
|
+ time_shape = lib_data[3]
|
|
|
+ if time_shape > 0:
|
|
|
+ shape_data = self.shape_library.data[time_shape]
|
|
|
+ compressed.num_samples = shape_data[0]
|
|
|
+ compressed.data = shape_data[1:]
|
|
|
+ rf.t = decompress_shape(compressed) * self.rf_raster_time
|
|
|
+ rf.shape_dur = (
|
|
|
+ np.ceil((rf.t[-1] - eps) / self.rf_raster_time) * self.rf_raster_time
|
|
|
+ )
|
|
|
+ else: # Generate default time raster on the fly
|
|
|
+ rf.t = (np.arange(1, len(rf.signal) + 1) - 0.5) * self.rf_raster_time
|
|
|
+ rf.shape_dur = len(rf.signal) * self.rf_raster_time
|
|
|
+
|
|
|
+ rf.delay = lib_data[4]
|
|
|
+ rf.freq_offset = lib_data[5]
|
|
|
+ rf.phase_offset = lib_data[6]
|
|
|
+
|
|
|
+ rf.dead_time = self.system.rf_dead_time
|
|
|
+ rf.ringdown_time = self.system.rf_ringdown_time
|
|
|
+
|
|
|
+ if use != "":
|
|
|
+ use_cases = {
|
|
|
+ "e": "excitation",
|
|
|
+ "r": "refocusing",
|
|
|
+ "i": "inversion",
|
|
|
+ "s": "saturation",
|
|
|
+ "p": "preparation",
|
|
|
+ }
|
|
|
+ rf.use = use_cases[use] if use in use_cases else "undefined"
|
|
|
+
|
|
|
+ return rf
|
|
|
+
|
|
|
+ def set_block(self, block_index: int, *args: SimpleNamespace) -> None:
|
|
|
+ """
|
|
|
+ Replace block at index with new block provided as block structure, add sequence block, or create a new block
|
|
|
+ from events and store at position specified by index. The block or events are provided in uncompressed form and
|
|
|
+ will be stored in the compressed, non-redundant internal libraries.
|
|
|
+
|
|
|
+ See also:
|
|
|
+ - `pypulseq.Sequence.sequence.Sequence.get_block()`
|
|
|
+ - `pypulseq.Sequence.sequence.Sequence.add_block()`
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ block_index : int
|
|
|
+ Index at which block is replaced.
|
|
|
+ args : SimpleNamespace
|
|
|
+ Block or events to be replaced/added or created at `block_index`.
|
|
|
+ """
|
|
|
+ block.set_block(self, block_index, *args)
|
|
|
+
|
|
|
+ def set_definition(
|
|
|
+ self, key: str, value: Union[float, int, list, np.ndarray, str, tuple]
|
|
|
+ ) -> None:
|
|
|
+ """
|
|
|
+ Modify a custom definition of the sequence. Set the user definition 'key' to value 'value'. If the definition
|
|
|
+ does not exist it will be created.
|
|
|
+
|
|
|
+ See also `pypulseq.Sequence.sequence.Sequence.get_definition()`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ key : str
|
|
|
+ Definition key.
|
|
|
+ value : int, list, np.ndarray, str or tuple
|
|
|
+ Definition value.
|
|
|
+ """
|
|
|
+ if key == "FOV":
|
|
|
+ if np.max(value) > 1:
|
|
|
+ text = "Definition FOV uses values exceeding 1 m. "
|
|
|
+ text += "New Pulseq interpreters expect values in units of meters."
|
|
|
+ warn(text)
|
|
|
+
|
|
|
+ self.definitions[key] = value
|
|
|
+
|
|
|
+ def set_extension_string_ID(self, extension_str: str, extension_id: int) -> None:
|
|
|
+ """
|
|
|
+ Set numeric ID for the given string extension ID.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ extension_str : str
|
|
|
+ Given string extension ID.
|
|
|
+ extension_id : int
|
|
|
+ Given numeric extension ID.
|
|
|
+
|
|
|
+ Raises
|
|
|
+ ------
|
|
|
+ ValueError
|
|
|
+ If given numeric or string extension ID is not unique.
|
|
|
+ """
|
|
|
+ if (
|
|
|
+ extension_str in self.extension_string_idx
|
|
|
+ or extension_id in self.extension_numeric_idx
|
|
|
+ ):
|
|
|
+ raise ValueError("Numeric or string ID is not unique")
|
|
|
+
|
|
|
+ self.extension_numeric_idx.append(extension_id)
|
|
|
+ self.extension_string_idx.append(extension_str)
|
|
|
+ assert len(self.extension_numeric_idx) == len(self.extension_string_idx)
|
|
|
+
|
|
|
+ def test_report(self) -> str:
|
|
|
+ """
|
|
|
+ Analyze the sequence and return a text report.
|
|
|
+ """
|
|
|
+ return ext_test_report(self)
|
|
|
+
|
|
|
+ def waveforms_and_times(
|
|
|
+ self, append_RF: bool = False
|
|
|
+ ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
|
|
|
+ """
|
|
|
+ Decompress the entire gradient waveform. Returns gradient waveforms as a tuple of `np.ndarray` of
|
|
|
+ `gradient_axes` (typically 3) dimensions. Each `np.ndarray` contains timepoints and the corresponding
|
|
|
+ gradient amplitude values. Additional return values are time points of excitations, refocusings and ADC
|
|
|
+ sampling points.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ append_RF : bool, default=False
|
|
|
+ Boolean flag to indicate if RF wave shapes are to be appended after the gradients.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ wave_data : np.ndarray
|
|
|
+ tfp_excitation : np.ndarray
|
|
|
+ Contains time moments, frequency and phase offsets of the excitation RF pulses (similar for `
|
|
|
+ tfp_refocusing`).
|
|
|
+ tfp_refocusing : np.ndarray
|
|
|
+ t_adc: np.ndarray
|
|
|
+ Contains times of all ADC sample points.
|
|
|
+ fp_adc : np.ndarray
|
|
|
+ Contains frequency and phase offsets of each ADC object (not samples).
|
|
|
+ """
|
|
|
+ grad_channels = ["gx", "gy", "gz"]
|
|
|
+
|
|
|
+ num_blocks = len(self.block_events)
|
|
|
+
|
|
|
+ # Collect shape pieces
|
|
|
+ if append_RF:
|
|
|
+ shape_channels = len(grad_channels) + 1 # Last 'channel' is RF
|
|
|
+ else:
|
|
|
+ shape_channels = len(grad_channels)
|
|
|
+
|
|
|
+ shape_pieces = np.empty((shape_channels, num_blocks), dtype="object")
|
|
|
+ # Also collect RF and ADC timing data
|
|
|
+ # t_excitation, t_refocusing, t_adc
|
|
|
+ tfp_excitation = []
|
|
|
+ tfp_refocusing = []
|
|
|
+ t_adc = []
|
|
|
+ fp_adc = []
|
|
|
+
|
|
|
+ curr_dur = 0
|
|
|
+ out_len = np.zeros(shape_channels) # Last 'channel' is RF
|
|
|
+
|
|
|
+ for block_counter in range(num_blocks):
|
|
|
+ block = self.get_block(block_counter + 1)
|
|
|
+ for j in range(len(grad_channels)):
|
|
|
+ grad = getattr(block, grad_channels[j])
|
|
|
+ if grad is not None: # Gradients
|
|
|
+ if grad.type == "grad":
|
|
|
+ # Check if we have an extended trapezoid or an arbitrary gradient on a regular raster
|
|
|
+ tt_rast = grad.tt / self.grad_raster_time + 0.5
|
|
|
+ if np.all(
|
|
|
+ np.abs(tt_rast - np.arange(1, len(tt_rast) + 1)) < eps
|
|
|
+ ): # Arbitrary gradient
|
|
|
+ """
|
|
|
+ Arbitrary gradient: restore & recompress shape - if we had a trapezoid converted to shape we
|
|
|
+ have to find the "corners" and we can eliminate internal samples on the straight segments
|
|
|
+ but first we have to restore samples on the edges of the gradient raster intervals for that
|
|
|
+ we need the first sample.
|
|
|
+ """
|
|
|
+ max_abs = np.max(np.abs(grad.waveform))
|
|
|
+ odd_step1 = np.array([grad.first, *(2 * grad.waveform)])
|
|
|
+ odd_step2 = odd_step1 * (
|
|
|
+ np.mod(np.arange(1, len(odd_step1) + 1), 2) * 2 - 1
|
|
|
+ )
|
|
|
+ waveform_odd_rest = np.cumsum(odd_step2) * (
|
|
|
+ np.mod(np.arange(1, len(odd_step2)), 2) * 2 - 1
|
|
|
+ )
|
|
|
+ else: # Extended trapezoid
|
|
|
+ out_len[j] += len(grad.tt)
|
|
|
+ shape_pieces[j, block_counter] = np.array(
|
|
|
+ [
|
|
|
+ curr_dur + grad.delay + grad.tt,
|
|
|
+ grad.waveform,
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ if np.abs(grad.flat_time) > eps:
|
|
|
+ out_len[j] += 4
|
|
|
+ _temp = np.vstack(
|
|
|
+ (
|
|
|
+ curr_dur
|
|
|
+ + grad.delay
|
|
|
+ + np.cumsum(
|
|
|
+ [
|
|
|
+ 0,
|
|
|
+ grad.rise_time,
|
|
|
+ grad.flat_time,
|
|
|
+ grad.fall_time,
|
|
|
+ ]
|
|
|
+ ),
|
|
|
+ grad.amplitude * np.array([0, 1, 1, 0]),
|
|
|
+ )
|
|
|
+ )
|
|
|
+ shape_pieces[j, block_counter] = _temp
|
|
|
+ else:
|
|
|
+ out_len[j] += 3
|
|
|
+ _temp = np.vstack(
|
|
|
+ (
|
|
|
+ curr_dur
|
|
|
+ + grad.delay
|
|
|
+ + np.cumsum([0, grad.rise_time, grad.fall_time]),
|
|
|
+ grad.amplitude * np.array([0, 1, 0]),
|
|
|
+ )
|
|
|
+ )
|
|
|
+ shape_pieces[j, block_counter] = _temp
|
|
|
+ if block.rf is not None: # RF
|
|
|
+ rf = block.rf
|
|
|
+ t = rf.delay + calc_rf_center(rf)[0]
|
|
|
+ if not hasattr(rf, "use") or block.rf.use in [
|
|
|
+ "excitation",
|
|
|
+ "undefined",
|
|
|
+ ]:
|
|
|
+ tfp_excitation.append(
|
|
|
+ [curr_dur + t, block.rf.freq_offset, block.rf.phase_offset]
|
|
|
+ )
|
|
|
+ elif block.rf.use == "refocusing":
|
|
|
+ tfp_refocusing.append(
|
|
|
+ [curr_dur + t, block.rf.freq_offset, block.rf.phase_offset]
|
|
|
+ )
|
|
|
+ if append_RF:
|
|
|
+ rf_piece = np.array(
|
|
|
+ [
|
|
|
+ curr_dur + rf.delay + rf.t,
|
|
|
+ rf.signal
|
|
|
+ * np.exp(
|
|
|
+ 1j
|
|
|
+ * (rf.phase_offset + 2 * np.pi * rf.freq_offset * rf.t)
|
|
|
+ ),
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ out_len[-1] += len(rf.t)
|
|
|
+
|
|
|
+ if np.abs(rf.signal[0]) > 0:
|
|
|
+ pre = np.array([[curr_dur + rf.delay + rf.t[0] - eps], [0]])
|
|
|
+ rf_piece = np.hstack((pre, rf_piece))
|
|
|
+ out_len[-1] += pre.shape[1]
|
|
|
+
|
|
|
+ if np.abs(rf.signal[-1]) > 0:
|
|
|
+ post = np.array([[curr_dur + rf.delay + rf.t[-1] + eps], [0]])
|
|
|
+ rf_piece = np.hstack((rf_piece, post))
|
|
|
+ out_len[-1] += post.shape[1]
|
|
|
+
|
|
|
+ shape_pieces[-1, block_counter] = rf_piece
|
|
|
+
|
|
|
+ if block.adc is not None: # ADC
|
|
|
+ t_adc.extend(
|
|
|
+ np.arange(block.adc.num_samples)
|
|
|
+ + 0.5 * block.adc.dwell
|
|
|
+ + block.adc.delay
|
|
|
+ + curr_dur
|
|
|
+ )
|
|
|
+ fp_adc.append([block.adc.freq_offset, block.adc.phase_offset])
|
|
|
+
|
|
|
+ curr_dur += self.block_durations[block_counter]
|
|
|
+
|
|
|
+ # Collect wave data
|
|
|
+ wave_data = np.empty(shape_channels, dtype="object")
|
|
|
+ for j in range(shape_channels):
|
|
|
+ wave_data[j] = np.zeros((2, int(out_len[j])))
|
|
|
+
|
|
|
+ # TODO: This is weird, and needs to be fixed. Time points are also complex this way, and spends 4 times more memory than necessary.
|
|
|
+ if append_RF:
|
|
|
+ wave_data[j] = np.zeros((2, int(out_len[j])), dtype=np.complex128)
|
|
|
+
|
|
|
+ wave_cnt = np.zeros(shape_channels, dtype=int)
|
|
|
+ for block_counter in range(num_blocks):
|
|
|
+ for j in range(shape_channels):
|
|
|
+ if shape_pieces[j, block_counter] is not None:
|
|
|
+ wave_data_local = shape_pieces[j, block_counter]
|
|
|
+ length = wave_data_local.shape[1]
|
|
|
+ if (
|
|
|
+ wave_cnt[j] == 0
|
|
|
+ or wave_data[j][0, wave_cnt[j] - 1] != wave_data_local[0, 0]
|
|
|
+ ):
|
|
|
+ wave_data[j][
|
|
|
+ :, wave_cnt[j] + np.arange(length)
|
|
|
+ ] = wave_data_local
|
|
|
+ wave_cnt[j] += length
|
|
|
+ else:
|
|
|
+ wave_data[j][
|
|
|
+ :, wave_cnt[j] + np.arange(length - 1)
|
|
|
+ ] = wave_data_local[:, 1:]
|
|
|
+ wave_cnt[j] += length - 1
|
|
|
+ if wave_cnt[j] != len(np.unique(wave_data[j][0, : wave_cnt[j]])):
|
|
|
+ raise Warning(
|
|
|
+ "Not all elements of the generated time vector are unique."
|
|
|
+ )
|
|
|
+
|
|
|
+ # Trim output data
|
|
|
+ for j in range(shape_channels):
|
|
|
+ if wave_cnt[j] < wave_data[j].shape[1]:
|
|
|
+ wave_data[j] = wave_data[j][:, : wave_cnt[j]]
|
|
|
+
|
|
|
+ tfp_excitation = np.array(tfp_excitation).transpose()
|
|
|
+ tfp_refocusing = np.array(tfp_refocusing)
|
|
|
+ t_adc = np.array(t_adc)
|
|
|
+ fp_adc = np.array(fp_adc)
|
|
|
+
|
|
|
+ return wave_data, tfp_excitation, tfp_refocusing, t_adc, fp_adc
|
|
|
+
|
|
|
+ def waveforms_export(self, time_range=(0, np.inf)) -> dict:
|
|
|
+ """
|
|
|
+ Plot `Sequence`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ time_range : iterable, default=(0, np.inf)
|
|
|
+ Time range (x-axis limits) for all waveforms. Default is 0 to infinity (entire sequence).
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ all_waveforms: dict
|
|
|
+ Dictionary containing the following sequence waveforms and time array(s):
|
|
|
+ - `t_adc` - ADC timing array [seconds]
|
|
|
+ - `t_rf` - RF timing array [seconds]
|
|
|
+ - `t_rf_centers`: `rf_t_centers`,
|
|
|
+ - `t_gx`: x gradient timing array,
|
|
|
+ - `t_gy`: y gradient timing array,
|
|
|
+ - `t_gz`: z gradient timing array,
|
|
|
+ - `adc` - ADC complex signal (amplitude=1, phase=adc phase) [a.u.]
|
|
|
+ - `rf` - RF complex signal
|
|
|
+ - `rf_centers`: RF centers array,
|
|
|
+ - `gx` - x gradient
|
|
|
+ - `gy` - y gradient
|
|
|
+ - `gz` - z gradient
|
|
|
+ - `grad_unit`: [kHz/m],
|
|
|
+ - `rf_unit`: [Hz],
|
|
|
+ - `time_unit`: [seconds],
|
|
|
+ """
|
|
|
+ # Check time range validity
|
|
|
+ if (
|
|
|
+ not all([isinstance(x, (int, float)) for x in time_range])
|
|
|
+ or len(time_range) != 2
|
|
|
+ ):
|
|
|
+ raise ValueError("Invalid time range")
|
|
|
+
|
|
|
+ t0 = 0
|
|
|
+ adc_t_all = np.array([])
|
|
|
+ adc_signal_all = np.array([], dtype=complex)
|
|
|
+ rf_t_all = np.array([])
|
|
|
+ rf_signal_all = np.array([], dtype=complex)
|
|
|
+ rf_t_centers = np.array([])
|
|
|
+ rf_signal_centers = np.array([], dtype=complex)
|
|
|
+ gx_t_all = np.array([])
|
|
|
+ gy_t_all = np.array([])
|
|
|
+ gz_t_all = np.array([])
|
|
|
+ gx_all = np.array([])
|
|
|
+ gy_all = np.array([])
|
|
|
+ gz_all = np.array([])
|
|
|
+
|
|
|
+ for block_counter in range(len(self.block_events)): # For each block
|
|
|
+ block = self.get_block(block_counter + 1) # Retrieve it
|
|
|
+ is_valid = (
|
|
|
+ time_range[0] <= t0 <= time_range[1]
|
|
|
+ ) # Check if "current time" is within requested range.
|
|
|
+ if is_valid:
|
|
|
+ # Case 1: ADC
|
|
|
+ if hasattr(block, "adc") and block.adc!=None:
|
|
|
+ adc = block.adc # Get adc info
|
|
|
+ # From Pulseq: According to the information from Klaus Scheffler and indirectly from Siemens this
|
|
|
+ # is the present convention - the samples are shifted by 0.5 dwell
|
|
|
+ t = adc.delay + (np.arange(int(adc.num_samples)) + 0.5) * adc.dwell
|
|
|
+ adc_t = t0 + t
|
|
|
+ adc_signal = np.exp(1j * adc.phase_offset) * np.exp(
|
|
|
+ 1j * 2 * np.pi * t * adc.freq_offset
|
|
|
+ )
|
|
|
+ adc_t_all = np.append(adc_t_all, adc_t)
|
|
|
+ adc_signal_all = np.append(adc_signal_all, adc_signal)
|
|
|
+
|
|
|
+ if hasattr(block, "rf") and block.rf!=None:
|
|
|
+ rf = block.rf
|
|
|
+ tc, ic = calc_rf_center(rf)
|
|
|
+ t = rf.t + rf.delay
|
|
|
+ tc = tc + rf.delay
|
|
|
+
|
|
|
+ # Debug - visualize
|
|
|
+ # sp12.plot(t_factor * (t0 + t), np.abs(rf.signal))
|
|
|
+ # plt.show()
|
|
|
+ # sp13.plot(t_factor * (t0 + t), np.angle(rf.signal * np.exp(1j * rf.phase_offset)
|
|
|
+ # * np.exp(1j * 2 * math.pi * rf.t * rf.freq_offset)),
|
|
|
+ # t_factor * (t0 + tc), np.angle(rf.signal[ic] * np.exp(1j * rf.phase_offset)
|
|
|
+ # * np.exp(1j * 2 * math.pi * rf.t[ic] * rf.freq_offset)),
|
|
|
+ # 'xb')
|
|
|
+ # plt.show()
|
|
|
+ rf_t = t0 + t
|
|
|
+ rf = (
|
|
|
+ rf.signal
|
|
|
+ * np.exp(1j * rf.phase_offset)
|
|
|
+ * np.exp(1j * 2 * math.pi * rf.t * rf.freq_offset)
|
|
|
+ )
|
|
|
+ rf_t_all = np.append(rf_t_all, rf_t)
|
|
|
+ rf_signal_all = np.append(rf_signal_all, rf)
|
|
|
+ rf_t_centers = np.append(rf_t_centers, rf_t[ic])
|
|
|
+ rf_signal_centers = np.append(rf_signal_centers, rf[ic])
|
|
|
+
|
|
|
+ grad_channels = ['gx', 'gy', 'gz']
|
|
|
+ for x in range(
|
|
|
+ len(grad_channels)
|
|
|
+ ): # Check each gradient channel: x, y, and z
|
|
|
+ if hasattr(
|
|
|
+ block, grad_channels[x]
|
|
|
+ ):
|
|
|
+ # If this channel is on in current block
|
|
|
+ grad = getattr(block, grad_channels[x])
|
|
|
+ if grad == None:
|
|
|
+ break
|
|
|
+ elif grad.type == "grad": # Arbitrary gradient option
|
|
|
+ # In place unpacking of grad.t with the starred expression
|
|
|
+ g_t = (
|
|
|
+ t0
|
|
|
+ + grad.delay
|
|
|
+ + [
|
|
|
+ 0,
|
|
|
+ *(grad.t + (grad.t[1] - grad.t[0]) / 2),
|
|
|
+ grad.t[-1] + grad.t[1] - grad.t[0],
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ g = 1e-3 * np.array((grad.first, *grad.waveform, grad.last))
|
|
|
+ else: # Trapezoid gradient option
|
|
|
+ g_t = t0 + np.cumsum(
|
|
|
+ [
|
|
|
+ 0,
|
|
|
+ grad.delay,
|
|
|
+ grad.rise_time,
|
|
|
+ grad.flat_time,
|
|
|
+ grad.fall_time,
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ g = 1e-3 * grad.amplitude * np.array([0, 0, 1, 1, 0])
|
|
|
+
|
|
|
+ if grad.channel == "x":
|
|
|
+ gx_t_all = np.append(gx_t_all, g_t)
|
|
|
+ gx_all = np.append(gx_all, g)
|
|
|
+ elif grad.channel == "y":
|
|
|
+ gy_t_all = np.append(gy_t_all, g_t)
|
|
|
+ gy_all = np.append(gy_all, g)
|
|
|
+ elif grad.channel == "z":
|
|
|
+ gz_t_all = np.append(gz_t_all, g_t)
|
|
|
+ gz_all = np.append(gz_all, g)
|
|
|
+
|
|
|
+ t0 += self.block_durations[
|
|
|
+ block_counter
|
|
|
+ ] # "Current time" gets updated to end of block just examined
|
|
|
+
|
|
|
+ all_waveforms = {
|
|
|
+ "t_adc": adc_t_all,
|
|
|
+ "t_rf": rf_t_all,
|
|
|
+ "t_rf_centers": rf_t_centers,
|
|
|
+ "t_gx": gx_t_all,
|
|
|
+ "t_gy": gy_t_all,
|
|
|
+ "t_gz": gz_t_all,
|
|
|
+ "adc": adc_signal_all,
|
|
|
+ "rf": rf_signal_all,
|
|
|
+ "rf_centers": rf_signal_centers,
|
|
|
+ "gx": gx_all,
|
|
|
+ "gy": gy_all,
|
|
|
+ "gz": gz_all,
|
|
|
+ "grad_unit": "[kHz/m]",
|
|
|
+ "rf_unit": "[Hz]",
|
|
|
+ "time_unit": "[seconds]",
|
|
|
+ }
|
|
|
+
|
|
|
+ return all_waveforms
|
|
|
+
|
|
|
+ def write(self, name: str, create_signature: bool = True) -> None:
|
|
|
+ """
|
|
|
+ Write the sequence data to the given filename using the open file format for MR sequences.
|
|
|
+
|
|
|
+ See also `pypulseq.Sequence.read_seq.read()`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ name : str
|
|
|
+ Filename of `.seq` file to be written to disk.
|
|
|
+ create_signature : bool, default=True
|
|
|
+ Boolean flag to indicate if the file has to be signed.
|
|
|
+ """
|
|
|
+ write_seq(self, name, create_signature)
|