import socket
import struct
import subprocess
import time as tm

import msgpack
import numpy as np
import serial

# Input-range name -> range index, stored as int8.
probe = {
    '10mV': np.int8(0),
    '20mV': np.int8(1),
    '50mV': np.int8(2),
    '100mV': np.int8(3),
    '200mV': np.int8(4),
    '500mV': np.int8(5),
    '1V': np.int8(6),
    '2V': np.int8(7),
    '5V': np.int8(8),
    '10V': np.int8(9),
    '20V': np.int8(10),
    '50V': np.int8(11),
    '100V': np.int8(12),
    '200V': np.int8(13),
}


class adc_default:
    """Client for an ADC service reached over TCP on 127.0.0.1.

    Every request is a stream of msgpack objects: the marker 0xAA, a command
    code, then the command arguments. Simple commands return
    ``(0xCC, 0, 0)`` on success, ``(0xFF, code, data)`` when the service
    answers with any other code, and ``None`` when nothing could be sent or
    received.
    """

    def __init__(self, port):
        """Store the TCP port and initialise the (not yet connected) state."""
        self.port = port
        self.ndata = []
        self.channels_data = []
        self.measure_code = (0x00, 0)
        self.client_socket = None
        self.connected = False
        # Serialises each element of an outgoing message.
        self.pk = msgpack.Packer()
        # NOTE(review): several codes here differ from the ones the methods
        # below actually send (e.g. 'start' is 0x3B here but start() sends
        # 0x20; 'set_premeasure' is 0x28 but 0x0B is sent). The table is not
        # read anywhere in this class - confirm which values are correct.
        self.proto = {
            'open': 0x01,
            'set_rate': 0x07,
            'set_points': 0x06,
            'config_channels': 0x09,
            'set_premeasure': 0x28,
            'set_trignum': 0x19,
            'start': 0x3B,
            'stop': 0x03,
        }

    def connect(self, port=0):
        """Open the TCP connection to 127.0.0.1 on the port given at construction.

        The ``port`` argument is accepted for interface compatibility but is
        not used. Returns the result of ``socket.connect`` (``None``);
        connection errors propagate to the caller.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(10.0)
        try:
            resp = sock.connect(('127.0.0.1', self.port))
        except OSError:
            # Do not leak the descriptor when the connection attempt fails.
            sock.close()
            raise
        self.client_socket = sock
        self.connected = True
        return resp

    def _pack_message(self, cmd_code, *args):
        """Return the wire bytes for the 0xAA marker, the command code and its arguments."""
        return b"".join(self.pk.pack(item) for item in (0xAA, cmd_code, *args))

    def send_command(self, cmd_code, *args):
        """Send a command and return ``(code, data)`` from the reply, or ``None`` on failure."""
        if not self.connected:
            print("Not connected. Use 'connect' first.")
            return None
        try:
            self.client_socket.sendall(self._pack_message(cmd_code, *args))
            return self.receive_response()
        except Exception as e:
            print(f"Send failed: {e}")
            return None

    def receive_response(self):
        """Read a reply and return ``(code, payload_objects)``, or ``None`` on failure.

        The second decoded object is taken as the response code and the
        remaining objects as the payload.
        """
        # NOTE(review): this reads until the peer closes the connection (recv
        # returns b""). aqcuire_data exchanges several messages over the same
        # socket, which needs explicit message framing - confirm the protocol.
        try:
            unpacker = msgpack.Unpacker()
            while True:
                chunk = self.client_socket.recv(8192)
                if not chunk:
                    break
                unpacker.feed(chunk)
            objects = list(unpacker)
            print(f"Code: {objects[1]}")
            return objects[1], objects[2:]
        except Exception as e:
            print(f"Receive failed: {e}")
            return None

    def resp_handler(self, resp, cmd):
        """Map a raw response sequence to ``(code, detail)``; ``cmd`` is unused.

        Returns ``(-1, 0)`` when the first element is not 0xAA and
        ``(0xFF, resp[3])`` when the code is 0xFF.
        """
        if resp[0] != 0xAA:
            return (-1, 0)
        if resp[1] == 0xFF:
            return (0xFF, resp[3])
        return (resp[1], 0)

    def _simple_command(self, cmd_code, *args):
        """Send a command and translate the reply into the common status tuple."""
        resp = self.send_command(cmd_code, *args)
        if not resp:
            return None
        code, data = resp
        if code == 0xCC:
            return (0xCC, 0, 0)
        return (0xFF, code, data)

    def open(self):
        """Send the open command (0x01)."""
        return self._simple_command(0x01)

    def set_rate(self, rate):
        """Set the sample rate (command 0x07).

        ``rate`` is converted with ``int``; if that fails,
        ``(0xFF, -1, b"Invalid rate!")`` is returned without sending anything.
        """
        try:
            srate = int(rate)
        except (TypeError, ValueError):
            return (0xFF, -1, b"Invalid rate!")
        return self._simple_command(0x07, srate)

    def set_points(self, points):
        """Send the points vector (command 0x06)."""
        return self._simple_command(0x06, points)

    def config_channels(self, nchannels, channel_ranges, trig_channel):
        """Send the channel count, channel ranges and trigger channel (command 0x09)."""
        return self._simple_command(0x09, nchannels, channel_ranges, trig_channel)

    def trigger_configure(self, th_dir, thresh, auto_ms):
        """Send the trigger settings ``th_dir``, ``thresh`` and ``auto_ms``."""
        # The command code 0x0A was marked as an assumption by the author.
        return self._simple_command(0x0A, th_dir, thresh, auto_ms)

    def set_premeasure(self, trig_premeasure):
        """Send the pre-measurement setting."""
        # The command code 0x0B was marked as an assumption by the author.
        return self._simple_command(0x0B, trig_premeasure)

    def set_trignum(self, trig_num):
        pass  # deprecated

    def start(self):
        """Start measurement (command 0x20)."""
        return self._simple_command(0x20)

    def aqcuire_data(self):
        """Request the measured data and store it in ``self.ndata``.

        Sends 0xBB, then handles replies until the transfer ends:

        * 0xCD carries ``(i, j, packet, size, packet_data)``. ``packet_data``
          is appended to the buffer for signal ``i`` / channel ``j`` and the
          packet is acknowledged with 0x3D.
        * 0xCE ends the transfer: pending buffers are flushed and 0x3E is sent.

        Any other code aborts the transfer. Errors are printed, not raised.
        The result is a list of signals, each a list of per-channel sample
        lists.
        """
        if not self.connected:
            print("Not connected.")
            return
        try:
            self.client_socket.sendall(self._pack_message(0xBB))
            last_i = 0
            last_j = 0
            curr_data = []
            while True:
                resp = self.receive_response()
                if resp is None:
                    print("Send points failed: no response")
                    return
                resp_code, data = resp
                print("Chunk recieved!")
                print(f'Resp: {resp_code} (need {0xCD} or {0xCE})')
                if resp_code == 0xCD:
                    i, j, _packet, _size, packet_data = data[:5]
                    # Close the current channel buffer whenever the signal or
                    # the channel changes; checking only the channel index
                    # merges consecutive signals that use the same channel.
                    if (i, j) != (last_i, last_j):
                        print('Append channel')
                        self.channels_data.append(curr_data)
                        curr_data = []
                    if i != last_i:
                        print('Append signal')
                        self.ndata.append(self.channels_data)
                        self.channels_data = []
                    last_i, last_j = i, j
                    curr_data.extend(packet_data)
                    print("Received packet params.")
                    self.client_socket.sendall(self._pack_message(0x3D))
                elif resp_code == 0xCE:
                    self.channels_data.append(curr_data)
                    self.ndata.append(self.channels_data)
                    curr_data = []
                    self.channels_data = []
                    self.client_socket.sendall(self._pack_message(0x3E))
                    print("Data received and acknowledged.")
                    return
                else:
                    print(f"Unexpected response: {resp_code}")
                    return
        except Exception as e:
            print(f"Send points failed: {e}")

    def stop(self):
        """Send the stop command (0x03)."""
        return self._simple_command(0x03)

    def clear_ndata(self):
        """Discard all collected signal and channel data."""
        self.ndata = []
        self.channels_data = []


class sync_default:
    """Helper that runs an upload program and writes a trigger byte to a serial port."""

    def __init__(self, port):
        """Create the TCP socket unless ``port`` is -1."""
        self.client_socket = None
        if port != -1:
            self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.proto = {
            'upload': 0x01,
            'trig_wait': 0x02,
        }
        self.serial = None

    def connect(self, port):
        """Connect the TCP socket to ``localhost:port`` with a 5 s timeout."""
        self.client_socket.settimeout(5.0)
        resp = self.client_socket.connect(('localhost', port))
        return resp

    def resp_handler(self, resp, cmd):
        """Map a raw response sequence to ``(code, detail)``; ``cmd`` is unused."""
        if resp[0] != 0xAA:
            return (-1, 0)
        if resp[1] == 0xFF:
            return (0xFF, resp[3])
        return (resp[1], 0)

    def upload(self, programPath, filePath, port):
        """Run ``programPath filePath -p <port> --debug`` and return its exit code."""
        try:
            process = subprocess.run(
                [programPath, filePath, '-p', str(port), '--debug']
            )
        except subprocess.CalledProcessError as e:
            print(f"Ошибка команды {e.cmd}!")
            # `process` is unbound on this path; report the failing exit code.
            return e.returncode
        return process.returncode

    def serial_open(self, port):
        """Open ``COM<port>`` at 9600 baud; return ``(0x00, 0)`` or ``(0xFF, -1)``."""
        try:
            self.serial = serial.Serial('COM' + str(port), 9600, timeout=1)
        except serial.SerialException as e:
            print(e)
            return (0xFF, -1)
        return (0x00, 0)

    def trig_wait(self, port):
        """Write ``b'e'`` to the open serial port; ``port`` is unused.

        Returns ``(0x00, 0)`` on success and ``(0xFF, -1)`` when the port is
        not open or the write fails.
        """
        if self.serial is None:
            return (0xFF, -1)
        try:
            self.serial.write(b'e')
        except serial.SerialException as e:
            print(e)
            return (0xFF, -1)
        return (0x00, 0)

    def serial_close(self):
        """Close the serial port; return ``(0x00, 0)`` or ``(0xFF, -1)``."""
        if self.serial is None:
            return (0xFF, -1)
        try:
            self.serial.close()
        except Exception:
            return (0xFF, -1)
        return (0x00, 0)


class sdr_default:
    """Helper that runs an external transmit program."""

    def __init__(self, port):
        """Create the TCP socket unless ``port`` is -1."""
        self.client_socket = None
        if port != -1:
            self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.proto = {
            'transf': 0x01,
        }

    def connect(self, port):
        """Connect the TCP socket to ``localhost:port`` with a 5 s timeout."""
        self.client_socket.settimeout(5.0)
        resp = self.client_socket.connect(('localhost', port))
        return resp

    def transf(self, programPath, filePath, freq, rate, ampl, gain):
        """Run the transmit program with the given file and settings.

        Returns ``(0x00, returncode)`` on success and ``(0xFF, -1)`` when the
        program exits with a non-zero status.
        """
        command = [
            programPath,
            '-t', filePath,
            '-f', str(freq),
            '-s', str(rate),
            '-a', str(ampl),
            '-x', str(gain),
        ]
        try:
            process = subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Ошибка команды {e.cmd}!")
            return (0xFF, -1)
        return (0x00, process.returncode)


class gra_default:
    """Client for a device reached over TCP that exchanges ``struct``-packed messages."""

    def __init__(self, ip, port):
        """Connect to ``(ip, port)`` with a 5 s timeout unless ``port`` is -1."""
        self.client_socket = None
        if port != -1:
            self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.client_socket.settimeout(5.0)
            self.client_socket.connect((ip, port))
        self.proto = {
            'reset': 0x0001,
            'ps_on': 0x0003,
            'ps_off': 0x0004,
            'state': 0x0005,
            'upload': 0x0006,
            'send_packet': 0x0007,
            'confirm': 0x0008,
        }

    def connect(self, port):
        """Connect the TCP socket to ``localhost:port`` with a 5 s timeout."""
        self.client_socket.settimeout(5.0)
        resp = self.client_socket.connect(('localhost', port))
        return resp

    # NOTE(review): the rest of this class is corrupted in the source. The
    # text between '<' and '>' was stripped, which removed the struct format
    # strings and the method bodies in between. The surviving fragment is
    # kept verbatim below so it can be restored from the original file:
    #
    #   def reset(self): msg = struct.pack(' 6): magic, cmd, state, errbits,
    #   errpoint, current_setting_amp, current_amp, first_sensor,
    #   second_sensor = struct.unpack('