novagon/main.py

196 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from telethon import TelegramClient, events
from telethon import functions, types
from datetime import timedelta
from db import DBAction, db_action, create_table
from df_utils import detect_intent_text
import random
import asyncio
import os
from dotenv import load_dotenv
bot = TelegramClient(os.environ['SESSION_NAME'], int(os.environ['API_ID']),
os.environ['API_HASH']).start(bot_token=os.environ['BOT_TOKEN'])
async def check_admin(msg):
return any([user.id == msg.sender_id async for user in bot.iter_participants(msg.chat_id,
filter=types.ChannelParticipantsAdmins)])
@bot.on(events.NewMessage())
async def on_message(msg):
if msg.is_private:
await msg.respond('Я не работаю в личных сообщениях. Добавь меня в группу, для начала работы')
raise events.StopPropagation
chats = await db_action('SELECT chat_id FROM chats', (), DBAction.fetchall)
if not any([msg.chat_id == chat[0] for chat in chats]):
await db_action('INSERT INTO chats (chat_id) VALUES (?)', (msg.chat_id,), DBAction.commit)
@bot.on(events.NewMessage(pattern='/help'))
async def on_help(msg):
await msg.reply('Дотупные команды: \n'
'/help - помощь\n'
'/bs [add/rm] <id стикерапка> - запретить, разрешить стикерпак в группе, можно написать в ответ на стикер из нужного стикерпака\n'
'/exc [add/rm] [id пользователя/@упомянуть пользователя/"me"] - сделать пользователя неприкасаемым для бота, можно написать в ответ на любое сообщение нужного пользователя\n'
'/report в ответ на сообщение бота отправляет отчет об ошибке распознавания запрещенных слов разработчику, если отправил администратор, размучивает пользователя')
raise events.StopPropagation
async def check_user_in_exceptions(user_id, chat_id):
"""Checks if a user is in the exceptions list"""
return await db_action('SELECT id FROM user_exceptions WHERE user_id = ? AND chat_id = ?', (user_id, chat_id),
DBAction.fetchone) is not None
@bot.on(events.NewMessage(pattern='/exc'))
async def on_exception(msg):
if not await check_admin(msg):
await msg.reply('Добавлять пользователей в исключения может только админ')
raise events.StopPropagation
args = msg.message.text.split(' ')
if msg.is_reply and len(args) > 1:
reply_message = await msg.get_reply_message()
user_id = reply_message.sender_id
elif len(args) > 2:
if args[2] == 'me':
user_id = msg.sender_id
elif args[2].isdigit():
try:
await bot.get_entity(args[2])
except ValueError:
await msg.reply(f'Пользователь с id {args[2]} не найден')
raise events.StopPropagation
user_id = args[2]
elif '@' in args[2]:
try:
user = await bot.get_entity(args[2])
user_id = user.id
except ValueError:
await msg.reply(f'Пользователь с ником {args[2]} не найден')
raise events.StopPropagation
else:
await msg.reply('Неверный синтаксис. Возможный вариант: /exception [add/rm] [user_id/username/"me"]')
raise events.StopPropagation
else:
await msg.reply('Неверный синтаксисю Вы можете использовать: /exception [add/rm] [user_id/username/"me"], или ответить на сообщение пользователя, которого хотите добавить в исключения')
raise events.StopPropagation
match args[1]:
case 'add':
if await check_user_in_exceptions(user_id, msg.chat_id):
await msg.reply('Пользователь уже есть в списке исключений')
raise events.StopPropagation
await db_action('INSERT INTO user_exceptions (user_id, chat_id) VALUES (?, ?)', (user_id, msg.chat_id,),
DBAction.commit)
await msg.reply('Пользователь добавлен в исключения, теперь его сообщения не будут проверяться ботом')
case 'rm':
if not await check_user_in_exceptions(user_id, msg.chat_id):
await msg.reply('Пользователя пока нет в исключениях')
raise events.StopPropagation
await db_action('DELETE FROM user_exceptions WHERE user_id = ? AND chat_id = ?', (user_id, msg.chat_id,),
DBAction.commit)
await msg.reply('Пользователь удален из списка исключений')
case _:
await msg.reply(f'Неизвестный аргумент {args[1]}, возможные варианты: add, rm')
raise events.StopPropagation
async def check_stickerpack_in_banlist(stickerpack_id, chat_id):
"""Checks if a stickerpack is in the banlist"""
return await db_action('SELECT id FROM banned_stickerpacks WHERE pack_id = ? AND chat_id = ?',
(stickerpack_id, chat_id), DBAction.fetchone) is not None
@bot.on(events.NewMessage(pattern='/bs'))
async def add_stickerpack(msg):
if not await check_admin(msg):
await msg.reply('Добавлять стикеры в банлист может только админ')
raise events.StopPropagation
args = msg.text.split()
if len(args) > 2:
if not args[2].isdigit():
await msg.reply('Возможно, вы ввели неверный id стикерпака или ввели не id стикерпака')
raise events.StopPropagation
stickerpack_id = args[2]
elif msg.is_reply and len(args) > 1:
reply_message = await msg.get_reply_message()
if not reply_message.sticker:
await msg.reply('Ответь сообщением с командой именно на стикер, не на медиафайл или текст')
raise events.StopPropagation
for attr in reply_message.media.document.attributes:
if isinstance(attr, types.DocumentAttributeSticker):
stickerpack_id = attr.stickerset.id
break
else:
await msg.reply('Неверный синтаксисю Вы можете использовать: /bs [add/rm] [stickerpack_id], или ответить сообщением с командой на стикер')
raise events.StopPropagation
match args[1]:
case 'add':
if await check_stickerpack_in_banlist(stickerpack_id, msg.chat_id):
await msg.reply('Набор стикеров уже в банлисте')
raise events.StopPropagation
await db_action("INSERT INTO banned_stickerpacks (pack_id, chat_id) VALUES (?, ?)",
(stickerpack_id, msg.chat_id,), DBAction.commit)
await msg.reply(f"Стикерпак с id {stickerpack_id} добавлен в банлист")
case 'rm':
if not await check_stickerpack_in_banlist(stickerpack_id, msg.chat_id):
await msg.reply('Набор стикеров не найден в банлисте')
raise events.StopPropagation
await db_action("DELETE FROM banned_stickerpacks WHERE pack_id = ? AND chat_id = ?",
(stickerpack_id, msg.chat_id,), DBAction.commit)
await msg.reply(f"Стикерпак с id {stickerpack_id} удален из банлиста")
case _:
await msg.reply(f'Неизвестный аргумент {args[1]}, возможные варианты: add, rm')
raise events.StopPropagation
@bot.on(events.NewMessage())
async def process_message(msg):
"""if await check_admin(msg) or await check_user_in_exceptions(msg.sender_id, msg.chat_id):
return"""
if msg.message:
resp = await detect_intent_text(os.environ['GOOGLE_PROJECT_ID'], msg.chat_id, msg.raw_text, os.environ['GOOGLE_MODEL_LANGUAGE'])
if resp:
await msg.reply(resp)
await bot.edit_permissions(msg.chat_id, msg.sender_id, timedelta(minutes=5), send_messages=False)
raise events.StopPropagation
if msg.sticker:
banstickerpacks = [stickerpack[0] for stickerpack in await db_action('SELECT pack_id FROM banned_stickerpacks WHERE chat_id = ?',
(msg.chat_id,), DBAction.fetchall)]
if any([x.stickerset.id in banstickerpacks for x in msg.sticker.attributes if isinstance(x, types.DocumentAttributeSticker)]):
await msg.reply(random.choice(['Ай-ай-ай, стикеры с вагончиками, не хорошо',
'Стикерами с вагонами балуемся? Негоже так делать',
'Надоел уже со своими вагонами',
'Щас высажу с поезда, будете тут бегать и про номерные свои кричать!']))
await bot.edit_permissions(msg.chat_id, msg.sender_id, timedelta(minutes=5), send_messages=False)
async def main():
load_dotenv()
await create_table()
await bot.run_until_disconnected()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())