|
|
@@ -0,0 +1,1893 @@
|
|
|
+import itertools
|
|
|
+import math
|
|
|
+from collections import OrderedDict
|
|
|
+from types import SimpleNamespace
|
|
|
+from typing import Tuple, List
|
|
|
+from typing import Union
|
|
|
+from warnings import warn
|
|
|
+import tkinter as tk
|
|
|
+
|
|
|
+try:
|
|
|
+ from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
|
|
|
+ from matplotlib.figure import Figure
|
|
|
+ import matplotlib as mpl
|
|
|
+ from matplotlib import pyplot as plt
|
|
|
+except Exception:
|
|
|
+ FigureCanvasTkAgg = None
|
|
|
+ NavigationToolbar2Tk = None
|
|
|
+ Figure = None
|
|
|
+ mpl = None
|
|
|
+ plt = None
|
|
|
+
|
|
|
+try:
|
|
|
+ from typing import Self
|
|
|
+except ImportError:
|
|
|
+ from typing import TypeVar
|
|
|
+ Self = TypeVar('Self', bound='Sequence')
|
|
|
+
|
|
|
+import numpy as np
|
|
|
+try:
|
|
|
+ from scipy.interpolate import PPoly
|
|
|
+except Exception:
|
|
|
+ PPoly = None
|
|
|
+
|
|
|
+from LF_scanner.pypulseq import eps
|
|
|
+from LF_scanner.pypulseq.calc_rf_center import calc_rf_center
|
|
|
+from LF_scanner.pypulseq.check_timing import check_timing as ext_check_timing
|
|
|
+from LF_scanner.pypulseq.decompress_shape import decompress_shape
|
|
|
+from LF_scanner.pypulseq.event_lib import EventLibrary
|
|
|
+from LF_scanner.pypulseq.opts import Opts
|
|
|
+from LF_scanner.pypulseq.supported_labels_rf_use import get_supported_labels
|
|
|
+from LF_scanner.version import major, minor, revision
|
|
|
+from LF_scanner.pypulseq.block_to_events import block_to_events
|
|
|
+from LF_scanner.pypulseq.utils.cumsum import cumsum
|
|
|
+from copy import deepcopy
|
|
|
+
|
|
|
+from LF_scanner.pypulseq.Sequence import block
|
|
|
+try:
|
|
|
+ from LF_scanner.pypulseq.Sequence import parula
|
|
|
+except Exception:
|
|
|
+ parula = None
|
|
|
+from LF_scanner.pypulseq.Sequence.ext_test_report import ext_test_report
|
|
|
+from LF_scanner.pypulseq.Sequence.read_seq import read
|
|
|
+from LF_scanner.pypulseq.Sequence.write_seq import write as write_seq
|
|
|
+try:
|
|
|
+ from LF_scanner.pypulseq.Sequence.calc_pns import calc_pns
|
|
|
+except Exception:
|
|
|
+ calc_pns = None
|
|
|
+try:
|
|
|
+ from LF_scanner.pypulseq.Sequence.calc_grad_spectrum import calculate_gradient_spectrum
|
|
|
+except Exception:
|
|
|
+ calculate_gradient_spectrum = None
|
|
|
+
|
|
|
+
|
|
|
class Sequence:
    """
    Generate sequences and read/write sequence files. This class defines properties and methods to define a complete MR
    sequence including RF pulses, gradients, ADC events, etc. The class provides an implementation of the open MR
    sequence format defined by the Pulseq project. See http://pulseq.github.io/.

    See also `demo_read.py`, `demo_write.py`.
    """

    # Version of the Pulseq file format implemented by this class, taken from
    # LF_scanner.version; presumably written into exported .seq headers — see
    # write_seq for confirmation.
    version_major = int(major)
    version_minor = int(minor)
    version_revision = revision  # not cast to int: may carry a suffix (e.g. '1.post0')
