"""Measure signal delays over a grid of pulse settings and fit a linear model.

For every parameter combination the script configures the pulse generator and
the synchroniser, starts the Pico ADC capture process, transmits an RF file
with the HackRF tool, loads the captured CSV and estimates a delay from the
low-pass-filtered envelope.  Each combination is measured several times; the
mean delays are appended to ``delays_new.txt`` and a linear regression of the
mean delay against the four parameters is written to
``regression_updated.txt``.
"""
import datetime
import itertools as itt
import subprocess
import sys
import time

import matplotlib
import numpy as np
import pywintypes
import serial
import win32api
import win32event
import win32file
from scipy.signal import butter, filtfilt
from sklearn.linear_model import LinearRegression

# --- Measurement configuration -------------------------------------------
freq = 3000000                     # HackRF "-f" argument
sample_rate = 2000000
amp = 1                            # HackRF "-a" argument
xgain = 35                         # HackRF "-x" argument
rf_filename = "test_linreg.bin"    # file transmitted by the HackRF tool
port = 11                          # COM port number (serial link and Sync.exe "-p")
adc_filename = "pico_params.xml"   # parameter file passed to the Pico capture tool
below_trigger = False              # when True, pass "--below" to the Pico capture tool
sync_filename = "Sync_param.xml"   # parameter file passed to Sync.exe
pico_rate = 80e6                   # ADC sample rate in Hz, used for filter design

# Layout of one parameter vector ``x``:
#   x[0] - sample rate, MHz
#   x[1] - pulse period, us
#   x[2] - inverse duty cycle (pulse width = period / idc)
#   x[3] - total duration of the pulse train, us (pulse count = x[3] / x[1])
_SAMPLE_RATES_MHZ = (10, 20)
_PERIODS_US = (240, 480, 960, 1920, 2880, 3840, 5760, 7680, 9600, 11520)
_INVERSE_DUTY_CYCLES = (1.5, 2, 4, 8, 10)
_PULSE_COUNTS = (100, 200, 400, 600, 800)

_REPEATS = 5                 # measurements averaged per parameter vector
_SEMAPHORE_NAME = "wait-semaphore-5d95950d-a278-4733-a041-ee9fb05ad4e4"
_SEMAPHORE_TIMEOUT_MS = 20000
_PICO_EXIT_TIMEOUT_S = 15
_CSV_PATH = 'output\\data_fixed.csv'


def load_csv(file_path):
    """Load a comma-separated numeric file into a 2-D NumPy array.

    A single trailing comma on a line is removed before parsing.  A
    one-dimensional result is reshaped into a single column.

    Args:
        file_path: Path of the UTF-8 text file to read.

    Returns:
        The parsed 2-D array, or ``-1`` if the file could not be read or
        parsed (the error is printed).
    """
    try:
        # The ``with`` block closes the file; no explicit close() is needed.
        with open(file_path, 'r', encoding='utf-8') as f:
            stripped = (raw.strip() for raw in f)
            cleaned_lines = [
                text[:-1] if text.endswith(',') else text for text in stripped
            ]
        loaded_array = np.loadtxt(cleaned_lines, delimiter=',')
    except (OSError, ValueError) as e:
        print(f"Could not load file:\n{e}")
        return -1
    if loaded_array.ndim == 1:
        loaded_array = loaded_array.reshape(-1, 1)
    return loaded_array


def build_parameter_grid():
    """Return every ``[sample rate, period, idc, total time]`` combination.

    The order matches ``itertools.product`` over the four value tables, with
    the sample rate varying slowest.
    """
    return [
        [sr_mhz, period_us, idc, count * period_us]
        for sr_mhz, period_us, idc, count in itt.product(
            _SAMPLE_RATES_MHZ, _PERIODS_US, _INVERSE_DUTY_CYCLES, _PULSE_COUNTS
        )
    ]


