"""
KazicCAM - server side with a web interface.

FastAPI + OpenCV + WebSocket + Multiprocessing
Version: 3.1.1, build 35
For Windows
"""

import os
import sys
import uuid
import json
import time
import asyncio
import hashlib
import secrets
import string
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple, Any
from collections import defaultdict
from multiprocessing import Process, Manager, Queue, cpu_count, freeze_support
from concurrent.futures import ProcessPoolExecutor
from dotenv import load_dotenv

import cv2
import numpy as np
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends, status, Request, Form
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse, RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
import psutil

# Load variables from a .env file into the process environment before the
# configuration below reads them.
load_dotenv()

# ========== CONFIGURATION ==========

SERVER_CONFIG = {
    # Read the HOST environment variable (or the legacy 'host') so the value
    # can be controlled through .env / docker-compose.
    "host": os.getenv("HOST", os.getenv("host", "0.0.0.0")),  # externally facing IP, or 0.0.0.0 for all interfaces
    "port": int(os.getenv("PORT", 8000)),
    "ssl_certfile": os.getenv("SSL_CERTFILE", "/etc/videostream/ssl/cert.pem"),
    "ssl_keyfile": os.getenv("SSL_KEYFILE", "/etc/videostream/ssl/key.pem"),
    # Only the literal string "true" (case-insensitive) enables SSL.
    "ssl_enabled": os.getenv("SSL_ENABLED", "false").lower() == "true",
    "debug": False,
    "max_clients_per_room": 50,
    "max_rooms": 100,
    "video_width": 640,
    "video_height": 480,
    "frame_rate": 30,
    "jpeg_quality": 85,
    "websocket_ping_interval": 30,
    "websocket_ping_timeout": 10,
    # Compared against the seconds elapsed since an admin session's last activity.
    "admin_session_timeout": 3600,
}

# ========== DATA MODELS ==========


class RoomCreate(BaseModel):
    """Request body for creating a room (all fields required)."""

    name: str
    password: str
    max_connections: int


class RoomUpdate(BaseModel):
    """Room fields for an update; every field is optional and defaults to None."""

    name: Optional[str] = None
    password: Optional[str] = None
    max_connections: Optional[int] = None
# ========== GLOBAL STATE ==========
# These registries start as None and are replaced with real containers by
# lifespan() when the application starts.
rooms = None
clients = None
admin_sessions = None
client_websockets = None
admin_websockets = None
video_queues = None
command_queues = None
room_stats = None
server_stats = None
stats_lock = None
cleanup_task = None
templates = None
processed_video_queues = {}

# Administrators as [username, password] pairs.
# NOTE(review): credentials are hard-coded in plaintext; consider loading them
# from the environment or a secrets store.
ADMINS = [
    ["admin", "admin123"],
    ["administrator", "securepass"],
    ["supervisor", "superpass"]
]


# ========== HELPER FUNCTIONS ==========

def safe_json_serializer(obj: Any) -> Any:
    """Convert an object that ``json`` cannot encode into something it can.

    Sets become lists, datetimes become ISO-8601 strings, objects with a
    ``__dict__`` become that dict, and anything else becomes ``str(obj)``.
    """
    if isinstance(obj, set):
        return list(obj)
    if isinstance(obj, datetime):
        return obj.isoformat()
    if hasattr(obj, '__dict__'):
        return obj.__dict__
    return str(obj)


def get_server_host():
    """Return the host/IP that should be shown to users for this server.

    Resolution order:
      0. ``PUBLIC_HOST`` / ``ADVERTISED_HOST`` environment variable, unless it
         is a wildcard or loopback value.
      1. ``SERVER_CONFIG["host"]``, unless it is a wildcard or loopback value.
      2. The local address the OS would use to route to 8.8.8.8.
      3. The address the local hostname resolves to.
      4. Fallback: the configured host, or ``"0.0.0.0"``.
    """
    import socket

    non_routable = ("", "0.0.0.0", "127.0.0.1", "localhost")

    # 0) Explicitly advertised address (recommended when running in Docker).
    public_host = os.getenv("PUBLIC_HOST") or os.getenv("ADVERTISED_HOST")
    if public_host and public_host not in non_routable:
        return public_host

    # 1) Host explicitly set in the configuration.
    cfg_host = SERVER_CONFIG.get("host")
    if cfg_host and cfg_host not in non_routable:
        return cfg_host

    # 2) Connecting a UDP socket sends no packets; it only makes the OS pick
    #    the outgoing interface, whose address we then read back. The context
    #    manager closes the socket even when connect() fails (the original
    #    leaked it on that path).
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            ip_address = probe.getsockname()[0]
        if ip_address and not ip_address.startswith("127."):
            return ip_address
    except Exception:
        pass  # best-effort: fall through to the next strategy

    # 3) Resolve the local hostname (may yield 127.0.1.1 on some systems).
    try:
        hostname = socket.gethostname()
        ip_address = socket.gethostbyname(hostname)
        if ip_address and not ip_address.startswith("127."):
            return ip_address
    except Exception:
        pass  # best-effort: fall through to the fallback

    # 4) Fallback: whatever the configuration holds (possibly 0.0.0.0).
    return cfg_host or "0.0.0.0"
# ========== MANAGEMENT CLASSES ==========

class RoomManager:
    """Static helpers that create, delete and update entries in ``rooms``."""

    @staticmethod
    def generate_room_id(length: int = 11) -> str:
        """Return a random ID of ``length`` ASCII letters and digits."""
        symbols = string.ascii_letters + string.digits
        picked = [secrets.choice(symbols) for _ in range(length)]
        return ''.join(picked)

    @staticmethod
    def create_room(name: str, password: str, max_connections: int, created_by: str = None) -> Dict:
        """Register a new room and its statistics entry; return the room dict."""
        global rooms, room_stats, server_stats, stats_lock

        new_id = RoomManager.generate_room_id()
        record = {
            "id": new_id,
            "name": name,
            "password": password,
            "max_connections": max_connections,
            "created_at": datetime.now().isoformat(),
            "created_by": created_by,
            "clients": [],  # a list rather than a set
            "is_active": True
        }
        initial_stats = {
            "total_clients": 0,
            "active_streams": 0,
            "bytes_transferred": 0,
            "last_activity": datetime.now().isoformat()
        }

        with stats_lock:
            rooms[new_id] = record
            room_stats[new_id] = initial_stats
            server_stats["total_rooms"] = len(rooms)

        print(f"[RoomManager] Created room: {new_id} - {name}")
        return record

    @staticmethod
    def delete_room(room_id: str) -> bool:
        """Disconnect every client of the room, then remove it.

        Returns True when the room existed, False otherwise.
        """
        global rooms, room_stats, server_stats, stats_lock

        if room_id not in rooms:
            return False

        # Iterate over a copy: disconnect_client() mutates the member list.
        for member_id in list(rooms[room_id]["clients"]):
            ClientManager.disconnect_client(member_id)

        with stats_lock:
            del rooms[room_id]
            room_stats.pop(room_id, None)
            server_stats["total_rooms"] = len(rooms)

        print(f"[RoomManager] Deleted room: {room_id}")
        return True

    @staticmethod
    def update_room(room_id: str, updates: dict) -> Optional[Dict]:
        """Apply non-None values for keys the room already has.

        Returns the room dict, or None when the room does not exist.
        """
        global rooms, stats_lock

        if room_id not in rooms:
            return None

        with stats_lock:
            target = rooms[room_id]
            for field, new_value in updates.items():
                if new_value is None or field not in target:
                    continue
                target[field] = new_value

        return rooms[room_id]
