Skip to content
This repository has been archived by the owner on Jun 25, 2023. It is now read-only.

v1.0b #25

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion autotests/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ global:
activeDeadlineSeconds: 3600 # 1h

env:
PARTICIPANT_NAME: <REPLACE_WITH_USERNAME>
PARTICIPANT_NAME: Viktor Kupko
api_host: http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/<REPLACE_WITH_ENDPOINT>

# K6, do not edit!
Expand Down
38 changes: 38 additions & 0 deletions solution/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Base image: PyTorch with CUDA and cuDNN support.
# NOTE(review): "latest" is not reproducible — consider pinning a version tag.
FROM pytorch/pytorch:latest

# Redis connection settings, supplied at build time via --build-arg.
ARG REDIS_HOST
ENV REDIS_HOST=$REDIS_HOST
ARG REDIS_PASSWORD
ENV REDIS_PASSWORD=$REDIS_PASSWORD

# CUDA environment variables.
# Fix: use the key=value form — the space-separated "ENV key value" form
# is legacy and discouraged by the Dockerfile reference.
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility

# Upgrade pip; --no-cache-dir keeps the layer small.
RUN pip install --no-cache-dir --upgrade pip

# System tooling; skip recommended packages and clean the apt lists in the
# same layer so they don't bloat the image.
RUN apt-get update && apt-get install -y --no-install-recommends \
    wget \
    make \
    gcc \
    procps \
    lsof \
    vim \
    supervisor \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /solution

COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

# Application code (COPY sources are always relative to the build context,
# so the leading slash was redundant).
COPY dev_parallel/ .

# Supervisord configuration into the container.
COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf

# Start supervisord as PID 1.
# NOTE(review): supervisord daemonizes by default; confirm supervisord.conf
# sets nodaemon=true, otherwise the container will exit immediately.
CMD ["/usr/bin/supervisord"]
17 changes: 17 additions & 0 deletions solution/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# app tornado-redis-asyncio v0.1b

The app is packaged as a service in a Docker environment. docker-compose runs two containers: one with the app and one with Redis.
All settings can be configured in config.py
All settings can be configured in config.py

... to be continued

P.S. The response structure currently differs from the example; it will be changed soon so that the tests pass.

## How to run ?

You can run this app as service using docker-compose.

bash$

1. docker build -t &lt;name&gt; .
2. docker-compose up
48 changes: 48 additions & 0 deletions solution/dev_parallel/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Redis database layout for the solution:
#   db 0  — incoming request queue, db 6 — aggregated results,
#   dbs 1-5 / 7-11 — per-worker queues (a single Redis instance has 16 dbs,
#   and 0/6 are reserved for requests/results).


def _worker(db, name, model, labels):
    """Build one entry for the "workers" list."""
    return {
        "db": db,
        "worker_name": name,
        "model_name": model,
        "model_labels": labels,
    }


config = {
    "requests_db": {"db": "0"},
    "results_db": {"db": "6"},
    # Worker farm/cluster pools — must not use 0 or 6 (reserved above).
    "farm_1": {"db": ["1", "2", "3", "4", "5"]},
    "farm_2": {"db": ["7", "8", "9", "10", "11"]},
    "workers": [
        # Sentiment model.
        _worker(1, "worker1", "cardiffnlp/twitter-xlm-roberta-base-sentiment",
                ["NEGATIVE", "NEUTRAL", "POSITIVE"]),
        # Language-detection model.
        _worker(2, "worker2", "ivanlau/language-detection-fine-tuned-on-xlm-roberta-base",
                ["Arabic", "Basque", "Breton", "Catalan", "Chinese_China", "Chinese_Hongkong", "Chinese_Taiwan", "Chuvash", "Czech", "Dhivehi", "Dutch", "English", "Esperanto", "Estonian", "French", "Frisian", "Georgian", "German", "Greek", "Hakha_Chin", "Indonesian", "Interlingua", "Italian", "Japanese", "Kabyle", "Kinyarwanda", "Kyrgyz", "Latvian", "Maltese", "Mongolian", "Persian", "Polish", "Portuguese", "Romanian", "Romansh_Sursilvan", "Russian", "Sakha", "Slovenian", "Spanish", "Swedish", "Tamil", "Tatar", "Turkish", "Ukrainian", "Welsh"]),
        # Crypto-spam model.
        _worker(3, "worker3", "svalabs/twitter-xlm-roberta-crypto-spam",
                ["HAM", "SPAM"]),
        # Toxicity model: LABEL_1 means TOXIC, LABEL_0 means NOT TOXIC.
        _worker(4, "worker4", "EIStakovskii/xlm_roberta_base_multilingual_toxicity_classifier_plus",
                ["LABEL_0", "LABEL_1"]),
        # Fake-news model: LABEL_0 = fake news, LABEL_1 = real news.
        _worker(5, "worker5", "jy46604790/Fake-News-Bert-Detect",
                ["LABEL_0", "LABEL_1"]),
    ],
}
66 changes: 66 additions & 0 deletions solution/dev_parallel/responder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import tornado.ioloop
import tornado.gen
import json
import redis
import logging
from os import environ

