Files
TG_autoposter/app/handlers/group_parser.py
2025-12-18 05:55:32 +09:00

314 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import json
import re
from typing import List, Dict, Optional
from datetime import datetime
from telegram.ext import ContextTypes
from app.handlers.telethon_client import telethon_manager
from app.database.member_repository import GroupKeywordRepository, GroupStatisticsRepository
from app.database.repository import GroupRepository, MessageGroupRepository
logger = logging.getLogger(__name__)
class GroupParser:
"""Парсер для поиска и анализа групп по ключевым словам"""
def __init__(self, db_session, bot=None):
self.db_session = db_session
self.bot = bot
self.keyword_repo = GroupKeywordRepository(db_session)
self.stats_repo = GroupStatisticsRepository(db_session)
self.group_repo = GroupRepository(db_session)
async def parse_group_by_keywords(self, keywords: List[str], chat_id: int) -> Dict:
"""
Проанализировать группу и проверить совпадение с ключевыми словами
Args:
keywords: Список ключевых слов для поиска
chat_id: ID группы в Telegram
Returns:
dict: Результаты анализа группы
"""
if not telethon_manager.is_connected():
logger.warning("Telethon клиент не подключен, не могу получить информацию о группе")
return {'matched': False, 'keywords_found': []}
try:
chat_info = await telethon_manager.get_chat_info(chat_id)
if not chat_info:
return {'matched': False, 'keywords_found': []}
# Объединить название и описание для поиска
search_text = f"{chat_info.get('title', '')} {chat_info.get('description', '')}".lower()
# Найти совпадения ключевых слов
matched_keywords = []
for keyword in keywords:
if keyword.lower() in search_text:
matched_keywords.append(keyword)
result = {
'matched': len(matched_keywords) > 0,
'keywords_found': matched_keywords,
'chat_info': chat_info,
'match_count': len(matched_keywords),
'total_keywords': len(keywords),
'match_percentage': (len(matched_keywords) / len(keywords) * 100) if keywords else 0
}
logger.info(f"✅ Анализ группы {chat_id}: найдено {len(matched_keywords)} совпадений из {len(keywords)}")
return result
except Exception as e:
logger.error(f"❌ Ошибка при анализе группы {chat_id}: {e}")
return {'matched': False, 'keywords_found': []}
async def extract_keywords_from_text(self, text: str) -> List[str]:
"""
Извлечь ключевые слова из текста
Args:
text: Текст для извлечения ключевых слов
Returns:
List[str]: Список ключевых слов
"""
# Удалить спецсимволы и разбить на слова
words = re.findall(r'\b\w+\b', text.lower())
# Отфильтровать стоп-слова
stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
'the', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
'я', 'ты', 'он', 'она', 'оно', 'мы', 'вы', 'они',
'и', 'или', 'но', 'в', 'на', 'к', 'по', 'с', 'о', 'об',
'что', 'как', 'где', 'когда', 'зачем', 'откуда', 'куда'}
keywords = [w for w in words if len(w) > 3 and w not in stop_words]
# Убрать дубликаты
return list(set(keywords))
async def parse_group_members(self, chat_id: int, member_repo,
limit: int = 100) -> Dict:
"""
Получить и сохранить список участников группы
Args:
chat_id: ID группы
member_repo: Репозиторий участников
limit: Максимум участников для загрузки
Returns:
dict: Статистика загруженных участников
"""
if not telethon_manager.is_connected():
logger.warning("Telethon клиент не подключен, не могу получить участников")
return {'success': False, 'members_added': 0}
try:
# Получить группу из БД
db_group = await self.group_repo.get_by_chat_id(str(chat_id))
if not db_group:
logger.warning(f"Группа {chat_id} не найдена в БД")
return {'success': False, 'members_added': 0}
# Получить участников
members = await telethon_manager.get_chat_members(chat_id, limit)
if not members:
return {'success': True, 'members_added': 0}
# Сохранить в БД
members_data = members # Уже в нужном формате из telethon_manager
# Очистить старых участников и добавить новых
await member_repo.clear_members(db_group.id)
added = await member_repo.bulk_add_members(db_group.id, members_data)
# Обновить статистику
admins = len([m for m in members_data if m.get('is_admin')])
bots = len([m for m in members_data if m.get('is_bot')])
await self.stats_repo.update_members_count(
db_group.id,
total=len(members_data),
admins=admins,
bots=bots
)
result = {
'success': True,
'members_added': len(added),
'admins_count': admins,
'bots_count': bots,
'users_count': len(members_data) - bots
}
logger.info(f"✅ Загружены участники группы {chat_id}: {result}")
return result
except Exception as e:
logger.error(f"❌ Ошибка при загрузке участников группы {chat_id}: {e}")
return {'success': False, 'members_added': 0}
async def search_groups_by_keywords(self, keywords: List[str],
group_ids: List[int] = None) -> Dict:
"""
Искать группы по ключевым словам из списка
Args:
keywords: Список ключевых слов для поиска
group_ids: Список ID групп для проверки (если None - проверить все)
Returns:
dict: Результаты поиска
"""
if not group_ids:
# Получить все активные группы
all_groups = await self.group_repo.get_active_groups()
group_ids = [g.id for g in all_groups]
results = {
'total_checked': len(group_ids),
'matched_groups': [],
'no_match': [],
'errors': []
}
for group_id in group_ids:
try:
# Получить группу
db_group = await self.group_repo.get_by_id(group_id)
if not db_group:
results['errors'].append({'group_id': group_id, 'error': 'Not found in DB'})
continue
# Анализировать
match_result = await self.parse_group_by_keywords(keywords, int(db_group.chat_id))
if match_result['matched']:
results['matched_groups'].append({
'group_id': group_id,
'chat_id': db_group.chat_id,
'title': db_group.title,
'keywords_found': match_result['keywords_found'],
'match_percentage': match_result['match_percentage']
})
else:
results['no_match'].append({
'group_id': group_id,
'chat_id': db_group.chat_id,
'title': db_group.title
})
except Exception as e:
logger.error(f"Ошибка при проверке группы {group_id}: {e}")
results['errors'].append({'group_id': group_id, 'error': str(e)})
logger.info(f"Поиск по ключевым словам завершен: найдено {len(results['matched_groups'])} групп")
return results
async def set_group_keywords(self, group_id: int, keywords: List[str],
description: str = None) -> bool:
"""
Установить ключевые слова для группы
Args:
group_id: ID группы в БД
keywords: Список ключевых слов
description: Описание для поиска
Returns:
bool: Успешность операции
"""
try:
# Сериализовать список в JSON
keywords_json = json.dumps(keywords)
# Проверить наличие записи
existing = await self.keyword_repo.get_keywords(group_id)
if existing:
await self.keyword_repo.update_keywords(group_id, keywords_json, description)
else:
await self.keyword_repo.add_keywords(group_id, keywords_json, description)
logger.info(f"Ключевые слова установлены для группы {group_id}: {keywords}")
return True
except Exception as e:
logger.error(f"Ошибка при установке ключевых слов: {e}")
return False
async def get_group_keywords(self, group_id: int) -> Optional[List[str]]:
"""
Получить ключевые слова для группы
Args:
group_id: ID группы в БД
Returns:
List[str]: Список ключевых слов или None
"""
try:
keyword_obj = await self.keyword_repo.get_keywords(group_id)
if not keyword_obj:
return None
return json.loads(keyword_obj.keywords)
except Exception as e:
logger.error(f"Ошибка при получении ключевых слов: {e}")
return None
async def format_group_info(self, group_id: int) -> str:
"""
Форматировать информацию о группе для вывода
Args:
group_id: ID группы в БД
Returns:
str: Отформатированная информация
"""
try:
group = await self.group_repo.get_by_id(group_id)
if not group:
return "Группа не найдена"
stats = await self.stats_repo.get_statistics(group_id)
keywords = await self.get_group_keywords(group_id)
info = f"<b>Группа:</b> {group.title}\n"
info += f"<b>Chat ID:</b> <code>{group.chat_id}</code>\n"
info += f"<b>Активна:</b> {'✅ Да' if group.is_active else '❌ Нет'}\n"
if stats:
info += f"\n<b>Статистика:</b>\n"
info += f" Участников: {stats.total_members}\n"
info += f" Администраторов: {stats.total_admins}\n"
info += f" Ботов: {stats.total_bots}\n"
info += f" Отправлено: {stats.messages_sent}\n"
info += f" Через клиент: {stats.messages_via_client}\n"
info += f" Может отправлять как бот: {'' if stats.can_send_as_bot else ''}\n"
info += f" Может отправлять как клиент: {'' if stats.can_send_as_client else ''}\n"
if keywords:
info += f"\n<b>Ключевые слова:</b>\n"
for kw in keywords:
info += f"{kw}\n"
return info
except Exception as e:
logger.error(f"Ошибка при форматировании информации: {e}")
return "Ошибка при получении информации"