#!/usr/bin/env python import asyncio import websockets import json import sys import os from storage import Storage CONNECTIONS = set() STORAGES = { "users": Storage("users") } class UserInfoProtocol(websockets.BasicAuthWebSocketServerProtocol): async def check_credentials(self, username, password): all_users = storage("users").read() self.user = all_users[username] if username in all_users else None return self.user != None def storage(name): if not name in STORAGES: STORAGES[name] = Storage(name) return STORAGES[name] def _get_entity(event): return event.get("entity", None) def _get_data(event): return event.get("data", {}) async def add_entity(websocket, event): name = _get_entity(event) data = _get_data(event) if name and data: storage(name).create(data) await broadcast(websocket, { "entity": name, "event": "created", "data": data }) async def get_entity(websocket, event): name = _get_entity(event) ts = _get_data(event).get("ts", 0) data = storage(name).read(ts) await websocket.send(json.dumps({ "entity": name, "event": "received", "data": data })) async def mod_entity(websocket, event): name = _get_entity(event) data = _get_data(event) if name and data: storage(name).update(data) await broadcast(websocket, { "entity": name, "event": "modified", "data": data }) async def del_entity(websocket, event): name = _get_entity(event) entity_id = _get_data(event).get("id", None) if storage(name).delete(entity_id): await broadcast(websocket, { "entity": name, "event": "deleted", "data": entity_id }) async def describe(websocket, event): connections = list(CONNECTIONS) data = { c.user["id"]: connections.count(c) for c in connections } await websocket.send(json.dumps({ "entity": "connections", "event": "described", "data": data })) async def broadcast(websocket, event): websockets.broadcast(CONNECTIONS, json.dumps(event)) COMMANDS = { "add": add_entity, "get": get_entity, "mod": mod_entity, "del": del_entity, "describe": describe } async def handle(websocket): try: CONNECTIONS.add(websocket) await broadcast(websocket, { "event": "connected", "entity": "users", "data": websocket.user}) async for message in websocket: event = json.loads(message) print(event, file=sys.stderr) handler = COMMANDS.get(event["action"], broadcast) await handler(websocket, event) finally: CONNECTIONS.remove(websocket) await broadcast(websocket, { "event": "disconnected", "entity": "users", "data": websocket.user}) async def main(): port = os.environ.get("BEERLOG_PORT", 8000) host = os.environ.get("BEERLOG_HOST", "0.0.0.0") print(f"Start on {host}:{port}", file=sys.stderr) async with websockets.serve(handle, host, port, create_protocol=UserInfoProtocol): await asyncio.Future() asyncio.run(main())