Source code for net

"""
TCP-Server that listens on a specified port for
TCP-Connections. Every request sent to the server has to be one of the special
commands described later on or a string of 0s and 1s each specifying a dot to
be turned on or off respectively. For instance, to display the letter 'T'
on a 4x3 display of this form

::

    1111
    0110
    0110


the following request must be sent to the server: 111101100110.

A simple command line client like nc can send this request to 'server'
listening on port 10101:

.. code-block:: bash

   $ echo 111101100110 | nc server 10101

If the request contains the string 'SIZE' (ignoring case), the server
will respond with the dimensions of the display (width x height).

Lets start a server. Here we use a thread in order to be able to run the client
afterwards. In practice the server will run on a different platform and can be
started directly.

    >>> import net
    >>> import displayprovider
    >>> import threading
    >>> ds = net.DisplayServer(displayprovider.DisplayBase())
    >>> th = threading.Thread(target=ds.start)
    >>> th.setDaemon(True)
    >>> th.start()
    Starting server for dimension 4 x 3 on 0.0.0.0 at port 10101

Now we can start a client and send some pixels to the server.

    >>> cl = net.RemoteDisplay(host="0.0.0.0")
    Remote display will send data to 0.0.0.0 on port 10101
    >>> cl.px(1, 1, True)
    >>> cl.px(2, 3, True)
    >>> cl.show()
    Listening on 0.0.0.0 at port 10101

The output lines after show() are coming from the server.

"""

import socket
import displayprovider


DEFAULT_PORT = 10101


[docs] class DisplayServer: def __init__(self, display): self.width = display.width self.height = display.height self.display = display self.on_request = lambda: None self.server_running = False
[docs] def start(self, host="0.0.0.0", port=DEFAULT_PORT): print("Starting server for dimension", self.width, "x", self.height, "on", host, "at port", port) addr = (host, port) self.server_running = True with socket.socket() as sock: sock.bind(addr) print("Listening on", host, "at port", port) sock.listen(10) while self.server_running: # waiting for connection remote_sock, _cl = sock.accept() buf = remote_sock.recv(self.width * self.height) self.on_request() #print("received", len(buf), "bytes") try: ans = self.handle_request(buf) remote_sock.send(bytes(ans, "ascii")) remote_sock.close() except Exception as e: print("ERROR", e)
[docs] def handle_request(self, payload): s = str(payload, "ascii") # answer with display dimension if desired if s.lower().startswith("size"): return "SIZE {w}x{h}".format(w=self.width, h=self.height) # draw pixels else: return self._handle_display_update_request(payload)
def _handle_display_update_request(self, payload): for y in range(self.height): for x in range(self.width): index = y * self.width + x if index < len(payload): val = chr(payload[index]) else: val = "0" if val in ("0", "1"): self.display.px(x, y, val == "1") self.display.show() return "OK"
[docs] class RemoteDisplay(displayprovider.DisplayBase): """Remote class to connect with a running display server.""" def __init__(self, host="0.0.0.0", port=DEFAULT_PORT, width=28, height=13): super().__init__(width, height) print("Remote display will send data to", host, "on port", port) self.host = host self.port = port self.buffer = [] for _x in range(width): col = [False] * height self.buffer.append(col)
[docs] def px(self, x, y, val): self.buffer[x][y] = val
[docs] def show(self): with socket.socket() as sock: sock.connect((self.host, self.port)) payload = self._buffer_to_payload() sock.sendall(payload)
def _buffer_to_payload(self): payload = "" for y in range(self.height): for x in range(self.width): payload += "1" if self.buffer[x][y] else "0" return bytes(payload, "utf8")
[docs] def led(self, on_off): pass
# TODO led support for remote display
[docs] def test_networking(): import flipdotsim import threading import time fdd = flipdotsim.FlipDotSim(width=15, height=15) ds = DisplayServer(fdd) th = threading.Thread(target=ds.start, kwargs={'host':'127.0.0.1'}) th.setDaemon(True) th.start() time.sleep(0.2) # wait for server to start remote_display = RemoteDisplay(host="127.0.0.1", width=15, height=15) remote_display.px(1, 1, True) remote_display.px(2, 1, True) remote_display.show() time.sleep(0.5) import demos demo = demos.RotatingPlasmaDemo(remote_display) demo.fps = 30 # reduce fps for networking demo.run(2) ds.server_running = False th.join(2) fdd.close()
#exit()
[docs] def main(): import displayprovider import configuration disp = displayprovider.get_display( width=configuration.WIDTH, height=configuration.HEIGHT, fallback=displayprovider.Fallback.SIMULATOR) ds = DisplayServer(disp) ds.start(host=configuration.display_server["host"], port=configuration.display_server["port"])
if __name__ == "__main__": main()