| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- import socket
- import msgpack
- import numpy as np
- import cmd
- import sys
class PicoClient(cmd.Cmd):
    """Interactive shell that drives a Pico ADC server over TCP.

    Requests are msgpack-encoded and written to a stream socket; each
    ``do_*`` method is one shell command.  The ``do_*`` docstrings double as
    the shell's ``help`` text, so they are deliberately short.
    """

    intro = 'Welcome to Pico ADC client. Type help or ? to list commands.\n'
    prompt = '(pico) '

    # Request codes, named after the shell command that sends them.
    CMD_OPEN = 0x01
    CMD_CLOSE = 0x03
    CMD_GET_PARAMS = 0x04
    CMD_SET_POINTS = 0x06
    CMD_SET_RATE = 0x07
    CMD_SET_TIMES = 0x08
    CMD_CHANNEL_CONFIGURE = 0x09
    # NOTE(review): the next two codes were marked "Assuming" by the original
    # author -- confirm them against the server.
    CMD_TRIGGER_CONFIGURE = 0x0A
    CMD_SET_PREMEASUREMENT = 0x0B
    CMD_STOP = 0x0E
    CMD_MEASUREMENT = 0x20
    CMD_STATUS = 0x30
    CMD_CONTINUE = 0x3E
    CMD_SEND_POINTS = 0xBB

    # Response codes this client tests for.
    RESP_DATA_PACKET = 0xCA
    RESP_OK = 0xCC
    RESP_PARAMS = 0xCD
    RESP_DATA_END = 0xCE
    RESP_ERROR = 0xFF

    # Labels for the values of a RESP_PARAMS reply, in wire order.
    _PARAM_LABELS = (
        "Sample rate",
        "Measurement times",
        "Number channels",
        "Trig channel",
        "Threshold",
        "TH direction",
        "Trig auto trigger ms",
    )

    def __init__(self, host='localhost', port=5003):
        """Store the server address; no connection is made until ``connect``."""
        super().__init__()
        self.host = host
        self.port = port
        self.sock = None
        self.magic = 0xAA  # first object of every request
        self.connected = False
        self.pk = msgpack.Packer()

    # ------------------------------------------------------------------
    # Transport
    # ------------------------------------------------------------------

    def connect(self):
        """Open the TCP connection unless one is already open."""
        if self.connected:
            print("Already connected.")
            return
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((self.host, self.port))
        except Exception as e:  # REPL boundary: report, never crash the shell
            # Close the half-initialised socket so a failed attempt does not
            # leak a descriptor or leave a dead socket in self.sock.
            if sock is not None:
                sock.close()
            print(f"Connection failed: {e}")
            return
        self.sock = sock
        self.connected = True
        print(f"Connected to {self.host}:{self.port}")

    def disconnect(self):
        """Close the socket, if any, and mark the client as disconnected."""
        if self.sock:
            self.sock.close()
            self.sock = None
        self.connected = False
        print("Disconnected.")

    def send_command(self, cmd_code, *args):
        """Send one request and return the server's reply.

        The request is the magic value, ``cmd_code`` and each of ``args``,
        every one packed as its own msgpack object and sent back to back.

        Returns:
            ``(code, data)`` as produced by ``receive_response``, or ``None``
            when not connected or when sending/receiving failed (a message
            is printed in those cases).
        """
        if not self.connected:
            print("Not connected. Use 'connect' first.")
            return None
        try:
            frame = b"".join(
                self.pk.pack(item) for item in (self.magic, cmd_code, *args)
            )
            self.sock.sendall(frame)
            return self.receive_response()
        except Exception as e:  # REPL boundary: report, never crash the shell
            print(f"Send failed: {e}")
            return None

    def receive_response(self):
        """Read a reply from the socket and split it into code and payload.

        The reply is decoded as a sequence of msgpack objects: the first is
        skipped, the second is the response code and the rest is the payload.

        Returns:
            ``(code, payload_list)``, or ``None`` if reading/decoding failed
            or fewer than two objects arrived.
        """
        try:
            unpacker = msgpack.Unpacker()
            # NOTE(review): this loop only ends when recv() returns b"", i.e.
            # when the peer closes or shuts down its side.  Against a server
            # that keeps the connection open it blocks indefinitely.  The
            # reply framing is not visible from this file, so the read-to-EOF
            # behaviour is kept -- confirm against the server protocol.
            while True:
                chunk = self.sock.recv(8192)
                if not chunk:
                    break
                unpacker.feed(chunk)
            objects = list(unpacker)
        except Exception as e:  # socket or msgpack decoding error
            print(f"Receive failed: {e}")
            return None

        if len(objects) < 2:
            # Explicit check instead of an opaque "list index out of range".
            print(
                f"Receive failed: incomplete response "
                f"({len(objects)} object(s) received)"
            )
            return None

        code, payload = objects[1], objects[2:]
        print(f"Code: {code}")
        return code, payload

    def _report_ack(self, resp, success_message):
        """Print ``success_message`` for a RESP_OK reply, else the error.

        Does nothing when ``resp`` is ``None``; ``send_command`` has already
        reported the failure in that case.
        """
        if not resp:
            return
        code, data = resp
        if code == self.RESP_OK:
            print(success_message)
        else:
            print(f"Error: {code}, {data}")

    # ------------------------------------------------------------------
    # Shell commands
    # ------------------------------------------------------------------

    def do_connect(self, arg):
        """Connect to the server."""
        self.connect()

    def do_disconnect(self, arg):
        """Disconnect from the server."""
        self.disconnect()

    def do_open(self, arg):
        """Open the Pico device."""
        self._report_ack(
            self.send_command(self.CMD_OPEN), "Device opened successfully."
        )

    def do_close(self, arg):
        """Close the Pico device."""
        self._report_ack(
            self.send_command(self.CMD_CLOSE), "Device closed successfully."
        )

    def do_status(self, arg):
        """Get status."""
        resp = self.send_command(self.CMD_STATUS)
        if not resp:
            return
        code, data = resp
        # Any code other than RESP_ERROR is shown as the status itself.
        if code == self.RESP_ERROR:
            print(f"Error: {code}, {data}")
        else:
            print(f"Status: {code}")

    def do_get_params(self, arg):
        """Get current parameters."""
        resp = self.send_command(self.CMD_GET_PARAMS)
        if not resp:
            return
        code, data = resp
        if code != self.RESP_PARAMS or not data:
            print(f"Error: {code}, {data}")
            return
        expected = len(self._PARAM_LABELS)
        if len(data) != expected:
            # A reply of the wrong size used to raise an uncaught ValueError
            # during unpacking, which terminated the whole command loop.
            print(
                f"Error: expected {expected} parameters, "
                f"got {len(data)}: {data}"
            )
            return
        for label, value in zip(self._PARAM_LABELS, data):
            print(f"{label}: {value}")

    def do_set_rate(self, arg):
        """Set sample rate. Usage: set_rate <rate>"""
        try:
            rate = int(arg)
        except ValueError:
            print("Invalid rate.")
            return
        self._report_ack(
            self.send_command(self.CMD_SET_RATE, rate), "Sample rate set."
        )

    def do_set_points(self, arg):
        """Set points vector. Usage: set_points <p1> <p2> ..."""
        try:
            points = [int(token) for token in arg.split()]
        except ValueError:
            print("Invalid points.")
            return
        # The whole vector travels as a single msgpack array.
        self._report_ack(
            self.send_command(self.CMD_SET_POINTS, points), "Points set."
        )

    def do_set_times(self, arg):
        """Set measurement times. Usage: set_times <times>"""
        try:
            times = int(arg)
        except ValueError:
            print("Invalid times.")
            return
        self._report_ack(
            self.send_command(self.CMD_SET_TIMES, times), "Times set."
        )

    def do_channel_configure(self, arg):
        """Configure channels. Usage: channel_configure <num_channels> <range1> <range2> ... <trig_channel>"""
        try:
            parts = arg.split()
            num_channels = int(parts[0])
            if num_channels < 0:
                # A negative count would make the slice and index below wrap
                # around and silently pick the wrong tokens.
                raise ValueError("negative channel count")
            ranges = [int(token) for token in parts[1:1 + num_channels]]
            # IndexError here means fewer ranges than announced or no trigger.
            trig_channel = int(parts[1 + num_channels])
        except (ValueError, IndexError):
            print("Invalid arguments. Usage: channel_configure <num> <ranges...> <trig>")
            return
        resp = self.send_command(
            self.CMD_CHANNEL_CONFIGURE, num_channels, ranges, trig_channel
        )
        self._report_ack(resp, "Channels configured.")

    def do_trigger_configure(self, arg):
        """Configure trigger. Usage: trigger_configure <th_direction> <threshold> <auto_trigger_ms>"""
        try:
            # Unpacking also raises ValueError when the count is not three.
            th_dir, thresh, auto_ms = [int(token) for token in arg.split()]
        except ValueError:
            print("Invalid arguments.")
            return
        resp = self.send_command(
            self.CMD_TRIGGER_CONFIGURE, th_dir, thresh, auto_ms
        )
        self._report_ack(resp, "Trigger configured.")

    def do_set_premeasurement(self, arg):
        """Set premeasurement percentage. Usage: set_premeasurement <percentage>"""
        try:
            perc = int(arg)
        except ValueError:
            print("Invalid percentage.")
            return
        self._report_ack(
            self.send_command(self.CMD_SET_PREMEASUREMENT, perc),
            "Premeasurement set.",
        )

    def do_measurement(self, arg):
        """Start measurement."""
        self._report_ack(
            self.send_command(self.CMD_MEASUREMENT), "Measurement started."
        )

    def do_stop(self, arg):
        """Stop measurement."""
        self._report_ack(
            self.send_command(self.CMD_STOP), "Measurement stopped."
        )

    def do_send_points(self, arg):
        """Send points (after measurement)."""
        if not self.connected:
            print("Not connected.")
            return
        try:
            # Unlike send_command, this request is one msgpack array.
            self.sock.sendall(
                msgpack.packb([self.magic, self.CMD_SEND_POINTS])
            )
            unpacker = msgpack.Unpacker()
            data_collected = {}  # (measurement, channel) -> list of samples
            while True:
                chunk = self.sock.recv(1024)
                if not chunk:
                    # Previously this fell out of the loop without a word and
                    # the partially collected data was dropped silently.
                    print(
                        "Send points failed: connection closed before the "
                        f"end of data ({len(data_collected)} series discarded)"
                    )
                    return
                unpacker.feed(chunk)
                for obj in unpacker:
                    # Anything that is not a [magic, code, ...] list is skipped.
                    if not (isinstance(obj, list) and len(obj) >= 2):
                        continue
                    resp_code = obj[1]
                    if resp_code == self.RESP_DATA_PACKET:
                        if len(obj) < 5:
                            print("Send points failed: malformed data packet")
                            return
                        i, j, packet = obj[2], obj[3], obj[4]
                        # Assuming packet is a list of int16 samples.
                        data_collected.setdefault((i, j), []).extend(packet)
                    elif resp_code == self.RESP_DATA_END:
                        self._print_points_summary(data_collected)
                        # Acknowledge so the server can continue.
                        self.sock.sendall(
                            msgpack.packb([self.magic, self.CMD_CONTINUE])
                        )
                        print("Data received and acknowledged.")
                        return
                    else:
                        print(f"Unexpected response: {resp_code}")
                        return
        except Exception as e:  # REPL boundary: report, never crash the shell
            print(f"Send points failed: {e}")

    def _print_points_summary(self, data_collected):
        """Print shape and mean of every collected (measurement, channel) series."""
        for key, full_data in data_collected.items():
            arr = np.array(full_data, dtype=np.int16)
            print(f"Data for measurement {key[0]}, channel {key[1]}: shape {arr.shape}, mean {arr.mean():.2f}")
            # To persist a series: np.save(f"data_{key[0]}_{key[1]}.npy", arr)

    def do_quit(self, arg):
        """Quit the client."""
        self.disconnect()
        return True  # a true return value ends cmdloop()
if __name__ == '__main__':
    # Usage: <script> [host [port]]
    #
    # Arguments that are omitted fall back to PicoClient's own defaults, so
    # "no arguments" and "host only" now agree on the port.  Previously a
    # lone host selected port 12345 while the class default is 5003.
    cli_args = sys.argv[1:]
    init_args = []
    if cli_args:
        init_args.append(cli_args[0])
    if len(cli_args) > 1:
        try:
            init_args.append(int(cli_args[1]))
        except ValueError:
            # Exit with a readable message instead of a traceback.
            sys.exit(f"Invalid port: {cli_args[1]!r}")
    client = PicoClient(*init_args)
    client.cmdloop()
|