#!/usr/bin/env python3 """ Скрипт для подключения админа и просмотра видео от клиента Требует, чтобы есть активное подключение клиента, отправляющего кадры """ import asyncio import websockets import json import requests from dotenv import load_dotenv import os # Загружаем переменные окружения из .env load_dotenv() # Параметры подключения HOST = os.getenv("PUBLIC_HOST", "192.168.0.112") PORT = int(os.getenv("PORT", 8000)) # Учетные данные админа (из server.py) ADMIN_USERNAME = "admin" ADMIN_PASSWORD = "admin123" async def admin_watch(): """Админ логинится и смотрит видео от клиента""" # 1. Логинимся как админ print(f"[Admin] Logging in as {ADMIN_USERNAME}...") session = requests.Session() try: login_response = session.post( f"http://{HOST}:{PORT}/login", data={ "username": ADMIN_USERNAME, "password": ADMIN_PASSWORD } ) if login_response.status_code != 303 and login_response.status_code != 200: print(f"[Admin] ❌ Failed to login: {login_response.status_code}") return # Получаем session_id из cookies session_id = session.cookies.get("session_id") if not session_id: print(f"[Admin] ❌ No session_id in cookies") print(f"[Admin] Cookies: {session.cookies}") return print(f"[Admin] ✓ Logged in, session_id: {session_id[:16]}...") except Exception as e: print(f"[Admin] ❌ Login error: {e}") return # 2. Получаем список активных комнат и клиентов print(f"[Admin] Getting active clients...") try: dashboard_response = session.get(f"http://{HOST}:{PORT}/dashboard") if dashboard_response.status_code != 200: print(f"[Admin] ❌ Failed to get dashboard: {dashboard_response.status_code}") return # Парсим HTML чтобы найти client_id (это сложновато, используем API) # Для простоты - просто выведем сообщение о том, как найти client_id print(f"[Admin] 💡 Open http://{HOST}:{PORT}/dashboard in browser to see active clients") print(f"[Admin] 💡 Look for client_id in the page source or network tab") except Exception as e: print(f"[Admin] ❌ Error: {e}") return # 3. Для тестирования подключимся к вебсокету админа uri = f"ws://{HOST}:{PORT}/ws/admin/{session_id}" print(f"[Admin] Connecting to WebSocket: {uri}") try: async with websockets.connect(uri) as ws: # Получаем ответ сервера response = await asyncio.wait_for(ws.recv(), timeout=5) print(f"[Admin] ✓ Server response: {response}") # Пример: отправляем команду "watch_client" # Вам нужно знать client_id активного клиента print(f"[Admin] 💡 To watch a client, send: {{'type': 'watch_client', 'client_id': 'CLIENT_ID_HERE'}}") print(f"[Admin] 💡 After that, admin will receive binary video frames") # Ждём сообщений от сервера print(f"[Admin] Waiting for messages from server (press Ctrl+C to stop)...") try: while True: try: message = await asyncio.wait_for(ws.recv(), timeout=5) if isinstance(message, bytes): print(f"[Admin] ✓ Received {len(message)} bytes of video frame data") else: print(f"[Admin] ✓ Message: {message[:100]}") except asyncio.TimeoutError: # Таймаут OK - просто ждём дальше continue except KeyboardInterrupt: print(f"\n[Admin] Disconnected") except Exception as e: print(f"[Admin] ❌ WebSocket error: {e}") if __name__ == "__main__": print("=" * 60) print("Admin WebSocket Test") print("=" * 60) print(f"Server: http://{HOST}:{PORT}") print() try: asyncio.run(admin_watch()) except KeyboardInterrupt: print("\n[Admin] Interrupted by user")