| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- #!/usr/bin/env python
- import asyncio
- import websockets
- import json
- import sys
- import os
- from storage import Storage
- CONNECTIONS = set()
- STORAGES = { "users": Storage("users") }
- class UserInfoProtocol(websockets.BasicAuthWebSocketServerProtocol):
- async def check_credentials(self, username, password):
- all_users = storage("users").read()
- self.user = all_users[username] if username in all_users else None
- return self.user != None
- def storage(name):
- if not name in STORAGES:
- STORAGES[name] = Storage(name)
- return STORAGES[name]
- def _get_entity(event):
- return event.get("entity", None)
- def _get_data(event):
- return event.get("data", {})
- async def add_entity(websocket, event):
- name = _get_entity(event)
- data = _get_data(event)
- if name and data:
- storage(name).create(data)
- await broadcast(websocket, { "entity": name, "event": "created", "data": data })
- async def get_entity(websocket, event):
- name = _get_entity(event)
- ts = _get_data(event).get("ts", 0)
- data = storage(name).read(ts)
- await websocket.send(json.dumps({ "entity": name, "event": "received", "data": data }))
- async def mod_entity(websocket, event):
- name = _get_entity(event)
- data = _get_data(event)
- if name and data:
- storage(name).update(data)
- await broadcast(websocket, { "entity": name, "event": "modified", "data": data })
- async def del_entity(websocket, event):
- name = _get_entity(event)
- entity_id = _get_data(event).get("id", None)
- if storage(name).delete(entity_id):
- await broadcast(websocket, { "entity": name, "event": "deleted", "data": entity_id })
- async def describe(websocket, event):
- connections = list(CONNECTIONS)
- data = { c.user["id"]: connections.count(c) for c in connections }
- await websocket.send(json.dumps({ "entity": "connections", "event": "described", "data": data }))
- async def broadcast(websocket, event):
- websockets.broadcast(CONNECTIONS, json.dumps(event))
- COMMANDS = {
- "add": add_entity,
- "get": get_entity,
- "mod": mod_entity,
- "del": del_entity,
- "describe": describe
- }
- async def handle(websocket):
- try:
- CONNECTIONS.add(websocket)
- await broadcast(websocket, { "event": "connected", "entity": "users", "data": websocket.user})
- async for message in websocket:
- event = json.loads(message)
- print(event, file=sys.stderr)
- handler = COMMANDS.get(event["action"], broadcast)
- await handler(websocket, event)
- finally:
- CONNECTIONS.remove(websocket)
- await broadcast(websocket, { "event": "disconnected", "entity": "users", "data": websocket.user})
- async def main():
- port = os.environ.get("BEERLOG_PORT", 8000)
- host = os.environ.get("BEERLOG_HOST", "0.0.0.0")
- print(f"Start on {host}:{port}", file=sys.stderr)
- async with websockets.serve(handle, host, port, create_protocol=UserInfoProtocol):
- await asyncio.Future()
- asyncio.run(main())
|