-
Notifications
You must be signed in to change notification settings - Fork 2
/
bot.py
152 lines (116 loc) · 4.39 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import os
from datetime import datetime as dt
from dotenv import load_dotenv
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (ApplicationBuilder, CallbackQueryHandler,
CommandHandler, MessageHandler, filters)
from logger import logger as log
from screens.exceptions import ScreenshotError, WHOISError
from screens.models import Statistics, User, get_or_create, session
from screens.screenshot import take_screenshot
from screens.whois import parce_whois
load_dotenv()
BOT_TOKEN = os.getenv('BOT_TOKEN')
async def start(update, context):
text = """
Привет! Я делаю скриншоты веб-страниц по ссылкам, которые вы пришлете.
Чтобы начать работу, пришлите мне любую ссылку, например wikipedia.org
"""
await context.bot.send_message(
chat_id=update.effective_chat.id, text=text,
reply_markup=InlineKeyboardMarkup(
inline_keyboard=[[InlineKeyboardButton(
text='Добавить бота в ваш чат',
url='http://t.me/{}?startgroup=true'.format(
context.bot.bot.username
)
)]]
)
)
async def send_screenshot(update, context):
link = update.message.text
text = 'Запрос принят к обработке'
dummy = await context.bot.send_message(
chat_id=update.effective_chat.id, text=text,
reply_to_message_id=update.message.message_id
)
user_instance = get_or_create(
session, User, tg_id=update.effective_user.id
)
new_stat = Statistics(
user_id=user_instance.id,
website=link,
date=dt.now(),
success=False
)
try:
screen_bytes, website, page_title, ev_time = await take_screenshot(
link, tg_user=update.effective_user.id
)
await context.bot.delete_message(
chat_id=update.effective_chat.id,
message_id=dummy.message_id
)
await context.bot.send_photo(
photo=screen_bytes,
caption='{}\n\nВеб-сайт: {}\n\nВремя обработки: {} сек.'.format(
page_title, website, ev_time
),
chat_id=update.effective_chat.id,
reply_markup=InlineKeyboardMarkup(
inline_keyboard=[[InlineKeyboardButton(
'Подробнее', callback_data=f'{website}'
)]]
)
)
log.info(f'Успешный запрос к {website}')
new_stat.success = True
except ScreenshotError as e:
log.error(e)
await context.bot.edit_message_text(
chat_id=update.effective_chat.id, text=e.args[0],
message_id=dummy.message_id
)
except Exception as e:
log.exception(e)
text = 'Произошла ошибка. Попробуйте сделать запрос позже'
await context.bot.edit_message_text(
chat_id=update.effective_chat.id, text=text,
message_id=dummy.message_id
)
finally:
session.add(new_stat)
session.commit()
async def whois_button(update, context):
whois = 'Не удалось получить IP-адрес указанного сайта'
query = update.callback_query
try:
whois = await parce_whois(query.data)
except WHOISError as e:
log.error(e)
except Exception as e:
log.exception(e)
await query.answer(text=whois, show_alert=True)
def check_tokens():
if not BOT_TOKEN:
log.critical(
'Переменная окружения BOT_TOKEN не задана. Бот выключен'
)
return False
return True
start_handler = CommandHandler('start', start)
get_screen = MessageHandler(
filters.Regex(r'.*?([\da-z\.-]+\.[a-z\.]{2,6})*\/?'),
send_screenshot
)
callback_query_answ = CallbackQueryHandler(whois_button)
def main():
if not check_tokens():
return
application = ApplicationBuilder().token(BOT_TOKEN).build()
application.add_handler(start_handler)
application.add_handler(get_screen)
application.add_handler(callback_query_answ)
application.run_polling()
if __name__ == '__main__':
main()