import socket

import requests_unixsocket as req


class Networking:
    """Talk to an HTTP server over a Unix socket and read events from another.

    Outgoing requests are HTTP GETs sent through a ``requests_unixsocket``
    session to an ``http+unix://`` URL. Incoming events are read from a
    separate ``AF_UNIX`` stream socket as byte sequences terminated by a
    ``0x02`` byte.
    """

    def __init__(self, output_server_address, input_socket_address):
        """Create the HTTP session and connect the input socket.

        Args:
            output_server_address: Filesystem path of the Unix socket the HTTP
                server listens on. Every "/" is percent-encoded so the path can
                be used as the host part of an ``http+unix://`` URL.
            input_socket_address: Filesystem path of the Unix stream socket to
                connect to for input events.

        Raises:
            socket.error: If connecting to ``input_socket_address`` fails. The
                socket is closed before the error propagates.
        """
        self.input_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.output_session = req.Session()
        self.server_address = (
            "http+unix://" + output_server_address.replace("/", "%2F")
        )
        try:
            self.input_socket.connect(input_socket_address)
        except Exception:
            # Do not leak the file descriptor when the peer is unavailable.
            self.input_socket.close()
            raise
        print("Connected to " + input_socket_address + " and started server " + self.server_address)

    def get(self, path):
        """Send a GET request for ``path`` and return the response.

        Args:
            path: URL path (starting with "/") appended to the server address.

        Returns:
            The response object returned by the session.

        Raises:
            AssertionError: If the response status code is not 200.
        """
        response = self.output_session.get(self.server_address + path)
        # Raised explicitly rather than via ``assert`` so the check still runs
        # under ``python -O``; the exception type is kept for existing callers.
        if response.status_code != 200:
            raise AssertionError(
                "GET " + path + " returned status " + str(response.status_code)
            )
        return response

    def getSolenoids(self):
        """Return the decoded JSON body of ``GET /solenoids``."""
        return self.get("/solenoids").json()

    def getSounds(self):
        """Return the decoded JSON body of ``GET /sounds``."""
        return self.get("/sounds").json()

    def getLamps(self):
        """Return the decoded JSON body of ``GET /lamps``."""
        return self.get("/lamps").json()

    def getDisplays(self):
        """Print and return the decoded JSON body of ``GET /displays``."""
        a = self.get("/displays").json()
        # Function-call form parses on both Python 2 and Python 3.
        print(a)
        return a

    def triggerSolenoid(self, solenoid):
        """Request ``/solenoids/<solenoid>/trigger``; ``solenoid`` is a str."""
        self.get("/solenoids/" + solenoid + "/trigger")

    def playSound(self, sound):
        """Request ``/sounds/<sound>/play``; ``sound`` is a str."""
        self.get("/sounds/" + sound + "/play")

    def activateLamp(self, lamp):
        """Request ``/lamps/<lamp>/activate``; ``lamp`` is a str."""
        self.get("/lamps/" + lamp + "/activate")

    def deactivateLamp(self, lamp):
        """Request ``/lamps/<lamp>/deactivate``; ``lamp`` is a str."""
        self.get("/lamps/" + lamp + "/deactivate")

    def lampStatus(self, lamp):
        """Return the decoded JSON body of ``GET /lamps/<lamp>/status``."""
        return self.get("/lamps/" + lamp + "/status").json()

    def writeDisplayScore(self, display, score):
        """Request ``/displays/<display>/write_score/<score>``.

        Args:
            display: Display name as a str.
            score: Value to write; converted with ``str()``.
        """
        # The separator after "/displays" used to be missing. A leading "/" on
        # ``display`` is stripped so callers that supplied it themselves still
        # produce the same URL.
        self.get(
            "/displays/" + display.lstrip("/") + "/write_score/" + str(score)
        )

    def getInputEvent(self):
        """Read one event from the input socket and return it as text.

        Bytes are read one at a time until a ``0x02`` byte arrives, so nothing
        past the terminator is consumed. The terminator is not included in
        the result.

        Returns:
            The bytes received before the terminator, decoded as UTF-8.

        Raises:
            EOFError: If the socket is closed before a terminator is received.
            UnicodeDecodeError: If the payload is not valid UTF-8.
        """
        payload = bytearray()
        while True:
            chunk = self.input_socket.recv(1)
            if not chunk:
                # recv() returns an empty result once the peer has closed the
                # connection; without this check the loop would never end.
                raise EOFError(
                    "input socket closed before the 0x02 terminator was received"
                )
            if chunk == b'\x02':
                break
            payload.extend(chunk)
        # Decode the whole payload at once so multi-byte UTF-8 sequences are
        # handled correctly.
        return payload.decode('utf8')