import json
import socket

import msgpack
import numpy as np
import scipy.interpolate as interp


class ServerSimulator:
    """Single-client TCP server that simulates an event-list sequencer.

    Messages are msgpack-encoded maps carrying a ``magic`` byte and a ``cmd``
    code.  Commands are dispatched through ``cmd_dict``; handlers report
    problems to the client with error messages (see ``getErrorMsg``) and
    progress with status messages (see ``getStatusMsg``).
    """

    def __init__(self):
        self.events = []
        # List of {wavetype: (wave_q, wave_i)} entries, one per uploaded wave.
        self.rfwaves = []
        # End of the last accepted event; the next event may not start earlier.
        self.time = 0

        # Socket parameters
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.conn = None
        self.magic = 0xAA
        self.cmd = 0x00
        self.data = {}
        self.simview = {}
        # TCP is a byte stream: one recv() may hold a partial message or several
        # messages, so incoming bytes are decoded incrementally.
        self._unpacker = msgpack.Unpacker(raw=False)

        # Simulation parameters
        self.freq = 3e6
        self.level = 20000

        # 32bit Codes
        self.errCode = 0x00000000
        self.statCode = 0x00000000

        # Command callbacks
        self.cmd_dict = {
            0x10: self.addEvent,
            0x20: self.getRfwaveTable,
            0x30: self.runEventList,
            # NOTE(review): 0x40 is inferred from the 0x4x status/ACK codes
            # used by sendSimViewData, which was previously unreachable.
            # Confirm against the client implementation.
            0x40: self.sendSimViewData,
            0x00: self.disconnect,
        }

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def connect(self, host, port):
        """Bind to ``(host, port)`` and block until one client connects."""
        self.sock.bind((host, port))
        self.sock.listen(1)
        self.conn, addr = self.sock.accept()
        # Drop any bytes buffered from a previous connection.
        self._unpacker = msgpack.Unpacker(raw=False)
        print(f"New connection from {addr}")

    def disconnect(self, data=None):
        """Close the client connection, if any.

        ``data`` is accepted (and ignored) so the method can be invoked
        through ``cmd_dict`` like every other command handler.
        """
        if self.conn:
            self.conn.close()
        self.conn = None
        return True

    def _recvMessage(self):
        """Return the next decoded message, or None if the peer closed."""
        if not self.conn:
            return None
        while True:
            try:
                # Yields a message as soon as a complete one is buffered.
                return next(self._unpacker)
            except StopIteration:
                pass
            chunk = self.conn.recv(4096)
            if not chunk:
                return None
            self._unpacker.feed(chunk)

    # ------------------------------------------------------------------
    # Reply helpers
    # ------------------------------------------------------------------
    def getErrorMsg(self, errCode, errString):
        """Return a packed error message (``cmd`` is ``b'\\xFF'``)."""
        error = {
            "magic": self.magic,
            "cmd": b'\xFF',
            "code": errCode,
            "message": errString,
        }
        return msgpack.packb(error, use_bin_type=True)

    def getStatusMsg(self, statCode, statString):
        """Return a packed status message (``cmd`` is ``b'\\xFE'``)."""
        status = {
            "magic": self.magic,
            "cmd": b'\xFE',
            "code": statCode,
            "message": statString,
        }
        return msgpack.packb(status, use_bin_type=True)

    def _sendError(self, code, message):
        """Log ``message``, record ``code``, notify the client; return False."""
        print(message)
        self.errCode = code
        self.conn.sendall(self.getErrorMsg(code, message))
        return False

    def _sendStatus(self, code, message):
        """Record ``code`` and send a status message to the client."""
        self.statCode = code
        self.conn.sendall(self.getStatusMsg(code, message))

    def _findWave(self, wave_id):
        """Return ``(wave_q, wave_i)`` for ``wave_id``, or None if unknown.

        The most recently uploaded wave with a matching type wins.
        """
        for entry in reversed(self.rfwaves):
            if wave_id in entry:
                return entry[wave_id]
        return None

    # ------------------------------------------------------------------
    # Command loop
    # ------------------------------------------------------------------
    def acceptCommand(self):
        """Receive one command and dispatch it to its handler.

        Returns:
            False when the connection is closed or the message has a bad
            magic byte or an unknown command; True once a handler has run
            (regardless of the handler's own result).
        """
        message = self._recvMessage()
        # Check if data is received
        if message is None:
            return False
        self.data = message

        if not isinstance(message, dict) or message.get("magic") != self.magic:
            print("Invalid magic byte")
            return False

        cmd = message.get("cmd")
        handler = self.cmd_dict.get(cmd) if isinstance(cmd, int) else None
        if handler is None:
            print("Invalid command")
            return False
        print(f"Received command: {cmd}")

        # Process command
        self.cmd = cmd
        handler(message)
        return True

    # ------------------------------------------------------------------
    # Command handlers
    # ------------------------------------------------------------------
    def addEvent(self, data):
        """Validate an event description and append it to the event list.

        The RF/ADC times are checked against ``[0, duration]``, i.e. they are
        offsets from the start of the event.  The timeline (``self.time``) is
        advanced only when the event is accepted.

        Returns:
            True if the event was added; False after sending an error
            message describing the first failed check.
        """
        required = (
            "time", "duration", "wave",
            "time_front_rf_trigger", "time_back_rf",
            "rf_start", "rf_end",
            "time_adc_trigger", "time_back_adc",
        )
        if not all(k in data for k in required):
            return self._sendError(0x00000001, "Missing event parameters")

        event = {
            "start_time": data["time"],
            "duration": data["duration"],
            "wave": data["wave"],
            "time_front_rf_trigger": data["time_front_rf_trigger"],
            "time_back_rf_trigger": data["time_back_rf"],
            "rf_start": data["rf_start"],
            "rf_end": data["rf_end"],
            "time_front_adc_trigger": data["time_adc_trigger"],
            "time_back_adc_trigger": data["time_back_adc"],
        }
        start = event["start_time"]
        duration = event["duration"]
        rf_front = event["time_front_rf_trigger"]
        rf_back = event["time_back_rf_trigger"]
        rf_start = event["rf_start"]
        rf_end = event["rf_end"]
        adc_front = event["time_front_adc_trigger"]
        adc_back = event["time_back_adc_trigger"]

        if start < self.time:
            return self._sendError(
                0x00000002, "Event start time is in the past")
        if rf_front < 0 or rf_back < 0 or rf_start < 0 or rf_end < 0:
            return self._sendError(
                0x00000003,
                "RF trigger times and delays must be non-negative")
        if adc_front < 0 or adc_back < 0:
            return self._sendError(
                0x00000004, "ADC trigger times must be non-negative")
        if rf_front > duration or rf_back > duration:
            return self._sendError(
                0x00000005, "RF trigger times must be within event duration")
        if rf_start > duration or rf_end > duration:
            return self._sendError(
                0x00000006,
                "RF start and end times must be within event duration")
        if adc_front > duration or adc_back > duration:
            return self._sendError(
                0x00000007, "ADC trigger times must be within event duration")
        # A window is malformed when its front edge comes after its back edge.
        if rf_front > rf_back:
            return self._sendError(
                0x00000008, "RF triggers and delays overlap")
        if adc_front > adc_back:
            return self._sendError(0x00000009, "ADC triggers overlap")
        if rf_start > rf_end:
            return self._sendError(
                0x0000000A, "RF start and end times overlap")
        if rf_start < rf_front or rf_end > rf_back:
            return self._sendError(
                0x0000000B,
                "RF start and end times must be within RF trigger times")

        wave = self._findWave(event["wave"])
        if wave is None:
            # NOTE(review): 0x0D is a new code; keep client tables in sync.
            return self._sendError(0x0000000D, "Unknown RF wave")
        if rf_end - rf_start < len(wave[0]):
            return self._sendError(
                0x0000000C,
                "RF pulse duration must be at least as long as the wave length")

        self.events.append(event)
        self.time = start + duration
        print(f"Added event: {event}")
        return True

    def getRfwaveTable(self, data):
        """Receive an RF wave split over ``total_packets`` data packets.

        After acknowledging the header with a status message, reads one
        message per packet (``cmd`` 0x2C) carrying ``wavedata_q`` and
        ``wavedata_i`` as raw int16 buffers, concatenates them and stores the
        result under ``wavetype`` in ``self.rfwaves``.

        Returns:
            True if the wave was stored; False if the connection closed or a
            packet failed validation (an error message is sent for the
            latter).
        """
        required = ("wavetype", "total_length", "total_packets",
                    "packet_index", "packet_length")
        if not all(k in data for k in required):
            return self._sendError(0x0000000F, "Missing RF wave parameters")

        wavetype = data["wavetype"]
        total_length = data["total_length"]
        total_packets = data["total_packets"]
        packet_index = data["packet_index"]
        packet_length = data["packet_length"]

        self._sendStatus(
            0x00000001,
            f"Ready to receive RF wave data packet {packet_index+1}/{total_packets} with length {packet_length}")

        # Seed with an empty array so the final concatenate also works when
        # no packets are expected.
        chunks_q = [np.array([], dtype=np.int16)]
        chunks_i = [np.array([], dtype=np.int16)]
        for i in range(total_packets):
            packet_data = self._recvMessage()
            if packet_data is None:
                print("No data received for RF wave packet")
                return False
            if (not isinstance(packet_data, dict)
                    or packet_data.get("magic") != self.magic):
                return self._sendError(
                    0x00000014, "Invalid magic byte in RF wave packet")
            if packet_data.get("cmd") != 0x2C:
                return self._sendError(
                    0x00000013, "Invalid command in RF wave packet")
            if "wavedata_q" not in packet_data or "wavedata_i" not in packet_data:
                return self._sendError(
                    0x00000010, "Missing wave data in RF wave packet")

            wave_q = np.frombuffer(packet_data["wavedata_q"], dtype=np.int16)
            wave_i = np.frombuffer(packet_data["wavedata_i"], dtype=np.int16)
            if len(wave_q) != len(wave_i):
                return self._sendError(
                    0x00000012,
                    "In-phase and quadrature wave data must have the same length in RF wave packet")

            chunks_q.append(wave_q)
            chunks_i.append(wave_i)
            print(f"Received RF wave packet {i+1}/{total_packets} with length {len(wave_q)}")

        total_wave_q = np.concatenate(chunks_q)
        total_wave_i = np.concatenate(chunks_i)
        if len(total_wave_q) != total_length or len(total_wave_i) != total_length:
            return self._sendError(
                0x00000011,
                "Total wave data length does not match expected length")

        self.rfwaves.append({wavetype: (total_wave_q, total_wave_i)})
        return True

    def runEventList(self, data):
        """Render all accepted events into per-sample channel arrays.

        Builds ``self.simview`` with a time axis and four int16 channels of
        equal length (``self.time + 1`` samples).  For each event the RF wave
        is stretched over the RF pulse with the requested interpolation
        (``data["interpolation_method"]``: "linear", "nearest" or "cubic";
        defaults to "linear") and mixed with a carrier at ``self.freq``.

        Event times are assumed to be integers (they are used as array
        indices) -- TODO confirm against the client.

        Returns:
            True after sending the completion status; False if the
            interpolation method is not supported.  Events whose pulse is
            shorter than their wave are reported and skipped.
        """
        method = data.get("interpolation_method", "linear")
        if method not in ("linear", "nearest", "cubic"):
            # NOTE(review): 0x33 is a new code; keep client tables in sync.
            return self._sendError(
                0x00000033, f"Unsupported interpolation method: {method}")

        # One sample per time step, inclusive of both ends, so that every
        # channel lines up with the time axis and the inclusive slices below
        # stay in bounds.
        n_samples = int(self.time) + 1
        time_linspace = np.linspace(0, self.time, n_samples)
        gate_level = int(np.floor(3.3 / 5 * 32767))
        channel_g = np.full(n_samples, gate_level, dtype=np.int16)
        channel_rf_ttl = np.zeros(n_samples, dtype=np.int16)
        channel_adc_ttl = np.zeros(n_samples, dtype=np.int16)
        channel_rf = np.zeros(n_samples, dtype=np.int16)

        for event in self.events:
            print(f"Running event: {event}")

            wave = self._findWave(event["wave"])
            if wave is None:
                self._sendError(0x0000000D, "Unknown RF wave")
                continue
            rf_wave_q, rf_wave_i = wave

            start = event["start_time"]
            rf_pulse_length = event["rf_end"] - event["rf_start"]
            rf_wave_length = len(rf_wave_q)
            if rf_pulse_length < rf_wave_length:
                self._sendError(
                    0x00000031,
                    "RF pulse length is shorter than RF wave length, cannot run event")
                continue

            print("Interpolating RF wave to fit pulse duration...")
            rf_time = np.linspace(0, rf_pulse_length, rf_pulse_length + 1)
            # Spread the wave samples over the whole pulse so every rf_time
            # value lies inside the interpolation domain.
            # NOTE(review): interp1d may reject waves with too few samples
            # for the chosen kind; that case is not handled here.
            wave_axis = np.linspace(0, rf_pulse_length, rf_wave_length)
            interp_wave_q = interp.interp1d(
                wave_axis, rf_wave_q, kind=method)(rf_time).astype(np.int16)
            interp_wave_i = interp.interp1d(
                wave_axis, rf_wave_i, kind=method)(rf_time).astype(np.int16)

            phase = 2 * np.pi * self.freq * rf_time * 8e-9
            modulated = (interp_wave_q * np.cos(phase)
                         - interp_wave_i * np.sin(phase)) / np.sqrt(2)

            # Event fields are offsets from the event start; shift them onto
            # the global timeline.
            rf_from = start + event["rf_start"]
            channel_rf[rf_from:rf_from + rf_pulse_length + 1] = (
                np.floor(modulated).astype(np.int16))
            channel_rf_ttl[
                start + event["time_front_rf_trigger"]:
                start + event["time_back_rf_trigger"] + 1] = 1
            channel_adc_ttl[
                start + event["time_front_adc_trigger"]:
                start + event["time_back_adc_trigger"] + 1] = 1
            channel_g[start:start + event["duration"] + 1] = 0

        self.simview = {
            "time": time_linspace,
            "channel_g": channel_g,
            "channel_rf_ttl": channel_rf_ttl,
            "channel_adc_ttl": channel_adc_ttl,
            "channel_rf": channel_rf,
        }
        self._sendStatus(0x00000032, "Event list simulation complete")
        return True

    def sendSimViewData(self, data):
        """Stream the simulated channels to the client in 1024-byte packets.

        Each packet (``cmd`` 0x33) carries a 0-based ``packet_index`` that
        runs across all channels, and must be acknowledged by a message with
        ``cmd`` 0x41 whose ``code`` equals that index before the next packet
        is sent.

        Returns:
            True when every packet was acknowledged; False if no simulation
            is available, the connection closed, or an ACK was invalid.
        """
        channels = ["channel_g", "channel_rf_ttl", "channel_adc_ttl", "channel_rf"]
        if any(name not in self.simview for name in channels):
            # NOTE(review): 0x46 is a new code; keep client tables in sync.
            return self._sendError(0x00000046, "No simulation data available")

        packet_size = 1024
        payloads = [(name, self.simview[name].tobytes()) for name in channels]
        # Channels are packetized independently, so round up per channel.
        total_packets = sum(
            -(-len(blob) // packet_size) for _, blob in payloads)

        self._sendStatus(
            0x00000043,
            f"Ready to send simulation data in {total_packets} packets with size {packet_size} bytes")

        packet_index = 0
        for channel, channel_data in payloads:
            for offset in range(0, len(channel_data), packet_size):
                packet_msg = {
                    "magic": self.magic,
                    "cmd": 0x33,
                    "channel": channel,
                    "total_packets": total_packets,
                    "packet_index": packet_index,
                    # Slicing past the end simply yields the short tail.
                    "data": channel_data[offset:offset + packet_size],
                }
                self.conn.sendall(msgpack.packb(packet_msg, use_bin_type=True))

                # Wait for ACK
                ack_data = self._recvMessage()
                if ack_data is None:
                    print("Connection closed while waiting for ACK")
                    return False
                if (not isinstance(ack_data, dict)
                        or ack_data.get("magic") != self.magic
                        or ack_data.get("cmd") != 0x41
                        or ack_data.get("code") != packet_index):
                    return self._sendError(
                        0x00000045, f"Invalid ACK for packet {packet_index}")
                packet_index += 1

        print("Simulation data sent successfully")
        self._sendStatus(0x00000044, "All simulation data packets sent")
        return True