# Logging setup: timestamped messages at INFO level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Connection settings come from the environment; both may be None if unset.
REDIS_PASSWORD = environ.get("REDIS_PASSWORD")
REDIS_HOST = environ.get("REDIS_HOST")

# Connection to the results database (db 6). decode_responses is off, so
# values come back as bytes — result_listener decodes them manually.
r = redis.Redis(host=REDIS_HOST, password=REDIS_PASSWORD, port=6379, db=6)

async def result_listener(futures_dict):
    """Poll the results DB for correlation_ids that have collected results
    from all five workers, then resolve the matching Future in futures_dict.

    futures_dict maps correlation_id (str) -> Future awaited by the HTTP
    handler. Runs forever on the IOLoop, polling every 50 ms.
    """
    while True:
        # Fix: snapshot the keys ONCE. The original zipped a second
        # r.scan_iter() against the pipeline results; SCAN gives no
        # ordering/stability guarantee, so the values could be paired
        # with the wrong keys. Here the same key list drives both the
        # pipeline fill and the zip.
        keys = list(r.scan_iter())

        pipeline = r.pipeline()
        for key in keys:
            # Queue the read; nothing is executed yet.
            pipeline.lrange(key, 0, -1)

        # Execute all queued LRANGEs in one round trip.
        values_list = pipeline.execute()

        # Keys whose lists hold results from all five workers.
        valid_sets = []
        for key, values in zip(keys, values_list):
            # Need at least 5 entries before the set can be complete.
            if len(values) < 5:
                continue

            workers = set(json.loads(value.decode())['worker'] for value in values)

            # All five distinct workers must have reported.
            if len(workers) < 5:
                continue

            valid_sets.append((key, values))

        if valid_sets:
            for key, values in valid_sets:
                key = key.decode()
                final_message = {"correlation_id": key, "results": []}

                for value in values:
                    message = json.loads(value.decode())
                    final_message["results"].append({"worker": message["worker"], "result": message["result"]})

                # Drop the processed entries from the list.
                r.ltrim(key, len(values), -1)

                future = futures_dict.pop(final_message["correlation_id"], None)
                if future is not None:
                    future.set_result(final_message)
                    logger.info(f'Successfully returned result for key: {key}')
                else:
                    logger.warning(f"No Future found for key: {key}. Current futures_dict: {futures_dict}")
        else:
            logger.info('No sets with 5 values found, skipping connection attempt.')

        # NOTE(review): the redis calls above are blocking; at high load
        # they will stall the IOLoop — consider an async client.
        await tornado.gen.sleep(0.05)
78 changes: 78 additions & 0 deletions solution/dev_parallel/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import tornado.ioloop
import tornado.web
import tornado.gen
import json
import redis
import uuid
import logging
from responder import result_listener
from os import environ

# Configure logging (DEBUG is verbose; fine for the challenge environment).
logging.basicConfig(level=logging.DEBUG)


# Connection settings from the environment; may be None if unset.
REDIS_HOST = environ.get("REDIS_HOST")
# Log the Redis host (never log the password).
logging.info(f"Redis Host: {REDIS_HOST}")

REDIS_PASSWORD = environ.get("REDIS_PASSWORD")

# Requests database (db 0) — the dispatcher queue lives here.
r = redis.Redis(host=REDIS_HOST, password=REDIS_PASSWORD, port=6379, db=0)
# Probe the connection at startup so misconfiguration shows up in the logs.
try:
    r.ping()
    logging.info("Successfully connected to Redis")
except redis.ConnectionError:
    logging.error("Failed to connect to Redis")

