novagon/main.py

142 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from telethon import TelegramClient, events
from telethon.tl.types import ChannelParticipantsAdmins, DocumentAttributeSticker
from datetime import timedelta
from db import DBAction, db_action, create_table
from df_utils import detect_intent_text
import random
import asyncio
import os
from dotenv import load_dotenv
bot = TelegramClient(os.environ['SESSION_NAME'], int(os.environ['API_ID']),
os.environ['API_HASH']).start(bot_token=os.environ['BOT_TOKEN'])
load_dotenv()
asyncio.get_event_loop().run_until_complete(create_table())
async def check_admin(msg):
return any([user.id == msg.sender_id async for user in bot.iter_participants(msg.chat_id,
filter=ChannelParticipantsAdmins)])
@bot.on(events.NewMessage())
async def on_message(msg):
chats = await db_action('SELECT chat_id FROM chats', (), DBAction.fetchall)
if not any([msg.chat_id == chat[0] for chat in chats]):
await db_action('INSERT INTO chats (chat_id) VALUES (?)', (msg.chat_id,), DBAction.commit)
async def check_user_in_exceptions(user_id, chat_id):
"""Checks if a user is in the exceptions list"""
return await db_action('SELECT id FROM user_exceptions WHERE user_id = ? AND chat_id = ?', (user_id, chat_id),
DBAction.fetchone) is not None
@bot.on(events.NewMessage(pattern='/exception'))
async def on_exception(msg):
if not await check_admin(msg):
await msg.reply('Добавлять пользователей в исключения может только админ')
raise events.StopPropagation
args = msg.message.text.split(' ')
if len(args) > 2:
if args[2] == 'me':
user_id = msg.sender_id
else:
user_id = args[2]
elif msg.is_reply:
reply_message = await msg.get_reply_message()
user_id = reply_message.sender_id
else:
await msg.reply('Допиши к команде: айди пользователя, "me", чтобы добавить себя, или ответь командой на сообщение нужного пользователя')
raise events.StopPropagation
match args[1]:
case 'add':
if await check_user_in_exceptions(user_id, msg.chat_id):
await msg.reply('Пользователь уже есть в списке исключений')
raise events.StopPropagation
await db_action('INSERT INTO user_exceptions (user_id, chat_id) VALUES (?, ?)', (user_id, msg.chat_id,),
DBAction.commit)
await msg.reply('Пользователь добавлен в исключения, теперь его сообщения не будут проверяться ботом')
case 'remove':
if not await check_user_in_exceptions(user_id, msg.chat_id):
await msg.reply('Пользователя пока нет в исключениях')
raise events.StopPropagation
await db_action('DELETE FROM user_exceptions WHERE user_id = ? AND chat_id = ?', (user_id, msg.chat_id,),
DBAction.commit)
await msg.reply('Пользователь удален из списка исключений')
@bot.on(events.NewMessage(pattern='/bansticker'))
async def add_stickerpack(msg):
if not await check_admin(msg):
await msg.reply('Добавлять стикеры в банлист может только админ')
raise events.StopPropagation
args = msg.text.split()
if len(args) > 1:
stickerpack_id = args[1]
elif msg.is_reply:
reply_message = await msg.get_reply_message()
if not reply_message.sticker:
await msg.reply('Ответь сообщением с командой именно на стикер')
raise events.StopPropagation
for attr in reply_message.media.document.attributes:
if isinstance(attr, DocumentAttributeSticker):
stickerpack_id = attr.stickerset.id
break
else:
await msg.reply('Допиши к команде айди набора стикеров или ответь командой на стикер из набора')
raise events.StopPropagation
if await db_action("SELECT * FROM banned_stickerpacks WHERE pack_id = ? AND chat_id = ?",
(stickerpack_id, msg.chat_id,), DBAction.fetchone) is None:
await db_action("INSERT INTO banned_stickerpacks (pack_id, chat_id) VALUES (?, ?)",
(stickerpack_id, msg.chat_id,), DBAction.commit)
await msg.reply(f"Стикерпак с айди {stickerpack_id} добавлен в банлист")
else:
await msg.reply(f"Стикерпак с айди {stickerpack_id} уже в банлисте")
raise events.StopPropagation
@bot.on(events.NewMessage())
async def process_message(msg):
"""if await check_admin(msg) or await check_user_in_exceptions(msg.sender_id, msg.chat_id):
return"""
if msg.is_private:
await msg.respond('Я не работаю в личных сообщениях. Добавь меня в группу, пожалуйста')
raise events.StopPropagation
if msg.message:
resp = await detect_intent_text(os.environ['GOOGLE_PROJECT_ID'], msg.chat_id, msg.raw_text, os.environ['GOOGLE_MODEL_LANGUAGE'])
if resp:
await msg.reply(resp)
await bot.edit_permissions(msg.chat_id, msg.sender_id, timedelta(minutes=5), send_messages=False)
raise events.StopPropagation
if msg.sticker:
banstickerpacks = [stickerpack[0] for stickerpack in await db_action('SELECT pack_id FROM banned_stickerpacks WHERE chat_id = ?',
(msg.chat_id,), DBAction.fetchall)]
if any([x.stickerset.id in banstickerpacks for x in msg.sticker.attributes if isinstance(x, DocumentAttributeSticker)]):
await msg.reply(random.choice(['Ай-ай-ай, стикеры с вагончиками, не хорошо',
'Стикерами с вагонами балуемся? Негоже так делать',
'Надоел уже со своими вагонами',
'Щас высажу с поезда, будете тут бегать и про номерные свои кричать!']))
await bot.edit_permissions(msg.chat_id, msg.sender_id, timedelta(minutes=5), send_messages=False)
bot.run_until_disconnected()