def _run_tool(label, args, **kwargs):
    """Run an external tool to completion; exit the script if it fails.

    ``check=True`` is required for ``CalledProcessError`` to be raised at all
    on a non-zero exit status.
    """
    try:
        subprocess.run(args, check=True, **kwargs)
    except subprocess.CalledProcessError as e:
        print(f"Error at {label}! Return: {e.returncode}")
        sys.exit(1)
    except OSError as e:
        print(f"Error at {label}! Could not start: {e}")
        sys.exit(1)


def extract_delay_ns(samples):
    """Estimate the delay in nanoseconds from one captured channel.

    The rectified signal is low-pass filtered (5th-order Butterworth, 1 MHz
    cutoff, zero-phase).  The reference level is the mean of the filtered
    envelope between 5 % and 50 % of the record.  Starting at the 5 % mark,
    the record is scanned forward until the envelope is no longer above half
    of that level; the number of samples left after that point is converted
    to nanoseconds using ``pico_rate``.

    Args:
        samples: 1-D array of ADC samples.

    Returns:
        The delay rounded to an integer number of nanoseconds.
    """
    envelope = np.abs(samples)
    cutoff = 1000e3
    nyquist = 0.5 * pico_rate
    order = 5
    b, a = butter(order, cutoff / nyquist, btype='low')
    filt_envelope = filtfilt(b, a, envelope)

    total = np.size(samples)
    startcut = round(0.05 * total)
    endcut = round(0.5 * total)
    demodulated_voltage = np.average(filt_envelope[startcut:endcut])
    print(f'Start cut: {startcut}')
    print(f'End cut: {endcut}')
    print(f'Demoodulated voltage: {demodulated_voltage}')

    threshold = demodulated_voltage * 0.5
    pos = startcut
    while filt_envelope[pos - 1] > threshold and pos < total:
        pos += 1

    samples_per_ns = pico_rate / 1e9  # 0.08 at 80 MHz
    return round((total - pos) / samples_per_ns)


