async refactor & docker deploy environment
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
2025-09-05 12:04:58 +09:00
parent 0d3bcdfc64
commit d17f0f5507
15 changed files with 361 additions and 187 deletions

View File

@@ -1,5 +1,6 @@
FROM python:3.11-slim
WORKDIR /app
COPY . /app
RUN pip install --no-cache-dir python-telegram-bot sqlalchemy python-dotenv
RUN apt update && apt install -y gcc && apt install -y sqlite3
RUN pip install --no-cache-dir -r requirements.txt
CMD ["python", "main.py"]

49
db.py
View File

@@ -1,13 +1,42 @@
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base
from dotenv import load_dotenv
load_dotenv()
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///bot.db')
engine = create_engine(DATABASE_URL, echo=True)
SessionLocal = sessionmaker(bind=engine)
import os
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base
from sqlalchemy.ext.asyncio import async_sessionmaker
def init_db():
Base.metadata.create_all(engine)
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///bot.db")
if DATABASE_URL.startswith("sqlite+aiosqlite:///"):
db_path = DATABASE_URL.replace("sqlite+aiosqlite:///", "")
abs_db_path = os.path.abspath(db_path)
db_dir = os.path.dirname(abs_db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
engine = create_async_engine(DATABASE_URL, future=True, echo=False)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
Base = declarative_base()
async def init_db():
print(f'База данных: {DATABASE_URL}')
need_create = False
if DATABASE_URL.startswith("sqlite+aiosqlite:///"):
db_path = DATABASE_URL.replace("sqlite+aiosqlite:///", "")
abs_db_path = os.path.abspath(db_path)
print(f"Абсолютный путь к базе данных: {abs_db_path}")
if not os.path.exists(abs_db_path):
print("Файл базы данных отсутствует, будет создан.")
need_create = True
else:
print(f"База данных: {DATABASE_URL}")
# Для других СУБД всегда пытаемся создать таблицы
need_create = True
if need_create:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
tables = Base.metadata.tables.keys()
print(f"Созданы таблицы: {', '.join(tables)}")
else:
print("База данных уже существует, создание таблиц пропущено.")

View File

@@ -1,4 +1,3 @@
version: '3.8'
services:
bot:
build: .

View File

@@ -1,58 +1,76 @@
from telegram import Update
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ContextTypes, ConversationHandler, CommandHandler, CallbackQueryHandler, MessageHandler, filters
from db import SessionLocal
from db import AsyncSessionLocal
from models import Channel, Group, Button
SELECT_TARGET, INPUT_NAME, INPUT_URL = range(3)
async def add_button_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Если выбран канал или группа уже сохранены — сразу переход к названию
if context.user_data.get('channel_id'):
context.user_data['target'] = f"channel_{context.user_data['channel_id']}"
await update.message.reply_text('Введите название кнопки:')
if context.user_data is None:
context.user_data = {}
user_data = context.user_data
if user_data.get('channel_id'):
context.user_data['target'] = f"channel_{user_data['channel_id']}"
if update.message:
await update.message.reply_text('Введите название кнопки:')
return INPUT_NAME
elif context.user_data.get('group_id'):
context.user_data['target'] = f"group_{context.user_data['group_id']}"
await update.message.reply_text('Введите название кнопки:')
elif user_data.get('group_id'):
context.user_data['target'] = f"group_{user_data['group_id']}"
if update.message:
await update.message.reply_text('Введите название кнопки:')
return INPUT_NAME
# Если нет — стандартный выбор
session = SessionLocal()
channels = session.query(Channel).all()
groups = session.query(Group).all()
session.close()
from sqlalchemy import select
async with AsyncSessionLocal() as session:
result_channels = await session.execute(select(Channel))
channels = result_channels.scalars().all()
result_groups = await session.execute(select(Group))
groups = result_groups.scalars().all()
keyboard = []
for c in channels:
keyboard.append([{'text': f'Канал: {c.name}', 'callback_data': f'channel_{c.id}'}])
keyboard.append([InlineKeyboardButton(f'Канал: {getattr(c, "name", str(c.name))}', callback_data=f'channel_{getattr(c, "id", str(c.id))}')])
for g in groups:
keyboard.append([{'text': f'Группа: {g.name}', 'callback_data': f'group_{g.id}'}])
keyboard.append([InlineKeyboardButton(f'Группа: {getattr(g, "name", str(g.name))}', callback_data=f'group_{getattr(g, "id", str(g.id))}')])
if not keyboard:
await update.message.reply_text('Нет каналов или групп для добавления кнопки.')
if update.message:
await update.message.reply_text('Нет каналов или групп для добавления кнопки.')
return ConversationHandler.END
await update.message.reply_text('Выберите канал или группу:', reply_markup=None)
if update.message:
await update.message.reply_text('Выберите канал или группу:', reply_markup=InlineKeyboardMarkup(keyboard))
context.user_data['keyboard'] = keyboard
return SELECT_TARGET
async def select_target(update: Update, context: ContextTypes.DEFAULT_TYPE):
if context.user_data is None:
context.user_data = {}
query = update.callback_query
await query.answer()
data = query.data
context.user_data['target'] = data
await query.edit_message_text('Введите название кнопки:')
return INPUT_NAME
if query:
await query.answer()
data = query.data
context.user_data['target'] = data
await query.edit_message_text('Введите название кнопки:')
return INPUT_NAME
return ConversationHandler.END
async def input_name(update: Update, context: ContextTypes.DEFAULT_TYPE):
context.user_data['button_name'] = update.message.text
await update.message.reply_text('Введите ссылку для кнопки:')
return INPUT_URL
if context.user_data is None:
context.user_data = {}
if update.message and hasattr(update.message, 'text'):
context.user_data['button_name'] = update.message.text
await update.message.reply_text('Введите ссылку для кнопки:')
return INPUT_URL
return ConversationHandler.END
async def input_url(update: Update, context: ContextTypes.DEFAULT_TYPE):
url = update.message.text
name = context.user_data.get('button_name')
target = context.user_data.get('target')
if not target or ('_' not in target):
await update.message.reply_text('Ошибка: не выбран канал или группа. Попробуйте снова.')
url = update.message.text if update.message and hasattr(update.message, 'text') else None
name = context.user_data.get('button_name') if context.user_data else None
target = context.user_data.get('target') if context.user_data else None
if not url or not name or not target or ('_' not in target):
if update.message:
await update.message.reply_text('Ошибка: не выбран канал или группа. Попробуйте снова.')
return ConversationHandler.END
session = SessionLocal()
session = AsyncSessionLocal()
try:
type_, obj_id = target.split('_', 1)
obj_id = int(obj_id)
@@ -61,16 +79,19 @@ async def input_url(update: Update, context: ContextTypes.DEFAULT_TYPE):
elif type_ == 'group':
button = Button(name=name, url=url, group_id=obj_id)
else:
await update.message.reply_text('Ошибка: неверный тип объекта.')
session.close()
if update.message:
await update.message.reply_text('Ошибка: неверный тип объекта.')
await session.close()
return ConversationHandler.END
session.add(button)
session.commit()
await update.message.reply_text('Кнопка добавлена.')
await session.commit()
if update.message:
await update.message.reply_text('Кнопка добавлена.')
except Exception as e:
await update.message.reply_text(f'Ошибка при добавлении кнопки: {e}')
if update.message:
await update.message.reply_text(f'Ошибка при добавлении кнопки: {e}')
finally:
session.close()
await session.close()
return ConversationHandler.END
add_button_conv = ConversationHandler(
@@ -81,4 +102,4 @@ add_button_conv = ConversationHandler(
INPUT_URL: [MessageHandler(filters.TEXT & ~filters.COMMAND, input_url)],
},
fallbacks=[]
)
)

