"""
KazicCAM - Серверная часть с веб-интерфейсом
FastAPI + OpenCV + WebSocket + Multiprocessing
Версия: 2.1.0, 25 пересборка
Для Windows
"""
import os
import sys
import uuid
import json
import time
import asyncio
import hashlib
import secrets
import string
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple, Any
from collections import defaultdict
from multiprocessing import Process, Manager, Queue, cpu_count, freeze_support
from concurrent.futures import ProcessPoolExecutor
from dotenv import load_dotenv
import cv2
import numpy as np
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends, status, Request, Form
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse, RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
import psutil
load_dotenv()
# ========== CONFIGURATION ==========
# Static server settings shared by the whole module.
SERVER_CONFIG = {
    # Bind address: taken from the HOST environment variable (or the legacy lower-case
    # 'host'), so it can be controlled through .env / docker-compose.
    "host": os.getenv("HOST", os.getenv("host", "0.0.0.0")), # externally reachable IP, or 0.0.0.0 to listen on all interfaces
    # Port used in the startup banner and in the URLs handed to templates and clients.
    "port": 8000,
    # NOTE(review): the keys from "debug" through "websocket_ping_timeout" are not read by any
    # code visible in this part of the file — confirm where (or whether) they are consumed.
    "debug": False,
    "max_clients_per_room": 50,
    "max_rooms": 100,
    "video_width": 640,
    "video_height": 480,
    "frame_rate": 30,
    "jpeg_quality": 85,
    "websocket_ping_interval": 30,
    "websocket_ping_timeout": 10,
    # Idle time in seconds after which cleanup_inactive_sessions() drops an admin session.
    "admin_session_timeout": 3600,
}
# ========== МОДЕЛИ ДАННЫХ ==========
class RoomCreate(BaseModel):
    """Request body for creating a room; all three fields are required."""

    # Human-readable room name stored under the room's "name" key.
    name: str
    # Secret that ClientManager.add_client() compares with the value a client supplies.
    password: str
    # Upper bound on simultaneously registered clients, checked in add_client().
    max_connections: int
class RoomUpdate(BaseModel):
    """Partial room update: every field is optional and defaults to ``None``."""

    # New room name, or None when not supplied.
    name: Optional[str] = None
    # New room password, or None when not supplied.
    password: Optional[str] = None
    # New connection limit, or None when not supplied.
    max_connections: Optional[int] = None
# ========== GLOBAL STATE ==========
# All registries start as None and are replaced with fresh containers by lifespan()
# when the application starts; nothing may touch them before that.
rooms = None  # room_id -> room dict (see RoomManager.create_room)
clients = None  # client_id -> client dict (see ClientManager.add_client)
admin_sessions = None  # session_id -> admin session dict
client_websockets = None  # client_id -> WebSocket of the streaming client
admin_websockets = None  # session_id -> WebSocket of the admin UI
video_queues = None  # client_id -> frame queue created by VideoProcessor
command_queues = None  # client_id -> command queue created by VideoProcessor
room_stats = None  # room_id -> per-room counters
server_stats = None  # server-wide counters
stats_lock = None  # threading.Lock created in lifespan(); guards the registries above
cleanup_task = None  # asyncio task running cleanup_inactive_sessions()
templates = None  # Jinja2Templates instance created in lifespan()
# Administrator accounts as [username, password] pairs, compared verbatim by the login code.
# NOTE(review): these credentials are hard-coded in plaintext — consider loading them from
# the environment and storing hashes instead.
ADMINS = [
    ["admin", "admin123"],
    ["administrator", "securepass"],
    ["supervisor", "superpass"]
]
# ========== ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ==========
def safe_json_serializer(obj: Any) -> Any:
    """Map an object to a JSON-friendly value.

    Sets become lists, datetimes become ISO-8601 strings, objects carrying a
    ``__dict__`` are replaced by that dict, and anything else is stringified.
    """
    if isinstance(obj, set):
        return list(obj)
    if isinstance(obj, datetime):
        return obj.isoformat()
    if hasattr(obj, '__dict__'):
        return obj.__dict__
    # Last resort: fall back to the object's string form.
    return str(obj)
def get_server_host():
    """Return the host name or IP address that should be advertised to clients.

    Resolution order:
      1. ``PUBLIC_HOST`` then ``ADVERTISED_HOST`` from the environment — the first
         one holding a usable (non-placeholder) value wins;
      2. the ``host`` entry of ``SERVER_CONFIG`` when it is not a wildcard/loopback;
      3. the local address the OS would use to reach a public IP (UDP probe);
      4. the address the machine's hostname resolves to;
      5. fallback: the configured host, or ``"0.0.0.0"``.

    Returns:
        str: the chosen host string.
    """
    import socket

    # Values that are useless as an address shown to remote clients.
    placeholders = ("", "0.0.0.0", "127.0.0.1", "localhost")

    # 1) Explicitly advertised address (recommended when running in Docker).
    #    Each variable is checked on its own so that a placeholder in PUBLIC_HOST
    #    does not hide a usable ADVERTISED_HOST.
    for env_name in ("PUBLIC_HOST", "ADVERTISED_HOST"):
        advertised = os.getenv(env_name)
        if advertised and advertised not in placeholders:
            return advertised

    # 2) Host configured explicitly (and not a wildcard / loopback).
    cfg_host = SERVER_CONFIG.get("host")
    if cfg_host and cfg_host not in placeholders:
        return cfg_host

    # 3) Ask the routing table: connect() on a datagram socket only selects the
    #    local address, so no payload is sent. The context manager closes the
    #    socket even when connect() fails.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            ip_address = probe.getsockname()[0]
        if ip_address and not ip_address.startswith("127."):
            return ip_address
    except OSError:
        pass

    # 4) Resolve our own hostname (may yield 127.0.1.1 on some systems).
    try:
        ip_address = socket.gethostbyname(socket.gethostname())
        if ip_address and not ip_address.startswith("127."):
            return ip_address
    except (OSError, UnicodeError):
        pass

    # 5) Nothing better found: return whatever is configured.
    return cfg_host or "0.0.0.0"
