refactor
Some checks failed
continuous-integration/drone/pr Build is failing

This commit is contained in:
2026-02-17 00:22:42 +09:00
parent ca0c63a89c
commit 0fdad07d82
36 changed files with 4384 additions and 368 deletions

View File

@@ -0,0 +1,176 @@
"""
Сервис для отслеживания активности пользователей
и автоматической блокировки неактивных
"""
from datetime import datetime, timezone, timedelta
from sqlalchemy import select, and_, update
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
import logging
from .models import User, BlockedUser
from .database import async_session_maker
logger = logging.getLogger(__name__)
class ActivityService:
"""Сервис для управления активностью пользователей"""
# Период неактивности в днях (по умолчанию 30 дней)
INACTIVITY_PERIOD_DAYS = 30
@staticmethod
async def update_user_activity(session: AsyncSession, telegram_id: int) -> None:
"""
Обновить last_activity для пользователя
Args:
session: Сессия БД
telegram_id: Telegram ID пользователя
"""
try:
stmt = (
update(User)
.where(User.telegram_id == telegram_id)
.values(last_activity=datetime.now(timezone.utc))
)
await session.execute(stmt)
await session.commit()
except Exception as e:
logger.error(f"Ошибка обновления активности пользователя {telegram_id}: {e}")
await session.rollback()
@staticmethod
async def get_inactive_users(
session: AsyncSession,
days: int = None
) -> List[User]:
"""
Получить список неактивных пользователей
Args:
session: Сессия БД
days: Количество дней неактивности (по умолчанию INACTIVITY_PERIOD_DAYS)
Returns:
Список неактивных пользователей
"""
if days is None:
days = ActivityService.INACTIVITY_PERIOD_DAYS
cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
stmt = select(User).where(
and_(
User.last_activity < cutoff_date,
User.is_registered == True
)
)
result = await session.execute(stmt)
return list(result.scalars().all())
@staticmethod
async def mark_inactive_users(session: AsyncSession, days: int = None) -> int:
"""
Пометить неактивных пользователей как заблокированных
Args:
session: Сессия БД
days: Количество дней неактивности
Returns:
Количество помеченных пользователей
"""
try:
inactive_users = await ActivityService.get_inactive_users(session, days)
marked_count = 0
for user in inactive_users:
# Проверяем, не помечен ли уже
stmt = select(BlockedUser).where(
and_(
BlockedUser.telegram_id == user.telegram_id,
BlockedUser.error_type == 'inactive',
BlockedUser.is_active == True
)
)
result = await session.execute(stmt)
existing = result.scalar_one_or_none()
if not existing:
# Создаем новую запись
blocked = BlockedUser(
telegram_id=user.telegram_id,
error_type='inactive',
error_message=f'User inactive for {days} days',
first_blocked_at=datetime.now(timezone.utc),
last_attempt_at=datetime.now(timezone.utc),
attempt_count=1,
is_active=True
)
session.add(blocked)
marked_count += 1
logger.info(f"Пользователь {user.telegram_id} помечен как неактивный (последняя активность: {user.last_activity})")
await session.commit()
return marked_count
except Exception as e:
logger.error(f"Ошибка при пометке неактивных пользователей: {e}")
await session.rollback()
return 0
@staticmethod
async def reactivate_user(session: AsyncSession, telegram_id: int) -> bool:
"""
Реактивировать пользователя (убрать из списка заблокированных по неактивности)
Args:
session: Сессия БД
telegram_id: Telegram ID пользователя
Returns:
True если пользователь реактивирован
"""
try:
# Обновляем активность
await ActivityService.update_user_activity(session, telegram_id)
# Деактивируем запись о блокировке по неактивности
stmt = (
update(BlockedUser)
.where(
and_(
BlockedUser.telegram_id == telegram_id,
BlockedUser.error_type == 'inactive',
BlockedUser.is_active == True
)
)
.values(is_active=False)
)
await session.execute(stmt)
await session.commit()
logger.info(f"Пользователь {telegram_id} реактивирован")
return True
except Exception as e:
logger.error(f"Ошибка реактивации пользователя {telegram_id}: {e}")
await session.rollback()
return False
@staticmethod
async def check_and_mark_inactive_users() -> int:
"""
Проверить и пометить всех неактивных пользователей
Используется для периодического запуска
Returns:
Количество помеченных пользователей
"""
async with async_session_maker() as session:
marked = await ActivityService.mark_inactive_users(session)
logger.info(f"Проверка неактивных пользователей завершена. Помечено: {marked}")
return marked

