123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- import itertools as itt
- import sys
- import time
- import datetime
- import serial
- import win32event
- import win32file
- import win32api
- import pywintypes
- import subprocess
- import matplotlib
- import numpy as np
- from sklearn.linear_model import LinearRegression
- from scipy.signal import butter, filtfilt
def load_csv(file_path):
    """Load a comma-separated numeric text file into a 2-D NumPy array.

    Every line is stripped of surrounding whitespace and of a single
    trailing comma (which would otherwise be parsed as an empty last
    field) before the text is handed to ``np.loadtxt``.

    Args:
        file_path: Path of the UTF-8 encoded text file to read.

    Returns:
        The parsed data as a 2-D array. Whenever ``np.loadtxt`` yields
        fewer than two dimensions (a single value, a single row or a
        single column) the values are reshaped into one column of shape
        ``(n, 1)``. On any failure a message is printed and ``-1`` is
        returned instead of an array.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            cleaned_lines = []
            for raw_line in f:
                text = raw_line.strip()
                # Drop one trailing delimiter left at the end of a row.
                cleaned_lines.append(text[:-1] if text.endswith(',') else text)
        loaded_array = np.loadtxt(cleaned_lines, delimiter=',')
        # np.loadtxt squeezes its result: a lone value comes back 0-d and a
        # lone row/column comes back 1-d. Normalise both to a column so the
        # function always hands back a 2-D array on success.
        if loaded_array.ndim < 2:
            loaded_array = loaded_array.reshape(-1, 1)
        return loaded_array
    except Exception as e:
        # Deliberately broad: callers rely on the -1 sentinel for every
        # kind of load failure (missing file, bad encoding, parse error).
        print(f"Could not load file:\n{e}")
        return -1
# ---------------------------------------------------------------------------
# Measurement configuration
# ---------------------------------------------------------------------------
# NOTE(review): `freq` and `sample_rate` are not read anywhere in this
# section -- the HackRF call below passes a literal 3e6 as "-f" and
# x[0] * 1e6 as "-s". They are kept because other code may use them.
freq = 3000000
sample_rate = 2000000
amp = 1                       # passed to hackrftrans00.exe as "-a"
xgain = 35                    # passed to hackrftrans00.exe as "-x"
rf_filename = "test_linreg.bin"
port = 11                     # COM port number, shared by Sync.exe and pyserial
adc_filename = "pico_params.xml"
below_trigger = False         # True adds "--below" to the Pico command line
sync_filename = "Sync_param.xml"
pico_rate = 80e6              # rate used for the Nyquist frequency and the sample->ns conversion

# One row per measurement configuration. As used below:
#   x[0] -> last pulse-generator argument; x[0] * 1e6 is the HackRF "-s"
#           value; logged as "sr ... MHz"
#   x[1] -> first pulse-generator argument (unit string 'uS'); logged as "period"
#   x[2] -> x[1] / x[2] is the second pulse-generator argument; logged as "idc"
#   x[3] -> ninth pulse-generator argument (meaning not visible here)
#   x[4] -> only used by the prediction formula and logged as x[4] / 1e6 seconds
ls_ls = [[2, 400, 2, 300, 120000],
         [2, 8000, 2, 200, 1600000],
         [2, 6000, 1.5, 200, 1200000],
         [8, 2000, 4, 300, 600000],
         [8, 1000, 10, 100, 100000],
         [8, 200, 2, 500, 100000],
         [20, 500, 4, 400, 200000],
         [20, 800, 8, 500, 400000],
         [20, 2000, 4, 50, 100000],
         [10, 5000, 5, 400, 2000000]]

# Number of repetitions per configuration. The announced total and the loop
# bound are both derived from this value so they can no longer disagree.
measurements_per_config = 1


def _run_tool(args, tool_name, **run_kwargs):
    """Run an external tool to completion; abort the script if it fails.

    ``check=True`` is what makes a non-zero exit status raise
    ``CalledProcessError``; without it a failing tool goes unnoticed.

    Args:
        args: Command line as a list of strings.
        tool_name: Name used in the error message.
        **run_kwargs: Extra keyword arguments forwarded to ``subprocess.run``.

    Returns:
        The ``subprocess.CompletedProcess`` of the successful run.
    """
    try:
        return subprocess.run(args, check=True, **run_kwargs)
    except subprocess.CalledProcessError as e:
        print(f"Error at {tool_name}! Return: {e.returncode}")
        sys.exit(1)
    except OSError as e:
        # The executable could not be started at all (e.g. not found).
        print(f"Error at {tool_name}! Could not start: {e}")
        sys.exit(1)


print(f"Num mes: {len(ls_ls) * measurements_per_config}")
res = 0
y = np.array([])

# The low-pass filter depends only on constants, so design it once.
cutoff = 1000e3
nyquist = 0.5 * pico_rate
order = 5
b, a = butter(order, cutoff / nyquist, btype='low')

# Pico acquisition command line (identical for every measurement).
# Do not pass --debug: it produces very noisy logs, hundreds of megabytes in size!
pargs = ["pico_test_00_second_copy.exe", adc_filename]
if below_trigger:
    pargs.append("--below")

for x in ls_ls:
    delays_arr = np.array([])
    for i in range(measurements_per_config):
        print(f'\nMeasure {i}: {x}\n')

        res = _run_tool(
            ["python.exe", 'pulse_generator_console_logreg.py',
             str(x[1]), 'uS', str(round(x[1] / x[2])), 'uS',
             str(0), 'uS', str(2), 'uS', str(x[3]), str(x[0])],
            "pulse generator",
            stdout=subprocess.DEVNULL,
            creationflags=subprocess.CREATE_NO_WINDOW)

        res = _run_tool(
            ["Sync.exe", sync_filename, "--debug", "-p", str(port)],
            "Sync.exe",
            stdout=subprocess.DEVNULL,
            creationflags=subprocess.CREATE_NO_WINDOW)

        # Resources acquired for this measurement; released in `finally`
        # so that every exit path (including sys.exit) cleans up.
        ser = None
        sem = None
        picoproc = None
        try:
            try:
                ser = serial.Serial("COM" + str(port), 9600)
                ser.write(bytes('e', 'utf-8'))
            except serial.SerialException as e:
                print(f"Error at serial port: {e}")
                sys.exit(1)

            try:
                sem = win32event.CreateSemaphore(None, 0, 1, "wait-semaphore-5d95950d-a278-4733-a041-ee9fb05ad4e4")
            except pywintypes.error as e:
                print(f"Error at semaphore: cannot create! {e}")
                sys.exit(1)
            if sem is None:
                # Nothing to close here: there is no handle.
                print(f"Error at semaphore: cannot create!")
                sys.exit(1)

            # Using Popen for asynchronous running
            try:
                picoproc = subprocess.Popen(pargs, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=subprocess.CREATE_NO_WINDOW)
            except (OSError, subprocess.SubprocessError) as e:
                # The process was never created, so there is nothing to kill
                # and the exception carries no return code.
                print(f"Error at Pico! {e}")
                sys.exit(1)
            print(f"Pico process PID: {picoproc.pid}")

            # Wait up to 20 s for the named semaphore to be signalled.
            ret = win32event.WaitForSingleObject(sem, 20000)
            if ret == win32event.WAIT_TIMEOUT:
                print("Pico timeout!")
                win32event.ReleaseSemaphore(sem, 1)
                sys.exit(1)

            res = _run_tool(
                ["hackrftrans00.exe", "-t", rf_filename, "-f", str(round(3e6)),
                 "-s", str(round(x[0] * 1e6)), "-a", str(amp), "-x", str(xgain)],
                "HackRF")

            print(f"End of measurement. Loading CSV... \n")
            time.sleep(10)
            arr = load_csv('output\\data_fixed.csv')
            # load_csv signals failure with -1; the processing below also
            # needs at least two columns because it reads column index 1.
            if not isinstance(arr, np.ndarray) or arr.ndim < 2 or arr.shape[1] < 2:
                print("Could not load measurement data from output\\data_fixed.csv")
                sys.exit(1)

            tarr = np.transpose(arr)
            n_samples = np.size(tarr[1])
            envelope = np.abs(tarr[1])
            filt_envelope = filtfilt(b, a, envelope)

            # Reference level: mean of the filtered envelope between 5 % and
            # 50 % of the record.
            startcut = round(0.05 * n_samples)
            endcut = round(0.5 * n_samples)
            demodulated_voltage = np.average(filt_envelope[startcut:endcut])
            print(f'Start cut: {startcut}')
            print(f'End cut: {endcut}')
            print(f'Demoodulated voltage: {demodulated_voltage}')

            # First sample at or after startcut where the envelope is no
            # longer above half the reference level. If it never drops, use
            # the end of the record (delay 0) instead of indexing past it.
            not_above = np.flatnonzero(~(filt_envelope[startcut:] > demodulated_voltage * 0.5))
            if not_above.size:
                drop_index = startcut + int(not_above[0])
            else:
                drop_index = n_samples

            # Samples remaining after the drop, converted to nanoseconds
            # (pico_rate / 1e9 == 0.08 samples per ns at 80 MHz).
            delay = round((n_samples - drop_index) / (pico_rate / 1e9))
            print(f'Delay: {delay} nS')
            delays_arr = np.append(delays_arr, [delay])

            # Linear model of the delay; the coefficients are fixed numbers
            # here (presumably fitted offline -- TODO confirm their origin).
            y = (1474.443027 * x[0] - 262.072789 * x[1] / 1e3 - 647.769975 * x[2] + 39740.431031 * x[4] / 1e6 - 6852.33)
            print(f'Predicted delay: {y} nS')

            # NOTE(review): x[1] is handed to the pulse generator with unit
            # 'uS' but is labelled "ms" in this log line -- confirm the unit.
            with open('predication.txt', 'a', encoding='utf-8') as f:
                f.write(f'sr = {x[0]} MHz; period = {x[1]} ms; idc = {x[2]}; time = {x[4] / 1e6} s;\ndelay = {delay} nS;\npredicted = {y} nS;\n\n')
        except BaseException:
            # Abnormal exit (including sys.exit): do not leave the Pico
            # acquisition process running in the background.
            if picoproc is not None and picoproc.poll() is None:
                picoproc.kill()
            raise
        finally:
            if sem is not None:
                win32file.CloseHandle(sem)
            if ser is not None:
                ser.close()
        time.sleep(0.1)
|