""" Sessions module для PyGuardian Управление SSH сессиями и процессами пользователей """ import asyncio import logging import re import os from datetime import datetime from typing import Dict, List, Optional import psutil logger = logging.getLogger(__name__) class SessionManager: """Менеджер SSH сессий и пользовательских процессов""" def __init__(self): pass async def get_active_sessions(self) -> List[Dict]: """Получение всех активных SSH сессий""" try: sessions = [] # Метод 1: через who who_sessions = await self._get_sessions_via_who() sessions.extend(who_sessions) # Метод 2: через ps (для SSH процессов) ssh_sessions = await self._get_sessions_via_ps() sessions.extend(ssh_sessions) # Убираем дубликаты и объединяем информацию unique_sessions = self._merge_session_info(sessions) return unique_sessions except Exception as e: logger.error(f"Ошибка получения активных сессий: {e}") return [] async def _get_sessions_via_who(self) -> List[Dict]: """Получение сессий через команду who""" try: sessions = [] process = await asyncio.create_subprocess_exec( 'who', '-u', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode == 0: lines = stdout.decode().strip().split('\n') for line in lines: if line.strip(): # Парсим вывод who # Формат: user tty date time (idle) pid (comment) match = re.match( r'(\w+)\s+(\w+)\s+(\d{4}-\d{2}-\d{2})\s+(\d{2}:\d{2})\s+.*?\((\d+)\)', line ) if match: username, tty, date, time, pid = match.groups() sessions.append({ 'username': username, 'tty': tty, 'login_date': date, 'login_time': time, 'pid': int(pid), 'type': 'who', 'status': 'active' }) else: # Альтернативный парсинг для разных форматов who parts = line.split() if len(parts) >= 2: username = parts[0] tty = parts[1] # Ищем PID в скобках pid_match = re.search(r'\((\d+)\)', line) pid = int(pid_match.group(1)) if pid_match else None sessions.append({ 'username': username, 'tty': tty, 'pid': pid, 'type': 'who', 'status': 'active', 'raw_line': line }) return sessions except Exception as e: logger.error(f"Ошибка получения сессий через who: {e}") return [] async def _get_sessions_via_ps(self) -> List[Dict]: """Получение SSH сессий через ps""" try: sessions = [] # Ищем SSH процессы process = await asyncio.create_subprocess_exec( 'ps', 'aux', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode == 0: lines = stdout.decode().strip().split('\n') for line in lines[1:]: # Пропускаем заголовок if 'sshd:' in line and '@pts' in line: # Парсим SSH сессии parts = line.split() if len(parts) >= 11: username = parts[0] pid = int(parts[1]) # Извлекаем информацию из команды cmd_parts = ' '.join(parts[10:]) # Ищем пользователя и tty в команде sshd match = re.search(r'sshd:\s+(\w+)@(\w+)', cmd_parts) if match: ssh_user, tty = match.groups() sessions.append({ 'username': ssh_user, 'tty': tty, 'pid': pid, 'ppid': int(parts[2]), 'cpu': parts[2], 'mem': parts[3], 'start_time': parts[8], 'type': 'sshd', 'status': 'active', 'command': cmd_parts }) return sessions except Exception as e: logger.error(f"Ошибка получения SSH сессий через ps: {e}") return [] def _merge_session_info(self, sessions: List[Dict]) -> List[Dict]: """Объединение информации о сессиях и удаление дубликатов""" try: merged = {} for session in sessions: key = f"{session['username']}:{session.get('tty', 'unknown')}" if key in merged: # Обновляем существующую запись дополнительной информацией merged[key].update({k: v for k, v in session.items() if v is not None}) else: merged[key] = session.copy() # Добавляем дополнительную информацию о процессах for session in merged.values(): if session.get('pid'): try: # Получаем дополнительную информацию о процессе через psutil if psutil.pid_exists(session['pid']): proc = psutil.Process(session['pid']) session.update({ 'create_time': datetime.fromtimestamp(proc.create_time()).isoformat(), 'cpu_percent': proc.cpu_percent(), 'memory_info': proc.memory_info()._asdict(), 'connections': len(proc.connections()) }) except Exception: pass # Игнорируем ошибки получения доп. информации return list(merged.values()) except Exception as e: logger.error(f"Ошибка объединения информации о сессиях: {e}") return sessions async def get_user_sessions(self, username: str) -> List[Dict]: """Получение сессий конкретного пользователя""" try: all_sessions = await self.get_active_sessions() return [s for s in all_sessions if s['username'] == username] except Exception as e: logger.error(f"Ошибка получения сессий пользователя {username}: {e}") return [] async def terminate_session(self, pid: int) -> bool: """Завершение сессии по PID""" try: # Сначала пробуем TERM process = await asyncio.create_subprocess_exec( 'kill', '-TERM', str(pid), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await process.communicate() if process.returncode == 0: # Ждем немного и проверяем await asyncio.sleep(2) if not psutil.pid_exists(pid): logger.info(f"✅ Сессия PID {pid} завершена через TERM") return True else: # Если не помогло - используем KILL process = await asyncio.create_subprocess_exec( 'kill', '-KILL', str(pid), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await process.communicate() if process.returncode == 0: logger.info(f"🔪 Сессия PID {pid} принудительно завершена через KILL") return True logger.error(f"❌ Не удалось завершить сессию PID {pid}") return False except Exception as e: logger.error(f"Ошибка завершения сессии PID {pid}: {e}") return False async def terminate_user_sessions(self, username: str) -> int: """Завершение всех сессий пользователя""" try: user_sessions = await self.get_user_sessions(username) terminated = 0 for session in user_sessions: pid = session.get('pid') if pid: success = await self.terminate_session(pid) if success: terminated += 1 logger.info(f"Завершено {terminated} из {len(user_sessions)} сессий пользователя {username}") return terminated except Exception as e: logger.error(f"Ошибка завершения сессий пользователя {username}: {e}") return 0 async def get_session_details(self, pid: int) -> Optional[Dict]: """Получение детальной информации о сессии""" try: if not psutil.pid_exists(pid): return None proc = psutil.Process(pid) # Базовая информация о процессе details = { 'pid': pid, 'ppid': proc.ppid(), 'username': proc.username(), 'create_time': datetime.fromtimestamp(proc.create_time()).isoformat(), 'cpu_percent': proc.cpu_percent(), 'memory_info': proc.memory_info()._asdict(), 'status': proc.status(), 'cmdline': proc.cmdline(), 'cwd': proc.cwd(), 'exe': proc.exe() } # Сетевые соединения try: connections = [] for conn in proc.connections(): connections.append({ 'fd': conn.fd, 'family': str(conn.family), 'type': str(conn.type), 'local_address': f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else None, 'remote_address': f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else None, 'status': str(conn.status) }) details['connections'] = connections except Exception: details['connections'] = [] # Открытые файлы try: open_files = [] for file in proc.open_files()[:10]: # Ограничиваем 10 файлами open_files.append({ 'path': file.path, 'fd': file.fd, 'mode': file.mode }) details['open_files'] = open_files except Exception: details['open_files'] = [] # Переменные окружения (выборочно) try: env = proc.environ() safe_env = {} safe_keys = ['USER', 'HOME', 'SHELL', 'SSH_CLIENT', 'SSH_CONNECTION', 'TERM'] for key in safe_keys: if key in env: safe_env[key] = env[key] details['environment'] = safe_env except Exception: details['environment'] = {} return details except Exception as e: logger.error(f"Ошибка получения деталей сессии PID {pid}: {e}") return None async def monitor_session_activity(self, pid: int, duration: int = 60) -> List[Dict]: """Мониторинг активности сессии в течение времени""" try: if not psutil.pid_exists(pid): return [] activity_log = [] proc = psutil.Process(pid) start_time = datetime.now() end_time = start_time + timedelta(seconds=duration) while datetime.now() < end_time: try: # Снимок состояния процесса snapshot = { 'timestamp': datetime.now().isoformat(), 'cpu_percent': proc.cpu_percent(), 'memory_percent': proc.memory_percent(), 'num_threads': proc.num_threads(), 'num_fds': proc.num_fds(), 'status': proc.status() } # Проверяем новые соединения try: connections = len(proc.connections()) snapshot['connections_count'] = connections except Exception: snapshot['connections_count'] = 0 activity_log.append(snapshot) await asyncio.sleep(5) # Снимок каждые 5 секунд except psutil.NoSuchProcess: # Процесс завершился activity_log.append({ 'timestamp': datetime.now().isoformat(), 'event': 'process_terminated' }) break except Exception as e: activity_log.append({ 'timestamp': datetime.now().isoformat(), 'event': 'monitoring_error', 'error': str(e) }) return activity_log except Exception as e: logger.error(f"Ошибка мониторинга активности сессии PID {pid}: {e}") return [] async def get_session_statistics(self) -> Dict: """Получение общей статистики по сессиям""" try: sessions = await self.get_active_sessions() stats = { 'total_sessions': len(sessions), 'users': {}, 'tty_types': {}, 'session_ages': [], 'total_connections': 0 } for session in sessions: # Статистика по пользователям user = session['username'] if user not in stats['users']: stats['users'][user] = 0 stats['users'][user] += 1 # Статистика по типам TTY tty = session.get('tty', 'unknown') tty_type = 'console' if tty.startswith('tty') else 'ssh' if tty_type not in stats['tty_types']: stats['tty_types'][tty_type] = 0 stats['tty_types'][tty_type] += 1 # Возраст сессии if 'create_time' in session: try: create_time = datetime.fromisoformat(session['create_time']) age_seconds = (datetime.now() - create_time).total_seconds() stats['session_ages'].append(age_seconds) except Exception: pass # Количество соединений connections = session.get('connections', 0) if isinstance(connections, int): stats['total_connections'] += connections # Средний возраст сессий if stats['session_ages']: stats['average_session_age'] = sum(stats['session_ages']) / len(stats['session_ages']) else: stats['average_session_age'] = 0 return stats except Exception as e: logger.error(f"Ошибка получения статистики сессий: {e}") return {'error': str(e)} async def find_suspicious_sessions(self) -> List[Dict]: """Поиск подозрительных сессий""" try: sessions = await self.get_active_sessions() suspicious = [] for session in sessions: suspicion_score = 0 reasons = [] # Проверка 1: Много открытых соединений connections = session.get('connections', 0) if isinstance(connections, int) and connections > 10: suspicion_score += 2 reasons.append(f"Много соединений: {connections}") # Проверка 2: Высокое потребление CPU cpu = session.get('cpu_percent', 0) if isinstance(cpu, (int, float)) and cpu > 50: suspicion_score += 1 reasons.append(f"Высокая нагрузка CPU: {cpu}%") # Проверка 3: Долго активная сессия if 'create_time' in session: try: create_time = datetime.fromisoformat(session['create_time']) age_hours = (datetime.now() - create_time).total_seconds() / 3600 if age_hours > 24: # Больше суток suspicion_score += 1 reasons.append(f"Долгая сессия: {age_hours:.1f} часов") except Exception: pass # Проверка 4: Подозрительные команды в cmdline cmdline = session.get('cmdline', []) if isinstance(cmdline, list): suspicious_commands = ['nc', 'netcat', 'wget', 'curl', 'python', 'perl', 'bash'] for cmd in cmdline: if any(susp in cmd.lower() for susp in suspicious_commands): suspicion_score += 1 reasons.append(f"Подозрительная команда: {cmd}") break # Если набрали достаточно очков подозрительности if suspicion_score >= 2: session['suspicion_score'] = suspicion_score session['suspicion_reasons'] = reasons suspicious.append(session) return suspicious except Exception as e: logger.error(f"Ошибка поиска подозрительных сессий: {e}") return []