import socket import msgpack import numpy as np import cmd import sys class PicoClient(cmd.Cmd): intro = 'Welcome to Pico ADC client. Type help or ? to list commands.\n' prompt = '(pico) ' def __init__(self, host='localhost', port=5003): super().__init__() self.host = host self.port = port self.sock = None self.magic = 0xAA self.connected = False self.pk = msgpack.Packer() def connect(self): if self.connected: print("Already connected.") return try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) self.connected = True print(f"Connected to {self.host}:{self.port}") except Exception as e: print(f"Connection failed: {e}") def disconnect(self): if self.sock: self.sock.close() self.sock = None self.connected = False print("Disconnected.") def send_command(self, cmd_code, *args): if not self.connected: print("Not connected. Use 'connect' first.") return None try: buf = bytearray() buf.extend(self.pk.pack(0xAA)) buf.extend(self.pk.pack(cmd_code)) for arg in args: buf.extend(self.pk.pack(arg)) self.sock.sendall(buf) return self.receive_response() except Exception as e: print(f"Send failed: {e}") return None def receive_response(self): try: unpacker = msgpack.Unpacker() while True: chunk = self.sock.recv(8192) if not chunk: break unpacker.feed(chunk) objects = [] for obj in unpacker: objects.append(obj) print(f"Code: {objects[1]}") return objects[1], objects[2:] except Exception as e: print(f"Receive failed: {e}") return None def do_connect(self, arg): """Connect to the server.""" self.connect() def do_disconnect(self, arg): """Disconnect from the server.""" self.disconnect() def do_open(self, arg): """Open the Pico device.""" resp = self.send_command(0x01) if resp: code, data = resp if code == 0xCC: print("Device opened successfully.") else: print(f"Error: {code}, {data}") def do_close(self, arg): """Close the Pico device.""" resp = self.send_command(0x03) if resp: code, data = resp if code == 0xCC: print("Device closed successfully.") else: print(f"Error: {code}, {data}") def do_status(self, arg): """Get status.""" resp = self.send_command(0x30) if resp: code, data = resp if code == 0xFF: print(f"Error: {code}, {data}") else: print(f"Status: {code}") def do_get_params(self, arg): """Get current parameters.""" resp = self.send_command(0x04) if resp: code, data = resp if code == 0xCD and data: params = data sample_rate, measurement_times, number_channels, trig_channel, threshold, th_direction, trig_autoTrigger_ms = params print(f"Sample rate: {sample_rate}") print(f"Measurement times: {measurement_times}") print(f"Number channels: {number_channels}") print(f"Trig channel: {trig_channel}") print(f"Threshold: {threshold}") print(f"TH direction: {th_direction}") print(f"Trig auto trigger ms: {trig_autoTrigger_ms}") else: print(f"Error: {code}, {data}") def do_set_rate(self, arg): """Set sample rate. Usage: set_rate """ try: rate = int(arg) resp = self.send_command(0x07, rate) if resp: code, data = resp if code == 0xCC: print("Sample rate set.") else: print(f"Error: {code}, {data}") except ValueError: print("Invalid rate.") def do_set_points(self, arg): """Set points vector. Usage: set_points ...""" try: points = [int(x) for x in arg.split()] resp = self.send_command(0x06, points) if resp: code, data = resp if code == 0xCC: print("Points set.") else: print(f"Error: {code}, {data}") except ValueError: print("Invalid points.") def do_set_times(self, arg): """Set measurement times. Usage: set_times """ try: times = int(arg) resp = self.send_command(0x08, times) if resp: code, data = resp if code == 0xCC: print("Times set.") else: print(f"Error: {code}, {data}") except ValueError: print("Invalid times.") def do_channel_configure(self, arg): """Configure channels. Usage: channel_configure ... """ try: parts = arg.split() num_channels = int(parts[0]) ranges = [int(x) for x in parts[1:1+num_channels]] trig_channel = int(parts[1+num_channels]) resp = self.send_command(0x09, num_channels, ranges, trig_channel) if resp: code, data = resp if code == 0xCC: print("Channels configured.") else: print(f"Error: {code}, {data}") except (ValueError, IndexError): print("Invalid arguments. Usage: channel_configure ") def do_trigger_configure(self, arg): """Configure trigger. Usage: trigger_configure """ try: th_dir, thresh, auto_ms = [int(x) for x in arg.split()] resp = self.send_command(0x0A, th_dir, thresh, auto_ms) # Assuming 0x0A if resp: code, data = resp if code == 0xCC: print("Trigger configured.") else: print(f"Error: {code}, {data}") except ValueError: print("Invalid arguments.") def do_set_premeasurement(self, arg): """Set premeasurement percentage. Usage: set_premeasurement """ try: perc = int(arg) resp = self.send_command(0x0B, perc) # Assuming 0x0B if resp: code, data = resp if code == 0xCC: print("Premeasurement set.") else: print(f"Error: {code}, {data}") except ValueError: print("Invalid percentage.") def do_measurement(self, arg): """Start measurement.""" resp = self.send_command(0x20) if resp: code, data = resp if code == 0xCC: print("Measurement started.") else: print(f"Error: {code}, {data}") def do_stop(self, arg): """Stop measurement.""" resp = self.send_command(0x0E) if resp: code, data = resp if code == 0xCC: print("Measurement stopped.") else: print(f"Error: {code}, {data}") def do_send_points(self, arg): """Send points (after measurement).""" if not self.connected: print("Not connected.") return try: packed = msgpack.packb([self.magic, 0xBB]) self.sock.sendall(packed) # Receive data packets unpacker = msgpack.Unpacker() data_collected = {} while True: chunk = self.sock.recv(1024) if not chunk: break unpacker.feed(chunk) for obj in unpacker: if isinstance(obj, list) and len(obj) >= 2: magic, resp_code = obj[0], obj[1] if resp_code == 0xCA: i, j, packet = obj[2], obj[3], obj[4] key = (i, j) if key not in data_collected: data_collected[key] = [] data_collected[key].extend(packet) # Assuming packet is list of int16 elif resp_code == 0xCE: # Assemble data for key, full_data in data_collected.items(): arr = np.array(full_data, dtype=np.int16) print(f"Data for measurement {key[0]}, channel {key[1]}: shape {arr.shape}, mean {arr.mean():.2f}") # Optionally save to file # np.save(f"data_{key[0]}_{key[1]}.npy", arr) # Send continue packed = msgpack.packb([self.magic, 0x3E]) self.sock.sendall(packed) print("Data received and acknowledged.") return else: print(f"Unexpected response: {resp_code}") return except Exception as e: print(f"Send points failed: {e}") def do_quit(self, arg): """Quit the client.""" self.disconnect() return True if __name__ == '__main__': if len(sys.argv) > 1: host = sys.argv[1] port = int(sys.argv[2]) if len(sys.argv) > 2 else 12345 client = PicoClient(host, port) else: client = PicoClient() client.cmdloop()