novagon/main.py

125 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from telethon import TelegramClient, events
from telethon.tl.types import ChannelParticipantsAdmins, DocumentAttributeSticker
from datetime import timedelta
from db import DBAction, db_action, create_table
from df_utils import detect_intent_text
import random
import asyncio
import os
from dotenv import load_dotenv
bot = TelegramClient(os.environ['SESSION_NAME'], int(os.environ['API_ID']),
os.environ['API_HASH']).start(bot_token=os.environ['BOT_TOKEN'])
load_dotenv()
asyncio.get_event_loop().run_until_complete(create_table())
async def check_admin(msg):
if not any([user.id == msg.sender_id async for user in bot.iter_participants(msg.chat_id,
filter=ChannelParticipantsAdmins)]):
await msg.reply('Эту команду может только администратор использовать')
raise events.StopPropagation
@bot.on(events.NewMessage())
async def on_message(msg):
chats = await db_action('SELECT chat_id FROM chats', (), DBAction.fetchall)
if msg.chat_id not in chats:
await db_action('INSERT INTO chats (chat_id) VALUES (?)', (msg.chat_id,), DBAction.commit)
async def check_user_in_exceptions(user_id, chat_id):
"""Checks if a user is in the exceptions list"""
return db_action('SELECT id FROM user_exceptions WHERE user_id = ? AND chat_id = ?', (user_id, chat_id),
DBAction.fetchone) is not None
@bot.on(events.NewMessage(pattern='/exception'))
async def on_exception(msg):
await check_admin(msg)
args = msg.message.text.split(' ')
if len(args) < 4:
await msg.reply('Неверный формат')
raise events.StopPropagation
if args[2]:
if args[2] == 'me':
user_id = msg.sender_id
else:
user_id = args[2]
elif msg.reply_to_message:
user_id = msg.reply_to_message.sender_id
else:
await msg.reply('Допиши к команде айди пользователя, "me", чтобы добавить себя, или ответь командой на сообщение нужного пользователя')
raise events.StopPropagation
match args[1]:
case 'add':
if await check_user_in_exceptions(user_id, msg.chat_id):
await msg.reply('Пользователь уже есть в списке исключений')
raise events.StopPropagation
await db_action('INSERT INTO user_exceptions (user_id) VALUES (?)', (user_id,), DBAction.commit)
await msg.reply('Пользователь добавлен в исключения, теперь его сообщения не будут проверяться ботом')
case 'remove':
if not await check_user_in_exceptions(user_id, msg.chat_id):
await msg.reply('Пользователя пока нет в исключениях')
raise events.StopPropagation
await db_action('DELETE FROM user_exceptions WHERE user_id = ?', (user_id,), DBAction.commit)
await msg.reply('Пользователь удален из списка исключений')
@bot.on(events.NewMessage(pattern='/addstickerpack'))
async def add_stickerpack(msg):
await check_admin(msg)
args = msg.text.split()
if args[1]:
stickerpack_id = args[1]
elif msg.reply_to_message.sticker:
for attr in msg.sticker.attributes:
if isinstance(attr, DocumentAttributeSticker):
stickerpack_id = attr.stickerset.id
break
else:
msg.reply('Допиши к команде айди набора стикеров или ответь командой на стикер из набора')
raise events.StopPropagation
if await db_action("SELECT * FROM banned_stickerpacks WHERE pack_id = ? AND chat_id = ?",
(stickerpack_id, msg.chat_id,), DBAction.fetchone) is None:
await db_action("INSERT INTO banned_stickerpacks (pack_id, chat_id) VALUES (?, ?)",
(stickerpack_id, msg.chat_id,), DBAction.commit)
await msg.reply(f"Стикерпак с айди {msg.text.split(' ')[1]} добавлен в банлист")
else:
await msg.reply(f"Стикерпак с айди {msg.text.split(' ')[1]} уже в банлисте")
raise events.StopPropagation
async def on_banword(msg):
await bot.edit_permissions(msg.chat_id, msg.sender_id, timedelta(minutes=5), send_messages=False)
await msg.reply(random.choice(['Харе про вагоны писать', 'Надоел уже со своими номерными', 'Никаких вагонов тут нет']))
@bot.on(events.NewMessage())
async def handler(msg):
if msg.is_private:
await msg.respond('Hello! Add me to a group to start using me!')
return
await msg.reply(await detect_intent_text(os.environ['GOOGLE_PROJECT_ID'], msg.chat_id, msg.raw_text, 'ru'))
banstickerpacks = [stickerpack[0] for stickerpack in db_action('SELECT pack_id FROM banned_stickerpacks', (), DBAction.fetchall)]
if msg.sticker and any([x.stickerset.id in banstickerpacks for x in msg.sticker.attributes if isinstance(x, DocumentAttributeSticker)]):
await on_banword(msg)
bot.run_until_disconnected()