| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637 |
- from types import SimpleNamespace
- from typing import Tuple, List, Union
- import numpy as np
- from seqgen.pypulseq.block_to_events import block_to_events
- from seqgen.pypulseq.compress_shape import compress_shape
- from seqgen.pypulseq.decompress_shape import decompress_shape
- from seqgen.pypulseq.event_lib import EventLibrary
- from seqgen.pypulseq.supported_labels_rf_use import get_supported_labels
- def set_block(self, block_index: int, *args: SimpleNamespace) -> None:
- """
- Replace block at index with new block provided as block structure, add sequence block, or create a new block
- from events and store at position specified by index. The block or events are provided in uncompressed form and
- will be stored in the compressed, non-redundant internal libraries.
- See also:
- - `pypulseq.Sequence.sequence.Sequence.get_block()`
- - `pypulseq.Sequence.sequence.Sequence.add_block()`
- Parameters
- ----------
- block_index : int
- Index at which block is replaced.
- args : SimpleNamespace
- Block or events to be replaced/added or created at `block_index`.
- Raises
- ------
- ValueError
- If trigger event that is passed is of unsupported control event type.
- If delay is set for a gradient even that starts with a non-zero amplitude.
- RuntimeError
- If two consecutive gradients to not have the same amplitude at the connection point.
- If the first gradient in the block does not start with 0.
- If a gradient that doesn't end at zero is not aligned to the block boundary.
- """
- events = block_to_events(*args)
- self.block_events[block_index] = np.zeros(7, dtype=np.int32)
- duration = 0
- check_g = {} # Key-value mapping of index and pairs of gradients/times
- extensions = []
- for event in events:
- if not isinstance(event, float): # If event is not a block duration
- if event.type == "rf":
- if hasattr(event, "id"):
- rf_id = event.id
- else:
- rf_id, _ = register_rf_event(self, event)
- self.block_events[block_index][1] = rf_id
- duration = max(
- duration, event.shape_dur + event.delay + event.ringdown_time
- )
- elif event.type == "grad":
- channel_num = ["x", "y", "z"].index(event.channel)
- idx = 2 + channel_num
- grad_start = (
- event.delay
- + np.floor(event.tt[0] / self.grad_raster_time + 1e-10)
- * self.grad_raster_time
- )
- grad_duration = (
- event.delay
- + np.ceil(event.tt[-1] / self.grad_raster_time - 1e-10)
- * self.grad_raster_time
- )
- check_g[channel_num] = SimpleNamespace()
- check_g[channel_num].idx = idx
- check_g[channel_num].start = np.array((grad_start, event.first))
- check_g[channel_num].stop = np.array((grad_duration, event.last))
- if hasattr(event, "id"):
- grad_id = event.id
- else:
- grad_id, _ = register_grad_event(self, event)
- self.block_events[block_index][idx] = grad_id
- duration = np.max([duration, grad_duration])
- elif event.type == "trap":
- channel_num = ["x", "y", "z"].index(event.channel)
- idx = 2 + channel_num
- check_g[channel_num] = SimpleNamespace()
- check_g[channel_num].idx = idx
- check_g[channel_num].start = np.array((0, 0))
- check_g[channel_num].stop = np.array(
- (
- event.delay
- + event.rise_time
- + event.fall_time
- + event.flat_time,
- 0,
- )
- )
- if hasattr(event, "id"):
- trap_id = event.id
- else:
- trap_id = register_grad_event(self, event)
- self.block_events[block_index][idx] = trap_id
- duration = np.max(
- [
- duration,
- event.delay
- + event.rise_time
- + event.flat_time
- + event.fall_time,
- ]
- )
- elif event.type == "adc":
- if hasattr(event, "id"):
- adc_id = event.id
- else:
- adc_id = register_adc_event(self, event)
- self.block_events[block_index][5] = adc_id
- duration = np.max(
- [
- duration,
- event.delay + event.num_samples * event.dwell + event.dead_time,
- ]
- )
- elif event.type == "delay":
- duration = np.max([duration, event.delay])
- elif event.type in ["output", "trigger"]:
- if hasattr(event, "id"):
- event_id = event.id
- else:
- event_id = register_control_event(self, event)
- ext = {"type": self.get_extension_type_ID("TRIGGERS"), "ref": event_id}
- extensions.append(ext)
- duration = np.max([duration, event.delay + event.duration])
- elif event.type in ["labelset", "labelinc"]:
- if hasattr(event, "id"):
- label_id = event.id
- else:
- label_id = register_label_event(self, event)
- ext = {
- "type": self.get_extension_type_ID(event.type.upper()),
- "ref": label_id,
- }
- extensions.append(ext)
- # =========
- # ADD EXTENSIONS
- # =========
- if len(extensions) > 0:
- """
- Add extensions now... but it's tricky actually we need to check whether the exactly the same list of extensions
- already exists, otherwise we have to create a new one... ooops, we have a potential problem with the key
- mapping then... The trick is that we rely on the sorting of the extension IDs and then we can always find the
- last one in the list by setting the reference to the next to 0 and then proceed with the other elements.
- """
- sort_idx = np.argsort([e["ref"] for e in extensions])
- extensions = np.take(extensions, sort_idx)
- all_found = True
- extension_id = 0
- for i in range(len(extensions)):
- data = [extensions[i]["type"], extensions[i]["ref"], extension_id]
- extension_id, found = self.extensions_library.find(data)
- all_found = all_found and found
- if not found:
- break
- if not all_found:
- # Add the list
- extension_id = 0
- for i in range(len(extensions)):
- data = [extensions[i]["type"], extensions[i]["ref"], extension_id]
- extension_id, found = self.extensions_library.find(data)
- if not found:
- self.extensions_library.insert(extension_id, data)
- # Now we add the ID
- self.block_events[block_index][6] = extension_id
- # =========
- # PERFORM GRADIENT CHECKS
- # =========
- for grad_to_check in check_g.values():
- if (
- abs(grad_to_check.start[1])
- > self.system.max_slew * self.system.grad_raster_time
- ):
- if grad_to_check.start[0] != 0:
- raise ValueError(
- "No delay allowed for gradients which start with a non-zero amplitude"
- )
- if block_index > 1:
- prev_id = self.block_events[block_index - 1][grad_to_check.idx]
- if prev_id != 0:
- prev_lib = self.grad_library.get(prev_id)
- prev_data = prev_lib["data"]
- prev_type = prev_lib["type"]
- if prev_type == "t":
- raise RuntimeError(
- "Two consecutive gradients need to have the same amplitude at the connection point"
- )
- elif prev_type == "g":
- last = prev_data[5]
- if (
- abs(last - grad_to_check.start[1])
- > self.system.max_slew * self.system.grad_raster_time
- ):
- raise RuntimeError(
- "Two consecutive gradients need to have the same amplitude at the connection point"
- )
- else:
- raise RuntimeError(
- "First gradient in the the first block has to start at 0."
- )
- if (
- grad_to_check.stop[1] > self.system.max_slew * self.system.grad_raster_time
- and abs(grad_to_check.stop[0] - duration) > 1e-7
- ):
- raise RuntimeError(
- "A gradient that doesn't end at zero needs to be aligned to the block boundary."
- )
- self.block_durations.append(float(duration))
- def get_block(self, block_index: int) -> SimpleNamespace:
- """
- Returns PyPulseq block at `block_index` position in `self.block_events`.
- The block is created from the sequence data with all events and shapes decompressed.
- Parameters
- ----------
- block_index : int
- Index of PyPulseq block to be retrieved from `self.block_events`.
- Returns
- -------
- block : SimpleNamespace
- PyPulseq block at 'block_index' position in `self.block_events`.
- Raises
- ------
- ValueError
- If a trigger event of an unsupported control type is encountered.
- If a label object of an unknown extension ID is encountered.
- """
- block = SimpleNamespace()
- attrs = ["block_duration", "rf", "gx", "gy", "gz", "adc"]
- values = [None] * len(attrs)
- for att, val in zip(attrs, values):
- setattr(block, att, val)
- event_ind = self.block_events[block_index]
- if event_ind[0] > 0: # Delay
- delay = SimpleNamespace()
- delay.type = "delay"
- delay.delay = self.delay_library.data[event_ind[0]][0]
- block.delay = delay
- if event_ind[1] > 0: # RF
- if len(self.rf_library.type) >= event_ind[1]:
- block.rf = self.rf_from_lib_data(
- self.rf_library.data[event_ind[1]], self.rf_library.type[event_ind[1]]
- )
- else:
- block.rf = self.rf_from_lib_data(
- self.rf_library.data[event_ind[1]]
- ) # Undefined type/use
- # Gradients
- grad_channels = ["gx", "gy", "gz"]
- for i in range(len(grad_channels)):
- if event_ind[2 + i] > 0:
- grad, compressed = SimpleNamespace(), SimpleNamespace()
- grad_type = self.grad_library.type[event_ind[2 + i]]
- lib_data = self.grad_library.data[event_ind[2 + i]]
- grad.type = "trap" if grad_type == "t" else "grad"
- grad.channel = grad_channels[i][1]
- if grad.type == "grad":
- amplitude = lib_data[0]
- shape_id = lib_data[1]
- time_id = lib_data[2]
- delay = lib_data[3]
- shape_data = self.shape_library.data[shape_id]
- compressed.num_samples = shape_data[0]
- compressed.data = shape_data[1:]
- g = decompress_shape(compressed)
- grad.waveform = amplitude * g
- if time_id == 0:
- grad.tt = (np.arange(1, len(g) + 1) - 0.5) * self.grad_raster_time
- t_end = len(g) * self.grad_raster_time
- else:
- t_shape_data = self.shape_library.data[time_id]
- compressed.num_samples = t_shape_data[0]
- compressed.data = t_shape_data[1:]
- grad.tt = decompress_shape(compressed) * self.grad_raster_time
- assert len(grad.waveform) == len(grad.tt)
- t_end = grad.tt[-1]
- grad.shape_id = shape_id
- grad.time_id = time_id
- grad.delay = delay
- grad.shape_dur = t_end
- if len(lib_data) > 5:
- grad.first = lib_data[4]
- grad.last = lib_data[5]
- else:
- grad.amplitude = lib_data[0]
- grad.rise_time = lib_data[1]
- grad.flat_time = lib_data[2]
- grad.fall_time = lib_data[3]
- grad.delay = lib_data[4]
- grad.area = grad.amplitude * (
- grad.flat_time + grad.rise_time / 2 + grad.fall_time / 2
- )
- grad.flat_area = grad.amplitude * grad.flat_time
- setattr(block, grad_channels[i], grad)
- # ADC
- if event_ind[5] > 0:
- lib_data = self.adc_library.data[event_ind[5]]
- if len(lib_data) < 6:
- lib_data = np.append(lib_data, 0)
- adc = SimpleNamespace()
- (
- adc.num_samples,
- adc.dwell,
- adc.delay,
- adc.freq_offset,
- adc.phase_offset,
- adc.dead_time,
- ) = [lib_data[x] for x in range(6)]
- adc.num_samples = int(adc.num_samples)
- adc.type = "adc"
- block.adc = adc
- # Triggers
- if event_ind[6] > 0:
- # We have extensions - triggers, labels, etc.
- next_ext_id = event_ind[6]
- while next_ext_id != 0:
- ext_data = self.extensions_library.data[next_ext_id]
- # Format: ext_type, ext_id, next_ext_id
- ext_type = self.get_extension_type_string(ext_data[0])
- if ext_type == "TRIGGERS":
- trigger_types = ["output", "trigger"]
- data = self.trigger_library.data[ext_data[1]]
- trigger = SimpleNamespace()
- trigger.type = trigger_types[int(data[0]) - 1]
- if data[0] == 1:
- trigger_channels = ["osc0", "osc1", "ext1"]
- trigger.channel = trigger_channels[int(data[1]) - 1]
- elif data[0] == 2:
- trigger_channels = ["physio1", "physio2"]
- trigger.channel = trigger_channels[int(data[1]) - 1]
- else:
- raise ValueError("Unsupported trigger event type")
- trigger.delay = data[2]
- trigger.duration = data[3]
- # Allow for multiple triggers per block
- if hasattr(block, "trigger"):
- block.trigger[len(block.trigger)] = trigger
- else:
- block.trigger = {0: trigger}
- elif ext_type in ["LABELSET", "LABELINC"]:
- label = SimpleNamespace()
- label.type = ext_type.lower()
- supported_labels = get_supported_labels()
- if ext_type == "LABELSET":
- data = self.label_set_library.data[ext_data[1]]
- else:
- data = self.label_inc_library.data[ext_data[1]]
- label.label = supported_labels[int(data[1] - 1)]
- label.value = data[0]
- # Allow for multiple labels per block
- if hasattr(block, "label"):
- block.label[len(block.label)] = label
- else:
- block.label = {0: label}
- else:
- raise RuntimeError(f"Unknown extension ID {ext_data[0]}")
- next_ext_id = ext_data[2]
- block.block_duration = self.block_durations[block_index - 1]
- return block
- def register_adc_event(self, event: EventLibrary) -> int:
- """
- Parameters
- ----------
- event : SimpleNamespace
- ADC event to be registered.
- Returns
- -------
- int
- ID of registered ADC event.
- """
- data = np.array(
- [
- event.num_samples,
- event.dwell,
- np.max([event.delay, event.dead_time]),
- event.freq_offset,
- event.phase_offset,
- event.dead_time,
- ]
- )
- adc_id, _ = self.adc_library.find_or_insert(new_data=data)
- return adc_id
- def register_control_event(self, event: SimpleNamespace) -> int:
- """
- Parameters
- ----------
- event : SimpleNamespace
- Control event to be registered.
- Returns
- -------
- int
- ID of registered control event.
- """
- event_type = ["output", "trigger"].index(event.type)
- if event_type == 0:
- # Trigger codes supported by the Siemens interpreter as of May 2019
- event_channel = ["osc0", "osc1", "ext1"].index(event.channel)
- elif event_type == 1:
- # Trigger codes supported by the Siemens interpreter as of June 2019
- event_channel = ["physio1", "physio2"].index(event.channel)
- else:
- raise ValueError("Unsupported control event type")
- data = [event_type + 1, event_channel + 1, event.delay, event.duration]
- control_id, _ = self.trigger_library.find_or_insert(new_data=data)
- return control_id
- def register_grad_event(
- self, event: SimpleNamespace
- ) -> Union[int, Tuple[int, List[int]]]:
- """
- Parameters
- ----------
- event : SimpleNamespace
- Gradient event to be registered.
- Returns
- -------
- int, [int, ...]
- For gradient events: ID of registered gradient event, list of shape IDs
- int
- For trapezoid gradient events: ID of registered gradient event
- """
- may_exist = True
- if event.type == "grad":
- amplitude = np.abs(event.waveform).max()
- if amplitude > 0:
- fnz = event.waveform[np.nonzero(event.waveform)[0][0]]
- amplitude *= (
- np.sign(fnz) if fnz != 0 else 1
- ) # Workaround for np.sign(0) = 0
- if hasattr(event, "shape_IDs"):
- shape_IDs = event.shape_IDs
- else:
- shape_IDs = [0, 0]
- if amplitude != 0:
- g = event.waveform / amplitude
- else:
- g = event.waveform
- c_shape = compress_shape(g)
- s_data = np.insert(c_shape.data, 0, c_shape.num_samples)
- shape_IDs[0], found = self.shape_library.find_or_insert(s_data)
- may_exist = may_exist & found
- c_time = compress_shape(event.tt / self.grad_raster_time)
- if not (
- len(c_time.data) == 4
- and np.all(c_time.data == [0.5, 1, 1, c_time.num_samples - 3])
- ):
- t_data = np.insert(c_time.data, 0, c_time.num_samples)
- shape_IDs[1], found = self.shape_library.find_or_insert(t_data)
- may_exist = may_exist & found
- data = [amplitude, *shape_IDs, event.delay, event.first, event.last]
- elif event.type == "trap":
- data = np.array(
- [
- event.amplitude,
- event.rise_time,
- event.flat_time,
- event.fall_time,
- event.delay,
- ]
- )
- else:
- raise ValueError("Unknown gradient type passed to register_grad_event()")
- if may_exist:
- grad_id, _ = self.grad_library.find_or_insert(
- new_data=data, data_type=event.type[0]
- )
- else:
- grad_id = self.grad_library.insert(0, data, event.type[0])
- if event.type == "grad":
- return grad_id, shape_IDs
- elif event.type == "trap":
- return grad_id
- def register_label_event(self, event: SimpleNamespace) -> int:
- """
- Parameters
- ----------
- event : SimpleNamespace
- ID of label event to be registered.
- Returns
- -------
- int
- ID of registered label event.
- """
- label_id = get_supported_labels().index(event.label) + 1
- data = [event.value, label_id]
- if event.type == "labelset":
- label_id, _ = self.label_set_library.find_or_insert(new_data=data)
- elif event.type == "labelinc":
- label_id, _ = self.label_inc_library.find_or_insert(new_data=data)
- else:
- raise ValueError("Unsupported label type passed to register_label_event()")
- return label_id
- def register_rf_event(self, event: SimpleNamespace) -> Tuple[int, List[int]]:
- """
- Parameters
- ----------
- event : SimpleNamespace
- RF event to be registered.
- Returns
- -------
- int, [int, ...]
- ID of registered RF event, list of shape IDs
- """
- mag = np.abs(event.signal)
- amplitude = np.max(mag)
- mag /= amplitude
- # Following line of code is a workaround for numpy's divide functions returning NaN when mathematical
- # edge cases are encountered (eg. divide by 0)
- mag[np.isnan(mag)] = 0
- phase = np.angle(event.signal)
- phase[phase < 0] += 2 * np.pi
- phase /= 2 * np.pi
- may_exist = True
- if hasattr(event, "shape_IDs"):
- shape_IDs = event.shape_IDs
- else:
- shape_IDs = [0, 0, 0]
- mag_shape = compress_shape(mag)
- data = np.insert(mag_shape.data, 0, mag_shape.num_samples)
- shape_IDs[0], found = self.shape_library.find_or_insert(data)
- may_exist = may_exist & found
- phase_shape = compress_shape(phase)
- data = np.insert(phase_shape.data, 0, phase_shape.num_samples)
- shape_IDs[1], found = self.shape_library.find_or_insert(data)
- may_exist = may_exist & found
- time_shape = compress_shape(
- event.t / self.rf_raster_time
- ) # Time shape is stored in units of RF raster
- if len(time_shape.data) == 4 and np.all(
- time_shape.data == [0.5, 1, 1, time_shape.num_samples - 3]
- ):
- shape_IDs[2] = 0
- else:
- data = [time_shape.num_samples, *time_shape.data]
- shape_IDs[2], found = self.shape_library.find_or_insert(data)
- may_exist = may_exist & found
- use = "u" # Undefined
- if hasattr(event, "use"):
- if event.use in [
- "excitation",
- "refocusing",
- "inversion",
- "saturation",
- "preparation",
- ]:
- use = event.use[0]
- else:
- use = "u"
- data = np.array(
- [amplitude, *shape_IDs, event.delay, event.freq_offset, event.phase_offset]
- )
- if may_exist:
- rf_id, _ = self.rf_library.find_or_insert(new_data=data, data_type=use)
- else:
- rf_id = self.rf_library.insert(key_id=0, new_data=data, data_type=use)
- return rf_id, shape_IDs
|