# ========== КЛАССЫ ДЛЯ УПРАВЛЕНИЯ ==========
class RoomManager:
    """Static helpers that maintain the module-level ``rooms`` / ``room_stats`` registries."""

    # Keys update_room() refuses to overwrite: "id" must keep matching the registry
    # key, "clients" is maintained by ClientManager, and the creation metadata is
    # written once by create_room().
    _PROTECTED_FIELDS = frozenset({"id", "clients", "created_at", "created_by"})

    @staticmethod
    def generate_room_id(length: int = 11) -> str:
        """Return a random identifier of ``length`` ASCII letters and digits."""
        alphabet = string.ascii_letters + string.digits
        return ''.join(secrets.choice(alphabet) for _ in range(length))

    @staticmethod
    def create_room(name: str, password: str, max_connections: int, created_by: Optional[str] = None) -> Dict:
        """Register a new room and its statistics entry.

        Args:
            name: display name of the room.
            password: value clients must present in ClientManager.add_client().
            max_connections: maximum number of clients held in the room.
            created_by: username of the creating admin, if known.

        Returns:
            The room dict that was stored in ``rooms``.
        """
        with stats_lock:
            room_id = RoomManager.generate_room_id()
            # Collisions are extremely unlikely, but never overwrite a live room.
            while room_id in rooms:
                room_id = RoomManager.generate_room_id()
            room = {
                "id": room_id,
                "name": name,
                "password": password,
                "max_connections": max_connections,
                "created_at": datetime.now().isoformat(),
                "created_by": created_by,
                "clients": [],  # list (not set) so the room stays JSON-serializable
                "is_active": True
            }
            rooms[room_id] = room
            room_stats[room_id] = {
                "total_clients": 0,
                "active_streams": 0,
                "bytes_transferred": 0,
                "last_activity": datetime.now().isoformat()
            }
            server_stats["total_rooms"] = len(rooms)
        print(f"[RoomManager] Created room: {room_id} - {name}")
        return room

    @staticmethod
    def delete_room(room_id: str) -> bool:
        """Disconnect every client of the room and remove the room.

        Returns:
            True when the room existed and was removed, False otherwise.
        """
        if room_id not in rooms:
            return False
        # Iterate over a copy: disconnect_client() mutates the room's client list.
        # This happens before taking stats_lock because disconnect_client()
        # acquires the same non-reentrant lock itself.
        for client_id in list(rooms[room_id]["clients"]):
            ClientManager.disconnect_client(client_id)
        with stats_lock:
            rooms.pop(room_id, None)
            room_stats.pop(room_id, None)
            server_stats["total_rooms"] = len(rooms)
        print(f"[RoomManager] Deleted room: {room_id}")
        return True

    @staticmethod
    def update_room(room_id: str, updates: dict) -> Optional[Dict]:
        """Apply ``updates`` to an existing room.

        Only keys that already exist in the room are touched; ``None`` values and
        the keys listed in ``_PROTECTED_FIELDS`` are ignored.

        Returns:
            The updated room dict, or None when the room does not exist.
        """
        with stats_lock:
            room = rooms.get(room_id)
            if room is None:
                return None
            for key, value in updates.items():
                if value is None or key not in room:
                    continue
                if key in RoomManager._PROTECTED_FIELDS:
                    continue
                room[key] = value
            return room
class ClientManager:
    """Static helpers that register and remove streaming clients in the module registries."""

    # Strong references to in-flight websocket close() tasks. The event loop only
    # keeps weak references to tasks, so an unreferenced task may be garbage
    # collected before it finishes.
    _close_tasks = set()

    @staticmethod
    def generate_client_id() -> str:
        """Return a new random UUID4 string used as the client identifier."""
        return str(uuid.uuid4())

    @staticmethod
    def add_client(room_id: str, password: str, ip_address: str) -> Optional[str]:
        """Register a client in a room.

        Args:
            room_id: identifier of the target room.
            password: must equal the room's stored password.
            ip_address: address recorded for the client.

        Returns:
            The new client id, or None when the room is unknown, the password
            does not match, or the room already holds ``max_connections`` clients.
        """
        if room_id not in rooms:
            print(f"[ClientManager] Room not found: {room_id}")
            return None
        room = rooms[room_id]
        if room["password"] != password:
            print(f"[ClientManager] Invalid password for room: {room_id}")
            return None
        if len(room["clients"]) >= room["max_connections"]:
            print(f"[ClientManager] Room {room_id} is full")
            return None

        client_id = ClientManager.generate_client_id()
        now = datetime.now().isoformat()
        client = {
            "id": client_id,
            "room_id": room_id,
            "ip_address": ip_address,
            "connected_at": now,
            "last_activity": now,
            "is_streaming": False,
            "video_settings": {
                "quality": 85,
                "frame_rate": 30,
                "resolution": "640x480"
            },
            "commands": []
        }
        with stats_lock:
            clients[client_id] = client
            # The room keeps a list (not a set), so guard against duplicates.
            if client_id not in room["clients"]:
                room["clients"].append(client_id)
            if room_id in room_stats:
                room_stats[room_id]["total_clients"] = len(room["clients"])
                room_stats[room_id]["last_activity"] = datetime.now().isoformat()
            server_stats["total_clients"] = len(clients)
        print(f"[ClientManager] Added client: {client_id} to room: {room_id}")
        return client_id

    @staticmethod
    def disconnect_client(client_id: str) -> bool:
        """Remove a client from every registry and close its websocket.

        The websocket is closed asynchronously: when an event loop is running a
        close task is scheduled on it, otherwise the socket is only dropped from
        the registry.

        Returns:
            True when the client existed and was removed, False otherwise.
        """
        if client_id not in clients:
            return False
        room_id = clients[client_id]["room_id"]
        with stats_lock:
            room = rooms.get(room_id)
            if room is not None and client_id in room["clients"]:
                room["clients"].remove(client_id)
                if room_id in room_stats:
                    room_stats[room_id]["total_clients"] = len(room["clients"])
            del clients[client_id]
            server_stats["total_clients"] = len(clients)

        websocket = client_websockets.pop(client_id, None)
        if websocket is not None:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # Called outside the event loop: nothing can await close() here.
                loop = None
            if loop is not None:
                close_task = loop.create_task(websocket.close())
                ClientManager._close_tasks.add(close_task)
                close_task.add_done_callback(ClientManager._close_tasks.discard)

        # Drop the per-client queues created by VideoProcessor, if any.
        video_queues.pop(client_id, None)
        command_queues.pop(client_id, None)
        print(f"[ClientManager] Disconnected client: {client_id}")
        return True
