Files
flippr-game/networking/Networking.py
Jonas Zeunert ea01669563 add retry
2022-05-06 23:09:11 +02:00

71 lines
2.2 KiB
Python

import requests_unixsocket as req
import socket
import json
class Networking:
def __init__(self, output_server_address, input_socket_address):
self.server_address = ""
self.input_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.output_session = req.Session()
adapter = req.adapters.UnixAdapter(pool_connections=1)
self.output_session.mount('http+unix://', adapter)
self.server_address = "http+unix://" + output_server_address.replace("/", "%2F")
self.input_socket.connect(input_socket_address)
print("Connected to " + input_socket_address + " and started server " + self.server_address)
def get(self, path):
response = self.output_session.get(self.server_address + path)
assert response.status_code == 200
return response.content
def getSolenoids(self):
return json.loads(self.get("/solenoids"))
def getSounds(self):
return json.loads(self.get("/sounds"))
def getLamps(self):
return json.loads(self.get("/lamps"))
def getDisplays(self):
return json.loads(self.get("/displays"))
def triggerSolenoid(self, solenoid):
self.get("/solenoids/" + solenoid + "/trigger")
def playSound(self, sound):
self.get("/sounds/" + sound + "/play")
def activateLamp(self, lamp):
self.get("/lamps/" + lamp.name + "/activate")
def deactivateLamp(self, lamp):
self.get("/lamps/" + lamp.name + "/deactivate")
def activateFlipper(self, name):
self.get("/flippers/" + name + "/activate")
def deactivateFlipper(self, name):
self.get("/flippers/" + name + "/deactivate")
def lampStatus(self, lamp):
return json.loads(self.get("/lamps/" + lamp + "/status"))
def writeDisplayScore(self, display, score):
self.get("/displays/" + display + "/write_score/" + str(score))
async def getInputEvent(self):
header = list()
while b'\x02' not in header:
byte = self.input_socket.recv(1)
header.append(byte)
header = [x.decode('utf8') for x in header[:-1]]
headerAsString = str().join(header)
eventAsJson = json.loads(headerAsString)
return eventAsJson