class ClientManager:
    """Static helpers that register and remove streaming clients."""

    @staticmethod
    def generate_client_id() -> str:
        """Return a new unique client ID (UUID4 string)."""
        return str(uuid.uuid4())

    @staticmethod
    def add_client(room_id: str, password: str, ip_address: str) -> Optional[str]:
        """Admit a client into a room.

        Returns the new client ID, or None when the room is unknown, the
        password does not match, or the room is full.
        """
        global rooms, clients, room_stats, server_stats, stats_lock

        print(f"[ClientManager] add_client called: room={room_id}, ip={ip_address}")

        if room_id not in rooms:
            print(f"[ClientManager] ❌ Room not found: {room_id}")
            return None

        room = rooms[room_id]
        print(f"[ClientManager] Found room: {room['name']}")

        # Constant-time comparison on bytes (compare_digest rejects non-ASCII
        # str). A non-string password (e.g. None from a JSON body) never matches.
        password_ok = isinstance(password, str) and secrets.compare_digest(
            str(room["password"]).encode("utf-8"), password.encode("utf-8")
        )
        if not password_ok:
            # Deliberately log neither the expected nor the supplied password.
            print(f"[ClientManager] ❌ Invalid password for room: {room_id}")
            return None

        if len(room["clients"]) >= room["max_connections"]:
            print(f"[ClientManager] ❌ Room {room_id} is full: {len(room['clients'])}/{room['max_connections']}")
            return None

        client_id = ClientManager.generate_client_id()
        print(f"[ClientManager] Generated client_id: {client_id}")

        now = datetime.now().isoformat()
        client = {
            "id": client_id,
            "room_id": room_id,
            "ip_address": ip_address,
            "connected_at": now,
            "last_activity": now,
            "is_streaming": False,
            "video_settings": {
                "quality": 85,
                "frame_rate": 30,
                "resolution": "640x480"
            },
            "commands": []
        }

        with stats_lock:
            clients[client_id] = client
            # The member list is a list, not a set, so guard against duplicates.
            if client_id not in room["clients"]:
                room["clients"].append(client_id)
            # Update statistics.
            if room_id in room_stats:
                room_stats[room_id]["total_clients"] = len(room["clients"])
                room_stats[room_id]["last_activity"] = datetime.now().isoformat()
            server_stats["total_clients"] = len(clients)

        print(f"[ClientManager] ✓ Added client: {client_id} to room: {room_id} (total in room: {len(room['clients'])})")
        return client_id

    @staticmethod
    def disconnect_client(client_id: str) -> bool:
        """Remove a client from its room and from every per-client registry.

        Returns True when the client existed, False otherwise.
        """
        global rooms, clients, client_websockets, video_queues, command_queues, room_stats, server_stats, stats_lock

        if client_id not in clients:
            return False

        room_id = clients[client_id]["room_id"]

        with stats_lock:
            # Remove from the room.
            if room_id in rooms:
                members = rooms[room_id]["clients"]
                if client_id in members:
                    members.remove(client_id)
                if room_id in room_stats:
                    room_stats[room_id]["total_clients"] = len(members)
            # Remove the client record.
            del clients[client_id]
            server_stats["total_clients"] = len(clients)

        # Close the WebSocket connection. Closing is a coroutine, so it can
        # only be scheduled when an event loop is running; check first so no
        # un-awaited coroutine is created otherwise.
        websocket = client_websockets.pop(client_id, None)
        if websocket is not None:
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                pass  # no running loop: nothing to schedule
            else:
                asyncio.create_task(websocket.close())

        # Drop the queues.
        video_queues.pop(client_id, None)
        command_queues.pop(client_id, None)

        print(f"[ClientManager] Disconnected client: {client_id}")
        return True
class VideoProcessor:
    """Per-client worker that processes JPEG frames in a separate process.

    All three queues are created in the parent process and handed to the child
    as ``Process`` arguments. Module-level dicts are not shared between
    processes: anything the child stores in ``processed_video_queues`` is
    invisible to the parent, and under the "spawn" start method the child
    starts with the registries still set to ``None``.
    """

    def __init__(self, client_id: str):
        """Create the queues for ``client_id`` and register them globally."""
        self.client_id = client_id
        self.video_queue = Queue(maxsize=10)      # raw JPEG bytes from the client
        self.command_queue = Queue()              # transform commands
        self.processed_queue = Queue(maxsize=10)  # re-encoded JPEG bytes for admins
        self.process = None
        self.is_running = False
        video_queues[client_id] = self.video_queue
        command_queues[client_id] = self.command_queue
        processed_video_queues[client_id] = self.processed_queue

    def start(self):
        """Start the video processing process."""
        self.is_running = True
        self.process = Process(
            target=self._process_video_stream,
            args=(self.client_id, self.video_queue, self.command_queue, self.processed_queue),
        )
        self.process.daemon = True
        self.process.start()
        print(f"[VideoProcessor] ✓ Started process for client: {self.client_id} (PID: {self.process.pid})")

    def stop(self):
        """Terminate the worker; kill it if still alive after 5 seconds."""
        self.is_running = False
        if self.process:
            self.process.terminate()
            self.process.join(timeout=5)
            if self.process.is_alive():
                self.process.kill()
        print(f"[VideoProcessor] Stopped for client: {self.client_id}")

    @staticmethod
    def _process_video_stream(client_id: str, video_queue=None, command_queue=None, processed_queue=None):
        """Worker loop: decode frames, apply pending commands, re-encode.

        Args:
            client_id: ID of the client whose frames are processed.
            video_queue: Queue of raw JPEG bytes (input).
            command_queue: Queue of command dicts applied to each frame.
            processed_queue: Queue receiving re-encoded JPEG bytes (output).

        The queue arguments are optional for backward compatibility; when they
        are omitted the global registries are polled for up to ~10 seconds.
        """
        from queue import Empty, Full

        print(f"[VideoProcessor] Process started for client {client_id}")

        if video_queue is None or command_queue is None:
            # Legacy path: wait for the queues to appear in the registries.
            for _ in range(100):
                try:
                    if client_id in video_queues and client_id in command_queues:
                        video_queue = video_queues[client_id]
                        command_queue = command_queues[client_id]
                        break
                except TypeError:
                    pass  # registries are still None in this process
                time.sleep(0.1)

        if video_queue is None or command_queue is None:
            print(f"[VideoProcessor] Queues not found for client {client_id}")
            return

        if processed_queue is None:
            # Legacy path: only visible inside this process.
            processed_queue = Queue(maxsize=10)
            processed_video_queues[client_id] = processed_queue

        frame_count = 0
        last_log_time = time.time()
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]

        while True:
            try:
                try:
                    # Blocking get with a timeout replaces the empty()/sleep
                    # polling, which multiprocessing documents as unreliable.
                    frame_data = video_queue.get(timeout=1)
                except Empty:
                    continue

                frame_count += 1
                nparr = np.frombuffer(frame_data, np.uint8)
                frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

                if frame is not None:
                    # Apply every command currently waiting in the queue.
                    while True:
                        try:
                            command = command_queue.get_nowait()
                        except Empty:
                            break
                        frame = VideoProcessor._apply_command(frame, command)

                    # Encode back to JPEG.
                    success, encoded_frame = cv2.imencode('.jpg', frame, encode_param)
                    if success:
                        try:
                            processed_queue.put_nowait(encoded_frame.tobytes())
                        except Full:
                            pass  # nobody is consuming fast enough: drop the frame

                # Log progress every 5 seconds.
                current_time = time.time()
                if current_time - last_log_time > 5:
                    print(f"[VideoProcessor] Client {client_id}: Processed {frame_count} frames")
                    last_log_time = current_time
            except Exception as e:
                print(f"[VideoProcessor] Error: {e}")
                time.sleep(0.1)

    @staticmethod
    def _apply_command(frame: np.ndarray, command: Dict) -> np.ndarray:
        """Apply one command to a frame and return the resulting frame.

        Non-dict commands and unknown ``type`` values leave the frame as is.
        On any error the error is printed and the current frame is returned.
        """
        try:
            if not isinstance(command, dict):
                return frame

            cmd_type = command.get("type")

            if cmd_type == "adjust_quality":
                pass  # accepted but has no effect on the frame
            elif cmd_type == "rotate":
                angle = command.get("angle", 0)
                if angle == 90:
                    frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
                elif angle == 180:
                    frame = cv2.rotate(frame, cv2.ROTATE_180)
                elif angle == 270:
                    frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
            elif cmd_type == "flip":
                flip_code = command.get("direction", 0)
                if flip_code in [0, 1, -1]:
                    frame = cv2.flip(frame, flip_code)
            elif cmd_type == "grayscale":
                # Convert to gray and back so the frame keeps three channels.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
            elif cmd_type == "brightness":
                value = command.get("value", 0)
                frame = cv2.convertScaleAbs(frame, alpha=1, beta=value)
            elif cmd_type == "contrast":
                value = command.get("value", 1.0)
                frame = cv2.convertScaleAbs(frame, alpha=value, beta=0)

            return frame
        except Exception as e:
            print(f"[VideoProcessor] Error applying command: {e}")
            return frame
