SpectometerDecodeLib.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import json
  2. import base64
  3. import numpy as np
  4. import scipy.fft as fft
  5. import matplotlib.pyplot as plt
  6. class DataDecoder:
  7. def __init__(self, filename):
  8. self.data = ''
  9. with open(filename, 'r') as file:
  10. self.data = file.read()
  11. self.structed_data = json.loads(self.data)
  12. def getRawData(self, averaging_num=0, data_num=0, channel_num=0):
  13. for items in self.structed_data:
  14. if(items['averaging_num'] == averaging_num and items['data_num'] == data_num):
  15. for channel_data in items['channel_data']:
  16. if(channel_data['channel_num'] == channel_num):
  17. return channel_data['channel_data']
  18. return None
  19. def getDataDecoded(self, averaging_num=0, data_num=0, channel_num=0, points=10):
  20. rawData = self.getRawData(averaging_num, data_num, channel_num)
  21. edata = base64.b64decode(rawData.encode('utf-8'))
  22. arr = np.frombuffer(edata, dtype=np.int16, count=points, offset=0)
  23. return arr
  24. def getDataScaled(self, averaging_num=0, data_num=0, channel_num=0, points=10, range=5.0):
  25. decodedData = self.getDataDecoded(averaging_num, data_num, channel_num, points)
  26. return range * decodedData / 32768
  27. def getDataRate(self, averaging_num=0, data_num=0):
  28. for items in self.structed_data:
  29. if(items['averaging_num'] == averaging_num and items['data_num'] == data_num):
  30. return items['measurement_rate']
  31. def getDataSpectrum(self, averaging_num=0, data_num=0, channel_num=0, points=10, range=5.0, zero_fill=0):
  32. scaledData = self.getDataScaled(averaging_num, data_num, channel_num, points, range)
  33. zerofilledData = np.append(scaledData, np.zeros(zero_fill))
  34. rate = self.getDataRate()
  35. transferedData = fft.rfft(zerofilledData) * 2 / points
  36. freqs = fft.rfftfreq(points+zero_fill, 1/rate)
  37. spectrum = np.abs(transferedData)
  38. phases = np.angle(transferedData)
  39. spect_dict = {'freqs': freqs,
  40. 'phases': phases,
  41. 'spectrum': spectrum}
  42. return spect_dict
  43. def getDataPoints(self, averaging_num=0, data_num=0, channel_num=0):
  44. for items in self.structed_data:
  45. if(items['averaging_num'] == averaging_num and items['data_num'] == data_num):
  46. return items['measurement_points'][channel_num]
  47. #dec = DataDecoder('saved_new.json')
  48. #print('getRawData(0, 0, 1):')
  49. #print(dec.getRawData(0, 0, 1)) # ::getRawData(averagingIndex, dataTriggerIndex, channelIndex)
  50. #print('\n')
  51. #print('getDataDecoded(0, 0, 0, 100):')
  52. #a = dec.getDataDecoded(0, 0, 0, 100)
  53. #print(a[0:100]) # ::getDataDecoded(averagingIndex, dataTriggerIndex, channelIndex, count)
  54. #print('\n')
  55. #print('getDataScaled(0, 1, 1, 100, 5.0):')
  56. #a = dec.getDataScaled(0, 1, 1, 100, 5.0)
  57. #print(a[0:100])
  58. #print('\n')
  59. #print('getDataSpectrum(0, 1, 1, max_points, 5.0, max_points):')
  60. #points = dec.getDataPoints(0, 1, 1)
  61. #spect_dict = dec.getDataSpectrum(0, 1, 1, points, 5.0, 10000)
  62. #print('\n')