Files
cam_control/test_send_frame.py
2025-12-09 20:27:25 +09:00

98 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Скрипт для отправки тестового JPEG-кадра на сервер.
Помогает проверить, работает ли VideoProcessor без мобильного приложения.
"""
import asyncio
import websockets
import json
import sys
import cv2
import numpy as np
from pathlib import Path
# Параметры подключения
HOST = "192.168.0.112" # Измените на ваш PUBLIC_HOST из .env
PORT = 8000
ROOM_ID = "AtR2yGhr8QM" # Измените на ID комнаты
PASSWORD = "1" # Измените на пароль комнаты
FRAME_FILE = "test_frame.jpg" # Путь к тестовому JPEG-файлу
async def send_frames():
"""Подключиться к WebSocket и отправить кадры"""
uri = f"ws://{HOST}:{PORT}/ws/client/{ROOM_ID}/{PASSWORD}"
print(f"[Test] Connecting to: {uri}")
try:
async with websockets.connect(uri) as ws:
# Получаем ответ сервера
response = await asyncio.wait_for(ws.recv(), timeout=5)
print(f"[Test] ✓ Server response: {response}")
# Читаем тестовый кадр
if not Path(FRAME_FILE).exists():
print(f"[Test] File not found: {FRAME_FILE}")
print(f"[Test] Creating a proper JPEG test file with OpenCV...")
if not create_minimal_jpeg(FRAME_FILE):
print(f"[Test] ❌ Failed to create test JPEG")
return
with open(FRAME_FILE, "rb") as f:
frame_data = f.read()
print(f"[Test] Frame size: {len(frame_data)} bytes")
# Отправляем кадры в цикле (30 FPS, 10 секунд)
print(f"[Test] Sending frames (30 FPS for 10 seconds)...")
for i in range(300): # 10 sec * 30 FPS
await ws.send(frame_data)
if (i + 1) % 30 == 0:
print(f"[Test] ✓ Sent {i + 1} frames")
await asyncio.sleep(1 / 30) # ~30 FPS
print(f"[Test] ✓ Done! Sent 300 frames")
await asyncio.sleep(2) # Пауза перед отключением
except asyncio.TimeoutError:
print(f"[Test] ❌ Timeout waiting for server response")
except ConnectionRefusedError:
print(f"[Test] ❌ Connection refused. Check HOST/PORT and firewall")
except Exception as e:
print(f"[Test] ❌ Error: {type(e).__name__}: {e}")
def create_minimal_jpeg(filename):
"""Создаёт корректный JPEG-файл (640x480 с синим фоном) для теста"""
# Создаём пустой образ 640x480 (синий)
image = np.zeros((480, 640, 3), dtype=np.uint8)
image[:, :] = (255, 0, 0) # BGR: синий цвет
# Добавляем текст
cv2.putText(image, "Test Frame", (260, 240),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
# Кодируем в JPEG
success, jpeg_data = cv2.imencode('.jpg', image)
if not success:
print(f"[Test] ❌ Failed to create JPEG")
return False
# Сохраняем в файл
with open(filename, "wb") as f:
f.write(jpeg_data.tobytes())
print(f"[Test] ✓ Created test JPEG: {filename} ({len(jpeg_data.tobytes())} bytes)")
return True
if __name__ == "__main__":
print("=" * 60)
print("WebSocket Frame Sender Test")
print("=" * 60)
print(f"Target: ws://{HOST}:{PORT}/ws/client/{ROOM_ID}/{PASSWORD}")
print()
try:
asyncio.run(send_frames())
except KeyboardInterrupt:
print("\n[Test] Interrupted by user")