View File

@@ -0,0 +1,495 @@
"""
Сервисы для системы рассылок с поддержкой Redis очередей
"""
import asyncio
import json
import logging
from typing import Optional, List, Dict, Tuple, Any
from datetime import datetime, timezone
from aiogram import Bot
from aiogram.types import Message
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError, TelegramRetryAfter
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
import redis.asyncio as redis
from .models import User, BlockedUser, BroadcastLog, BroadcastChannel
from .config import REDIS_URL, ADMIN_IDS
from .database import async_session_maker
logger = logging.getLogger(__name__)
class RedisQueue:
"""Класс для работы с Redis очередями"""
def __init__(self, redis_url: str = REDIS_URL):
self.redis_url = redis_url
self._redis: Optional[redis.Redis] = None
async def connect(self):
"""Подключение к Redis"""
if self._redis is None:
self._redis = await redis.from_url(self.redis_url, decode_responses=False)
async def disconnect(self):
"""Отключение от Redis"""
if self._redis:
await self._redis.close()
self._redis = None
async def add_to_queue(self, queue_name: str, data: Dict) -> int:
"""
Добавить элемент в очередь
Args:
queue_name: Название очереди
data: Данные для добавления
Returns:
int: Длина очереди после добавления
"""
await self.connect()
serialized = json.dumps(data).encode('utf-8')
return await self._redis.rpush(queue_name, serialized)
async def get_from_queue(self, queue_name: str, timeout: int = 0) -> Optional[Dict]:
"""
Получить элемент из очереди (блокирующая операция)
Args:
queue_name: Название очереди
timeout: Таймаут ожидания в секундах (0 = бесконечно)
Returns:
Dict или None
"""
await self.connect()
result = await self._redis.blpop(queue_name, timeout=timeout)
if result:
_, data = result
return json.loads(data.decode('utf-8'))
return None
async def get_queue_length(self, queue_name: str) -> int:
"""Получить длину очереди"""
await self.connect()
return await self._redis.llen(queue_name)
async def clear_queue(self, queue_name: str):
"""Очистить очередь"""
await self.connect()
await self._redis.delete(queue_name)
class BroadcastService:
"""Сервис для управления рассылками"""
# Константы для очередей
QUEUE_BROADCAST = "broadcast_queue"
QUEUE_FAILED = "broadcast_failed_queue"
# Лимиты Telegram
BATCH_SIZE = 30 # Сообщений в пакете
BATCH_DELAY = 1.0 # Задержка между пакетами (секунды)
RETRY_AFTER_DELAY = 5.0 # Дополнительная задержка при FloodWait
def __init__(self):
self.redis_queue = RedisQueue()
async def check_user_blocked(self, session: AsyncSession, telegram_id: int) -> bool:
"""
Проверить, заблокирован ли пользователь
Args:
session: Сессия БД
telegram_id: Telegram ID пользователя
Returns:
bool: True если заблокирован
"""
stmt = select(BlockedUser).where(
BlockedUser.telegram_id == telegram_id,
BlockedUser.is_active == True
)
result = await session.execute(stmt)
return result.scalar_one_or_none() is not None
async def mark_user_blocked(
self,
session: AsyncSession,
telegram_id: int,
error_type: str,
error_message: str
):
"""
Отметить пользователя как заблокированного
Args:
session: Сессия БД
telegram_id: Telegram ID пользователя
error_type: Тип ошибки
error_message: Сообщение об ошибке
"""
# Проверяем, есть ли уже запись
stmt = select(BlockedUser).where(BlockedUser.telegram_id == telegram_id)
result = await session.execute(stmt)
blocked_user = result.scalar_one_or_none()
if blocked_user:
# Обновляем существующую запись
blocked_user.error_type = error_type
blocked_user.error_message = error_message
blocked_user.last_attempt_at = datetime.now(timezone.utc)
blocked_user.attempt_count += 1
blocked_user.is_active = True
else:
# Создаем новую запись
blocked_user = BlockedUser(
telegram_id=telegram_id,
error_type=error_type,
error_message=error_message
)
session.add(blocked_user)
await session.commit()
logger.info(f"Пользователь {telegram_id} отмечен как заблокированный: {error_type}")
async def unblock_user(self, session: AsyncSession, telegram_id: int):
"""
Разблокировать пользователя (если сообщение успешно доставлено)
Args:
session: Сессия БД
telegram_id: Telegram ID пользователя
"""
stmt = select(BlockedUser).where(
BlockedUser.telegram_id == telegram_id,
BlockedUser.is_active == True
)
result = await session.execute(stmt)
blocked_user = result.scalar_one_or_none()
if blocked_user:
blocked_user.is_active = False
await session.commit()
logger.info(f"Пользователь {telegram_id} разблокирован")
async def send_message_to_user(
self,
bot: Bot,
user: User,
message: Message
) -> Tuple[bool, Optional[str]]:
"""
Отправить сообщение пользователю с обработкой ошибок
Args:
bot: Инстанс бота
user: Объект пользователя
message: Сообщение для отправки
Returns:
Tuple[bool, Optional[str]]: (успех, тип_ошибки)
"""
try:
# Проверяем, не заблокирован ли пользователь
async with async_session_maker() as session:
is_blocked = await self.check_user_blocked(session, user.telegram_id)
if is_blocked:
logger.debug(f"Пропускаем заблокированного пользователя {user.telegram_id}")
return False, "blocked"
# Отправляем сообщение
if message.text:
await bot.send_message(
user.telegram_id,
message.text,
parse_mode="Markdown"
)
elif message.photo:
await bot.send_photo(
user.telegram_id,
photo=message.photo[-1].file_id,
caption=message.caption,
parse_mode="Markdown"
)
elif message.video:
await bot.send_video(
user.telegram_id,
video=message.video.file_id,
caption=message.caption,
parse_mode="Markdown"
)
elif message.document:
await bot.send_document(
user.telegram_id,
document=message.document.file_id,
caption=message.caption,
parse_mode="Markdown"
)
else:
# Копируем сообщение как есть
await message.copy_to(user.telegram_id)
# Если успешно - разблокируем пользователя (на случай если он был заблокирован ранее)
async with async_session_maker() as session:
await self.unblock_user(session, user.telegram_id)
return True, None
except TelegramForbiddenError as e:
# Пользователь заблокировал бота
error_type = "blocked_bot"
async with async_session_maker() as session:
await self.mark_user_blocked(session, user.telegram_id, error_type, str(e))
return False, error_type
except TelegramBadRequest as e:
# Пользователь удален или деактивирован
error_str = str(e).lower()
if "user is deactivated" in error_str:
error_type = "deactivated"
elif "user not found" in error_str:
error_type = "not_found"
elif "chat not found" in error_str:
error_type = "chat_not_found"
else:
error_type = "bad_request"
async with async_session_maker() as session:
await self.mark_user_blocked(session, user.telegram_id, error_type, str(e))
return False, error_type
except TelegramRetryAfter as e:
# FloodWait - слишком много запросов
logger.warning(f"FloodWait для пользователя {user.telegram_id}: ждем {e.retry_after} сек")
await asyncio.sleep(e.retry_after + self.RETRY_AFTER_DELAY)
# Повторная попытка
return await self.send_message_to_user(bot, user, message)
except Exception as e:
# Другие ошибки
logger.error(f"Ошибка отправки пользователю {user.telegram_id}: {e}")
return False, "unknown_error"
async def broadcast_to_users(
self,
bot: Bot,
message: Message,
admin_id: int,
users: Optional[List[User]] = None
) -> Dict[str, Any]:
"""
Рассылка сообщений пользователям через Redis очередь
Args:
bot: Инстанс бота
message: Сообщение для рассылки
admin_id: ID администратора, который запустил рассылку
users: Список пользователей (если None - всем зарегистрированным)
Returns:
Dict: Статистика рассылки
"""
# Создаем лог рассылки
async with async_session_maker() as session:
broadcast_log = BroadcastLog(
broadcast_type='direct',
message_type=message.content_type,
message_text=message.text or message.caption,
file_id=self._get_file_id(message),
created_by=admin_id,
status='in_progress'
)
session.add(broadcast_log)
await session.commit()
await session.refresh(broadcast_log)
log_id = broadcast_log.id
# Получаем список пользователей
if users is None:
async with async_session_maker() as session:
# Получаем всех зарегистрированных пользователей
stmt = select(User).where(User.is_registered == True)
result = await session.execute(stmt)
all_users = result.scalars().all()
# Получаем список заблокированных пользователей
blocked_stmt = select(BlockedUser.telegram_id).where(
BlockedUser.is_active == True
)
blocked_result = await session.execute(blocked_stmt)
blocked_ids = set(row[0] for row in blocked_result.fetchall())
# Фильтруем пользователей, исключая заблокированных
users = [u for u in all_users if u.telegram_id not in blocked_ids]
total_users = len(users)
success_count = 0
failed_count = 0
blocked_count = 0
# Рассылаем пакетами
for i in range(0, total_users, self.BATCH_SIZE):
batch = users[i:i + self.BATCH_SIZE]
# Отправляем пакет
tasks = []
for user in batch:
tasks.append(self.send_message_to_user(bot, user, message))
# Ждем завершения пакета
results = await asyncio.gather(*tasks, return_exceptions=True)
# Подсчитываем результаты
for result in results:
if isinstance(result, Exception):
failed_count += 1
elif result[0]: # success
success_count += 1
else:
failed_count += 1
if result[1] in ['blocked_bot', 'deactivated', 'not_found']:
blocked_count += 1
# Задержка между пакетами
if i + self.BATCH_SIZE < total_users:
await asyncio.sleep(self.BATCH_DELAY)
# Обновляем лог
async with async_session_maker() as session:
stmt = select(BroadcastLog).where(BroadcastLog.id == log_id)
result = await session.execute(stmt)
broadcast_log = result.scalar_one()
broadcast_log.total_recipients = total_users
broadcast_log.success_count = success_count
broadcast_log.failed_count = failed_count
broadcast_log.blocked_count = blocked_count
broadcast_log.completed_at = datetime.now(timezone.utc)
broadcast_log.status = 'completed'
await session.commit()
return {
'total': total_users,
'success': success_count,
'failed': failed_count,
'blocked': blocked_count
}
async def broadcast_to_channel(
self,
bot: Bot,
message: Message,
channel_id: int,
admin_id: int
) -> bool:
"""
Отправка сообщения в канал
Args:
bot: Инстанс бота
message: Сообщение для отправки
channel_id: ID канала
admin_id: ID администратора
Returns:
bool: Успех операции
"""
# Создаем лог
async with async_session_maker() as session:
broadcast_log = BroadcastLog(
broadcast_type='channel',
target_id=channel_id,
message_type=message.content_type,
message_text=message.text or message.caption,
file_id=self._get_file_id(message),
created_by=admin_id,
total_recipients=1,
status='in_progress'
)
session.add(broadcast_log)
await session.commit()
await session.refresh(broadcast_log)
log_id = broadcast_log.id
try:
# Отправляем в канал
if message.text:
await bot.send_message(channel_id, message.text, parse_mode="Markdown")
elif message.photo:
await bot.send_photo(
channel_id,
photo=message.photo[-1].file_id,
caption=message.caption,
parse_mode="Markdown"
)
elif message.video:
await bot.send_video(
channel_id,
video=message.video.file_id,
caption=message.caption,
parse_mode="Markdown"
)
elif message.document:
await bot.send_document(
channel_id,
document=message.document.file_id,
caption=message.caption,
parse_mode="Markdown"
)
else:
await message.copy_to(channel_id)
# Обновляем лог
async with async_session_maker() as session:
stmt = select(BroadcastLog).where(BroadcastLog.id == log_id)
result = await session.execute(stmt)
broadcast_log = result.scalar_one()
broadcast_log.success_count = 1
broadcast_log.completed_at = datetime.now(timezone.utc)
broadcast_log.status = 'completed'
await session.commit()
return True
except Exception as e:
logger.error(f"Ошибка отправки в канал {channel_id}: {e}")
# Обновляем лог
async with async_session_maker() as session:
stmt = select(BroadcastLog).where(BroadcastLog.id == log_id)
result = await session.execute(stmt)
broadcast_log = result.scalar_one()
broadcast_log.failed_count = 1
broadcast_log.completed_at = datetime.now(timezone.utc)
broadcast_log.status = 'failed'
await session.commit()
return False
def _get_file_id(self, message: Message) -> Optional[str]:
"""Получить file_id из сообщения"""
if message.photo:
return message.photo[-1].file_id
elif message.video:
return message.video.file_id
elif message.document:
return message.document.file_id
elif message.animation:
return message.animation.file_id
elif message.voice:
return message.voice.file_id
elif message.audio:
return message.audio.file_id
return None
# Глобальный экземпляр сервиса
broadcast_service = BroadcastService()

