This repository has been archived by the owner on Jun 25, 2023. It is now read-only.

v1.0b #25

Open · wants to merge 14 commits into `main`
2 changes: 1 addition & 1 deletion autotests/helm/values.yaml
@@ -25,7 +25,7 @@ global:
activeDeadlineSeconds: 3600 # 1h

env:
-  PARTICIPANT_NAME: <REPLACE_WITH_USERNAME>
+  PARTICIPANT_NAME: Viktor Kupko
api_host: http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/<REPLACE_WITH_ENDPOINT>

# K6, do not edit!
38 changes: 38 additions & 0 deletions solution/Dockerfile
@@ -0,0 +1,38 @@
# Use a PyTorch image with CUDA and cuDNN support as the base image
FROM pytorch/pytorch:latest

# Set environment variables
ARG REDIS_HOST
ENV REDIS_HOST=$REDIS_HOST
ARG REDIS_PASSWORD
ENV REDIS_PASSWORD=$REDIS_PASSWORD

# Set CUDA environment variables
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility

# Upgrade pip
RUN pip install --upgrade pip

RUN apt-get update && apt-get install -y \
wget \
make \
gcc \
procps \
lsof \
vim \
supervisor

WORKDIR /solution

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY /dev_parallel/ .

# Copy the supervisord configuration file into the container
COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf

# Use CMD to start supervisord
CMD ["/usr/bin/supervisord"]
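
Since the image takes `REDIS_HOST` and `REDIS_PASSWORD` as build arguments, a build would typically pass them explicitly, e.g. `docker build --build-arg REDIS_HOST=redis --build-arg REDIS_PASSWORD=<password> -t solution .` (the tag `solution` and the values here are illustrative placeholders, not part of this PR).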
17 changes: 17 additions & 0 deletions solution/README.md
@@ -0,0 +1,17 @@
# app tornado-redis-asyncio v0.1b

The app is packaged as a service in a Docker environment. docker-compose runs two containers: one with the app and one with Redis.
All settings can be configured in config.py

... to be continued

P.S. The response structure does not yet match the example; it will be changed shortly so the tests pass.

## How to run?

You can run this app as a service using docker-compose.

From a shell:

1. `docker build -t <image_name> .`
2. `docker-compose up`
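
As a quick smoke test once the service is up, a minimal client sketch (assuming the service is reachable at `localhost:8000/process`, matching `server.py`, and that the `requests` package is installed):

```python
# Minimal client sketch: POST a text payload to the /process endpoint.
# Host and port follow server.py in this PR; adjust for your deployment.
import requests

resp = requests.post(
    "http://localhost:8000/process",
    json={"data": "This is a test message."},
)
print(resp.status_code, resp.json())
```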
78 changes: 78 additions & 0 deletions solution/dev_parallel/config.py
@@ -0,0 +1,78 @@
config = {
    "requests_db": {
        "db": "0"
    },
    "results_db": {
        "db": "6"
    },
    # Settings for the farm/cluster of workers. DBs 0 and 6 cannot be used here
    # (reserved for requests/results), and one Redis instance supports at most 16 DBs.
    "farm_1": {
        "db": ["1", "2", "3", "4", "5"]
    },
    "farm_2": {
        "db": ["7", "8", "9", "10", "11"]
    },

    "workers": [
        {
            "db": 1,
            "worker_name": "worker1",
            "model_name": "cardiffnlp/twitter-xlm-roberta-base-sentiment",
            "model_labels": ["NEGATIVE", "NEUTRAL", "POSITIVE"]  # Labels for sentiment model
        },
        {
            "db": 2,
            "worker_name": "worker2",
            "model_name": "ivanlau/language-detection-fine-tuned-on-xlm-roberta-base",
            "model_labels": ["Arabic", "Basque", "Breton", "Catalan", "Chinese_China", "Chinese_Hongkong", "Chinese_Taiwan", "Chuvash", "Czech", "Dhivehi", "Dutch", "English", "Esperanto", "Estonian", "French", "Frisian", "Georgian", "German", "Greek", "Hakha_Chin", "Indonesian", "Interlingua", "Italian", "Japanese", "Kabyle", "Kinyarwanda", "Kyrgyz", "Latvian", "Maltese", "Mongolian", "Persian", "Polish", "Portuguese", "Romanian", "Romansh_Sursilvan", "Russian", "Sakha", "Slovenian", "Spanish", "Swedish", "Tamil", "Tatar", "Turkish", "Ukrainian", "Welsh"]  # Labels for language detection model
        },
        {
            "db": 3,
            "worker_name": "worker3",
            "model_name": "svalabs/twitter-xlm-roberta-crypto-spam",
            "model_labels": ["HAM", "SPAM"]  # Labels for crypto spam model
        },
        {
            "db": 4,
            "worker_name": "worker4",
            "model_name": "EIStakovskii/xlm_roberta_base_multilingual_toxicity_classifier_plus",
            "model_labels": ["LABEL_0", "LABEL_1"]  # LABEL_1 means TOXIC, LABEL_0 means NOT TOXIC
        },
        {
            "db": 5,
            "worker_name": "worker5",
            "model_name": "jy46604790/Fake-News-Bert-Detect",
            "model_labels": ["LABEL_0", "LABEL_1"]  # LABEL_0: Fake news, LABEL_1: Real news
        },
        {
            "db": 7,
            "worker_name": "worker7",
            "model_name": "cardiffnlp/twitter-xlm-roberta-base-sentiment",
            "model_labels": ["NEGATIVE", "NEUTRAL", "POSITIVE"]  # Labels for sentiment model
        },
        {
            "db": 8,
            "worker_name": "worker8",
            "model_name": "ivanlau/language-detection-fine-tuned-on-xlm-roberta-base",
            "model_labels": ["Arabic", "Basque", "Breton", "Catalan", "Chinese_China", "Chinese_Hongkong", "Chinese_Taiwan", "Chuvash", "Czech", "Dhivehi", "Dutch", "English", "Esperanto", "Estonian", "French", "Frisian", "Georgian", "German", "Greek", "Hakha_Chin", "Indonesian", "Interlingua", "Italian", "Japanese", "Kabyle", "Kinyarwanda", "Kyrgyz", "Latvian", "Maltese", "Mongolian", "Persian", "Polish", "Portuguese", "Romanian", "Romansh_Sursilvan", "Russian", "Sakha", "Slovenian", "Spanish", "Swedish", "Tamil", "Tatar", "Turkish", "Ukrainian", "Welsh"]  # Labels for language detection model
        },
        {
            "db": 9,
            "worker_name": "worker9",
            "model_name": "svalabs/twitter-xlm-roberta-crypto-spam",
            "model_labels": ["HAM", "SPAM"]  # Labels for crypto spam model
        },
        {
            "db": 10,
            "worker_name": "worker10",
            "model_name": "EIStakovskii/xlm_roberta_base_multilingual_toxicity_classifier_plus",
            "model_labels": ["LABEL_0", "LABEL_1"]  # LABEL_1 means TOXIC, LABEL_0 means NOT TOXIC
        },
        {
            "db": 11,
            "worker_name": "worker11",
            "model_name": "jy46604790/Fake-News-Bert-Detect",
            "model_labels": ["LABEL_0", "LABEL_1"]  # LABEL_0: Fake news, LABEL_1: Real news
        }
    ]
}
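
For illustration, a worker process could resolve its own settings from this file by `worker_name`: a minimal sketch, assuming it runs next to `config.py` (the `worker_settings` helper is hypothetical, not part of this PR):

```python
# Sketch: look up a worker's settings (Redis db, model name, labels) by name.
from config import config

def worker_settings(name):
    return next(w for w in config["workers"] if w["worker_name"] == name)

settings = worker_settings("worker3")
print(settings["db"], settings["model_name"])  # 3 svalabs/twitter-xlm-roberta-crypto-spam
```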
66 changes: 66 additions & 0 deletions solution/dev_parallel/responder.py
@@ -0,0 +1,66 @@
import tornado.ioloop
import tornado.gen
import json
import redis
import logging
from os import environ

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

REDIS_PASSWORD = environ.get("REDIS_PASSWORD")
REDIS_HOST = environ.get("REDIS_HOST")

# Connect to Redis
r = redis.Redis(host=REDIS_HOST, password=REDIS_PASSWORD, port=6379, db=6)

async def result_listener(futures_dict):
    while True:
        pipeline = r.pipeline()
        logger.info('Connected to Redis Pipeline.')

        valid_sets = []  # Collects every data set that satisfies our conditions

        # Materialize the key list once so keys and pipeline results stay aligned
        keys = list(r.scan_iter())
        for key in keys:
            # Queue the command in the pipeline without executing it yet
            pipeline.lrange(key, 0, -1)

        # Now execute all commands queued in the pipeline
        values_list = pipeline.execute()

        for key, values in zip(keys, values_list):
            # Check that the list holds at least 5 items
            if len(values) < 5:
                continue

            workers = set(json.loads(value.decode())['worker'] for value in values)

            # Check that results from all workers are present
            if len(workers) < 5:
                continue

            valid_sets.append((key, values))

        if valid_sets:
            for key, values in valid_sets:
                key = key.decode()
                final_message = {"correlation_id": key, "results": []}

                for value in values:
                    message = json.loads(value.decode())
                    final_message["results"].append({"worker": message["worker"], "result": message["result"]})

                # Queue the command in the pipeline and execute it right away
                pipeline.ltrim(key, len(values), -1).execute()

                future = futures_dict.pop(final_message["correlation_id"], None)
                if future is not None:
                    future.set_result(final_message)
                    logger.info(f'Successfully returned result for key: {key}')
                else:
                    logger.warning(f"No Future found for key: {key}. Current futures_dict: {futures_dict}")
        else:
            logger.info('No sets with 5 values found, skipping connection attempt.')

        await tornado.gen.sleep(0.01)
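
The worker processes are not part of this diff, but the reads above imply that each worker pushes its output into db 6 as a JSON entry with `worker` and `result` fields, keyed by the request's correlation_id. A hedged sketch of that shape (the host and the inner `result` payload are assumptions):

```python
# Sketch of the entry result_listener expects; workers are not included in this PR.
import json
import redis

results_db = redis.Redis(host="localhost", port=6379, db=6)  # host assumed
results_db.rpush(
    "example-correlation-id",  # the uuid issued by server.py
    json.dumps({"worker": "worker1", "result": {"label": "NEUTRAL", "score": 0.97}}),
)
```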
101 changes: 101 additions & 0 deletions solution/dev_parallel/server.py
@@ -0,0 +1,101 @@
import tornado.ioloop
import tornado.web
import tornado.gen
import json
import redis
import uuid
import logging
from responder import result_listener
from os import environ

# Configure logging
logging.basicConfig(level=logging.DEBUG)


REDIS_HOST = environ.get("REDIS_HOST")
# Log the Redis host
logging.info(f"Redis Host: {REDIS_HOST}")

REDIS_PASSWORD = environ.get("REDIS_PASSWORD")

r = redis.Redis(host=REDIS_HOST, password=REDIS_PASSWORD, port=6379, db=0)
# Try to connect to Redis
try:
    r.ping()
    logging.info("Successfully connected to Redis")
except redis.ConnectionError:
    logging.error("Failed to connect to Redis")

futures_dict = {}

class MainHandler(tornado.web.RequestHandler):
    async def post(self):
        request_body = json.loads(self.request.body)
        if isinstance(request_body, list):
            # If the input is a list of dicts, serialize it back to a string
            data = {'data': json.dumps(request_body)}
            await self.process_data(data)
        elif isinstance(request_body, str):
            # If the input is a plain string, wrap it in a dict under the 'data' key
            data = {'data': request_body}
            await self.process_data(data)
        elif isinstance(request_body, dict) and 'data' in request_body:
            # If the input is a dict that contains a 'data' key, use it as-is
            await self.process_data(request_body)
        else:
            # If none of the conditions matched, return an error message
            self.set_status(400)
            self.write({"error": "Invalid format. Expected JSON with a 'data' key or a simple text."})
            return

    async def process_data(self, data):
        if not isinstance(data['data'], str):
            # If 'data' is not a string, return an error message
            self.set_status(400)
            self.write({"error": "Invalid 'data' format. Expected a string."})
            return

        correlation_id = str(uuid.uuid4())
        logging.info(f'Received data: {data}. Assigned correlation_id: {correlation_id}.')

        # Dispatch the task to the workers
        message = {
            "correlation_id": correlation_id,
            "data": data,
        }
        r.rpush('dispatcher', json.dumps(message))
        logging.info(f'Pushed to Redis: {message}')

        # Create a Future and store it in the dict
        future = tornado.gen.Future()
        futures_dict[correlation_id] = future
        logging.info(f'Future created for correlation_id: {correlation_id}.')

        # Wait for the result and write it into the response
        result = await future
        logging.info(f'Received result for correlation_id: {correlation_id}. Result: {result}')
        # Extract the data stored under the 'results' key
        results_list = result['results']

        # Sort the list of dicts by worker name
        sorted_results = sorted(results_list, key=lambda k: k['worker'])

        # Merge all results into a single dict
        final_result = {}
        for res in sorted_results:
            final_result.update(res['result'])

        self.write(json.dumps(final_result))

def make_app():
    return tornado.web.Application([
        (r"/process", MainHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8000, address='0.0.0.0')
    tornado.ioloop.IOLoop.current().spawn_callback(result_listener, futures_dict)
    logging.info('Server started, running result listener')
    tornado.ioloop.IOLoop.current().start()
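
The handler/listener rendezvous above rests on one pattern: `post()` parks a Future under its correlation_id and awaits it, while `result_listener` resolves it from the background. A stripped-down, self-contained sketch of just that mechanism (names here are illustrative):

```python
# Minimal sketch of the Future hand-off used by server.py and responder.py.
import asyncio

futures_dict = {}

async def handler(correlation_id):
    fut = asyncio.get_running_loop().create_future()
    futures_dict[correlation_id] = fut
    return await fut  # suspends until the listener resolves the Future

async def listener():
    await asyncio.sleep(0.1)  # stands in for polling Redis
    futures_dict.pop("req-1").set_result({"results": ["ok"]})

async def main():
    asyncio.get_running_loop().create_task(listener())
    print(await handler("req-1"))  # -> {'results': ['ok']}

asyncio.run(main())
```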
59 changes: 59 additions & 0 deletions solution/dev_parallel/tasker.py
@@ -0,0 +1,59 @@
import asyncio
import aioredis
import json
import logging
import os
from config import config  # import the configuration file

logging.basicConfig(level=logging.INFO)

class QueueManager:
    def __init__(self):
        self.redis_host = os.environ.get('REDIS_HOST')
        self.redis_port = 6379
        self.redis_password = os.environ.get('REDIS_PASSWORD')
        self.group_settings = self.load_group_settings()  # load the group settings
        self.lock = asyncio.Lock()

    def load_group_settings(self):
        # Load the group configuration only once, at initialization
        group_settings = []
        for key in config.keys():
            if 'farm_' in key:
                group_settings.append({'db_numbers': config[key]['db']})
        return group_settings

    async def start(self):
        logging.info('Starting Queue Manager...')
        self.redis_source = aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}/0", password=self.redis_password)
        self.target_groups = [
            [aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}/{i}", password=self.redis_password) for i in group['db_numbers']]
            for group in self.group_settings
        ]
        logging.info('Connected to Redis.')
        await self.transfer_messages_from_redis_to_redis()

    async def publish_message_to_redis(self, message_body, redis_db):
        await redis_db.rpush('dispatcher', message_body)

    async def transfer_messages_from_redis_to_redis(self, redis_queue_name='dispatcher'):
        current_group_index = 0  # Index of the current group
        while True:
            async with self.lock:
                logging.info('Waiting for message in Redis queue...')
                _, message_body = await self.redis_source.blpop(redis_queue_name)
                logging.info(f'Received message from Redis queue: {message_body}')
                if message_body:
                    target_group = self.target_groups[current_group_index]  # Pick the current group
                    logging.info(f'Sending message to group {current_group_index}.')
                    publish_tasks = [self.publish_message_to_redis(message_body, redis_target) for redis_target in target_group]
                    await asyncio.gather(*publish_tasks)
                    logging.info(f'Published message to group {current_group_index}.')
                    current_group_index = (current_group_index + 1) % len(self.target_groups)  # Move on to the next group
                else:
                    logging.error('Received empty message from Redis queue.')
                    await asyncio.sleep(0.01)  # Small delay to reduce CPU load

loop = asyncio.get_event_loop()
manager = QueueManager()
loop.run_until_complete(manager.start())
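
On the consuming side (also not in this diff), a worker assigned db N would presumably BLPOP the `dispatcher` list of its own db and push its model output to db 6 for the responder, roughly like this hedged sketch (host and result payload assumed):

```python
# Sketch: a worker on db 3 consuming one task fanned out by QueueManager.
import json
import redis

tasks = redis.Redis(host="localhost", port=6379, db=3)    # this worker's db
results = redis.Redis(host="localhost", port=6379, db=6)  # shared results db

_, raw = tasks.blpop("dispatcher")
task = json.loads(raw)
output = {"worker": "worker3", "result": {"label": "HAM"}}  # placeholder output
results.rpush(task["correlation_id"], json.dumps(output))
```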