write_seq.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. import hashlib
  2. import numpy as np
  3. from seqgen.pypulseq.supported_labels_rf_use import get_supported_labels
  4. def write(self, file_name: str, create_signature) -> None:
  5. """
  6. Write the sequence data to the given filename using the open file format for MR sequences.
  7. See also `pypulseq.Sequence.read_seq.read()`.
  8. Parameters
  9. ----------
  10. file_name : str
  11. File name of `.seq` file to be written to disk.
  12. create_signature : bool
  13. Raises
  14. ------
  15. RuntimeError
  16. If an unsupported definition is encountered.
  17. """
  18. # `>.0f` for decimals.
  19. # `>g` to truncate insignificant zeros.
  20. file_name += ".seq" if file_name[-4:] != ".seq" not in file_name else ""
  21. with open(file_name, "w") as output_file:
  22. output_file.write("# Pulseq sequence file\n")
  23. output_file.write("# Created by PyPulseq\n\n")
  24. output_file.write("[VERSION]\n")
  25. output_file.write(f"major {self.version_major}\n")
  26. output_file.write(f"minor {self.version_minor}\n")
  27. output_file.write(f"revision {self.version_revision}\n")
  28. output_file.write("\n")
  29. if len(self.definitions) != 0:
  30. output_file.write("[DEFINITIONS]\n")
  31. keys = sorted(list(self.definitions.keys()))
  32. values = [self.definitions[k] for k in keys]
  33. for block_counter in range(len(keys)):
  34. output_file.write(f"{keys[block_counter]} ")
  35. if isinstance(values[block_counter], str):
  36. output_file.write(values[block_counter] + " ")
  37. elif isinstance(values[block_counter], (int, float)):
  38. output_file.write(f"{values[block_counter]:0.9g} ")
  39. elif isinstance(
  40. values[block_counter], (list, tuple, np.ndarray)
  41. ): # For example, [FOVx, FOVy, FOVz]
  42. for i in range(len(values[block_counter])):
  43. if isinstance(values[block_counter][i], (int, float)):
  44. output_file.write(f"{values[block_counter][i]:0.9g} ")
  45. else:
  46. output_file.write(f"{values[block_counter][i]} ")
  47. else:
  48. raise RuntimeError("Unsupported definition")
  49. output_file.write("\n")
  50. output_file.write("\n")
  51. output_file.write("# Format of blocks:\n")
  52. output_file.write("# NUM DUR RF GX GY GZ ADC EXT\n")
  53. output_file.write("[BLOCKS]\n")
  54. id_format_width = "{:" + str(len(str(len(self.block_events)))) + "d}"
  55. id_format_str = id_format_width + " {:3d} {:3d} {:3d} {:3d} {:3d} {:2d} {:2d}\n"
  56. for block_counter in range(len(self.block_events)):
  57. block_duration = (
  58. self.block_durations[block_counter] / self.block_duration_raster
  59. )
  60. block_duration_rounded = int(np.round(block_duration))
  61. assert np.abs(block_duration_rounded - block_duration) < 1e-6
  62. s = id_format_str.format(
  63. *(
  64. block_counter + 1,
  65. block_duration_rounded,
  66. *self.block_events[block_counter + 1][1:],
  67. )
  68. )
  69. output_file.write(s)
  70. output_file.write("\n")
  71. if len(self.rf_library.keys) != 0:
  72. output_file.write("# Format of RF events:\n")
  73. output_file.write(
  74. "# id amplitude mag_id phase_id time_shape_id delay freq phase\n"
  75. )
  76. output_file.write(
  77. "# .. Hz .... .... .... us Hz rad\n"
  78. )
  79. output_file.write("[RF]\n")
  80. rf_lib_keys = self.rf_library.keys
  81. id_format_str = "{:.0f} {:12g} {:.0f} {:.0f} {:.0f} {:g} {:g} {:g}\n" # Refer lines 20-21
  82. for k in rf_lib_keys.keys():
  83. lib_data1 = self.rf_library.data[k][0:4]
  84. lib_data2 = self.rf_library.data[k][5:7]
  85. delay = (
  86. np.round(self.rf_library.data[k][4] / self.rf_raster_time)
  87. * self.rf_raster_time
  88. * 1e6
  89. )
  90. s = id_format_str.format(k, *lib_data1, delay, *lib_data2)
  91. output_file.write(s)
  92. output_file.write("\n")
  93. grad_lib_values = np.array(list(self.grad_library.type.values()))
  94. arb_grad_mask = grad_lib_values == "g"
  95. trap_grad_mask = grad_lib_values == "t"
  96. if np.any(arb_grad_mask):
  97. output_file.write("# Format of arbitrary gradients:\n")
  98. output_file.write(
  99. "# time_shape_id of 0 means default timing (stepping with grad_raster starting at 1/2 of grad_raster)\n"
  100. )
  101. output_file.write("# id amplitude amp_shape_id time_shape_id delay\n")
  102. output_file.write("# .. Hz/m .. .. us\n")
  103. output_file.write("[GRADIENTS]\n")
  104. id_format_str = "{:.0f} {:12g} {:.0f} {:.0f} {:.0f}\n" # Refer lines 20-21
  105. keys = np.array(list(self.grad_library.keys.keys()))
  106. for k in keys[arb_grad_mask]:
  107. s = id_format_str.format(
  108. k,
  109. *self.grad_library.data[k][:3],
  110. np.round(self.grad_library.data[k][3] * 1e6),
  111. )
  112. output_file.write(s)
  113. output_file.write("\n")
  114. if np.any(trap_grad_mask):
  115. output_file.write("# Format of trapezoid gradients:\n")
  116. output_file.write("# id amplitude rise flat fall delay\n")
  117. output_file.write("# .. Hz/m us us us us\n")
  118. output_file.write("[TRAP]\n")
  119. keys = np.array(list(self.grad_library.keys.keys()))
  120. id_format_str = "{:2g} {:12g} {:3g} {:4g} {:3g} {:3g}\n"
  121. for k in keys[trap_grad_mask]:
  122. data = np.copy(
  123. self.grad_library.data[k]
  124. ) # Make a copy to leave the original untouched
  125. data[1:] = np.round(1e6 * data[1:])
  126. """
  127. Python & Numpy always round to nearest even value - inconsistent with MATLAB Pulseq's .seq files.
  128. [1] https://stackoverflow.com/questions/29671945/format-string-rounding-inconsistent
  129. [2] https://stackoverflow.com/questions/50374779/how-to-avoid-incorrect-rounding-with-numpy-round
  130. """
  131. s = id_format_str.format(k, *data)
  132. output_file.write(s)
  133. output_file.write("\n")
  134. if len(self.adc_library.keys) != 0:
  135. output_file.write("# Format of ADC events:\n")
  136. output_file.write("# id num dwell delay freq phase\n")
  137. output_file.write("# .. .. ns us Hz rad\n")
  138. output_file.write("[ADC]\n")
  139. keys = self.adc_library.keys
  140. id_format_str = (
  141. "{:.0f} {:.0f} {:.0f} {:.0f} {:g} {:g}\n" # Refer lines 20-21
  142. )
  143. for k in keys.values():
  144. data = np.multiply(self.adc_library.data[k][0:5], [1, 1e9, 1e6, 1, 1])
  145. s = id_format_str.format(k, *data)
  146. output_file.write(s)
  147. output_file.write("\n")
  148. if len(self.extensions_library.keys) != 0:
  149. output_file.write("# Format of extension lists:\n")
  150. output_file.write("# id type ref next_id\n")
  151. output_file.write("# next_id of 0 terminates the list\n")
  152. output_file.write(
  153. "# Extension list is followed by extension specifications\n"
  154. )
  155. output_file.write("[EXTENSIONS]\n")
  156. keys = self.extensions_library.keys
  157. id_format_str = "{:.0f} {:.0f} {:.0f} {:.0f}\n" # Refer lines 20-21
  158. for k in keys.values():
  159. s = id_format_str.format(k, *np.round(self.extensions_library.data[k]))
  160. output_file.write(s)
  161. output_file.write("\n")
  162. if len(self.trigger_library.keys) != 0:
  163. output_file.write(
  164. "# Extension specification for digital output and input triggers:\n"
  165. )
  166. output_file.write("# id type channel delay (us) duration (us)\n")
  167. output_file.write(
  168. f'extension TRIGGERS {self.get_extension_type_ID("TRIGGERS")}\n'
  169. )
  170. keys = self.trigger_library.keys
  171. id_format_str = "{:.0f} {:.0f} {:.0f} {:.0f} {:.0f}\n" # Refer lines 20-21
  172. for k in keys.values():
  173. s = id_format_str.format(
  174. k, *np.round(self.trigger_library.data[k] * [1, 1, 1e6, 1e6])
  175. )
  176. output_file.write(s)
  177. output_file.write("\n")
  178. if len(self.label_set_library.keys) != 0:
  179. labels = get_supported_labels()
  180. output_file.write("# Extension specification for setting labels:\n")
  181. output_file.write("# id set labelstring\n")
  182. tid = self.get_extension_type_ID("LABELSET")
  183. output_file.write(f"extension LABELSET {tid}\n")
  184. keys = self.label_set_library.keys
  185. id_format_str = "{:.0f} {:.0f} {}\n" # Refer lines 20-21
  186. for k in keys.values():
  187. value = self.label_set_library.data[k][0]
  188. label_id = labels[
  189. int(self.label_set_library.data[k][1]) - 1
  190. ] # label_id is +1 in add_block()
  191. s = id_format_str.format(k, value, label_id)
  192. output_file.write(s)
  193. output_file.write("\n")
  194. output_file.write("# Extension specification for setting labels:\n")
  195. output_file.write("# id set labelstring\n")
  196. tid = self.get_extension_type_ID("LABELINC")
  197. output_file.write(f"extension LABELINC {tid}\n")
  198. keys = self.label_inc_library.keys
  199. id_format_str = "{:.0f} {:.0f} {}\n" # See comment at the beginning of this method definition
  200. for k in keys.values():
  201. value = self.label_inc_library.data[k][0]
  202. label_id = labels[
  203. self.label_inc_library.data[k][1] - 1
  204. ] # label_id is +1 in add_block()
  205. s = id_format_str.format(k, value, label_id)
  206. output_file.write(s)
  207. output_file.write("\n")
  208. if len(self.shape_library.keys) != 0:
  209. output_file.write("# Sequence Shapes\n")
  210. output_file.write("[SHAPES]\n\n")
  211. keys = self.shape_library.keys
  212. for k in keys.values():
  213. shape_data = self.shape_library.data[k]
  214. s = "shape_id {:.0f}\n".format(k)
  215. output_file.write(s)
  216. s = "num_samples {:.0f}\n".format(shape_data[0])
  217. output_file.write(s)
  218. s = ("{:.9g}\n" * len(shape_data[1:])).format(*shape_data[1:])
  219. output_file.write(s)
  220. output_file.write("\n")
  221. if create_signature: # Sign the file
  222. # Calculate digest
  223. with open(file_name, "r") as output_file:
  224. buffer = output_file.read()
  225. md5 = hashlib.md5(buffer.encode("utf-8")).hexdigest()
  226. self.signature_type = "md5"
  227. self.signature_file = "text"
  228. self.signature_value = md5
  229. # Write signature
  230. with open(file_name, "a") as output_file:
  231. output_file.write("\n[SIGNATURE]\n")
  232. output_file.write(
  233. "# This is the hash of the Pulseq file, calculated right before the [SIGNATURE] section was added\n"
  234. )
  235. output_file.write(
  236. "# It can be reproduced/verified with md5sum if the file trimmed to the position right above [SIGNATURE]\n"
  237. )
  238. output_file.write(
  239. "# The new line character preceding [SIGNATURE] BELONGS to the signature (and needs to be stripped away for "
  240. "recalculating/verification)\n"
  241. )
  242. output_file.write("Type md5\n")
  243. output_file.write(f"Hash {md5}\n")