View File

@@ -360,7 +360,16 @@ class ChatPermissionService:
if settings and settings.global_ban:
return False, "Чат временно закрыт администратором"
# Проверяем личный бан
# Проверяем is_chat_banned в модели User
from .models import User
stmt = select(User).where(User.telegram_id == telegram_id)
result = await session.execute(stmt)
user = result.scalar_one_or_none()
if user and user.is_chat_banned:
return False, "Вы заблокированы и не можете отправлять сообщения в чат"
# Проверяем личный бан (старая система через BannedUser)
is_banned = await BanService.is_banned(session, telegram_id)
if is_banned:
return False, "Вы заблокированы и не можете отправлять сообщения"

View File

@@ -12,6 +12,9 @@ if not BOT_TOKEN:
# База данных
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./lottery_bot.db")
# Redis
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Администраторы
ADMIN_IDS = []
admin_ids_str = os.getenv("ADMIN_IDS", "")

View File

@@ -19,7 +19,9 @@ class User(Base):
club_card_number = Column(String(50), unique=True, nullable=True, index=True) # Номер клубной карты
is_registered = Column(Boolean, default=False) # Прошел ли полную регистрацию
is_admin = Column(Boolean, default=False)
is_chat_banned = Column(Boolean, default=False) # Заблокирован ли в чате бота
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
last_activity = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc)) # Последняя активность
# Секретный код для верификации выигрыша (генерируется при регистрации)
verification_code = Column(String(10), unique=True, nullable=True)
@@ -242,4 +244,72 @@ class P2PMessage(Base):
reply_to = relationship("P2PMessage", remote_side=[id], backref="replies")
def __repr__(self):
return f"<P2PMessage(id={self.id}, from={self.sender_id}, to={self.recipient_id})>"
return f"<P2PMessage(id={self.id}, from={self.sender_id}, to={self.recipient_id})>"
class BroadcastChannel(Base):
"""Каналы и группы для рассылки"""
__tablename__ = "broadcast_channels"
id = Column(Integer, primary_key=True)
chat_id = Column(BigInteger, nullable=False, unique=True, index=True) # ID канала или группы
chat_type = Column(String(20), nullable=False) # 'channel' или 'group'
title = Column(String(255), nullable=False) # Название
username = Column(String(255), nullable=True) # Username (если есть)
description = Column(Text, nullable=True) # Описание
is_active = Column(Boolean, default=True, index=True) # Активен ли для рассылок
added_by = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
# Связи
admin = relationship("User")
def __repr__(self):
return f"<BroadcastChannel(id={self.id}, title={self.title}, type={self.chat_type})>"
class BlockedUser(Base):
"""Пользователи, которые заблокировали бота или недоступны"""
__tablename__ = "blocked_users"
id = Column(Integer, primary_key=True)
telegram_id = Column(BigInteger, nullable=False, unique=True, index=True)
error_type = Column(String(100), nullable=False) # тип ошибки (blocked, deleted, deactivated, etc.)
error_message = Column(Text, nullable=True) # Полное сообщение об ошибке
first_blocked_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
last_attempt_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
attempt_count = Column(Integer, default=1) # Количество неудачных попыток
is_active = Column(Boolean, default=True, index=True) # Активна ли блокировка
def __repr__(self):
return f"<BlockedUser(telegram_id={self.telegram_id}, error={self.error_type})>"
class BroadcastLog(Base):
"""История рассылок"""
__tablename__ = "broadcast_logs"
id = Column(Integer, primary_key=True)
broadcast_type = Column(String(20), nullable=False, index=True) # 'direct', 'channel', 'group'
target_id = Column(BigInteger, nullable=True) # ID канала/группы (null для direct)
message_type = Column(String(20), nullable=False) # text, photo, video, etc.
message_text = Column(Text, nullable=True) # Текст сообщения
file_id = Column(String(255), nullable=True) # ID файла (если есть)
# Статистика
total_recipients = Column(Integer, default=0) # Всего получателей
success_count = Column(Integer, default=0) # Успешно доставлено
failed_count = Column(Integer, default=0) # Не доставлено
blocked_count = Column(Integer, default=0) # Заблокировали бота
# Метаданные
created_by = Column(Integer, ForeignKey("users.id"), nullable=False)
started_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
completed_at = Column(DateTime(timezone=True), nullable=True)
status = Column(String(20), default='pending', index=True) # pending, in_progress, completed, failed
# Связи
admin = relationship("User")
def __repr__(self):
return f"<BroadcastLog(id={self.id}, type={self.broadcast_type}, status={self.status})>"

