-
Notifications
You must be signed in to change notification settings - Fork 0
/
job.py
198 lines (170 loc) · 6.7 KB
/
job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from misc.fun import try_wait
import asyncio
import html
import time
import signal
import traceback
import logging
from pyrogram.types import Message, User, InlineKeyboardMarkup, InlineKeyboardButton
from pyrogram.errors import MessageNotModified
log = logging.getLogger(__name__)
# Maximum output length to fit in the output message
# A deviation of 256 ensures the spaces for the command and the pid,
# I could optimize it but I won't
MAX_LENGTH = 4096 - 256
class Job:
_buf: bytes = b""
_proc = None
_short: str = None
_header: str = None
_markup: InlineKeyboardMarkup = None
_time: float = None
def __init__(self, command: str, from_user: User, message: Message):
self._command = command
self._from_user = from_user
self._top_message = message
self._message = message
self._buf_lock = asyncio.Lock()
@property
def command(self):
return self._command
@property
def short_command(self):
if self._short is None:
self._short = (
self._command if len(self._command) < 80 else
self._command[:self._command.find(" ")]
)
return self._short
@property
def from_user(self):
return self._from_user
@property
def message(self):
return self._message
@property
def proc(self):
return self._proc
@property
def pid(self):
return str(self._proc.pid) if self._proc else "---"
@property
def running(self):
return self._proc and self._proc.returncode is None
@property
def name(self):
return f"{self._message.chat.id}/{self._message.id}"
@property
def header(self):
if self._header is None:
self._header = f"[<code>{self.pid}</code>] <code>{html.escape(self.short_command)}</code>"
return self._header
@property
def markup(self):
if self._markup is None:
self._markup = InlineKeyboardMarkup([
[
InlineKeyboardButton("❌ SIGKILL", callback_data=f"sig {signal.SIGKILL.value}"),
InlineKeyboardButton("🛑 SIGTERM", callback_data=f"sig {signal.SIGTERM.value}"),
],
[InlineKeyboardButton(
"#️⃣ Send secret input",
switch_inline_query_current_chat=f"type {self._message.chat.id} {self._top_message.id} ",
)],
])
return self._markup
# Write to stdin
async def write(self, string: str, secret: bool = False):
self._proc.stdin.write(bytes(f"{string}\n", "utf-8"))
await self._proc.stdin.drain()
log_str = "[secret]" if secret else f"\"{string}\""
log.info(f"~[{self.pid}] {self.short_command} << {log_str}")
def send_signal(self, signal):
return self._proc.send_signal(signal)
# Append output to the buffer using a Lock
async def buf_append(self, buf: bytes):
async with self._buf_lock:
self._buf += buf
# Edit and ignore MessageNotModified
async def _edit(self, *args, **kwargs):
try:
await self._message.edit(*args, **kwargs)
except MessageNotModified:
pass
# Refresh the output message
async def flush(self):
async with self._buf_lock:
# Handle max length (4096 bytes)
while len(self._buf) > MAX_LENGTH:
await try_wait(
self._edit,
f"{self.header}\n\n"
"Output:\n<pre language=\"log\">"
f"{html.escape(self._buf[:MAX_LENGTH].decode())}%</pre>\n"
)
self._buf = b"%" + self._buf[MAX_LENGTH:]
self._message = await try_wait(self._message.reply, "Loading...", quote=True)
# Update the output message
await try_wait(
self._edit,
f"{self.header}\n\n"
f"Output:\n<pre language=\"log\">{html.escape(self._buf.decode() or '%')}</pre>\n" +
("Running..." if self._time is None else
f"Exited with code <code>{self._proc.returncode}</code> in {self._time:.3f}s."),
reply_markup=self.markup if self.running else None,
)
# Return the stream name given the stream object
async def stream_name(self, stream):
return (
"stdout" if stream is self._proc.stdout else
"stderr" if stream is self._proc.stderr else
"unknown"
)
# Thread's target to log stdout and update the Telegram message
async def _log(self, stream):
while True:
buf = await stream.readline()
if not buf:
break
await self.buf_append(buf)
if self.running:
# The buffer is flushed only when the program is running, otherwise it's run()'s task.
# I could read all bytes above here and flush all at once, but then stdout and stderr
# would print their statements in different order, so I'll keep the readline() method
# and buf_append() for each line from both stdout and stderr
try:
await self.flush()
except Exception:
log.error(f"Error while flushing {self.name}/{self.stream_name(stream)} ({self.command}):")
traceback.print_exc()
# Run the command
async def run(self):
loop = asyncio.get_event_loop()
start = time.time()
proc = self._proc = await asyncio.create_subprocess_shell(
self._command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
log.info(f"+[{self.pid}] {self.short_command}")
# Update output message with PID and buttons
asyncio.run_coroutine_threadsafe(self.flush(), loop)
# Start logging stdout and stderr
fut_out = asyncio.wrap_future(
asyncio.run_coroutine_threadsafe(self._log(proc.stdout), loop))
fut_err = asyncio.wrap_future(
asyncio.run_coroutine_threadsafe(self._log(proc.stderr), loop))
await self._proc.wait()
self._time = time.time() - start # round(() * 1000)/1000
log.info(f"×[{self.pid}] {self.short_command} ({self._proc.returncode})")
# Flush at once what's left
await fut_out
await fut_err
await self.flush()
# Let's put this here to notify users when a process stops
await self._message.reply(
f"Process exited with code <code>{self._proc.returncode}</code>.\n"
f"Execution time: {self._time:.3f}s.",
quote=True,
)