import asyncio from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton from telegram.constants import ChatMemberStatus, ChatType, ParseMode from telegram.ext import ContextTypes from app.bot.messages import JOIN_PUBLIC_WITH_ID, NEED_START_DM, BIND_CHANNEL_BTN TTL_SEC = 30 async def _autodel(ctx: ContextTypes.DEFAULT_TYPE, chat_id: int, mid: int, delay: int = TTL_SEC): try: await asyncio.sleep(delay) await ctx.bot.delete_message(chat_id=chat_id, message_id=mid) except Exception: pass async def on_my_chat_member(update: Update, ctx: ContextTypes.DEFAULT_TYPE): mcm = update.my_chat_member if not mcm: return chat = mcm.chat new_status = mcm.new_chat_member.status if new_status not in (ChatMemberStatus.MEMBER, ChatMemberStatus.ADMINISTRATOR): return kb = None if chat.type == ChatType.CHANNEL: kb = InlineKeyboardMarkup([[InlineKeyboardButton(BIND_CHANNEL_BTN, callback_data=f"bind:{chat.id}")]]) actor = getattr(mcm, "from_user", None) if actor: try: await ctx.bot.send_message(actor.id, JOIN_PUBLIC_WITH_ID.format(chat_id=chat.id, ttl=TTL_SEC), parse_mode=ParseMode.MARKDOWN, reply_markup=kb) return except Exception: try: await ctx.bot.send_message(actor.id, NEED_START_DM) except Exception: pass try: msg = await ctx.bot.send_message(chat_id=chat.id, text=JOIN_PUBLIC_WITH_ID.format(chat_id=chat.id, ttl=TTL_SEC), parse_mode=ParseMode.MARKDOWN, reply_markup=kb) ctx.application.create_task(_autodel(ctx, chat.id, msg.message_id, delay=TTL_SEC)) except Exception: pass