|
|
|
+ def __init__(self, system=None, use_block_cache=True):
|
|
|
+ if system == None:
|
|
|
+ system = Opts()
|
|
|
+
|
|
|
+ # =========
|
|
|
+ # EVENT LIBRARIES
|
|
|
+ # =========
|
|
|
+ self.adc_library = EventLibrary() # Library of ADC events
|
|
|
+ self.delay_library = EventLibrary() # Library of delay events
|
|
|
+ # Library of extension events. Extension events form single-linked zero-terminated lists
|
|
|
+ self.extensions_library = EventLibrary()
|
|
|
+ self.grad_library = EventLibrary() # Library of gradient events
|
|
|
+ self.label_inc_library = (
|
|
|
+ EventLibrary()
|
|
|
+ ) # Library of Label(inc) events (reference from the extensions library)
|
|
|
+ self.label_set_library = (
|
|
|
+ EventLibrary()
|
|
|
+ ) # Library of Label(set) events (reference from the extensions library)
|
|
|
+ self.rf_library = EventLibrary() # Library of RF events
|
|
|
+ self.shape_library = EventLibrary(numpy_data=True) # Library of compressed shapes
|
|
|
+ self.trigger_library = EventLibrary() # Library of trigger events
|
|
|
+
|
|
|
+ # =========
|
|
|
+ # OTHER
|
|
|
+ # =========
|
|
|
+ self.system = system
|
|
|
+
|
|
|
+ self.block_events = OrderedDict() # Event table
|
|
|
+ self.use_block_cache = use_block_cache
|
|
|
+ self.block_cache = dict() # Block cache
|
|
|
+ self.next_free_block_ID = 1
|
|
|
+
|
|
|
+ self.definitions = dict() # Optional sequence definitions
|
|
|
+
|
|
|
+ self.rf_raster_time = (
|
|
|
+ self.system.rf_raster_time
|
|
|
+ ) # RF raster time (system dependent)
|
|
|
+ self.grad_raster_time = (
|
|
|
+ self.system.grad_raster_time
|
|
|
+ ) # Gradient raster time (system dependent)
|
|
|
+ self.adc_raster_time = (
|
|
|
+ self.system.adc_raster_time
|
|
|
+ ) # ADC raster time (system dependent)
|
|
|
+ self.block_duration_raster = self.system.block_duration_raster
|
|
|
+ self.set_definition("AdcRasterTime", self.adc_raster_time)
|
|
|
+ self.set_definition("BlockDurationRaster", self.block_duration_raster)
|
|
|
+ self.set_definition("GradientRasterTime", self.grad_raster_time)
|
|
|
+ self.set_definition("RadiofrequencyRasterTime", self.rf_raster_time)
|
|
|
+ self.signature_type = ""
|
|
|
+ self.signature_file = ""
|
|
|
+ self.signature_value = ""
|
|
|
+
|
|
|
+ self.block_durations = dict() # Cache of block durations
|
|
|
+ self.extension_numeric_idx = [] # numeric IDs of the used extensions
|
|
|
+ self.extension_string_idx = [] # string IDs of the used extensions
|
|
|
+
|
|
|
+ def __str__(self) -> str:
|
|
|
+ s = "Sequence:"
|
|
|
+ s += "\nshape_library: " + str(self.shape_library)
|
|
|
+ s += "\nrf_library: " + str(self.rf_library)
|
|
|
+ s += "\ngrad_library: " + str(self.grad_library)
|
|
|
+ s += "\nadc_library: " + str(self.adc_library)
|
|
|
+ s += "\ndelay_library: " + str(self.delay_library)
|
|
|
+ s += "\nextensions_library: " + str(
|
|
|
+ self.extensions_library
|
|
|
+ ) # inserted for trigger support by mveldmann
|
|
|
+ s += "\nrf_raster_time: " + str(self.rf_raster_time)
|
|
|
+ s += "\ngrad_raster_time: " + str(self.grad_raster_time)
|
|
|
+ s += "\nblock_events: " + str(len(self.block_events))
|
|
|
+ return s
|
|
|
+
|
|
|
+ def adc_times(
|
|
|
+ self, time_range: List[float] = None
|
|
|
+ ) -> Tuple[np.ndarray, np.ndarray]:
|
|
|
+ """
|
|
|
+ Return time points of ADC sampling points.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ t_adc: np.ndarray
|
|
|
+ Contains times of all ADC sample points.
|
|
|
+ fp_adc : np.ndarray
|
|
|
+ Contains frequency and phase offsets of each ADC object (not samples).
|
|
|
+ """
|
|
|
+
|
|
|
+ # Collect ADC timing data
|
|
|
+ t_adc = []
|
|
|
+ fp_adc = []
|
|
|
+
|
|
|
+ curr_dur = 0
|
|
|
+ if time_range == None:
|
|
|
+ blocks = self.block_events
|
|
|
+ else:
|
|
|
+ if len(time_range) != 2:
|
|
|
+ raise ValueError('Time range must be list of two elements')
|
|
|
+ if time_range[0] > time_range[1]:
|
|
|
+ raise ValueError('End time of time_range must be after begin time')
|
|
|
+
|
|
|
+ # Calculate end times of each block
|
|
|
+ bd = np.array(list(self.block_durations.values()))
|
|
|
+ t = np.cumsum(bd)
|
|
|
+ # Search block end times for start of time range
|
|
|
+ begin_block = np.searchsorted(t, time_range[0])
|
|
|
+ # Search block begin times for end of time range
|
|
|
+ end_block = np.searchsorted(t - bd, time_range[1], side='right')
|
|
|
+ blocks = list(self.block_durations.keys())[begin_block:end_block]
|
|
|
+ curr_dur = t[begin_block] - bd[begin_block]
|
|
|
+
|
|
|
+ for block_counter in blocks:
|
|
|
+ block = self.get_block(block_counter)
|
|
|
+
|
|
|
+ if block.adc is not None: # ADC
|
|
|
+ t_adc.append(
|
|
|
+ (np.arange(block.adc.num_samples) + 0.5) * block.adc.dwell
|
|
|
+ + block.adc.delay
|
|
|
+ + curr_dur
|
|
|
+ )
|
|
|
+ fp_adc.append([block.adc.freq_offset, block.adc.phase_offset])
|
|
|
+
|
|
|
+ curr_dur += self.block_durations[block_counter]
|
|
|
+
|
|
|
+ if t_adc == []:
|
|
|
+ # If there are no ADCs, make sure the output is the right shape
|
|
|
+ t_adc = np.zeros(0)
|
|
|
+ fp_adc = np.zeros((0, 2))
|
|
|
+ else:
|
|
|
+ t_adc = np.concatenate(t_adc)
|
|
|
+ fp_adc = np.array(fp_adc)
|
|
|
+
|
|
|
+ return t_adc, fp_adc
|
|
|
+
|
|
|
+ def add_block(self, *args: SimpleNamespace) -> None:
|
|
|
+ """
|
|
|
+ Add a new block/multiple events to the sequence. Adds a sequence block with provided as a block structure
|
|
|
+
|
|
|
+ See also:
|
|
|
+ - `pypulseq.Sequence.sequence.Sequence.set_block()`
|
|
|
+ - `pypulseq.make_adc.make_adc()`
|
|
|
+ - `pypulseq.make_trapezoid.make_trapezoid()`
|
|
|
+ - `pypulseq.make_sinc_pulse.make_sinc_pulse()`
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ args : SimpleNamespace
|
|
|
+ Block structure or events to be added as a block to `Sequence`.
|
|
|
+ """
|
|
|
+ block.set_block(self, self.next_free_block_ID, *args)
|
|
|
+ self.next_free_block_ID += 1
|
|
|
+
|
|
|
+ def calculate_gradient_spectrum(
|
|
|
+ self, max_frequency: float = 2000,
|
|
|
+ window_width: float = 0.05,
|
|
|
+ frequency_oversampling: float = 3,
|
|
|
+ time_range: List[float] = None,
|
|
|
+ plot: bool = True,
|
|
|
+ combine_mode: str = 'max',
|
|
|
+ use_derivative: bool = False,
|
|
|
+ acoustic_resonances: List[dict] = []
|
|
|
+ ) -> Tuple[List[np.ndarray], np.ndarray, np.ndarray, np.ndarray]:
|
|
|
+ """
|
|
|
+ Calculates the gradient spectrum of the sequence. Returns a spectrogram
|
|
|
+ for each gradient channel, as well as a root-sum-squares combined
|
|
|
+ spectrogram.
|
|
|
+
|
|
|
+ Works by splitting the sequence into windows that are 'window_width'
|
|
|
+ long and calculating the fourier transform of each window. Windows
|
|
|
+ overlap 50% with the previous and next window. When 'combine_mode' is
|
|
|
+ not 'none', all windows are combined into one spectrogram.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ max_frequency : float, optional
|
|
|
+ Maximum frequency to include in spectrograms. The default is 2000.
|
|
|
+ window_width : float, optional
|
|
|
+ Window width (in seconds). The default is 0.05.
|
|
|
+ frequency_oversampling : float, optional
|
|
|
+ Oversampling in the frequency dimension, higher values make
|
|
|
+ smoother spectrograms. The default is 3.
|
|
|
+ time_range : List[float], optional
|
|
|
+ Time range over which to calculate the spectrograms as a list of
|
|
|
+ two timepoints (in seconds) (e.g. [1, 1.5])
|
|
|
+ The default is None.
|
|
|
+ plot : bool, optional
|
|
|
+ Whether to plot the spectograms. The default is True.
|
|
|
+ combine_mode : str, optional
|
|
|
+ How to combine all windows into one spectrogram, options:
|
|
|
+ 'max', 'mean', 'sos' (root-sum-of-squares), 'none' (no combination)
|
|
|
+ The default is 'max'.
|
|
|
+ use_derivative : bool, optional
|
|
|
+ Whether the use the derivative of the gradient waveforms instead of the
|
|
|
+ gradient waveforms for the gradient spectrum calculations. The default
|
|
|
+ is False
|
|
|
+ acoustic_resonances : List[dict], optional
|
|
|
+ Acoustic resonances as a list of dictionaries with 'frequency' and
|
|
|
+ 'bandwidth' elements. Only used when plot==True. The default is [].
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ spectrograms : List[np.ndarray]
|
|
|
+ List of spectrograms per gradient channel.
|
|
|
+ spectrogram_sos : np.ndarray
|
|
|
+ Root-sum-of-squares combined spectrogram over all gradient channels.
|
|
|
+ frequencies : np.ndarray
|
|
|
+ Frequency axis of the spectrograms.
|
|
|
+ times : np.ndarray
|
|
|
+ Time axis of the spectrograms (only relevant when combine_mode == 'none').
|
|
|
+
|
|
|
+ """
|
|
|
+ return calculate_gradient_spectrum(self, max_frequency=max_frequency,
|
|
|
+ window_width=window_width,
|
|
|
+ frequency_oversampling=frequency_oversampling,
|
|
|
+ time_range=time_range,
|
|
|
+ plot=plot,
|
|
|
+ combine_mode=combine_mode,
|
|
|
+ use_derivative=use_derivative,
|
|
|
+ acoustic_resonances=acoustic_resonances)
|
|
|
+
|
|
|
+ def calculate_kspace(
|
|
|
+ self,
|
|
|
+ trajectory_delay: Union[float, List[float], np.ndarray] = 0,
|
|
|
+ gradient_offset: Union[float, List[float], np.ndarray] = 0
|
|
|
+ ) -> Tuple[np.array, np.array, List[float], List[float], np.array]:
|
|
|
+ """
|
|
|
+ Calculates the k-space trajectory of the entire pulse sequence.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ trajectory_delay : float or list, default=0
|
|
|
+ Compensation factor in seconds (s) to align ADC and gradients in the reconstruction.
|
|
|
+ gradient_offset : float or list, default=0
|
|
|
+ Simulates background gradients (specified in Hz/m)
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ k_traj_adc : numpy.array
|
|
|
+ K-space trajectory sampled at `t_adc` timepoints.
|
|
|
+ k_traj : numpy.array
|
|
|
+ K-space trajectory of the entire pulse sequence.
|
|
|
+ t_excitation : List[float]
|
|
|
+ Excitation timepoints.
|
|
|
+ t_refocusing : List[float]
|
|
|
+ Refocusing timepoints.
|
|
|
+ t_adc : numpy.array
|
|
|
+ Sampling timepoints.
|
|
|
+ """
|
|
|
+ if np.any(np.abs(trajectory_delay) > 100e-6):
|
|
|
+ raise Warning(
|
|
|
+ f"Trajectory delay of {trajectory_delay * 1e6} us is suspiciously high"
|
|
|
+ )
|
|
|
+
|
|
|
+ total_duration = sum(self.block_durations.values())
|
|
|
+
|
|
|
+ t_excitation, fp_excitation, t_refocusing, _ = self.rf_times()
|
|
|
+ t_adc, _ = self.adc_times()
|
|
|
+
|
|
|
+ # Convert data to piecewise polynomials
|
|
|
+ gw_pp = self.get_gradients(trajectory_delay, gradient_offset)
|
|
|
+ ng = len(gw_pp)
|
|
|
+
|
|
|
+ # Calculate slice positions. For now we entirely rely on the excitation -- ignoring complicated interleaved
|
|
|
+ # refocused sequences
|
|
|
+ if len(t_excitation) > 0:
|
|
|
+ # Position in x, y, z
|
|
|
+ slice_pos = np.zeros((ng, len(t_excitation)))
|
|
|
+ for j in range(ng):
|
|
|
+ if gw_pp[j] is None:
|
|
|
+ slice_pos[j] = np.nan
|
|
|
+ else:
|
|
|
+ # Check for divisions by zero to avoid numpy warning
|
|
|
+ divisor = np.array(gw_pp[j](t_excitation))
|
|
|
+ slice_pos[j, divisor != 0.0] = fp_excitation[0, divisor != 0.0] / divisor[divisor != 0.0]
|
|
|
+ slice_pos[j, divisor == 0.0] = np.nan
|
|
|
+
|
|
|
+ slice_pos[~np.isfinite(slice_pos)] = 0 # Reset undefined to 0
|
|
|
+ else:
|
|
|
+ slice_pos = []
|
|
|
+
|
|
|
+ # =========
|
|
|
+ # Integrate waveforms as PPs to produce gradient moments
|
|
|
+ gm_pp = []
|
|
|
+ tc = []
|
|
|
+ for i in range(ng):
|
|
|
+ if gw_pp[i] is None:
|
|
|
+ gm_pp.append(None)
|
|
|
+ continue
|
|
|
+
|
|
|
+ gm_pp.append(gw_pp[i].antiderivative())
|
|
|
+ tc.append(gm_pp[i].x)
|
|
|
+ # "Sample" ramps for display purposes otherwise piecewise-linear display (plot) fails
|
|
|
+ ii = np.flatnonzero(np.abs(gm_pp[i].c[0, :]) > eps)
|
|
|
+
|
|
|
+ # Do nothing if there are no ramps
|
|
|
+ if ii.shape[0] == 0:
|
|
|
+ continue
|
|
|
+
|
|
|
+ starts = np.int64(np.floor((gm_pp[i].x[ii] + eps) / self.grad_raster_time))
|
|
|
+ ends = np.int64(np.ceil((gm_pp[i].x[ii + 1] - eps) / self.grad_raster_time))
|
|
|
+
|
|
|
+ # Create all ranges starts[0]:ends[0], starts[1]:ends[1], etc.
|
|
|
+ lengths = ends - starts + 1
|
|
|
+ inds = np.ones((lengths).sum())
|
|
|
+ # Calculate output index where each range will start
|
|
|
+ start_inds = np.cumsum(np.concatenate(([0], lengths[:-1])))
|
|
|
+ # Create element-wise differences that will cumsum into
|
|
|
+ # the final indices: [starts[0], 1, 1, starts[1]-starts[0]-lengths[0]+1, 1, etc.]
|
|
|
+ inds[start_inds] = np.concatenate(([starts[0]], np.diff(starts) - lengths[:-1] + 1))
|
|
|
+
|
|
|
+ tc.append(np.cumsum(inds) * self.grad_raster_time)
|
|
|
+ if tc != []:
|
|
|
+ tc = np.concatenate(tc)
|
|
|
+
|
|
|
+ t_acc = 1e-10 # Temporal accuracy
|
|
|
+ t_acc_inv = 1 / t_acc
|
|
|
+ # tc = self.__flatten_jagged_arr(tc)
|
|
|
+ t_ktraj = t_acc * np.unique(
|
|
|
+ np.round(
|
|
|
+ t_acc_inv
|
|
|
+ * np.array(
|
|
|
+ [
|
|
|
+ *tc,
|
|
|
+ 0,
|
|
|
+ *np.asarray(t_excitation) - 2 * self.rf_raster_time,
|
|
|
+ *np.asarray(t_excitation) - self.rf_raster_time,
|
|
|
+ *t_excitation,
|
|
|
+ *np.asarray(t_refocusing) - self.rf_raster_time,
|
|
|
+ *t_refocusing,
|
|
|
+ *t_adc,
|
|
|
+ total_duration,
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ i_excitation = np.searchsorted(t_ktraj, t_acc * np.round(t_acc_inv * np.asarray(t_excitation)))
|
|
|
+ i_refocusing = np.searchsorted(t_ktraj, t_acc * np.round(t_acc_inv * np.asarray(t_refocusing)))
|
|
|
+ i_adc = np.searchsorted(t_ktraj, t_acc * np.round(t_acc_inv * np.asarray(t_adc)))
|
|
|
+
|
|
|
+ i_periods = np.unique([0, *i_excitation, *i_refocusing, len(t_ktraj) - 1])
|
|
|
+ if len(i_excitation) > 0:
|
|
|
+ ii_next_excitation = 0
|
|
|
+ else:
|
|
|
+ ii_next_excitation = -1
|
|
|
+ if len(i_refocusing) > 0:
|
|
|
+ ii_next_refocusing = 0
|
|
|
+ else:
|
|
|
+ ii_next_refocusing = -1
|
|
|
+
|
|
|
+ k_traj = np.zeros((ng, len(t_ktraj)))
|
|
|
+ for i in range(ng):
|
|
|
+ if gw_pp[i] is None:
|
|
|
+ continue
|
|
|
+
|
|
|
+ it = np.where(np.logical_and(
|
|
|
+ t_ktraj >= t_acc * round(t_acc_inv * gm_pp[i].x[0]),
|
|
|
+ t_ktraj <= t_acc * round(t_acc_inv * gm_pp[i].x[-1]),
|
|
|
+ ))[0]
|
|
|
+ k_traj[i, it] = gm_pp[i](t_ktraj[it])
|
|
|
+ if t_ktraj[it[-1]] < t_ktraj[-1]:
|
|
|
+ k_traj[i, it[-1] + 1:] = k_traj[i, it[-1]]
|
|
|
+
|
|
|
+ # Convert gradient moments to kspace
|
|
|
+ dk = -k_traj[:, 0]
|
|
|
+ for i in range(len(i_periods) - 1):
|
|
|
+ i_period = i_periods[i]
|
|
|
+ i_period_end = i_periods[i + 1]
|
|
|
+ if ii_next_excitation >= 0 and i_excitation[ii_next_excitation] == i_period:
|
|
|
+ if abs(t_ktraj[i_period] - t_excitation[ii_next_excitation]) > t_acc:
|
|
|
+ raise Warning(
|
|
|
+ f"abs(t_ktraj[i_period]-t_excitation[ii_next_excitation]) < {t_acc} failed for ii_next_excitation={ii_next_excitation} error={t_ktraj(i_period) - t_excitation(ii_next_excitation)}"
|
|
|
+ )
|
|
|
+ dk = -k_traj[:, i_period]
|
|
|
+ if i_period > 0:
|
|
|
+ # Use nans to mark the excitation points since they interrupt the plots
|
|
|
+ k_traj[:, i_period - 1] = np.nan
|
|
|
+ # -1 on len(i_excitation) for 0-based indexing
|
|
|
+ ii_next_excitation = min(len(i_excitation) - 1, ii_next_excitation + 1)
|
|
|
+ elif (
|
|
|
+ ii_next_refocusing >= 0 and i_refocusing[ii_next_refocusing] == i_period
|
|
|
+ ):
|
|
|
+ # dk = -k_traj[:, i_period]
|
|
|
+ dk = -2 * k_traj[:, i_period] - dk
|
|
|
+ # -1 on len(i_excitation) for 0-based indexing
|
|
|
+ ii_next_refocusing = min(len(i_refocusing) - 1, ii_next_refocusing + 1)
|
|
|
+
|
|
|
+ k_traj[:, i_period:i_period_end] = (
|
|
|
+ k_traj[:, i_period:i_period_end] + dk[:, None]
|
|
|
+ )
|
|
|
+
|
|
|
+ k_traj[:, i_period_end] = k_traj[:, i_period_end] + dk
|
|
|
+ k_traj_adc = k_traj[:, i_adc]
|
|
|
+
|
|
|
+ return k_traj_adc, k_traj, t_excitation, t_refocusing, t_adc
|
|
|
+
|
|
|
+ def calculate_kspacePP(
|
|
|
+ self,
|
|
|
+ trajectory_delay: Union[float, List[float], np.ndarray] = 0,
|
|
|
+ gradient_offset: Union[float, List[float], np.ndarray] = 0
|
|
|
+ ) -> Tuple[np.array, np.array, np.array, np.array, np.array]:
|
|
|
+ warn('Sequence.calculate_kspacePP has been deprecated, use calculate_kspace instead', DeprecationWarning,
|
|
|
+ stacklevel=2)
|
|
|
+ return self.calculate_kspace(trajectory_delay, gradient_offset)
|
|
|
+
|
|
|
+ def calculate_pns(
|
|
|
+ self,
|
|
|
+ hardware: SimpleNamespace,
|
|
|
+ time_range: List[float] = None,
|
|
|
+ do_plots: bool = True
|
|
|
+ ) -> Tuple[bool, np.array, np.ndarray, np.array]:
|
|
|
+ """
|
|
|
+ Calculate PNS using safe model implementation by Szczepankiewicz and Witzel
|
|
|
+ See http://github.com/filip-szczepankiewicz/safe_pns_prediction
|
|
|
+
|
|
|
+ Returns pns levels due to respective axes (normalized to 1 and not to 100#)
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ hardware : SimpleNamespace
|
|
|
+ Hardware specifications. See safe_example_hw() from
|
|
|
+ the safe_pns_prediction package. Alternatively a text file
|
|
|
+ in the .asc format (Siemens) can be passed, e.g. for Prisma
|
|
|
+ it is MP_GPA_K2309_2250V_951A_AS82.asc (we leave it as an
|
|
|
+ exercise to the interested user to find were these files
|
|
|
+ can be acquired from)
|
|
|
+ do_plots : bool, optional
|
|
|
+ Plot the results from the PNS calculations. The default is True.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ ok : bool
|
|
|
+ Boolean flag indicating whether peak PNS is within acceptable limits
|
|
|
+ pns_norm : numpy.array [N]
|
|
|
+ PNS norm over all gradient channels, normalized to 1
|
|
|
+ pns_components : numpy.array [Nx3]
|
|
|
+ PNS levels per gradient channel
|
|
|
+ t_pns : np.array [N]
|
|
|
+ Time axis for the pns_norm and pns_components arrays
|
|
|
+ """
|
|
|
+ return calc_pns(self, hardware, time_range=time_range, do_plots=do_plots)
|
|
|
+
|
|
|
    def check_timing(self) -> Tuple[bool, List[str]]:
        """
        Checks timing of all blocks and objects in the sequence optionally returns the detailed error log. This
        function also modifies the sequence object by adding the field "TotalDuration" to sequence definitions.

        Returns
        -------
        is_ok : bool
            Boolean flag indicating timing errors.
        error_report : str
            Error report in case of timing errors.
        """
        error_report = []
        is_ok = True
        total_duration = 0

        # NOTE(review): assumes at least one block exists; on an empty sequence
        # the post-loop check below would hit an undefined `events`.
        for block_counter in self.block_events:
            block = self.get_block(block_counter)
            events = block_to_events(block)
            # Per-event timing validation is delegated to the external checker.
            res, rep, duration = ext_check_timing(self.system, *events)
            is_ok = is_ok and res

            # Check the stored total block duration
            if abs(duration - self.block_durations[block_counter]) > eps:
                rep += "Inconsistency between the stored block duration and the duration of the block content"
                is_ok = False
                # Trust the stored duration for the remaining checks.
                duration = self.block_durations[block_counter]

            # Check RF dead times
            if block.rf is not None:
                # The RF pulse must not start before the dead time has elapsed.
                if block.rf.delay - block.rf.dead_time < -eps:
                    rep += (
                        f"Delay of {block.rf.delay * 1e6} us is smaller than the RF dead time "
                        f"{block.rf.dead_time * 1e6} us"
                    )
                    is_ok = False

                # The ringdown period must fit before the block ends.
                if (
                    block.rf.delay + block.rf.t[-1] + block.rf.ringdown_time - duration
                    > eps
                ):
                    rep += (
                        f"Time between the end of the RF pulse at {block.rf.delay + block.rf.t[-1]} and the end "
                        f"of the block at {duration * 1e6} us is shorter than rf_ringdown_time"
                    )
                    is_ok = False

            # Check ADC dead times
            if block.adc is not None:
                # Dead time before the first sample.
                if block.adc.delay - self.system.adc_dead_time < -eps:
                    rep += "adc.delay < system.adc_dead_time"
                    is_ok = False

                # Dead time after the last sample must also fit into the block.
                if (
                    block.adc.delay
                    + block.adc.num_samples * block.adc.dwell
                    + self.system.adc_dead_time
                    - duration
                    > eps
                ):
                    rep += "adc: system.adc_dead_time (post-ADC) violation"
                    is_ok = False

            # Update report
            if len(rep) != 0:
                error_report.append(f"Event: {block_counter} - {rep}\n")
            total_duration += duration

        # Check if all the gradients in the last block are ramped down properly
        # NOTE(review): `events` deliberately leaks out of the loop above, so
        # this inspects the events of the LAST block only.
        if len(events) != 0 and all([isinstance(e, SimpleNamespace) for e in events]):
            for e in range(len(events)):
                if not isinstance(events[e], list) and events[e].type == "grad":
                    if events[e].last != 0:
                        error_report.append(
                            f"Event: {list(self.block_events)[-1]} - Gradients do not ramp to 0 at the end of the sequence"
                        )

        # Side effect documented in the docstring: record the summed duration.
        self.set_definition("TotalDuration", total_duration)

        return is_ok, error_report
|
|
|
+ def duration(self) -> Tuple[int, int, np.ndarray]:
|
|
|
+ """
|
|
|
+ Returns the total duration of this sequence, and the total count of blocks and events.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ duration : int
|
|
|
+ Duration of this sequence in seconds (s).
|
|
|
+ num_blocks : int
|
|
|
+ Number of blocks in this sequence.
|
|
|
+ event_count : np.ndarray
|
|
|
+ Number of events in this sequence.
|
|
|
+ """
|
|
|
+ num_blocks = len(self.block_events)
|
|
|
+ event_count = np.zeros(len(next(iter(self.block_events.values()))))
|
|
|
+ duration = 0
|
|
|
+ for block_counter in self.block_events:
|
|
|
+ event_count += self.block_events[block_counter] > 0
|
|
|
+ duration += self.block_durations[block_counter]
|
|
|
+
|
|
|
+ return duration, num_blocks, event_count
|
|
|
+
|
|
|
+ def evaluate_labels(
|
|
|
+ self,
|
|
|
+ init: dict = None,
|
|
|
+ evolution: str = 'none'
|
|
|
+ ) -> dict:
|
|
|
+ """
|
|
|
+ Evaluate label values of the entire sequence.
|
|
|
+
|
|
|
+ When no evolution is given, returns the label values at the end of the
|
|
|
+ sequence. Returns a dictionary with keys named after the labels used
|
|
|
+ in the sequence. Only the keys corresponding to the labels actually
|
|
|
+ used are created.
|
|
|
+ E.g. labels['LIN'] == 4
|
|
|
+
|
|
|
+ When evolution is given, labels are tracked through the sequence. See
|
|
|
+ below for options for different types of evolutions. The resulting
|
|
|
+ dictionary will contain arrays of the label values.
|
|
|
+ E.g. labels['LIN'] == np.array([0,1,2,3,4])
|
|
|
+
|
|
|
+ Initial values for the labels can be given with the 'init' parameter.
|
|
|
+ Useful if evaluating labels block-by-block.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ init : dict, optional
|
|
|
+ Dictionary containing initial label values. The default is None.
|
|
|
+ evolution : str, optional
|
|
|
+ Flag to specify tracking of label evolutions.
|
|
|
+ Must be one of: 'none', 'adc', 'label', 'blocks' (default = 'none')
|
|
|
+ 'blocks': Return label values for all blocks.
|
|
|
+ 'adc': Return label values only for blocks containing ADC events.
|
|
|
+ 'label': Return label values only for blocks where labels are
|
|
|
+ manipulated.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ labels : dict
|
|
|
+ Dictionary containing label values.
|
|
|
+ If evolution == 'none', the dictionary values only contains the
|
|
|
+ final label value.
|
|
|
+ Otherwise, the dictionary values are arrays of label evolutions.
|
|
|
+ Only the labels that are used in the sequence are created in the
|
|
|
+ dictionary.
|
|
|
+
|
|
|
+ """
|
|
|
+ labels = init or dict()
|
|
|
+ label_evolution = []
|
|
|
+
|
|
|
+ # TODO: MATLAB implementation includes block_range parameter. But in
|
|
|
+ # general we cannot assume linear block ordering. Could include
|
|
|
+ # time_range like in other sequence functions. Or a blocks
|
|
|
+ # parameter to specify which blocks to loop over?
|
|
|
+ for block_counter in self.block_events:
|
|
|
+ block = self.get_block(block_counter)
|
|
|
+
|
|
|
+ if block.label is not None:
|
|
|
+ # Current block has labels
|
|
|
+ for lab in block.label.values():
|
|
|
+ if lab.type == 'labelinc':
|
|
|
+ # Increment label
|
|
|
+ if lab.label not in labels:
|
|
|
+ labels[lab.label] = 0
|
|
|
+
|
|
|
+ labels[lab.label] += lab.value
|
|
|
+ else:
|
|
|
+ # Set label
|
|
|
+ labels[lab.label] = lab.value
|
|
|
+
|
|
|
+ if evolution == 'label':
|
|
|
+ label_evolution.append(dict(labels))
|
|
|
+
|
|
|
+ if evolution == 'blocks' or (evolution == 'adc' and block.adc is not None):
|
|
|
+ label_evolution.append(dict(labels))
|
|
|
+
|
|
|
+ # Convert evolutions into label dictionary
|
|
|
+ if len(label_evolution) > 0:
|
|
|
+ for lab in labels:
|
|
|
+ labels[lab] = np.array([e[lab] if lab in e else 0 for e in label_evolution])
|
|
|
+
|
|
|
+ return labels
|
|
|
+
|
|
|
+ def flip_grad_axis(self, axis: str) -> None:
|
|
|
+ """
|
|
|
+ Invert all gradients along the corresponding axis/channel. The function acts on all gradient objects already
|
|
|
+ added to the sequence object.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ axis : str
|
|
|
+ Gradients to invert or scale. Must be one of 'x', 'y' or 'z'.
|
|
|
+ """
|
|
|
+ self.mod_grad_axis(axis, modifier=-1)
|
|
|
+
|
|
|
+ def get_block(self, block_index: int) -> SimpleNamespace:
|
|
|
+ """
|
|
|
+ Return a block of the sequence specified by the index. The block is created from the sequence data with all
|
|
|
+ events and shapes decompressed.
|
|
|
+
|
|
|
+ See also:
|
|
|
+ - `pypulseq.Sequence.sequence.Sequence.set_block()`.
|
|
|
+ - `pypulseq.Sequence.sequence.Sequence.add_block()`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ block_index : int
|
|
|
+ Index of block to be retrieved from `Sequence`.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ SimpleNamespace
|
|
|
+ Event identified by `block_index`.
|
|
|
+ """
|
|
|
+ return block.get_block(self, block_index)
|
|
|
+
|
|
|
+ def get_definition(self, key: str) -> str:
|
|
|
+ """
|
|
|
+ Return value of the definition specified by the key. These definitions can be added manually or read from the
|
|
|
+ header of a sequence file defined in the sequence header. An empty array is returned if the key is not defined.
|
|
|
+
|
|
|
+ See also `pypulseq.Sequence.sequence.Sequence.set_definition()`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ key : str
|
|
|
+ Key of definition to retrieve.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ str
|
|
|
+ Definition identified by `key` if found, else returns ''.
|
|
|
+ """
|
|
|
+ if key in self.definitions:
|
|
|
+ return self.definitions[key]
|
|
|
+ else:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ def get_extension_type_ID(self, extension_string: str) -> int:
|
|
|
+ """
|
|
|
+ Get numeric extension ID for `extension_string`. Will automatically create a new ID if unknown.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ extension_string : str
|
|
|
+ Given string extension ID.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ extension_id : int
|
|
|
+ Numeric ID for given string extension ID.
|
|
|
+
|
|
|
+ """
|
|
|
+ if extension_string not in self.extension_string_idx:
|
|
|
+ if len(self.extension_numeric_idx) == 0:
|
|
|
+ extension_id = 1
|
|
|
+ else:
|
|
|
+ extension_id = 1 + max(self.extension_numeric_idx)
|
|
|
+
|
|
|
+ self.extension_numeric_idx.append(extension_id)
|
|
|
+ self.extension_string_idx.append(extension_string)
|
|
|
+ assert len(self.extension_numeric_idx) == len(self.extension_string_idx)
|
|
|
+ else:
|
|
|
+ num = self.extension_string_idx.index(extension_string)
|
|
|
+ extension_id = self.extension_numeric_idx[num]
|
|
|
+
|
|
|
+ return extension_id
|
|
|
+
|
|
|
+ def get_extension_type_string(self, extension_id: int) -> str:
|
|
|
+ """
|
|
|
+ Get string extension ID for `extension_id`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ extension_id : int
|
|
|
+ Given numeric extension ID.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ extension_str : str
|
|
|
+ String ID for the given numeric extension ID.
|
|
|
+
|
|
|
+ Raises
|
|
|
+ ------
|
|
|
+ ValueError
|
|
|
+ If given numeric extension ID is unknown.
|
|
|
+ """
|
|
|
+ if extension_id in self.extension_numeric_idx:
|
|
|
+ num = self.extension_numeric_idx.index(extension_id)
|
|
|
+ else:
|
|
|
+ raise ValueError(
|
|
|
+ f"Extension for the given ID - {extension_id} - is unknown."
|
|
|
+ )
|
|
|
+
|
|
|
+ extension_str = self.extension_string_idx[num]
|
|
|
+ return extension_str
|
|
|
+
|
|
|
+ def get_gradients(self,
|
|
|
+ trajectory_delay: Union[float, List[float], np.ndarray] = 0,
|
|
|
+ gradient_offset: Union[float, List[float], np.ndarray] = 0,
|
|
|
+ time_range: List[float] = None) -> List[PPoly]:
|
|
|
+ """
|
|
|
+ Get all gradient waveforms of the sequence in a piecewise-polynomial
|
|
|
+ format (scipy PPoly). Gradient values can be accessed easily at one or
|
|
|
+ more timepoints using `gw_pp[channel](t)` (where t is a float, list of
|
|
|
+ floats, or numpy array). Note that PPoly objects return nan for
|
|
|
+ timepoints outside the waveform.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ trajectory_delay : float or list, default=0
|
|
|
+ Compensation factor in seconds (s) to align ADC and gradients in the reconstruction.
|
|
|
+ gradient_offset : float or list, default=0
|
|
|
+ Simulates background gradients (specified in Hz/m)
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ gw_pp : List[PPoly]
|
|
|
+ List of gradient waveforms for each of the gradient channels,
|
|
|
+ expressed as scipy PPoly objects.
|
|
|
+ """
|
|
|
+ if np.any(np.abs(trajectory_delay) > 100e-6):
|
|
|
+ raise Warning(
|
|
|
+ f"Trajectory delay of {trajectory_delay * 1e6} us is suspiciously high"
|
|
|
+ )
|
|
|
+
|
|
|
+ total_duration = sum(self.block_durations.values())
|
|
|
+
|
|
|
+ gw_data = self.waveforms(time_range=time_range)
|
|
|
+ ng = len(gw_data)
|
|
|
+
|
|
|
+ # Gradient delay handling
|
|
|
+ if isinstance(trajectory_delay, (int, float)):
|
|
|
+ gradient_delays = [trajectory_delay] * ng
|
|
|
+ else:
|
|
|
+ assert (len(trajectory_delay) == ng) # Need to have same number of gradient channels
|
|
|
+ gradient_delays = [trajectory_delay] * ng
|
|
|
+
|
|
|
+ # Gradient offset handling
|
|
|
+ if isinstance(gradient_offset, (int, float)):
|
|
|
+ gradient_offset = [gradient_offset] * ng
|
|
|
+ else:
|
|
|
+ assert (len(gradient_offset) == ng) # Need to have same number of gradient channels
|
|
|
+
|
|
|
+ # Convert data to piecewise polynomials
|
|
|
+ gw_pp = []
|
|
|
+ for j in range(ng):
|
|
|
+ wave_cnt = gw_data[j].shape[1]
|
|
|
+ if wave_cnt == 0:
|
|
|
+ if np.abs(gradient_offset[j]) <= eps:
|
|
|
+ gw_pp.append(None)
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ gw = np.array(([0, total_duration], [0, 0]))
|
|
|
+ else:
|
|
|
+ gw = gw_data[j]
|
|
|
+
|
|
|
+ # Now gw contains the waveform from the current axis
|
|
|
+ if np.abs(gradient_delays[j]) > eps:
|
|
|
+ gw[0] = gw[0] - gradient_delays[j] # Anisotropic gradient delay support
|
|
|
+ if not np.all(np.isfinite(gw)):
|
|
|
+ raise Warning("Not all elements of the generated waveform are finite.")
|
|
|
+
|
|
|
+ teps = 1e-12
|
|
|
+ _temp1 = np.array(([gw[0, 0] - 2 * teps, gw[0, 0] - teps], [0, 0]))
|
|
|
+ _temp2 = np.array(([gw[0, -1] + teps, gw[0, -1] + 2 * teps], [0, 0]))
|
|
|
+ gw = np.hstack((_temp1, gw, _temp2))
|
|
|
+
|
|
|
+ if np.abs(gradient_offset[j]) > eps:
|
|
|
+ gw[1, :] += gradient_offset[j]
|
|
|
+
|
|
|
+ gw[1][gw[1] == -0.0] = 0.0
|
|
|
+
|
|
|
+ gw_pp.append(PPoly(np.stack((np.diff(gw[1]) / np.diff(gw[0]),
|
|
|
+ gw[1][:-1])), gw[0], extrapolate=True))
|
|
|
+ return gw_pp
|
|
|
+
|
|
|
+ def mod_grad_axis(self, axis: str, modifier: int) -> None:
|
|
|
+ """
|
|
|
+ Invert or scale all gradients along the corresponding axis/channel. The function acts on all gradient objects
|
|
|
+ already added to the sequence object.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ axis : str
|
|
|
+ Gradients to invert or scale. Must be one of 'x', 'y' or 'z'.
|
|
|
+ modifier : int
|
|
|
+ Scaling value.
|
|
|
+
|
|
|
+ Raises
|
|
|
+ ------
|
|
|
+ ValueError
|
|
|
+ If invalid `axis` is passed. Must be one of 'x', 'y','z'.
|
|
|
+ RuntimeError
|
|
|
+ If same gradient event is used on multiple axes.
|
|
|
+ """
|
|
|
+ if axis not in ["x", "y", "z"]:
|
|
|
+ raise ValueError(
|
|
|
+ f"Invalid axis. Must be one of 'x', 'y','z'. Passed: {axis}"
|
|
|
+ )
|
|
|
+
|
|
|
+ channel_num = ["x", "y", "z"].index(axis)
|
|
|
+ other_channels = [0, 1, 2]
|
|
|
+ other_channels.remove(channel_num)
|
|
|
+
|
|
|
+ # Go through all event table entries and list gradient objects in the library
|
|
|
+ all_grad_events = np.array(list(self.block_events.values()))
|
|
|
+ all_grad_events = all_grad_events[:, 2:5]
|
|
|
+
|
|
|
+ selected_events = np.unique(all_grad_events[:, channel_num])
|
|
|
+ selected_events = selected_events[selected_events != 0]
|
|
|
+ other_events = np.unique(all_grad_events[:, other_channels])
|
|
|
+ if len(np.intersect1d(selected_events, other_events)) > 0:
|
|
|
+ raise RuntimeError(
|
|
|
+ "mod_grad_axis does not yet support the same gradient event used on multiple axes."
|
|
|
+ )
|
|
|
+
|
|
|
+ for i in range(len(selected_events)):
|
|
|
+ self.grad_library.data[selected_events[i]][0] *= modifier
|
|
|
+ if (
|
|
|
+ self.grad_library.type[selected_events[i]] == "g"
|
|
|
+ and self.grad_library.lengths[selected_events[i]] == 5
|
|
|
+ ):
|
|
|
+ # Need to update first and last fields
|
|
|
+ self.grad_library.data[selected_events[i]][3] *= modifier
|
|
|
+ self.grad_library.data[selected_events[i]][4] *= modifier
|
|
|
+
|
|
|
    def plot(
        self, tk_obj,
        label: str = str(),
        show_blocks: bool = False,
        save: bool = False,
        time_range=(0, np.inf),
        time_disp: str = "s",
        grad_disp: str = "kHz/m",
        plot_now: bool = True,
        tk_plot: bool = True
    ) -> None:
        """
        Plot `Sequence`: ADC/RF timing in figure 1, the three gradient
        channels in figure 2.

        Parameters
        ----------
        tk_obj
            Tk parent object. NOTE(review): currently unused by this method —
            confirm whether callers still need to pass it.
        label : str, default=str()
            Plot label values for ADC events: in this example for LIN and REP labels; other valid labels are
            accepted as a comma-separated list.
        show_blocks : bool, default=False
            Boolean flag to indicate if grid and tick labels at the block boundaries are to be plotted.
        save : bool, default=False
            Boolean flag indicating if plots should be saved. The two figures will be saved as JPG with numerical
            suffixes to the filename 'seq_plot'.
        time_range : iterable, default=(0, np.inf)
            Time range (x-axis limits) for plotting the sequence. Default is 0 to infinity (entire sequence).
        time_disp : str, default='s'
            Time display type, must be one of `s`, `ms` or `us`.
        grad_disp : str, default='kHz/m'
            Gradient display unit, must be one of `kHz/m` or `mT/m`.
        plot_now : bool, default=True
            If true, function immediately shows the plots, blocking the rest of the code until plots are exited.
            If false, plots are shown when plt.show() is called. Useful if plots are to be modified.
        tk_plot : bool, default=True
            If true, figure 1 is additionally embedded into a new Tk window
            with a matplotlib navigation toolbar.

        Raises
        ------
        ValueError
            If `time_range`, `time_disp` or `grad_disp` is invalid.
        """
        mpl.rcParams["lines.linewidth"] = 0.75  # Set default Matplotlib linewidth

        valid_time_units = ["s", "ms", "us"]
        valid_grad_units = ["kHz/m", "mT/m"]
        valid_labels = get_supported_labels()
        if (
            not all([isinstance(x, (int, float)) for x in time_range])
            or len(time_range) != 2
        ):
            raise ValueError("Invalid time range")
        if time_disp not in valid_time_units:
            raise ValueError("Unsupported time unit")

        if grad_disp not in valid_grad_units:
            raise ValueError(
                "Unsupported gradient unit. Supported gradient units are: "
                + str(valid_grad_units)
            )

        fig1, fig2 = plt.figure(1), plt.figure(2)

        # Figure 1: ADC / RF magnitude / RF+ADC phase; all share one time axis.
        sp11 = fig1.add_subplot(311)
        sp12 = fig1.add_subplot(312, sharex=sp11)
        sp13 = fig1.add_subplot(313, sharex=sp11)
        # Figure 2: one subplot per gradient channel, sharing the same time axis.
        fig2_subplots = [
            fig2.add_subplot(311, sharex=sp11),
            fig2.add_subplot(312, sharex=sp11),
            fig2.add_subplot(313, sharex=sp11),
        ]

        # Scale factors: seconds -> chosen time unit, Hz/m -> chosen gradient unit.
        t_factor_list = [1, 1e3, 1e6]
        t_factor = t_factor_list[valid_time_units.index(time_disp)]

        g_factor_list = [1e-3, 1e3 / self.system.gamma]
        g_factor = g_factor_list[valid_grad_units.index(grad_disp)]

        t0 = 0
        label_defined = False
        label_idx_to_plot = []
        label_legend_to_plot = []
        # Running value of every supported label; updated as blocks are traversed.
        label_store = dict()
        for i in range(len(valid_labels)):
            label_store[valid_labels[i]] = 0
            if valid_labels[i] in label.upper():
                label_idx_to_plot.append(i)
                label_legend_to_plot.append(valid_labels[i])

        if len(label_idx_to_plot) != 0:
            # One distinct color per requested label, from the parula colormap.
            p = parula.main(len(label_idx_to_plot) + 1)
            label_colors_to_plot = p(np.arange(len(label_idx_to_plot)))
            cycler = mpl.cycler(color=label_colors_to_plot)
            sp11.set_prop_cycle(cycler)

        # Block timings
        block_edges = np.cumsum([0] + [x[1] for x in sorted(self.block_durations.items())])
        block_edges_in_range = block_edges[
            (block_edges >= time_range[0]) * (block_edges <= time_range[1])
        ]
        if show_blocks:
            # Place ticks at the block boundaries on every subplot.
            for sp in [sp11, sp12, sp13, *fig2_subplots]:
                sp.set_xticks(t_factor * block_edges_in_range)
                sp.set_xticklabels(sp.get_xticklabels(), rotation=90)

        for block_counter in self.block_events:
            block = self.get_block(block_counter)
            # Only draw blocks that overlap the requested time window.
            is_valid = (time_range[0] <= t0 + self.block_durations[block_counter]
                        and t0 <= time_range[1])
            if is_valid:
                if getattr(block, "label", None) is not None:
                    for i in range(len(block.label)):
                        if block.label[i].type == "labelinc":
                            label_store[block.label[i].label] += block.label[i].value
                        else:
                            label_store[block.label[i].label] = block.label[i].value
                    label_defined = True

                if getattr(block, "adc", None) is not None:  # ADC
                    adc = block.adc
                    # From Pulseq: According to the information from Klaus Scheffler and indirectly from Siemens this
                    # is the present convention - the samples are shifted by 0.5 dwell
                    t = adc.delay + (np.arange(int(adc.num_samples)) + 0.5) * adc.dwell
                    sp11.plot(t_factor * (t0 + t), np.zeros(len(t)), "rx")
                    sp13.plot(
                        t_factor * (t0 + t),
                        np.angle(
                            np.exp(1j * adc.phase_offset)
                            * np.exp(1j * 2 * np.pi * t * adc.freq_offset)
                        ),
                        "b.",
                        markersize=0.25,
                    )

                    if label_defined and len(label_idx_to_plot) != 0:
                        arr_label_store = list(label_store.values())
                        lbl_vals = np.take(arr_label_store, label_idx_to_plot)
                        # Label markers are drawn at the center of the ADC readout.
                        t = t0 + adc.delay + (adc.num_samples - 1) / 2 * adc.dwell
                        _t = [t_factor * t] * len(lbl_vals)
                        # Plot each label individually to retrieve each corresponding Line2D object
                        p = itertools.chain.from_iterable(
                            [
                                sp11.plot(__t, _lbl_vals, ".")
                                for __t, _lbl_vals in zip(_t, lbl_vals)
                            ]
                        )
                        if len(label_legend_to_plot) != 0:
                            # Emit the legend only once, for the first labelled ADC.
                            sp11.legend(p, label_legend_to_plot, loc="upper left")
                            label_legend_to_plot = []

                if getattr(block, "rf", None) is not None:  # RF
                    rf = block.rf
                    tc, ic = calc_rf_center(rf)
                    time = rf.t
                    signal = rf.signal
                    # Pad with explicit zeros at the edges so the envelope is
                    # drawn closed instead of hanging at a nonzero value.
                    if abs(signal[0]) != 0:
                        signal = np.concatenate(([0], signal))
                        time = np.concatenate(([time[0]], time))
                        ic += 1

                    if abs(signal[-1]) != 0:
                        signal = np.concatenate((signal, [0]))
                        time = np.concatenate((time, [time[-1]]))

                    sp12.plot(t_factor * (t0 + time + rf.delay), np.abs(signal))
                    # Phase trace plus an 'x' marker at the RF center sample.
                    sp13.plot(
                        t_factor * (t0 + time + rf.delay),
                        np.angle(
                            signal
                            * np.exp(1j * rf.phase_offset)
                            * np.exp(1j * 2 * math.pi * time * rf.freq_offset)
                        ),
                        t_factor * (t0 + tc + rf.delay),
                        np.angle(
                            signal[ic]
                            * np.exp(1j * rf.phase_offset)
                            * np.exp(1j * 2 * math.pi * time[ic] * rf.freq_offset)
                        ),
                        "xb",
                    )

                grad_channels = ["gx", "gy", "gz"]
                for x in range(len(grad_channels)):  # Gradients
                    if getattr(block, grad_channels[x], None) is not None:
                        grad = getattr(block, grad_channels[x])
                        if grad.type == "grad":
                            # We extend the shape by adding the first and the last points in an effort of making the
                            # display a bit less confusing...
                            time = grad.delay + np.array([0, *grad.tt, grad.shape_dur])
                            waveform = g_factor * np.array(
                                (grad.first, *grad.waveform, grad.last)
                            )
                        else:
                            # Trapezoid: corner times from the cumulative sum of segments.
                            time = np.array(cumsum(
                                0,
                                grad.delay,
                                grad.rise_time,
                                grad.flat_time,
                                grad.fall_time,
                            ))
                            waveform = (
                                g_factor * grad.amplitude * np.array([0, 0, 1, 1, 0])
                            )
                        fig2_subplots[x].plot(t_factor * (t0 + time), waveform)
            t0 += self.block_durations[block_counter]

        grad_plot_labels = ["x", "y", "z"]
        sp11.set_ylabel("ADC")
        sp12.set_ylabel("RF mag (Hz)")
        sp13.set_ylabel("RF/ADC phase (rad)")
        sp13.set_xlabel(f"t ({time_disp})")
        for x in range(3):
            _label = grad_plot_labels[x]
            fig2_subplots[x].set_ylabel(f"G{_label} ({grad_disp})")
        fig2_subplots[-1].set_xlabel(f"t ({time_disp})")

        # Setting display limits
        disp_range = t_factor * np.array([time_range[0], min(t0, time_range[1])])
        [x.set_xlim(disp_range) for x in [sp11, sp12, sp13, *fig2_subplots]]

        # Grid on
        for sp in [sp11, sp12, sp13, *fig2_subplots]:
            sp.grid()

        fig1.tight_layout()
        fig2.tight_layout()
        if save:
            fig1.savefig("seq_plot1.jpg")
            fig2.savefig("seq_plot2.jpg")

        if tk_plot:
            # Embed figure 1 into a standalone Tk window.
            root = tk.Tk()
            root.title("График с панелью инструментов")

            canvas = FigureCanvasTkAgg(fig1, master=root)
            canvas.draw()

            # Add the navigation toolbar (zooming, saving, etc.)
            toolbar = NavigationToolbar2Tk(canvas, root)
            toolbar.update()

            canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True)

        if plot_now:
            plt.show()
|
|
|
+
|
|
|
+ def read(self, file_path: str, detect_rf_use: bool = False, remove_duplicates: bool = True) -> None:
|
|
|
+ """
|
|
|
+ Read `.seq` file from `file_path`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ detect_rf_use
|
|
|
+ file_path : str
|
|
|
+ Path to `.seq` file to be read.
|
|
|
+ remove_duplicates : bool, default=True
|
|
|
+ Remove duplicate events from the sequence after reading.
|
|
|
+ """
|
|
|
+ if self.use_block_cache:
|
|
|
+ self.block_cache.clear()
|
|
|
+
|
|
|
+ read(self, path=file_path, detect_rf_use=detect_rf_use, remove_duplicates=remove_duplicates)
|
|
|
+
|
|
|
+ # Initialize next free block ID
|
|
|
+ self.next_free_block_ID = (max(self.block_events) + 1) if self.block_events else 1
|
|
|
+
|
|
|
    def register_adc_event(self, event: EventLibrary) -> int:
        """Register an ADC event and return its library ID.

        Delegates to `block.register_adc_event`. NOTE(review): the `EventLibrary`
        annotation looks inconsistent with the other `register_*` methods, which
        take a `SimpleNamespace` event — verify against callers.
        """
        return block.register_adc_event(self, event)
|
|
|
+
|
|
|
    def register_grad_event(
        self, event: SimpleNamespace
    ) -> Union[int, Tuple[int, int]]:
        """Register a gradient event and return its ID(s).

        Delegates to `block.register_grad_event`; per the annotation, the result
        is a single int or an (int, int) tuple depending on the event.
        """
        return block.register_grad_event(self, event)
|
|
|
+
|
|
|
    def register_label_event(self, event: SimpleNamespace) -> int:
        """Register a label event and return its library ID.

        Delegates to `block.register_label_event`.
        """
        return block.register_label_event(self, event)
|
|
|
+
|
|
|
    def register_rf_event(self, event: SimpleNamespace) -> Tuple[int, List[int]]:
        """Register an RF event; returns its ID and, per the annotation, a list
        of associated shape IDs.

        Delegates to `block.register_rf_event`.
        """
        return block.register_rf_event(self, event)
|
|
|
+
|
|
|
    def remove_duplicates(self, in_place: bool = False) -> Self:
        """
        Removes duplicate events from the shape and event libraries contained
        in this sequence.

        Shapes are deduplicated first; the resulting ID remapping is applied to
        the gradient and RF records that reference shapes, and only then are the
        gradient, RF and ADC libraries themselves deduplicated, with their
        remappings applied to the block event table. This ordering matters.

        Parameters
        ----------
        in_place : bool, optional
            If true, removes the duplicates from the current sequence.
            Otherwise, a copy is created. The default is False.

        Returns
        -------
        seq_copy : Sequence
            If `in_place`, returns self. Otherwise returns a copy of the
            sequence.
        """
        if in_place:
            seq_copy = self
        else:
            # Avoid copying block_cache for performance
            tmp = self.block_cache
            self.block_cache = {}
            seq_copy = deepcopy(self)
            self.block_cache = tmp

        # Find duplicate in shape library
        # (the numeric argument is presumably a rounding tolerance in decimal
        # digits — TODO confirm against EventLibrary.remove_duplicates)
        seq_copy.shape_library, mapping = seq_copy.shape_library.remove_duplicates(9)

        # Remap shape IDs of arbitrary gradient events
        # (fields 1 and 2 of a 'g' record hold shape IDs)
        for grad_id in seq_copy.grad_library.data:
            if seq_copy.grad_library.type[grad_id] == 'g':
                data = seq_copy.grad_library.data[grad_id]
                new_data = (data[0],) + (mapping[data[1]], mapping[data[2]]) + data[3:]
                if data != new_data:
                    seq_copy.grad_library.update(grad_id, None, new_data)

        # Remap shape IDs of RF events (fields 1-3 hold mag/phase/time shape IDs)
        for rf_id in seq_copy.rf_library.data:
            data = seq_copy.rf_library.data[rf_id]
            new_data = (data[0],) + (mapping[data[1]], mapping[data[2]], mapping[data[3]]) + data[4:]
            if data != new_data:
                seq_copy.rf_library.update(rf_id, None, new_data)

        # Filter duplicates in gradient library
        # (per-field tolerances; negative values presumably select a different
        # rounding mode — TODO confirm against EventLibrary.remove_duplicates)
        seq_copy.grad_library, mapping = seq_copy.grad_library.remove_duplicates((6, -6, -6, -6, -6, -6))

        # Remap gradient event IDs (columns 2-4 of each block event row)
        for block_id in seq_copy.block_events:
            seq_copy.block_events[block_id][2] = mapping[seq_copy.block_events[block_id][2]]
            seq_copy.block_events[block_id][3] = mapping[seq_copy.block_events[block_id][3]]
            seq_copy.block_events[block_id][4] = mapping[seq_copy.block_events[block_id][4]]

        # Filter duplicates in RF library
        seq_copy.rf_library, mapping = seq_copy.rf_library.remove_duplicates((6, 0, 0, 0, 6, 6, 6))

        # Remap RF event IDs (column 1 of each block event row)
        for block_id in seq_copy.block_events:
            seq_copy.block_events[block_id][1] = mapping[seq_copy.block_events[block_id][1]]

        # Filter duplicates in ADC library
        seq_copy.adc_library, mapping = seq_copy.adc_library.remove_duplicates((0, -9, -6, 6, 6, 6))

        # Remap ADC event IDs (column 5 of each block event row)
        for block_id in seq_copy.block_events:
            seq_copy.block_events[block_id][5] = mapping[seq_copy.block_events[block_id][5]]

        return seq_copy
|
|
|
+
|
|
|
+ def rf_from_lib_data(self, lib_data: list, use: str = str()) -> SimpleNamespace:
|
|
|
+ """
|
|
|
+ Construct RF object from `lib_data`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ lib_data : list
|
|
|
+ RF envelope.
|
|
|
+ use : str, default=str()
|
|
|
+ RF event use.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ rf : SimpleNamespace
|
|
|
+ RF object constructed from `lib_data`.
|
|
|
+ """
|
|
|
+ rf = SimpleNamespace()
|
|
|
+ rf.type = "rf"
|
|
|
+
|
|
|
+ amplitude, mag_shape, phase_shape = lib_data[0], lib_data[1], lib_data[2]
|
|
|
+ shape_data = self.shape_library.data[mag_shape]
|
|
|
+ compressed = SimpleNamespace()
|
|
|
+ compressed.num_samples = shape_data[0]
|
|
|
+ compressed.data = shape_data[1:]
|
|
|
+ mag = decompress_shape(compressed)
|
|
|
+ shape_data = self.shape_library.data[phase_shape]
|
|
|
+ compressed.num_samples = shape_data[0]
|
|
|
+ compressed.data = shape_data[1:]
|
|
|
+ phase = decompress_shape(compressed)
|
|
|
+ rf.signal = amplitude * mag * np.exp(1j * 2 * np.pi * phase)
|
|
|
+ time_shape = lib_data[3]
|
|
|
+ if time_shape > 0:
|
|
|
+ shape_data = self.shape_library.data[time_shape]
|
|
|
+ compressed.num_samples = shape_data[0]
|
|
|
+ compressed.data = shape_data[1:]
|
|
|
+ rf.t = decompress_shape(compressed) * self.rf_raster_time
|
|
|
+ rf.shape_dur = (
|
|
|
+ math.ceil((rf.t[-1] - eps) / self.rf_raster_time) * self.rf_raster_time
|
|
|
+ )
|
|
|
+ else: # Generate default time raster on the fly
|
|
|
+ rf.t = (np.arange(1, len(rf.signal) + 1) - 0.5) * self.rf_raster_time
|
|
|
+ rf.shape_dur = len(rf.signal) * self.rf_raster_time
|
|
|
+
|
|
|
+ rf.delay = lib_data[4]
|
|
|
+ rf.freq_offset = lib_data[5]
|
|
|
+ rf.phase_offset = lib_data[6]
|
|
|
+
|
|
|
+ rf.dead_time = self.system.rf_dead_time
|
|
|
+ rf.ringdown_time = self.system.rf_ringdown_time
|
|
|
+
|
|
|
+ if use != "":
|
|
|
+ use_cases = {
|
|
|
+ "e": "excitation",
|
|
|
+ "r": "refocusing",
|
|
|
+ "i": "inversion",
|
|
|
+ "s": "saturation",
|
|
|
+ "p": "preparation",
|
|
|
+ }
|
|
|
+ rf.use = use_cases[use] if use in use_cases else "undefined"
|
|
|
+
|
|
|
+ return rf
|
|
|
+
|
|
|
+ def rf_times(
|
|
|
+ self, time_range: List[float] = None
|
|
|
+ ) -> Tuple[List[float], np.ndarray, List[float], np.ndarray, np.ndarray]:
|
|
|
+ """
|
|
|
+ Return time points of excitations and refocusings.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ t_excitation : List[float]
|
|
|
+ Contains time moments of the excitation RF pulses
|
|
|
+ fp_excitation : np.ndarray
|
|
|
+ Contains frequency and phase offsets of the excitation RF pulses
|
|
|
+ t_refocusing : List[float]
|
|
|
+ Contains time moments of the refocusing RF pulses
|
|
|
+ fp_refocusing : np.ndarray
|
|
|
+ Contains frequency and phase offsets of the excitation RF pulses
|
|
|
+ """
|
|
|
+
|
|
|
+ # Collect RF timing data
|
|
|
+ t_excitation = []
|
|
|
+ fp_excitation = []
|
|
|
+ t_refocusing = []
|
|
|
+ fp_refocusing = []
|
|
|
+
|
|
|
+ curr_dur = 0
|
|
|
+ if time_range == None:
|
|
|
+ blocks = self.block_events
|
|
|
+ else:
|
|
|
+ if len(time_range) != 2:
|
|
|
+ raise ValueError('Time range must be list of two elements')
|
|
|
+ if time_range[0] > time_range[1]:
|
|
|
+ raise ValueError('End time of time_range must be after begin time')
|
|
|
+
|
|
|
+ # Calculate end times of each block
|
|
|
+ bd = np.array(list(self.block_durations.values()))
|
|
|
+ t = np.cumsum(bd)
|
|
|
+ # Search block end times for start of time range
|
|
|
+ begin_block = np.searchsorted(t, time_range[0])
|
|
|
+ # Search block begin times for end of time range
|
|
|
+ end_block = np.searchsorted(t - bd, time_range[1], side='right')
|
|
|
+ blocks = list(self.block_durations.keys())[begin_block:end_block]
|
|
|
+ curr_dur = t[begin_block] - bd[begin_block]
|
|
|
+
|
|
|
+ for block_counter in blocks:
|
|
|
+ block = self.get_block(block_counter)
|
|
|
+
|
|
|
+ if block.rf is not None: # RF
|
|
|
+ rf = block.rf
|
|
|
+ t = rf.delay + calc_rf_center(rf)[0]
|
|
|
+ if not hasattr(rf, "use") or block.rf.use in [
|
|
|
+ "excitation",
|
|
|
+ "undefined",
|
|
|
+ ]:
|
|
|
+ t_excitation.append(curr_dur + t)
|
|
|
+ fp_excitation.append([block.rf.freq_offset, block.rf.phase_offset])
|
|
|
+ elif block.rf.use == "refocusing":
|
|
|
+ t_refocusing.append(curr_dur + t)
|
|
|
+ fp_refocusing.append([block.rf.freq_offset, block.rf.phase_offset])
|
|
|
+
|
|
|
+ curr_dur += self.block_durations[block_counter]
|
|
|
+
|
|
|
+ if len(t_excitation) != 0:
|
|
|
+ fp_excitation = np.array(fp_excitation).T
|
|
|
+ else:
|
|
|
+ fp_excitation = np.empty((2, 0))
|
|
|
+
|
|
|
+ if len(t_refocusing) != 0:
|
|
|
+ fp_refocusing = np.array(fp_refocusing).T
|
|
|
+ else:
|
|
|
+ fp_refocusing = np.empty((2, 0))
|
|
|
+
|
|
|
+ return t_excitation, fp_excitation, t_refocusing, fp_refocusing
|
|
|
+
|
|
|
+ def set_block(self, block_index: int, *args: SimpleNamespace) -> None:
|
|
|
+ """
|
|
|
+ Replace block at index with new block provided as block structure, add sequence block, or create a new block
|
|
|
+ from events and store at position specified by index. The block or events are provided in uncompressed form and
|
|
|
+ will be stored in the compressed, non-redundant internal libraries.
|
|
|
+
|
|
|
+ See also:
|
|
|
+ - `pypulseq.Sequence.sequence.Sequence.get_block()`
|
|
|
+ - `pypulseq.Sequence.sequence.Sequence.add_block()`
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ block_index : int
|
|
|
+ Index at which block is replaced.
|
|
|
+ args : SimpleNamespace
|
|
|
+ Block or events to be replaced/added or created at `block_index`.
|
|
|
+ """
|
|
|
+ block.set_block(self, block_index, *args)
|
|
|
+
|
|
|
+ if block_index >= self.next_free_block_ID:
|
|
|
+ self.next_free_block_ID = block_index + 1
|
|
|
+
|
|
|
+ def set_definition(
|
|
|
+ self, key: str, value: Union[float, int, list, np.ndarray, str, tuple]
|
|
|
+ ) -> None:
|
|
|
+ """
|
|
|
+ Modify a custom definition of the sequence. Set the user definition 'key' to value 'value'. If the definition
|
|
|
+ does not exist it will be created.
|
|
|
+
|
|
|
+ See also `pypulseq.Sequence.sequence.Sequence.get_definition()`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ key : str
|
|
|
+ Definition key.
|
|
|
+ value : int, list, np.ndarray, str or tuple
|
|
|
+ Definition value.
|
|
|
+ """
|
|
|
+ if key == "FOV":
|
|
|
+ if np.max(value) > 1:
|
|
|
+ text = "Definition FOV uses values exceeding 1 m. "
|
|
|
+ text += "New Pulseq interpreters expect values in units of meters."
|
|
|
+ warn(text)
|
|
|
+
|
|
|
+ self.definitions[key] = value
|
|
|
+
|
|
|
+ def set_extension_string_ID(self, extension_str: str, extension_id: int) -> None:
|
|
|
+ """
|
|
|
+ Set numeric ID for the given string extension ID.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ extension_str : str
|
|
|
+ Given string extension ID.
|
|
|
+ extension_id : int
|
|
|
+ Given numeric extension ID.
|
|
|
+
|
|
|
+ Raises
|
|
|
+ ------
|
|
|
+ ValueError
|
|
|
+ If given numeric or string extension ID is not unique.
|
|
|
+ """
|
|
|
+ if (
|
|
|
+ extension_str in self.extension_string_idx
|
|
|
+ or extension_id in self.extension_numeric_idx
|
|
|
+ ):
|
|
|
+ raise ValueError("Numeric or string ID is not unique")
|
|
|
+
|
|
|
+ self.extension_numeric_idx.append(extension_id)
|
|
|
+ self.extension_string_idx.append(extension_str)
|
|
|
+ assert len(self.extension_numeric_idx) == len(self.extension_string_idx)
|
|
|
+
|
|
|
    def test_report(self) -> str:
        """
        Analyze the sequence and return a text report.

        Delegates to `ext_test_report`; see that function for the report contents.
        """
        return ext_test_report(self)
|
|
|
+
|
|
|
+ def waveforms(
|
|
|
+ self, append_RF: bool = False, time_range: List[float] = None
|
|
|
+ ) -> Tuple[np.ndarray]:
|
|
|
+ """
|
|
|
+ Decompress the entire gradient waveform. Returns gradient waveforms as a tuple of `np.ndarray` of
|
|
|
+ `gradient_axes` (typically 3) dimensions. Each `np.ndarray` contains timepoints and the corresponding
|
|
|
+ gradient amplitude values.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ append_RF : bool, default=False
|
|
|
+ Boolean flag to indicate if RF wave shapes are to be appended after the gradients.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ wave_data : np.ndarray
|
|
|
+ """
|
|
|
+ grad_channels = ["gx", "gy", "gz"]
|
|
|
+
|
|
|
+ # Collect shape pieces
|
|
|
+ if append_RF:
|
|
|
+ shape_channels = len(grad_channels) + 1 # Last 'channel' is RF
|
|
|
+ else:
|
|
|
+ shape_channels = len(grad_channels)
|
|
|
+
|
|
|
+ shape_pieces = [[] for _ in range(shape_channels)]
|
|
|
+ out_len = np.zeros(shape_channels) # Last 'channel' is RF
|
|
|
+
|
|
|
+ curr_dur = 0
|
|
|
+ if time_range == None:
|
|
|
+ blocks = self.block_events
|
|
|
+ else:
|
|
|
+ if len(time_range) != 2:
|
|
|
+ raise ValueError('Time range must be list of two elements')
|
|
|
+ if time_range[0] > time_range[1]:
|
|
|
+ raise ValueError('End time of time_range must be after begin time')
|
|
|
+
|
|
|
+ # Calculate end times of each block
|
|
|
+ bd = np.array(list(self.block_durations.values()))
|
|
|
+ t = np.cumsum(bd)
|
|
|
+ # Search block end times for start of time range
|
|
|
+ begin_block = np.searchsorted(t, time_range[0])
|
|
|
+ # Search block begin times for end of time range
|
|
|
+ end_block = np.searchsorted(t - bd, time_range[1], side='right')
|
|
|
+ blocks = list(self.block_durations.keys())[begin_block:end_block]
|
|
|
+ curr_dur = t[begin_block] - bd[begin_block]
|
|
|
+
|
|
|
+ for block_counter in blocks:
|
|
|
+ block = self.get_block(block_counter)
|
|
|
+
|
|
|
+ for j in range(len(grad_channels)):
|
|
|
+ grad = getattr(block, grad_channels[j])
|
|
|
+ if grad is not None: # Gradients
|
|
|
+ if grad.type == "grad":
|
|
|
+ # Check if we have an extended trapezoid or an arbitrary gradient on a regular raster
|
|
|
+ tt_rast = grad.tt / self.grad_raster_time + 0.5
|
|
|
+ if np.all(
|
|
|
+ np.abs(tt_rast - np.arange(1, len(tt_rast) + 1)) < eps
|
|
|
+ ): # Arbitrary gradient
|
|
|
+ """
|
|
|
+ Arbitrary gradient: restore & recompress shape - if we had a trapezoid converted to shape we
|
|
|
+ have to find the "corners" and we can eliminate internal samples on the straight segments
|
|
|
+ but first we have to restore samples on the edges of the gradient raster intervals for that
|
|
|
+ we need the first sample.
|
|
|
+ """
|
|
|
+
|
|
|
+ # TODO: Implement restoreAdditionalShapeSamples
|
|
|
+ # https://github.com/pulseq/pulseq/blob/master/matlab/%2Bmr/restoreAdditionalShapeSamples.m
|
|
|
+
|
|
|
+ out_len[j] += len(grad.tt) + 2
|
|
|
+ shape_pieces[j].append(np.array(
|
|
|
+ [
|
|
|
+ curr_dur + grad.delay + np.concatenate(
|
|
|
+ ([0], grad.tt, [grad.tt[-1] + self.grad_raster_time / 2])),
|
|
|
+ np.concatenate(([grad.first], grad.waveform, [grad.last]))
|
|
|
+ ]
|
|
|
+ ))
|
|
|
+ else: # Extended trapezoid
|
|
|
+ out_len[j] += len(grad.tt)
|
|
|
+ shape_pieces[j].append(np.array(
|
|
|
+ [
|
|
|
+ curr_dur + grad.delay + grad.tt,
|
|
|
+ grad.waveform,
|
|
|
+ ]
|
|
|
+ ))
|
|
|
+ else:
|
|
|
+ if abs(grad.flat_time) > eps:
|
|
|
+ out_len[j] += 4
|
|
|
+ _temp = np.vstack(
|
|
|
+ (
|
|
|
+ cumsum(
|
|
|
+ curr_dur + grad.delay,
|
|
|
+ grad.rise_time,
|
|
|
+ grad.flat_time,
|
|
|
+ grad.fall_time,
|
|
|
+ ),
|
|
|
+ grad.amplitude * np.array([0, 1, 1, 0]),
|
|
|
+ )
|
|
|
+ )
|
|
|
+ shape_pieces[j].append(_temp)
|
|
|
+ else:
|
|
|
+ if abs(grad.rise_time) > eps and abs(grad.fall_time) > eps:
|
|
|
+ out_len[j] += 3
|
|
|
+ _temp = np.vstack(
|
|
|
+ (
|
|
|
+ cumsum(curr_dur + grad.delay, grad.rise_time, grad.fall_time),
|
|
|
+ grad.amplitude * np.array([0, 1, 0]),
|
|
|
+ )
|
|
|
+ )
|
|
|
+ shape_pieces[j].append(_temp)
|
|
|
+ else:
|
|
|
+ if abs(grad.amplitude) > eps:
|
|
|
+ print(
|
|
|
+ 'Warning: "empty" gradient with non-zero magnitude detected in block {}'.format(
|
|
|
+ block_counter))
|
|
|
+
|
|
|
+ if block.rf is not None: # RF
|
|
|
+ rf = block.rf
|
|
|
+ if append_RF:
|
|
|
+ rf_piece = np.array(
|
|
|
+ [
|
|
|
+ curr_dur + rf.delay + rf.t,
|
|
|
+ rf.signal
|
|
|
+ * np.exp(
|
|
|
+ 1j
|
|
|
+ * (rf.phase_offset + 2 * np.pi * rf.freq_offset * rf.t)
|
|
|
+ ),
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ out_len[-1] += len(rf.t)
|
|
|
+
|
|
|
+ if abs(rf.signal[0]) > 0:
|
|
|
+ pre = np.array([[rf_piece[0, 0] - 0.1 * self.system.rf_raster_time], [0]])
|
|
|
+ rf_piece = np.hstack((pre, rf_piece))
|
|
|
+ out_len[-1] += pre.shape[1]
|
|
|
+
|
|
|
+ if abs(rf.signal[-1]) > 0:
|
|
|
+ post = np.array([[rf_piece[0, -1] + 0.1 * self.system.rf_raster_time], [0]])
|
|
|
+ rf_piece = np.hstack((rf_piece, post))
|
|
|
+ out_len[-1] += post.shape[1]
|
|
|
+
|
|
|
+ shape_pieces[-1].append(rf_piece)
|
|
|
+
|
|
|
+ curr_dur += self.block_durations[block_counter]
|
|
|
+
|
|
|
+ # Collect wave data
|
|
|
+ wave_data = []
|
|
|
+
|
|
|
+ for j in range(shape_channels):
|
|
|
+ if shape_pieces[j] == []:
|
|
|
+ wave_data.append(np.zeros((2, 0)))
|
|
|
+ continue
|
|
|
+
|
|
|
+ # If the first element of the next shape has the same time as
|
|
|
+ # the last element of the previous shape, drop the first
|
|
|
+ # element of the next shape.
|
|
|
+ shape_pieces[j] = ([shape_pieces[j][0]] +
|
|
|
+ [cur if prev[0, -1] + eps < cur[0, 0] else cur[:, 1:]
|
|
|
+ for prev, cur in zip(shape_pieces[j][:-1], shape_pieces[j][1:])])
|
|
|
+
|
|
|
+ wave_data.append(np.concatenate(shape_pieces[j], axis=1))
|
|
|
+
|
|
|
+ rftdiff = np.diff(wave_data[j][0])
|
|
|
+ if np.any(rftdiff < eps):
|
|
|
+ raise Warning(
|
|
|
+ "Time vector elements are not monotonically increasing."
|
|
|
+ )
|
|
|
+
|
|
|
+ return wave_data
|
|
|
+
|
|
|
    def waveforms_and_times(
        self, append_RF: bool = False, time_range: List[float] = None
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """
        Decompress the entire gradient waveform. Returns gradient waveforms as a tuple of `np.ndarray` of
        `gradient_axes` (typically 3) dimensions. Each `np.ndarray` contains timepoints and the corresponding
        gradient amplitude values. Additional return values are time points of excitations, refocusings and ADC
        sampling points.

        Parameters
        ----------
        append_RF : bool, default=False
            Boolean flag to indicate if RF wave shapes are to be appended after the gradients.
        time_range : List[float], optional
            Two-element [start, end] window in seconds forwarded to the
            underlying `waveforms`, `rf_times` and `adc_times` calls.

        Returns
        -------
        wave_data : np.ndarray
        tfp_excitation : np.ndarray
            Contains time moments, frequency and phase offsets of the excitation RF pulses (similar for `
            tfp_refocusing`).
        tfp_refocusing : np.ndarray
        t_adc: np.ndarray
            Contains times of all ADC sample points.
        fp_adc : np.ndarray
            Contains frequency and phase offsets of each ADC object (not samples).
        """

        wave_data = self.waveforms(append_RF=append_RF, time_range=time_range)
        t_excitation, fp_excitation, t_refocusing, fp_refocusing = self.rf_times(time_range=time_range)
        t_adc, fp_adc = self.adc_times(time_range=time_range)

        # Join times, frequency and phases of RF pulses for compatibility with previous implementation
        # (stacks the time row on top of the (2, N) freq/phase rows -> (3, N)).
        tfp_excitation = np.concatenate((np.array(t_excitation)[None], fp_excitation), axis=0)
        tfp_refocusing = np.concatenate((np.array(t_refocusing)[None], fp_refocusing), axis=0)

        return wave_data, tfp_excitation, tfp_refocusing, t_adc, fp_adc
|
|
|
+
|
|
|
+ def waveforms_export(self, time_range=(0, np.inf)) -> dict:
|
|
|
+ """
|
|
|
+ Plot `Sequence`.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ time_range : iterable, default=(0, np.inf)
|
|
|
+ Time range (x-axis limits) for all waveforms. Default is 0 to infinity (entire sequence).
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ all_waveforms: dict
|
|
|
+ Dictionary containing the following sequence waveforms and time array(s):
|
|
|
+ - `t_adc` - ADC timing array [seconds]
|
|
|
+ - `t_rf` - RF timing array [seconds]
|
|
|
+ - `t_rf_centers`: `rf_t_centers`,
|
|
|
+ - `t_gx`: x gradient timing array,
|
|
|
+ - `t_gy`: y gradient timing array,
|
|
|
+ - `t_gz`: z gradient timing array,
|
|
|
+ - `adc` - ADC complex signal (amplitude=1, phase=adc phase) [a.u.]
|
|
|
+ - `rf` - RF complex signal
|
|
|
+ - `rf_centers`: RF centers array,
|
|
|
+ - `gx` - x gradient
|
|
|
+ - `gy` - y gradient
|
|
|
+ - `gz` - z gradient
|
|
|
+ - `grad_unit`: [kHz/m],
|
|
|
+ - `rf_unit`: [Hz],
|
|
|
+ - `time_unit`: [seconds],
|
|
|
+ """
|
|
|
+ # Check time range validity
|
|
|
+ if (
|
|
|
+ not all([isinstance(x, (int, float)) for x in time_range])
|
|
|
+ or len(time_range) != 2
|
|
|
+ ):
|
|
|
+ raise ValueError("Invalid time range")
|
|
|
+
|
|
|
+ t0 = 0
|
|
|
+ adc_t_all = np.array([])
|
|
|
+ adc_signal_all = np.array([], dtype=complex)
|
|
|
+ rf_t_all = np.array([])
|
|
|
+ rf_signal_all = np.array([], dtype=complex)
|
|
|
+ rf_t_centers = np.array([])
|
|
|
+ rf_signal_centers = np.array([], dtype=complex)
|
|
|
+ gx_t_all = np.array([])
|
|
|
+ gy_t_all = np.array([])
|
|
|
+ gz_t_all = np.array([])
|
|
|
+ gx_all = np.array([])
|
|
|
+ gy_all = np.array([])
|
|
|
+ gz_all = np.array([])
|
|
|
+
|
|
|
+ for block_counter in self.block_events: # For each block
|
|
|
+ block = self.get_block(block_counter) # Retrieve it
|
|
|
+ is_valid = (
|
|
|
+ time_range[0] <= t0 <= time_range[1]
|
|
|
+ ) # Check if "current time" is within requested range.
|
|
|
+ if is_valid:
|
|
|
+ # Case 1: ADC
|
|
|
+ if block.adc != None:
|
|
|
+ adc = block.adc # Get adc info
|
|
|
+ # From Pulseq: According to the information from Klaus Scheffler and indirectly from Siemens this
|
|
|
+ # is the present convention - the samples are shifted by 0.5 dwell
|
|
|
+ t = adc.delay + (np.arange(int(adc.num_samples)) + 0.5) * adc.dwell
|
|
|
+ adc_t = t0 + t
|
|
|
+ adc_signal = np.exp(1j * adc.phase_offset) * np.exp(
|
|
|
+ 1j * 2 * np.pi * t * adc.freq_offset
|
|
|
+ )
|
|
|
+ adc_t_all = np.concatenate((adc_t_all, adc_t))
|
|
|
+ adc_signal_all = np.concatenate((adc_signal_all, adc_signal))
|
|
|
+
|
|
|
+ if block.rf != None:
|
|
|
+ rf = block.rf
|
|
|
+ tc, ic = calc_rf_center(rf)
|
|
|
+ t = rf.t + rf.delay
|
|
|
+ tc = tc + rf.delay
|
|
|
+
|
|
|
+ # Debug - visualize
|
|
|
+ # sp12.plot(t_factor * (t0 + t), np.abs(rf.signal))
|
|
|
+ # sp13.plot(t_factor * (t0 + t), np.angle(rf.signal * np.exp(1j * rf.phase_offset)
|
|
|
+ # * np.exp(1j * 2 * math.pi * rf.t * rf.freq_offset)),
|
|
|
+ # t_factor * (t0 + tc), np.angle(rf.signal[ic] * np.exp(1j * rf.phase_offset)
|
|
|
+ # * np.exp(1j * 2 * math.pi * rf.t[ic] * rf.freq_offset)),
|
|
|
+ # 'xb')
|
|
|
+
|
|
|
+ rf_t = t0 + t
|
|
|
+ rf = (
|
|
|
+ rf.signal
|
|
|
+ * np.exp(1j * rf.phase_offset)
|
|
|
+ * np.exp(1j * 2 * math.pi * rf.t * rf.freq_offset)
|
|
|
+ )
|
|
|
+ rf_t_all = np.concatenate((rf_t_all, rf_t))
|
|
|
+ rf_signal_all = np.concatenate((rf_signal_all, rf))
|
|
|
+ rf_t_centers = np.concatenate((rf_t_centers, [rf_t[ic]]))
|
|
|
+ rf_signal_centers = np.concatenate((rf_signal_centers, [rf[ic]]))
|
|
|
+
|
|
|
+ grad_channels = ["gx", "gy", "gz"]
|
|
|
+ for x in range(
|
|
|
+ len(grad_channels)
|
|
|
+ ): # Check each gradient channel: x, y, and z
|
|
|
+ if getattr(block, grad_channels[x]) != None:
|
|
|
+ # If this channel is on in current block
|
|
|
+ grad = getattr(block, grad_channels[x])
|
|
|
+ if grad.type == "grad": # Arbitrary gradient option
|
|
|
+ # In place unpacking of grad.t with the starred expression
|
|
|
+ g_t = (
|
|
|
+ t0
|
|
|
+ + grad.delay
|
|
|
+ + [
|
|
|
+ 0,
|
|
|
+ *(grad.tt + (grad.tt[1] - grad.tt[0]) / 2),
|
|
|
+ grad.tt[-1] + grad.tt[1] - grad.tt[0],
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ g = 1e-3 * np.array((grad.first, *grad.waveform, grad.last))
|
|
|
+ else: # Trapezoid gradient option
|
|
|
+ g_t = cumsum(
|
|
|
+ t0,
|
|
|
+ grad.delay,
|
|
|
+ grad.rise_time,
|
|
|
+ grad.flat_time,
|
|
|
+ grad.fall_time,
|
|
|
+ )
|
|
|
+ g = 1e-3 * grad.amplitude * np.array([0, 0, 1, 1, 0])
|
|
|
+
|
|
|
+ if grad.channel == "x":
|
|
|
+ gx_t_all = np.concatenate((gx_t_all, g_t))
|
|
|
+ gx_all = np.concatenate((gx_all, g))
|
|
|
+ elif grad.channel == "y":
|
|
|
+ gy_t_all = np.concatenate((gy_t_all, g_t))
|
|
|
+ gy_all = np.concatenate((gy_all, g))
|
|
|
+ elif grad.channel == "z":
|
|
|
+ gz_t_all = np.concatenate((gz_t_all, g_t))
|
|
|
+ gz_all = np.concatenate((gz_all, g))
|
|
|
+
|
|
|
+ t0 += self.block_durations[
|
|
|
+ block_counter
|
|
|
+ ] # "Current time" gets updated to end of block just examined
|
|
|
+
|
|
|
+ all_waveforms = {
|
|
|
+ "t_adc": adc_t_all,
|
|
|
+ "t_rf": rf_t_all,
|
|
|
+ "t_rf_centers": rf_t_centers,
|
|
|
+ "t_gx": gx_t_all,
|
|
|
+ "t_gy": gy_t_all,
|
|
|
+ "t_gz": gz_t_all,
|
|
|
+ "adc": adc_signal_all,
|
|
|
+ "rf": rf_signal_all,
|
|
|
+ "rf_centers": rf_signal_centers,
|
|
|
+ "gx": gx_all,
|
|
|
+ "gy": gy_all,
|
|
|
+ "gz": gz_all,
|
|
|
+ "grad_unit": "[kHz/m]",
|
|
|
+ "rf_unit": "[Hz]",
|
|
|
+ "time_unit": "[seconds]",
|
|
|
+ }
|
|
|
+
|
|
|
+ return all_waveforms
|
|
|
+
|
|
|
    def write(self, name: str, create_signature: bool = True, remove_duplicates: bool = True) -> Union[str, None]:
        """
        Write the sequence data to the given filename using the open file format for MR sequences.

        See also `pypulseq.Sequence.read_seq.read()`.

        Parameters
        ----------
        name : str
            Filename of `.seq` file to be written to disk.
        create_signature : bool, default=True
            Boolean flag to indicate if the file has to be signed.
        remove_duplicates : bool, default=True
            Remove duplicate events from the sequence before writing

        Returns
        -------
        signature or None : If create_signature is True, it returns the written .seq file's signature as a string,
                            otherwise it returns None. Note that, if remove_duplicates is True, signature belongs to the
                            deduplicated sequences signature, and not the Sequence that is stored in the Sequence object.
        """
        # NOTE(review): `remove_duplicates` is accepted and documented above but
        # is never forwarded to `write_seq` — confirm whether `write_seq`
        # accepts it and should receive it here, or whether the parameter and
        # its docstring paragraph are vestigial.
        signature = write_seq(self, name, create_signature)

        # A signature is only produced when create_signature is True; record it
        # on the object so later comparisons/reads can verify file integrity.
        if signature is not None:
            self.signature_type = "md5"
            self.signature_file = "text"
            self.signature_value = signature
            return signature
        else:
            return None
|