diff --git a/server.py b/server.py index 164f760..1d4e0fc 100644 --- a/server.py +++ b/server.py @@ -1,7 +1,7 @@ """ KazicCAM - Серверная часть с веб-интерфейсом FastAPI + OpenCV + WebSocket + Multiprocessing -Версия: 2.1.0, 25 пересборка +Версия: 3.1.1, 35 пересборка Для Windows """ @@ -40,7 +40,10 @@ load_dotenv() SERVER_CONFIG = { # считываем переменную окружения HOST (или legacy 'host'), чтобы можно было управлять через .env / docker-compose "host": os.getenv("HOST", os.getenv("host", "0.0.0.0")), # сюда IP смотрящий наружу или 0.0.0.0 для всех интерфейсов - "port": 8000, + "port": int(os.getenv("PORT", 8000)), + "ssl_certfile": os.getenv("SSL_CERTFILE", "/etc/videostream/ssl/cert.pem"), + "ssl_keyfile": os.getenv("SSL_KEYFILE", "/etc/videostream/ssl/key.pem"), + "ssl_enabled": os.getenv("SSL_ENABLED", "false").lower() == "true", "debug": False, "max_clients_per_room": 50, "max_rooms": 100, @@ -77,7 +80,7 @@ server_stats = None stats_lock = None cleanup_task = None templates = None -admin_frame_queues = None +processed_video_queues = {} # Администраторы ADMINS = [ @@ -348,81 +351,74 @@ class VideoProcessor: @staticmethod def _process_video_stream(client_id: str): - """Основной цикл обработки видео""" - print(f"\n[VideoProcessor Process] ===== PROCESS START =====") - print(f"[VideoProcessor Process] Client ID: {client_id}, PID: {os.getpid()}") + """Основной цикл обработки видео с пересылкой в общую очередь""" + print(f"[VideoProcessor] Process started for client {client_id}") video_queue = None command_queue = None # Ждем инициализации очередей - for attempt in range(100): + for _ in range(100): try: if client_id in video_queues and client_id in command_queues: video_queue = video_queues[client_id] command_queue = command_queues[client_id] - print(f"[VideoProcessor Process] ✓ Queues found on attempt {attempt+1}") break except: pass time.sleep(0.1) if not video_queue or not command_queue: - print(f"[VideoProcessor Process] ❌ CRITICAL: Queues not found after 10s for client {client_id}") + print(f"[VideoProcessor] Queues not found for client {client_id}") return + # Создаем очередь для обработанных кадров (для администраторов) + processed_queue = Queue(maxsize=10) + + # Добавляем эту очередь в глобальный словарь для доступа администраторов + processed_video_queues[client_id] = processed_queue + frame_count = 0 last_log_time = time.time() - last_frame_time = time.time() - - print(f"[VideoProcessor Process] ===== WAITING FOR FRAMES =====\n") while True: try: if not video_queue.empty(): frame_data = video_queue.get(timeout=1) frame_count += 1 - last_frame_time = time.time() - try: - nparr = np.frombuffer(frame_data, np.uint8) - frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) + nparr = np.frombuffer(frame_data, np.uint8) + frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) + + if frame is not None: + # Применяем команды из очереди + while not command_queue.empty(): + try: + command = command_queue.get_nowait() + frame = VideoProcessor._apply_command(frame, command) + except: + break - if frame is not None: - # Применяем команды из очереди - commands_applied = 0 - while not command_queue.empty(): - try: - command = command_queue.get_nowait() - frame = VideoProcessor._apply_command(frame, command) - commands_applied += 1 - except: - break - - if commands_applied > 0: - print(f"[VideoProcessor Process] {client_id}: applied {commands_applied} commands to frame #{frame_count}") - else: - print(f"[VideoProcessor Process] ⚠️ {client_id}: frame decode FAILED at frame #{frame_count}") - except Exception as e: - print(f"[VideoProcessor Process] ❌ {client_id}: Error decoding frame #{frame_count}: {e}") - - # Логируем прогресс каждые 5 секунд - current_time = time.time() - if current_time - last_log_time > 5: - time_since_frame = current_time - last_frame_time - if frame_count == 0: - print(f"[VideoProcessor Process] ⚠️ {client_id}: NO FRAMES YET (waiting for {time_since_frame:.1f}s)") - else: - print(f"[VideoProcessor Process] {client_id}: Processed {frame_count} frames (last {time_since_frame:.1f}s ago)") - last_log_time = current_time + # Кодируем обратно в JPEG + encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85] + success, encoded_frame = cv2.imencode('.jpg', frame, encode_param) + + if success: + # Отправляем в очередь для администраторов + if not processed_queue.full(): + processed_queue.put(encoded_frame.tobytes()) + + # Логируем каждые 5 секунд + current_time = time.time() + if current_time - last_log_time > 5: + print(f"[VideoProcessor] Client {client_id}: Processed {frame_count} frames") + last_log_time = current_time time.sleep(0.001) except Exception as e: - print(f"[VideoProcessor Process] ❌ {client_id}: Error in main loop: {type(e).__name__}: {e}") + print(f"[VideoProcessor] Error: {e}") time.sleep(0.1) - - print(f"[VideoProcessor Process] ===== PROCESS END =====\n") @staticmethod def _apply_command(frame: np.ndarray, command: Dict) -> np.ndarray: @@ -560,7 +556,7 @@ async def cleanup_inactive_sessions(): async def lifespan(app: FastAPI): """Управление жизненным циклом приложения""" global rooms, clients, admin_sessions, client_websockets, admin_websockets - global video_queues, command_queues, room_stats, server_stats, stats_lock, cleanup_task, templates, admin_frame_queues + global video_queues, command_queues, room_stats, server_stats, stats_lock, cleanup_task, templates import threading stats_lock = threading.Lock() @@ -572,7 +568,6 @@ async def lifespan(app: FastAPI): admin_websockets = {} video_queues = {} command_queues = {} - admin_frame_queues = {} room_stats = {} server_stats = { "total_rooms": 0, @@ -747,7 +742,8 @@ async def room_page(request: Request, room_id: str): "clients": clients_list, "username": admin_sessions[session_id]["username"], "server_host": server_host, - "server_port": SERVER_CONFIG["port"] + "server_port": SERVER_CONFIG["port"], + "ssl_enabled": SERVER_CONFIG.get("ssl_enabled", False) }) @app.get("/stream/{client_id}", response_class=HTMLResponse) @@ -1079,17 +1075,6 @@ async def client_websocket_endpoint(websocket: WebSocket, room_id: str, password print(f"[WebSocket Client] ⚠️ {client_id}: video queue is FULL, dropping frame") else: print(f"[WebSocket Client] ❌ {client_id}: video_queue not found!") - # Также пробуем переслать тот же байтовый кадр администраторам, если кто-то смотрит - try: - if admin_frame_queues is not None and client_id in admin_frame_queues: - aq = admin_frame_queues[client_id] - try: - aq.put_nowait(message) - except asyncio.QueueFull: - # Если очередь админа переполнена, пропускаем кадр - pass - except Exception: - pass elif isinstance(message, str): try: @@ -1131,10 +1116,10 @@ async def client_websocket_endpoint(websocket: WebSocket, room_id: str, password if video_processor: video_processor.stop() - # Очищаем очередь админа, чтобы фреймы перестали отправляться + # Очищаем очередь для администраторов try: - if admin_frame_queues is not None and client_id in admin_frame_queues: - del admin_frame_queues[client_id] + if client_id in processed_video_queues: + del processed_video_queues[client_id] except Exception: pass print(f"[WebSocket Client] ✓ VideoProcessor stopped for {client_id}") @@ -1143,21 +1128,21 @@ async def client_websocket_endpoint(websocket: WebSocket, room_id: str, password @app.websocket("/ws/admin/{session_id}") async def admin_websocket_endpoint(websocket: WebSocket, session_id: str): """WebSocket для администраторов (просмотр и управление)""" - global admin_sessions - if not AdminAuth.verify_session(session_id): - print(f"[WebSocket] Admin connection REJECTED: session_id={session_id} not found in admin_sessions") - print(f"[WebSocket] Available sessions: {list(admin_sessions.keys()) if admin_sessions else 'None'}") await websocket.close(code=1008) return await websocket.accept() admin_websockets[session_id] = websocket + # Словарь для хранения активных стримов + active_streams = {} + try: await websocket.send_text(json.dumps({ "type": "connected", - "message": "Admin WebSocket connected" + "message": "Admin WebSocket connected", + "session_id": session_id })) print(f"[WebSocket] Admin connected: {session_id}") @@ -1176,7 +1161,33 @@ async def admin_websocket_endpoint(websocket: WebSocket, session_id: str): if cmd_type == "watch_client": client_id = command.get("client_id") - await _stream_client_to_admin(client_id, session_id) + + # Останавливаем предыдущий стрим если есть + if session_id in active_streams: + active_streams[session_id].cancel() + + # Запускаем новый стрим + task = asyncio.create_task( + _stream_client_to_admin(client_id, session_id) + ) + active_streams[session_id] = task + + await websocket.send_text(json.dumps({ + "type": "watching_started", + "client_id": client_id, + "message": f"Started watching client {client_id}" + })) + + elif cmd_type == "stop_watching": + # Останавливаем просмотр + if session_id in active_streams: + active_streams[session_id].cancel() + del active_streams[session_id] + + await websocket.send_text(json.dumps({ + "type": "watching_stopped", + "message": "Stopped watching" + })) elif cmd_type == "control_client": client_id = command.get("client_id") @@ -1186,7 +1197,8 @@ async def admin_websocket_endpoint(websocket: WebSocket, session_id: str): command_queues[client_id].put(control_cmd) await websocket.send_text(json.dumps({ "type": "control_response", - "success": True + "success": True, + "command": control_cmd })) elif cmd_type == "get_stats": @@ -1214,67 +1226,99 @@ async def admin_websocket_endpoint(websocket: WebSocket, session_id: str): except Exception as e: print(f"[WebSocket] Admin connection error: {e}") finally: + # Останавливаем все активные стримы + if session_id in active_streams: + active_streams[session_id].cancel() + if session_id in admin_websockets: del admin_websockets[session_id] async def _stream_client_to_admin(client_id: str, admin_session_id: str): """Потоковая передача видео от клиента к администратору""" - global admin_frame_queues, admin_websockets, clients, stats_lock - if client_id not in clients or admin_session_id not in admin_websockets: return - + try: - ws = admin_websockets[admin_session_id] - # Уведомляем админа о старте стрима - await ws.send_text(json.dumps({"type": "stream_started", "client_id": client_id})) - + admin_ws = admin_websockets[admin_session_id] + + await admin_ws.send_text(json.dumps({ + "type": "stream_started", + "client_id": client_id, + "timestamp": datetime.now().isoformat() + })) + + # Получаем информацию о клиенте with stats_lock: if client_id in clients: client = clients[client_id] - await ws.send_text(json.dumps({ + await admin_ws.send_text(json.dumps({ "type": "stream_info", "message": f"Streaming from client {client_id}", - "quality": client.get("video_settings", {}).get("quality"), - "ip": client.get("ip_address"), - "connected_at": client.get("connected_at") + "client_id": client_id, + "quality": client["video_settings"]["quality"], + "frame_rate": client["video_settings"]["frame_rate"], + "resolution": client["video_settings"]["resolution"], + "ip": client["ip_address"], + "connected_at": client["connected_at"], + "status": "streaming" })) - - # Подготовим очередь для пересылки байтов администраторам - if admin_frame_queues is None: + + print(f"[WebSocket] Started streaming from client {client_id} to admin {admin_session_id}") + + # Ждем пока появится очередь обработанных кадров + start_time = time.time() + while client_id not in processed_video_queues and time.time() - start_time < 10: + await asyncio.sleep(0.1) + + if client_id not in processed_video_queues: + await admin_ws.send_text(json.dumps({ + "type": "stream_error", + "message": "No video stream available from client" + })) return - - if client_id not in admin_frame_queues: - # Создаём очередь для этого клиента, если её нет - admin_frame_queues[client_id] = asyncio.Queue(maxsize=128) - - aq = admin_frame_queues[client_id] - print(f"[WebSocket] Started streaming frames to admin {admin_session_id} for client {client_id}") - - # Цикл чтения кадров из очереди и отправки их админу - # Выходим только если очередь удалена (клиент отключился) или произойдёт исключение - try: - while client_id in admin_frame_queues: - try: - # Пытаемся получить кадр с таймаутом, чтобы периодически проверять наличие очереди - frame_bytes = await asyncio.wait_for(aq.get(), timeout=2.0) - try: - await ws.send_bytes(frame_bytes) - except Exception as e: - print(f"[WebSocket] Error sending bytes to admin {admin_session_id}: {e}") - break - except asyncio.TimeoutError: - # Таймаут OK — просто продолжаем ждать - continue - except asyncio.CancelledError: - break - except Exception as e: - print(f"[WebSocket] Error in admin frame loop: {e}") - finally: - print(f"[WebSocket] Stopped streaming frames to admin {admin_session_id} for client {client_id}") - + + processed_queue = processed_video_queues[client_id] + frame_count = 0 + last_update_time = time.time() + + # Основной цикл пересылки видео + while (client_id in clients and + admin_session_id in admin_websockets and + clients[client_id].get("is_streaming", False)): + + try: + # Получаем обработанный кадр + if not processed_queue.empty(): + frame_data = processed_queue.get_nowait() + frame_count += 1 + + # Отправляем кадр администратору как бинарные данные + await admin_ws.send_bytes(frame_data) + + # Отправляем статистику каждую секунду + current_time = time.time() + if current_time - last_update_time > 1: + await admin_ws.send_text(json.dumps({ + "type": "stream_stats", + "frames_sent": frame_count, + "timestamp": datetime.now().isoformat() + })) + last_update_time = current_time + + # Небольшая задержка для CPU + await asyncio.sleep(0.01) + + except asyncio.CancelledError: + # Стрим был отменен администратором + break + except Exception as e: + print(f"[WebSocket] Error streaming to admin: {e}") + break + + print(f"[WebSocket] Stopped streaming from client {client_id} to admin {admin_session_id}") + except Exception as e: - print(f"[WebSocket] Error streaming to admin: {e}") + print(f"[WebSocket] Error in stream to admin: {e}") # ========== КЛИЕНТСКИЙ API ========== @app.post("/api/client/connect") @@ -1288,11 +1332,12 @@ async def client_connect_api(connection_data: dict): if client_id: server_host = get_server_host() + ws_protocol = "wss" if SERVER_CONFIG.get("ssl_enabled", False) else "ws" return { "success": True, "client_id": client_id, "room_id": room_id, - "ws_url": f"ws://{server_host}:{SERVER_CONFIG['port']}/ws/client/{room_id}/{password}" + "ws_url": f"{ws_protocol}://{server_host}:{SERVER_CONFIG['port']}/ws/client/{room_id}/{password}" } return {"success": False, "error": "Connection failed"} @@ -2497,7 +2542,7 @@ def create_html_templates():
Clients can connect to this room using:
-WebSocket URL: ws://{{ server_host }}:{{ server_port }}/ws/client/{{ room.id }}/{{ room.password }}
WebSocket URL: {{ 'wss' if ssl_enabled else 'ws' }}://{{ server_host }}:{{ server_port }}/ws/client/{{ room.id }}/{{ room.password }}
Room ID: {{ room.id }}
Password: {{ room.password }}
Clients can connect to this room using:
-WebSocket URL: ws://{{ server_host }}:{{ server_port }}/ws/client/{{ room.id }}/{{ room.password }}
WebSocket URL: {{ 'wss' if ssl_enabled else 'ws' }}://{{ server_host }}:{{ server_port }}/ws/client/{{ room.id }}/{{ room.password }}
Room ID: {{ room.id }}
Password: {{ room.password }}