import itertools import math from collections import OrderedDict from copy import copy from itertools import chain from types import SimpleNamespace from typing import Tuple, List, Iterable from typing import Union from warnings import warn import matplotlib as mpl import numpy as np from matplotlib import pyplot as plt from pypulseq import eps from pypulseq.Sequence import block, parula from pypulseq.Sequence.ext_test_report import ext_test_report from pypulseq.Sequence.read_seq import read from pypulseq.Sequence.write_seq import write as write_seq from pypulseq.calc_rf_center import calc_rf_center from pypulseq.check_timing import check_timing as ext_check_timing from pypulseq.decompress_shape import decompress_shape from pypulseq.event_lib import EventLibrary from pypulseq.opts import Opts from pypulseq.supported_labels_rf_use import get_supported_labels from version import major, minor, revision class Sequence: """ Generate sequences and read/write sequence files. This class defines properties and methods to define a complete MR sequence including RF pulses, gradients, ADC events, etc. The class provides an implementation of the open MR sequence format defined by the Pulseq project. See http://pulseq.github.io/. See also `demo_read.py`, `demo_write.py`. """ read_seq = None version_major = major version_minor = minor version_revision = revision def __init__(self, system=Opts()): # ========= # EVENT LIBRARIES # ========= self.adc_library = EventLibrary() # Library of ADC events self.delay_library = EventLibrary() # Library of delay events # Library of extension events. Extension events form single-linked zero-terminated lists self.extensions_library = EventLibrary() self.grad_library = EventLibrary() # Library of gradient events self.label_inc_library = ( EventLibrary() ) # Library of Label(inc) events (reference from the extensions library) self.label_set_library = ( EventLibrary() ) # Library of Label(set) events (reference from the extensions library) self.rf_library = EventLibrary() # Library of RF events self.shape_library = EventLibrary() # Library of compressed shapes self.trigger_library = EventLibrary() # Library of trigger events # ========= # OTHER # ========= self.system = system self.block_events = OrderedDict() # Event table self.definitions = dict() # Optional sequence definitions self.rf_raster_time = ( self.system.rf_raster_time ) # RF raster time (system dependent) self.grad_raster_time = ( self.system.grad_raster_time ) # Gradient raster time (system dependent) self.adc_raster_time = ( self.system.adc_raster_time ) # ADC raster time (system dependent) self.block_duration_raster = self.system.block_duration_raster self.set_definition("AdcRasterTime", self.adc_raster_time) self.set_definition("BlockDurationRaster", self.block_duration_raster) self.set_definition("GradientRasterTime", self.grad_raster_time) self.set_definition("RadiofrequencyRasterTime", self.rf_raster_time) self.signature_type = "" self.signature_file = "" self.signature_value = "" self.block_durations = [] # Cache of block durations self.extension_numeric_idx = [] # numeric IDs of the used extensions self.extension_string_idx = [] # string IDs of the used extensions def __str__(self) -> str: s = "Sequence:" s += "\nshape_library: " + str(self.shape_library) s += "\nrf_library: " + str(self.rf_library) s += "\ngrad_library: " + str(self.grad_library) s += "\nadc_library: " + str(self.adc_library) s += "\ndelay_library: " + str(self.delay_library) s += "\nextensions_library: " + str( self.extensions_library ) # inserted for trigger support by mveldmann s += "\nrf_raster_time: " + str(self.rf_raster_time) s += "\ngrad_raster_time: " + str(self.grad_raster_time) s += "\nblock_events: " + str(len(self.block_events)) return s def add_block(self, *args: SimpleNamespace) -> None: """ Add a new block/multiple events to the sequence. Adds a sequence block with provided as a block structure See also: - `pypulseq.Sequence.sequence.Sequence.set_block()` - `pypulseq.make_adc.make_adc()` - `pypulseq.make_trapezoid.make_trapezoid()` - `pypulseq.make_sinc_pulse.make_sinc_pulse()` Parameters ---------- args : SimpleNamespace Block structure or events to be added as a block to `Sequence`. """ block.set_block(self, len(self.block_events) + 1, *args) def calculate_kspace( self, trajectory_delay: int = 0 ) -> Tuple[np.array, np.array, np.array, np.array, np.array]: """ Calculates the k-space trajectory of the entire pulse sequence. Parameters ---------- trajectory_delay : int, default=0 Compensation factor in seconds (s) to align ADC and gradients in the reconstruction. Returns ------- k_traj_adc : numpy.array K-space trajectory sampled at `t_adc` timepoints. k_traj : numpy.array K-space trajectory of the entire pulse sequence. t_excitation : numpy.array Excitation timepoints. t_refocusing : numpy.array Refocusing timepoints. t_adc : numpy.array Sampling timepoints. """ if np.any(np.abs(trajectory_delay) > 100e-6): raise Warning( f"Trajectory delay of {trajectory_delay * 1e6} us is suspiciously high" ) # Initialise the counters and accumulator objects count_excitation = 0 count_refocusing = 0 count_adc_samples = 0 # Loop through the blocks to prepare preallocations for block_counter in range(len(self.block_events)): block = self.get_block(block_counter + 1) if block.rf is not None: if ( not hasattr(block.rf, "use") or block.rf.use == "excitation" or block.rf.use == "undefined" ): count_excitation += 1 elif block.rf.use == "refocusing": count_refocusing += 1 if block.adc is not None: count_adc_samples += int(block.adc.num_samples) t_excitation = np.zeros(count_excitation) t_refocusing = np.zeros(count_refocusing) k_time = np.zeros(count_adc_samples) current_duration = 0 count_excitation = 0 count_refocusing = 0 kc_outer = 0 traj_recon_delay = trajectory_delay # Go through the blocks and collect RF and ADC timing data for block_counter in range(len(self.block_events)): block = self.get_block(block_counter + 1) if block.rf is not None: rf = block.rf rf_center, _ = calc_rf_center(rf) t = rf.delay + rf_center if ( not hasattr(block.rf, "use") or block.rf.use == "excitation" or block.rf.use == "undefined" ): t_excitation[count_excitation] = current_duration + t count_excitation += 1 elif block.rf.use == "refocusing": t_refocusing[count_refocusing] = current_duration + t count_refocusing += 1 if block.adc is not None: _k_time = np.arange(block.adc.num_samples) + 0.5 _k_time = ( _k_time * block.adc.dwell + block.adc.delay + current_duration + traj_recon_delay ) k_time[kc_outer : kc_outer + block.adc.num_samples] = _k_time kc_outer += block.adc.num_samples current_duration += self.block_durations[block_counter] # Now calculate the actual k-space trajectory based on the gradient waveforms gw = self.gradient_waveforms() i_excitation = np.round(t_excitation / self.grad_raster_time) i_refocusing = np.round(t_refocusing / self.grad_raster_time) i_periods = np.sort( [1, *(i_excitation + 1), *(i_refocusing + 1), gw.shape[1] + 1] ).astype(np.int32) # i_periods -= 1 # Python is 0-indexed ii_next_excitation = np.min((len(i_excitation), 1)) ii_next_refocusing = np.min((len(i_refocusing), 1)) k_traj = np.zeros_like(gw) k = np.zeros((3, 1)) for i in range(len(i_periods) - 1): i_period_end = i_periods[i + 1] - 1 k_period = np.concatenate( (k, gw[:, i_periods[i] - 1 : i_period_end] * self.grad_raster_time), axis=1, ) k_period = np.cumsum(k_period, axis=1) k_traj[:, i_periods[i] - 1 : i_period_end] = k_period[:, 1:] k = k_period[:, -1] if ( ii_next_excitation > 0 and i_excitation[ii_next_excitation - 1] == i_period_end ): k[:] = 0 k_traj[:, i_period_end - 1] = np.nan ii_next_excitation = min(len(i_excitation), ii_next_excitation + 1) if ( ii_next_refocusing > 0 and i_refocusing[ii_next_refocusing - 1] == i_period_end ): k = -k ii_next_refocusing = min(len(i_refocusing), ii_next_refocusing + 1) k = k.reshape((-1, 1)) # To be compatible with np.concatenate k_traj_adc = [] for _k_traj_row in k_traj: result = np.interp( xp=np.array(range(1, k_traj.shape[1] + 1)) * self.grad_raster_time, fp=_k_traj_row, x=k_time, ) k_traj_adc.append(result) k_traj_adc = np.stack(k_traj_adc) t_adc = k_time return k_traj_adc, k_traj, t_excitation, t_refocusing, t_adc def calculate_kspacePP( self, trajectory_delay: Union[int, float, np.ndarray] = 0, gradient_offset: int = 0, ) -> Tuple[np.array, np.array, np.array, np.array, np.array]: """ Calculates the k-space trajectory of the entire pulse sequence. Parameters ---------- trajectory_delay : int, default=0 Compensation factor in seconds (s) to align ADC and gradients in the reconstruction. Returns ------- k_traj_adc : numpy.array K-space trajectory sampled at `t_adc` timepoints. k_traj : numpy.array K-space trajectory of the entire pulse sequence. t_excitation : numpy.array Excitation timepoints. t_refocusing : numpy.array Refocusing timepoints. t_adc : numpy.array Sampling timepoints. """ if np.any(np.abs(trajectory_delay) > 100e-6): raise Warning( f"Trajectory delay of {trajectory_delay * 1e6} us is suspiciously high" ) total_duration = np.sum(self.block_durations) gw_data, tfp_excitation, tfp_refocusing, t_adc, _ = self.waveforms_and_times() ng = len(gw_data) # Gradient delay handling if isinstance(trajectory_delay, (int, float)): gradient_delays = [trajectory_delay] * ng else: assert ( len(trajectory_delay) == ng ) # Need to have same number of gradient channels gradient_delays = [trajectory_delay] * ng # Gradient offset handling if isinstance(gradient_offset, (int, float)): gradient_offset = [gradient_offset] * ng else: assert ( len(gradient_offset) == ng ) # Need to have same number of gradient channels # Convert data to piecewise polynomials gw_pp = [] gw_pp_MATLAB = [] for j in range(ng): wave_cnt = gw_data[j].shape[1] if wave_cnt == 0: if np.abs(gradient_offset[j]) <= eps: continue else: gw = np.array(([0, total_duration], [0, 0])) else: gw = gw_data[j] # Now gw contains the waveform from the current axis if np.abs(gradient_delays[j]) > eps: gw[1] = gw[1] - gradient_delays[j] # Anisotropic gradient delay support if not np.all(np.isfinite(gw)): raise Warning("Not all elements of the generated waveform are finite.") teps = 1e-12 if gw[0, 0] > 0 and gw[1, -1] < total_duration: # teps terms to avoid integration errors over extended periods of time _temp1 = np.array(([-teps, gw[0, 0] - teps], [0, 0])) _temp2 = np.array(([gw[0, -1] + teps, total_duration + teps], [0, 0])) gw = np.hstack((_temp1, gw, _temp2)) elif gw[0, 0] > 0: _temp = np.array(([-teps, gw[0, 0] - teps], [0, 0])) gw = np.hstack((_temp, gw)) elif gw[0, -1] < total_duration: _temp = np.array(([gw[0, -1] + teps, total_duration + teps], [0, 0])) gw = np.hstack((gw, _temp)) if np.abs(gradient_offset[j]) > eps: gw[1:] += gradient_offset[j] gw[1][gw[1] == -0.0] = 0.0 # Specify window to be same as domain prevent numpy from remapping to [-1, 1] polyfit = [ np.polynomial.Polynomial.fit( gw[0, i : (i + 2)], gw[1, i : (i + 2)], deg=1, window=gw[0, i : (i + 2)], ) for i in range(len(gw[0]) - 1) ] polyfit = np.stack(polyfit) gw_pp.append(polyfit) ### """ Fix coefs for consistency with MATLAB: 1. MATLAB returns coefficients in descending order whereas Numpy returns coefficients in ascending order. 2. MATLAB returns local coefficients that will NOT match Numpy's outputs. Refer to the equation under the "output arguments" section of `mkpp` MATLAB docs to convert and match coefficient outputs. 3. Finally, MATLAB seems to not store any -1 < x < 1 coefficients, so we zero them out. """ polyfit_MATLAB = [] for i in range(len(polyfit)): polyfit_MATLAB_i = copy(polyfit[i]) lower = polyfit_MATLAB_i.domain[0] co = polyfit_MATLAB_i.coef co = co[::-1] # Reverse a = co[0] b = co[1] + (a * lower) if -1 < a < 1: # to investigate a = 0 if -1 < b < 1: b = 0 # co = [b, a] # Un-reverse for Numpy co = [a, b] polyfit_MATLAB_i.coef = co polyfit_MATLAB.append(polyfit_MATLAB_i) gw_pp_MATLAB.append(polyfit_MATLAB) ### # Calculate slice positions. For now we entirely rely on the excitation -- ignoring complicated interleaved # refocused sequences if len(tfp_excitation) > 0: # Position in x, y, z slice_pos = np.zeros((len(gw_data), tfp_excitation.shape[1])) for j in range(len(gw_data)): if gw_pp[j] is None: slice_pos[j] = np.empty_like((1, slice_pos.shape[1])) else: slice_pos[j] = np.divide( tfp_excitation[1], self.ppval_numpy(gw_pp[j], tfp_excitation[0]) ) slice_pos[~np.isfinite(slice_pos)] = 0 # Reset undefined to 0 t_slice_pos = tfp_excitation[0] else: slice_pos = [] t_slice_pos = [] # FNINT def fnint(arr_poly): pieces = len(arr_poly) breaks = np.stack([pp.domain for pp in arr_poly]) breaks = np.append(breaks[:, 0], breaks[-1, 1]) coefs = np.stack([pp.coef for pp in arr_poly]) order = len(arr_poly[1].coef) dim = 1 coefs = coefs / np.tile(range(order, 0, -1), (dim * pieces, 1)) xs = np.diff(breaks[:-1]) index = np.arange(pieces - 1) vv = xs * coefs[index, 0] for i in range(1, order): vv = xs * (vv + coefs[index, i]) last = np.cumsum(np.insert(vv, 0, 0)).reshape((-1, 1)) coefs = np.hstack((coefs[:, :order], last)) arr_poly_integ = [] for i in range(pieces): arr_poly_integ.append( np.polynomial.Polynomial( coefs[i], domain=[breaks[i], breaks[i + 1]], window=[breaks[i], breaks[i + 1]], ) ) return arr_poly_integ, coefs, breaks # ========= # Integrate waveforms as PPs to produce gradient moments gm_pp = [] tc = [] for i in range(ng): if gw_pp[i] is None: continue res_fnint, res_coefs, res_breaks = fnint(gw_pp_MATLAB[i]) gm_pp.append(res_fnint) tc.append(res_breaks) # "Sample" ramps for display purposes otherwise piecewise-linear display (plot) fails ii = np.nonzero(np.abs(res_coefs[:, 0]) > eps)[0] if len(ii) > 0: tca = [] for j in range(len(ii)): res = ( np.arange( np.floor(float(res_breaks[ii[j]] / self.grad_raster_time)), np.ceil( (res_breaks[ii[j] + 1] / self.grad_raster_time) + 1 ), ) * self.grad_raster_time ) tca.extend(res) tc.append(tca) tc = np.array(list(chain(*tc))) if len(tfp_excitation) == 0: t_excitation = np.array([]) else: t_excitation = tfp_excitation[0] if len(tfp_refocusing) == 0: t_refocusing = np.array([]) else: t_refocusing = tfp_refocusing[0] t_acc = 1e-10 # Temporal accuracy t_acc_inv = 1 / t_acc # tc = self.__flatten_jagged_arr(tc) t_ktraj = t_acc * np.unique( np.round( t_acc_inv * np.array( [ *tc, 0, *np.array(t_excitation) * 2 - self.rf_raster_time, *np.array(t_excitation) - self.rf_raster_time, *t_excitation, *np.array(t_refocusing) - self.rf_raster_time, *t_refocusing, *t_adc, total_duration, ] ) ) ) i_excitation = np.isin(t_ktraj, t_acc * np.round(t_acc_inv * t_excitation)) i_excitation = np.nonzero(i_excitation)[0] # Convert boolean array into indices i_refocusing = np.isin(t_ktraj, t_acc * np.round(t_acc_inv * t_refocusing)) i_refocusing = np.nonzero(i_refocusing)[0] # Convert boolean array into indices i_adc = np.isin(t_ktraj, t_acc * np.round(t_acc_inv * t_adc)) i_adc = np.nonzero(i_adc)[0] # Convert boolean array into indices i_periods = np.unique([1, *i_excitation, *i_refocusing, len(t_ktraj) - 1]) if len(i_excitation) > 0: ii_next_excitation = 1 else: ii_next_excitation = 0 if len(i_refocusing) > 0: ii_next_refocusing = 1 else: ii_next_refocusing = 0 k_traj = np.zeros((3, len(t_ktraj))) for i in range(ng): if gw_pp_MATLAB[i] is None: continue it = np.logical_and( t_ktraj >= t_acc * np.round(t_acc_inv * res_breaks[0]), t_ktraj <= t_acc * np.round(t_acc_inv * res_breaks[-1]), ) k_traj[i, it] = self.ppval_MATLAB(gm_pp[i], t_ktraj[it]) if t_ktraj[it[-1]] < t_ktraj[-1]: k_traj[i, it[-1] + 1 :] = k_traj[i, it[-1]] # Convert gradient moments to kspace dk = -k_traj[:, 0] for i in range(len(i_periods) - 1): i_period = i_periods[i] i_period_end = i_periods[i + 1] if ii_next_excitation > 0 and i_excitation[ii_next_excitation] == i_period: if np.abs(t_ktraj[i_period] - t_excitation[ii_next_excitation]) > t_acc: raise Warning( f"np.abs(t_ktraj[i_period]-t_excitation[ii_next_excitation]) < {t_acc} failed for ii_next_excitation={ii_next_excitation} error={t_ktraj(i_period) - t_excitation(ii_next_excitation)}" ) dk = -k_traj[:, i_period] if i_period > 1: # Use nans to mark the excitation points since they interrupt the plots k_traj[:, i_period - 1] = np.NaN # -1 on len(i_excitation) for 0-based indexing ii_next_excitation = np.minimum( len(i_excitation) - 1, ii_next_excitation + 1 ) elif ( ii_next_refocusing > 0 and i_refocusing[ii_next_refocusing] == i_period ): dk = -k_traj[:, i_period] dk = -2 * k_traj[:, i_period] - dk # -1 on len(i_excitation) for 0-based indexing ii_next_refocusing = np.minimum( len(i_refocusing) - 1, ii_next_refocusing + 1 ) k_traj[:, i_period:i_period_end] = ( k_traj[:, i_period:i_period_end] + dk[:, None] ) k_traj[:, i_period_end] = k_traj[:, i_period_end] + dk k_traj_adc = k_traj[:, i_adc] return k_traj_adc, k_traj, t_excitation, t_refocusing, t_adc def check_timing(self) -> Tuple[bool, List[str]]: """ Checks timing of all blocks and objects in the sequence optionally returns the detailed error log. This function also modifies the sequence object by adding the field "TotalDuration" to sequence definitions. Returns ------- is_ok : bool Boolean flag indicating timing errors. error_report : str Error report in case of timing errors. """ error_report = [] is_ok = True num_blocks = len(self.block_events) total_duration = 0 for block_counter in range(num_blocks): block = self.get_block(block_counter + 1) events = [e for e in vars(block).values() if e is not None] res, rep, duration = ext_check_timing(self.system, *events) is_ok = is_ok and res # Check the stored total block duration if np.abs(duration - self.block_durations[block_counter]) > eps: rep += "Inconsistency between the stored block duration and the duration of the block content" is_ok = False duration = self.block_durations[block_counter] # Check RF dead times if block.rf is not None: if block.rf.delay - block.rf.dead_time < -eps: rep += ( f"Delay of {block.rf.delay * 1e6} us is smaller than the RF dead time " f"{block.rf.dead_time * 1e6} us" ) is_ok = False if ( block.rf.delay + block.rf.t[-1] + block.rf.ringdown_time - duration > eps ): rep += ( f"Time between the end of the RF pulse at {block.rf.delay + block.rf.t[-1]} and the end " f"of the block at {duration * 1e6} us is shorter than rf_ringdown_time" ) is_ok = False # Check ADC dead times if block.adc is not None: if block.adc.delay - self.system.adc_dead_time < -eps: rep += "adc.delay < system.adc_dead_time" is_ok = False if ( block.adc.delay + block.adc.num_samples * block.adc.dwell + self.system.adc_dead_time - duration > eps ): rep += "adc: system.adc_dead_time (post-ADC) violation" is_ok = False # Update report if len(rep) != 0: error_report.append(f"Event: {block_counter} - {rep}\n") total_duration += duration # Check if all the gradients in the last block are ramped down properly if len(events) != 0 and all([isinstance(e, SimpleNamespace) for e in events]): for e in range(len(events)): if not isinstance(events[e], list) and events[e].type == "grad": if events[e].last != 0: error_report.append( f"Event {num_blocks - 1} gradients do not ramp to 0 at the end of the sequence" ) self.set_definition("TotalDuration", total_duration) return is_ok, error_report def duration(self) -> Tuple[int, int, np.ndarray]: """ Returns the total duration of this sequence, and the total count of blocks and events. Returns ------- duration : int Duration of this sequence in seconds (s). num_blocks : int Number of blocks in this sequence. event_count : np.ndarray Number of events in this sequence. """ num_blocks = len(self.block_events) event_count = np.zeros(len(self.block_events[1])) duration = 0 for block_counter in range(num_blocks): event_count += self.block_events[block_counter + 1] > 0 duration += self.block_durations[block_counter] return duration, num_blocks, event_count def __flatten_jagged_arr(self, arr: np.array) -> np.array: # Sanity check: we don't need to do anything if we have a flat array def __flat_check(arr: np.array) -> bool: return all([not isinstance(x, Iterable) for x in arr]) if __flat_check(arr): return arr # Flatten the array simply -- 1 level deep arr_flattened = list(itertools.chain(*arr)) # Not flat yet? if __flat_check(arr_flattened): return arr_flattened # Flatten the array -- 2 levels deep any_ragged = [isinstance(x, Iterable) for x in arr_flattened] if np.any(any_ragged): idx_ragged = np.array(np.where(any_ragged)[0]) for i in range(len(idx_ragged)): ii = idx_ragged[i] # If we are not at the end of the list, we need to update the indices of the remaining elements # Because once we expand and insert this list element, the indices of the remaining elements # will be shifted by len(this element) if i != len(idx_ragged) - 1: idx_ragged[i + 1 :] += len(arr_flattened[ii]) arr_flattened = np.insert(arr_flattened, ii, arr_flattened[ii]) return arr_flattened def flip_grad_axis(self, axis: str) -> None: """ Invert all gradients along the corresponding axis/channel. The function acts on all gradient objects already added to the sequence object. Parameters ---------- axis : str Gradients to invert or scale. Must be one of 'x', 'y' or 'z'. """ self.mod_grad_axis(axis, modifier=-1) def get_block(self, block_index: int) -> SimpleNamespace: """ Return a block of the sequence specified by the index. The block is created from the sequence data with all events and shapes decompressed. See also: - `pypulseq.Sequence.sequence.Sequence.set_block()`. - `pypulseq.Sequence.sequence.Sequence.add_block()`. Parameters ---------- block_index : int Index of block to be retrieved from `Sequence`. Returns ------- SimpleNamespace Event identified by `block_index`. """ return block.get_block(self, block_index) def get_definition(self, key: str) -> str: """ Return value of the definition specified by the key. These definitions can be added manually or read from the header of a sequence file defined in the sequence header. An empty array is returned if the key is not defined. See also `pypulseq.Sequence.sequence.Sequence.set_definition()`. Parameters ---------- key : str Key of definition to retrieve. Returns ------- str Definition identified by `key` if found, else returns ''. """ if key in self.definitions: return self.definitions[key] else: return "" def get_extension_type_ID(self, extension_string: str) -> int: """ Get numeric extension ID for `extension_string`. Will automatically create a new ID if unknown. Parameters ---------- extension_string : str Given string extension ID. Returns ------- extension_id : int Numeric ID for given string extension ID. """ if extension_string not in self.extension_string_idx: if len(self.extension_numeric_idx) == 0: extension_id = 1 else: extension_id = 1 + max(self.extension_numeric_idx) self.extension_numeric_idx.append(extension_id) self.extension_string_idx.append(extension_string) assert len(self.extension_numeric_idx) == len(self.extension_string_idx) else: num = self.extension_string_idx.index(extension_string) extension_id = self.extension_numeric_idx[num] return extension_id def get_extension_type_string(self, extension_id: int) -> str: """ Get string extension ID for `extension_id`. Parameters ---------- extension_id : int Given numeric extension ID. Returns ------- extension_str : str String ID for the given numeric extension ID. Raises ------ ValueError If given numeric extension ID is unknown. """ if extension_id in self.extension_numeric_idx: num = self.extension_numeric_idx.index(extension_id) else: raise ValueError( f"Extension for the given ID - {extension_id} - is unknown." ) extension_str = self.extension_string_idx[num] return extension_str def mod_grad_axis(self, axis: str, modifier: int) -> None: """ Invert or scale all gradients along the corresponding axis/channel. The function acts on all gradient objects already added to the sequence object. Parameters ---------- axis : str Gradients to invert or scale. Must be one of 'x', 'y' or 'z'. modifier : int Scaling value. Raises ------ ValueError If invalid `axis` is passed. Must be one of 'x', 'y','z'. RuntimeError If same gradient event is used on multiple axes. """ if axis not in ["x", "y", "z"]: raise ValueError( f"Invalid axis. Must be one of 'x', 'y','z'. Passed: {axis}" ) channel_num = ["x", "y", "z"].index(axis) other_channels = [0, 1, 2] other_channels.remove(channel_num) # Go through all event table entries and list gradient objects in the library all_grad_events = np.array(list(self.block_events.values())) all_grad_events = all_grad_events[:, 2:5] selected_events = np.unique(all_grad_events[:, channel_num]) selected_events = selected_events[selected_events != 0] other_events = np.unique(all_grad_events[:, other_channels]) if len(np.intersect1d(selected_events, other_events)) > 0: raise RuntimeError( "mod_grad_axis does not yet support the same gradient event used on multiple axes." ) for i in range(len(selected_events)): self.grad_library.data[selected_events[i]][0] *= modifier if ( self.grad_library.type[selected_events[i]] == "g" and self.grad_library.lengths[selected_events[i]] == 5 ): # Need to update first and last fields self.grad_library.data[selected_events[i]][3] *= modifier self.grad_library.data[selected_events[i]][4] *= modifier def plot( self, label: str = str(), show_blocks: bool = False, save: bool = False, time_range=(0, np.inf), time_disp: str = "s", grad_disp: str = "kHz/m", plot_now: bool = True ) -> None: """ Plot `Sequence`. Parameters ---------- label : str, defualt=str() Plot label values for ADC events: in this example for LIN and REP labels; other valid labes are accepted as a comma-separated list. save : bool, default=False Boolean flag indicating if plots should be saved. The two figures will be saved as JPG with numerical suffixes to the filename 'seq_plot'. show_blocks : bool, default=False Boolean flag to indicate if grid and tick labels at the block boundaries are to be plotted. time_range : iterable, default=(0, np.inf) Time range (x-axis limits) for plotting the sequence. Default is 0 to infinity (entire sequence). time_disp : str, default='s' Time display type, must be one of `s`, `ms` or `us`. grad_disp : str, default='s' Gradient display unit, must be one of `kHz/m` or `mT/m`. plot_now : bool, default=True If true, function immediately shows the plots, blocking the rest of the code until plots are exited. If false, plots are shown when plt.show() is called. Useful if plots are to be modified. plot_type : str, default='Gradient' Gradients display type, must be one of either 'Gradient' or 'Kspace'. """ mpl.rcParams["lines.linewidth"] = 0.75 # Set default Matplotlib linewidth valid_time_units = ["s", "ms", "us"] valid_grad_units = ["kHz/m", "mT/m"] valid_labels = get_supported_labels() if ( not all([isinstance(x, (int, float)) for x in time_range]) or len(time_range) != 2 ): raise ValueError("Invalid time range") if time_disp not in valid_time_units: raise ValueError("Unsupported time unit") if grad_disp not in valid_grad_units: raise ValueError( "Unsupported gradient unit. Supported gradient units are: " + str(valid_grad_units) ) fig1, fig2 = plt.figure(1), plt.figure(2) sp11 = fig1.add_subplot(311) sp12 = fig1.add_subplot(312, sharex=sp11) sp13 = fig1.add_subplot(313, sharex=sp11) fig2_subplots = [ fig2.add_subplot(311, sharex=sp11), fig2.add_subplot(312, sharex=sp11), fig2.add_subplot(313, sharex=sp11), ] t_factor_list = [1, 1e3, 1e6] t_factor = t_factor_list[valid_time_units.index(time_disp)] g_factor_list = [1e-3, 1e3 / self.system.gamma] g_factor = g_factor_list[valid_grad_units.index(grad_disp)] t0 = 0 label_defined = False label_idx_to_plot = [] label_legend_to_plot = [] label_store = dict() for i in range(len(valid_labels)): label_store[valid_labels[i]] = 0 if valid_labels[i] in label.upper(): label_idx_to_plot.append(i) label_legend_to_plot.append(valid_labels[i]) if len(label_idx_to_plot) != 0: p = parula.main(len(label_idx_to_plot) + 1) label_colors_to_plot = p(np.arange(len(label_idx_to_plot))) label_colors_to_plot = np.roll(label_colors_to_plot, -1, axis=0).tolist() # Block timings block_edges = np.cumsum([0, *self.block_durations]) block_edges_in_range = block_edges[ (block_edges >= time_range[0]) * (block_edges <= time_range[1]) ] if show_blocks: for sp in [sp11, sp12, sp13, *fig2_subplots]: sp.set_xticks(t_factor * block_edges_in_range) sp.set_xticklabels(rotation=90) for block_counter in range(len(self.block_events)): block = self.get_block(block_counter + 1) is_valid = time_range[0] <= t0 <= time_range[1] if is_valid: if getattr(block, "label", None) is not None: for i in range(len(block.label)): if block.label[i].type == "labelinc": label_store[block.label[i].label] += block.label[i].value else: label_store[block.label[i].label] = block.label[i].value label_defined = True if getattr(block, "adc", None) is not None: # ADC adc = block.adc # From Pulseq: According to the information from Klaus Scheffler and indirectly from Siemens this # is the present convention - the samples are shifted by 0.5 dwell t = adc.delay + (np.arange(int(adc.num_samples)) + 0.5) * adc.dwell sp11.plot(t_factor * (t0 + t), np.zeros(len(t)), "rx") sp13.plot( t_factor * (t0 + t), np.angle( np.exp(1j * adc.phase_offset) * np.exp(1j * 2 * np.pi * t * adc.freq_offset) ), "b.", markersize=0.25, ) if label_defined and len(label_idx_to_plot) != 0: cycler = mpl.cycler(color=label_colors_to_plot) sp11.set_prop_cycle(cycler) label_colors_to_plot = np.roll( label_colors_to_plot, -1, axis=0 ).tolist() arr_label_store = list(label_store.values()) lbl_vals = np.take(arr_label_store, label_idx_to_plot) t = t0 + adc.delay + (adc.num_samples - 1) / 2 * adc.dwell _t = [t_factor * t] * len(lbl_vals) # Plot each label individually to retrieve each corresponding Line2D object p = itertools.chain.from_iterable( [ sp11.plot(__t, _lbl_vals, ".") for __t, _lbl_vals in zip(_t, lbl_vals) ] ) if len(label_legend_to_plot) != 0: sp11.legend(p, label_legend_to_plot, loc="upper left") label_legend_to_plot = [] if getattr(block, "rf", None) is not None: # RF rf = block.rf tc, ic = calc_rf_center(rf) time = rf.t signal = rf.signal if np.abs(signal[0]) != 0: signal = np.insert(signal, obj=0, values=0) time = np.insert(time, obj=0, values=time[0]) ic += 1 if np.abs(signal[-1]) != 0: signal = np.append(signal, 0) time = np.append(time, time[-1]) sp12.plot(t_factor * (t0 + time + rf.delay), np.abs(signal)) sp13.plot( t_factor * (t0 + time + rf.delay), np.angle( signal * np.exp(1j * rf.phase_offset) * np.exp(1j * 2 * math.pi * time * rf.freq_offset) ), t_factor * (t0 + tc + rf.delay), np.angle( signal[ic] * np.exp(1j * rf.phase_offset) * np.exp(1j * 2 * math.pi * time[ic] * rf.freq_offset) ), "xb", ) grad_channels = ["gx", "gy", "gz"] for x in range(len(grad_channels)): # Gradients if getattr(block, grad_channels[x], None) is not None: grad = getattr(block, grad_channels[x]) if grad.type == "grad": # We extend the shape by adding the first and the last points in an effort of making the # display a bit less confusing... time = grad.delay + [0, *grad.tt, grad.shape_dur] waveform = g_factor * np.array( (grad.first, *grad.waveform, grad.last) ) else: time = np.cumsum( [ 0, grad.delay, grad.rise_time, grad.flat_time, grad.fall_time, ] ) waveform = ( g_factor * grad.amplitude * np.array([0, 0, 1, 1, 0]) ) fig2_subplots[x].plot(t_factor * (t0 + time), waveform) t0 += self.block_durations[block_counter] grad_plot_labels = ["x", "y", "z"] sp11.set_ylabel("ADC") sp12.set_ylabel("RF mag (Hz)") sp13.set_ylabel("RF/ADC phase (rad)") sp13.set_xlabel(f"t ({time_disp})") for x in range(3): _label = grad_plot_labels[x] fig2_subplots[x].set_ylabel(f"G{_label} ({grad_disp})") fig2_subplots[-1].set_xlabel(f"t ({time_disp})") # Setting display limits disp_range = t_factor * np.array([time_range[0], min(t0, time_range[1])]) [x.set_xlim(disp_range) for x in [sp11, sp12, sp13, *fig2_subplots]] # Grid on for sp in [sp11, sp12, sp13, *fig2_subplots]: sp.grid() fig1.tight_layout() fig2.tight_layout() if save: fig1.savefig("seq_plot1.jpg") fig2.savefig("seq_plot2.jpg") if plot_now: plt.show() def ppval_numpy(self, arr_np_poly: np.ndarray, xq: np.ndarray): """ Perform piece-wise polynomial evaluation on Numpy's polyfit objects. Parameters ========== arr_np_poly : Iterable[np.poly1d] xq : np.array Returns ======= """ result = [] result2 = [] breaks = np.array([p.domain for p in arr_np_poly]) breaks = np.array([*breaks[::2].flatten(), breaks[-1, -1]]) last_match = 0 for x in xq: # Evaluate polynomial only when we find x's corresponding pp window for i in range(last_match, len(arr_np_poly)): corresponding_pp = x > arr_np_poly[i].domain if corresponding_pp.tolist() == [True, False]: value = np.polynomial.polynomial.polyval(x, arr_np_poly[i].coef) result.append(value) last_match = i break corresponding_pp2 = np.nonzero((x < breaks))[0][0] value2 = np.polynomial.polynomial.polyval( x, arr_np_poly[corresponding_pp2].coef ) result2.append(value2) return result def ppval_MATLAB(self, arr_MATLAB_poly: np.ndarray, xq: np.ndarray) -> np.array: """ Perform piece-wise polynomial evaluation on MATLAB's polyfit objects. Returns ======= """ def __ppval(c, x, b, i, n): return c[n] * (x - b) ** i + __ppval(c, x, b, i + 1, n - 1) if n >= 0 else 0 coefs = np.array([p.coef for p in arr_MATLAB_poly]) breaks = np.array([p.domain for p in arr_MATLAB_poly]) breaks = np.array([*breaks[::2].flatten(), breaks[-1, -1]]) result = [] for x in xq: corresponding_pp = np.nonzero((x < breaks))[0][0] c = coefs[corresponding_pp - 1] b = breaks[corresponding_pp - 1] # c = coefs[2] # res = c[0] * (x - breaks[2]) ** 2 + c[1] * (x - breaks[2]) + c[2] res = __ppval(c, x, b, i=0, n=len(c) - 1) result.append(res) return np.array(result) def read(self, file_path: str, detect_rf_use: bool = False) -> None: """ Read `.seq` file from `file_path`. Parameters ---------- detect_rf_use file_path : str Path to `.seq` file to be read. """ read(self, path=file_path, detect_rf_use=detect_rf_use) def register_adc_event(self, event: EventLibrary) -> int: return block.register_adc_event(self, event) def register_grad_event( self, event: SimpleNamespace ) -> Union[int, Tuple[int, int]]: return block.register_grad_event(self, event) def register_label_event(self, event: SimpleNamespace) -> int: return block.register_label_event(self, event) def register_rf_event(self, event: SimpleNamespace) -> Tuple[int, List[int]]: return block.register_rf_event(self, event) def rf_from_lib_data(self, lib_data: list, use: str = str()) -> SimpleNamespace: """ Construct RF object from `lib_data`. Parameters ---------- lib_data : list RF envelope. use : str, default=str() RF event use. Returns ------- rf : SimpleNamespace RF object constructed from `lib_data`. """ rf = SimpleNamespace() rf.type = "rf" amplitude, mag_shape, phase_shape = lib_data[0], lib_data[1], lib_data[2] shape_data = self.shape_library.data[mag_shape] compressed = SimpleNamespace() compressed.num_samples = shape_data[0] compressed.data = shape_data[1:] mag = decompress_shape(compressed) shape_data = self.shape_library.data[phase_shape] compressed.num_samples = shape_data[0] compressed.data = shape_data[1:] phase = decompress_shape(compressed) rf.signal = amplitude * mag * np.exp(1j * 2 * np.pi * phase) time_shape = lib_data[3] if time_shape > 0: shape_data = self.shape_library.data[time_shape] compressed.num_samples = shape_data[0] compressed.data = shape_data[1:] rf.t = decompress_shape(compressed) * self.rf_raster_time rf.shape_dur = ( np.ceil((rf.t[-1] - eps) / self.rf_raster_time) * self.rf_raster_time ) else: # Generate default time raster on the fly rf.t = (np.arange(1, len(rf.signal) + 1) - 0.5) * self.rf_raster_time rf.shape_dur = len(rf.signal) * self.rf_raster_time rf.delay = lib_data[4] rf.freq_offset = lib_data[5] rf.phase_offset = lib_data[6] rf.dead_time = self.system.rf_dead_time rf.ringdown_time = self.system.rf_ringdown_time if use != "": use_cases = { "e": "excitation", "r": "refocusing", "i": "inversion", "s": "saturation", "p": "preparation", } rf.use = use_cases[use] if use in use_cases else "undefined" return rf def set_block(self, block_index: int, *args: SimpleNamespace) -> None: """ Replace block at index with new block provided as block structure, add sequence block, or create a new block from events and store at position specified by index. The block or events are provided in uncompressed form and will be stored in the compressed, non-redundant internal libraries. See also: - `pypulseq.Sequence.sequence.Sequence.get_block()` - `pypulseq.Sequence.sequence.Sequence.add_block()` Parameters ---------- block_index : int Index at which block is replaced. args : SimpleNamespace Block or events to be replaced/added or created at `block_index`. """ block.set_block(self, block_index, *args) def set_definition( self, key: str, value: Union[float, int, list, np.ndarray, str, tuple] ) -> None: """ Modify a custom definition of the sequence. Set the user definition 'key' to value 'value'. If the definition does not exist it will be created. See also `pypulseq.Sequence.sequence.Sequence.get_definition()`. Parameters ---------- key : str Definition key. value : int, list, np.ndarray, str or tuple Definition value. """ if key == "FOV": if np.max(value) > 1: text = "Definition FOV uses values exceeding 1 m. " text += "New Pulseq interpreters expect values in units of meters." warn(text) self.definitions[key] = value def set_extension_string_ID(self, extension_str: str, extension_id: int) -> None: """ Set numeric ID for the given string extension ID. Parameters ---------- extension_str : str Given string extension ID. extension_id : int Given numeric extension ID. Raises ------ ValueError If given numeric or string extension ID is not unique. """ if ( extension_str in self.extension_string_idx or extension_id in self.extension_numeric_idx ): raise ValueError("Numeric or string ID is not unique") self.extension_numeric_idx.append(extension_id) self.extension_string_idx.append(extension_str) assert len(self.extension_numeric_idx) == len(self.extension_string_idx) def test_report(self) -> str: """ Analyze the sequence and return a text report. """ return ext_test_report(self) def waveforms_and_times( self, append_RF: bool = False ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Decompress the entire gradient waveform. Returns gradient waveforms as a tuple of `np.ndarray` of `gradient_axes` (typically 3) dimensions. Each `np.ndarray` contains timepoints and the corresponding gradient amplitude values. Additional return values are time points of excitations, refocusings and ADC sampling points. Parameters ---------- append_RF : bool, default=False Boolean flag to indicate if RF wave shapes are to be appended after the gradients. Returns ------- wave_data : np.ndarray tfp_excitation : np.ndarray Contains time moments, frequency and phase offsets of the excitation RF pulses (similar for ` tfp_refocusing`). tfp_refocusing : np.ndarray t_adc: np.ndarray Contains times of all ADC sample points. fp_adc : np.ndarray Contains frequency and phase offsets of each ADC object (not samples). """ grad_channels = ["gx", "gy", "gz"] num_blocks = len(self.block_events) # Collect shape pieces if append_RF: shape_channels = len(grad_channels) + 1 # Last 'channel' is RF else: shape_channels = len(grad_channels) shape_pieces = np.empty((shape_channels, num_blocks), dtype="object") # Also collect RF and ADC timing data # t_excitation, t_refocusing, t_adc tfp_excitation = [] tfp_refocusing = [] t_adc = [] fp_adc = [] curr_dur = 0 out_len = np.zeros(shape_channels) # Last 'channel' is RF for block_counter in range(num_blocks): block = self.get_block(block_counter + 1) for j in range(len(grad_channels)): grad = getattr(block, grad_channels[j]) if grad is not None: # Gradients if grad.type == "grad": # Check if we have an extended trapezoid or an arbitrary gradient on a regular raster tt_rast = grad.tt / self.grad_raster_time + 0.5 if np.all( np.abs(tt_rast - np.arange(1, len(tt_rast) + 1)) < eps ): # Arbitrary gradient """ Arbitrary gradient: restore & recompress shape - if we had a trapezoid converted to shape we have to find the "corners" and we can eliminate internal samples on the straight segments but first we have to restore samples on the edges of the gradient raster intervals for that we need the first sample. """ max_abs = np.max(np.abs(grad.waveform)) odd_step1 = np.array([grad.first, *(2 * grad.waveform)]) odd_step2 = odd_step1 * ( np.mod(np.arange(1, len(odd_step1) + 1), 2) * 2 - 1 ) waveform_odd_rest = np.cumsum(odd_step2) * ( np.mod(np.arange(1, len(odd_step2)), 2) * 2 - 1 ) else: # Extended trapezoid out_len[j] += len(grad.tt) shape_pieces[j, block_counter] = np.array( [ curr_dur + grad.delay + grad.tt, grad.waveform, ] ) else: if np.abs(grad.flat_time) > eps: out_len[j] += 4 _temp = np.vstack( ( curr_dur + grad.delay + np.cumsum( [ 0, grad.rise_time, grad.flat_time, grad.fall_time, ] ), grad.amplitude * np.array([0, 1, 1, 0]), ) ) shape_pieces[j, block_counter] = _temp else: out_len[j] += 3 _temp = np.vstack( ( curr_dur + grad.delay + np.cumsum([0, grad.rise_time, grad.fall_time]), grad.amplitude * np.array([0, 1, 0]), ) ) shape_pieces[j, block_counter] = _temp if block.rf is not None: # RF rf = block.rf t = rf.delay + calc_rf_center(rf)[0] if not hasattr(rf, "use") or block.rf.use in [ "excitation", "undefined", ]: tfp_excitation.append( [curr_dur + t, block.rf.freq_offset, block.rf.phase_offset] ) elif block.rf.use == "refocusing": tfp_refocusing.append( [curr_dur + t, block.rf.freq_offset, block.rf.phase_offset] ) if append_RF: rf_piece = np.array( [ curr_dur + rf.delay + rf.t, rf.signal * np.exp( 1j * (rf.phase_offset + 2 * np.pi * rf.freq_offset * rf.t) ), ] ) out_len[-1] += len(rf.t) if np.abs(rf.signal[0]) > 0: pre = np.array([[curr_dur + rf.delay + rf.t[0] - eps], [0]]) rf_piece = np.hstack((pre, rf_piece)) out_len[-1] += pre.shape[1] if np.abs(rf.signal[-1]) > 0: post = np.array([[curr_dur + rf.delay + rf.t[-1] + eps], [0]]) rf_piece = np.hstack((rf_piece, post)) out_len[-1] += post.shape[1] shape_pieces[-1, block_counter] = rf_piece if block.adc is not None: # ADC t_adc.extend( np.arange(block.adc.num_samples) + 0.5 * block.adc.dwell + block.adc.delay + curr_dur ) fp_adc.append([block.adc.freq_offset, block.adc.phase_offset]) curr_dur += self.block_durations[block_counter] # Collect wave data wave_data = np.empty(shape_channels, dtype="object") for j in range(shape_channels): wave_data[j] = np.zeros((2, int(out_len[j]))) # TODO: This is weird, and needs to be fixed. Time points are also complex this way, and spends 4 times more memory than necessary. if append_RF: wave_data[j] = np.zeros((2, int(out_len[j])), dtype=np.complex128) wave_cnt = np.zeros(shape_channels, dtype=int) for block_counter in range(num_blocks): for j in range(shape_channels): if shape_pieces[j, block_counter] is not None: wave_data_local = shape_pieces[j, block_counter] length = wave_data_local.shape[1] if ( wave_cnt[j] == 0 or wave_data[j][0, wave_cnt[j] - 1] != wave_data_local[0, 0] ): wave_data[j][ :, wave_cnt[j] + np.arange(length) ] = wave_data_local wave_cnt[j] += length else: wave_data[j][ :, wave_cnt[j] + np.arange(length - 1) ] = wave_data_local[:, 1:] wave_cnt[j] += length - 1 if wave_cnt[j] != len(np.unique(wave_data[j][0, : wave_cnt[j]])): raise Warning( "Not all elements of the generated time vector are unique." ) # Trim output data for j in range(shape_channels): if wave_cnt[j] < wave_data[j].shape[1]: wave_data[j] = wave_data[j][:, : wave_cnt[j]] tfp_excitation = np.array(tfp_excitation).transpose() tfp_refocusing = np.array(tfp_refocusing) t_adc = np.array(t_adc) fp_adc = np.array(fp_adc) return wave_data, tfp_excitation, tfp_refocusing, t_adc, fp_adc def waveforms_export(self, time_range=(0, np.inf)) -> dict: """ Plot `Sequence`. Parameters ---------- time_range : iterable, default=(0, np.inf) Time range (x-axis limits) for all waveforms. Default is 0 to infinity (entire sequence). Returns ------- all_waveforms: dict Dictionary containing the following sequence waveforms and time array(s): - `t_adc` - ADC timing array [seconds] - `t_rf` - RF timing array [seconds] - `t_rf_centers`: `rf_t_centers`, - `t_gx`: x gradient timing array, - `t_gy`: y gradient timing array, - `t_gz`: z gradient timing array, - `adc` - ADC complex signal (amplitude=1, phase=adc phase) [a.u.] - `rf` - RF complex signal - `rf_centers`: RF centers array, - `gx` - x gradient - `gy` - y gradient - `gz` - z gradient - `grad_unit`: [kHz/m], - `rf_unit`: [Hz], - `time_unit`: [seconds], """ # Check time range validity if ( not all([isinstance(x, (int, float)) for x in time_range]) or len(time_range) != 2 ): raise ValueError("Invalid time range") t0 = 0 adc_t_all = np.array([]) adc_signal_all = np.array([], dtype=complex) rf_t_all = np.array([]) rf_signal_all = np.array([], dtype=complex) rf_t_centers = np.array([]) rf_signal_centers = np.array([], dtype=complex) gx_t_all = np.array([]) gy_t_all = np.array([]) gz_t_all = np.array([]) gx_all = np.array([]) gy_all = np.array([]) gz_all = np.array([]) for block_counter in range(len(self.block_events)): # For each block block = self.get_block(block_counter + 1) # Retrieve it is_valid = ( time_range[0] <= t0 <= time_range[1] ) # Check if "current time" is within requested range. if is_valid: # Case 1: ADC if hasattr(block, "adc") and block.adc!=None: adc = block.adc # Get adc info # From Pulseq: According to the information from Klaus Scheffler and indirectly from Siemens this # is the present convention - the samples are shifted by 0.5 dwell t = adc.delay + (np.arange(int(adc.num_samples)) + 0.5) * adc.dwell adc_t = t0 + t adc_signal = np.exp(1j * adc.phase_offset) * np.exp( 1j * 2 * np.pi * t * adc.freq_offset ) adc_t_all = np.append(adc_t_all, adc_t) adc_signal_all = np.append(adc_signal_all, adc_signal) if hasattr(block, "rf") and block.rf!=None: rf = block.rf tc, ic = calc_rf_center(rf) t = rf.t + rf.delay tc = tc + rf.delay # Debug - visualize # sp12.plot(t_factor * (t0 + t), np.abs(rf.signal)) # plt.show() # sp13.plot(t_factor * (t0 + t), np.angle(rf.signal * np.exp(1j * rf.phase_offset) # * np.exp(1j * 2 * math.pi * rf.t * rf.freq_offset)), # t_factor * (t0 + tc), np.angle(rf.signal[ic] * np.exp(1j * rf.phase_offset) # * np.exp(1j * 2 * math.pi * rf.t[ic] * rf.freq_offset)), # 'xb') # plt.show() rf_t = t0 + t rf = ( rf.signal * np.exp(1j * rf.phase_offset) * np.exp(1j * 2 * math.pi * rf.t * rf.freq_offset) ) rf_t_all = np.append(rf_t_all, rf_t) rf_signal_all = np.append(rf_signal_all, rf) rf_t_centers = np.append(rf_t_centers, rf_t[ic]) rf_signal_centers = np.append(rf_signal_centers, rf[ic]) grad_channels = ['gx', 'gy', 'gz'] for x in range( len(grad_channels) ): # Check each gradient channel: x, y, and z if hasattr( block, grad_channels[x] ): # If this channel is on in current block grad = getattr(block, grad_channels[x]) if grad == None: break elif grad.type == "grad": # Arbitrary gradient option # In place unpacking of grad.t with the starred expression g_t = ( t0 + grad.delay + [ 0, *(grad.t + (grad.t[1] - grad.t[0]) / 2), grad.t[-1] + grad.t[1] - grad.t[0], ] ) g = 1e-3 * np.array((grad.first, *grad.waveform, grad.last)) else: # Trapezoid gradient option g_t = t0 + np.cumsum( [ 0, grad.delay, grad.rise_time, grad.flat_time, grad.fall_time, ] ) g = 1e-3 * grad.amplitude * np.array([0, 0, 1, 1, 0]) if grad.channel == "x": gx_t_all = np.append(gx_t_all, g_t) gx_all = np.append(gx_all, g) elif grad.channel == "y": gy_t_all = np.append(gy_t_all, g_t) gy_all = np.append(gy_all, g) elif grad.channel == "z": gz_t_all = np.append(gz_t_all, g_t) gz_all = np.append(gz_all, g) t0 += self.block_durations[ block_counter ] # "Current time" gets updated to end of block just examined all_waveforms = { "t_adc": adc_t_all, "t_rf": rf_t_all, "t_rf_centers": rf_t_centers, "t_gx": gx_t_all, "t_gy": gy_t_all, "t_gz": gz_t_all, "adc": adc_signal_all, "rf": rf_signal_all, "rf_centers": rf_signal_centers, "gx": gx_all, "gy": gy_all, "gz": gz_all, "grad_unit": "[kHz/m]", "rf_unit": "[Hz]", "time_unit": "[seconds]", } return all_waveforms def write(self, name: str, create_signature: bool = True) -> None: """ Write the sequence data to the given filename using the open file format for MR sequences. See also `pypulseq.Sequence.read_seq.read()`. Parameters ---------- name : str Filename of `.seq` file to be written to disk. create_signature : bool, default=True Boolean flag to indicate if the file has to be signed. """ write_seq(self, name, create_signature)