class AdminAuth:
    """HTTP Basic authentication and session bookkeeping for administrators."""

    security = HTTPBasic()

    @staticmethod
    def authenticate_admin(credentials: HTTPBasicCredentials = Depends(security)):
        """Validate HTTP Basic credentials against ``ADMINS`` and open a session.

        Returns:
            Dict with the new ``session_id`` and the ``username``.

        Raises:
            HTTPException: 401 when no administrator matches.
        """
        global admin_sessions, stats_lock

        # compare_digest needs bytes to accept non-ASCII input.
        supplied_user = credentials.username.encode("utf-8")
        supplied_pass = credentials.password.encode("utf-8")

        for admin_name, admin_password in ADMINS:
            # Constant-time comparisons avoid leaking match length via timing.
            user_ok = secrets.compare_digest(supplied_user, admin_name.encode("utf-8"))
            pass_ok = secrets.compare_digest(supplied_pass, admin_password.encode("utf-8"))
            if user_ok and pass_ok:
                # 32 hex characters from the CSPRNG. The previous
                # sha256(username + time) value was guessable.
                session_id = secrets.token_hex(16)
                now = datetime.now().isoformat()

                with stats_lock:
                    admin_sessions[session_id] = {
                        "username": credentials.username,
                        "login_time": now,
                        "last_activity": now,
                        "is_authenticated": True
                    }

                print(f"[AdminAuth] User authenticated: {credentials.username}")
                return {"session_id": session_id, "username": credentials.username}

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )

    @staticmethod
    def verify_session(session_id: str) -> bool:
        """Return True and refresh ``last_activity`` when the session exists."""
        global admin_sessions, stats_lock

        # Look up and update under one lock acquisition so the session cannot
        # disappear between the check and the write.
        with stats_lock:
            session = admin_sessions.get(session_id)
            if session is None:
                return False
            session["last_activity"] = datetime.now().isoformat()
            return True
# ========== BACKGROUND TASKS ==========

def _seconds_since(iso_timestamp: str, now: datetime) -> float:
    """Return the full number of seconds between an ISO timestamp and ``now``."""
    # total_seconds() includes whole days; timedelta.seconds does not.
    return (now - datetime.fromisoformat(iso_timestamp)).total_seconds()


async def _cleanup_once() -> None:
    """Run one cleanup pass over admin sessions and empty rooms."""
    now = datetime.now()
    session_timeout = SERVER_CONFIG["admin_session_timeout"]
    room_idle_timeout = 86400  # empty rooms are removed after one day

    # Expired admin sessions.
    with stats_lock:
        expired_sessions = [
            session_id
            for session_id, session_data in admin_sessions.items()
            if _seconds_since(session_data["last_activity"], now) > session_timeout
        ]
        for session_id in expired_sessions:
            del admin_sessions[session_id]

    # Close the WebSockets that belonged to those sessions.
    for session_id in expired_sessions:
        websocket = admin_websockets.pop(session_id, None)
        if websocket is None:
            continue
        try:
            await websocket.close()
        except Exception:
            # Best-effort close. Not a bare except, so task cancellation
            # still propagates during shutdown.
            pass

    # Rooms without clients that have been idle for longer than the timeout.
    with stats_lock:
        stale_rooms = [
            room_id
            for room_id, room in rooms.items()
            if not room["clients"]
            and _seconds_since(
                room_stats.get(room_id, {}).get("last_activity", now.isoformat()), now
            ) > room_idle_timeout
        ]

    # delete_room() takes stats_lock itself, so call it outside the lock.
    for room_id in stale_rooms:
        RoomManager.delete_room(room_id)


async def cleanup_inactive_sessions():
    """Background task: clean up inactive sessions and rooms every 60 seconds."""
    global admin_sessions, admin_websockets, rooms, room_stats, stats_lock

    while True:
        try:
            await _cleanup_once()
        except Exception as e:
            print(f"[Cleanup] Error: {e}")
        await asyncio.sleep(60)


# ========== LIFESPAN MANAGER ==========