View File

@@ -1,17 +1,18 @@
from telegram import Update
from telegram.ext import ContextTypes
from db import SessionLocal
from db import AsyncSessionLocal
from models import Channel
async def add_channel(update: Update, context: ContextTypes.DEFAULT_TYPE):
args = context.args
args = context.args or []
if update.message is None:
return
if len(args) < 2:
await update.message.reply_text('Используйте: /add_channel <название> <ссылка>')
return
name, link = args[0], args[1]
session = SessionLocal()
channel = Channel(name=name, link=link)
session.add(channel)
session.commit()
session.close()
async with AsyncSessionLocal() as session:
channel = Channel(name=name, link=link)
session.add(channel)
await session.commit()
await update.message.reply_text(f'Канал "{name}" добавлен.')

View File

@@ -1,15 +1,17 @@
from telegram import Update
from telegram.ext import ContextTypes
from db import SessionLocal
from db import AsyncSessionLocal
from models import Group
async def add_group(update: Update, context: ContextTypes.DEFAULT_TYPE):
args = context.args
args = context.args or []
if update.message is None:
return
if len(args) < 2:
await update.message.reply_text('Используйте: /add_group <название> <ссылка>')
return
name, link = args[0], args[1]
session = SessionLocal()
session = AsyncSessionLocal()
group = Group(name=name, link=link)
session.add(group)
session.commit()

