import asyncio
import json
import logging
import time
from pathlib import Path
from typing import List

import psutil
import yaml
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from .docker_monitor import DockerMonitor

logger = logging.getLogger(__name__)

app = FastAPI(title="RedUnits Control Panel")

# Setup paths
BASE_DIR = Path(__file__).resolve().parent.parent
app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static")), name="static")
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))

# Initialize Docker monitor
docker_monitor = DockerMonitor()

# Unit conversions and timing used by the metrics code below.
_BYTES_PER_GIB = 1024 ** 3
_SECONDS_PER_DAY = 86400
_METRICS_INTERVAL_SECONDS = 5


# ──────────────────────────────────────────────
# Config loader
# ──────────────────────────────────────────────

def _load_config_section(section: str) -> List[dict]:
    """Return the list stored under *section* in config.yaml.

    Never raises: a missing file, unreadable/invalid YAML, an empty file,
    a missing or null section, or a section that is not a list all yield
    an empty list (with a log entry where something is actually wrong).
    Entries that are not mappings are dropped so callers can index them
    by key safely.
    """
    config_path = BASE_DIR / "config.yaml"
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except FileNotFoundError:
        logger.warning(
            "config.yaml not found at %s. No %s loaded.", config_path, section
        )
        return []
    except Exception as e:
        # Best-effort boundary: a broken config must not prevent startup.
        logger.error("Error loading config.yaml: %s", e)
        return []

    # yaml.safe_load returns None for an empty document.
    if data is None:
        return []
    if not isinstance(data, dict):
        logger.error(
            "config.yaml must contain a mapping at the top level, got %s",
            type(data).__name__,
        )
        return []

    entries = data.get(section)
    if entries is None:
        # Section absent or explicitly null (e.g. "services:" with no value).
        return []
    if not isinstance(entries, list):
        logger.error(
            "'%s' in config.yaml must be a list, got %s",
            section,
            type(entries).__name__,
        )
        return []
    return [entry for entry in entries if isinstance(entry, dict)]


def load_services() -> List[dict]:
    """Load services from config.yaml. Falls back to empty list on error."""
    return _load_config_section("services")


def load_documents() -> List[dict]:
    """Load documents from config.yaml. Falls back to empty list on error."""
    return _load_config_section("documents")


# Loaded once at import time; config changes require a restart.
SERVICES = load_services()


# ──────────────────────────────────────────────
# WebSocket connection manager
# ──────────────────────────────────────────────

class ConnectionManager:
    """Track accepted WebSocket clients and fan messages out to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and register the socket."""
        await websocket.accept()
        self.active_connections.append(websocket)
        logger.info(
            "WS client connected. Total: %d", len(self.active_connections)
        )

    def disconnect(self, websocket: WebSocket):
        """Unregister the socket; a no-op if it is not registered."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
            logger.info(
                "WS client disconnected. Total: %d", len(self.active_connections)
            )

    async def broadcast(self, data: dict):
        """Send *data* as JSON text to every registered client.

        Clients whose send fails are unregistered afterwards.
        """
        if not self.active_connections:
            return

        message = json.dumps(data)
        dead = []
        # Iterate over a snapshot: each send awaits, and connect()/disconnect()
        # may mutate the live list while this coroutine is suspended.
        for ws in list(self.active_connections):
            try:
                await ws.send_text(message)
            except Exception:
                dead.append(ws)

        for ws in dead:
            self.disconnect(ws)


manager = ConnectionManager()


# ──────────────────────────────────────────────
# Background metrics collector
# ──────────────────────────────────────────────

def _read_system_stats() -> dict:
    """Sample CPU, memory, disk, container counts and uptime.

    Blocking: psutil.cpu_percent is called with a 1 second interval and the
    Docker monitor is queried synchronously, so call this through
    asyncio.to_thread from async code.
    """
    cpu_percent = psutil.cpu_percent(1)
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage("/")
    containers_total = docker_monitor.get_containers_count()
    containers_running = docker_monitor.get_running_containers_count()
    uptime_seconds = time.time() - psutil.boot_time()

    return {
        "cpu": {"percent": round(cpu_percent, 1)},
        "memory": {
            "percent": round(memory.percent, 1),
            "used_gb": round(memory.used / _BYTES_PER_GIB, 2),
            "total_gb": round(memory.total / _BYTES_PER_GIB, 2),
        },
        "disk": {
            "percent": round(disk.percent, 1),
            "used_gb": round(disk.used / _BYTES_PER_GIB, 2),
            "total_gb": round(disk.total / _BYTES_PER_GIB, 2),
        },
        "containers": {
            "running": containers_running,
            "total": containers_total,
        },
        "uptime": {
            "days": int(uptime_seconds / _SECONDS_PER_DAY),
            "seconds": int(uptime_seconds),
        },
    }


async def _collect_services_status() -> List[dict]:
    """Return every configured service merged with its container status."""
    services_with_status = []
    for service in SERVICES:
        status = await asyncio.to_thread(
            docker_monitor.get_container_status, service["container_name"]
        )
        services_with_status.append({**service, "status": status})
    return services_with_status


async def collect_and_broadcast():
    """Collect system + service metrics and broadcast to all WS clients.

    Errors are logged and swallowed so the background loop keeps running.
    """
    # Sampling costs at least one second of CPU measurement plus several
    # Docker queries; skip it when nobody would receive the result.
    if not manager.active_connections:
        return

    try:
        payload = {
            "type": "update",
            "services": await _collect_services_status(),
            "system": await asyncio.to_thread(_read_system_stats),
        }
        await manager.broadcast(payload)
    except Exception as e:
        logger.exception("Error in collect_and_broadcast: %s", e)


async def metrics_loop():
    """Background loop: collect and broadcast metrics every 5 seconds."""
    while True:
        await collect_and_broadcast()
        await asyncio.sleep(_METRICS_INTERVAL_SECONDS)


# ──────────────────────────────────────────────
# App lifecycle
# ──────────────────────────────────────────────

@app.on_event("startup")
async def startup_event():
    """Start the metrics loop when the application starts."""
    # The event loop only keeps a weak reference to tasks; hold a strong one
    # so the loop cannot be garbage-collected while it is running.
    app.state.metrics_task = asyncio.create_task(metrics_loop())
    logger.info("Metrics background task started")


# ──────────────────────────────────────────────
# HTTP routes
# ──────────────────────────────────────────────

def _find_service(service_id: str) -> dict:
    """Return the configured service with this id or raise a 404."""
    service = next((s for s in SERVICES if s.get("id") == service_id), None)
    if not service:
        raise HTTPException(
            status_code=404, detail=f"Service '{service_id}' not found"
        )
    return service


async def _run_container_action(service_id: str, action) -> dict:
    """Run a Docker monitor action against a service's container.

    *action* is called in a worker thread with the container name and must
    return a dict with "success" and "message" keys. Raises HTTPException
    404 for an unknown service and 500 when the action reports failure.
    """
    service = _find_service(service_id)
    result = await asyncio.to_thread(action, service["container_name"])
    if not result["success"]:
        raise HTTPException(status_code=500, detail=result["message"])
    return result


@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
    """Render main dashboard page."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/api/services")