class VideoProcessor:
    """Per-client worker that decodes incoming frames in a separate process.

    The parent creates one frame queue and one command queue per client,
    publishes them in the module-level ``video_queues`` / ``command_queues``
    registries (so the websocket handlers can feed them) and hands the very same
    queue objects to the child process.
    """

    def __init__(self, client_id: str):
        """Create the queues for ``client_id`` and register them globally."""
        self.client_id = client_id
        self.video_queue = Queue(maxsize=10)
        self.command_queue = Queue()
        self.process = None
        self.is_running = False
        video_queues[client_id] = self.video_queue
        command_queues[client_id] = self.command_queue

    def start(self):
        """Start the worker process as a daemon."""
        self.is_running = True
        # The queues are passed explicitly: a spawned child (the only start
        # method on Windows) re-imports this module, where the registries are
        # still None, so it cannot look the queues up by client id.
        self.process = Process(
            target=self._process_video_stream,
            args=(self.client_id, self.video_queue, self.command_queue)
        )
        self.process.daemon = True
        self.process.start()
        print(f"[VideoProcessor] Started for client: {self.client_id}")

    def stop(self):
        """Terminate the worker, escalating to kill() if it survives 5 seconds."""
        self.is_running = False
        if self.process:
            self.process.terminate()
            self.process.join(timeout=5)
            if self.process.is_alive():
                self.process.kill()
        print(f"[VideoProcessor] Stopped for client: {self.client_id}")

    @staticmethod
    def _lookup_registered_queues(client_id: str):
        """Legacy fallback: poll the module registries for up to ~10 s for the queues."""
        for _ in range(100):
            try:
                if client_id in video_queues and client_id in command_queues:
                    return video_queues[client_id], command_queues[client_id]
            except Exception:
                # Registries may be unset (None) in a freshly imported module.
                pass
            time.sleep(0.1)
        return None, None

    @staticmethod
    def _process_video_stream(client_id: str, video_queue=None, command_queue=None):
        """Worker loop: decode each queued frame and apply pending commands.

        Args:
            client_id: client the worker belongs to (used for logging/lookup).
            video_queue: queue of encoded frame bytes; when omitted the queue is
                looked up in the module registry (legacy call style).
            command_queue: queue of command dicts; same fallback as above.

        The loop never returns on its own; the parent ends it with stop().
        NOTE(review): the transformed frame is not forwarded anywhere in the
        code visible here — confirm whether an output path is still missing.
        """
        from queue import Empty

        print(f"[VideoProcessor] Process started for client {client_id}")
        if video_queue is None or command_queue is None:
            video_queue, command_queue = VideoProcessor._lookup_registered_queues(client_id)
        if video_queue is None or command_queue is None:
            print(f"[VideoProcessor] Queues not found for client {client_id}")
            return

        frame_count = 0
        last_log_time = time.time()
        while True:
            # Block on the queue instead of polling empty() in a tight loop.
            try:
                frame_data = video_queue.get(timeout=1)
            except Empty:
                continue
            except Exception:
                time.sleep(0.1)
                continue

            try:
                frame_count += 1
                encoded = np.frombuffer(frame_data, np.uint8)
                frame = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
                if frame is not None:
                    # Drain every command that is currently pending.
                    while True:
                        try:
                            command = command_queue.get_nowait()
                        except Empty:
                            break
                        frame = VideoProcessor._apply_command(frame, command)
                # Report progress at most once every 5 seconds.
                current_time = time.time()
                if current_time - last_log_time > 5:
                    print(f"[VideoProcessor] Client {client_id}: Processed {frame_count} frames")
                    last_log_time = current_time
            except Exception:
                # Best effort: a bad frame must not stop the worker.
                time.sleep(0.1)

    @staticmethod
    def _apply_command(frame: np.ndarray, command: Dict) -> np.ndarray:
        """Return ``frame`` transformed according to ``command``.

        Supported ``command["type"]`` values: ``rotate`` (``angle`` 90/180/270),
        ``flip`` (``direction`` 0, 1 or -1, passed to ``cv2.flip``),
        ``grayscale``, ``brightness`` (``value`` used as beta) and ``contrast``
        (``value`` used as alpha). ``adjust_quality`` and unknown types leave the
        frame untouched, as does a non-dict command or any processing error.
        """
        try:
            if not isinstance(command, dict):
                return frame
            cmd_type = command.get("type")
            if cmd_type == "adjust_quality":
                pass
            elif cmd_type == "rotate":
                angle = command.get("angle", 0)
                if angle == 90:
                    frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
                elif angle == 180:
                    frame = cv2.rotate(frame, cv2.ROTATE_180)
                elif angle == 270:
                    frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
            elif cmd_type == "flip":
                flip_code = command.get("direction", 0)
                if flip_code in [0, 1, -1]:
                    frame = cv2.flip(frame, flip_code)
            elif cmd_type == "grayscale":
                # Convert to gray and back so the frame keeps three channels.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
            elif cmd_type == "brightness":
                value = command.get("value", 0)
                frame = cv2.convertScaleAbs(frame, alpha=1, beta=value)
            elif cmd_type == "contrast":
                value = command.get("value", 1.0)
                frame = cv2.convertScaleAbs(frame, alpha=value, beta=0)
            return frame
        except Exception as e:
            print(f"[VideoProcessor] Error applying command: {e}")
            return frame
class AdminAuth:
    """HTTP Basic authentication helper and session check for administrators."""

    security = HTTPBasic()

    @staticmethod
    def authenticate_admin(credentials: HTTPBasicCredentials = Depends(security)):
        """FastAPI dependency: validate Basic credentials and open an admin session.

        Returns:
            dict with the new ``session_id`` and the authenticated ``username``.

        Raises:
            HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when no
                entry of ``ADMINS`` matches.
        """
        supplied_user = credentials.username.encode("utf-8")
        supplied_pass = credentials.password.encode("utf-8")
        for admin in ADMINS:
            # Constant-time comparison on bytes (compare_digest rejects
            # non-ASCII str) so response time does not leak how much matched.
            user_ok = secrets.compare_digest(supplied_user, admin[0].encode("utf-8"))
            pass_ok = secrets.compare_digest(supplied_pass, admin[1].encode("utf-8"))
            if user_ok and pass_ok:
                # 32 hex characters from the OS CSPRNG; a hash of username and
                # clock time would be guessable.
                session_id = secrets.token_hex(16)
                now = datetime.now().isoformat()
                with stats_lock:
                    admin_sessions[session_id] = {
                        "username": credentials.username,
                        "login_time": now,
                        "last_activity": now,
                        "is_authenticated": True
                    }
                print(f"[AdminAuth] User authenticated: {credentials.username}")
                return {"session_id": session_id, "username": credentials.username}
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )

    @staticmethod
    def verify_session(session_id: str) -> bool:
        """Return True when ``session_id`` is a known session and refresh its activity time."""
        with stats_lock:
            session = admin_sessions.get(session_id)
            if session is None:
                return False
            session["last_activity"] = datetime.now().isoformat()
            return True
