
nginx-uvicorn-asyncio #34

Open · wants to merge 4 commits into main
2 changes: 1 addition & 1 deletion autotests/helm/values.yaml
@@ -25,7 +25,7 @@ global:
activeDeadlineSeconds: 3600 # 1h

env:
PARTICIPANT_NAME: <REPLACE_WITH_USERNAME>
PARTICIPANT_NAME: Viktor Kupko
api_host: http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/<REPLACE_WITH_ENDPOINT>

# K6, do not edit!
47 changes: 47 additions & 0 deletions solution/Dockerfile
@@ -0,0 +1,47 @@
# Use a PyTorch image with CUDA and cuDNN support as the base image
FROM pytorch/pytorch:latest
# FROM nvidia/cuda:12.1.1-cudnn8-runtime-ubi8

# Upgrade pip
RUN pip install --upgrade pip

# Set environment variables
ARG REDIS_HOST
ENV REDIS_HOST=$REDIS_HOST
ARG REDIS_PASSWORD
ENV REDIS_PASSWORD=$REDIS_PASSWORD

# Set CUDA environment variables
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility

RUN apt-get update && apt-get install -y \
wget \
curl \
make \
gcc \
procps \
lsof \
vim \
supervisor \
nginx \
linux-headers-$(uname -r)

COPY default /etc/nginx/sites-available/
COPY default /etc/nginx/sites-enabled/

WORKDIR /solution

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY dev_parallel/ .

COPY gunicorn_conf.py .

# Copy the supervisord configuration file into the container
COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf

# Use CMD to start supervisord
CMD ["/usr/bin/supervisord"]
17 changes: 17 additions & 0 deletions solution/README.md
@@ -0,0 +1,17 @@
# app tornado-redis-asyncio v0.1b

The app is packaged as a service in a Docker environment. docker-compose is used to run two containers: one with the app and one with Redis.
All settings can be configured in config.py.

... to be continued

P.S. The response structure currently differs from the example and will be changed in the near future so that the tests pass.

## How to run?

You can run this app as service using docker-compose.

In a bash shell:

1. `docker build -t name .`
2. `docker-compose up`
13 changes: 13 additions & 0 deletions solution/default
@@ -0,0 +1,13 @@
#default
upstream app_servers {
server 0.0.0.0:8001;

}

server {
listen 8000;

location / {
proxy_pass http://app_servers;
}
}
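
The nginx config above accepts traffic on port 8000 and proxies it to the application upstream on port 8001. As a minimal sketch, the FastAPI app from server.py could be served on that upstream port with uvicorn like this (the actual launch in this PR goes through supervisord and gunicorn_conf.py, which are not shown here, so the module path and port below are assumptions):

```python
# run_app.py - hypothetical helper, not part of this PR.
# Serves the FastAPI app on the port that nginx's upstream block points to.
import uvicorn

if __name__ == "__main__":
    uvicorn.run("server:app", host="0.0.0.0", port=8001)
```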
37 changes: 37 additions & 0 deletions solution/dev_parallel/config.py
@@ -0,0 +1,37 @@
config = {


"workers": [

{
"queue": 1,
"worker_name": "worker1",
"model_name": "cardiffnlp/twitter-xlm-roberta-base-sentiment",
"model_labels": ["NEGATIVE", "NEUTRAL", "POSITIVE"] # Labels for sentiment model
},
{
"queue": 2,
"worker_name": "worker2",
"model_name": "ivanlau/language-detection-fine-tuned-on-xlm-roberta-base",
"model_labels": ["Arabic", "Basque", "Breton", "Catalan", "Chinese_China", "Chinese_Hongkong", "Chinese_Taiwan", "Chuvash", "Czech", "Dhivehi", "Dutch", "English", "Esperanto", "Estonian", "French", "Frisian", "Georgian", "German", "Greek", "Hakha_Chin", "Indonesian", "Interlingua", "Italian", "Japanese", "Kabyle", "Kinyarwanda", "Kyrgyz", "Latvian", "Maltese", "Mongolian", "Persian", "Polish", "Portuguese", "Romanian", "Romansh_Sursilvan", "Russian", "Sakha", "Slovenian", "Spanish", "Swedish", "Tamil", "Tatar", "Turkish", "Ukrainian", "Welsh"] # Labels for language detection model
},
{
"queue": 3,
"worker_name": "worker3",
"model_name": "svalabs/twitter-xlm-roberta-crypto-spam",
"model_labels": ["HAM", "SPAM"] # Labels for crypto spam model
},
{
"queue": 4,
"worker_name": "worker4",
"model_name": "EIStakovskii/xlm_roberta_base_multilingual_toxicity_classifier_plus",
"model_labels": ["LABEL_0", "LABEL_1"] # Label_1 means TOXIC, Label_0 means NOT TOXIC.
},
{
"queue": 5,
"worker_name": "worker5",
"model_name": "jy46604790/Fake-News-Bert-Detect",
"model_labels": ["LABEL_0", "LABEL_1"] # LABEL_0: Fake news, LABEL_1: Real news
}
]
}
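
Each worker entry pairs a Hugging Face model name with the label list used to decode that model's predicted class index. A minimal sketch of that mapping, assuming the label order matches the model's output logits as config.py implies (the index value is illustrative):

```python
# label_lookup.py - illustrative only, not part of this PR.
from config import config

worker_cfg = config["workers"][0]            # the sentiment worker
predicted_index = 2                          # e.g. argmax over the model's logits
label = worker_cfg["model_labels"][predicted_index]
print(label)                                 # -> "POSITIVE"
```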
51 changes: 51 additions & 0 deletions solution/dev_parallel/main_handler.py
@@ -0,0 +1,51 @@
# main_handler.py
import uuid
import logging
from typing import Union, List, Dict
from fastapi import HTTPException
import asyncio

async def process_data(data: Union[str, List[Dict[str, str]], Dict[str, str]], queues, futures_dict, logger):
    if data is None or data == '':
        # If the payload is empty, wrap a placeholder under the 'data' key
        data = {'data': "Empty request"}
    elif isinstance(data, list):
        # If the input is a list of dicts, keep it as-is under the 'data' key
        data = {'data': data}
    elif isinstance(data, str):
        # If the input is a plain string, wrap it in a dict under the 'data' key
        data = {'data': data}
    elif isinstance(data, dict) and 'data' in data:
        # If the input is a dict that already contains a 'data' key, use it as-is
        pass
    else:
        # None of the accepted shapes matched: report an error
        raise HTTPException(status_code=400, detail="Invalid format. Expected JSON with a 'data' key or a simple text.")

    if not isinstance(data['data'], (str, list)):
        # 'data' must be a string or a list; otherwise report an error
        raise HTTPException(status_code=400, detail="Invalid 'data' format. Expected a string or a list.")

    correlation_id = str(uuid.uuid4())
    logger.info(f'Received data: {data}. Assigned correlation_id: {correlation_id}.')

    # Build the task message for the workers
    message = {
        "correlation_id": correlation_id,
        "data": data,
    }

    # Push the message onto every worker queue
    for queue in queues:
        await queue.put(message)
        logger.info(f'Pushed to {id(queue)} : {message}')

    # Create a Future and store it in the shared dict
    future = asyncio.Future()
    futures_dict[correlation_id] = future
    logger.info(f'Future created for correlation_id: {correlation_id}.')

    # Wait for the aggregated result and return it in the response
    result = await future
    logger.info(f'Received result for correlation_id: {correlation_id}. Result: {result}')
return result
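
process_data normalizes several request shapes before fanning the task out to the worker queues. A short sketch of payloads the validation above accepts (values are illustrative):

```python
# Payload shapes accepted by process_data (illustrative values):
accepted_payloads = [
    "This is a test sentence.",               # bare string
    {"data": "This is a test sentence."},     # dict with a 'data' key
    [{"text": "first"}, {"text": "second"}],  # list of dicts, wrapped under 'data'
]
```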
38 changes: 38 additions & 0 deletions solution/dev_parallel/responder.py
@@ -0,0 +1,38 @@
import json
import asyncio

class Responder:
def __init__(self, futures_dict, result_queue, logger):
self.logger = logger
self.futures_dict = futures_dict
self.result_queue = result_queue
self.tasks = {}

async def fetch_results(self):
while True:
result_string = await self.result_queue.get()
result = json.loads(result_string)
correlation_id = result.get('correlation_id')
worker_result = result.get('result')

if correlation_id not in self.tasks:
self.tasks[correlation_id] = [0, {}]

self.tasks[correlation_id][1].update(worker_result)
self.tasks[correlation_id][0] += 1

if self.tasks[correlation_id][0] >= 5:
asyncio.create_task(self.process_results(correlation_id))

async def process_results(self, correlation_id):
aggregated_results = self.tasks[correlation_id][1]
final_message = aggregated_results
future = self.futures_dict.pop(correlation_id, None)
if future is not None:
future.set_result(final_message)
del self.tasks[correlation_id]

async def start(self):
asyncio.create_task(self.fetch_results())
while True:
await asyncio.sleep(0.025)
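
Responder counts one result per worker and, once all five have arrived, resolves the caller's Future with the merged dict. Based on worker.py, each model contributes a key derived from the organisation part of its model name, so the aggregated payload set on the Future looks roughly like this sketch (scores and labels are illustrative):

```python
# Illustrative aggregated result for one correlation_id (values are made up):
{
    "cardiffnlp": {"score": 0.97, "label": "POSITIVE"},
    "ivanlau": {"score": 0.99, "label": "English"},
    "svalabs": {"score": 0.98, "label": "HAM"},
    "EIStakovskii": {"score": 0.95, "label": "LABEL_0"},
    "jy46604790": {"score": 0.92, "label": "LABEL_1"},
}
```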
62 changes: 62 additions & 0 deletions solution/dev_parallel/server.py
@@ -0,0 +1,62 @@
# server.py
import asyncio
import threading
import logging
from typing import Union, List, Dict
from fastapi import FastAPI
from worker import Worker
from responder import Responder
from main_handler import process_data
from config import config # Import the configuration

app = FastAPI()

# Configure logging
logger = logging.getLogger('server')
logger.setLevel(logging.DEBUG)

futures_dict = {}
worker_queues = [asyncio.Queue() for _ in range(5)] # Create 5 worker queues
result_queue = asyncio.Queue() # Create a single result queue

def start_worker(worker_instance):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(worker_instance.start())
loop.close()

def start_responder(responder_instance):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(responder_instance.start())
loop.close()

@app.on_event("startup")
async def startup_event():
# Start the workers in separate threads
worker_threads = []
worker_events = []
for i, queue in enumerate(worker_queues):
logging.info(f'Starting worker with queue {id(queue)}.')
worker_config = config["workers"][i] # Get the configuration for this worker
worker_event = threading.Event()
worker_instance = Worker(worker_config, queue, result_queue, worker_event, logger)
worker_thread = threading.Thread(target=start_worker, args=(worker_instance,))
worker_thread.start()
worker_threads.append(worker_thread)
worker_events.append(worker_event)

# Wait for each worker to fully load before starting the next one
for worker_event in worker_events:
worker_event.wait()

# Start the Responder in a separate thread
responder_instance = Responder(futures_dict, result_queue, logger)
responder_thread = threading.Thread(target=start_responder, args=(responder_instance,))
responder_thread.start()

logging.info('Server started, running result listener')

@app.post("/process")
async def process_endpoint(data: Union[str, List[Dict[str, str]], Dict[str, str]]):
return await process_data(data, worker_queues, futures_dict, logger)
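
A minimal client-side sketch for exercising the endpoint, assuming the stack from this PR is running locally with nginx listening on port 8000 in front of the app (host, port, and payload below are assumptions for illustration):

```python
# client_example.py - illustrative only, not part of this PR.
import requests

resp = requests.post(
    "http://localhost:8000/process",
    json={"data": "This is a test sentence."},
)
print(resp.status_code)
print(resp.json())   # aggregated results from the five models
```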
110 changes: 110 additions & 0 deletions solution/dev_parallel/worker.py
@@ -0,0 +1,110 @@
import logging
import json
import asyncio
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import os

# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class Worker:
def __init__(self, worker_config, message_queue, results_queue, event, logger):
self.logger = logger
        # Take the worker name from the configuration
self.worker_name = worker_config["worker_name"]
self.model_name = worker_config["model_name"]
self.model_labels = worker_config["model_labels"]
self.event = event

self.message_queue = message_queue
self.results_queue = results_queue

        # Load the model
logger.info(f"Loading model {self.model_name}...")
self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name)
if torch.cuda.is_available():
self.model = self.model.to('cuda')
logger.info("Using GPU for model computations.")
else:
logger.info("Using CPU for model computations.")
logger.info(f"Model {self.model_name} loaded.")

        # Load the tokenizer
        logger.info("Loading tokenizer...")
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
logger.info(f"Tokenizer for {self.model_name} loaded.")

        # Run a sanity check of the model on test data
self.test_model()

# Signal that the worker has fully loaded
self.event.set()
logger.info(f"Worker started.{self.worker_name} ")

def test_model(self):
test_data = "This is a test sentence."
inputs = self.tokenizer(test_data, return_tensors='pt')
if torch.cuda.is_available():
inputs = inputs.to('cuda')

while True:
try:
with torch.no_grad():
outputs = self.model(**inputs)

predictions = outputs.logits.argmax(dim=-1).item()
logger.info(f"Test data processed successfully. Predicted label: {predictions}")
                break  # Exit the loop once the test succeeds
except Exception as e:
logger.error(f"Test failed with error: {e}. Retrying...")
continue

    BATCH_SIZE = 5  # A suitable batch size

async def start(self):
batch = []
while True:
if not self.message_queue.empty() and len(batch) < self.BATCH_SIZE:
message = await self.message_queue.get()
batch.append(message)
elif batch:
await self.process_batch(batch)
batch = []
else:
await asyncio.sleep(0.01)



async def process_batch(self, batch):
bodies = [json.loads(message) if isinstance(message, str) else message for message in batch]
texts = [body['data']['data'] for body in bodies]
correlation_ids = [body['correlation_id'] for body in bodies]

inputs = self.tokenizer(texts, return_tensors='pt', padding=True, truncation=True)

if torch.cuda.is_available():
            inputs = inputs.to('cuda')

with torch.no_grad():
outputs = self.model(**inputs)

predictions = outputs.logits.argmax(dim=-1)
probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)

for i, prediction in enumerate(predictions):
score = probabilities[i][prediction].item()
label = self.model_labels[prediction.item()]
result_key = self.model_name.split('/')[0]
result = {result_key: {"score": score, "label": label}}

results_dict = {
"correlation_id": correlation_ids[i],
"worker": self.worker_name,
"result": result
}
results_dict = json.dumps(results_dict)
await self.results_queue.put(results_dict)
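
For reference, each call to process_batch pushes one JSON-encoded message per input onto results_queue; for the sentiment worker that message looks roughly like this sketch (the identifier and numbers are illustrative):

```python
# Illustrative single-worker result message (before aggregation by Responder):
{
    "correlation_id": "...",
    "worker": "worker1",
    "result": {"cardiffnlp": {"score": 0.97, "label": "POSITIVE"}},
}
```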

