VIDEO TRANSFER UNAVAILABLE
This commit is contained in:
2025-12-03 20:49:03 +09:00
parent 5023bfbca2
commit cc1ff597cc
3 changed files with 186 additions and 34 deletions

121
server.py
View File

@@ -220,21 +220,25 @@ class ClientManager:
"""Добавление нового клиента""" """Добавление нового клиента"""
global rooms, clients, room_stats, server_stats, stats_lock global rooms, clients, room_stats, server_stats, stats_lock
print(f"[ClientManager] add_client called: room={room_id}, ip={ip_address}")
if room_id not in rooms: if room_id not in rooms:
print(f"[ClientManager] Room not found: {room_id}") print(f"[ClientManager] Room not found: {room_id}")
return None return None
room = rooms[room_id] room = rooms[room_id]
print(f"[ClientManager] Found room: {room['name']}")
if room["password"] != password: if room["password"] != password:
print(f"[ClientManager] Invalid password for room: {room_id}") print(f"[ClientManager] Invalid password for room: {room_id} (expected: {room['password']}, got: {password})")
return None return None
if len(room["clients"]) >= room["max_connections"]: if len(room["clients"]) >= room["max_connections"]:
print(f"[ClientManager] Room {room_id} is full") print(f"[ClientManager] Room {room_id} is full: {len(room['clients'])}/{room['max_connections']}")
return None return None
client_id = ClientManager.generate_client_id() client_id = ClientManager.generate_client_id()
print(f"[ClientManager] Generated client_id: {client_id}")
client = { client = {
"id": client_id, "id": client_id,
@@ -264,7 +268,7 @@ class ClientManager:
server_stats["total_clients"] = len(clients) server_stats["total_clients"] = len(clients)
print(f"[ClientManager] Added client: {client_id} to room: {room_id}") print(f"[ClientManager] Added client: {client_id} to room: {room_id} (total in room: {len(room['clients'])})")
return client_id return client_id
@staticmethod @staticmethod
@@ -329,7 +333,7 @@ class VideoProcessor:
) )
self.process.daemon = True self.process.daemon = True
self.process.start() self.process.start()
print(f"[VideoProcessor] Started for client: {self.client_id}") print(f"[VideoProcessor] Started process for client: {self.client_id} (PID: {self.process.pid})")
def stop(self): def stop(self):
"""Остановка процесса""" """Остановка процесса"""
@@ -344,59 +348,81 @@ class VideoProcessor:
@staticmethod @staticmethod
def _process_video_stream(client_id: str): def _process_video_stream(client_id: str):
"""Основной цикл обработки видео""" """Основной цикл обработки видео"""
print(f"[VideoProcessor] Process started for client {client_id}") print(f"\n[VideoProcessor Process] ===== PROCESS START =====")
print(f"[VideoProcessor Process] Client ID: {client_id}, PID: {os.getpid()}")
video_queue = None video_queue = None
command_queue = None command_queue = None
# Ждем инициализации очередей # Ждем инициализации очередей
for _ in range(100): for attempt in range(100):
try: try:
if client_id in video_queues and client_id in command_queues: if client_id in video_queues and client_id in command_queues:
video_queue = video_queues[client_id] video_queue = video_queues[client_id]
command_queue = command_queues[client_id] command_queue = command_queues[client_id]
print(f"[VideoProcessor Process] ✓ Queues found on attempt {attempt+1}")
break break
except: except:
pass pass
time.sleep(0.1) time.sleep(0.1)
if not video_queue or not command_queue: if not video_queue or not command_queue:
print(f"[VideoProcessor] Queues not found for client {client_id}") print(f"[VideoProcessor Process] ❌ CRITICAL: Queues not found after 10s for client {client_id}")
return return
frame_count = 0 frame_count = 0
last_log_time = time.time() last_log_time = time.time()
last_frame_time = time.time()
print(f"[VideoProcessor Process] ===== WAITING FOR FRAMES =====\n")
while True: while True:
try: try:
if not video_queue.empty(): if not video_queue.empty():
frame_data = video_queue.get(timeout=1) frame_data = video_queue.get(timeout=1)
frame_count += 1 frame_count += 1
last_frame_time = time.time()
nparr = np.frombuffer(frame_data, np.uint8) try:
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) nparr = np.frombuffer(frame_data, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame is not None: if frame is not None:
# Применяем команды из очереди # Применяем команды из очереди
while not command_queue.empty(): commands_applied = 0
try: while not command_queue.empty():
command = command_queue.get_nowait() try:
frame = VideoProcessor._apply_command(frame, command) command = command_queue.get_nowait()
except: frame = VideoProcessor._apply_command(frame, command)
break commands_applied += 1
except:
break
# Логируем каждые 5 секунд if commands_applied > 0:
current_time = time.time() print(f"[VideoProcessor Process] {client_id}: applied {commands_applied} commands to frame #{frame_count}")
if current_time - last_log_time > 5: else:
print(f"[VideoProcessor] Client {client_id}: Processed {frame_count} frames") print(f"[VideoProcessor Process] ⚠️ {client_id}: frame decode FAILED at frame #{frame_count}")
last_log_time = current_time except Exception as e:
print(f"[VideoProcessor Process] ❌ {client_id}: Error decoding frame #{frame_count}: {e}")
# Логируем прогресс каждые 5 секунд
current_time = time.time()
if current_time - last_log_time > 5:
time_since_frame = current_time - last_frame_time
if frame_count == 0:
print(f"[VideoProcessor Process] ⚠️ {client_id}: NO FRAMES YET (waiting for {time_since_frame:.1f}s)")
else:
print(f"[VideoProcessor Process] {client_id}: Processed {frame_count} frames (last {time_since_frame:.1f}s ago)")
last_log_time = current_time
time.sleep(0.001) time.sleep(0.001)
except Exception as e: except Exception as e:
# print(f"[VideoProcessor] Error: {e}") print(f"[VideoProcessor Process] ❌ {client_id}: Error in main loop: {type(e).__name__}: {e}")
time.sleep(0.1) time.sleep(0.1)
print(f"[VideoProcessor Process] ===== PROCESS END =====\n")
@staticmethod @staticmethod
def _apply_command(frame: np.ndarray, command: Dict) -> np.ndarray: def _apply_command(frame: np.ndarray, command: Dict) -> np.ndarray:
"""Применение команды к кадру""" """Применение команды к кадру"""
@@ -990,28 +1016,43 @@ async def client_websocket_endpoint(websocket: WebSocket, room_id: str, password
try: try:
client_ip = websocket.client.host if websocket.client else "unknown" client_ip = websocket.client.host if websocket.client else "unknown"
print(f"\n[WebSocket Client] ===== NEW CONNECTION =====")
print(f"[WebSocket Client] IP: {client_ip}, Room: {room_id}, Password: {password}")
client_id = ClientManager.add_client(room_id, password, client_ip) client_id = ClientManager.add_client(room_id, password, client_ip)
if not client_id: if not client_id:
print(f"[WebSocket Client] ❌ ClientManager.add_client returned None")
await websocket.send_text(json.dumps({"error": "Invalid room or password"})) await websocket.send_text(json.dumps({"error": "Invalid room or password"}))
await websocket.close() await websocket.close()
return return
print(f"[WebSocket Client] ✓ Client added: {client_id}")
client_websockets[client_id] = websocket client_websockets[client_id] = websocket
print(f"[WebSocket Client] ✓ WebSocket registered for {client_id}")
video_processor = VideoProcessor(client_id) video_processor = VideoProcessor(client_id)
print(f"[WebSocket Client] ✓ VideoProcessor created for {client_id}")
video_processor.start() video_processor.start()
print(f"[WebSocket Client] ✓ VideoProcessor started for {client_id}")
with stats_lock: with stats_lock:
if client_id in clients: if client_id in clients:
clients[client_id]["is_streaming"] = True clients[client_id]["is_streaming"] = True
print(f"[WebSocket Client] ✓ is_streaming set to True for {client_id}")
await websocket.send_text(json.dumps({ success_msg = {
"success": True, "success": True,
"client_id": client_id, "client_id": client_id,
"room_id": room_id "room_id": room_id
})) }
await websocket.send_text(json.dumps(success_msg))
print(f"[WebSocket Client] ✓ Sent success message to {client_id}")
print(f"[WebSocket Client] ===== WAITING FOR DATA =====\n")
print(f"[WebSocket] Client connected: {client_id} to room {room_id}") frame_count = 0
command_count = 0
while True: while True:
try: try:
@@ -1024,6 +1065,13 @@ async def client_websocket_endpoint(websocket: WebSocket, room_id: str, password
if client_id in video_queues: if client_id in video_queues:
if not video_queues[client_id].full(): if not video_queues[client_id].full():
video_queues[client_id].put(message) video_queues[client_id].put(message)
frame_count += 1
if frame_count % 30 == 0: # log every 30 frames
print(f"[WebSocket Client] {client_id}: received {frame_count} frames")
else:
print(f"[WebSocket Client] ⚠️ {client_id}: video queue is FULL, dropping frame")
else:
print(f"[WebSocket Client] ❌ {client_id}: video_queue not found!")
elif isinstance(message, str): elif isinstance(message, str):
try: try:
@@ -1035,31 +1083,38 @@ async def client_websocket_endpoint(websocket: WebSocket, room_id: str, password
if client_id in command_queues: if client_id in command_queues:
command_queues[client_id].put(command) command_queues[client_id].put(command)
except: command_count += 1
pass print(f"[WebSocket Client] {client_id}: command received: {command.get('type')}")
except json.JSONDecodeError as e:
print(f"[WebSocket Client] ⚠️ {client_id}: invalid JSON: {message[:50]}")
elif data["type"] == "websocket.disconnect": elif data["type"] == "websocket.disconnect":
print(f"[WebSocket] Client disconnected: {client_id}") print(f"[WebSocket Client] {client_id}: CLIENT DISCONNECT (frames: {frame_count}, commands: {command_count})")
break break
except WebSocketDisconnect: except WebSocketDisconnect:
print(f"[WebSocket] Client WebSocket disconnected: {client_id}") print(f"[WebSocket Client] {client_id}: WEBSOCKET DISCONNECT (frames: {frame_count})")
break break
except Exception as e: except Exception as e:
print(f"[WebSocket] Error in client WebSocket {client_id}: {e}") print(f"[WebSocket Client] ❌ {client_id}: Error in receive loop: {type(e).__name__}: {e}")
break break
except Exception as e: except Exception as e:
print(f"[WebSocket] Client connection error: {e}") print(f"[WebSocket Client] ❌ Connection error: {type(e).__name__}: {e}")
finally: finally:
print(f"[WebSocket Client] ===== CLEANUP =====")
if client_id: if client_id:
with stats_lock: with stats_lock:
if client_id in clients: if client_id in clients:
clients[client_id]["is_streaming"] = False clients[client_id]["is_streaming"] = False
print(f"[WebSocket Client] ✓ is_streaming set to False for {client_id}")
ClientManager.disconnect_client(client_id) ClientManager.disconnect_client(client_id)
print(f"[WebSocket Client] ✓ Disconnected client: {client_id}")
if video_processor: if video_processor:
video_processor.stop() video_processor.stop()
print(f"[WebSocket Client] ✓ VideoProcessor stopped for {client_id}")
print(f"[WebSocket Client] ===== END OF CONNECTION =====\n")
@app.websocket("/ws/admin/{session_id}") @app.websocket("/ws/admin/{session_id}")
async def admin_websocket_endpoint(websocket: WebSocket, session_id: str): async def admin_websocket_endpoint(websocket: WebSocket, session_id: str):

BIN
test_frame.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 332 B

97
test_send_frame.py Normal file
View File

@@ -0,0 +1,97 @@
#!/usr/bin/env python3
"""
Скрипт для отправки тестового JPEG-кадра на сервер.
Помогает проверить, работает ли VideoProcessor без мобильного приложения.
"""
import asyncio
import websockets
import json
import sys
import cv2
import numpy as np
from pathlib import Path
# Параметры подключения
HOST = "192.168.0.112" # Измените на ваш PUBLIC_HOST из .env
PORT = 8000
ROOM_ID = "DazqDVlwdGX" # Измените на ID комнаты
PASSWORD = "1" # Измените на пароль комнаты
FRAME_FILE = "test_frame.jpg" # Путь к тестовому JPEG-файлу
async def send_frames():
"""Подключиться к WebSocket и отправить кадры"""
uri = f"ws://{HOST}:{PORT}/ws/client/{ROOM_ID}/{PASSWORD}"
print(f"[Test] Connecting to: {uri}")
try:
async with websockets.connect(uri) as ws:
# Получаем ответ сервера
response = await asyncio.wait_for(ws.recv(), timeout=5)
print(f"[Test] ✓ Server response: {response}")
# Читаем тестовый кадр
if not Path(FRAME_FILE).exists():
print(f"[Test] File not found: {FRAME_FILE}")
print(f"[Test] Creating a proper JPEG test file with OpenCV...")
if not create_minimal_jpeg(FRAME_FILE):
print(f"[Test] ❌ Failed to create test JPEG")
return
with open(FRAME_FILE, "rb") as f:
frame_data = f.read()
print(f"[Test] Frame size: {len(frame_data)} bytes")
# Отправляем кадры в цикле (30 FPS, 10 секунд)
print(f"[Test] Sending frames (30 FPS for 10 seconds)...")
for i in range(300): # 10 sec * 30 FPS
await ws.send(frame_data)
if (i + 1) % 30 == 0:
print(f"[Test] ✓ Sent {i + 1} frames")
await asyncio.sleep(1 / 30) # ~30 FPS
print(f"[Test] ✓ Done! Sent 300 frames")
await asyncio.sleep(2) # Пауза перед отключением
except asyncio.TimeoutError:
print(f"[Test] ❌ Timeout waiting for server response")
except ConnectionRefusedError:
print(f"[Test] ❌ Connection refused. Check HOST/PORT and firewall")
except Exception as e:
print(f"[Test] ❌ Error: {type(e).__name__}: {e}")
def create_minimal_jpeg(filename):
"""Создаёт корректный JPEG-файл (640x480 с синим фоном) для теста"""
# Создаём пустой образ 640x480 (синий)
image = np.zeros((480, 640, 3), dtype=np.uint8)
image[:, :] = (255, 0, 0) # BGR: синий цвет
# Добавляем текст
cv2.putText(image, "Test Frame", (260, 240),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
# Кодируем в JPEG
success, jpeg_data = cv2.imencode('.jpg', image)
if not success:
print(f"[Test] ❌ Failed to create JPEG")
return False
# Сохраняем в файл
with open(filename, "wb") as f:
f.write(jpeg_data.tobytes())
print(f"[Test] ✓ Created test JPEG: {filename} ({len(jpeg_data.tobytes())} bytes)")
return True
if __name__ == "__main__":
print("=" * 60)
print("WebSocket Frame Sender Test")
print("=" * 60)
print(f"Target: ws://{HOST}:{PORT}/ws/client/{ROOM_ID}/{PASSWORD}")
print()
try:
asyncio.run(send_frames())
except KeyboardInterrupt:
print("\n[Test] Interrupted by user")