# ========== ФОНОВЫЕ ЗАДАЧИ ==========
async def cleanup_inactive_sessions():
    """Background loop that evicts idle admin sessions and stale empty rooms.

    Once a minute it:
      * removes admin sessions idle for longer than
        ``SERVER_CONFIG["admin_session_timeout"]`` seconds and closes their
        admin websockets;
      * deletes rooms that have no clients and whose recorded ``last_activity``
        is more than 24 hours old.

    Runs until cancelled; any other error is logged and the loop continues.
    """
    empty_room_ttl = 86400  # seconds a client-less room may stay idle

    while True:
        try:
            current_time = datetime.now()
            session_timeout = SERVER_CONFIG["admin_session_timeout"]

            # total_seconds() is required here: timedelta.seconds is only the
            # sub-day remainder (0..86399) and silently wraps for longer spans.
            with stats_lock:
                inactive_sessions = [
                    session_id
                    for session_id, session_data in admin_sessions.items()
                    if (current_time - datetime.fromisoformat(session_data["last_activity"])).total_seconds()
                    > session_timeout
                ]
                for session_id in inactive_sessions:
                    admin_sessions.pop(session_id, None)

            for session_id in inactive_sessions:
                # Unregister first so a concurrent removal by the websocket
                # handler cannot raise KeyError here.
                admin_socket = admin_websockets.pop(session_id, None)
                if admin_socket is None:
                    continue
                try:
                    await admin_socket.close()
                except Exception:
                    # The socket may already be closed; nothing else to do.
                    pass

            inactive_rooms = []
            with stats_lock:
                for room_id, room in rooms.items():
                    if room["clients"]:
                        continue
                    stats = room_stats.get(room_id, {})
                    last_activity_str = stats.get("last_activity", current_time.isoformat())
                    idle_for = current_time - datetime.fromisoformat(last_activity_str)
                    if idle_for.total_seconds() > empty_room_ttl:
                        inactive_rooms.append(room_id)
            # delete_room() takes stats_lock itself, so call it outside the lock.
            for room_id in inactive_rooms:
                RoomManager.delete_room(room_id)

            await asyncio.sleep(60)
        except Exception as e:
            print(f"[Cleanup] Error: {e}")
            await asyncio.sleep(60)
