import socket import sys import logging from time import sleep class TcpServer: def __init__(self, port, client_handler): self._port = port self._client_handler = client_handler self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._server_socket.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = ('0.0.0.0', port) self._server_socket.bind(server_address) self._server_socket.listen(1) def handle_connection(self): try: connection, client_address = self._server_socket.accept() logging.info(f"Client connected: {client_address}") self._client_handler(connection) except KeyboardInterrupt: sys.exit() def handle_client(connection): connection.settimeout(3.0) while True: connection.sendall( b"""{ "jsonrpc": "2.0", "id": "1", "method": "SetLedPattern", "params": { "led": 19, "pattern": 3.1 } } """) try: res = connection.recv(1024) if not res: logging.info("Client disconnected") connection.close() break logging.info(f"SetLedPattern response: {res.decode()}") except socket.timeout: logging.error("Connection timed out") break sleep(1) def main(): logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', datefmt='%H:%M:%S', level=logging.DEBUG, stream=sys.stdout) port = 65432 server = TcpServer(port=port, client_handler=handle_client) logging.info(f"Started TCP server on port {port}") while True: try: server.handle_connection() except Exception as e: logging.error( f"An error occurred while handling the client connection: {e}") if __name__ == "__main__": main()