@asynccontextmanager
async def lifespan(app: "FastAPI"):
    """Application lifespan: initialise global state, then tear it down.

    Startup creates the lock and registries, the templates directory and the
    cleanup task. Shutdown cancels the cleanup task and disconnects clients.
    """
    global rooms, clients, admin_sessions, client_websockets, admin_websockets
    global video_queues, command_queues, room_stats, server_stats, stats_lock, cleanup_task, templates

    import threading

    stats_lock = threading.Lock()

    rooms = {}
    clients = {}
    admin_sessions = {}
    client_websockets = {}
    admin_websockets = {}
    video_queues = {}
    command_queues = {}
    room_stats = {}
    server_stats = {
        "total_rooms": 0,
        "total_clients": 0,
        "total_streams": 0,
        "start_time": datetime.now().isoformat(),
        "cpu_usage": 0,
        "memory_usage": 0,
        "uptime": 0
    }

    # Create the templates directory if it does not exist.
    os.makedirs("templates", exist_ok=True)
    templates = Jinja2Templates(directory="templates")

    print("=" * 60)
    print("🎥 Video Streaming Server with Web Interface")
    print(f"📡 Server running on: {SERVER_CONFIG['host']}:{SERVER_CONFIG['port']}")
    print(f"🌐 Web Interface: http://{SERVER_CONFIG['host']}:{SERVER_CONFIG['port']}/")
    print(f"👥 Admin accounts: {len(ADMINS)}")
    print("=" * 60)

    cleanup_task = asyncio.create_task(cleanup_inactive_sessions())

    yield

    print("Shutting down server...")

    if cleanup_task:
        cleanup_task.cancel()
        try:
            await cleanup_task
        except asyncio.CancelledError:
            pass

    # Iterate over a copy: disconnect_client() removes entries from clients.
    for client_id in list(clients.keys()):
        ClientManager.disconnect_client(client_id)

    print("Server shutdown complete.")
# ========== FASTAPI APPLICATION ==========

app = FastAPI(
    title="Video Streaming Server",
    version="2.1.0",
    lifespan=lifespan
)