# ========== LIFESPAN MANAGER ==========
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: build the shared state on startup, tear it down on shutdown."""
    global rooms, clients, admin_sessions, client_websockets, admin_websockets
    global video_queues, command_queues, room_stats, server_stats, stats_lock, cleanup_task, templates
    import threading

    # --- startup: fresh lock and empty registries ---
    stats_lock = threading.Lock()
    rooms, clients, admin_sessions = {}, {}, {}
    client_websockets, admin_websockets = {}, {}
    video_queues, command_queues = {}, {}
    room_stats = {}
    server_stats = dict(
        total_rooms=0,
        total_clients=0,
        total_streams=0,
        start_time=datetime.now().isoformat(),
        cpu_usage=0,
        memory_usage=0,
        uptime=0,
    )

    # The template directory must exist before Jinja2Templates points at it.
    os.makedirs("templates", exist_ok=True)
    templates = Jinja2Templates(directory="templates")

    separator = "=" * 60
    bind_host = SERVER_CONFIG['host']
    bind_port = SERVER_CONFIG['port']
    print(separator)
    print("🎥 Video Streaming Server with Web Interface")
    print(f"📡 Server running on: {bind_host}:{bind_port}")
    print(f"🌐 Web Interface: http://{bind_host}:{bind_port}/")
    print(f"👥 Admin accounts: {len(ADMINS)}")
    print(separator)

    cleanup_task = asyncio.create_task(cleanup_inactive_sessions())

    yield

    # --- shutdown: stop the janitor, then drop every remaining client ---
    print("Shutting down server...")
    if cleanup_task:
        cleanup_task.cancel()
        try:
            await cleanup_task
        except asyncio.CancelledError:
            pass
    remaining_clients = list(clients.keys())
    for client_id in remaining_clients:
        ClientManager.disconnect_client(client_id)
    print("Server shutdown complete.")
# ========== FASTAPI APPLICATION ==========
# The lifespan handler creates all shared state before the first request is served.
app = FastAPI(
    title="Video Streaming Server",
    version="2.1.0",
    lifespan=lifespan
)
# CORS middleware
# NOTE(review): every origin, method and header is allowed while credentials are
# enabled; since admin sessions travel in a cookie, confirm this is intentional
# and consider restricting allow_origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Static files: the directory is created at import time so the mount cannot fail
# on a fresh checkout.
os.makedirs("static", exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")
# ========== ВЕБ-ИНТЕРФЕЙС ==========
@app.get("/", response_class=HTMLResponse)
async def login_page(request: Request):
    """Render the login form together with the advertised server address."""
    context = {
        "request": request,
        "server_host": get_server_host(),
        "server_port": SERVER_CONFIG["port"],
    }
    return templates.TemplateResponse("login.html", context)
@app.post("/login")
async def login_form(request: Request, username: str = Form(...), password: str = Form(...)):
    """Handle the login form.

    On matching credentials a new admin session is stored, its id is placed in
    the ``session_id`` cookie and the browser is redirected (303) to
    ``/dashboard``. Otherwise the login page is rendered again with an error.
    """
    supplied_user = username.encode("utf-8")
    supplied_pass = password.encode("utf-8")
    authenticated = False
    for admin in ADMINS:
        # Constant-time comparison on bytes (compare_digest rejects non-ASCII str).
        user_ok = secrets.compare_digest(supplied_user, admin[0].encode("utf-8"))
        pass_ok = secrets.compare_digest(supplied_pass, admin[1].encode("utf-8"))
        if user_ok and pass_ok:
            authenticated = True
            break

    if authenticated:
        # 32 hex characters from the OS CSPRNG; a hash of username and clock
        # time would be guessable.
        session_id = secrets.token_hex(16)
        now = datetime.now().isoformat()
        with stats_lock:
            admin_sessions[session_id] = {
                "username": username,
                "login_time": now,
                "last_activity": now,
                "is_authenticated": True
            }
        response = RedirectResponse(url="/dashboard", status_code=303)
        # NOTE(review): the cookie is set without httponly/samesite flags —
        # tighten once it is confirmed that no page script reads it.
        response.set_cookie(key="session_id", value=session_id)
        return response

    server_host = get_server_host()
    return templates.TemplateResponse("login.html", {
        "request": request,
        "error": "Invalid username or password",
        "server_host": server_host,
        "server_port": SERVER_CONFIG["port"]
    })
@app.get("/dashboard", response_class=HTMLResponse)
async def dashboard_page(request: Request):
    """Render the admin dashboard; unauthenticated visitors are redirected to the login page."""
    session_id = request.cookies.get("session_id")
    if not (session_id and AdminAuth.verify_session(session_id)):
        return RedirectResponse(url="/", status_code=303)

    with stats_lock:
        stats = server_stats.copy()

    # Flatten each room into plain values the template can render directly.
    room_list = [
        {
            "id": room["id"],
            "name": room["name"],
            "clients_count": len(room["clients"]),
            "max_connections": room["max_connections"],
            "created_at": room["created_at"],
            "created_by": room.get("created_by", "Unknown"),
            "active_streams": room_stats.get(room_id, {}).get("active_streams", 0),
            "clients": room["clients"],
        }
        for room_id, room in rooms.items()
    ]

    context = {
        "request": request,
        "username": admin_sessions[session_id]["username"],
        "stats": stats,
        "rooms": room_list,
        "total_rooms": len(rooms),
        "total_clients": len(clients),
        "server_host": get_server_host(),
        "server_port": SERVER_CONFIG["port"],
    }
    return templates.TemplateResponse("dashboard.html", context)
@app.get("/room/{room_id}", response_class=HTMLResponse)
async def room_page(request: Request, room_id: str):
    """Render the management page of one room.

    Redirects to the login page without a valid session and to the dashboard
    when the room does not exist.
    """
    session_id = request.cookies.get("session_id")
    if not (session_id and AdminAuth.verify_session(session_id)):
        return RedirectResponse(url="/", status_code=303)
    if room_id not in rooms:
        return RedirectResponse(url="/dashboard", status_code=303)

    room = rooms[room_id]
    room_stats_data = room_stats.get(room_id, {})

    # Only clients that are still registered are listed.
    clients_list = []
    for client_id in room["clients"]:
        if client_id not in clients:
            continue
        client = clients[client_id]
        clients_list.append({
            "id": client_id,
            "ip_address": client["ip_address"],
            "connected_at": client["connected_at"],
            "is_streaming": client["is_streaming"],
            "video_settings": client["video_settings"],
        })

    context = {
        "request": request,
        "room": room,
        "room_stats": room_stats_data,
        "clients": clients_list,
        "username": admin_sessions[session_id]["username"],
        "server_host": get_server_host(),
        "server_port": SERVER_CONFIG["port"],
    }
    return templates.TemplateResponse("room.html", context)
@app.get("/stream/{client_id}", response_class=HTMLResponse)
async def stream_page(request: Request, client_id: str):
    """Render the viewer page for one client's stream.

    Redirects to the login page without a valid session and to the dashboard
    when the client is unknown.
    """
    session_id = request.cookies.get("session_id")
    if not (session_id and AdminAuth.verify_session(session_id)):
        return RedirectResponse(url="/", status_code=303)
    if client_id not in clients:
        return RedirectResponse(url="/dashboard", status_code=303)

    client = clients[client_id]
    # The room may already be gone; the template then receives None.
    owning_room_id = client["room_id"]
    if owning_room_id in rooms:
        room = rooms[owning_room_id]
    else:
        room = None

    context = {
        "request": request,
        "client": client,
        "room": room,
        "session_id": session_id,
        "username": admin_sessions[session_id]["username"],
        "server_host": get_server_host(),
        "server_port": SERVER_CONFIG["port"],
    }
    return templates.TemplateResponse("stream.html", context)
@app.get("/create-room", response_class=HTMLResponse)
async def create_room_page(request: Request):
    """Render the room-creation form for an authenticated admin."""
    session_id = request.cookies.get("session_id")
    if not (session_id and AdminAuth.verify_session(session_id)):
        return RedirectResponse(url="/", status_code=303)

    context = {
        "request": request,
        "username": admin_sessions[session_id]["username"],
        "server_host": get_server_host(),
        "server_port": SERVER_CONFIG["port"],
    }
    return templates.TemplateResponse("create_room.html", context)
@app.post("/api/create-room")
async def create_room_api(request: Request, name: str = Form(...), password: str = Form(...), max_connections: int = Form(...)):
    """Create a room from the web form (cookie-authenticated).

    Always answers with a JSON body: ``{"success": True, "room": ...}`` on
    success, ``{"success": False, "error": ...}`` otherwise.
    """
    session_id = request.cookies.get("session_id")
    if not (session_id and AdminAuth.verify_session(session_id)):
        return JSONResponse({"success": False, "error": "Unauthorized"})

    if session_id in admin_sessions:
        created_by = admin_sessions[session_id]["username"]
    else:
        created_by = "Unknown"

    # Fields of the room dict that are echoed back to the caller, in order.
    exposed_fields = (
        "id", "name", "password", "max_connections",
        "created_at", "created_by", "is_active", "clients",
    )
    try:
        room = RoomManager.create_room(name, password, max_connections, created_by)
        room_serializable = {field: room[field] for field in exposed_fields}
        return JSONResponse({"success": True, "room": room_serializable})
    except Exception as e:
        print(f"[API] Error creating room: {e}")
        return JSONResponse({"success": False, "error": str(e)})
@app.post("/api/delete-room/{room_id}")
async def delete_room_api(request: Request, room_id: str):
    """Delete a room on behalf of a cookie-authenticated admin and report the outcome as JSON."""
    session_id = request.cookies.get("session_id")
    if not (session_id and AdminAuth.verify_session(session_id)):
        return JSONResponse({"success": False, "error": "Unauthorized"})
    removed = RoomManager.delete_room(room_id)
    return JSONResponse({"success": removed})
@app.post("/api/disconnect-client/{client_id}")
async def disconnect_client_api(request: Request, client_id: str):
    """Disconnect a client on behalf of a cookie-authenticated admin and report the outcome as JSON."""
    session_id = request.cookies.get("session_id")
    if not (session_id and AdminAuth.verify_session(session_id)):
        return JSONResponse({"success": False, "error": "Unauthorized"})
    dropped = ClientManager.disconnect_client(client_id)
    return JSONResponse({"success": dropped})
@app.get("/logout")
async def logout(request: Request):
    """Drop the caller's admin session (if any), clear the cookie and redirect to the login page."""
    session_id = request.cookies.get("session_id")
    has_live_session = bool(session_id) and session_id in admin_sessions
    if has_live_session:
        with stats_lock:
            del admin_sessions[session_id]

    response = RedirectResponse(url="/", status_code=303)
    response.delete_cookie(key="session_id")
    return response
