thiiiiings
This commit is contained in:
57
networking/Networking.py
Normal file
57
networking/Networking.py
Normal file
@@ -0,0 +1,57 @@
|
||||
import requests_unixsocket as req
|
||||
import socket
|
||||
from utils.Singleton import Singleton
|
||||
|
||||
|
||||
class Networking(metaclass=Singleton):
|
||||
def __init__(self, output_server_address, input_socket_address):
|
||||
self.server_address = ""
|
||||
self.input_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
self.output_session = req.Session()
|
||||
self.server_address = "http+unix://" + output_server_address.replace("/", "%2F")
|
||||
self.input_socket.connect(input_socket_address)
|
||||
print("Connected to " + input_socket_address + " and started server " + self.server_address)
|
||||
|
||||
def get(self, path):
|
||||
response = self.output_session.get(self.server_address + path)
|
||||
assert response.status_code == 200
|
||||
return response
|
||||
|
||||
def getSolenoids(self):
|
||||
return self.get("/solenoids").json()
|
||||
|
||||
def getSounds(self):
|
||||
return self.get("/sounds").json()
|
||||
|
||||
def getLamps(self):
|
||||
return self.get("/lamps").json()
|
||||
|
||||
def getDisplays(self):
|
||||
return self.get("/displays").json()
|
||||
|
||||
def triggerSolenoid(self, solenoid):
|
||||
self.get("/solenoids/" + solenoid + "/trigger")
|
||||
|
||||
def playSound(self, sound):
|
||||
self.get("/sounds/" + sound + "/play")
|
||||
|
||||
def activateLamp(self, lamp):
|
||||
self.get("/lamps/" + lamp + "/activate")
|
||||
|
||||
def deactivateLamp(self, lamp):
|
||||
self.get("/lamps/" + lamp + "/deactivate")
|
||||
|
||||
def lampStatus(self, lamp):
|
||||
return self.get("/lamps/" + lamp + "/status").json()
|
||||
|
||||
def writeDisplayScore(self, display, score):
|
||||
self.get("/displays/" + display + "/write_score/" + str(score))
|
||||
|
||||
async def getInputEvent(self):
|
||||
header = list()
|
||||
while b'\x02' not in header:
|
||||
byte = self.input_socket.recv(1)
|
||||
header.append(byte)
|
||||
header = [x.decode('utf8') for x in header[:-1]]
|
||||
|
||||
return str().join(header)
|
||||
BIN
networking/__pycache__/Networking.cpython-38.pyc
Normal file
BIN
networking/__pycache__/Networking.cpython-38.pyc
Normal file
Binary file not shown.
Reference in New Issue
Block a user