async def get_services():
    """Get all services with their current status (one-shot REST fallback)."""
    return {"services": await _collect_services_status()}


@app.get("/api/system")
async def get_system_stats():
    """Get system statistics (one-shot REST fallback)."""
    return await asyncio.to_thread(_read_system_stats)


@app.get("/api/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "ok", "message": "RedUnits Control Panel is running"}


@app.post("/api/services/{service_id}/restart")
async def restart_service(service_id: str):
    """Restart a service container."""
    return await _run_container_action(
        service_id, docker_monitor.restart_container
    )


@app.post("/api/services/{service_id}/stop")
async def stop_service(service_id: str):
    """Stop a service container."""
    return await _run_container_action(service_id, docker_monitor.stop_container)


@app.get("/api/documents")
async def get_documents():
    """Get list of available documents."""
    return {"documents": load_documents()}


@app.get("/api/document/{doc_id}")
async def get_document_content(doc_id: str):
    """Fetch raw document content (acts as a proxy).

    Looks the document up by id in config.yaml and downloads its "url".
    Raises HTTPException 404 for an unknown id and 500 when the entry has
    no URL or the download fails.
    """
    docs = load_documents()
    doc = next((d for d in docs if d.get("id") == doc_id), None)
    if not doc:
        raise HTTPException(
            status_code=404, detail=f"Document '{doc_id}' not found"
        )

    url = doc.get("url")
    if not url:
        logger.error("Document '%s' has no URL configured", doc_id)
        raise HTTPException(
            status_code=500, detail=f"Document '{doc_id}' has no URL configured"
        )

    # Imported lazily so the rest of the app works without `requests`.
    import requests

    try:
        resp = await asyncio.to_thread(requests.get, url, timeout=10)
        resp.raise_for_status()
        return {"content": resp.text}
    except Exception as e:
        logger.error("Error fetching document '%s': %s", doc_id, e)
        raise HTTPException(status_code=500, detail=str(e)) from e


# ──────────────────────────────────────────────
# WebSocket endpoint
# ──────────────────────────────────────────────
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a WebSocket client and hold the connection open.

    After registering, triggers one metrics collection so the new client
    gets data without waiting for the next background cycle. Incoming
    client messages are read and discarded.
    """
    await manager.connect(websocket)
    try:
        # Send initial data immediately on connect
        await collect_and_broadcast()
        while True:
            # Keep connection alive; client messages are ignored
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in `finally`.
        pass
    finally:
        # Unregister on every exit path (disconnect, unexpected frame error,
        # task cancellation) so no stale socket stays in the manager.
        manager.disconnect(websocket)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)