#!/usr/bin/env python3 """ Скрипт для отправки тестового JPEG-кадра на сервер. Помогает проверить, работает ли VideoProcessor без мобильного приложения. """ import asyncio import websockets import json import sys import cv2 import numpy as np from pathlib import Path # Параметры подключения HOST = "192.168.0.112" # Измените на ваш PUBLIC_HOST из .env PORT = 8000 ROOM_ID = "AtR2yGhr8QM" # Измените на ID комнаты PASSWORD = "1" # Измените на пароль комнаты FRAME_FILE = "test_frame.jpg" # Путь к тестовому JPEG-файлу async def send_frames(): """Подключиться к WebSocket и отправить кадры""" uri = f"ws://{HOST}:{PORT}/ws/client/{ROOM_ID}/{PASSWORD}" print(f"[Test] Connecting to: {uri}") try: async with websockets.connect(uri) as ws: # Получаем ответ сервера response = await asyncio.wait_for(ws.recv(), timeout=5) print(f"[Test] ✓ Server response: {response}") # Читаем тестовый кадр if not Path(FRAME_FILE).exists(): print(f"[Test] File not found: {FRAME_FILE}") print(f"[Test] Creating a proper JPEG test file with OpenCV...") if not create_minimal_jpeg(FRAME_FILE): print(f"[Test] ❌ Failed to create test JPEG") return with open(FRAME_FILE, "rb") as f: frame_data = f.read() print(f"[Test] Frame size: {len(frame_data)} bytes") # Отправляем кадры в цикле (30 FPS, 10 секунд) print(f"[Test] Sending frames (30 FPS for 10 seconds)...") for i in range(300): # 10 sec * 30 FPS await ws.send(frame_data) if (i + 1) % 30 == 0: print(f"[Test] ✓ Sent {i + 1} frames") await asyncio.sleep(1 / 30) # ~30 FPS print(f"[Test] ✓ Done! Sent 300 frames") await asyncio.sleep(2) # Пауза перед отключением except asyncio.TimeoutError: print(f"[Test] ❌ Timeout waiting for server response") except ConnectionRefusedError: print(f"[Test] ❌ Connection refused. Check HOST/PORT and firewall") except Exception as e: print(f"[Test] ❌ Error: {type(e).__name__}: {e}") def create_minimal_jpeg(filename): """Создаёт корректный JPEG-файл (640x480 с синим фоном) для теста""" # Создаём пустой образ 640x480 (синий) image = np.zeros((480, 640, 3), dtype=np.uint8) image[:, :] = (255, 0, 0) # BGR: синий цвет # Добавляем текст cv2.putText(image, "Test Frame", (260, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) # Кодируем в JPEG success, jpeg_data = cv2.imencode('.jpg', image) if not success: print(f"[Test] ❌ Failed to create JPEG") return False # Сохраняем в файл with open(filename, "wb") as f: f.write(jpeg_data.tobytes()) print(f"[Test] ✓ Created test JPEG: {filename} ({len(jpeg_data.tobytes())} bytes)") return True if __name__ == "__main__": print("=" * 60) print("WebSocket Frame Sender Test") print("=" * 60) print(f"Target: ws://{HOST}:{PORT}/ws/client/{ROOM_ID}/{PASSWORD}") print() try: asyncio.run(send_frames()) except KeyboardInterrupt: print("\n[Test] Interrupted by user")