# CORS middleware.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive; consider listing the allowed origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Static files.
os.makedirs("static", exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")


# ========== WEB INTERFACE ==========

@app.get("/", response_class=HTMLResponse)
async def login_page(request: Request):
    """Render the login page."""
    server_host = get_server_host()
    return templates.TemplateResponse("login.html", {
        "request": request,
        "server_host": server_host,
        "server_port": SERVER_CONFIG["port"]
    })


@app.post("/login")
async def login_form(request: Request, username: str = Form(...), password: str = Form(...)):
    """Handle the login form.

    On success a session is stored, the ``session_id`` cookie is set and the
    browser is redirected to ``/dashboard``. On failure the login page is
    rendered again with an error message.
    """
    supplied_user = username.encode("utf-8")
    supplied_pass = password.encode("utf-8")

    for admin_name, admin_password in ADMINS:
        # Constant-time comparisons on bytes (compare_digest rejects non-ASCII str).
        user_ok = secrets.compare_digest(supplied_user, admin_name.encode("utf-8"))
        pass_ok = secrets.compare_digest(supplied_pass, admin_password.encode("utf-8"))
        if user_ok and pass_ok:
            # 32 hex characters from the CSPRNG. The previous
            # sha256(username + time) value was guessable.
            session_id = secrets.token_hex(16)
            now = datetime.now().isoformat()

            with stats_lock:
                admin_sessions[session_id] = {
                    "username": username,
                    "login_time": now,
                    "last_activity": now,
                    "is_authenticated": True
                }

            print(f"[Auth] ✓ Admin logged in: {username}, session_id={session_id[:16]}..., total sessions: {len(admin_sessions)}")

            response = RedirectResponse(url="/dashboard", status_code=303)
            response.set_cookie(key="session_id", value=session_id)
            return response

    print(f"[Auth] ❌ Failed login attempt: {username}")
    server_host = get_server_host()
    return templates.TemplateResponse("login.html", {
        "request": request,
        "error": "Invalid username or password",
        "server_host": server_host,
        "server_port": SERVER_CONFIG["port"]
    })
@app.get("/dashboard", response_class=HTMLResponse)
async def dashboard_page(request: Request):
    """Render the dashboard; redirect to the login page without a valid session."""
    sid = request.cookies.get("session_id")
    authorised = bool(sid) and AdminAuth.verify_session(sid)
    if not authorised:
        return RedirectResponse(url="/", status_code=303)

    with stats_lock:
        stats_snapshot = server_stats.copy()

        # Flatten each room into JSON-serialisable data for the template.
        room_rows = [
            {
                "id": entry["id"],
                "name": entry["name"],
                "clients_count": len(entry["clients"]),
                "max_connections": entry["max_connections"],
                "created_at": entry["created_at"],
                "created_by": entry.get("created_by", "Unknown"),
                "active_streams": room_stats.get(key, {}).get("active_streams", 0),
                "clients": entry["clients"]  # already a list
            }
            for key, entry in rooms.items()
        ]

    context = {
        "request": request,
        "username": admin_sessions[sid]["username"],
        "stats": stats_snapshot,
        "rooms": room_rows,
        "total_rooms": len(rooms),
        "total_clients": len(clients),
        "server_host": get_server_host(),
        "server_port": SERVER_CONFIG["port"]
    }
    return templates.TemplateResponse("dashboard.html", context)


@app.get("/room/{room_id}", response_class=HTMLResponse)
async def room_page(request: Request, room_id: str):
    """Render the room page.

    Redirects to the login page without a valid session, and to the dashboard
    when the room does not exist.
    """
    sid = request.cookies.get("session_id")
    authorised = bool(sid) and AdminAuth.verify_session(sid)
    if not authorised:
        return RedirectResponse(url="/", status_code=303)

    if room_id not in rooms:
        return RedirectResponse(url="/dashboard", status_code=303)

    current_room = rooms[room_id]
    current_stats = room_stats.get(room_id, {})

    # Only members that still have a client record are listed.
    member_rows = [
        {
            "id": member_id,
            "ip_address": clients[member_id]["ip_address"],
            "connected_at": clients[member_id]["connected_at"],
            "is_streaming": clients[member_id]["is_streaming"],
            "video_settings": clients[member_id]["video_settings"]
        }
        for member_id in current_room["clients"]
        if member_id in clients
    ]

    context = {
        "request": request,
        "room": current_room,
        "room_stats": current_stats,
        "clients": member_rows,
        "username": admin_sessions[sid]["username"],
        "server_host": get_server_host(),
        "server_port": SERVER_CONFIG["port"],
        "ssl_enabled": SERVER_CONFIG.get("ssl_enabled", False)
    }
    return templates.TemplateResponse("room.html", context)
@app.get("/stream/{client_id}", response_class=HTMLResponse)
async def stream_page(request: Request, client_id: str):
    """Render the stream viewer page for one client.

    Redirects to the login page without a valid session, and to the dashboard
    when the client does not exist.
    """
    sid = request.cookies.get("session_id")
    authorised = bool(sid) and AdminAuth.verify_session(sid)
    if not authorised:
        return RedirectResponse(url="/", status_code=303)

    if client_id not in clients:
        return RedirectResponse(url="/dashboard", status_code=303)

    viewed_client = clients[client_id]
    owning_room_id = viewed_client["room_id"]
    if owning_room_id in rooms:
        owning_room = rooms[owning_room_id]
    else:
        owning_room = None

    context = {
        "request": request,
        "client": viewed_client,
        "room": owning_room,
        "session_id": sid,
        "username": admin_sessions[sid]["username"],
        "server_host": get_server_host(),
        "server_port": SERVER_CONFIG["port"]
    }
    return templates.TemplateResponse("stream.html", context)


@app.get("/create-room", response_class=HTMLResponse)
async def create_room_page(request: Request):
    """Render the room creation page; redirect to login without a valid session."""
    sid = request.cookies.get("session_id")
    authorised = bool(sid) and AdminAuth.verify_session(sid)
    if not authorised:
        return RedirectResponse(url="/", status_code=303)

    context = {
        "request": request,
        "username": admin_sessions[sid]["username"],
        "server_host": get_server_host(),
        "server_port": SERVER_CONFIG["port"]
    }
    return templates.TemplateResponse("create_room.html", context)


@app.post("/api/create-room")
async def create_room_api(request: Request, name: str = Form(...), password: str = Form(...), max_connections: int = Form(...)):
    """Create a room from the web form; always answers with a JSON body.

    The body carries ``success`` plus either the created ``room`` or an
    ``error`` string.
    """
    sid = request.cookies.get("session_id")
    authorised = bool(sid) and AdminAuth.verify_session(sid)
    if not authorised:
        return JSONResponse({"success": False, "error": "Unauthorized"})

    if sid in admin_sessions:
        author = admin_sessions[sid]["username"]
    else:
        author = "Unknown"

    exported_keys = (
        "id", "name", "password", "max_connections",
        "created_at", "created_by", "is_active",
        "clients",  # already a list
    )
    try:
        created = RoomManager.create_room(name, password, max_connections, author)
        # Copy only the JSON-serialisable fields of the room.
        payload = {key: created[key] for key in exported_keys}
        return JSONResponse({"success": True, "room": payload})
    except Exception as e:
        print(f"[API] Error creating room: {e}")
        return JSONResponse({"success": False, "error": str(e)})
@app.post("/api/delete-room/{room_id}")
async def delete_room_api(request: Request, room_id: str):
    """Delete a room (cookie-authenticated); answers with ``{"success": bool}``."""
    sid = request.cookies.get("session_id")
    authorised = bool(sid) and AdminAuth.verify_session(sid)
    if not authorised:
        return JSONResponse({"success": False, "error": "Unauthorized"})

    outcome = RoomManager.delete_room(room_id)
    return JSONResponse({"success": outcome})


@app.post("/api/disconnect-client/{client_id}")
async def disconnect_client_api(request: Request, client_id: str):
    """Disconnect a client (cookie-authenticated); answers with ``{"success": bool}``."""
    sid = request.cookies.get("session_id")
    authorised = bool(sid) and AdminAuth.verify_session(sid)
    if not authorised:
        return JSONResponse({"success": False, "error": "Unauthorized"})

    outcome = ClientManager.disconnect_client(client_id)
    return JSONResponse({"success": outcome})


@app.get("/logout")
async def logout(request: Request):
    """Drop the current admin session, clear the cookie and redirect to login."""
    sid = request.cookies.get("session_id")
    session_known = bool(sid) and sid in admin_sessions
    if session_known:
        with stats_lock:
            del admin_sessions[sid]

    redirect = RedirectResponse(url="/", status_code=303)
    redirect.delete_cookie(key="session_id")
    return redirect
# ========== API ROUTES ==========

@app.post("/api/auth/login")
async def login_api(auth_data: dict):
    """Administrator login (API).

    Expects ``username`` and ``password`` in the JSON body. Returns
    ``success`` plus ``session_id``/``username``, or ``success: False`` with
    an ``error`` message.
    """
    username = auth_data.get("username")
    password = auth_data.get("password")

    # Missing or non-string credentials can never match; reject them before
    # compare_digest, which only accepts str/bytes.
    if isinstance(username, str) and isinstance(password, str):
        supplied_user = username.encode("utf-8")
        supplied_pass = password.encode("utf-8")

        for admin_name, admin_password in ADMINS:
            user_ok = secrets.compare_digest(supplied_user, admin_name.encode("utf-8"))
            pass_ok = secrets.compare_digest(supplied_pass, admin_password.encode("utf-8"))
            if user_ok and pass_ok:
                # 32 hex characters from the CSPRNG. The previous
                # sha256(username + time) value was guessable.
                session_id = secrets.token_hex(16)
                now = datetime.now().isoformat()

                with stats_lock:
                    admin_sessions[session_id] = {
                        "username": username,
                        "login_time": now,
                        "last_activity": now,
                        "is_authenticated": True
                    }

                return {"success": True, "session_id": session_id, "username": username}

    return {"success": False, "error": "Invalid credentials"}


@app.get("/api/auth/verify")
async def verify_session_api(session_id: str):
    """Check a session (API); returns the username when it is valid."""
    if AdminAuth.verify_session(session_id):
        return {"success": True, "username": admin_sessions[session_id]["username"]}
    return {"success": False}


@app.post("/api/rooms/create")
async def create_room_api_endpoint(room_data: RoomCreate, session_id: str):
    """Create a new room (API).

    Raises:
        HTTPException: 401 when the session is not valid.
    """
    if not AdminAuth.verify_session(session_id):
        raise HTTPException(status_code=401, detail="Unauthorized")

    created_by = admin_sessions[session_id]["username"] if session_id in admin_sessions else None

    room = RoomManager.create_room(
        name=room_data.name,
        password=room_data.password,
        max_connections=room_data.max_connections,
        created_by=created_by
    )

    # Return serialisable data only.
    room_response = {
        "id": room["id"],
        "name": room["name"],
        "password": room["password"],
        "max_connections": room["max_connections"],
        "created_at": room["created_at"],
        "created_by": room["created_by"],
        "is_active": room["is_active"],
        "clients": room["clients"]
    }

    return {"success": True, "room": room_response}


@app.get("/api/rooms")
async def get_rooms_api(session_id: str):
    """List all rooms with their statistics (API).

    Raises:
        HTTPException: 401 when the session is not valid.
    """
    if not AdminAuth.verify_session(session_id):
        raise HTTPException(status_code=401, detail="Unauthorized")

    with stats_lock:
        room_list = [
            {
                "id": room["id"],
                "name": room["name"],
                "created_at": room["created_at"],
                "created_by": room.get("created_by"),
                "max_connections": room["max_connections"],
                "clients_count": len(room["clients"]),
                "is_active": room["is_active"],
                "stats": room_stats.get(room_id, {}),
                "clients": room["clients"]  # already a list
            }
            for room_id, room in rooms.items()
        ]

    return {"success": True, "rooms": room_list}
@app.delete("/api/rooms/{room_id}")
async def delete_room_api_endpoint(room_id: str, session_id: str):
    """Delete a room (API); raises 401 when the session is not valid."""
    authorised = AdminAuth.verify_session(session_id)
    if not authorised:
        raise HTTPException(status_code=401, detail="Unauthorized")

    outcome = RoomManager.delete_room(room_id)
    return {"success": outcome}


@app.get("/api/rooms/{room_id}/clients")
async def get_room_clients_api(room_id: str, session_id: str):
    """List the clients of a room (API); raises 401 when the session is not valid."""
    authorised = AdminAuth.verify_session(session_id)
    if not authorised:
        raise HTTPException(status_code=401, detail="Unauthorized")

    if room_id not in rooms:
        return {"success": False, "error": "Room not found"}

    with stats_lock:
        # Only members that still have a client record are listed.
        member_rows = [
            {
                "id": member_id,
                "ip_address": clients[member_id]["ip_address"],
                "connected_at": clients[member_id]["connected_at"],
                "is_streaming": clients[member_id]["is_streaming"],
                "video_settings": clients[member_id]["video_settings"]
            }
            for member_id in rooms[room_id]["clients"]
            if member_id in clients
        ]

    return {"success": True, "clients": member_rows}


@app.get("/api/stats")
async def get_server_stats_api(session_id: str):
    """Refresh and return server statistics (API); raises 401 without a valid session."""
    authorised = AdminAuth.verify_session(session_id)
    if not authorised:
        raise HTTPException(status_code=401, detail="Unauthorized")

    cpu_now = psutil.cpu_percent()
    memory_now = psutil.virtual_memory().percent

    with stats_lock:
        server_stats["cpu_usage"] = cpu_now
        server_stats["memory_usage"] = memory_now
        server_stats["total_rooms"] = len(rooms)
        server_stats["total_clients"] = len(clients)

        streaming_flags = [entry.get("is_streaming", False) for entry in clients.values()]
        server_stats["total_streams"] = sum(1 for flag in streaming_flags if flag)

        # Uptime in whole seconds since the recorded start time.
        started_at = datetime.fromisoformat(server_stats["start_time"])
        elapsed = (datetime.now() - started_at).total_seconds()
        server_stats["uptime"] = int(elapsed)

        snapshot = server_stats.copy()

    system_info = {
        "cpu_count": cpu_count(),
        "memory_total": psutil.virtual_memory().total,
        "memory_available": psutil.virtual_memory().available,
    }
    return {
        "success": True,
        "stats": snapshot,
        "system": system_info
    }
# ========== WEBSOCKET ROUTES ==========

@app.websocket("/ws/client/{room_id}/{password}")
async def client_websocket_endpoint(websocket: WebSocket, room_id: str, password: str):
    """WebSocket for clients (video upload).

    Binary messages are queued as frames for the client's VideoProcessor.
    Text messages are parsed as JSON commands and forwarded to the command
    queue. On exit the client is disconnected and its processor stopped.

    NOTE(review): the room password travels in the URL path and may therefore
    appear in access logs; it is no longer printed by this handler.
    """
    from queue import Full

    await websocket.accept()
    client_id = None
    video_processor = None

    try:
        client_ip = websocket.client.host if websocket.client else "unknown"
        print(f"\n[WebSocket Client] ===== NEW CONNECTION =====")
        # The password is intentionally not logged.
        print(f"[WebSocket Client] IP: {client_ip}, Room: {room_id}")

        client_id = ClientManager.add_client(room_id, password, client_ip)
        if not client_id:
            print(f"[WebSocket Client] ❌ ClientManager.add_client returned None")
            await websocket.send_text(json.dumps({"error": "Invalid room or password"}))
            await websocket.close()
            return

        print(f"[WebSocket Client] ✓ Client added: {client_id}")
        client_websockets[client_id] = websocket
        print(f"[WebSocket Client] ✓ WebSocket registered for {client_id}")

        video_processor = VideoProcessor(client_id)
        print(f"[WebSocket Client] ✓ VideoProcessor created for {client_id}")
        video_processor.start()
        print(f"[WebSocket Client] ✓ VideoProcessor started for {client_id}")

        with stats_lock:
            if client_id in clients:
                clients[client_id]["is_streaming"] = True
                print(f"[WebSocket Client] ✓ is_streaming set to True for {client_id}")

        success_msg = {
            "success": True,
            "client_id": client_id,
            "room_id": room_id
        }
        await websocket.send_text(json.dumps(success_msg))
        print(f"[WebSocket Client] ✓ Sent success message to {client_id}")
        print(f"[WebSocket Client] ===== WAITING FOR DATA =====\n")

        frame_count = 0
        command_count = 0

        while True:
            try:
                data = await websocket.receive()

                if data["type"] == "websocket.receive":
                    message = data.get("text") or data.get("bytes")

                    if isinstance(message, bytes):
                        if frame_count == 0:
                            print(f"[WebSocket Client] ✓ FIRST FRAME RECEIVED from {client_id}! Size: {len(message)} bytes")

                        if client_id in video_queues:
                            try:
                                # put_nowait never blocks the event loop; a
                                # full queue means the frame is dropped.
                                video_queues[client_id].put_nowait(message)
                            except Full:
                                print(f"[WebSocket Client] ⚠️ {client_id}: video queue is FULL, dropping frame")
                            else:
                                frame_count += 1
                                if frame_count % 30 == 0:  # log every 30 frames
                                    print(f"[WebSocket Client] {client_id}: received {frame_count} frames")
                        else:
                            print(f"[WebSocket Client] ❌ {client_id}: video_queue not found!")

                    elif isinstance(message, str):
                        try:
                            command = json.loads(message)
                            # Valid JSON that is not an object (e.g. a number)
                            # is ignored instead of tearing down the connection.
                            if isinstance(command, dict) and "type" in command:
                                with stats_lock:
                                    if client_id in clients:
                                        clients[client_id]["commands"].append(command)

                                if client_id in command_queues:
                                    command_queues[client_id].put(command)
                                    command_count += 1
                                    print(f"[WebSocket Client] {client_id}: command received: {command.get('type')}")
                        except json.JSONDecodeError as e:
                            print(f"[WebSocket Client] ⚠️ {client_id}: invalid JSON: {message[:50]}")

                elif data["type"] == "websocket.disconnect":
                    print(f"[WebSocket Client] {client_id}: CLIENT DISCONNECT (frames: {frame_count}, commands: {command_count})")
                    break

            except WebSocketDisconnect:
                print(f"[WebSocket Client] {client_id}: WEBSOCKET DISCONNECT (frames: {frame_count})")
                break
            except Exception as e:
                print(f"[WebSocket Client] ❌ {client_id}: Error in receive loop: {type(e).__name__}: {e}")
                break

    except Exception as e:
        print(f"[WebSocket Client] ❌ Connection error: {type(e).__name__}: {e}")

    finally:
        print(f"[WebSocket Client] ===== CLEANUP =====")

        if client_id:
            with stats_lock:
                if client_id in clients:
                    clients[client_id]["is_streaming"] = False
                    print(f"[WebSocket Client] ✓ is_streaming set to False for {client_id}")
            ClientManager.disconnect_client(client_id)
            print(f"[WebSocket Client] ✓ Disconnected client: {client_id}")

        if video_processor:
            # NOTE(review): stop() joins the worker for up to 5 seconds and
            # blocks the event loop while doing so.
            video_processor.stop()
            # Drop the queue used by administrators.
            processed_video_queues.pop(client_id, None)
            print(f"[WebSocket Client] ✓ VideoProcessor stopped for {client_id}")

        print(f"[WebSocket Client] ===== END OF CONNECTION =====\n")
@app.websocket("/ws/admin/{session_id}")
async def admin_websocket_endpoint(websocket: WebSocket, session_id: str):
    """WebSocket for administrators (viewing and control).

    The session is verified before the socket is accepted; an invalid session
    is closed with code 1008. Accepted text messages are JSON commands whose
    ``type`` is one of ``watch_client``, ``stop_watching``, ``control_client``
    or ``get_stats``. Messages that are not valid JSON are ignored.
    """
    if not AdminAuth.verify_session(session_id):
        await websocket.close(code=1008)
        return

    await websocket.accept()
    admin_websockets[session_id] = websocket

    # Active streaming tasks, keyed by session_id.
    active_streams = {}

    try:
        await websocket.send_text(json.dumps({
            "type": "connected",
            "message": "Admin WebSocket connected",
            "session_id": session_id
        }))

        print(f"[WebSocket] Admin connected: {session_id}")

        while True:
            try:
                data = await websocket.receive()

                if data["type"] == "websocket.receive":
                    message = data.get("text")

                    if message:
                        try:
                            command = json.loads(message)
                            cmd_type = command.get("type")

                            if cmd_type == "watch_client":
                                client_id = command.get("client_id")

                                # Stop the previous stream, if any.
                                if session_id in active_streams:
                                    active_streams[session_id].cancel()

                                # Start the new stream as a background task.
                                task = asyncio.create_task(
                                    _stream_client_to_admin(client_id, session_id)
                                )
                                active_streams[session_id] = task

                                await websocket.send_text(json.dumps({
                                    "type": "watching_started",
                                    "client_id": client_id,
                                    "message": f"Started watching client {client_id}"
                                }))

                            elif cmd_type == "stop_watching":
                                # Stop watching.
                                if session_id in active_streams:
                                    active_streams[session_id].cancel()
                                    del active_streams[session_id]

                                await websocket.send_text(json.dumps({
                                    "type": "watching_stopped",
                                    "message": "Stopped watching"
                                }))

                            elif cmd_type == "control_client":
                                client_id = command.get("client_id")
                                control_cmd = command.get("command")

                                # Forward the command to the client's command
                                # queue; unknown clients get no response.
                                if client_id in clients and client_id in command_queues:
                                    command_queues[client_id].put(control_cmd)
                                    await websocket.send_text(json.dumps({
                                        "type": "control_response",
                                        "success": True,
                                        "command": control_cmd
                                    }))

                            elif cmd_type == "get_stats":
                                with stats_lock:
                                    stats = server_stats.copy()
                                await websocket.send_text(json.dumps({
                                    "type": "stats_update",
                                    "stats": stats
                                }))

                        except json.JSONDecodeError:
                            pass

                elif data["type"] == "websocket.disconnect":
                    print(f"[WebSocket] Admin disconnected: {session_id}")
                    break

            except WebSocketDisconnect:
                print(f"[WebSocket] Admin WebSocket disconnected: {session_id}")
                break
            except Exception as e:
                print(f"[WebSocket] Error in admin WebSocket {session_id}: {e}")
                break

    except Exception as e:
        print(f"[WebSocket] Admin connection error: {e}")

    finally:
        # Stop any active stream and unregister this admin socket.
        if session_id in active_streams:
            active_streams[session_id].cancel()

        if session_id in admin_websockets:
            del admin_websockets[session_id]
command.get("command") if client_id in clients and client_id in command_queues: command_queues[client_id].put(control_cmd) await websocket.send_text(json.dumps({ "type": "control_response", "success": True, "command": control_cmd })) elif cmd_type == "get_stats": with stats_lock: stats = server_stats.copy() await websocket.send_text(json.dumps({ "type": "stats_update", "stats": stats })) except json.JSONDecodeError: pass elif data["type"] == "websocket.disconnect": print(f"[WebSocket] Admin disconnected: {session_id}") break except WebSocketDisconnect: print(f"[WebSocket] Admin WebSocket disconnected: {session_id}") break except Exception as e: print(f"[WebSocket] Error in admin WebSocket {session_id}: {e}") break except Exception as e: print(f"[WebSocket] Admin connection error: {e}") finally: # Останавливаем все активные стримы if session_id in active_streams: active_streams[session_id].cancel() if session_id in admin_websockets: del admin_websockets[session_id] async def _stream_client_to_admin(client_id: str, admin_session_id: str): """Потоковая передача видео от клиента к администратору""" if client_id not in clients or admin_session_id not in admin_websockets: return try: admin_ws = admin_websockets[admin_session_id] await admin_ws.send_text(json.dumps({ "type": "stream_started", "client_id": client_id, "timestamp": datetime.now().isoformat() })) # Получаем информацию о клиенте with stats_lock: if client_id in clients: client = clients[client_id] await admin_ws.send_text(json.dumps({ "type": "stream_info", "message": f"Streaming from client {client_id}", "client_id": client_id, "quality": client["video_settings"]["quality"], "frame_rate": client["video_settings"]["frame_rate"], "resolution": client["video_settings"]["resolution"], "ip": client["ip_address"], "connected_at": client["connected_at"], "status": "streaming" })) print(f"[WebSocket] Started streaming from client {client_id} to admin {admin_session_id}") # Ждем пока появится очередь обработанных 
кадров start_time = time.time() while client_id not in processed_video_queues and time.time() - start_time < 10: await asyncio.sleep(0.1) if client_id not in processed_video_queues: await admin_ws.send_text(json.dumps({ "type": "stream_error", "message": "No video stream available from client" })) return processed_queue = processed_video_queues[client_id] frame_count = 0 last_update_time = time.time() # Основной цикл пересылки видео while (client_id in clients and admin_session_id in admin_websockets and clients[client_id].get("is_streaming", False)): try: # Получаем обработанный кадр if not processed_queue.empty(): frame_data = processed_queue.get_nowait() frame_count += 1 # Отправляем кадр администратору как бинарные данные await admin_ws.send_bytes(frame_data) # Отправляем статистику каждую секунду current_time = time.time() if current_time - last_update_time > 1: await admin_ws.send_text(json.dumps({ "type": "stream_stats", "frames_sent": frame_count, "timestamp": datetime.now().isoformat() })) last_update_time = current_time # Небольшая задержка для CPU await asyncio.sleep(0.01) except asyncio.CancelledError: # Стрим был отменен администратором break except Exception as e: print(f"[WebSocket] Error streaming to admin: {e}") break print(f"[WebSocket] Stopped streaming from client {client_id} to admin {admin_session_id}") except Exception as e: print(f"[WebSocket] Error in stream to admin: {e}") # ========== КЛИЕНТСКИЙ API ========== @app.post("/api/client/connect") async def client_connect_api(connection_data: dict): """Подключение клиента к комнате (API)""" room_id = connection_data.get("room_id") password = connection_data.get("password") client_ip = connection_data.get("ip", "unknown") client_id = ClientManager.add_client(room_id, password, client_ip) if client_id: server_host = get_server_host() ws_protocol = "wss" if SERVER_CONFIG.get("ssl_enabled", False) else "ws" return { "success": True, "client_id": client_id, "room_id": room_id, "ws_url": 
f"{ws_protocol}://{server_host}:{SERVER_CONFIG['port']}/ws/client/{room_id}/{password}" } return {"success": False, "error": "Connection failed"} # ========== СОЗДАНИЕ HTML ШАБЛОНОВ ========== def create_html_templates(): """Создание HTML шаблонов для веб-интерфейса""" # Шаблон login.html login_html = """ Video Streaming Server - Login
{% if error %}
{{ error }}
{% endif %}