View File

@@ -1,34 +1,47 @@
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import CommandHandler, CallbackQueryHandler, ConversationHandler, ContextTypes
from db import SessionLocal
from db import AsyncSessionLocal
from models import Channel, Button
SELECT_CHANNEL, MANAGE_BUTTONS = range(2)
async def channel_buttons_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
session = SessionLocal()
channels = session.query(Channel).all()
session.close()
keyboard = [[InlineKeyboardButton(c.name, callback_data=str(c.id))] for c in channels]
await update.message.reply_text(
"Выберите канал для настройки клавиатуры:",
reply_markup=InlineKeyboardMarkup(keyboard)
)
return SELECT_CHANNEL
from sqlalchemy import select
session = AsyncSessionLocal()
try:
channels_result = await session.execute(select(Channel))
channels = channels_result.scalars().all()
keyboard = [[InlineKeyboardButton(f'{getattr(c, "name", str(c.name))}', callback_data=str(getattr(c, "id", str(c.id))))] for c in channels]
if update.message:
await update.message.reply_text(
"Выберите канал для настройки клавиатуры:",
reply_markup=InlineKeyboardMarkup(keyboard)
)
return SELECT_CHANNEL
return ConversationHandler.END
finally:
await session.close()
async def select_channel(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
channel_id = int(query.data)
context.user_data['channel_id'] = channel_id
session = SessionLocal()
buttons = session.query(Button).filter_by(channel_id=channel_id).all()
session.close()
text = "Кнопки этого канала:\n"
for b in buttons:
text += f"- {b.name}: {b.url}\n"
text += "\nДобавить новую кнопку: /add_button\nУдалить: /del_button <название>"
await query.edit_message_text(text)
if query and query.data:
await query.answer()
channel_id = int(query.data)
if context.user_data is None:
context.user_data = {}
context.user_data['channel_id'] = channel_id
from sqlalchemy import select
session = AsyncSessionLocal()
try:
buttons_result = await session.execute(select(Button).where(Button.channel_id == channel_id))
buttons = buttons_result.scalars().all()
text = "Кнопки этого канала:\n"
for b in buttons:
text += f"- {b.name}: {b.url}\n"
text += "\nДобавить новую кнопку: /add_button\nУдалить: /del_button <название>"
await query.edit_message_text(text)
finally:
await session.close()
return ConversationHandler.END
channel_buttons_conv = ConversationHandler(
@@ -37,4 +50,4 @@ channel_buttons_conv = ConversationHandler(
SELECT_CHANNEL: [CallbackQueryHandler(select_channel)],
},
fallbacks=[]
)
)

View File