# ========== API РОУТЫ ==========
@app.post("/api/auth/login")
async def login_api(auth_data: dict):
    """Log an administrator in through the JSON API.

    Args:
        auth_data: JSON object expected to carry ``username`` and ``password``.

    Returns:
        ``{"success": True, "session_id": ..., "username": ...}`` on matching
        credentials, otherwise ``{"success": False, "error": "Invalid credentials"}``.
    """
    username = auth_data.get("username")
    password = auth_data.get("password")

    authenticated = False
    # The body is arbitrary JSON: missing or non-string values can never match.
    if isinstance(username, str) and isinstance(password, str):
        supplied_user = username.encode("utf-8")
        supplied_pass = password.encode("utf-8")
        for admin in ADMINS:
            # Constant-time comparison on bytes (compare_digest rejects non-ASCII str).
            user_ok = secrets.compare_digest(supplied_user, admin[0].encode("utf-8"))
            pass_ok = secrets.compare_digest(supplied_pass, admin[1].encode("utf-8"))
            if user_ok and pass_ok:
                authenticated = True
                break

    if not authenticated:
        return {"success": False, "error": "Invalid credentials"}

    # 32 hex characters from the OS CSPRNG; a hash of username and clock time
    # would be guessable.
    session_id = secrets.token_hex(16)
    now = datetime.now().isoformat()
    with stats_lock:
        admin_sessions[session_id] = {
            "username": username,
            "login_time": now,
            "last_activity": now,
            "is_authenticated": True
        }
    return {"success": True, "session_id": session_id, "username": username}
@app.get("/api/auth/verify")
async def verify_session_api(session_id: str):
    """Report whether ``session_id`` is a live admin session and, if so, who owns it."""
    if not AdminAuth.verify_session(session_id):
        return {"success": False}
    owner = admin_sessions[session_id]["username"]
    return {"success": True, "username": owner}
@app.post("/api/rooms/create")
async def create_room_api_endpoint(room_data: RoomCreate, session_id: str):
    """Create a room through the JSON API.

    Raises:
        HTTPException: 401 when ``session_id`` is not a live admin session.
    """
    if not AdminAuth.verify_session(session_id):
        raise HTTPException(status_code=401, detail="Unauthorized")

    if session_id in admin_sessions:
        created_by = admin_sessions[session_id]["username"]
    else:
        created_by = None

    room = RoomManager.create_room(
        name=room_data.name,
        password=room_data.password,
        max_connections=room_data.max_connections,
        created_by=created_by,
    )

    # Echo back only plain, serializable fields of the stored room, in order.
    exposed_fields = (
        "id", "name", "password", "max_connections",
        "created_at", "created_by", "is_active", "clients",
    )
    room_response = {field: room[field] for field in exposed_fields}
    return {"success": True, "room": room_response}
@app.get("/api/rooms")
async def get_rooms_api(session_id: str):
    """List every room with its statistics and client ids.

    Raises:
        HTTPException: 401 when ``session_id`` is not a live admin session.
    """
    if not AdminAuth.verify_session(session_id):
        raise HTTPException(status_code=401, detail="Unauthorized")

    with stats_lock:
        room_list = [
            {
                "id": room["id"],
                "name": room["name"],
                "created_at": room["created_at"],
                "created_by": room.get("created_by"),
                "max_connections": room["max_connections"],
                "clients_count": len(room["clients"]),
                "is_active": room["is_active"],
                "stats": room_stats.get(room_id, {}),
                "clients": room["clients"],
            }
            for room_id, room in rooms.items()
        ]
    return {"success": True, "rooms": room_list}
@app.delete("/api/rooms/{room_id}")
async def delete_room_api_endpoint(room_id: str, session_id: str):
    """Delete a room through the JSON API.

    Raises:
        HTTPException: 401 when ``session_id`` is not a live admin session.
    """
    if not AdminAuth.verify_session(session_id):
        raise HTTPException(status_code=401, detail="Unauthorized")
    removed = RoomManager.delete_room(room_id)
    return {"success": removed}
@app.get("/api/rooms/{room_id}/clients")
async def get_room_clients_api(room_id: str, session_id: str):
    """List the registered clients of one room.

    Raises:
        HTTPException: 401 when ``session_id`` is not a live admin session.
    """
    if not AdminAuth.verify_session(session_id):
        raise HTTPException(status_code=401, detail="Unauthorized")
    if room_id not in rooms:
        return {"success": False, "error": "Room not found"}

    with stats_lock:
        # Ids that are no longer present in the client registry are skipped.
        clients_list = [
            {
                "id": client_id,
                "ip_address": clients[client_id]["ip_address"],
                "connected_at": clients[client_id]["connected_at"],
                "is_streaming": clients[client_id]["is_streaming"],
                "video_settings": clients[client_id]["video_settings"],
            }
            for client_id in rooms[room_id]["clients"]
            if client_id in clients
        ]
    return {"success": True, "clients": clients_list}
@app.get("/api/stats")
async def get_server_stats_api(session_id: str):
    """Refresh the server-wide counters and return them with basic system information.

    Raises:
        HTTPException: 401 when ``session_id`` is not a live admin session.
    """
    if not AdminAuth.verify_session(session_id):
        raise HTTPException(status_code=401, detail="Unauthorized")

    cpu_usage = psutil.cpu_percent()
    memory_usage = psutil.virtual_memory().percent

    with stats_lock:
        server_stats["cpu_usage"] = cpu_usage
        server_stats["memory_usage"] = memory_usage
        server_stats["total_rooms"] = len(rooms)
        server_stats["total_clients"] = len(clients)
        streaming_flags = [bool(client.get("is_streaming", False)) for client in clients.values()]
        server_stats["total_streams"] = sum(streaming_flags)
        # Uptime is derived from the ISO timestamp recorded at startup.
        started_at = datetime.fromisoformat(server_stats["start_time"])
        elapsed = datetime.now() - started_at
        server_stats["uptime"] = int(elapsed.total_seconds())
        stats_copy = dict(server_stats)

    system_info = {
        "cpu_count": cpu_count(),
        "memory_total": psutil.virtual_memory().total,
        "memory_available": psutil.virtual_memory().available,
    }
    return {"success": True, "stats": stats_copy, "system": system_info}
