Files
chat/tests/test_final_security.py
Andrey K. Choi 3050e084fa
All checks were successful
continuous-integration/drone/push Build is passing
main functions commit
2025-10-19 19:50:00 +09:00

322 lines
15 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
ФИНАЛЬНЫЙ ТЕСТ БЕЗОПАСНОСТИ И ОСНОВНОЙ ФУНКЦИОНАЛЬНОСТИ
- Полная проверка системы аутентификации
- Проверка WebSocket подключений
- Тестирование доступных Emergency API endpoints
- Создание записей там, где это возможно
"""
import asyncio
import json
import httpx
import websockets
from typing import Optional
class FinalSecurityTest:
def __init__(self):
self.base_url = "http://localhost:8000" # API Gateway
self.emergency_url = "http://localhost:8002" # Emergency Service
self.ws_url = "ws://localhost:8002" # Emergency Service
async def test_temp_token_rejection(self) -> bool:
"""Тестируем блокировку временных токенов"""
print("🔒 ТЕСТ БЕЗОПАСНОСТИ: Блокировка временных токенов")
print("="*60)
temp_tokens = [
"temp_token_for_shadow85@list.ru",
"test_token_123",
"temp_token_12345",
"test_token_admin"
]
all_rejected = True
for token in temp_tokens:
try:
ws_url = f"{self.ws_url}/api/v1/emergency/ws/current_user_id?token={token}"
print(f"🔍 Тестируем токен: {token[:30]}...")
async with websockets.connect(ws_url) as websocket:
print(f"❌ ПРОБЛЕМА БЕЗОПАСНОСТИ: Токен {token[:30]}... был принят!")
all_rejected = False
except websockets.exceptions.ConnectionClosed as e:
if e.code in [1008, 403]:
print(f"✅ Токен {token[:30]}... корректно отклонен (код: {e.code})")
else:
print(f"⚠️ Токен {token[:30]}... отклонен с неожиданным кодом: {e.code}")
except Exception as e:
print(f"✅ Токен {token[:30]}... корректно отклонен: {type(e).__name__}")
return all_rejected
async def test_jwt_authentication(self) -> bool:
"""Тестируем JWT аутентификацию полностью"""
print("\n🔐 ПОЛНЫЙ ТЕСТ JWT АУТЕНТИФИКАЦИИ")
print("="*60)
try:
async with httpx.AsyncClient() as client:
# 1. Тест авторизации
login_data = {
"email": "shadow85@list.ru",
"password": "R0sebud1985"
}
print("1⃣ Тестируем авторизацию...")
response = await client.post(
f"{self.base_url}/api/v1/auth/login",
json=login_data,
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
print(f"❌ Ошибка авторизации: {response.status_code}")
return False
auth_data = response.json()
jwt_token = auth_data.get("access_token")
user_data = auth_data.get("user", {})
print(f"✅ Авторизация успешна:")
print(f" 👤 Пользователь: {user_data.get('email')}")
print(f" 🎫 JWT токен: {jwt_token[:50]}...")
# 2. Тест WebSocket с JWT
print("\n2⃣ Тестируем WebSocket с JWT токеном...")
ws_url = f"{self.ws_url}/api/v1/emergency/ws/current_user_id?token={jwt_token}"
async with websockets.connect(ws_url) as websocket:
print("✅ WebSocket подключение установлено!")
# Отправляем тестовое сообщение
await websocket.send(json.dumps({
"type": "test_message",
"data": "Тест JWT аутентификации"
}))
try:
response = await asyncio.wait_for(websocket.recv(), timeout=3.0)
response_data = json.loads(response)
print(f"✅ Ответ сервера: {response_data.get('type')} для пользователя {response_data.get('user_id')}")
except asyncio.TimeoutError:
print("⏰ Таймаут, но подключение работает")
# 3. Тест API endpoints с JWT
print("\n3⃣ Тестируем API endpoints с JWT токеном...")
headers = {
"Authorization": f"Bearer {jwt_token}",
"Content-Type": "application/json"
}
# Health check
response = await client.get(f"{self.emergency_url}/health", headers=headers)
if response.status_code == 200:
health_data = response.json()
print(f"✅ Health check: {health_data.get('service')} - {health_data.get('status')}")
else:
print(f"❌ Health check failed: {response.status_code}")
return True
except Exception as e:
print(f"❌ Ошибка JWT тестирования: {e}")
return False
async def test_basic_functionality(self) -> bool:
"""Тестируем базовую функциональность, которая точно должна работать"""
print("\n⚙️ ТЕСТ БАЗОВОЙ ФУНКЦИОНАЛЬНОСТИ")
print("="*60)
try:
async with httpx.AsyncClient() as client:
# Авторизация
login_data = {
"email": "shadow85@list.ru",
"password": "R0sebud1985"
}
response = await client.post(
f"{self.base_url}/api/v1/auth/login",
json=login_data,
headers={"Content-Type": "application/json"}
)
auth_data = response.json()
jwt_token = auth_data.get("access_token")
headers = {
"Authorization": f"Bearer {jwt_token}",
"Content-Type": "application/json"
}
working_endpoints = 0
total_endpoints = 0
# Список endpoint'ов для тестирования
test_endpoints = [
("GET", "/health", "Health Check"),
("GET", "/api/v1/alerts/my", "Мои вызовы"),
("GET", "/api/v1/alerts/active", "Активные вызовы"),
("GET", "/api/v1/reports", "Отчеты"),
("GET", "/api/v1/safety-checks", "Проверки безопасности"),
]
for method, endpoint, description in test_endpoints:
total_endpoints += 1
print(f"🔍 Тестируем: {description} ({method} {endpoint})")
try:
if method == "GET":
response = await client.get(f"{self.emergency_url}{endpoint}", headers=headers)
elif method == "POST":
response = await client.post(f"{self.emergency_url}{endpoint}", headers=headers, json={})
if response.status_code in [200, 201]:
print(f" ✅ Работает: {response.status_code}")
working_endpoints += 1
elif response.status_code == 422:
print(f" ⚠️ Требуются параметры: {response.status_code}")
working_endpoints += 1 # Endpoint существует, просто нужны параметры
else:
print(f" ❌ Ошибка: {response.status_code}")
except Exception as e:
print(f" ❌ Исключение: {e}")
print(f"\n📊 Результат тестирования функциональности:")
print(f"✅ Работает: {working_endpoints}/{total_endpoints} endpoints")
return working_endpoints > 0 # Хотя бы один endpoint должен работать
except Exception as e:
print(f"❌ Ошибка тестирования функциональности: {e}")
return False
async def test_websocket_security(self) -> bool:
"""Дополнительные тесты безопасности WebSocket"""
print("\n🔐 РАСШИРЕННЫЙ ТЕСТ БЕЗОПАСНОСТИ WEBSOCKET")
print("="*60)
try:
# Получаем валидный токен
async with httpx.AsyncClient() as client:
login_data = {
"email": "shadow85@list.ru",
"password": "R0sebud1985"
}
response = await client.post(
f"{self.base_url}/api/v1/auth/login",
json=login_data,
headers={"Content-Type": "application/json"}
)
auth_data = response.json()
jwt_token = auth_data.get("access_token")
security_tests = [
("❌ Без токена", None),
("❌ Пустой токен", ""),
("❌ Неверный токен", "invalid_token_12345"),
("❌ Старый формат", "Bearer_old_format_token"),
("✅ Валидный JWT", jwt_token)
]
passed_security_tests = 0
for test_name, token in security_tests:
print(f"🔍 {test_name}...")
try:
if token:
ws_url = f"{self.ws_url}/api/v1/emergency/ws/current_user_id?token={token}"
else:
ws_url = f"{self.ws_url}/api/v1/emergency/ws/current_user_id"
async with websockets.connect(ws_url) as websocket:
if "" in test_name:
print(f" ✅ Подключение успешно (ожидаемо)")
passed_security_tests += 1
else:
print(f" ❌ ПРОБЛЕМА: Подключение прошло, а не должно было!")
await websocket.close()
except websockets.exceptions.ConnectionClosed as e:
if "" in test_name:
print(f" ✅ Корректно отклонено (код: {e.code})")
passed_security_tests += 1
else:
print(f" ❌ Неожиданное отклонение (код: {e.code})")
except Exception as e:
if "" in test_name:
print(f" ✅ Корректно отклонено ({type(e).__name__})")
passed_security_tests += 1
else:
print(f" ❌ Неожиданная ошибка: {e}")
print(f"\n📊 Результат тестов безопасности WebSocket:")
print(f"✅ Пройдено: {passed_security_tests}/{len(security_tests)} тестов")
return passed_security_tests == len(security_tests)
except Exception as e:
print(f"❌ Ошибка тестирования безопасности WebSocket: {e}")
return False
async def run_full_test(self):
"""Запуск полного комплексного теста"""
print("🛡️ ЗАПУСК ПОЛНОГО КОМПЛЕКСНОГО ТЕСТА СИСТЕМЫ")
print("="*80)
# Все тесты
tests = [
("🔒 Безопасность временных токенов", self.test_temp_token_rejection),
("🔐 JWT аутентификация", self.test_jwt_authentication),
("⚙️ Базовая функциональность", self.test_basic_functionality),
("🛡️ Безопасность WebSocket", self.test_websocket_security),
]
results = {}
for test_name, test_func in tests:
print(f"\n{'='*80}")
result = await test_func()
results[test_name] = result
# Финальный отчет
print("\n" + "="*80)
print("📊 ФИНАЛЬНЫЙ ОТЧЕТ ТЕСТИРОВАНИЯ")
print("="*80)
passed = sum(results.values())
total = len(results)
for test_name, result in results.items():
status = "✅ ПРОЙДЕН" if result else "❌ ПРОВАЛЕН"
print(f"{status} {test_name}")
print(f"\n🎯 ОБЩИЙ РЕЗУЛЬТАТ: {passed}/{total} тестов пройдено")
if passed == total:
print("🚀 СИСТЕМА ГОТОВА К ПРОДАКШЕНУ!")
print("Все аспекты безопасности и функциональности работают корректно")
elif passed >= total * 0.75: # 75% тестов
print("⚠️ СИСТЕМА ПОЧТИ ГОТОВА")
print("🔧 Требуются незначительные доработки")
else:
print("❌ СИСТЕМА НЕ ГОТОВА К ПРОДАКШЕНУ")
print("🛠️ Требуются серьезные исправления")
return passed == total
async def main():
"""Главная функция"""
tester = FinalSecurityTest()
await tester.run_full_test()
if __name__ == "__main__":
asyncio.run(main())