Demo Accounts:

""" # Шаблон dashboard.html dashboard_html = """ Dashboard - Video Streaming Server

Dashboard

Total Rooms

{{ stats.total_rooms }}
Active: {{ total_rooms }}

Connected Clients

{{ stats.total_clients }}
Streaming: {{ stats.total_streams }}

CPU Usage

{{ stats.cpu_usage }}%
Cores: {{ stats.system.cpu_count if stats.system else 'N/A' }}

Memory Usage

{{ stats.memory_usage }}%
{% if stats.system %} {{ ((stats.system.memory_total - stats.system.memory_available) / 1024 / 1024 / 1024)|round(1) }} GB used {% else %} 0 GB used {% endif %}
{% if rooms %}
{% for room in rooms %} {% endfor %}
ID Name Clients Max Connections Created By Status Actions
{{ room.id }} {{ room.name }} {{ room.clients_count }} {% if room.active_streams > 0 %} ({{ room.active_streams }} streaming) {% endif %} {{ room.max_connections }} {{ room.created_by }} Active
{% else %}

No rooms created yet. Create your first room!

{% endif %}

System Information

Server Uptime

Loading...
Since {{ stats.start_time }}

Server Address

{{ server_host }}:{{ server_port }}
WebSocket: ws://{{ server_host }}:{{ server_port }}