# ========== WEBSOCKET РОУТЫ ==========
@app.websocket("/ws/client/{room_id}/{password}")
async def client_websocket_endpoint(websocket: WebSocket, room_id: str, password: str):
    """WebSocket used by streaming clients.

    After the room/password check the client is registered and a VideoProcessor
    is started for it. Binary messages are treated as encoded frames and pushed
    to the client's frame queue (dropped when the queue is full); text messages
    holding a JSON object with a ``type`` key are recorded as commands and
    pushed to the command queue. On exit the client is unregistered and the
    processor is stopped.
    """
    await websocket.accept()
    client_id = None
    video_processor = None
    try:
        client_ip = websocket.client.host if websocket.client else "unknown"
        client_id = ClientManager.add_client(room_id, password, client_ip)
        if not client_id:
            await websocket.send_text(json.dumps({"error": "Invalid room or password"}))
            await websocket.close()
            return

        client_websockets[client_id] = websocket
        video_processor = VideoProcessor(client_id)
        video_processor.start()
        with stats_lock:
            if client_id in clients:
                clients[client_id]["is_streaming"] = True
        await websocket.send_text(json.dumps({
            "success": True,
            "client_id": client_id,
            "room_id": room_id
        }))
        print(f"[WebSocket] Client connected: {client_id} to room {room_id}")

        while True:
            try:
                data = await websocket.receive()
                if data["type"] == "websocket.disconnect":
                    print(f"[WebSocket] Client disconnected: {client_id}")
                    break
                if data["type"] != "websocket.receive":
                    continue

                message = data.get("text") or data.get("bytes")
                if isinstance(message, bytes):
                    # Frame: drop it rather than block the event loop when the
                    # worker is behind.
                    frame_queue = video_queues.get(client_id)
                    if frame_queue is not None and not frame_queue.full():
                        frame_queue.put(message)
                elif isinstance(message, str):
                    try:
                        command = json.loads(message)
                    except (ValueError, RecursionError):
                        # Malformed JSON from the client is ignored.
                        continue
                    # Only JSON objects are commands; a bare string or list
                    # that happens to contain "type" is not.
                    if isinstance(command, dict) and "type" in command:
                        with stats_lock:
                            if client_id in clients:
                                clients[client_id]["commands"].append(command)
                        pending_commands = command_queues.get(client_id)
                        if pending_commands is not None:
                            pending_commands.put(command)
            except WebSocketDisconnect:
                print(f"[WebSocket] Client WebSocket disconnected: {client_id}")
                break
            except Exception as e:
                print(f"[WebSocket] Error in client WebSocket {client_id}: {e}")
                break
    except Exception as e:
        print(f"[WebSocket] Client connection error: {e}")
    finally:
        if client_id:
            with stats_lock:
                if client_id in clients:
                    clients[client_id]["is_streaming"] = False
            ClientManager.disconnect_client(client_id)
        if video_processor:
            video_processor.stop()
@app.websocket("/ws/admin/{session_id}")
async def admin_websocket_endpoint(websocket: WebSocket, session_id: str):
    """WebSocket used by the admin UI for monitoring and control.

    The connection is refused (close code 1008) unless ``session_id`` is a live
    admin session. Accepted JSON object messages:
      * ``{"type": "watch_client", "client_id": ...}`` — send stream info;
      * ``{"type": "control_client", "client_id": ..., "command": ...}`` — queue
        a command for the client and acknowledge with ``control_response``;
      * ``{"type": "get_stats"}`` — reply with a ``stats_update`` snapshot.
    Anything that is not a JSON object is ignored.
    """
    if not AdminAuth.verify_session(session_id):
        await websocket.close(code=1008)
        return
    await websocket.accept()
    admin_websockets[session_id] = websocket
    try:
        await websocket.send_text(json.dumps({
            "type": "connected",
            "message": "Admin WebSocket connected"
        }))
        print(f"[WebSocket] Admin connected: {session_id}")

        while True:
            try:
                data = await websocket.receive()
                if data["type"] == "websocket.disconnect":
                    print(f"[WebSocket] Admin disconnected: {session_id}")
                    break
                if data["type"] != "websocket.receive":
                    continue
                message = data.get("text")
                if not message:
                    continue

                try:
                    command = json.loads(message)
                except (ValueError, RecursionError):
                    continue
                # A JSON list/number/string has no .get(); ignoring it keeps a
                # malformed message from tearing down the whole connection.
                if not isinstance(command, dict):
                    continue

                cmd_type = command.get("type")
                if cmd_type == "watch_client":
                    client_id = command.get("client_id")
                    if isinstance(client_id, str):
                        await _stream_client_to_admin(client_id, session_id)
                elif cmd_type == "control_client":
                    client_id = command.get("client_id")
                    control_cmd = command.get("command")
                    if isinstance(client_id, str) and client_id in clients and client_id in command_queues:
                        command_queues[client_id].put(control_cmd)
                        await websocket.send_text(json.dumps({
                            "type": "control_response",
                            "success": True
                        }))
                elif cmd_type == "get_stats":
                    with stats_lock:
                        stats = server_stats.copy()
                    await websocket.send_text(json.dumps({
                        "type": "stats_update",
                        "stats": stats
                    }))
            except WebSocketDisconnect:
                print(f"[WebSocket] Admin WebSocket disconnected: {session_id}")
                break
            except Exception as e:
                print(f"[WebSocket] Error in admin WebSocket {session_id}: {e}")
                break
    except Exception as e:
        print(f"[WebSocket] Admin connection error: {e}")
    finally:
        # Unregister only our own socket: the same session may have reconnected
        # and replaced the registry entry in the meantime.
        if admin_websockets.get(session_id) is websocket:
            del admin_websockets[session_id]
async def _stream_client_to_admin(client_id: str, admin_session_id: str):
"""Потоковая передача видео от клиента к администратору"""
if client_id not in clients or admin_session_id not in admin_websockets:
return
try:
await admin_websockets[admin_session_id].send_text(json.dumps({
"type": "stream_started",
"client_id": client_id
}))
with stats_lock:
if client_id in clients:
client = clients[client_id]
await admin_websockets[admin_session_id].send_text(json.dumps({
"type": "stream_info",
"message": f"Streaming from client {client_id}",
"quality": client["video_settings"]["quality"],
"ip": client["ip_address"],
"connected_at": client["connected_at"]
}))
except Exception as e:
print(f"[WebSocket] Error streaming to admin: {e}")
# ========== КЛИЕНТСКИЙ API ==========
@app.post("/api/client/connect")
async def client_connect_api(connection_data: dict):
    """Register a client through the HTTP API and return the websocket URL to use.

    Args:
        connection_data: JSON object with ``room_id``, ``password`` and an
            optional ``ip`` (defaults to ``"unknown"``).

    Returns:
        ``{"success": True, "client_id", "room_id", "ws_url"}`` on success,
        otherwise ``{"success": False, "error": "Connection failed"}``.

    NOTE(review): the websocket endpoint registers its own client again when the
    returned URL is opened, so the client created here appears to stay in the
    room until it is disconnected explicitly — confirm whether that is intended.
    """
    from urllib.parse import quote

    room_id = connection_data.get("room_id")
    password = connection_data.get("password")
    client_ip = connection_data.get("ip", "unknown")

    client_id = ClientManager.add_client(room_id, password, client_ip)
    if not client_id:
        return {"success": False, "error": "Connection failed"}

    server_host = get_server_host()
    # Percent-encode the path segments: characters such as '#', '?' or spaces in
    # a password would otherwise truncate or corrupt the URL.
    room_segment = quote(str(room_id), safe="")
    password_segment = quote(str(password), safe="")
    return {
        "success": True,
        "client_id": client_id,
        "room_id": room_id,
        "ws_url": f"ws://{server_host}:{SERVER_CONFIG['port']}/ws/client/{room_segment}/{password_segment}"
    }
