novagon/main.py

217 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from telethon import TelegramClient, events
from telethon import types
from datetime import timedelta
from db import DBAction, db_action, create_table
import aiosqlite
from df_utils import detect_intent_text
import random
import asyncio
import os
from dotenv import load_dotenv
load_dotenv()
bot = TelegramClient(os.environ['SESSION_NAME'], int(os.environ['API_ID']),
os.environ['API_HASH']).start(bot_token=os.environ['BOT_TOKEN'])
@bot.on(events.NewMessage(pattern='/help'))
async def on_help(msg):
"""Send detailed help message"""
await msg.reply('Дотупные команды: \n'
'/help - помощь\n'
'/bs [add/rm] <id стикерапка> - запретить, разрешить стикерпак в группе, можно написать в ответ на стикер из нужного стикерпака\n'
'/exc [add/rm] [id пользователя/@упомянуть пользователя/"me"] - сделать пользователя неприкасаемым для бота, можно написать в ответ на любое сообщение нужного пользователя\n'
'/report в ответ на сообщение, отправляет отчет об ошибке распознавания запрещенных слов разработчику, если отправил администратор, размучивает пользователя')
raise events.StopPropagation
@bot.on(events.NewMessage())
async def check_dm(msg):
if msg.is_private:
await msg.reply('Привет!, я не работаю в личных сообщениях. Добавь меня в группу и мы будем избавляться от вагонов вместе!')
raise events.StopPropagation
async def check_admin(msg):
"""Checks if a user is an admin in the chat"""
return any([user.id == msg.sender_id async for user in bot.iter_participants(msg.chat_id,
filter=types.ChannelParticipantsAdmins)])
@bot.on(events.NewMessage(pattern='/report'))
async def on_report(msg):
"""Send detection error report"""
if msg.is_reply:
reply_message = await msg.get_reply_message()
await bot.edit_permissions(msg.chat_id, reply_message.sender_id, send_messages=True)
await bot.send_message(msg.chat_id, 'пользователь размучен.')
else:
await msg.reply('Ответь командой на сообщение, которое было неправильно распознано, чтобы отправить отчет об ошибке и размутить пользователя')
async def check_user_in_exceptions(user_id, chat_id):
"""Checks if a user is in the exceptions list"""
return await db_action('SELECT id FROM user_exceptions WHERE user_id = ? AND chat_id = ?', (user_id, chat_id),
DBAction.fetchone) is not None
@bot.on(events.NewMessage(pattern='/exc'))
async def manage_exceptions(msg):
"""Manage users, that will be ignored by the bot"""
if not await check_admin(msg):
await msg.reply('Добавлять пользователей в исключения может только админ')
raise events.StopPropagation
args = msg.message.text.split(' ')
if msg.is_reply and len(args) > 1:
reply_message = await msg.get_reply_message()
user_id = reply_message.sender_id
elif len(args) > 2:
if args[2] == 'me':
user_id = msg.sender_id
elif args[2].isdigit():
try:
await bot.get_entity(args[2])
except ValueError:
await msg.reply(f'Пользователь с id {args[2]} не найден')
raise events.StopPropagation
user_id = args[2]
elif '@' in args[2]:
try:
user = await bot.get_entity(args[2])
user_id = user.id
except ValueError:
await msg.reply(f'Пользователь с ником {args[2]} не найден')
raise events.StopPropagation
else:
await msg.reply('Неверный синтаксис. Возможный вариант: /exception [add/rm] [user_id/username/"me"]')
raise events.StopPropagation
else:
await msg.reply('Неверный синтаксисю Вы можете использовать: /exception [add/rm] [user_id/username/"me"], или ответить на сообщение пользователя, которого хотите добавить в исключения')
raise events.StopPropagation
match args[1]:
case 'add':
if await check_user_in_exceptions(user_id, msg.chat_id):
await msg.reply('Пользователь уже есть в списке исключений')
raise events.StopPropagation
await db_action('INSERT INTO user_exceptions (user_id, chat_id) VALUES (?, ?)', (user_id, msg.chat_id,),
DBAction.commit)
await msg.reply('Пользователь добавлен в исключения, теперь его сообщения не будут проверяться ботом')
case 'rm':
if not await check_user_in_exceptions(user_id, msg.chat_id):
await msg.reply('Пользователя пока нет в исключениях')
raise events.StopPropagation
await db_action('DELETE FROM user_exceptions WHERE user_id = ? AND chat_id = ?', (user_id, msg.chat_id,),
DBAction.commit)
await msg.reply('Пользователь удален из списка исключений')
case _:
await msg.reply(f'Неизвестный аргумент {args[1]}, возможные варианты: add, rm')
raise events.StopPropagation
async def check_stickerpack_in_banlist(stickerpack_id, chat_id):
"""Checks if a stickerpack is in the banlist"""
return await db_action('SELECT id FROM banned_stickerpacks WHERE pack_id = ? AND chat_id = ?',
(stickerpack_id, chat_id), DBAction.fetchone) is not None
@bot.on(events.NewMessage(pattern='/bs'))
async def manage_stickerpack(msg):
"""Add or remove stickerpack from banlist"""
if not await check_admin(msg):
await msg.reply('Добавлять стикеры в банлист может только админ')
raise events.StopPropagation
args = msg.text.split()
if len(args) > 2:
if not args[2].isdigit():
await msg.reply('Возможно, вы ввели неверный id стикерпака или ввели не id стикерпака')
raise events.StopPropagation
stickerpack_id = args[2]
elif msg.is_reply and len(args) > 1:
reply_message = await msg.get_reply_message()
if not reply_message.sticker:
await msg.reply('Ответьте сообщением с командой именно на стикер, не на медиафайл или текст')
raise events.StopPropagation
for attr in reply_message.media.document.attributes:
if isinstance(attr, types.DocumentAttributeSticker):
stickerpack_id = attr.stickerset.id
break
else:
await msg.reply('Неверный синтаксис. Вы можете использовать: /bs [add/rm] [stickerpack_id] или ответить сообщением с командой на стикер')
raise events.StopPropagation
match args[1]:
case 'add':
if await check_stickerpack_in_banlist(stickerpack_id, msg.chat_id):
await msg.reply('Набор стикеров уже в банлисте')
raise events.StopPropagation
await db_action("INSERT INTO banned_stickerpacks (pack_id, chat_id) VALUES (?, ?)",
(stickerpack_id, msg.chat_id,), DBAction.commit)
await msg.reply(f"Стикерпак с id {stickerpack_id} добавлен в банлист")
case 'rm':
if not await check_stickerpack_in_banlist(stickerpack_id, msg.chat_id):
await msg.reply('Набор стикеров не найден в банлисте')
raise events.StopPropagation
await db_action("DELETE FROM banned_stickerpacks WHERE pack_id = ? AND chat_id = ?",
(stickerpack_id, msg.chat_id,), DBAction.commit)
await msg.reply(f"Стикерпак с id {stickerpack_id} удален из банлиста")
case _:
await msg.reply(f'Неизвестный аргумент {args[1]}, возможные варианты: add, rm')
raise events.StopPropagation
@bot.on(events.NewMessage())
async def process_message(msg):
"""if await check_admin(msg) or await check_user_in_exceptions(msg.sender_id, msg.chat_id):
return"""
if msg.text:
resp = await detect_intent_text(os.environ['GOOGLE_PROJECT_ID'], msg.chat_id, msg.raw_text,
os.environ['GOOGLE_MODEL_LANGUAGE'])
if resp:
await msg.reply(random.choice(['Харе уже свои вагоны слать',
'Вагончиками балуемся?',
'Вагоны тут не приветствуются',
'Надоел уже со своими вагонами']))
if random.randint(0, 6) == 1:
await msg.respond(
'Бот работает в режиме обучения, напишите /report в ответ на сообщение, если оно не было распознано или распознано по ошибке')
await bot.edit_permissions(msg.chat_id, msg.sender_id, timedelta(minutes=10), send_messages=False)
raise events.StopPropagation
if msg.sticker:
banstickerpacks = [stickerpack[0] for stickerpack in await db_action('SELECT pack_id FROM banned_stickerpacks WHERE chat_id = ?',
(msg.chat_id,), DBAction.fetchall)]
if any([x.stickerset.id in banstickerpacks for x in msg.sticker.attributes if isinstance(x, types.DocumentAttributeSticker)]):
await bot.edit_permissions(msg.chat_id, msg.sender_id, timedelta(minutes=10), send_messages=False)
await msg.reply(random.choice(['Ай-ай-ай, стикеры с вагончиками, не хорошо',
'Стикерами с вагонами балуемся? Негоже так делать',
'Надоел уже со своими вагонами',
'Щас высажу с поезда, будете тут про номерные свои кричать!']))
async def main():
await create_table()
await bot.run_until_disconnected()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())