Files
chat/tests/test_websocket_monitoring.py
Andrey K. Choi 3050e084fa
All checks were successful
continuous-integration/drone/push Build is passing
main functions commit
2025-10-19 19:50:00 +09:00

267 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Тест мониторинга WebSocket подключений в Emergency Service
"""
import asyncio
import json
import sys
import time
from datetime import datetime
from typing import List
import aiohttp
import websockets
import requests
# Конфигурация
BASE_URL = "http://192.168.219.108"
GATEWAY_PORT = "8000"
EMERGENCY_PORT = "8002"
# Тестовые данные пользователей
TEST_USERS = [
{"email": "shadow85@list.ru", "password": "R0sebud1985"},
{"email": "user2@example.com", "password": "password123"},
{"email": "user3@example.com", "password": "password123"},
]
class WebSocketMonitoringTest:
def __init__(self):
self.gateway_url = f"{BASE_URL}:{GATEWAY_PORT}"
self.emergency_url = f"{BASE_URL}:{EMERGENCY_PORT}"
self.tokens = {}
self.websockets = {}
def get_jwt_token(self, email: str, password: str) -> str:
"""Получить JWT токен через аутентификацию"""
try:
response = requests.post(
f"{self.gateway_url}/api/v1/auth/login",
json={"email": email, "password": password}
)
if response.status_code == 200:
return response.json()["access_token"]
else:
print(f"❌ Login failed for {email}: {response.status_code}")
return None
except Exception as e:
print(f"❌ Login error for {email}: {e}")
return None
async def connect_websocket(self, email: str, token: str) -> bool:
"""Подключить WebSocket для пользователя"""
try:
ws_url = f"ws://{BASE_URL.replace('http://', '')}:{EMERGENCY_PORT}/api/v1/emergency/ws/current_user_id?token={token}"
async with websockets.connect(ws_url) as websocket:
# Ждем приветственное сообщение
welcome_message = await websocket.recv()
print(f"✅ WebSocket connected for {email}")
print(f" Welcome message: {welcome_message}")
self.websockets[email] = websocket
# Держим соединение открытым и слушаем сообщения
try:
await asyncio.sleep(2) # Держим соединение 2 секунды
return True
except websockets.exceptions.ConnectionClosed:
print(f"⚠️ WebSocket connection closed for {email}")
return False
except Exception as e:
print(f"❌ WebSocket connection failed for {email}: {e}")
return False
def get_websocket_connections(self, token: str) -> dict:
"""Получить информацию о WebSocket подключениях"""
try:
response = requests.get(
f"{self.emergency_url}/api/v1/websocket/connections",
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 200:
return response.json()
else:
print(f"❌ Failed to get connections: {response.status_code}")
print(f" Response: {response.text}")
return {}
except Exception as e:
print(f"❌ Error getting connections: {e}")
return {}
def get_websocket_stats(self, token: str) -> dict:
"""Получить статистику WebSocket подключений"""
try:
response = requests.get(
f"{self.emergency_url}/api/v1/websocket/stats",
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 200:
return response.json()
else:
print(f"❌ Failed to get stats: {response.status_code}")
return {}
except Exception as e:
print(f"❌ Error getting stats: {e}")
return {}
def ping_connections(self, token: str) -> dict:
"""Пинг всех WebSocket подключений"""
try:
response = requests.post(
f"{self.emergency_url}/api/v1/websocket/ping",
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 200:
return response.json()
else:
print(f"❌ Failed to ping: {response.status_code}")
return {}
except Exception as e:
print(f"❌ Error pinging: {e}")
return {}
def broadcast_test_message(self, token: str, message: str) -> dict:
"""Отправить тестовое сообщение всем подключенным"""
try:
response = requests.post(
f"{self.emergency_url}/api/v1/websocket/broadcast?message={message}",
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 200:
return response.json()
else:
print(f"❌ Failed to broadcast: {response.status_code}")
return {}
except Exception as e:
print(f"❌ Error broadcasting: {e}")
return {}
async def test_multiple_connections(self):
"""Тест множественных WebSocket подключений"""
print("\n🔥 Testing WebSocket Monitoring System")
print("=" * 50)
# 1. Получаем токены для всех пользователей
print("\n📋 Step 1: Getting JWT tokens...")
for user in TEST_USERS:
token = self.get_jwt_token(user["email"], user["password"])
if token:
self.tokens[user["email"]] = token
print(f"✅ Got token for {user['email']}")
else:
print(f"❌ Failed to get token for {user['email']}")
if not self.tokens:
print("❌ No tokens obtained, stopping test")
return
# Берем первый токен для мониторинга
main_token = list(self.tokens.values())[0]
# 2. Проверяем начальное состояние
print("\n📊 Step 2: Checking initial state...")
initial_stats = self.get_websocket_stats(main_token)
print(f"Initial connections: {initial_stats.get('total_connections', 0)}")
# 3. Подключаем несколько WebSocket соединений параллельно
print("\n🔌 Step 3: Connecting multiple WebSockets...")
# Создаем задачи для параллельного подключения
connection_tasks = []
for email, token in self.tokens.items():
if token: # Только если есть токен
task = asyncio.create_task(
self.connect_websocket(email, token)
)
connection_tasks.append((email, task))
# Ждем подключения всех
connection_results = []
for email, task in connection_tasks:
try:
result = await task
connection_results.append((email, result))
except Exception as e:
print(f"❌ Connection task failed for {email}: {e}")
connection_results.append((email, False))
# 4. Проверяем подключения после соединения
print("\n📊 Step 4: Checking connections after WebSocket setup...")
await asyncio.sleep(1) # Даем время серверу обновить статистику
connections_info = self.get_websocket_connections(main_token)
stats = self.get_websocket_stats(main_token)
print(f"Active connections: {stats.get('total_connections', 0)}")
print(f"Connected users: {stats.get('connected_users', [])}")
if connections_info.get('connection_details'):
print("\n🔍 Connection Details:")
for user_id, details in connections_info['connection_details'].items():
print(f" User {user_id}:")
print(f" - Connected at: {details.get('connected_at')}")
print(f" - Client: {details.get('client_host')}:{details.get('client_port')}")
print(f" - Messages: {details.get('message_count', 0)}")
print(f" - Duration: {details.get('duration_seconds')}s")
# 5. Пинг всех подключений
print("\n📡 Step 5: Pinging all connections...")
ping_result = self.ping_connections(main_token)
print(f"Ping result: {ping_result}")
# 6. Отправка тестового сообщения
print("\n📢 Step 6: Broadcasting test message...")
broadcast_result = self.broadcast_test_message(main_token, "Hello from monitoring test!")
print(f"Broadcast result: {broadcast_result}")
# 7. Финальная статистика
print("\n📊 Step 7: Final statistics...")
final_stats = self.get_websocket_stats(main_token)
final_connections = self.get_websocket_connections(main_token)
print(f"Final connections: {final_stats.get('total_connections', 0)}")
print(f"Total messages sent: {final_stats.get('total_messages_sent', 0)}")
# Резюме
print("\n" + "=" * 50)
print("🎯 TEST SUMMARY")
print("=" * 50)
successful_connections = sum(1 for _, success in connection_results if success)
total_attempts = len(connection_results)
print(f"✅ Successful connections: {successful_connections}/{total_attempts}")
print(f"📊 Active connections tracked: {final_stats.get('total_connections', 0)}")
print(f"📨 Total messages sent: {final_stats.get('total_messages_sent', 0)}")
print(f"👥 Connected users: {len(final_stats.get('connected_users', []))}")
if successful_connections > 0:
print("🎉 WebSocket Monitoring System - WORKING!")
else:
print("❌ WebSocket Monitoring System - ISSUES FOUND")
async def main():
"""Главная функция тестирования"""
tester = WebSocketMonitoringTest()
await tester.test_multiple_connections()
if __name__ == "__main__":
print("🚀 Starting WebSocket Monitoring Test...")
asyncio.run(main())