| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- # This script is a client for controlling a TRX device over TCP/IP.
- # It allows the user to set various parameters such as RX frequency, TX frequency,
- # RX rate, TX level, and more. The script uses a socket connection to communicate with the TRX device.
- import socket
- import sys
- import time
- import struct
- import numpy as np
- default_port = 1001
- default_host = '192.168.1.100'
- class TrxClient:
- def __init__(self, host='localhost', port=1001):
- self.host = host
- self.port = port
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.size = 0
- self.sock.settimeout(5)
- self.connect()
- self.tx_rate = 2
- def connect(self):
- try:
- self.sock.connect((self.host, self.port))
- print(f"Connected to {self.host}:{self.port}")
- except socket.error as e:
- print(f"Connection error: {e}")
- sys.exit(1)
- def set_rx_freq(self, freq):
- try:
- data = struct.pack('<Q', 0 << 60 | np.uint64(1e6 * freq))
- self.sock.send(data)
- #rdata = self.sock.recv(8)
- print(f"Set RX frequency to {freq} MHz")
- # no response expected for RX frequency
- except socket.error as e:
- print(f"Socket error: {e}")
- def set_tx_freq(self, freq):
- try:
- data = struct.pack('<Q', 1 << 60 | np.uint64(1e6 * freq))
- self.sock.send(data)
- #rdata = self.sock.recv(8)
- print(f"Set TX frequency to {freq} MHz")
- # no response expected for TX frequency
- except socket.error as e:
- print(f"Socket error: {e}")
- def set_rx_rate(self, rate):
- try:
- data = struct.pack('<Q', 2 << 60 | np.uint64(round(122.88e6 / rate / 2)))
- self.sock.send(data)
- #rdata = self.sock.recv(8)
- print(f"Set RX rate to {rate} kSPS")
- # no response expected for RX rate
- except socket.error as e:
- print(f"Socket error: {e}")
- def set_tx_rate(self, rate):
- self.tx_rate = rate
- def set_tx_level(self, level):
- try:
- data = struct.pack('<Q', 3 << 60 | np.uint64(level))
- self.sock.send(data)
- #rdata = self.sock.recv(8)
- print(f"Set TX level to {level}")
- # no response expected for TX level
- except socket.error as e:
- print(f"Socket error: {e}")
- def set_pin(self, pin):
- try:
- data = struct.pack('<Q', 4 << 60 | np.uint64(pin))
- self.sock.send(data)
- #rdata = self.sock.recv(8)
- print(f"Set pin to {pin}")
- # no response expected for pin
- except socket.error as e:
- print(f"Socket error: {e}")
- def clear_pin(self, pin):
- try:
- data = struct.pack('<Q', 5 << 60 | np.uint64(pin))
- self.sock.send(data)
- data = self.sock.recv(8)
- print(f"Cleared pin {pin}")
- # no response expected for clear pin
- except socket.error as e:
- print(f"Socket error: {e}")
- def set_dac(self, dac):
- try:
- data = struct.pack('<Q', 6 << 60 | np.uint64(dac))
- self.sock.send(data)
- #rdata = self.sock.recv(8)
- print(f"Set DAC to {dac}")
- # no response expected for DAC
- except socket.error as e:
- print(f"Socket error: {e}")
- def clear_pulses(self):
- try:
- data = struct.pack('<Q', 7 << 60)
- self.sock.send(data)
- #rdata = self.sock.recv(8)
- self.size = 0
- print("Cleared pulses")
- # no response expected for clear pulses
- except socket.error as e:
- print(f"Socket error: {e}")
- def add_pulse(self, level, phase, width):
- try:
- self.size += 1
- data = struct.pack('<Q', 8 << 60 | np.uint64(width-1))
- self.sock.send(data)
- #rdata = self.sock.recv(8)
- print(f"Set width {width}")
- phase = np.int64(np.floor(phase / 360.0 * (1 << 30) + 0.5))
- data = struct.pack('<Q', 9 << 60 | np.uint64((phase) | (level << 32) | (1 << 48)))
- self.sock.send(data)
- ##rdata = self.sock.recv(8)
- print(f"Set phase {phase} and level {level}")
- # no response expected for add pulse
- except socket.error as e:
- print(f"Socket error: {e}")
- def add_pulses(self, level, phase, width, num):
- for i in range(num):
- self.add_pulse(level, phase, width)
- def add_delay(self, gate, width):
- try:
- self.size += 1
- data = struct.pack('<Q', 8 << 60 | np.uint64(width-1))
- self.sock.send(data)
- #rdata = self.sock.recv(8)
- print(f"Set delay width {width}")
- data = struct.pack('<Q', 9 << 60 | np.uint64(gate << 48))
- self.sock.send(data)
- ##rdata = self.sock.recv(8)
- print(f"Set delay gate {gate}")
- # no response expected for add delay
- except socket.error as e:
- print(f"Socket error: {e}")
- def load_file(self, filename):
- with open(filename, 'rb') as file:
- self.size += 1
- sin_amp = file.read(1)
- #print(sin_amp) # dbg
- sin_data = np.floor(0.5 + 32767 * np.int64(np.frombuffer(sin_amp, dtype=np.int8)[0]) / 128)
- #print(sin_data) #dbg
- #input() #dbg
- quad_amp = file.read(1)
- quad_data = np.floor(0.5 + 32767 * np.int64(np.frombuffer(quad_amp, dtype=np.int8)[0]) / 128)
- level = int(np.floor(0.5 + np.sqrt(quad_data**2 / 2 + sin_data**2 / 2)))
- phase_enter = np.floor(0.5 + np.atan2(quad_data, sin_data) * 180 / np.pi)
- phase = 0.0
- print(level) #dbg
- print(phase_enter) #dbg
- #print(np.floor(124.88 / self.tx_rate + 0.5))
- #input() #dbg
- self.add_pulse(level, phase, 125)
- # repeat this while data is not empty
- while quad_amp:
- self.size += 1
- sin_amp = file.read(1)
- if not sin_amp:
- break
- sin_data = np.floor(0.5 + 32767 * np.int64(np.frombuffer(sin_amp, dtype=np.int8)[0]) / 128)
- quad_amp = file.read(1)
- if not quad_amp:
- break
- quad_data = 0.0
- level = int(np.floor(0.5 + np.sqrt(quad_data**2 / 2 + sin_data**2 / 2)))
- phase_enter = 0.0
- phase = 0.0
- with open('log.txt', 'a') as f:
- f.write(f'lev: {level}\n')
- f.write(f'phs: {phase_enter}\n\n')
- #print(np.floor(124.88 / self.tx_rate + 0.5))
- self.add_pulse(level, phase, 125)
- print(f"Size: {self.size}")
- def start_sequence(self):
- try:
- size = self.size
- data = struct.pack('<Q', 10 << 60 | np.uint64(size))
- self.sock.send(data)
- print("Started sequence")
- n = 2048
- counter = 0
- while counter < size:
- if n > (size - counter):
- n = size - counter
- rcv_data = self.sock.recv(4096)
- counter += n
- if not rcv_data:
- print("No data received")
- break
- except socket.error as e:
- print(f"Socket error: {e}")
- def close(self):
- try:
- self.sock.close()
- print("Connection closed")
- except socket.error as e:
- print(f"Socket error: {e}")
- client = TrxClient(default_host, default_port)
- end_of_cmd = False
- while not end_of_cmd:
- cmd = str(input('Enter command: '))
- if cmd == 'set_rx_freq':
- freq = float(input('Enter RX frequency (MHz): '))
- client.set_rx_freq(freq)
- elif cmd == 'set_tx_freq':
- freq = float(input('Enter TX frequency (MHz): '))
- client.set_tx_freq(freq)
- elif cmd == 'set_rx_rate':
- rate = int(input('Enter RX rate (kSPS): '))
- client.set_rx_rate(rate)
- elif cmd == 'set_tx_level':
- level = int(input('Enter TX level: '))
- client.set_tx_level(level)
- elif cmd == 'set_pin':
- pin = int(input('Enter pin number: '))
- client.set_pin(pin)
- elif cmd == 'clear_pin':
- pin = int(input('Enter pin number: '))
- client.clear_pin(pin)
- elif cmd == 'set_dac':
- dac = int(input('Enter DAC value: '))
- client.set_dac(dac)
- elif cmd == 'clear_pulses':
- client.clear_pulses()
- elif cmd == 'add_pulse':
- level = int(input('Enter pulse level: '))
- phase = float(input('Enter pulse phase (degrees): '))
- width = int(input('Enter pulse width (ticks): '))
- client.add_pulse(level, phase, width)
- elif cmd == 'add_pulses':
- level = int(input('Enter pulse level: '))
- phase = float(input('Enter pulse phase (degrees): '))
- width = int(input('Enter pulse width (ticks): '))
- num = int(input('Enter num: '))
- client.add_pulses(level, phase, width, num)
- elif cmd == 'add_delay':
- gate = int(input('Enter delay gate: '))
- width = int(input('Enter delay width (ticks): '))
- client.add_delay(gate, width)
- elif cmd == 'load_file':
- filename = str(input('Enter filename: '))
- client.load_file(filename)
- elif cmd == 'start_sequence':
- client.start_sequence()
- elif cmd == 'exit':
- client.close()
- end_of_cmd = True
- else:
- print("Invalid command")
|