decompress_shape.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. from types import SimpleNamespace
  2. import numpy as np
  3. def decompress_shape(
  4. compressed_shape: SimpleNamespace, force_decompression: bool = False
  5. ) -> np.ndarray:
  6. """
  7. Decompress a gradient or pulse shape compressed with a run-length compression scheme on the derivative. The given
  8. shape is structure with the following fields:
  9. - num_samples - the number of samples in the uncompressed waveform
  10. - data - containing the compressed waveform
  11. See also `compress_shape.py`.
  12. Parameters
  13. ----------
  14. compressed_shape : SimpleNamespace
  15. Run-length encoded shape.
  16. force_decompression : bool, default=False
  17. Returns
  18. -------
  19. decompressed_shape : numpy.ndarray
  20. Decompressed shape.
  21. """
  22. data_pack = compressed_shape.data
  23. data_pack_len = len(data_pack)
  24. num_samples = int(compressed_shape.num_samples)
  25. if not force_decompression and num_samples == data_pack_len:
  26. # Uncompressed shape
  27. decompressed_shape = data_pack
  28. return decompressed_shape
  29. decompressed_shape = np.zeros(num_samples) # Pre-allocate result matrix
  30. # Decompression starts here
  31. data_pack_diff = data_pack[1:] - data_pack[:-1]
  32. # When data_pack_diff == 0 the subsequent samples are equal ==> marker for repeats (run-length encoding)
  33. data_pack_markers = np.where(data_pack_diff == 0.0)[0]
  34. count_pack = 0 # Points to current compressed sample
  35. count_unpack = 0 # Points to current uncompressed sample
  36. for i in range(len(data_pack_markers)):
  37. # This index may have "false positives", e.g. if the value 3 repeats 3 times, then we will have 3 3 3
  38. next_pack = data_pack_markers[i]
  39. current_unpack_samples = next_pack - count_pack
  40. if current_unpack_samples < 0: # Rejects false positives
  41. continue
  42. elif current_unpack_samples > 0: # We have an unpacked block to copy
  43. decompressed_shape[
  44. count_unpack : count_unpack + current_unpack_samples
  45. ] = data_pack[count_pack:next_pack]
  46. count_pack += current_unpack_samples
  47. count_unpack += current_unpack_samples
  48. # Packed/repeated section
  49. rep = int(data_pack[count_pack + 2] + 2)
  50. decompressed_shape[count_unpack : (count_unpack + rep)] = data_pack[count_pack]
  51. count_pack += 3
  52. count_unpack += rep
  53. # Samples left?
  54. if count_pack <= data_pack_len - 1:
  55. assert data_pack_len - count_pack == num_samples - count_unpack
  56. # Copy the rest of the shape, it is unpacked
  57. decompressed_shape[count_unpack:] = data_pack[count_pack:]
  58. decompressed_shape = np.cumsum(decompressed_shape)
  59. return decompressed_shape