Skip to content

Commit

Permalink
Scheduler to clear out inactive conversations
Browse files Browse the repository at this point in the history
  • Loading branch information
heiko-braun committed Jan 12, 2024
1 parent b827457 commit ed4b9e3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 12 deletions.
13 changes: 11 additions & 2 deletions core/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
AgentTokenBufferMemory,
)

import datetime

from abc import ABC, abstractmethod
from statemachine import State
from statemachine import StateMachine

# --

# the time in seconds, after which a conversation will be retried if inactive
CONVERSATION_EXPIY_TIME=60

class StatusStrategy(ABC):
@abstractmethod
def print(self, message) -> str:
Expand Down Expand Up @@ -68,6 +73,7 @@ class Conversation(StateMachine):

def __init__(self, slack_client, channel, thread_ts):

self.last_activity = datetime.datetime.now()
self.client = slack_client
self.feedback = SlackStatus(slack_client=slack_client, channel=channel, thread_ts=thread_ts)

Expand All @@ -91,6 +97,9 @@ def __init__(self, slack_client, channel, thread_ts):

super().__init__()

def is_expired(self):
return self.last_activity < datetime.datetime.now()-datetime.timedelta(seconds=CONVERSATION_EXPIY_TIME)

def on_enter_greeting(self):
# mimic the first LLM response to get things started
self.response_handle = {
Expand All @@ -103,6 +112,7 @@ def on_enter_running(self):

print("running ..")

self.last_activity = datetime.datetime.now()
self.feedback.set_tagline("Thinking ...")

# request chat completion
Expand All @@ -126,8 +136,7 @@ def on_enter_answered(self):
slack_response = self.client.chat_postMessage(
channel=self.channel,
thread_ts=self.thread_ts,
text=f"{response_content}")

text=f"{response_content}")

self.feedback.set_visible(False)

Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ tiktoken
slack-bolt
slack
aiohttp
httpx
httpx
apscheduler
49 changes: 40 additions & 9 deletions slack_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
import signal
import sys
import time
import threading

import datetime as dt
from apscheduler.schedulers.background import BackgroundScheduler

from slack_bolt.adapter.socket_mode import SocketModeHandler
from slack_sdk import WebClient
Expand All @@ -16,29 +20,47 @@
app = App(token=os.environ['SLACK_BOT_TOKEN'])
client = WebClient(os.environ['SLACK_BOT_TOKEN'])
socket = SocketModeHandler(app, os.environ['SLACK_APP_TOKEN'])
scheduler = BackgroundScheduler()

active_conversations = []
conversation_lock = threading.Lock()

# makre sure conversation are retried when bot stops
# make sure conversation are retried when bot stops
def graceful_shutdown(signum, frame):
print("Shutdown bot ...")

# retire all active conversations
[ref["conversation"].retire() for ref in active_conversations]
time.sleep(3)

# stop the scheduler
scheduler.shutdown(wait=False)

# stop the listener
socket.disconnect()
socket.close()

sys.exit(0)

def find_conversation(thread_ts):

for ref in active_conversations:
if(ref["id"]==thread_ts):
return ref["conversation"]
else:
return None

with conversation_lock:
for ref in active_conversations:
if(ref["id"]==thread_ts):
return ref["conversation"]
else:
return None

def retire_inactive_conversation():
with conversation_lock:
for ref in active_conversations:
conversation = ref["conversation"]
if(conversation.is_expired()):
if(conversation.current_state!='answered'):
conversation.retire()
active_conversations.remove(ref)
else:
print("Conversation is still active, keep for next cycle: ", str(conversation))

# This gets activated when the bot is tagged in a channel
# it will start a new thread that will hold the conversation
@app.event("app_mention")
Expand All @@ -56,7 +78,7 @@ def handle_message_events(body, logger):

response_channel = body["event"]["channel"]
response_thread = body["event"]["event_ts"]

# register new conversation
conversation = Conversation(
slack_client=client,
Expand Down Expand Up @@ -100,9 +122,18 @@ def handle_message_events(event, say):
# outside thread we ingore messages
pass


if __name__ == "__main__":

signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)

# scheduler reaper
scheduler.add_job(retire_inactive_conversation, 'interval', seconds=5, id='retirement_job')
scheduler.start()

# start listening for messages
socket.start()



0 comments on commit ed4b9e3

Please sign in to comment.