Files
chat/tests/test_websocket_full.py
Andrey K. Choi 3050e084fa
All checks were successful
continuous-integration/drone/push Build is passing
main functions commit
2025-10-19 19:50:00 +09:00

152 lines
6.6 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Полный тест WebSocket функциональности Emergency Service
"""
import asyncio
import json
import sys
import httpx
import websockets
from datetime import datetime
class WebSocketTester:
def __init__(self):
self.base_url = "http://localhost:8001" # User Service для аутентификации
self.ws_url = "ws://localhost:8002" # Emergency Service для WebSocket
self.token = None
async def login_and_get_token(self):
"""Логин и получение токена"""
login_data = {
"email": "shadow85@list.ru",
"password": "R0sebud1"
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/api/v1/auth/login",
json=login_data
)
if response.status_code == 200:
data = response.json()
self.token = data.get("access_token")
print(f"✅ Успешная аутентификация! Токен получен: {self.token[:20]}...")
return True
else:
print(f"❌ Ошибка аутентификации: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f"❌ Ошибка подключения к User Service: {e}")
return False
async def test_websocket_connection(self):
"""Тест WebSocket подключения"""
if not self.token:
print("❌ Токен не получен, тест невозможен")
return False
ws_uri = f"{self.ws_url}/api/v1/emergency/ws/current_user_id?token={self.token}"
print(f"🔌 Подключение к WebSocket: {ws_uri}")
try:
async with websockets.connect(ws_uri) as websocket:
print("✅ WebSocket подключение установлено!")
# Отправляем ping
ping_message = {"type": "ping", "timestamp": datetime.now().isoformat()}
await websocket.send(json.dumps(ping_message))
print(f"📤 Отправлен ping: {ping_message}")
# Ждем сообщения в течение 10 секунд
try:
while True:
message = await asyncio.wait_for(websocket.recv(), timeout=2.0)
data = json.loads(message)
print(f"📥 Получено сообщение: {data}")
# Если получили pong, отправим еще один ping
if data.get("type") == "pong":
await asyncio.sleep(1)
another_ping = {"type": "ping", "message": "Второй ping"}
await websocket.send(json.dumps(another_ping))
print(f"📤 Отправлен второй ping: {another_ping}")
except asyncio.TimeoutError:
print("⏱️ Таймаут - больше сообщений нет")
print("✅ WebSocket тест завершен успешно!")
return True
except websockets.exceptions.ConnectionClosedError as e:
print(f"❌ WebSocket соединение закрыто: код {e.code}, причина: {e}")
return False
except Exception as e:
print(f"❌ Ошибка WebSocket: {type(e).__name__}: {e}")
return False
async def test_emergency_alert_creation(self):
"""Тест создания экстренного оповещения через REST API"""
if not self.token:
print("❌ Токен не получен, тест создания алерта невозможен")
return False
alert_data = {
"latitude": 55.7558,
"longitude": 37.6176,
"alert_type": "general",
"message": "Тестовое оповещение от WebSocket теста",
"address": "Тестовый адрес"
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:8002/api/v1/alert",
json=alert_data,
headers={"Authorization": f"Bearer {self.token}"}
)
if response.status_code == 200:
alert = response.json()
print(f"✅ Экстренное оповещение создано! ID: {alert.get('id')}")
return True
else:
print(f"❌ Ошибка создания оповещения: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f"❌ Ошибка при создании оповещения: {e}")
return False
async def run_full_test(self):
"""Запуск полного теста"""
print("🚀 Запуск полного теста WebSocket функциональности")
print("=" * 60)
# 1. Аутентификация
print("1⃣ Тестирование аутентификации...")
if not await self.login_and_get_token():
return False
# 2. WebSocket подключение
print("\n2⃣ Тестирование WebSocket подключения...")
if not await self.test_websocket_connection():
return False
# 3. Создание экстренного оповещения
print("\n3⃣ Тестирование создания экстренного оповещения...")
if not await self.test_emergency_alert_creation():
return False
print("\n🎉 Все тесты прошли успешно!")
print("WebSocket функциональность Emergency Service работает корректно!")
return True
async def main():
tester = WebSocketTester()
success = await tester.run_full_test()
sys.exit(0 if success else 1)
if __name__ == "__main__":
asyncio.run(main())