From ad20fe5469656e2d9caa21117358ae20c19c5b3b Mon Sep 17 00:00:00 2001 From: Johannes Wendel Date: Fri, 9 Jun 2023 23:14:59 +0200 Subject: [PATCH] Implement replayer --- requirements.txt | 1 + write.py | 62 +++++++++++++++++++++++++----------------------- 2 files changed, 33 insertions(+), 30 deletions(-) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d4d1f9b --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +pyserial \ No newline at end of file diff --git a/write.py b/write.py index 45078c8..e5bd7c0 100644 --- a/write.py +++ b/write.py @@ -1,41 +1,43 @@ -import serial -from time import sleep -from dataclasses import dataclass -# Set up the serial port -ser = serial.Serial('/dev/ttyUSB0', 115200) # Replace with the appropriate port name and baud rate +import datetime +from pathlib import Path -# Define a list of hexadecimal numbers to send -error = bytes.fromhex('ff e3') +import serial + + +ser = serial.Serial('/dev/ttyUSB0', 187500, parity=serial.PARITY_EVEN) + +input_file_path = Path("recordings/b_2_2_tx.txt") numbers = [i for i in range(255)] -@dataclass -class Result: - sent: bytes = b'' - result: bytes = b'' -results = [] -for num in numbers: - byt = num.to_bytes(num, "big") - print("Writing: " + hex(num)) - ser.write(byt) - sleep(0.01) +def replay(): + with open(input_file_path) as input_file: + lines = input_file.readlines() - ser.write(bytes.fromhex('f9')) + begin = datetime.datetime.now() + count = 1 + for line in lines: + if "bit" in line: + continue + if "error" in line: + continue - data = ser.read(ser.in_waiting) + int_to_send = int(line.split("TX: ")[1]) + byte_to_send = int_to_send.to_bytes(int_to_send, "big") + ser.write(byte_to_send) - print(data) - print(error) - if data not in error and data != b'': - result = Result(sent=num, result=hex(int.from_bytes(data, "big"))) - results.append(result) - sleep(0.1) + ser.read(1) + + # wait for 2 acks + if (count % 3) == 0: + ser.read(1) + + count += 1 + + end = datetime.datetime.now() + print(end - begin) +replay() - -for res in results: - print(res) - -# Close the serial port ser.close()