Source code for net



import socket
import displayprovider


DEFAULT_PORT = 10101


[docs] class DisplayServer: """ TCP-Server that listens on a specified port for TCP-Connections. Every request sent to the server has to be one of the special commands described later on or a string of 0s and 1s each specifying a dot to be turned on or off respectively. For instance, to display the letter 'T' on a 4x3 display of this form :: 1111 0110 0110 the following request must be sent to the server: 111101100110. A simple command line client like nc can send this request to 'server' listening on port 10101: .. code-block:: bash $ echo 111101100110 | nc server 10101 If the request contains the string 'SIZE' (ignoring case), the server will respond with the dimensions of the display (width x height). Lets start a server. Here we use a thread in order to be able to run the client afterwards. In practice the server will run on a different platform and can be started directly. >>> import net >>> import displayprovider >>> import threading >>> ds = net.DisplayServer(displayprovider.DisplayBase()) >>> th = threading.Thread(target=ds.start) >>> th.setDaemon(True) >>> th.start() Starting server for dimension 4 x 3 on 0.0.0.0 at port 10101 Now we can start a client and send some pixels to the server. >>> cl = net.RemoteDisplay(host="0.0.0.0") Remote display will send data to 0.0.0.0 on port 10101 >>> cl.px(1, 1, True) >>> cl.px(2, 3, True) >>> cl.show() Listening on 0.0.0.0 at port 10101 The output lines after show() are coming from the server. """ def __init__(self, display): self.width = display.width self.height = display.height self.display = display self.on_request = lambda: None self.server_running = False
[docs] def start(self, host="0.0.0.0", port=DEFAULT_PORT): print("Starting display server for dimension", self.width, "x", self.height, "on", host, "at port", port) addr = (host, port) self.server_running = True with socket.socket() as sock: sock.bind(addr) print("Listening on", host, "at port", port) sock.listen(10) while self.server_running: # waiting for connection remote_sock, _cl = sock.accept() buf = remote_sock.recv(self.width * self.height) self.on_request() #print("received", len(buf), "bytes") try: ans = self.handle_request(buf) remote_sock.send(bytes(ans, "ascii")) remote_sock.close() except Exception as e: print("ERROR", e)
[docs] def handle_request(self, payload): s = str(payload, "ascii") # answer with display dimension if desired if s.lower().startswith("size"): return "SIZE {w}x{h}".format(w=self.width, h=self.height) # draw pixels else: return self._handle_display_update_request(payload)
def _handle_display_update_request(self, payload): for y in range(self.height): for x in range(self.width): index = y * self.width + x if index < len(payload): val = chr(payload[index]) else: val = "0" if val in ("0", "1"): self.display.px(x, y, val == "1") self.display.show() return "OK"
[docs] class RemoteDisplay(displayprovider.DisplayBase): "Remote class to connect with a running DisplayServer instance." def __init__(self, host="0.0.0.0", port=DEFAULT_PORT, width=28, height=13): super().__init__(width, height) print("Remote display will send data to", host, "on port", port) self.host = host self.port = port self.buffer = [] for _x in range(width): col = [False] * height self.buffer.append(col)
[docs] def px(self, x, y, val): self.buffer[x][y] = val
[docs] def show(self): with socket.socket() as sock: sock.connect((self.host, self.port)) payload = self._buffer_to_payload() sock.sendall(payload)
def _buffer_to_payload(self): payload = "" for y in range(self.height): for x in range(self.width): payload += "1" if self.buffer[x][y] else "0" return bytes(payload, "utf8")
[docs] def led(self, on_off): pass
# TODO led support for remote display
[docs] class PixelflutServer(displayprovider.DisplayBase): """ PixeflutServer that conforms to the Pixelflut protocol outlined at https://c3pixelflut.de/how.html PX <x> <y> <rrggbb|aarrggbb> # set pixel at x,y with color rrggbb PX <x> <y> # get pixel color info for pixel at x,y SIZE # get size of the display OFFSET <x> <y> # set offset for following commands """ def __init__(self, display): self.width = display.width self.height = display.height self.display = display self.offset_x = 0 self.offset_y = 0 self.server_running = False
[docs] def start(self, host="0.0.0.0", port=DEFAULT_PORT): print("Starting Pixelflut server for dimension", self.width, "x", self.height, "on", host, "at port", port) addr = (host, port) self.server_running = True with socket.socket() as sock: sock.bind(addr) print("Listening on", host, "at port", port) sock.listen(10) while self.server_running: remote_sock, _ = sock.accept() with remote_sock: while True: buf = remote_sock.recv(1024) if not buf: break try: ans = self.handle_request(buf) if ans: remote_sock.send(bytes(ans, "ascii")) except Exception as e: print("ERROR", e)
[docs] def handle_request(self, payload): """ Handle incoming requests and execute commands based on the payload. The payload is expected to be a string of ASCII-encoded commands separated by newlines. Each command is processed and appropriate actions are taken. Commands: - PX x y color: Set the pixel at (x, y) to the specified color. - PX x y: Get the color of the pixel at (x, y). - SIZE: Get the dimensions of the grid. - OFFSET x y: Set the offset for the grid coordinates. Args: payload (bytes): The ASCII-encoded command string. Returns: str: The responses to the commands, joined by newlines. """ commands = str(payload, "ascii").strip().split('\n') responses = [] for command in commands: parts = command.strip().split() if not parts: continue cmd = parts[0].upper() if cmd == "PX": if len(parts) == 4: # PX x y color x = int(parts[1]) + self.offset_x y = int(parts[2]) + self.offset_y color = parts[3] self.set_pixel(x, y, color) elif len(parts) == 3: # PX x y x = int(parts[1]) + self.offset_x y = int(parts[2]) + self.offset_y responses.append(self.get_pixel(x, y)) elif cmd == "SIZE": responses.append(f"{self.width} {self.height}") elif cmd == "OFFSET": # OFFSET x y if len(parts) == 3: self.offset_x = int(parts[1]) self.offset_y = int(parts[2]) return "\n".join(responses)
[docs] def set_pixel(self, x, y, color): """ Set the color of a specific pixel on the display. Parameters: x (int): The x-coordinate of the pixel. y (int): The y-coordinate of the pixel. color (str): The color to set the pixel to, represented as a hex string. If the color is "000000", the pixel will be turned off. Otherwise it will be turned on. """ if 0 <= x < self.width and 0 <= y < self.height: self.display.px(x, y, color != "000000") self.display.show()
[docs] def get_pixel(self, x, y): """ Get the color of a specific pixel on the display. Will return the color of the pixel at the specified coordinates as a hex string. """ if 0 <= x < self.width and 0 <= y < self.height: return "FFFFFF" if self.display.buffer[y][x] else "000000" return "000000"
def test_networking(): import flipdotsim import threading import time TEST_PORT = 1212 fdd = flipdotsim.FlipDotSim(width=15, height=15) ds = DisplayServer(fdd) th = threading.Thread(target=ds.start, kwargs={'host':'127.0.0.1', 'port':TEST_PORT}) th.daemon = True th.start() time.sleep(0.2) # wait for server to start remote_display = RemoteDisplay(host="127.0.0.1", port=TEST_PORT, width=15, height=15) remote_display.px(1, 1, True) remote_display.px(2, 1, True) remote_display.show() time.sleep(0.5) import demos demo = demos.RotatingPlasmaDemo(remote_display) demo.fps = 30 # reduce fps for networking demo.run(2) ds.server_running = False th.join(2) fdd.close() #exit() def test_networking_pixelflut(): import flipdotsim import threading import time TEST_PORT = 1234 fdd = flipdotsim.FlipDotSim(width=15, height=15) ps = PixelflutServer(fdd) th = threading.Thread(target=ps.start, kwargs={'host':'127.0.0.1', 'port':TEST_PORT}) th.daemon = True th.start() time.sleep(0.2) # wait for server to start remote_display = RemoteDisplay(host="127.0.0.1", port=TEST_PORT, width=15, height=15) remote_display.px(1, 1, True) remote_display.px(2, 1, True) remote_display.show() time.sleep(0.5) import demos demo = demos.RotatingPlasmaDemo(remote_display) demo.fps = 30 # reduce fps for networking demo.run(2) ps.server_running = False th.join(2) fdd.close() def main(): import displayprovider import configuration disp = displayprovider.get_display( width=configuration.WIDTH, height=configuration.HEIGHT, fallback=displayprovider.Fallback.SIMULATOR) #ds = DisplayServer(disp) #ds.start(host=configuration.display_server["host"], # port=configuration.display_server["port"]) server = PixelflutServer(disp) server.start(host=configuration.display_server["host"], port=configuration.display_server["port"]) if __name__ == "__main__": main()