#!/usr/bin/env python3 """ PyGuardian Test Script Скрипт для тестирования компонентов системы """ import asyncio import sys import os from pathlib import Path # Добавляем src в путь для импортов sys.path.insert(0, str(Path(__file__).parent / "src")) from src.storage import Storage from src.monitor import LogParser, LogEvent from datetime import datetime async def test_storage(): """Тест модуля хранения данных""" print("🧪 Тестирование Storage...") # Создаем временную базу данных db_path = "/tmp/test_guardian.db" if os.path.exists(db_path): os.remove(db_path) storage = Storage(db_path) await storage.init_database() # Тест добавления попыток атак await storage.add_attack_attempt( ip="192.168.1.100", username="root", attack_type="failed_password", log_line="Failed password for root from 192.168.1.100", timestamp=datetime.now() ) # Тест получения количества попыток count = await storage.get_attack_count_for_ip("192.168.1.100", 60) print(f"✅ Попыток атак от 192.168.1.100: {count}") # Тест бана IP success = await storage.ban_ip("192.168.1.100", "Test ban", 3600) print(f"✅ Бан IP: {'успешно' if success else 'ошибка'}") # Тест проверки бана is_banned = await storage.is_ip_banned("192.168.1.100") print(f"✅ IP забанен: {is_banned}") # Тест получения топ атакующих top_attackers = await storage.get_top_attackers(limit=5) print(f"✅ Топ атакующих: {len(top_attackers)} записей") # Очистка os.remove(db_path) print("✅ Storage тест завершен успешно\n") def test_log_parser(): """Тест парсера логов""" print("🧪 Тестирование Log Parser...") # Создаем парсер patterns = [ "Failed password", "Invalid user", "authentication failure", "Too many authentication failures" ] parser = LogParser(patterns) # Тестовые строки логов test_lines = [ "Nov 25 14:30:15 server sshd[12345]: Failed password for root from 192.168.1.100 port 22 ssh2", "Nov 25 14:31:15 server sshd[12348]: Invalid user hacker from 192.168.1.101 port 22", "Nov 25 14:32:15 server sshd[12351]: Too many authentication failures for root from 192.168.1.102 port 22", "Nov 25 14:34:15 server sshd[12353]: Accepted password for admin from 192.168.1.10 port 22 ssh2", "Nov 25 14:35:15 server sshd[12355]: authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.168.1.104 user=root" ] parsed_events = 0 for line in test_lines: event = parser.parse_line(line) if event: parsed_events += 1 print(f" 📝 {event.event_type}: {event.ip_address} ({event.username})") print(f"✅ Распарсено {parsed_events} из {len(test_lines)} строк") print("✅ Log Parser тест завершен успешно\n") def test_whitelist(): """Тест функции белого списка""" print("🧪 Тестирование Whitelist...") storage = Storage(":memory:") # Временная база в памяти whitelist = [ "127.0.0.1", "192.168.1.0/24", "10.0.0.0/8" ] # Тестовые IP test_cases = [ ("127.0.0.1", True), # Localhost ("192.168.1.50", True), # В сети 192.168.1.0/24 ("10.5.10.20", True), # В сети 10.0.0.0/8 ("8.8.8.8", False), # Публичный IP ("192.168.2.10", False), # Другая подсеть ] for ip, expected in test_cases: # Используем синхронный цикл для теста loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) is_whitelisted = loop.run_until_complete( storage.is_whitelisted(ip, whitelist) ) loop.close() status = "✅" if is_whitelisted == expected else "❌" print(f" {status} {ip}: {'в белом списке' if is_whitelisted else 'не в белом списке'}") print("✅ Whitelist тест завершен успешно\n") async def run_all_tests(): """Запуск всех тестов""" print("🚀 Запуск тестов PyGuardian\n") try: await test_storage() test_log_parser() test_whitelist() print("🎉 Все тесты выполнены успешно!") except Exception as e: print(f"❌ Ошибка в тестах: {e}") return False return True if __name__ == "__main__": success = asyncio.run(run_all_tests()) sys.exit(0 if success else 1)