56
src/core/scheduler.py Normal file
View File

@@ -0,0 +1,56 @@
"""
Планировщик фоновых задач для бота
"""
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import logging
from src.core.activity_service import ActivityService
logger = logging.getLogger(__name__)
class BotScheduler:
"""Планировщик задач для бота"""
def __init__(self):
self.scheduler = AsyncIOScheduler()
def setup_jobs(self):
"""Настройка всех периодических задач"""
# Проверка неактивных пользователей каждый день в 03:00
self.scheduler.add_job(
self._check_inactive_users,
trigger=CronTrigger(hour=3, minute=0),
id='check_inactive_users',
name='Проверка неактивных пользователей',
replace_existing=True
)
logger.info("Планировщик задач настроен")
async def _check_inactive_users(self):
"""Проверка и блокировка неактивных пользователей"""
try:
logger.info("Запуск проверки неактивных пользователей")
marked = await ActivityService.check_and_mark_inactive_users()
logger.info(f"Проверка завершена. Неактивных пользователей помечено: {marked}")
except Exception as e:
logger.error(f"Ошибка при проверке неактивных пользователей: {e}", exc_info=True)
def start(self):
"""Запуск планировщика"""
self.setup_jobs()
self.scheduler.start()
logger.info("Планировщик задач запущен")
def shutdown(self):
"""Остановка планировщика"""
if self.scheduler.running:
self.scheduler.shutdown()
logger.info("Планировщик задач остановлен")
# Глобальный экземпляр планировщика
bot_scheduler = BotScheduler()