API Status

Online
All systems operational
""" # Шаблон room.html room_html = """ Room: {{ room.name }} - Video Streaming Server

Room: {{ room.name }}

Room Information

{{ room.id }}
{{ room.password }}
{{ room.max_connections }}
{{ room.created_at }}
{{ room.created_by }}
{{ room_stats.total_clients }} / {{ room.max_connections }}
{{ room_stats.active_streams }}
{{ (room_stats.bytes_transferred / 1024 / 1024)|round(2) }} MB

Client Connection Information

Clients can connect to this room using:

WebSocket URL: {{ 'wss' if ssl_enabled else 'ws' }}://{{ server_host }}:{{ server_port }}/ws/client/{{ room.id }}/{{ room.password }}

Room ID: {{ room.id }}

Password: {{ room.password }}

Connected Clients ({{ clients|length }})

{% if clients %}
{% for client in clients %} {% endfor %}
Client ID IP Address Connected At Status Video Settings Actions
{{ client.id[:8] }}... {{ client.ip_address }} {{ client.connected_at }} {% if client.is_streaming %} Streaming {% else %} Idle {% endif %} {{ client.video_settings.quality }}% Quality
{{ client.video_settings.frame_rate }} FPS
{% else %}

No clients connected to this room yet.

Share the connection information above with clients.

