beerlog-srv.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. #!/usr/bin/env python
  2. import asyncio
  3. import websockets
  4. import json
  5. import sys
  6. import os
  7. from storage import Storage
  8. CONNECTIONS = set()
  9. STORAGES = { "users": Storage("users") }
  10. class UserInfoProtocol(websockets.BasicAuthWebSocketServerProtocol):
  11. async def check_credentials(self, username, password):
  12. all_users = storage("users").read()
  13. self.user = all_users[username] if username in all_users else None
  14. return self.user != None
  15. def storage(name):
  16. if not name in STORAGES:
  17. STORAGES[name] = Storage(name)
  18. return STORAGES[name]
  19. def _get_entity(event):
  20. return event.get("entity", None)
  21. def _get_data(event):
  22. return event.get("data", {})
  23. async def add_entity(websocket, event):
  24. name = _get_entity(event)
  25. data = _get_data(event)
  26. if name and data:
  27. storage(name).create(data)
  28. await broadcast(websocket, { "entity": name, "event": "created", "data": data })
  29. async def get_entity(websocket, event):
  30. name = _get_entity(event)
  31. ts = _get_data(event).get("ts", 0)
  32. data = storage(name).read(ts)
  33. await websocket.send(json.dumps({ "entity": name, "event": "received", "data": data }))
  34. async def mod_entity(websocket, event):
  35. name = _get_entity(event)
  36. data = _get_data(event)
  37. if name and data:
  38. storage(name).update(data)
  39. await broadcast(websocket, { "entity": name, "event": "modified", "data": data })
  40. async def del_entity(websocket, event):
  41. name = _get_entity(event)
  42. entity_id = _get_data(event).get("id", None)
  43. if storage(name).delete(entity_id):
  44. await broadcast(websocket, { "entity": name, "event": "deleted", "data": entity_id })
  45. async def describe(websocket, event):
  46. connections = list(CONNECTIONS)
  47. data = { c.user["id"]: connections.count(c) for c in connections }
  48. await websocket.send(json.dumps({ "entity": "connections", "event": "described", "data": data }))
  49. async def broadcast(websocket, event):
  50. websockets.broadcast(CONNECTIONS, json.dumps(event))
  51. COMMANDS = {
  52. "add": add_entity,
  53. "get": get_entity,
  54. "mod": mod_entity,
  55. "del": del_entity,
  56. "describe": describe
  57. }
  58. async def handle(websocket):
  59. try:
  60. CONNECTIONS.add(websocket)
  61. await broadcast(websocket, { "event": "connected", "entity": "users", "data": websocket.user})
  62. async for message in websocket:
  63. event = json.loads(message)
  64. print(event, file=sys.stderr)
  65. handler = COMMANDS.get(event["action"], broadcast)
  66. await handler(websocket, event)
  67. finally:
  68. CONNECTIONS.remove(websocket)
  69. await broadcast(websocket, { "event": "disconnected", "entity": "users", "data": websocket.user})
  70. async def main():
  71. port = os.environ.get("BEERLOG_PORT", 8000)
  72. host = os.environ.get("BEERLOG_HOST", "0.0.0.0")
  73. print(f"Start on {host}:{port}", file=sys.stderr)
  74. async with websockets.serve(handle, host, port, create_protocol=UserInfoProtocol):
  75. await asyncio.Future()
  76. asyncio.run(main())