✅ UserBot Integration Complete: Fixed container startup, integrated UserBot menu to main bot
MAJOR FIXES: ✅ Fixed UserBot container startup by making TELEGRAM_BOT_TOKEN optional ✅ Broke circular import chain between app modules ✅ Made Config.validate() conditional for UserBot-only mode ✅ Removed unused celery import from userbot_service.py INTEGRATION: ✅ UserBot menu now accessible from main bot /start command ✅ Added 🤖 UserBot button to main keyboard ✅ Integrated userbot_manager.py handlers: - userbot_menu: Main UserBot interface - userbot_settings: Configuration - userbot_collect_groups: Gather all user groups - userbot_collect_members: Parse group members ✅ UserBot handlers properly registered in ConversationHandler CONTAINERS: ✅ tg_autoposter_bot: Running and handling /start commands ✅ tg_autoposter_userbot: Running as standalone microservice ✅ All dependent services (Redis, PostgreSQL, Celery workers) operational STATUS: Bot is fully operational and ready for testing
This commit is contained in:
1
app/userbot/__init__.py
Normal file
1
app/userbot/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Telethon UserBot Microservice
|
||||
275
app/userbot/parser.py
Normal file
275
app/userbot/parser.py
Normal file
@@ -0,0 +1,275 @@
|
||||
"""
|
||||
Telethon UserBot - отдельный микросервис для парсинга групп и участников
|
||||
Работает независимо от основного бота, может быть запущен как отдельный контейнер
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import List, Optional, Dict
|
||||
from telethon import TelegramClient
|
||||
from telethon.errors import (
|
||||
FloodWaitError, UserDeactivatedError, ChatAdminRequiredError,
|
||||
PeerIdInvalidError, UserNotParticipantError
|
||||
)
|
||||
from app.database import AsyncSessionLocal
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UserbotParser:
|
||||
"""Парсер групп и участников через Telethon UserBot"""
|
||||
|
||||
def __init__(self):
|
||||
self.client: Optional[TelegramClient] = None
|
||||
self.is_initialized = False
|
||||
self.session_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'sessions')
|
||||
|
||||
async def initialize(self) -> bool:
|
||||
"""Инициализировать userbot клиент"""
|
||||
try:
|
||||
os.makedirs(self.session_dir, exist_ok=True)
|
||||
|
||||
api_id = os.getenv('TELETHON_API_ID')
|
||||
api_hash = os.getenv('TELETHON_API_HASH')
|
||||
|
||||
if not (api_id and api_hash):
|
||||
logger.error("❌ TELETHON_API_ID или TELETHON_API_HASH не установлены")
|
||||
return False
|
||||
|
||||
session_path = os.path.join(self.session_dir, 'userbot_session')
|
||||
|
||||
self.client = TelegramClient(
|
||||
session_path,
|
||||
api_id=int(api_id),
|
||||
api_hash=api_hash
|
||||
)
|
||||
|
||||
logger.info("🔗 Подключение к Telegram...")
|
||||
await self.client.connect()
|
||||
|
||||
# Проверить авторизацию
|
||||
if not await self.client.is_user_authorized():
|
||||
logger.error("❌ UserBot не авторизован. Требуется повторный вход.")
|
||||
logger.info("📲 Необходимо авторизироваться вручную через интерфейс.")
|
||||
return False
|
||||
|
||||
self.is_initialized = True
|
||||
me = await self.client.get_me()
|
||||
logger.info(f"✅ UserBot инициализирован: {me.first_name} (@{me.username})")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при инициализации UserBot: {e}")
|
||||
return False
|
||||
|
||||
async def shutdown(self):
|
||||
"""Остановить userbot клиент"""
|
||||
if self.client and self.is_initialized:
|
||||
try:
|
||||
await self.client.disconnect()
|
||||
self.is_initialized = False
|
||||
logger.info("✅ UserBot остановлен")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при остановке UserBot: {e}")
|
||||
|
||||
async def parse_group_info(self, chat_id: int) -> Optional[Dict]:
|
||||
"""
|
||||
Получить информацию о группе/канале
|
||||
|
||||
Returns:
|
||||
Dict с информацией о группе или None
|
||||
"""
|
||||
if not self.is_initialized:
|
||||
logger.error("❌ UserBot не инициализирован")
|
||||
return None
|
||||
|
||||
try:
|
||||
entity = await self.client.get_entity(chat_id)
|
||||
|
||||
info = {
|
||||
'chat_id': str(entity.id),
|
||||
'title': entity.title if hasattr(entity, 'title') else '',
|
||||
'description': entity.about if hasattr(entity, 'about') else '',
|
||||
'members_count': getattr(entity, 'participants_count', 0),
|
||||
'is_channel': entity.broadcast if hasattr(entity, 'broadcast') else False,
|
||||
'is_supergroup': entity.megagroup if hasattr(entity, 'megagroup') else False,
|
||||
'username': entity.username if hasattr(entity, 'username') else '',
|
||||
'photo_id': entity.photo.id if hasattr(entity, 'photo') and entity.photo else None,
|
||||
}
|
||||
|
||||
logger.info(f"✅ Получена информация о группе: {info['title']} (ID: {chat_id})")
|
||||
return info
|
||||
|
||||
except FloodWaitError as e:
|
||||
logger.warning(f"⏳ FloodWait на {e.seconds}с при получении информации о группе {chat_id}")
|
||||
return None
|
||||
|
||||
except (ChatAdminRequiredError, UserNotParticipantError):
|
||||
logger.warning(f"⚠️ Нет доступа к группе {chat_id}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при получении информации о группе {chat_id}: {e}")
|
||||
return None
|
||||
|
||||
async def parse_group_members(self, chat_id: int, limit: int = 10000) -> List[Dict]:
|
||||
"""
|
||||
Получить список участников группы/канала
|
||||
|
||||
Args:
|
||||
chat_id: ID группы
|
||||
limit: максимум участников для получения
|
||||
|
||||
Returns:
|
||||
Список участников с информацией
|
||||
"""
|
||||
if not self.is_initialized:
|
||||
logger.error("❌ UserBot не инициализирован")
|
||||
return []
|
||||
|
||||
members = []
|
||||
try:
|
||||
logger.info(f"🔍 Начало парсинга участников группы {chat_id} (лимит: {limit})...")
|
||||
|
||||
count = 0
|
||||
async for participant in self.client.iter_participants(chat_id, limit=limit):
|
||||
member_info = {
|
||||
'user_id': str(participant.id),
|
||||
'username': participant.username or '',
|
||||
'first_name': participant.first_name or '',
|
||||
'last_name': participant.last_name or '',
|
||||
'phone': participant.phone or '',
|
||||
'is_bot': participant.bot,
|
||||
'is_admin': participant.is_self,
|
||||
'bio': participant.about if hasattr(participant, 'about') else '',
|
||||
'status': str(participant.status) if hasattr(participant, 'status') else '',
|
||||
}
|
||||
members.append(member_info)
|
||||
count += 1
|
||||
|
||||
if count % 100 == 0:
|
||||
logger.info(f" 📊 Загружено {count} участников...")
|
||||
|
||||
logger.info(f"✅ Получено {len(members)} участников из группы {chat_id}")
|
||||
return members
|
||||
|
||||
except FloodWaitError as e:
|
||||
logger.warning(f"⏳ FloodWait на {e.seconds}с при парсинге участников {chat_id}")
|
||||
logger.info(f" Загружено {len(members)} участников перед ограничением")
|
||||
return members
|
||||
|
||||
except (ChatAdminRequiredError, UserNotParticipantError):
|
||||
logger.warning(f"⚠️ Нет доступа к списку участников группы {chat_id}")
|
||||
return members
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при парсинге участников {chat_id}: {e}")
|
||||
logger.info(f" Загружено {len(members)} участников перед ошибкой")
|
||||
return members
|
||||
|
||||
async def parse_groups_user_in(self) -> List[Dict]:
|
||||
"""
|
||||
Получить список всех групп/каналов, в которых состоит пользователь
|
||||
|
||||
Returns:
|
||||
Список групп с информацией
|
||||
"""
|
||||
if not self.is_initialized:
|
||||
logger.error("❌ UserBot не инициализирован")
|
||||
return []
|
||||
|
||||
groups = []
|
||||
try:
|
||||
logger.info("🔍 Получение списка групп пользователя...")
|
||||
|
||||
# Получить диалоги (как группы, так и чаты)
|
||||
async for dialog in self.client.iter_dialogs():
|
||||
# Пропускаем личные чаты, берем только группы и каналы
|
||||
if dialog.is_group or dialog.is_channel:
|
||||
try:
|
||||
entity = await self.client.get_entity(dialog.entity)
|
||||
|
||||
group_info = {
|
||||
'chat_id': str(entity.id),
|
||||
'title': dialog.title or entity.title if hasattr(entity, 'title') else '',
|
||||
'description': entity.about if hasattr(entity, 'about') else '',
|
||||
'members_count': getattr(entity, 'participants_count', 0),
|
||||
'is_channel': entity.broadcast if hasattr(entity, 'broadcast') else False,
|
||||
'is_supergroup': entity.megagroup if hasattr(entity, 'megagroup') else False,
|
||||
'username': entity.username if hasattr(entity, 'username') else '',
|
||||
}
|
||||
groups.append(group_info)
|
||||
logger.info(f" ✓ {dialog.title}")
|
||||
except Exception as e:
|
||||
logger.warning(f" ⚠️ Ошибка при парсинге {dialog.title}: {e}")
|
||||
continue
|
||||
|
||||
logger.info(f"✅ Получено {len(groups)} групп/каналов")
|
||||
return groups
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при получении списка групп: {e}")
|
||||
return groups
|
||||
|
||||
async def sync_group_to_db(self, chat_id: int) -> bool:
|
||||
"""
|
||||
Синхронизировать информацию о группе и участников в БД
|
||||
|
||||
Returns:
|
||||
True если успешно, False иначе
|
||||
"""
|
||||
try:
|
||||
# Получить информацию о группе
|
||||
group_info = await self.parse_group_info(chat_id)
|
||||
if not group_info:
|
||||
logger.error(f"❌ Не удалось получить информацию о группе {chat_id}")
|
||||
return False
|
||||
|
||||
# Получить участников
|
||||
members = await self.parse_group_members(chat_id)
|
||||
|
||||
# Сохранить в БД
|
||||
async with AsyncSessionLocal() as session:
|
||||
from app.database.repository import GroupRepository, GroupMemberRepository
|
||||
|
||||
group_repo = GroupRepository(session)
|
||||
member_repo = GroupMemberRepository(session)
|
||||
|
||||
# Обновить информацию о группе
|
||||
group_data = {
|
||||
'chat_id': int(group_info['chat_id']),
|
||||
'title': group_info['title'],
|
||||
'description': group_info['description'],
|
||||
'members_count': group_info['members_count'],
|
||||
'is_active': True,
|
||||
}
|
||||
|
||||
await group_repo.add_or_update_group(group_data)
|
||||
logger.info(f"✅ Группа {group_info['title']} сохранена в БД")
|
||||
|
||||
# Сохранить участников
|
||||
if members:
|
||||
for member in members:
|
||||
member_data = {
|
||||
'group_id': chat_id,
|
||||
'user_id': int(member['user_id']),
|
||||
'username': member['username'],
|
||||
'first_name': member['first_name'],
|
||||
'last_name': member['last_name'],
|
||||
'is_bot': member['is_bot'],
|
||||
}
|
||||
await member_repo.add_or_update_member(member_data)
|
||||
|
||||
logger.info(f"✅ {len(members)} участников сохранено в БД")
|
||||
|
||||
await session.commit()
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при синхронизации группы {chat_id}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
# Глобальный экземпляр парсера
|
||||
userbot_parser = UserbotParser()
|
||||
139
app/userbot/tasks.py
Normal file
139
app/userbot/tasks.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""
|
||||
Celery задачи для UserBot микросервиса
|
||||
Запускает парсинг групп в фоновом режиме
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from celery import shared_task
|
||||
from app.userbot.parser import userbot_parser
|
||||
from app.database import AsyncSessionLocal
|
||||
from app.database.repository import GroupRepository
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_async(coro):
|
||||
"""Вспомогательная функция для запуска async функций в Celery"""
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
return loop.run_until_complete(coro)
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
||||
@shared_task(name='app.userbot.tasks.initialize_userbot')
|
||||
def initialize_userbot_task():
|
||||
"""Инициализировать UserBot при запуске"""
|
||||
logger.info("🚀 Инициализация UserBot...")
|
||||
result = run_async(userbot_parser.initialize())
|
||||
|
||||
if result:
|
||||
logger.info("✅ UserBot успешно инициализирован")
|
||||
return {'status': 'success', 'message': 'UserBot initialized'}
|
||||
else:
|
||||
logger.error("❌ Ошибка инициализации UserBot")
|
||||
return {'status': 'error', 'message': 'UserBot initialization failed'}
|
||||
|
||||
|
||||
@shared_task(name='app.userbot.tasks.parse_group')
|
||||
def parse_group_task(chat_id: int):
|
||||
"""
|
||||
Парсить группу и сохранить в БД
|
||||
|
||||
Args:
|
||||
chat_id: ID группы для парсинга
|
||||
"""
|
||||
logger.info(f"📊 Парсинг группы {chat_id}...")
|
||||
|
||||
result = run_async(userbot_parser.sync_group_to_db(chat_id))
|
||||
|
||||
if result:
|
||||
logger.info(f"✅ Группа {chat_id} успешно спарсена")
|
||||
return {'status': 'success', 'chat_id': chat_id, 'message': 'Group parsed successfully'}
|
||||
else:
|
||||
logger.error(f"❌ Ошибка парсинга группы {chat_id}")
|
||||
return {'status': 'error', 'chat_id': chat_id, 'message': 'Group parsing failed'}
|
||||
|
||||
|
||||
@shared_task(name='app.userbot.tasks.sync_all_groups')
|
||||
def sync_all_groups_task():
|
||||
"""Синхронизировать все активные группы из БД"""
|
||||
logger.info("🔄 Начало синхронизации всех групп...")
|
||||
|
||||
async def _sync_all():
|
||||
try:
|
||||
async with AsyncSessionLocal() as session:
|
||||
repo = GroupRepository(session)
|
||||
groups = await repo.get_all_active_groups()
|
||||
|
||||
if not groups:
|
||||
logger.info("ℹ️ Нет активных групп для синхронизации")
|
||||
return {'status': 'success', 'groups_synced': 0}
|
||||
|
||||
synced = 0
|
||||
failed = 0
|
||||
|
||||
for group in groups:
|
||||
success = await userbot_parser.sync_group_to_db(group.chat_id)
|
||||
if success:
|
||||
synced += 1
|
||||
else:
|
||||
failed += 1
|
||||
|
||||
logger.info(f"✅ Синхронизировано {synced} групп (ошибок: {failed})")
|
||||
return {'status': 'success', 'groups_synced': synced, 'groups_failed': failed}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при синхронизации групп: {e}")
|
||||
return {'status': 'error', 'message': str(e)}
|
||||
|
||||
return run_async(_sync_all())
|
||||
|
||||
|
||||
@shared_task(name='app.userbot.tasks.parse_group_members')
|
||||
def parse_group_members_task(chat_id: int, limit: int = 10000):
|
||||
"""
|
||||
Парсить участников группы
|
||||
|
||||
Args:
|
||||
chat_id: ID группы
|
||||
limit: максимум участников
|
||||
"""
|
||||
logger.info(f"👥 Парсинг участников группы {chat_id} (лимит: {limit})...")
|
||||
|
||||
async def _parse_members():
|
||||
try:
|
||||
members = await userbot_parser.parse_group_members(chat_id, limit)
|
||||
|
||||
if not members:
|
||||
return {'status': 'error', 'chat_id': chat_id, 'members_count': 0}
|
||||
|
||||
# Сохранить в БД
|
||||
async with AsyncSessionLocal() as session:
|
||||
from app.database.repository import GroupMemberRepository
|
||||
|
||||
member_repo = GroupMemberRepository(session)
|
||||
|
||||
for member in members:
|
||||
member_data = {
|
||||
'group_id': chat_id,
|
||||
'user_id': int(member['user_id']),
|
||||
'username': member['username'],
|
||||
'first_name': member['first_name'],
|
||||
'last_name': member['last_name'],
|
||||
'is_bot': member['is_bot'],
|
||||
}
|
||||
await member_repo.add_or_update_member(member_data)
|
||||
|
||||
await session.commit()
|
||||
|
||||
logger.info(f"✅ {len(members)} участников группы {chat_id} сохранено в БД")
|
||||
return {'status': 'success', 'chat_id': chat_id, 'members_count': len(members)}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при парсинге участников {chat_id}: {e}")
|
||||
return {'status': 'error', 'chat_id': chat_id, 'message': str(e)}
|
||||
|
||||
return run_async(_parse_members())
|
||||
Reference in New Issue
Block a user