# correlation_id -> Future; resolved by result_listener (responder.py).
futures_dict = {}

class MainHandler(tornado.web.RequestHandler):
    """POST /process: enqueue the JSON body for the workers via Redis and
    respond with the merged results of all five models."""

    async def post(self):
        data = json.loads(self.request.body)
        correlation_id = str(uuid.uuid4())

        logging.info(f'Received data: {data}. Assigned correlation_id: {correlation_id}.')

        # Enqueue the task; the tasker fans it out to all worker DBs.
        message = {
            "correlation_id": correlation_id,
            "data": data,
        }
        r.rpush('dispatcher', json.dumps(message))
        logging.info(f'Pushed to Redis: {message}')

        # Future that result_listener resolves once all five worker
        # results for this correlation_id have arrived.
        future = tornado.gen.Future()
        futures_dict[correlation_id] = future
        logging.info(f'Future created for correlation_id: {correlation_id}.')

        try:
            # Wait for the aggregated result.
            result = await future
        finally:
            # Fix: always remove the entry so a cancelled or failed
            # request (e.g. client disconnect) cannot leak Futures in
            # futures_dict forever. pop() is a no-op when the listener
            # already removed it on the success path.
            futures_dict.pop(correlation_id, None)

        logging.info(f'Received result for correlation_id: {correlation_id}. Result: {result}')
        # Per-worker entries under the 'results' key.
        results_list = result['results']

        # Sort by worker name so the response layout is deterministic.
        sorted_results = sorted(results_list, key=lambda k: k['worker'])

        # Merge all model results into one dict keyed by model name.
        final_result = {}
        for res in sorted_results:
            final_result.update(res['result'])

        # Fix: declare JSON explicitly — Tornado defaults to text/html
        # when write() is given a str.
        self.set_header("Content-Type", "application/json")
        self.write(json.dumps(final_result))

def make_app():
    """Build the Tornado application exposing the single /process route."""
    routes = [
        (r"/process", MainHandler),
    ]
    return tornado.web.Application(routes)

if __name__ == "__main__":
    app = make_app()
    # Bind on all interfaces so the containerised service is reachable.
    app.listen(8000, address='0.0.0.0')
    logging.info('Server started.')
    # Run the Redis result poller on the same IOLoop; it resolves the
    # Futures stored in futures_dict as worker results arrive.
    tornado.ioloop.IOLoop.current().spawn_callback(result_listener, futures_dict)
    logging.info('Server started, running result listener')
    tornado.ioloop.IOLoop.current().start()
36 changes: 36 additions & 0 deletions solution/dev_parallel/tasker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import asyncio
import aioredis
import json
import logging
import os

logging.basicConfig(level=logging.INFO)

class QueueManager:
    """Fans messages out from the source Redis queue (db 0) to the five
    per-worker databases (dbs 1-5)."""

    def __init__(self):
        # Connection settings come from the environment; the port is fixed.
        self.redis_host = os.environ.get('REDIS_HOST')
        self.redis_port = 6379
        self.redis_password = os.environ.get('REDIS_PASSWORD')

    async def start(self):
        """Open the Redis connections, then run the transfer loop forever."""
        logging.info('Starting Queue Manager...')
        base_url = f"redis://{self.redis_host}:{self.redis_port}"
        self.redis_source = aioredis.from_url(f"{base_url}/0", password=self.redis_password)
        self.redis_targets = [
            aioredis.from_url(f"{base_url}/{db}", password=self.redis_password)
            for db in range(1, 6)
        ]
        logging.info('Connected to Redis.')
        await self.transfer_messages_from_redis_to_redis()

    async def publish_message_to_redis(self, message_body, redis_db):
        """Append one message to the 'dispatcher' list of the given DB."""
        await redis_db.rpush('dispatcher', message_body)

    async def transfer_messages_from_redis_to_redis(self, redis_queue_name='dispatcher'):
        """Blocking-pop messages from the source queue and broadcast each
        one to every target DB concurrently."""
        while True:
            logging.info('Waiting for message in Redis queue...')
            _, message_body = await self.redis_source.blpop(redis_queue_name)
            logging.info(f'Received message from Redis queue: {message_body}')
            await asyncio.gather(
                *(self.publish_message_to_redis(message_body, target)
                  for target in self.redis_targets)
            )
            logging.info('Published message to all Redis targets.')