def measure_delay(x):
    """Perform one complete measurement for parameter vector ``x``.

    Steps: configure the pulse generator and Sync.exe, write ``e`` to the
    serial port, start the Pico capture process, wait for the named
    semaphore, transmit with the HackRF tool, wait for the capture process to
    finish, then load and analyse the CSV.  The serial port, the semaphore
    handle and a still-running capture process are always released, including
    when the script exits on an error.

    Args:
        x: ``[sample rate MHz, period us, inverse duty cycle, total time us]``.

    Returns:
        The measured delay in nanoseconds.
    """
    sr_mhz, period_us, idc, burst_us = x
    pulse_count = round(burst_us / period_us)
    pulse_width_us = round(period_us / idc)
    print(f'Number: {pulse_count}')
    print(f'PW: {pulse_width_us}')

    quiet = {
        "stdout": subprocess.DEVNULL,
        "creationflags": subprocess.CREATE_NO_WINDOW,
    }
    _run_tool(
        "pulse generator",
        ["python.exe", 'pulse_generator_console_logreg.py',
         str(period_us), 'uS', str(pulse_width_us), 'uS',
         str(0), 'uS', str(2), 'uS',
         str(pulse_count), str(sr_mhz)],
        **quiet,
    )
    _run_tool(
        "Sync.exe",
        ["Sync.exe", sync_filename, "--debug", "-p", str(port)],
        **quiet,
    )

    ser = None
    sem = None
    picoproc = None
    try:
        try:
            ser = serial.Serial("COM" + str(port), 9600)
            # NOTE(review): meaning of the 'e' command is defined by the
            # device firmware, not visible here.
            ser.write(b'e')
        except serial.SerialException as e:
            print(f"Error at serial port: {e}")
            sys.exit(1)

        try:
            sem = win32event.CreateSemaphore(None, 0, 1, _SEMAPHORE_NAME)
        except pywintypes.error:
            print("Error at semaphore: cannot create!")
            sys.exit(1)

        pargs = ["pico_test_00_second_copy.exe", adc_filename]
        if below_trigger:
            pargs.append("--below")
        # Do not pass --debug: it produces very noisy logs, hundreds of
        # megabytes in size!

        # Popen (not run) so the capture runs concurrently with the
        # transmission below.
        try:
            picoproc = subprocess.Popen(
                pargs,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                creationflags=subprocess.CREATE_NO_WINDOW,
            )
        except OSError as e:
            print(f"Error at Pico! {e}")
            sys.exit(1)
        print(f"Pico process PID: {picoproc.pid}")

        # Presumably the capture process releases the semaphore once it is
        # ready for the transmission -- confirm against the Pico tool.
        ret = win32event.WaitForSingleObject(sem, _SEMAPHORE_TIMEOUT_MS)
        if ret == win32event.WAIT_TIMEOUT:
            print("Pico timeout!")
            win32event.ReleaseSemaphore(sem, 1)
            sys.exit(1)

        _run_tool(
            "HackRF",
            ["hackrftrans00.exe", "-t", rf_filename,
             "-f", str(round(freq)),
             "-s", str(round(sr_mhz * 1e6)),
             "-a", str(amp), "-x", str(xgain)],
        )

        try:
            picoproc.communicate(timeout=_PICO_EXIT_TIMEOUT_S)
        except subprocess.TimeoutExpired:
            picoproc.kill()
            picoproc.communicate()

        print("End of measurement. Loading CSV... \n")
        time.sleep(3)
        arr = load_csv(_CSV_PATH)
        if not isinstance(arr, np.ndarray) or arr.shape[1] < 2:
            print(f"Error at CSV: no usable data in {_CSV_PATH}")
            sys.exit(1)

        # The second CSV column is the analysed channel.
        delay = extract_delay_ns(np.transpose(arr)[1])
        print(f'Delay: {delay} nS')
        return delay
    finally:
        if picoproc is not None and picoproc.poll() is None:
            picoproc.kill()
            picoproc.wait()
        if sem is not None:
            win32file.CloseHandle(sem)
        if ser is not None:
            ser.close()


def main():
    """Run the full measurement sweep and write the regression results."""
    ls_ls = build_parameter_grid()
    nmes = len(ls_ls) * _REPEATS
    print(f"Num mes: {nmes}")

    mean_delays = []
    for cnt, x in enumerate(ls_ls, start=1):
        delays = []
        for attempt in range(1, _REPEATS + 1):
            print(f'\nMeasure {cnt}-{attempt} (in {nmes}): {x}\n')
            delays.append(measure_delay(x))
            time.sleep(0.1)

        delays_arr = np.array(delays, dtype=float)
        mean_delay = np.average(delays_arr)
        print(delays_arr)
        print(f'\nMean delay {x}: {mean_delay} nS')
        mean_delays.append(mean_delay)

        # Appended after every parameter vector so partial results survive
        # an aborted sweep.
        with open('delays_new.txt', 'a', encoding='utf-8') as f:
            f.write(f'sr = {x[0]} MHz; period = {x[1]} uS; idc = {x[2]}; time = {x[3]} uS \n delay = {mean_delay} nS \n\n')

    features = np.array(ls_ls)
    y = np.array(mean_delays, dtype=float)
    reg = LinearRegression().fit(features, y)
    print('R^2: ', reg.score(features, y))
    print('Coef: ', reg.coef_)
    print('Intercept: ', reg.intercept_)

    with open('regression_updated.txt', 'w', encoding='utf-8') as f:
        f.write(f'sr_coef = {reg.coef_[0]}; period_coef = {reg.coef_[1]}; idc_coef = {reg.coef_[2]}; time_coef = {reg.coef_[3]} nS/uS \n intercept = {reg.intercept_} \n\n')


if __name__ == "__main__":
    main()