@@ -1,21 +1,27 @@
from telegram import Update
from telegram.ext import CommandHandler, ContextTypes
from db import SessionLocal
from db import AsyncSessionLocal
from models import Button
async def del_button(update: Update, context: ContextTypes.DEFAULT_TYPE):
args = context.args
if not args:
await update.message.reply_text('Используйте: /del_button <название>')
if update.message:
await update.message.reply_text('Используйте: /del_button <название>')
return
name = args[0]
session = SessionLocal()
button = session.query(Button).filter_by(name=name).first()
if not button:
await update.message.reply_text('Кнопка не найдена.')
session.close()
return
session.delete(button)
session.commit()
session.close()
await update.message.reply_text(f'Кнопка "{name}" удалена.')
session = AsyncSessionLocal()
try:
from sqlalchemy import select
result = await session.execute(select(Button).where(Button.name == name))
button = result.scalar_one_or_none()
if not button:
if update.message:
await update.message.reply_text('Кнопка не найдена.')
return
await session.delete(button)
await session.commit()
if update.message:
await update.message.reply_text(f'Кнопка "{name}" удалена.')
finally:
await session.close()

View File

@@ -1,6 +1,6 @@
from telegram import Update
from telegram.ext import CommandHandler, ContextTypes
from db import SessionLocal
from db import AsyncSessionLocal
from models import Button
async def edit_button(update: Update, context: ContextTypes.DEFAULT_TYPE):
@@ -9,14 +9,18 @@ async def edit_button(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text('Используйте: /edit_button <название> <новоеазвание> <новая_ссылка>')
return
name, new_name, new_url = args[0], args[1], args[2]
session = SessionLocal()
button = session.query(Button).filter_by(name=name).first()
if not button:
await update.message.reply_text('Кнопка не найдена.')
session.close()
return
button.name = new_name
button.url = new_url
session.commit()
session.close()
await update.message.reply_text(f'Кнопка "{name}" изменена.')
session = AsyncSessionLocal()
try:
result = await session.execute(Button.__table__.select().where(Button.name == name))
button = result.scalar_one_or_none()
if not button:
if update.message:
await update.message.reply_text('Кнопка не найдена.')
return
button.name = new_name
button.url = new_url
await session.commit()
if update.message:
await update.message.reply_text(f'Кнопка "{name}" изменена.')
finally:
await session.close()

View File

@@ -1,34 +1,45 @@
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import CommandHandler, CallbackQueryHandler, ConversationHandler, ContextTypes
from db import SessionLocal
from db import AsyncSessionLocal
from models import Group, Button
SELECT_GROUP, MANAGE_BUTTONS = range(2)
async def group_buttons_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
session = SessionLocal()
groups = session.query(Group).all()
session.close()
keyboard = [[InlineKeyboardButton(g.name, callback_data=str(g.id))] for g in groups]
await update.message.reply_text(
"Выберите группу для настройки клавиатуры:",
reply_markup=InlineKeyboardMarkup(keyboard)
)
return SELECT_GROUP
session = AsyncSessionLocal()
try:
groups_result = await session.execute(Group.__table__.select())
groups = groups_result.scalars().all()
keyboard = [[InlineKeyboardButton(f'{getattr(g, "name", str(g.name))}', callback_data=str(getattr(g, "id", str(g.id))))] for g in groups]
if update.message:
await update.message.reply_text(
"Выберите группу для настройки клавиатуры:",
reply_markup=InlineKeyboardMarkup(keyboard)
)
return SELECT_GROUP
return ConversationHandler.END
finally:
await session.close()
async def select_group(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
group_id = int(query.data)
context.user_data['group_id'] = group_id
session = SessionLocal()
buttons = session.query(Button).filter_by(group_id=group_id).all()
session.close()
text = "Кнопки этой группы:\n"
for b in buttons:
text += f"- {b.name}: {b.url}\n"
text += "\nДобавить новую кнопку: /add_button\nУдалить: /del_button <название>"
await query.edit_message_text(text)
if query and query.data:
await query.answer()
group_id = int(query.data)
if context.user_data is None:
context.user_data = {}
context.user_data['group_id'] = group_id
session = AsyncSessionLocal()
try:
buttons_result = await session.execute(Button.__table__.select().where(Button.group_id == group_id))
buttons = buttons_result.scalars().all()
text = "Кнопки этой группы:\n"
for b in buttons:
text += f"- {b.name}: {b.url}\n"
text += "\nДобавить новую кнопку: /add_button\nУдалить: /del_button <название>"
await query.edit_message_text(text)
finally:
await session.close()
return ConversationHandler.END
group_buttons_conv = ConversationHandler(
@@ -37,4 +48,4 @@ group_buttons_conv = ConversationHandler(
SELECT_GROUP: [CallbackQueryHandler(select_group)],
},
fallbacks=[]
)
)

View File

@@ -1,65 +1,95 @@
from telegram import Update, InputMediaPhoto, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import ContextTypes, ConversationHandler, MessageHandler, CommandHandler, filters, CallbackQueryHandler, ContextTypes
from db import SessionLocal
from db import AsyncSessionLocal
from models import Channel, Group, Button
SELECT_MEDIA, SELECT_TEXT, SELECT_TARGET = range(3)
async def new_post_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text('Отправьте картинку для поста или /skip:')
return SELECT_MEDIA
if update.message:
await update.message.reply_text('Отправьте картинку для поста или /skip:')
return SELECT_MEDIA
return ConversationHandler.END
async def select_media(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.message.photo:
if update.message and hasattr(update.message, 'photo') and update.message.photo:
if context.user_data is None:
context.user_data = {}
context.user_data['photo'] = update.message.photo[-1].file_id
await update.message.reply_text('Введите текст поста или пересланное сообщение:')
return SELECT_TEXT
if update.message:
await update.message.reply_text('Введите текст поста или пересланное сообщение:')
return SELECT_TEXT
return ConversationHandler.END
async def select_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
context.user_data['text'] = update.message.text or update.message.caption
session = SessionLocal()
channels = session.query(Channel).all()
groups = session.query(Group).all()
session.close()
keyboard = []
for c in channels:
keyboard.append([InlineKeyboardButton(f'Канал: {c.name}', callback_data=f'channel_{c.id}')])
for g in groups:
keyboard.append([InlineKeyboardButton(f'Группа: {g.name}', callback_data=f'group_{g.id}')])
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text('Выберите, куда отправить пост:', reply_markup=reply_markup)
return SELECT_TARGET
if update.message:
if context.user_data is None:
context.user_data = {}
context.user_data['text'] = getattr(update.message, 'text', None) or getattr(update.message, 'caption', None)
from sqlalchemy import select
session = AsyncSessionLocal()
try:
channels_result = await session.execute(select(Channel))
channels = channels_result.scalars().all()
groups_result = await session.execute(select(Group))
groups = groups_result.scalars().all()
keyboard = []
for c in channels:
keyboard.append([InlineKeyboardButton(f'Канал: {getattr(c, "name", str(c.name))}', callback_data=f'channel_{getattr(c, "id", str(c.id))}')])
for g in groups:
keyboard.append([InlineKeyboardButton(f'Группа: {getattr(g, "name", str(g.name))}', callback_data=f'group_{getattr(g, "id", str(g.id))}')])
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text('Выберите, куда отправить пост:', reply_markup=reply_markup)
return SELECT_TARGET
finally:
await session.close()
return ConversationHandler.END
async def select_target(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
if not query:
return ConversationHandler.END
await query.answer()
data = query.data
session = SessionLocal()
session = AsyncSessionLocal()
try:
if data.startswith('channel_'):
chat_id = None
markup = None
if data and data.startswith('channel_'):
from sqlalchemy import select
channel_id = int(data.split('_')[1])
channel = session.query(Channel).get(channel_id)
buttons = session.query(Button).filter_by(channel_id=channel_id).all()
markup = InlineKeyboardMarkup([[InlineKeyboardButton(b.name, url=b.url)] for b in buttons]) if buttons else None
chat_id = channel.link.strip()
else:
channel_result = await session.execute(select(Channel).where(Channel.id == channel_id))
channel = channel_result.scalar_one_or_none()
buttons_result = await session.execute(select(Button).where(Button.channel_id == channel_id))
buttons = buttons_result.scalars().all()
markup = InlineKeyboardMarkup([[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in buttons]) if buttons else None
chat_id = getattr(channel, 'link', None)
elif data and data.startswith('group_'):
from sqlalchemy import select
group_id = int(data.split('_')[1])
group = session.query(Group).get(group_id)
buttons = session.query(Button).filter_by(group_id=group_id).all()
markup = InlineKeyboardMarkup([[InlineKeyboardButton(b.name, url=b.url)] for b in buttons]) if buttons else None
chat_id = group.link.strip()
# Проверка chat_id
if not (chat_id.startswith('@') or chat_id.startswith('-')):
await query.edit_message_text('Ошибка: ссылка должна быть username (@channel) или числовой ID (-100...)')
return ConversationHandler.END
try:
await context.bot.send_photo(chat_id=chat_id, photo=context.user_data.get('photo'), caption=context.user_data.get('text'), reply_markup=markup)
await query.edit_message_text('Пост отправлен!')
except Exception as e:
await query.edit_message_text(f'Ошибка отправки поста: {e}')
group_result = await session.execute(select(Group).where(Group.id == group_id))
group = group_result.scalar_one_or_none()
buttons_result = await session.execute(select(Button).where(Button.group_id == group_id))
buttons = buttons_result.scalars().all()
markup = InlineKeyboardMarkup([[InlineKeyboardButton(str(b.name), url=str(b.url))] for b in buttons]) if buttons else None
chat_id = getattr(group, 'link', None)
if chat_id:
chat_id = chat_id.strip()
if not (chat_id.startswith('@') or chat_id.startswith('-')):
await query.edit_message_text('Ошибка: ссылка должна быть username (@channel) или числовой ID (-100...)')
return ConversationHandler.END
try:
photo = context.user_data.get('photo') if context.user_data else None
if photo:
await context.bot.send_photo(chat_id=chat_id, photo=photo, caption=context.user_data.get('text') if context.user_data else None, reply_markup=markup)
await query.edit_message_text('Пост отправлен!')
else:
await query.edit_message_text('Ошибка: не выбрано фото для поста.')
except Exception as e:
await query.edit_message_text(f'Ошибка отправки поста: {e}')
finally:
session.close()
await session.close()
return ConversationHandler.END
new_post_conv = ConversationHandler(
@@ -69,5 +99,6 @@ new_post_conv = ConversationHandler(
SELECT_TEXT: [MessageHandler(filters.TEXT | filters.FORWARDED, select_text)],
SELECT_TARGET: [CallbackQueryHandler(select_target)],
},
fallbacks=[]
)
fallbacks=[],
)

5
init_db.py Normal file
View File

@@ -0,0 +1,5 @@
import asyncio
from db import init_db
if __name__ == "__main__":
asyncio.run(init_db())

81
main.py
View File

@@ -1,32 +1,67 @@
import sys
import asyncio
if sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
import logging
import os
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackQueryHandler, ConversationHandler, ContextTypes
from dotenv import load_dotenv
from db import SessionLocal, init_db
from db import AsyncSessionLocal, init_db
from models import Admin, Channel, Group, Button
from asyncio import run as sync_to_async
load_dotenv()
TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN')
logging.basicConfig(level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
init_db()
import asyncio
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
session = SessionLocal()
user_id = update.effective_user.id
admin = session.query(Admin).filter_by(tg_id=user_id).first()
if not admin:
admin = Admin(tg_id=user_id)
session.add(admin)
session.commit()
await update.message.reply_text('Вы зарегистрированы как админ.')
session = AsyncSessionLocal()
user_id = update.effective_user.id if update.effective_user else None
result = await session.execute(Admin.__table__.select().where(Admin.tg_id == user_id))
admin = result.first() if user_id else None
if not admin and user_id:
await session.execute(Admin.__table__.insert().values(tg_id=user_id))
await session.commit()
if update.message:
await update.message.reply_text('Вы зарегистрированы как админ.')
else:
await update.message.reply_text('Вы уже зарегистрированы.')
session.close()
if update.message:
await update.message.reply_text('Вы уже зарегистрированы.')
await session.close()
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
help_text = (
"<b>Справка по командам бота:</b>\n\n"
"<b>/start</b> — регистрация пользователя как администратора.\n"
"<b>/help</b> — показать это подробное описание команд.\n\n"
"<b>/add_channel &lt;название&gt; &lt;ссылка&gt;</b> — добавить канал для публикаций. Пример: /add_channel MyChannel @my_channel\n"
"<b>/add_group &lt;название&gt; &lt;ссылка&gt;</b> — добавить группу для публикаций. Пример: /add_group MyGroup @my_group\n"
"<b>/add_button</b> — добавить кнопку к выбранному каналу или группе. Запустит диалог выбора и добавления.\n"
"<b>/edit_button &lt;название&gt; &lt;новоеазвание&gt; &lt;новая_ссылка&gt;</b> — изменить кнопку. Пример: /edit_button old new https://site.ru\n"
"<b>/del_button &lt;название&gt;</b> — удалить кнопку по названию.\n"
"<b>/new_post</b> — создать новый пост (картинка + текст + выбор канала/группы + кнопки).\n"
"<b>/group_buttons</b> — показать и настроить кнопки для группы.\n"
"<b>/channel_buttons</b> — показать и настроить кнопки для канала.\n\n"
"<b>Примеры использования:</b>\n"
"- /add_channel Новости @news_channel\n"
"- /add_group Чат @chat_group\n"
"- /edit_button Подписка Subscribe https://subscribe.ru\n"
"- /del_button Subscribe\n\n"
"<b>Порядок работы:</b>\n"
"1. Добавьте канал или группу.\n"
"2. Добавьте кнопки для них.\n"
"3. Создайте пост через /new_post и выберите, куда отправить.\n"
"4. Управляйте кнопками через /group_buttons и /channel_buttons.\n\n"
"Если возникли вопросы — используйте /help или обратитесь к администратору."
)
if update.message:
await update.message.reply_text(help_text, parse_mode='HTML')
# Импорт обработчиков
from handlers.add_channel import add_channel
@@ -38,10 +73,16 @@ from handlers.channel_buttons import channel_buttons_conv
from handlers.edit_button import edit_button
from handlers.del_button import del_button
def main():
def main():
if not TELEGRAM_TOKEN:
print("Ошибка: TELEGRAM_TOKEN не найден в переменных окружения.")
return
sync_to_async(init_db())
application = Application.builder().token(TELEGRAM_TOKEN).build()
application.add_handler(CommandHandler('start', start))
application.add_handler(CommandHandler('help', help_command))
application.add_handler(CommandHandler('add_channel', add_channel))
application.add_handler(CommandHandler('add_group', add_group))
application.add_handler(add_button_conv)
@@ -50,7 +91,17 @@ def main():
application.add_handler(channel_buttons_conv)
application.add_handler(CommandHandler('edit_button', edit_button))
application.add_handler(CommandHandler('del_button', del_button))
import sys
import asyncio
if sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
application.run_polling()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View File

@@ -1,7 +1,6 @@
from sqlalchemy import Column, Integer, String, ForeignKey, Text
from sqlalchemy.orm import declarative_base, relationship
Base = declarative_base()
from sqlalchemy.orm import relationship
from db import Base
class Admin(Base):
__tablename__ = 'admins'

View File

@@ -2,3 +2,4 @@ python-telegram-bot>=20.0
sqlalchemy>=2.0
python-dotenv>=1.0
pytest>=7.0
aiosqlite>=0.19