# Entry point. Fixes two issues with the original:
#  - asyncio.get_event_loop() outside a running loop is deprecated
#    (Python 3.10+) — asyncio.run() creates and closes the loop properly;
#  - the loop previously started at import time, so merely importing this
#    module would block; the main guard restricts that to script execution.
if __name__ == "__main__":
    manager = QueueManager()
    asyncio.run(manager.start())
100 changes: 100 additions & 0 deletions solution/dev_parallel/worker1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import logging
import json
import redis
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import os

# Import config from config.py
from config import config

# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class Worker:
    """Pulls tasks from this worker's Redis queue, classifies the text with
    a HuggingFace sequence-classification model, and pushes the labelled
    score to the shared results DB under the task's correlation_id."""

    def __init__(self, worker_config, connection_config):
        # Connection settings from the environment.
        redis_host = os.environ.get('REDIS_HOST')
        redis_password = os.environ.get('REDIS_PASSWORD')
        redis_port = int(os.environ.get('REDIS_PORT', '6379'))

        self.redis_result_config = connection_config["results_db"]

        # Worker name is derived from the file name (worker1.py -> "worker1").
        self.queue_name = os.path.splitext(os.path.basename(__file__))[0]
        self.model_name = worker_config["model_name"]
        self.model_labels = worker_config["model_labels"]

        # Incoming queue: this worker's dedicated DB.
        self.redis_incoming = redis.Redis(host=redis_host,
                                          port=redis_port,
                                          db=worker_config['db'],
                                          password=redis_password)

        # Outgoing results: shared results DB.
        self.redis_outgoing = redis.Redis(host=redis_host,
                                          port=redis_port,
                                          db=self.redis_result_config['db'],
                                          password=redis_password)

        # Load the model.
        logger.info(f"Loading model {self.model_name}...")
        self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name)
        # Fix: inference-only service — eval() disables dropout etc. so
        # predictions are deterministic.
        self.model.eval()
        if torch.cuda.is_available():
            self.model = self.model.to('cuda')
        logger.info(f"Model {self.model_name} loaded.")

        # Load the tokenizer.
        logger.info(f"Loading tokenizer...")
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        logger.info(f"Tokenizer for {self.model_name} loaded.")

    def start(self):
        """Block on the incoming queue forever, processing one task at a time."""
        logger.info('Worker started.')
        while True:
            _, message = self.redis_incoming.blpop('dispatcher')
            message_body = json.loads(message)
            self.process_message(message_body)

    def process_message(self, body):
        """Classify body['data']['data'] and store the result in Redis.

        The stored value is a JSON object with correlation_id, this
        worker's name, and {model_name: {"score": float, "label": str}}.
        """
        correlation_id = body['correlation_id']
        text = body['data']['data']
        logger.info(f"Processing text: {text}")
        inputs = self.tokenizer(text, return_tensors='pt')

        if torch.cuda.is_available():
            inputs = inputs.to('cuda')

        # Fix: no_grad() — inference needs no autograd graph; this saves
        # memory and time per request.
        with torch.no_grad():
            outputs = self.model(**inputs)

        predictions = outputs.logits.argmax(dim=-1).item()
        probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
        score = probabilities[0][predictions].item()

        label = self.model_labels[predictions]

        # Key the result by the bare model name (drop the HF org prefix).
        result_key = self.model_name.split('/')[-1]
        result = {result_key: {"score": score, "label": label}}
        logger.info(f"Received task results {result}")

        results_dict = {
            "correlation_id": correlation_id,
            "worker": self.queue_name,
            "result": result
        }
        results_dict = json.dumps(results_dict)
        self.redis_outgoing.rpush(correlation_id, results_dict)
        logger.info(f"Saved result to Redis with key {correlation_id} : {results_dict}")

# Entry point: resolve this worker's configuration by matching the module
# file name against the "workers" entries imported from config.py.
# Fix: the lookup (and its possible ValueError) previously ran at import
# time; moving it under the main guard makes the module safely importable.
if __name__ == "__main__":
    connection_config = config

    worker_name = os.path.splitext(os.path.basename(__file__))[0]
    worker_config = next(
        (worker for worker in connection_config["workers"] if worker["worker_name"] == worker_name),
        None,
    )

    if not worker_config:
        raise ValueError(f"No configuration found for worker {worker_name}")

    # Create and run the worker.
    worker = Worker(worker_config, connection_config)
    worker.start()
Loading