257
src/core/user_management.py Normal file
View File

@@ -0,0 +1,257 @@
"""
Сервис управления пользователями с поиском и пагинацией
"""
from datetime import datetime, timezone
from sqlalchemy import select, or_, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Dict, Any, Optional, Tuple
import logging
from .models import User
from .database import async_session_maker
logger = logging.getLogger(__name__)
class UserManagementService:
"""Сервис для управления пользователями"""
# Количество пользователей на странице
USERS_PER_PAGE = 15
@staticmethod
async def search_users(
session: AsyncSession,
query: str = None,
page: int = 1,
per_page: int = None,
filters: Dict[str, Any] = None
) -> Tuple[List[User], int]:
"""
Поиск пользователей с фильтрацией и пагинацией
Args:
session: Сессия БД
query: Поисковый запрос (ищет по username, имени, telegram_id, номеру карты)
page: Номер страницы (начиная с 1)
per_page: Количество на странице (по умолчанию USERS_PER_PAGE)
filters: Дополнительные фильтры:
- is_registered: bool
- is_admin: bool
- is_chat_banned: bool
Returns:
Tuple[List[User], int]: Список пользователей и общее количество
"""
if per_page is None:
per_page = UserManagementService.USERS_PER_PAGE
# Базовый запрос
stmt = select(User)
conditions = []
# Поисковый запрос
if query and query.strip():
query = query.strip()
search_conditions = []
# Поиск по username
if query.startswith('@'):
search_conditions.append(User.username.ilike(f'%{query[1:]}%'))
else:
# Поиск по всем полям
search_conditions.append(User.username.ilike(f'%{query}%'))
search_conditions.append(User.first_name.ilike(f'%{query}%'))
search_conditions.append(User.last_name.ilike(f'%{query}%'))
search_conditions.append(User.nickname.ilike(f'%{query}%'))
search_conditions.append(User.club_card_number.ilike(f'%{query}%'))
# Если запрос - число, ищем по telegram_id
if query.isdigit():
search_conditions.append(User.telegram_id == int(query))
conditions.append(or_(*search_conditions))
# Применяем фильтры
if filters:
if 'is_registered' in filters:
conditions.append(User.is_registered == filters['is_registered'])
if 'is_admin' in filters:
conditions.append(User.is_admin == filters['is_admin'])
if 'is_chat_banned' in filters:
conditions.append(User.is_chat_banned == filters['is_chat_banned'])
# Добавляем условия к запросу
if conditions:
stmt = stmt.where(and_(*conditions))
# Получаем общее количество
count_stmt = select(func.count()).select_from(stmt.subquery())
total_result = await session.execute(count_stmt)
total = total_result.scalar()
# Применяем сортировку и пагинацию
stmt = stmt.order_by(User.created_at.desc())
offset = (page - 1) * per_page
stmt = stmt.limit(per_page).offset(offset)
# Выполняем запрос
result = await session.execute(stmt)
users = list(result.scalars().all())
return users, total
@staticmethod
async def get_user_by_id(session: AsyncSession, user_id: int) -> Optional[User]:
"""Получить пользователя по ID"""
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
return result.scalar_one_or_none()
@staticmethod
async def get_user_by_telegram_id(session: AsyncSession, telegram_id: int) -> Optional[User]:
"""Получить пользователя по Telegram ID"""
stmt = select(User).where(User.telegram_id == telegram_id)
result = await session.execute(stmt)
return result.scalar_one_or_none()
@staticmethod
async def ban_user_in_chat(session: AsyncSession, user_id: int) -> bool:
"""
Заблокировать пользователя в чате
Args:
session: Сессия БД
user_id: ID пользователя
Returns:
bool: Успех операции
"""
try:
user = await UserManagementService.get_user_by_id(session, user_id)
if not user:
return False
user.is_chat_banned = True
await session.commit()
logger.info(f"Пользователь {user.telegram_id} заблокирован в чате")
return True
except Exception as e:
logger.error(f"Ошибка блокировки пользователя {user_id} в чате: {e}")
await session.rollback()
return False
@staticmethod
async def unban_user_in_chat(session: AsyncSession, user_id: int) -> bool:
"""
Разблокировать пользователя в чате
Args:
session: Сессия БД
user_id: ID пользователя
Returns:
bool: Успех операции
"""
try:
user = await UserManagementService.get_user_by_id(session, user_id)
if not user:
return False
user.is_chat_banned = False
await session.commit()
logger.info(f"Пользователь {user.telegram_id} разблокирован в чате")
return True
except Exception as e:
logger.error(f"Ошибка разблокировки пользователя {user_id} в чате: {e}")
await session.rollback()
return False
@staticmethod
async def get_user_stats(session: AsyncSession) -> Dict[str, int]:
"""
Получить статистику по пользователям
Returns:
Dict: Статистика
"""
# Общее количество
total_stmt = select(func.count(User.id))
total_result = await session.execute(total_stmt)
total = total_result.scalar()
# Зарегистрированные
registered_stmt = select(func.count(User.id)).where(User.is_registered == True)
registered_result = await session.execute(registered_stmt)
registered = registered_result.scalar()
# Админы
admin_stmt = select(func.count(User.id)).where(User.is_admin == True)
admin_result = await session.execute(admin_stmt)
admins = admin_result.scalar()
# Заблокированные в чате
banned_stmt = select(func.count(User.id)).where(User.is_chat_banned == True)
banned_result = await session.execute(banned_stmt)
banned = banned_result.scalar()
return {
'total': total,
'registered': registered,
'admins': admins,
'chat_banned': banned
}
@staticmethod
def format_user_info(user: User, detailed: bool = False) -> str:
"""
Форматировать информацию о пользователе для отображения
Args:
user: Пользователь
detailed: Детальная информация
Returns:
str: Форматированная информация
"""
# Базовая информация
info = f"👤 <b>{user.first_name}"
if user.last_name:
info += f" {user.last_name}"
info += "</b>"
if user.username:
info += f" (@{user.username})"
info += f"\n🆔 ID: <code>{user.telegram_id}</code>"
# Статусы
statuses = []
if user.is_admin:
statuses.append("👑 Админ")
if user.is_registered:
statuses.append("✅ Зарегистрирован")
if user.is_chat_banned:
statuses.append("🚫 Заблокирован в чате")
if statuses:
info += "\n" + " | ".join(statuses)
# Детальная информация
if detailed:
if user.nickname:
info += f"\n📝 Никнейм: {user.nickname}"
if user.club_card_number:
info += f"\n🎫 Клубная карта: <code>{user.club_card_number}</code>"
if user.phone:
info += f"\n📞 Телефон: <code>{user.phone}</code>"
# Даты
info += f"\n📅 Регистрация: {user.created_at.strftime('%d.%m.%Y %H:%M')}"
if user.last_activity:
days_inactive = (datetime.now(timezone.utc) - user.last_activity).days
info += f"\n⏰ Последняя активность: {user.last_activity.strftime('%d.%m.%Y %H:%M')}"
if days_inactive > 0:
info += f" ({days_inactive} дн. назад)"
return info