| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893 |
- import itertools
- import math
- from collections import OrderedDict
- from types import SimpleNamespace
- from typing import Tuple, List
- from typing import Union
- from warnings import warn
- import tkinter as tk
- try:
- from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
- from matplotlib.figure import Figure
- import matplotlib as mpl
- from matplotlib import pyplot as plt
- except Exception:
- FigureCanvasTkAgg = None
- NavigationToolbar2Tk = None
- Figure = None
- mpl = None
- plt = None
- try:
- from typing import Self
- except ImportError:
- from typing import TypeVar
- Self = TypeVar('Self', bound='Sequence')
- import numpy as np
- try:
- from scipy.interpolate import PPoly
- except Exception:
- PPoly = None
- from LF_scanner.pypulseq import eps
- from LF_scanner.pypulseq.calc_rf_center import calc_rf_center
- from LF_scanner.pypulseq.check_timing import check_timing as ext_check_timing
- from LF_scanner.pypulseq.decompress_shape import decompress_shape
- from LF_scanner.pypulseq.event_lib import EventLibrary
- from LF_scanner.pypulseq.opts import Opts
- from LF_scanner.pypulseq.supported_labels_rf_use import get_supported_labels
- from LF_scanner.version import major, minor, revision
- from LF_scanner.pypulseq.block_to_events import block_to_events
- from LF_scanner.pypulseq.utils.cumsum import cumsum
- from copy import deepcopy
- from LF_scanner.pypulseq.Sequence import block
- try:
- from LF_scanner.pypulseq.Sequence import parula
- except Exception:
- parula = None
- from LF_scanner.pypulseq.Sequence.ext_test_report import ext_test_report
- from LF_scanner.pypulseq.Sequence.read_seq import read
- from LF_scanner.pypulseq.Sequence.write_seq import write as write_seq
- try:
- from LF_scanner.pypulseq.Sequence.calc_pns import calc_pns
- except Exception:
- calc_pns = None
- try:
- from LF_scanner.pypulseq.Sequence.calc_grad_spectrum import calculate_gradient_spectrum
- except Exception:
- calculate_gradient_spectrum = None
- class Sequence:
- """
- Generate sequences and read/write sequence files. This class defines properties and methods to define a complete MR
- sequence including RF pulses, gradients, ADC events, etc. The class provides an implementation of the open MR
- sequence format defined by the Pulseq project. See http://pulseq.github.io/.
- See also `demo_read.py`, `demo_write.py`.
- """
- version_major = int(major)
- version_minor = int(minor)
- version_revision = revision
- def __init__(self, system=None, use_block_cache=True):
- if system == None:
- system = Opts()
- # =========
- # EVENT LIBRARIES
- # =========
- self.adc_library = EventLibrary() # Library of ADC events
- self.delay_library = EventLibrary() # Library of delay events
- # Library of extension events. Extension events form single-linked zero-terminated lists
- self.extensions_library = EventLibrary()
- self.grad_library = EventLibrary() # Library of gradient events
- self.label_inc_library = (
- EventLibrary()
- ) # Library of Label(inc) events (reference from the extensions library)
- self.label_set_library = (
- EventLibrary()
- ) # Library of Label(set) events (reference from the extensions library)
- self.rf_library = EventLibrary() # Library of RF events
- self.shape_library = EventLibrary(numpy_data=True) # Library of compressed shapes
- self.trigger_library = EventLibrary() # Library of trigger events
- # =========
- # OTHER
- # =========
- self.system = system
- self.block_events = OrderedDict() # Event table
- self.use_block_cache = use_block_cache
- self.block_cache = dict() # Block cache
- self.next_free_block_ID = 1
- self.definitions = dict() # Optional sequence definitions
- self.rf_raster_time = (
- self.system.rf_raster_time
- ) # RF raster time (system dependent)
- self.grad_raster_time = (
- self.system.grad_raster_time
- ) # Gradient raster time (system dependent)
- self.adc_raster_time = (
- self.system.adc_raster_time
- ) # ADC raster time (system dependent)
- self.block_duration_raster = self.system.block_duration_raster
- self.set_definition("AdcRasterTime", self.adc_raster_time)
- self.set_definition("BlockDurationRaster", self.block_duration_raster)
- self.set_definition("GradientRasterTime", self.grad_raster_time)
- self.set_definition("RadiofrequencyRasterTime", self.rf_raster_time)
- self.signature_type = ""
- self.signature_file = ""
- self.signature_value = ""
- self.block_durations = dict() # Cache of block durations
- self.extension_numeric_idx = [] # numeric IDs of the used extensions
- self.extension_string_idx = [] # string IDs of the used extensions
- def __str__(self) -> str:
- s = "Sequence:"
- s += "\nshape_library: " + str(self.shape_library)
- s += "\nrf_library: " + str(self.rf_library)
- s += "\ngrad_library: " + str(self.grad_library)
- s += "\nadc_library: " + str(self.adc_library)
- s += "\ndelay_library: " + str(self.delay_library)
- s += "\nextensions_library: " + str(
- self.extensions_library
- ) # inserted for trigger support by mveldmann
- s += "\nrf_raster_time: " + str(self.rf_raster_time)
- s += "\ngrad_raster_time: " + str(self.grad_raster_time)
- s += "\nblock_events: " + str(len(self.block_events))
- return s
- def adc_times(
- self, time_range: List[float] = None
- ) -> Tuple[np.ndarray, np.ndarray]:
- """
- Return time points of ADC sampling points.
- Returns
- -------
- t_adc: np.ndarray
- Contains times of all ADC sample points.
- fp_adc : np.ndarray
- Contains frequency and phase offsets of each ADC object (not samples).
- """
- # Collect ADC timing data
- t_adc = []
- fp_adc = []
- curr_dur = 0
- if time_range == None:
- blocks = self.block_events
- else:
- if len(time_range) != 2:
- raise ValueError('Time range must be list of two elements')
- if time_range[0] > time_range[1]:
- raise ValueError('End time of time_range must be after begin time')
- # Calculate end times of each block
- bd = np.array(list(self.block_durations.values()))
- t = np.cumsum(bd)
- # Search block end times for start of time range
- begin_block = np.searchsorted(t, time_range[0])
- # Search block begin times for end of time range
- end_block = np.searchsorted(t - bd, time_range[1], side='right')
- blocks = list(self.block_durations.keys())[begin_block:end_block]
- curr_dur = t[begin_block] - bd[begin_block]
- for block_counter in blocks:
- block = self.get_block(block_counter)
- if block.adc is not None: # ADC
- t_adc.append(
- (np.arange(block.adc.num_samples) + 0.5) * block.adc.dwell
- + block.adc.delay
- + curr_dur
- )
- fp_adc.append([block.adc.freq_offset, block.adc.phase_offset])
- curr_dur += self.block_durations[block_counter]
- if t_adc == []:
- # If there are no ADCs, make sure the output is the right shape
- t_adc = np.zeros(0)
- fp_adc = np.zeros((0, 2))
- else:
- t_adc = np.concatenate(t_adc)
- fp_adc = np.array(fp_adc)
- return t_adc, fp_adc
- def add_block(self, *args: SimpleNamespace) -> None:
- """
- Add a new block/multiple events to the sequence. Adds a sequence block with provided as a block structure
- See also:
- - `pypulseq.Sequence.sequence.Sequence.set_block()`
- - `pypulseq.make_adc.make_adc()`
- - `pypulseq.make_trapezoid.make_trapezoid()`
- - `pypulseq.make_sinc_pulse.make_sinc_pulse()`
- Parameters
- ----------
- args : SimpleNamespace
- Block structure or events to be added as a block to `Sequence`.
- """
- block.set_block(self, self.next_free_block_ID, *args)
- self.next_free_block_ID += 1
- def calculate_gradient_spectrum(
- self, max_frequency: float = 2000,
- window_width: float = 0.05,
- frequency_oversampling: float = 3,
- time_range: List[float] = None,
- plot: bool = True,
- combine_mode: str = 'max',
- use_derivative: bool = False,
- acoustic_resonances: List[dict] = []
- ) -> Tuple[List[np.ndarray], np.ndarray, np.ndarray, np.ndarray]:
- """
- Calculates the gradient spectrum of the sequence. Returns a spectrogram
- for each gradient channel, as well as a root-sum-squares combined
- spectrogram.
- Works by splitting the sequence into windows that are 'window_width'
- long and calculating the fourier transform of each window. Windows
- overlap 50% with the previous and next window. When 'combine_mode' is
- not 'none', all windows are combined into one spectrogram.
- Parameters
- ----------
- max_frequency : float, optional
- Maximum frequency to include in spectrograms. The default is 2000.
- window_width : float, optional
- Window width (in seconds). The default is 0.05.
- frequency_oversampling : float, optional
- Oversampling in the frequency dimension, higher values make
- smoother spectrograms. The default is 3.
- time_range : List[float], optional
- Time range over which to calculate the spectrograms as a list of
- two timepoints (in seconds) (e.g. [1, 1.5])
- The default is None.
- plot : bool, optional
- Whether to plot the spectograms. The default is True.
- combine_mode : str, optional
- How to combine all windows into one spectrogram, options:
- 'max', 'mean', 'sos' (root-sum-of-squares), 'none' (no combination)
- The default is 'max'.
- use_derivative : bool, optional
- Whether the use the derivative of the gradient waveforms instead of the
- gradient waveforms for the gradient spectrum calculations. The default
- is False
- acoustic_resonances : List[dict], optional
- Acoustic resonances as a list of dictionaries with 'frequency' and
- 'bandwidth' elements. Only used when plot==True. The default is [].
- Returns
- -------
- spectrograms : List[np.ndarray]
- List of spectrograms per gradient channel.
- spectrogram_sos : np.ndarray
- Root-sum-of-squares combined spectrogram over all gradient channels.
- frequencies : np.ndarray
- Frequency axis of the spectrograms.
- times : np.ndarray
- Time axis of the spectrograms (only relevant when combine_mode == 'none').
- """
- return calculate_gradient_spectrum(self, max_frequency=max_frequency,
- window_width=window_width,
- frequency_oversampling=frequency_oversampling,
- time_range=time_range,
- plot=plot,
- combine_mode=combine_mode,
- use_derivative=use_derivative,
- acoustic_resonances=acoustic_resonances)
- def calculate_kspace(
- self,
- trajectory_delay: Union[float, List[float], np.ndarray] = 0,
- gradient_offset: Union[float, List[float], np.ndarray] = 0
- ) -> Tuple[np.array, np.array, List[float], List[float], np.array]:
- """
- Calculates the k-space trajectory of the entire pulse sequence.
- Parameters
- ----------
- trajectory_delay : float or list, default=0
- Compensation factor in seconds (s) to align ADC and gradients in the reconstruction.
- gradient_offset : float or list, default=0
- Simulates background gradients (specified in Hz/m)
- Returns
- -------
- k_traj_adc : numpy.array
- K-space trajectory sampled at `t_adc` timepoints.
- k_traj : numpy.array
- K-space trajectory of the entire pulse sequence.
- t_excitation : List[float]
- Excitation timepoints.
- t_refocusing : List[float]
- Refocusing timepoints.
- t_adc : numpy.array
- Sampling timepoints.
- """
- if np.any(np.abs(trajectory_delay) > 100e-6):
- raise Warning(
- f"Trajectory delay of {trajectory_delay * 1e6} us is suspiciously high"
- )
- total_duration = sum(self.block_durations.values())
- t_excitation, fp_excitation, t_refocusing, _ = self.rf_times()
- t_adc, _ = self.adc_times()
- # Convert data to piecewise polynomials
- gw_pp = self.get_gradients(trajectory_delay, gradient_offset)
- ng = len(gw_pp)
- # Calculate slice positions. For now we entirely rely on the excitation -- ignoring complicated interleaved
- # refocused sequences
- if len(t_excitation) > 0:
- # Position in x, y, z
- slice_pos = np.zeros((ng, len(t_excitation)))
- for j in range(ng):
- if gw_pp[j] is None:
- slice_pos[j] = np.nan
- else:
- # Check for divisions by zero to avoid numpy warning
- divisor = np.array(gw_pp[j](t_excitation))
- slice_pos[j, divisor != 0.0] = fp_excitation[0, divisor != 0.0] / divisor[divisor != 0.0]
- slice_pos[j, divisor == 0.0] = np.nan
- slice_pos[~np.isfinite(slice_pos)] = 0 # Reset undefined to 0
- else:
- slice_pos = []
- # =========
- # Integrate waveforms as PPs to produce gradient moments
- gm_pp = []
- tc = []
- for i in range(ng):
- if gw_pp[i] is None:
- gm_pp.append(None)
- continue
- gm_pp.append(gw_pp[i].antiderivative())
- tc.append(gm_pp[i].x)
- # "Sample" ramps for display purposes otherwise piecewise-linear display (plot) fails
- ii = np.flatnonzero(np.abs(gm_pp[i].c[0, :]) > eps)
- # Do nothing if there are no ramps
- if ii.shape[0] == 0:
- continue
- starts = np.int64(np.floor((gm_pp[i].x[ii] + eps) / self.grad_raster_time))
- ends = np.int64(np.ceil((gm_pp[i].x[ii + 1] - eps) / self.grad_raster_time))
- # Create all ranges starts[0]:ends[0], starts[1]:ends[1], etc.
- lengths = ends - starts + 1
- inds = np.ones((lengths).sum())
- # Calculate output index where each range will start
- start_inds = np.cumsum(np.concatenate(([0], lengths[:-1])))
- # Create element-wise differences that will cumsum into
- # the final indices: [starts[0], 1, 1, starts[1]-starts[0]-lengths[0]+1, 1, etc.]
- inds[start_inds] = np.concatenate(([starts[0]], np.diff(starts) - lengths[:-1] + 1))
- tc.append(np.cumsum(inds) * self.grad_raster_time)
- if tc != []:
- tc = np.concatenate(tc)
- t_acc = 1e-10 # Temporal accuracy
- t_acc_inv = 1 / t_acc
- # tc = self.__flatten_jagged_arr(tc)
- t_ktraj = t_acc * np.unique(
- np.round(
- t_acc_inv
- * np.array(
- [
- *tc,
- 0,
- *np.asarray(t_excitation) - 2 * self.rf_raster_time,
- *np.asarray(t_excitation) - self.rf_raster_time,
- *t_excitation,
- *np.asarray(t_refocusing) - self.rf_raster_time,
- *t_refocusing,
- *t_adc,
- total_duration,
- ]
- )
- )
- )
- i_excitation = np.searchsorted(t_ktraj, t_acc * np.round(t_acc_inv * np.asarray(t_excitation)))
- i_refocusing = np.searchsorted(t_ktraj, t_acc * np.round(t_acc_inv * np.asarray(t_refocusing)))
- i_adc = np.searchsorted(t_ktraj, t_acc * np.round(t_acc_inv * np.asarray(t_adc)))
- i_periods = np.unique([0, *i_excitation, *i_refocusing, len(t_ktraj) - 1])
- if len(i_excitation) > 0:
- ii_next_excitation = 0
- else:
- ii_next_excitation = -1
- if len(i_refocusing) > 0:
- ii_next_refocusing = 0
- else:
- ii_next_refocusing = -1
- k_traj = np.zeros((ng, len(t_ktraj)))
- for i in range(ng):
- if gw_pp[i] is None:
- continue
- it = np.where(np.logical_and(
- t_ktraj >= t_acc * round(t_acc_inv * gm_pp[i].x[0]),
- t_ktraj <= t_acc * round(t_acc_inv * gm_pp[i].x[-1]),
- ))[0]
- k_traj[i, it] = gm_pp[i](t_ktraj[it])
- if t_ktraj[it[-1]] < t_ktraj[-1]:
- k_traj[i, it[-1] + 1:] = k_traj[i, it[-1]]
- # Convert gradient moments to kspace
- dk = -k_traj[:, 0]
- for i in range(len(i_periods) - 1):
- i_period = i_periods[i]
- i_period_end = i_periods[i + 1]
- if ii_next_excitation >= 0 and i_excitation[ii_next_excitation] == i_period:
- if abs(t_ktraj[i_period] - t_excitation[ii_next_excitation]) > t_acc:
- raise Warning(
- f"abs(t_ktraj[i_period]-t_excitation[ii_next_excitation]) < {t_acc} failed for ii_next_excitation={ii_next_excitation} error={t_ktraj(i_period) - t_excitation(ii_next_excitation)}"
- )
- dk = -k_traj[:, i_period]
- if i_period > 0:
- # Use nans to mark the excitation points since they interrupt the plots
- k_traj[:, i_period - 1] = np.nan
- # -1 on len(i_excitation) for 0-based indexing
- ii_next_excitation = min(len(i_excitation) - 1, ii_next_excitation + 1)
- elif (
- ii_next_refocusing >= 0 and i_refocusing[ii_next_refocusing] == i_period
- ):
- # dk = -k_traj[:, i_period]
- dk = -2 * k_traj[:, i_period] - dk
- # -1 on len(i_excitation) for 0-based indexing
- ii_next_refocusing = min(len(i_refocusing) - 1, ii_next_refocusing + 1)
- k_traj[:, i_period:i_period_end] = (
- k_traj[:, i_period:i_period_end] + dk[:, None]
- )
- k_traj[:, i_period_end] = k_traj[:, i_period_end] + dk
- k_traj_adc = k_traj[:, i_adc]
- return k_traj_adc, k_traj, t_excitation, t_refocusing, t_adc
- def calculate_kspacePP(
- self,
- trajectory_delay: Union[float, List[float], np.ndarray] = 0,
- gradient_offset: Union[float, List[float], np.ndarray] = 0
- ) -> Tuple[np.array, np.array, np.array, np.array, np.array]:
- warn('Sequence.calculate_kspacePP has been deprecated, use calculate_kspace instead', DeprecationWarning,
- stacklevel=2)
- return self.calculate_kspace(trajectory_delay, gradient_offset)
- def calculate_pns(
- self,
- hardware: SimpleNamespace,
- time_range: List[float] = None,
- do_plots: bool = True
- ) -> Tuple[bool, np.array, np.ndarray, np.array]:
- """
- Calculate PNS using safe model implementation by Szczepankiewicz and Witzel
- See http://github.com/filip-szczepankiewicz/safe_pns_prediction
- Returns pns levels due to respective axes (normalized to 1 and not to 100#)
- Parameters
- ----------
- hardware : SimpleNamespace
- Hardware specifications. See safe_example_hw() from
- the safe_pns_prediction package. Alternatively a text file
- in the .asc format (Siemens) can be passed, e.g. for Prisma
- it is MP_GPA_K2309_2250V_951A_AS82.asc (we leave it as an
- exercise to the interested user to find were these files
- can be acquired from)
- do_plots : bool, optional
- Plot the results from the PNS calculations. The default is True.
- Returns
- -------
- ok : bool
- Boolean flag indicating whether peak PNS is within acceptable limits
- pns_norm : numpy.array [N]
- PNS norm over all gradient channels, normalized to 1
- pns_components : numpy.array [Nx3]
- PNS levels per gradient channel
- t_pns : np.array [N]
- Time axis for the pns_norm and pns_components arrays
- """
- return calc_pns(self, hardware, time_range=time_range, do_plots=do_plots)
- def check_timing(self) -> Tuple[bool, List[str]]:
- """
- Checks timing of all blocks and objects in the sequence optionally returns the detailed error log. This
- function also modifies the sequence object by adding the field "TotalDuration" to sequence definitions.
- Returns
- -------
- is_ok : bool
- Boolean flag indicating timing errors.
- error_report : str
- Error report in case of timing errors.
- """
- error_report = []
- is_ok = True
- total_duration = 0
- for block_counter in self.block_events:
- block = self.get_block(block_counter)
- events = block_to_events(block)
- res, rep, duration = ext_check_timing(self.system, *events)
- is_ok = is_ok and res
- # Check the stored total block duration
- if abs(duration - self.block_durations[block_counter]) > eps:
- rep += "Inconsistency between the stored block duration and the duration of the block content"
- is_ok = False
- duration = self.block_durations[block_counter]
- # Check RF dead times
- if block.rf is not None:
- if block.rf.delay - block.rf.dead_time < -eps:
- rep += (
- f"Delay of {block.rf.delay * 1e6} us is smaller than the RF dead time "
- f"{block.rf.dead_time * 1e6} us"
- )
- is_ok = False
- if (
- block.rf.delay + block.rf.t[-1] + block.rf.ringdown_time - duration
- > eps
- ):
- rep += (
- f"Time between the end of the RF pulse at {block.rf.delay + block.rf.t[-1]} and the end "
- f"of the block at {duration * 1e6} us is shorter than rf_ringdown_time"
- )
- is_ok = False
- # Check ADC dead times
- if block.adc is not None:
- if block.adc.delay - self.system.adc_dead_time < -eps:
- rep += "adc.delay < system.adc_dead_time"
- is_ok = False
- if (
- block.adc.delay
- + block.adc.num_samples * block.adc.dwell
- + self.system.adc_dead_time
- - duration
- > eps
- ):
- rep += "adc: system.adc_dead_time (post-ADC) violation"
- is_ok = False
- # Update report
- if len(rep) != 0:
- error_report.append(f"Event: {block_counter} - {rep}\n")
- total_duration += duration
- # Check if all the gradients in the last block are ramped down properly
- if len(events) != 0 and all([isinstance(e, SimpleNamespace) for e in events]):
- for e in range(len(events)):
- if not isinstance(events[e], list) and events[e].type == "grad":
- if events[e].last != 0:
- error_report.append(
- f"Event: {list(self.block_events)[-1]} - Gradients do not ramp to 0 at the end of the sequence"
- )
- self.set_definition("TotalDuration", total_duration)
- return is_ok, error_report
- def duration(self) -> Tuple[int, int, np.ndarray]:
- """
- Returns the total duration of this sequence, and the total count of blocks and events.
- Returns
- -------
- duration : int
- Duration of this sequence in seconds (s).
- num_blocks : int
- Number of blocks in this sequence.
- event_count : np.ndarray
- Number of events in this sequence.
- """
- num_blocks = len(self.block_events)
- event_count = np.zeros(len(next(iter(self.block_events.values()))))
- duration = 0
- for block_counter in self.block_events:
- event_count += self.block_events[block_counter] > 0
- duration += self.block_durations[block_counter]
- return duration, num_blocks, event_count
- def evaluate_labels(
- self,
- init: dict = None,
- evolution: str = 'none'
- ) -> dict:
- """
- Evaluate label values of the entire sequence.
- When no evolution is given, returns the label values at the end of the
- sequence. Returns a dictionary with keys named after the labels used
- in the sequence. Only the keys corresponding to the labels actually
- used are created.
- E.g. labels['LIN'] == 4
- When evolution is given, labels are tracked through the sequence. See
- below for options for different types of evolutions. The resulting
- dictionary will contain arrays of the label values.
- E.g. labels['LIN'] == np.array([0,1,2,3,4])
- Initial values for the labels can be given with the 'init' parameter.
- Useful if evaluating labels block-by-block.
- Parameters
- ----------
- init : dict, optional
- Dictionary containing initial label values. The default is None.
- evolution : str, optional
- Flag to specify tracking of label evolutions.
- Must be one of: 'none', 'adc', 'label', 'blocks' (default = 'none')
- 'blocks': Return label values for all blocks.
- 'adc': Return label values only for blocks containing ADC events.
- 'label': Return label values only for blocks where labels are
- manipulated.
- Returns
- -------
- labels : dict
- Dictionary containing label values.
- If evolution == 'none', the dictionary values only contains the
- final label value.
- Otherwise, the dictionary values are arrays of label evolutions.
- Only the labels that are used in the sequence are created in the
- dictionary.
- """
- labels = init or dict()
- label_evolution = []
- # TODO: MATLAB implementation includes block_range parameter. But in
- # general we cannot assume linear block ordering. Could include
- # time_range like in other sequence functions. Or a blocks
- # parameter to specify which blocks to loop over?
- for block_counter in self.block_events:
- block = self.get_block(block_counter)
- if block.label is not None:
- # Current block has labels
- for lab in block.label.values():
- if lab.type == 'labelinc':
- # Increment label
- if lab.label not in labels:
- labels[lab.label] = 0
- labels[lab.label] += lab.value
- else:
- # Set label
- labels[lab.label] = lab.value
- if evolution == 'label':
- label_evolution.append(dict(labels))
- if evolution == 'blocks' or (evolution == 'adc' and block.adc is not None):
- label_evolution.append(dict(labels))
- # Convert evolutions into label dictionary
- if len(label_evolution) > 0:
- for lab in labels:
- labels[lab] = np.array([e[lab] if lab in e else 0 for e in label_evolution])
- return labels
- def flip_grad_axis(self, axis: str) -> None:
- """
- Invert all gradients along the corresponding axis/channel. The function acts on all gradient objects already
- added to the sequence object.
- Parameters
- ----------
- axis : str
- Gradients to invert or scale. Must be one of 'x', 'y' or 'z'.
- """
- self.mod_grad_axis(axis, modifier=-1)
- def get_block(self, block_index: int) -> SimpleNamespace:
- """
- Return a block of the sequence specified by the index. The block is created from the sequence data with all
- events and shapes decompressed.
- See also:
- - `pypulseq.Sequence.sequence.Sequence.set_block()`.
- - `pypulseq.Sequence.sequence.Sequence.add_block()`.
- Parameters
- ----------
- block_index : int
- Index of block to be retrieved from `Sequence`.
- Returns
- -------
- SimpleNamespace
- Event identified by `block_index`.
- """
- return block.get_block(self, block_index)
- def get_definition(self, key: str) -> str:
- """
- Return value of the definition specified by the key. These definitions can be added manually or read from the
- header of a sequence file defined in the sequence header. An empty array is returned if the key is not defined.
- See also `pypulseq.Sequence.sequence.Sequence.set_definition()`.
- Parameters
- ----------
- key : str
- Key of definition to retrieve.
- Returns
- -------
- str
- Definition identified by `key` if found, else returns ''.
- """
- if key in self.definitions:
- return self.definitions[key]
- else:
- return ""
- def get_extension_type_ID(self, extension_string: str) -> int:
- """
- Get numeric extension ID for `extension_string`. Will automatically create a new ID if unknown.
- Parameters
- ----------
- extension_string : str
- Given string extension ID.
- Returns
- -------
- extension_id : int
- Numeric ID for given string extension ID.
- """
- if extension_string not in self.extension_string_idx:
- if len(self.extension_numeric_idx) == 0:
- extension_id = 1
- else:
- extension_id = 1 + max(self.extension_numeric_idx)
- self.extension_numeric_idx.append(extension_id)
- self.extension_string_idx.append(extension_string)
- assert len(self.extension_numeric_idx) == len(self.extension_string_idx)
- else:
- num = self.extension_string_idx.index(extension_string)
- extension_id = self.extension_numeric_idx[num]
- return extension_id
- def get_extension_type_string(self, extension_id: int) -> str:
- """
- Get string extension ID for `extension_id`.
- Parameters
- ----------
- extension_id : int
- Given numeric extension ID.
- Returns
- -------
- extension_str : str
- String ID for the given numeric extension ID.
- Raises
- ------
- ValueError
- If given numeric extension ID is unknown.
- """
- if extension_id in self.extension_numeric_idx:
- num = self.extension_numeric_idx.index(extension_id)
- else:
- raise ValueError(
- f"Extension for the given ID - {extension_id} - is unknown."
- )
- extension_str = self.extension_string_idx[num]
- return extension_str
- def get_gradients(self,
- trajectory_delay: Union[float, List[float], np.ndarray] = 0,
- gradient_offset: Union[float, List[float], np.ndarray] = 0,
- time_range: List[float] = None) -> List[PPoly]:
- """
- Get all gradient waveforms of the sequence in a piecewise-polynomial
- format (scipy PPoly). Gradient values can be accessed easily at one or
- more timepoints using `gw_pp[channel](t)` (where t is a float, list of
- floats, or numpy array). Note that PPoly objects return nan for
- timepoints outside the waveform.
- Parameters
- ----------
- trajectory_delay : float or list, default=0
- Compensation factor in seconds (s) to align ADC and gradients in the reconstruction.
- gradient_offset : float or list, default=0
- Simulates background gradients (specified in Hz/m)
- Returns
- -------
- gw_pp : List[PPoly]
- List of gradient waveforms for each of the gradient channels,
- expressed as scipy PPoly objects.
- """
- if np.any(np.abs(trajectory_delay) > 100e-6):
- raise Warning(
- f"Trajectory delay of {trajectory_delay * 1e6} us is suspiciously high"
- )
- total_duration = sum(self.block_durations.values())
- gw_data = self.waveforms(time_range=time_range)
- ng = len(gw_data)
- # Gradient delay handling
- if isinstance(trajectory_delay, (int, float)):
- gradient_delays = [trajectory_delay] * ng
- else:
- assert (len(trajectory_delay) == ng) # Need to have same number of gradient channels
- gradient_delays = [trajectory_delay] * ng
- # Gradient offset handling
- if isinstance(gradient_offset, (int, float)):
- gradient_offset = [gradient_offset] * ng
- else:
- assert (len(gradient_offset) == ng) # Need to have same number of gradient channels
- # Convert data to piecewise polynomials
- gw_pp = []
- for j in range(ng):
- wave_cnt = gw_data[j].shape[1]
- if wave_cnt == 0:
- if np.abs(gradient_offset[j]) <= eps:
- gw_pp.append(None)
- continue
- else:
- gw = np.array(([0, total_duration], [0, 0]))
- else:
- gw = gw_data[j]
- # Now gw contains the waveform from the current axis
- if np.abs(gradient_delays[j]) > eps:
- gw[0] = gw[0] - gradient_delays[j] # Anisotropic gradient delay support
- if not np.all(np.isfinite(gw)):
- raise Warning("Not all elements of the generated waveform are finite.")
- teps = 1e-12
- _temp1 = np.array(([gw[0, 0] - 2 * teps, gw[0, 0] - teps], [0, 0]))
- _temp2 = np.array(([gw[0, -1] + teps, gw[0, -1] + 2 * teps], [0, 0]))
- gw = np.hstack((_temp1, gw, _temp2))
- if np.abs(gradient_offset[j]) > eps:
- gw[1, :] += gradient_offset[j]
- gw[1][gw[1] == -0.0] = 0.0
- gw_pp.append(PPoly(np.stack((np.diff(gw[1]) / np.diff(gw[0]),
- gw[1][:-1])), gw[0], extrapolate=True))
- return gw_pp
- def mod_grad_axis(self, axis: str, modifier: int) -> None:
- """
- Invert or scale all gradients along the corresponding axis/channel. The function acts on all gradient objects
- already added to the sequence object.
- Parameters
- ----------
- axis : str
- Gradients to invert or scale. Must be one of 'x', 'y' or 'z'.
- modifier : int
- Scaling value.
- Raises
- ------
- ValueError
- If invalid `axis` is passed. Must be one of 'x', 'y','z'.
- RuntimeError
- If same gradient event is used on multiple axes.
- """
- if axis not in ["x", "y", "z"]:
- raise ValueError(
- f"Invalid axis. Must be one of 'x', 'y','z'. Passed: {axis}"
- )
- channel_num = ["x", "y", "z"].index(axis)
- other_channels = [0, 1, 2]
- other_channels.remove(channel_num)
- # Go through all event table entries and list gradient objects in the library
- all_grad_events = np.array(list(self.block_events.values()))
- all_grad_events = all_grad_events[:, 2:5]
- selected_events = np.unique(all_grad_events[:, channel_num])
- selected_events = selected_events[selected_events != 0]
- other_events = np.unique(all_grad_events[:, other_channels])
- if len(np.intersect1d(selected_events, other_events)) > 0:
- raise RuntimeError(
- "mod_grad_axis does not yet support the same gradient event used on multiple axes."
- )
- for i in range(len(selected_events)):
- self.grad_library.data[selected_events[i]][0] *= modifier
- if (
- self.grad_library.type[selected_events[i]] == "g"
- and self.grad_library.lengths[selected_events[i]] == 5
- ):
- # Need to update first and last fields
- self.grad_library.data[selected_events[i]][3] *= modifier
- self.grad_library.data[selected_events[i]][4] *= modifier
- def plot(
- self, tk_obj,
- label: str = str(),
- show_blocks: bool = False,
- save: bool = False,
- time_range=(0, np.inf),
- time_disp: str = "s",
- grad_disp: str = "kHz/m",
- plot_now: bool = True,
- tk_plot: bool = True
- ) -> None:
- """
- Plot `Sequence`.
- Parameters
- ----------
- label : str, defualt=str()
- Plot label values for ADC events: in this example for LIN and REP labels; other valid labes are accepted as
- a comma-separated list.
- save : bool, default=False
- Boolean flag indicating if plots should be saved. The two figures will be saved as JPG with numerical
- suffixes to the filename 'seq_plot'.
- show_blocks : bool, default=False
- Boolean flag to indicate if grid and tick labels at the block boundaries are to be plotted.
- time_range : iterable, default=(0, np.inf)
- Time range (x-axis limits) for plotting the sequence. Default is 0 to infinity (entire sequence).
- time_disp : str, default='s'
- Time display type, must be one of `s`, `ms` or `us`.
- grad_disp : str, default='s'
- Gradient display unit, must be one of `kHz/m` or `mT/m`.
- plot_now : bool, default=True
- If true, function immediately shows the plots, blocking the rest of the code until plots are exited.
- If false, plots are shown when plt.show() is called. Useful if plots are to be modified.
- plot_type : str, default='Gradient'
- Gradients display type, must be one of either 'Gradient' or 'Kspace'.
- """
- mpl.rcParams["lines.linewidth"] = 0.75 # Set default Matplotlib linewidth
- valid_time_units = ["s", "ms", "us"]
- valid_grad_units = ["kHz/m", "mT/m"]
- valid_labels = get_supported_labels()
- if (
- not all([isinstance(x, (int, float)) for x in time_range])
- or len(time_range) != 2
- ):
- raise ValueError("Invalid time range")
- if time_disp not in valid_time_units:
- raise ValueError("Unsupported time unit")
- if grad_disp not in valid_grad_units:
- raise ValueError(
- "Unsupported gradient unit. Supported gradient units are: "
- + str(valid_grad_units)
- )
- fig1, fig2 = plt.figure(1), plt.figure(2)
- sp11 = fig1.add_subplot(311)
- sp12 = fig1.add_subplot(312, sharex=sp11)
- sp13 = fig1.add_subplot(313, sharex=sp11)
- fig2_subplots = [
- fig2.add_subplot(311, sharex=sp11),
- fig2.add_subplot(312, sharex=sp11),
- fig2.add_subplot(313, sharex=sp11),
- ]
- t_factor_list = [1, 1e3, 1e6]
- t_factor = t_factor_list[valid_time_units.index(time_disp)]
- g_factor_list = [1e-3, 1e3 / self.system.gamma]
- g_factor = g_factor_list[valid_grad_units.index(grad_disp)]
- t0 = 0
- label_defined = False
- label_idx_to_plot = []
- label_legend_to_plot = []
- label_store = dict()
- for i in range(len(valid_labels)):
- label_store[valid_labels[i]] = 0
- if valid_labels[i] in label.upper():
- label_idx_to_plot.append(i)
- label_legend_to_plot.append(valid_labels[i])
- if len(label_idx_to_plot) != 0:
- p = parula.main(len(label_idx_to_plot) + 1)
- label_colors_to_plot = p(np.arange(len(label_idx_to_plot)))
- cycler = mpl.cycler(color=label_colors_to_plot)
- sp11.set_prop_cycle(cycler)
- # Block timings
- block_edges = np.cumsum([0] + [x[1] for x in sorted(self.block_durations.items())])
- block_edges_in_range = block_edges[
- (block_edges >= time_range[0]) * (block_edges <= time_range[1])
- ]
- if show_blocks:
- for sp in [sp11, sp12, sp13, *fig2_subplots]:
- sp.set_xticks(t_factor * block_edges_in_range)
- sp.set_xticklabels(sp.get_xticklabels(), rotation=90)
- for block_counter in self.block_events:
- block = self.get_block(block_counter)
- is_valid = (time_range[0] <= t0 + self.block_durations[block_counter]
- and t0 <= time_range[1])
- if is_valid:
- if getattr(block, "label", None) is not None:
- for i in range(len(block.label)):
- if block.label[i].type == "labelinc":
- label_store[block.label[i].label] += block.label[i].value
- else:
- label_store[block.label[i].label] = block.label[i].value
- label_defined = True
- if getattr(block, "adc", None) is not None: # ADC
- adc = block.adc
- # From Pulseq: According to the information from Klaus Scheffler and indirectly from Siemens this
- # is the present convention - the samples are shifted by 0.5 dwell
- t = adc.delay + (np.arange(int(adc.num_samples)) + 0.5) * adc.dwell
- sp11.plot(t_factor * (t0 + t), np.zeros(len(t)), "rx")
- sp13.plot(
- t_factor * (t0 + t),
- np.angle(
- np.exp(1j * adc.phase_offset)
- * np.exp(1j * 2 * np.pi * t * adc.freq_offset)
- ),
- "b.",
- markersize=0.25,
- )
- if label_defined and len(label_idx_to_plot) != 0:
- arr_label_store = list(label_store.values())
- lbl_vals = np.take(arr_label_store, label_idx_to_plot)
- t = t0 + adc.delay + (adc.num_samples - 1) / 2 * adc.dwell
- _t = [t_factor * t] * len(lbl_vals)
- # Plot each label individually to retrieve each corresponding Line2D object
- p = itertools.chain.from_iterable(
- [
- sp11.plot(__t, _lbl_vals, ".")
- for __t, _lbl_vals in zip(_t, lbl_vals)
- ]
- )
- if len(label_legend_to_plot) != 0:
- sp11.legend(p, label_legend_to_plot, loc="upper left")
- label_legend_to_plot = []
- if getattr(block, "rf", None) is not None: # RF
- rf = block.rf
- tc, ic = calc_rf_center(rf)
- time = rf.t
- signal = rf.signal
- if abs(signal[0]) != 0:
- signal = np.concatenate(([0], signal))
- time = np.concatenate(([time[0]], time))
- ic += 1
- if abs(signal[-1]) != 0:
- signal = np.concatenate((signal, [0]))
- time = np.concatenate((time, [time[-1]]))
- sp12.plot(t_factor * (t0 + time + rf.delay), np.abs(signal))
- sp13.plot(
- t_factor * (t0 + time + rf.delay),
- np.angle(
- signal
- * np.exp(1j * rf.phase_offset)
- * np.exp(1j * 2 * math.pi * time * rf.freq_offset)
- ),
- t_factor * (t0 + tc + rf.delay),
- np.angle(
- signal[ic]
- * np.exp(1j * rf.phase_offset)
- * np.exp(1j * 2 * math.pi * time[ic] * rf.freq_offset)
- ),
- "xb",
- )
- grad_channels = ["gx", "gy", "gz"]
- for x in range(len(grad_channels)): # Gradients
- if getattr(block, grad_channels[x], None) is not None:
- grad = getattr(block, grad_channels[x])
- if grad.type == "grad":
- # We extend the shape by adding the first and the last points in an effort of making the
- # display a bit less confusing...
- time = grad.delay + np.array([0, *grad.tt, grad.shape_dur])
- waveform = g_factor * np.array(
- (grad.first, *grad.waveform, grad.last)
- )
- else:
- time = np.array(cumsum(
- 0,
- grad.delay,
- grad.rise_time,
- grad.flat_time,
- grad.fall_time,
- ))
- waveform = (
- g_factor * grad.amplitude * np.array([0, 0, 1, 1, 0])
- )
- fig2_subplots[x].plot(t_factor * (t0 + time), waveform)
- t0 += self.block_durations[block_counter]
- grad_plot_labels = ["x", "y", "z"]
- sp11.set_ylabel("ADC")
- sp12.set_ylabel("RF mag (Hz)")
- sp13.set_ylabel("RF/ADC phase (rad)")
- sp13.set_xlabel(f"t ({time_disp})")
- for x in range(3):
- _label = grad_plot_labels[x]
- fig2_subplots[x].set_ylabel(f"G{_label} ({grad_disp})")
- fig2_subplots[-1].set_xlabel(f"t ({time_disp})")
- # Setting display limits
- disp_range = t_factor * np.array([time_range[0], min(t0, time_range[1])])
- [x.set_xlim(disp_range) for x in [sp11, sp12, sp13, *fig2_subplots]]
- # Grid on
- for sp in [sp11, sp12, sp13, *fig2_subplots]:
- sp.grid()
- fig1.tight_layout()
- fig2.tight_layout()
- if save:
- fig1.savefig("seq_plot1.jpg")
- fig2.savefig("seq_plot2.jpg")
- if tk_plot:
- root = tk.Tk()
- root.title("График с панелью инструментов")
- canvas = FigureCanvasTkAgg(fig1, master=root)
- canvas.draw()
- # Добавляем панель инструментов (зумирование, сохранение и т. д.)
- toolbar = NavigationToolbar2Tk(canvas, root)
- toolbar.update()
- canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True)
- if plot_now:
- plt.show()
- def read(self, file_path: str, detect_rf_use: bool = False, remove_duplicates: bool = True) -> None:
- """
- Read `.seq` file from `file_path`.
- Parameters
- ----------
- detect_rf_use
- file_path : str
- Path to `.seq` file to be read.
- remove_duplicates : bool, default=True
- Remove duplicate events from the sequence after reading.
- """
- if self.use_block_cache:
- self.block_cache.clear()
- read(self, path=file_path, detect_rf_use=detect_rf_use, remove_duplicates=remove_duplicates)
- # Initialize next free block ID
- self.next_free_block_ID = (max(self.block_events) + 1) if self.block_events else 1
- def register_adc_event(self, event: EventLibrary) -> int:
- return block.register_adc_event(self, event)
- def register_grad_event(
- self, event: SimpleNamespace
- ) -> Union[int, Tuple[int, int]]:
- return block.register_grad_event(self, event)
- def register_label_event(self, event: SimpleNamespace) -> int:
- return block.register_label_event(self, event)
- def register_rf_event(self, event: SimpleNamespace) -> Tuple[int, List[int]]:
- return block.register_rf_event(self, event)
- def remove_duplicates(self, in_place: bool = False) -> Self:
- """
- Removes duplicate events from the shape and event libraries contained
- in this sequence.
- Parameters
- ----------
- in_place : bool, optional
- If true, removes the duplicates from the current sequence.
- Otherwise, a copy is created. The default is False.
- Returns
- -------
- seq_copy : Sequence
- If `in_place`, returns self. Otherwise returns a copy of the
- sequence.
- """
- if in_place:
- seq_copy = self
- else:
- # Avoid copying block_cache for performance
- tmp = self.block_cache
- self.block_cache = {}
- seq_copy = deepcopy(self)
- self.block_cache = tmp
- # Find duplicate in shape library
- seq_copy.shape_library, mapping = seq_copy.shape_library.remove_duplicates(9)
- # Remap shape IDs of arbitrary gradient events
- for grad_id in seq_copy.grad_library.data:
- if seq_copy.grad_library.type[grad_id] == 'g':
- data = seq_copy.grad_library.data[grad_id]
- new_data = (data[0],) + (mapping[data[1]], mapping[data[2]]) + data[3:]
- if data != new_data:
- seq_copy.grad_library.update(grad_id, None, new_data)
- # Remap shape IDs of RF events
- for rf_id in seq_copy.rf_library.data:
- data = seq_copy.rf_library.data[rf_id]
- new_data = (data[0],) + (mapping[data[1]], mapping[data[2]], mapping[data[3]]) + data[4:]
- if data != new_data:
- seq_copy.rf_library.update(rf_id, None, new_data)
- # Filter duplicates in gradient library
- seq_copy.grad_library, mapping = seq_copy.grad_library.remove_duplicates((6, -6, -6, -6, -6, -6))
- # Remap gradient event IDs
- for block_id in seq_copy.block_events:
- seq_copy.block_events[block_id][2] = mapping[seq_copy.block_events[block_id][2]]
- seq_copy.block_events[block_id][3] = mapping[seq_copy.block_events[block_id][3]]
- seq_copy.block_events[block_id][4] = mapping[seq_copy.block_events[block_id][4]]
- # Filter duplicates in RF library
- seq_copy.rf_library, mapping = seq_copy.rf_library.remove_duplicates((6, 0, 0, 0, 6, 6, 6))
- # Remap RF event IDs
- for block_id in seq_copy.block_events:
- seq_copy.block_events[block_id][1] = mapping[seq_copy.block_events[block_id][1]]
- # Filter duplicates in ADC library
- seq_copy.adc_library, mapping = seq_copy.adc_library.remove_duplicates((0, -9, -6, 6, 6, 6))
- # Remap ADC event IDs
- for block_id in seq_copy.block_events:
- seq_copy.block_events[block_id][5] = mapping[seq_copy.block_events[block_id][5]]
- return seq_copy
- def rf_from_lib_data(self, lib_data: list, use: str = str()) -> SimpleNamespace:
- """
- Construct RF object from `lib_data`.
- Parameters
- ----------
- lib_data : list
- RF envelope.
- use : str, default=str()
- RF event use.
- Returns
- -------
- rf : SimpleNamespace
- RF object constructed from `lib_data`.
- """
- rf = SimpleNamespace()
- rf.type = "rf"
- amplitude, mag_shape, phase_shape = lib_data[0], lib_data[1], lib_data[2]
- shape_data = self.shape_library.data[mag_shape]
- compressed = SimpleNamespace()
- compressed.num_samples = shape_data[0]
- compressed.data = shape_data[1:]
- mag = decompress_shape(compressed)
- shape_data = self.shape_library.data[phase_shape]
- compressed.num_samples = shape_data[0]
- compressed.data = shape_data[1:]
- phase = decompress_shape(compressed)
- rf.signal = amplitude * mag * np.exp(1j * 2 * np.pi * phase)
- time_shape = lib_data[3]
- if time_shape > 0:
- shape_data = self.shape_library.data[time_shape]
- compressed.num_samples = shape_data[0]
- compressed.data = shape_data[1:]
- rf.t = decompress_shape(compressed) * self.rf_raster_time
- rf.shape_dur = (
- math.ceil((rf.t[-1] - eps) / self.rf_raster_time) * self.rf_raster_time
- )
- else: # Generate default time raster on the fly
- rf.t = (np.arange(1, len(rf.signal) + 1) - 0.5) * self.rf_raster_time
- rf.shape_dur = len(rf.signal) * self.rf_raster_time
- rf.delay = lib_data[4]
- rf.freq_offset = lib_data[5]
- rf.phase_offset = lib_data[6]
- rf.dead_time = self.system.rf_dead_time
- rf.ringdown_time = self.system.rf_ringdown_time
- if use != "":
- use_cases = {
- "e": "excitation",
- "r": "refocusing",
- "i": "inversion",
- "s": "saturation",
- "p": "preparation",
- }
- rf.use = use_cases[use] if use in use_cases else "undefined"
- return rf
- def rf_times(
- self, time_range: List[float] = None
- ) -> Tuple[List[float], np.ndarray, List[float], np.ndarray, np.ndarray]:
- """
- Return time points of excitations and refocusings.
- Returns
- -------
- t_excitation : List[float]
- Contains time moments of the excitation RF pulses
- fp_excitation : np.ndarray
- Contains frequency and phase offsets of the excitation RF pulses
- t_refocusing : List[float]
- Contains time moments of the refocusing RF pulses
- fp_refocusing : np.ndarray
- Contains frequency and phase offsets of the excitation RF pulses
- """
- # Collect RF timing data
- t_excitation = []
- fp_excitation = []
- t_refocusing = []
- fp_refocusing = []
- curr_dur = 0
- if time_range == None:
- blocks = self.block_events
- else:
- if len(time_range) != 2:
- raise ValueError('Time range must be list of two elements')
- if time_range[0] > time_range[1]:
- raise ValueError('End time of time_range must be after begin time')
- # Calculate end times of each block
- bd = np.array(list(self.block_durations.values()))
- t = np.cumsum(bd)
- # Search block end times for start of time range
- begin_block = np.searchsorted(t, time_range[0])
- # Search block begin times for end of time range
- end_block = np.searchsorted(t - bd, time_range[1], side='right')
- blocks = list(self.block_durations.keys())[begin_block:end_block]
- curr_dur = t[begin_block] - bd[begin_block]
- for block_counter in blocks:
- block = self.get_block(block_counter)
- if block.rf is not None: # RF
- rf = block.rf
- t = rf.delay + calc_rf_center(rf)[0]
- if not hasattr(rf, "use") or block.rf.use in [
- "excitation",
- "undefined",
- ]:
- t_excitation.append(curr_dur + t)
- fp_excitation.append([block.rf.freq_offset, block.rf.phase_offset])
- elif block.rf.use == "refocusing":
- t_refocusing.append(curr_dur + t)
- fp_refocusing.append([block.rf.freq_offset, block.rf.phase_offset])
- curr_dur += self.block_durations[block_counter]
- if len(t_excitation) != 0:
- fp_excitation = np.array(fp_excitation).T
- else:
- fp_excitation = np.empty((2, 0))
- if len(t_refocusing) != 0:
- fp_refocusing = np.array(fp_refocusing).T
- else:
- fp_refocusing = np.empty((2, 0))
- return t_excitation, fp_excitation, t_refocusing, fp_refocusing
- def set_block(self, block_index: int, *args: SimpleNamespace) -> None:
- """
- Replace block at index with new block provided as block structure, add sequence block, or create a new block
- from events and store at position specified by index. The block or events are provided in uncompressed form and
- will be stored in the compressed, non-redundant internal libraries.
- See also:
- - `pypulseq.Sequence.sequence.Sequence.get_block()`
- - `pypulseq.Sequence.sequence.Sequence.add_block()`
- Parameters
- ----------
- block_index : int
- Index at which block is replaced.
- args : SimpleNamespace
- Block or events to be replaced/added or created at `block_index`.
- """
- block.set_block(self, block_index, *args)
- if block_index >= self.next_free_block_ID:
- self.next_free_block_ID = block_index + 1
- def set_definition(
- self, key: str, value: Union[float, int, list, np.ndarray, str, tuple]
- ) -> None:
- """
- Modify a custom definition of the sequence. Set the user definition 'key' to value 'value'. If the definition
- does not exist it will be created.
- See also `pypulseq.Sequence.sequence.Sequence.get_definition()`.
- Parameters
- ----------
- key : str
- Definition key.
- value : int, list, np.ndarray, str or tuple
- Definition value.
- """
- if key == "FOV":
- if np.max(value) > 1:
- text = "Definition FOV uses values exceeding 1 m. "
- text += "New Pulseq interpreters expect values in units of meters."
- warn(text)
- self.definitions[key] = value
- def set_extension_string_ID(self, extension_str: str, extension_id: int) -> None:
- """
- Set numeric ID for the given string extension ID.
- Parameters
- ----------
- extension_str : str
- Given string extension ID.
- extension_id : int
- Given numeric extension ID.
- Raises
- ------
- ValueError
- If given numeric or string extension ID is not unique.
- """
- if (
- extension_str in self.extension_string_idx
- or extension_id in self.extension_numeric_idx
- ):
- raise ValueError("Numeric or string ID is not unique")
- self.extension_numeric_idx.append(extension_id)
- self.extension_string_idx.append(extension_str)
- assert len(self.extension_numeric_idx) == len(self.extension_string_idx)
- def test_report(self) -> str:
- """
- Analyze the sequence and return a text report.
- """
- return ext_test_report(self)
- def waveforms(
- self, append_RF: bool = False, time_range: List[float] = None
- ) -> Tuple[np.ndarray]:
- """
- Decompress the entire gradient waveform. Returns gradient waveforms as a tuple of `np.ndarray` of
- `gradient_axes` (typically 3) dimensions. Each `np.ndarray` contains timepoints and the corresponding
- gradient amplitude values.
- Parameters
- ----------
- append_RF : bool, default=False
- Boolean flag to indicate if RF wave shapes are to be appended after the gradients.
- Returns
- -------
- wave_data : np.ndarray
- """
- grad_channels = ["gx", "gy", "gz"]
- # Collect shape pieces
- if append_RF:
- shape_channels = len(grad_channels) + 1 # Last 'channel' is RF
- else:
- shape_channels = len(grad_channels)
- shape_pieces = [[] for _ in range(shape_channels)]
- out_len = np.zeros(shape_channels) # Last 'channel' is RF
- curr_dur = 0
- if time_range == None:
- blocks = self.block_events
- else:
- if len(time_range) != 2:
- raise ValueError('Time range must be list of two elements')
- if time_range[0] > time_range[1]:
- raise ValueError('End time of time_range must be after begin time')
- # Calculate end times of each block
- bd = np.array(list(self.block_durations.values()))
- t = np.cumsum(bd)
- # Search block end times for start of time range
- begin_block = np.searchsorted(t, time_range[0])
- # Search block begin times for end of time range
- end_block = np.searchsorted(t - bd, time_range[1], side='right')
- blocks = list(self.block_durations.keys())[begin_block:end_block]
- curr_dur = t[begin_block] - bd[begin_block]
- for block_counter in blocks:
- block = self.get_block(block_counter)
- for j in range(len(grad_channels)):
- grad = getattr(block, grad_channels[j])
- if grad is not None: # Gradients
- if grad.type == "grad":
- # Check if we have an extended trapezoid or an arbitrary gradient on a regular raster
- tt_rast = grad.tt / self.grad_raster_time + 0.5
- if np.all(
- np.abs(tt_rast - np.arange(1, len(tt_rast) + 1)) < eps
- ): # Arbitrary gradient
- """
- Arbitrary gradient: restore & recompress shape - if we had a trapezoid converted to shape we
- have to find the "corners" and we can eliminate internal samples on the straight segments
- but first we have to restore samples on the edges of the gradient raster intervals for that
- we need the first sample.
- """
- # TODO: Implement restoreAdditionalShapeSamples
- # https://github.com/pulseq/pulseq/blob/master/matlab/%2Bmr/restoreAdditionalShapeSamples.m
- out_len[j] += len(grad.tt) + 2
- shape_pieces[j].append(np.array(
- [
- curr_dur + grad.delay + np.concatenate(
- ([0], grad.tt, [grad.tt[-1] + self.grad_raster_time / 2])),
- np.concatenate(([grad.first], grad.waveform, [grad.last]))
- ]
- ))
- else: # Extended trapezoid
- out_len[j] += len(grad.tt)
- shape_pieces[j].append(np.array(
- [
- curr_dur + grad.delay + grad.tt,
- grad.waveform,
- ]
- ))
- else:
- if abs(grad.flat_time) > eps:
- out_len[j] += 4
- _temp = np.vstack(
- (
- cumsum(
- curr_dur + grad.delay,
- grad.rise_time,
- grad.flat_time,
- grad.fall_time,
- ),
- grad.amplitude * np.array([0, 1, 1, 0]),
- )
- )
- shape_pieces[j].append(_temp)
- else:
- if abs(grad.rise_time) > eps and abs(grad.fall_time) > eps:
- out_len[j] += 3
- _temp = np.vstack(
- (
- cumsum(curr_dur + grad.delay, grad.rise_time, grad.fall_time),
- grad.amplitude * np.array([0, 1, 0]),
- )
- )
- shape_pieces[j].append(_temp)
- else:
- if abs(grad.amplitude) > eps:
- print(
- 'Warning: "empty" gradient with non-zero magnitude detected in block {}'.format(
- block_counter))
- if block.rf is not None: # RF
- rf = block.rf
- if append_RF:
- rf_piece = np.array(
- [
- curr_dur + rf.delay + rf.t,
- rf.signal
- * np.exp(
- 1j
- * (rf.phase_offset + 2 * np.pi * rf.freq_offset * rf.t)
- ),
- ]
- )
- out_len[-1] += len(rf.t)
- if abs(rf.signal[0]) > 0:
- pre = np.array([[rf_piece[0, 0] - 0.1 * self.system.rf_raster_time], [0]])
- rf_piece = np.hstack((pre, rf_piece))
- out_len[-1] += pre.shape[1]
- if abs(rf.signal[-1]) > 0:
- post = np.array([[rf_piece[0, -1] + 0.1 * self.system.rf_raster_time], [0]])
- rf_piece = np.hstack((rf_piece, post))
- out_len[-1] += post.shape[1]
- shape_pieces[-1].append(rf_piece)
- curr_dur += self.block_durations[block_counter]
- # Collect wave data
- wave_data = []
- for j in range(shape_channels):
- if shape_pieces[j] == []:
- wave_data.append(np.zeros((2, 0)))
- continue
- # If the first element of the next shape has the same time as
- # the last element of the previous shape, drop the first
- # element of the next shape.
- shape_pieces[j] = ([shape_pieces[j][0]] +
- [cur if prev[0, -1] + eps < cur[0, 0] else cur[:, 1:]
- for prev, cur in zip(shape_pieces[j][:-1], shape_pieces[j][1:])])
- wave_data.append(np.concatenate(shape_pieces[j], axis=1))
- rftdiff = np.diff(wave_data[j][0])
- if np.any(rftdiff < eps):
- raise Warning(
- "Time vector elements are not monotonically increasing."
- )
- return wave_data
- def waveforms_and_times(
- self, append_RF: bool = False, time_range: List[float] = None
- ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
- """
- Decompress the entire gradient waveform. Returns gradient waveforms as a tuple of `np.ndarray` of
- `gradient_axes` (typically 3) dimensions. Each `np.ndarray` contains timepoints and the corresponding
- gradient amplitude values. Additional return values are time points of excitations, refocusings and ADC
- sampling points.
- Parameters
- ----------
- append_RF : bool, default=False
- Boolean flag to indicate if RF wave shapes are to be appended after the gradients.
- Returns
- -------
- wave_data : np.ndarray
- tfp_excitation : np.ndarray
- Contains time moments, frequency and phase offsets of the excitation RF pulses (similar for `
- tfp_refocusing`).
- tfp_refocusing : np.ndarray
- t_adc: np.ndarray
- Contains times of all ADC sample points.
- fp_adc : np.ndarray
- Contains frequency and phase offsets of each ADC object (not samples).
- """
- wave_data = self.waveforms(append_RF=append_RF, time_range=time_range)
- t_excitation, fp_excitation, t_refocusing, fp_refocusing = self.rf_times(time_range=time_range)
- t_adc, fp_adc = self.adc_times(time_range=time_range)
- # Join times, frequency and phases of RF pulses for compatibility with previous implementation
- tfp_excitation = np.concatenate((np.array(t_excitation)[None], fp_excitation), axis=0)
- tfp_refocusing = np.concatenate((np.array(t_refocusing)[None], fp_refocusing), axis=0)
- return wave_data, tfp_excitation, tfp_refocusing, t_adc, fp_adc
- def waveforms_export(self, time_range=(0, np.inf)) -> dict:
- """
- Plot `Sequence`.
- Parameters
- ----------
- time_range : iterable, default=(0, np.inf)
- Time range (x-axis limits) for all waveforms. Default is 0 to infinity (entire sequence).
- Returns
- -------
- all_waveforms: dict
- Dictionary containing the following sequence waveforms and time array(s):
- - `t_adc` - ADC timing array [seconds]
- - `t_rf` - RF timing array [seconds]
- - `t_rf_centers`: `rf_t_centers`,
- - `t_gx`: x gradient timing array,
- - `t_gy`: y gradient timing array,
- - `t_gz`: z gradient timing array,
- - `adc` - ADC complex signal (amplitude=1, phase=adc phase) [a.u.]
- - `rf` - RF complex signal
- - `rf_centers`: RF centers array,
- - `gx` - x gradient
- - `gy` - y gradient
- - `gz` - z gradient
- - `grad_unit`: [kHz/m],
- - `rf_unit`: [Hz],
- - `time_unit`: [seconds],
- """
- # Check time range validity
- if (
- not all([isinstance(x, (int, float)) for x in time_range])
- or len(time_range) != 2
- ):
- raise ValueError("Invalid time range")
- t0 = 0
- adc_t_all = np.array([])
- adc_signal_all = np.array([], dtype=complex)
- rf_t_all = np.array([])
- rf_signal_all = np.array([], dtype=complex)
- rf_t_centers = np.array([])
- rf_signal_centers = np.array([], dtype=complex)
- gx_t_all = np.array([])
- gy_t_all = np.array([])
- gz_t_all = np.array([])
- gx_all = np.array([])
- gy_all = np.array([])
- gz_all = np.array([])
- for block_counter in self.block_events: # For each block
- block = self.get_block(block_counter) # Retrieve it
- is_valid = (
- time_range[0] <= t0 <= time_range[1]
- ) # Check if "current time" is within requested range.
- if is_valid:
- # Case 1: ADC
- if block.adc != None:
- adc = block.adc # Get adc info
- # From Pulseq: According to the information from Klaus Scheffler and indirectly from Siemens this
- # is the present convention - the samples are shifted by 0.5 dwell
- t = adc.delay + (np.arange(int(adc.num_samples)) + 0.5) * adc.dwell
- adc_t = t0 + t
- adc_signal = np.exp(1j * adc.phase_offset) * np.exp(
- 1j * 2 * np.pi * t * adc.freq_offset
- )
- adc_t_all = np.concatenate((adc_t_all, adc_t))
- adc_signal_all = np.concatenate((adc_signal_all, adc_signal))
- if block.rf != None:
- rf = block.rf
- tc, ic = calc_rf_center(rf)
- t = rf.t + rf.delay
- tc = tc + rf.delay
- # Debug - visualize
- # sp12.plot(t_factor * (t0 + t), np.abs(rf.signal))
- # sp13.plot(t_factor * (t0 + t), np.angle(rf.signal * np.exp(1j * rf.phase_offset)
- # * np.exp(1j * 2 * math.pi * rf.t * rf.freq_offset)),
- # t_factor * (t0 + tc), np.angle(rf.signal[ic] * np.exp(1j * rf.phase_offset)
- # * np.exp(1j * 2 * math.pi * rf.t[ic] * rf.freq_offset)),
- # 'xb')
- rf_t = t0 + t
- rf = (
- rf.signal
- * np.exp(1j * rf.phase_offset)
- * np.exp(1j * 2 * math.pi * rf.t * rf.freq_offset)
- )
- rf_t_all = np.concatenate((rf_t_all, rf_t))
- rf_signal_all = np.concatenate((rf_signal_all, rf))
- rf_t_centers = np.concatenate((rf_t_centers, [rf_t[ic]]))
- rf_signal_centers = np.concatenate((rf_signal_centers, [rf[ic]]))
- grad_channels = ["gx", "gy", "gz"]
- for x in range(
- len(grad_channels)
- ): # Check each gradient channel: x, y, and z
- if getattr(block, grad_channels[x]) != None:
- # If this channel is on in current block
- grad = getattr(block, grad_channels[x])
- if grad.type == "grad": # Arbitrary gradient option
- # In place unpacking of grad.t with the starred expression
- g_t = (
- t0
- + grad.delay
- + [
- 0,
- *(grad.tt + (grad.tt[1] - grad.tt[0]) / 2),
- grad.tt[-1] + grad.tt[1] - grad.tt[0],
- ]
- )
- g = 1e-3 * np.array((grad.first, *grad.waveform, grad.last))
- else: # Trapezoid gradient option
- g_t = cumsum(
- t0,
- grad.delay,
- grad.rise_time,
- grad.flat_time,
- grad.fall_time,
- )
- g = 1e-3 * grad.amplitude * np.array([0, 0, 1, 1, 0])
- if grad.channel == "x":
- gx_t_all = np.concatenate((gx_t_all, g_t))
- gx_all = np.concatenate((gx_all, g))
- elif grad.channel == "y":
- gy_t_all = np.concatenate((gy_t_all, g_t))
- gy_all = np.concatenate((gy_all, g))
- elif grad.channel == "z":
- gz_t_all = np.concatenate((gz_t_all, g_t))
- gz_all = np.concatenate((gz_all, g))
- t0 += self.block_durations[
- block_counter
- ] # "Current time" gets updated to end of block just examined
- all_waveforms = {
- "t_adc": adc_t_all,
- "t_rf": rf_t_all,
- "t_rf_centers": rf_t_centers,
- "t_gx": gx_t_all,
- "t_gy": gy_t_all,
- "t_gz": gz_t_all,
- "adc": adc_signal_all,
- "rf": rf_signal_all,
- "rf_centers": rf_signal_centers,
- "gx": gx_all,
- "gy": gy_all,
- "gz": gz_all,
- "grad_unit": "[kHz/m]",
- "rf_unit": "[Hz]",
- "time_unit": "[seconds]",
- }
- return all_waveforms
- def write(self, name: str, create_signature: bool = True, remove_duplicates: bool = True) -> Union[str, None]:
- """
- Write the sequence data to the given filename using the open file format for MR sequences.
- See also `pypulseq.Sequence.read_seq.read()`.
- Parameters
- ----------
- name : str
- Filename of `.seq` file to be written to disk.
- create_signature : bool, default=True
- Boolean flag to indicate if the file has to be signed.
- remove_duplicates : bool, default=True
- Remove duplicate events from the sequence before writing
- Returns
- -------
- signature or None : If create_signature is True, it returns the written .seq file's signature as a string,
- otherwise it returns None. Note that, if remove_duplicates is True, signature belongs to the
- deduplicated sequences signature, and not the Sequence that is stored in the Sequence object.
- """
- signature = write_seq(self, name, create_signature)
- if signature is not None:
- self.signature_type = "md5"
- self.signature_file = "text"
- self.signature_value = signature
- return signature
- else:
- return None
|