# ========== СОЗДАНИЕ HTML ШАБЛОНОВ ==========
def create_html_templates():
    """Write the web-interface HTML templates into the ``templates`` directory.

    Five files are (re)written on every call: ``login.html``,
    ``dashboard.html``, ``room.html``, ``create_room.html`` and
    ``stream.html``. The directory is created if missing and is resolved
    relative to the current working directory. A confirmation line is
    printed once all files are written.
    """
    # login.html template
    login_page = """
Video Streaming Server - Login
🎥 Video Streaming Server
Admin Panel
{% if error %}
{{ error }}
{% endif %}
Demo Accounts:
- admin / admin123
- administrator / securepass
- supervisor / superpass
"""
    # dashboard.html template
    dashboard_page = """
Dashboard - Video Streaming Server
Total Rooms
{{ stats.total_rooms }}
Active: {{ total_rooms }}
Connected Clients
{{ stats.total_clients }}
Streaming: {{ stats.total_streams }}
CPU Usage
{{ stats.cpu_usage }}%
Cores: {{ stats.system.cpu_count if stats.system else 'N/A' }}
Memory Usage
{{ stats.memory_usage }}%
{% if stats.system %}
{{ ((stats.system.memory_total - stats.system.memory_available) / 1024 / 1024 / 1024)|round(1) }} GB used
{% else %}
0 GB used
{% endif %}
{% if rooms %}
| ID |
Name |
Clients |
Max Connections |
Created By |
Status |
Actions |
{% for room in rooms %}
{{ room.id }} |
{{ room.name }} |
{{ room.clients_count }}
{% if room.active_streams > 0 %}
({{ room.active_streams }} streaming)
{% endif %}
|
{{ room.max_connections }} |
{{ room.created_by }} |
Active
|
|
{% endfor %}
{% else %}
No rooms created yet. Create your first room!
{% endif %}
System Information
Server Uptime
Loading...
Since {{ stats.start_time }}
Server Address
{{ server_host }}:{{ server_port }}
WebSocket: ws://{{ server_host }}:{{ server_port }}
API Status
Online
All systems operational
"""
    # room.html template
    room_page = """
Room: {{ room.name }} - Video Streaming Server
Room Information
{{ room.password }}
{{ room.max_connections }}
{{ room.created_at }}
{{ room.created_by }}
{{ room_stats.total_clients }} / {{ room.max_connections }}
{{ room_stats.active_streams }}
{{ (room_stats.bytes_transferred / 1024 / 1024)|round(2) }} MB
Client Connection Information
Clients can connect to this room using:
WebSocket URL: ws://{{ server_host }}:{{ server_port }}/ws/client/{{ room.id }}/{{ room.password }}
Room ID: {{ room.id }}
Password: {{ room.password }}
{% if clients %}
| Client ID |
IP Address |
Connected At |
Status |
Video Settings |
Actions |
{% for client in clients %}
{{ client.id[:8] }}... |
{{ client.ip_address }} |
{{ client.connected_at }} |
{% if client.is_streaming %}
Streaming
{% else %}
Idle
{% endif %}
|
{{ client.video_settings.quality }}% Quality
{{ client.video_settings.frame_rate }} FPS
|
|
{% endfor %}
{% else %}
No clients connected to this room yet.
Share the connection information above with clients.
{% endif %}
"""
    # create_room.html template
    create_room_page = """
Create Room - Video Streaming Server
"""
    # stream.html template
    stream_page = """
Stream: {{ client.id }} - Video Streaming Server
Waiting for Video Stream
Video stream will appear here once the client starts streaming.
Use the controls on the right to adjust video settings.
Resolution: 640x480
Frame Rate: 30 FPS
Quality: 85%
{{ client.id }}
{{ room.name if room else 'Unknown' }}
{{ client.connected_at }}
{{ "Yes" if client.is_streaming else "No" }}
Video Controls
Stream Information
This client is connected via WebSocket and streaming video data.
Use the controls above to adjust the video stream in real-time.
Changes are applied to the video processor on the server side.
Connecting...
"""
    # Persist the templates to disk, in a fixed order.
    templates_dir = "templates"
    os.makedirs(templates_dir, exist_ok=True)
    pages = (
        ("login.html", login_page),
        ("dashboard.html", dashboard_page),
        ("room.html", room_page),
        ("create_room.html", create_room_page),
        ("stream.html", stream_page),
    )
    for file_name, markup in pages:
        target = os.path.join(templates_dir, file_name)
        with open(target, "w", encoding="utf-8") as handle:
            handle.write(markup)
    print(f"[Templates] Created HTML templates in '{templates_dir}' directory")
# ========== ЗАПУСК СЕРВЕРА ==========
def main():
    """Generate the HTML templates, print the startup banner and run uvicorn.

    The server is started from the ``"server:app"`` import string using the
    host, port and WebSocket ping settings from ``SERVER_CONFIG``. A
    ``KeyboardInterrupt`` or any other exception raised while starting or
    running the server is reported on stdout instead of propagating.
    """
    # Templates are written to disk before anything else happens.
    create_html_templates()
    server_host = get_server_host()

    divider = "=" * 60
    banner = (
        divider,
        "🎥 Video Streaming Server with Web Interface",
        divider,
        f"🌐 Web Interface: http://{server_host}:{SERVER_CONFIG['port']}",
        f"🔌 WebSocket: ws://{server_host}:{SERVER_CONFIG['port']}",
        f"👤 Admin Login: http://{server_host}:{SERVER_CONFIG['port']}/",
        divider,
        "Default Admin Accounts:",
        " • admin / admin123",
        " • administrator / securepass",
        " • supervisor / superpass",
        divider,
        "Press Ctrl+C to stop the server",
    )
    for banner_line in banner:
        print(banner_line)

    try:
        # Options are read inside the try so a bad config entry is reported
        # the same way as any other startup failure.
        run_options = {
            "host": SERVER_CONFIG["host"],
            "port": SERVER_CONFIG["port"],
            "reload": False,
            "log_level": "info",
            "ws_ping_interval": SERVER_CONFIG["websocket_ping_interval"],
            "ws_ping_timeout": SERVER_CONFIG["websocket_ping_timeout"],
        }
        uvicorn.run("server:app", **run_options)
    except KeyboardInterrupt:
        print("\nServer stopped by user")
    except Exception as e:
        print(f"Error starting server: {e}")
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    # multiprocessing hook for frozen Windows executables; per the Python
    # docs it is called first inside the main guard and does nothing when
    # the program is not frozen.
    freeze_support()
    main()