{% endif %}
""" # Шаблон create_room.html create_room_html = """ Create Room - Video Streaming Server

Create New Room

A descriptive name for the room
Clients will need this password to connect
Maximum number of clients that can connect simultaneously
Cancel

Room Information

Generating...
ws://{{ server_host }}:{{ server_port }}/ws/client/...
""" # Шаблон stream.html stream_html = """ Stream: {{ client.id }} - Video Streaming Server
Back to Room

Streaming: {{ client.id[:8] }}...

Connected from {{ client.ip_address }}

Waiting for Video Stream

Video stream will appear here once the client starts streaming.

Use the controls on the right to adjust video settings.

{{ client.id }}
{{ room.name if room else 'Unknown' }}
{{ client.connected_at }}
{{ "Yes" if client.is_streaming else "No" }}

Video Controls

{{ client.video_settings.quality }}%
0
1.0

Stream Information

This client is connected via WebSocket and streaming video data.

Use the controls above to adjust the video stream in real-time.

Changes are applied to the video processor on the server side.

Connecting...
""" # Сохраняем шаблоны в файлы templates_dir = "templates" os.makedirs(templates_dir, exist_ok=True) with open(os.path.join(templates_dir, "login.html"), "w", encoding="utf-8") as f: f.write(login_html) with open(os.path.join(templates_dir, "dashboard.html"), "w", encoding="utf-8") as f: f.write(dashboard_html) with open(os.path.join(templates_dir, "room.html"), "w", encoding="utf-8") as f: f.write(room_html) with open(os.path.join(templates_dir, "create_room.html"), "w", encoding="utf-8") as f: f.write(create_room_html) with open(os.path.join(templates_dir, "stream.html"), "w", encoding="utf-8") as f: f.write(stream_html) print(f"[Templates] Created HTML templates in '{templates_dir}' directory") # ========== ЗАПУСК СЕРВЕРА ========== def main(): """Основная функция для запуска сервера""" # Создаем HTML шаблоны create_html_templates() server_host = get_server_host() ssl_enabled = SERVER_CONFIG.get("ssl_enabled", False) protocol = "https" if ssl_enabled else "http" ws_protocol = "wss" if ssl_enabled else "ws" print("=" * 60) print("🎥 Video Streaming Server with Web Interface") print("=" * 60) print(f"🌐 Web Interface: {protocol}://{server_host}:{SERVER_CONFIG['port']}") print(f"🔌 WebSocket: {ws_protocol}://{server_host}:{SERVER_CONFIG['port']}") print(f"👤 Admin Login: {protocol}://{server_host}:{SERVER_CONFIG['port']}/") print("=" * 60) print("Default Admin Accounts:") print(" • admin / admin123") print(" • administrator / securepass") print(" • supervisor / superpass") print("=" * 60) if ssl_enabled: print("🔒 SSL/TLS enabled") print(f" Certificate: {SERVER_CONFIG.get('ssl_certfile')}") print(f" Key: {SERVER_CONFIG.get('ssl_keyfile')}") print("=" * 60) print("Press Ctrl+C to stop the server") try: uvicorn_kwargs = { "host": SERVER_CONFIG["host"], "port": SERVER_CONFIG["port"], "log_level": "info", "ws_ping_interval": SERVER_CONFIG["websocket_ping_interval"], "ws_ping_timeout": SERVER_CONFIG["websocket_ping_timeout"], } # Добавляем SSL если включено if ssl_enabled: 
ssl_certfile = SERVER_CONFIG.get("ssl_certfile") ssl_keyfile = SERVER_CONFIG.get("ssl_keyfile") if ssl_certfile and ssl_keyfile: import os if os.path.exists(ssl_certfile) and os.path.exists(ssl_keyfile): uvicorn_kwargs["ssl_certfile"] = ssl_certfile uvicorn_kwargs["ssl_keyfile"] = ssl_keyfile else: print(f"⚠️ SSL files not found, running without SSL") uvicorn.run("server:app", **uvicorn_kwargs) except KeyboardInterrupt: print("\nServer stopped by user") except Exception as e: print(f"Error starting server: {e}") if __name__ == "__main__": freeze_support() main()