diff --git a/autotests/helm/values.yaml b/autotests/helm/values.yaml
index cda6a5e..1102397 100644
--- a/autotests/helm/values.yaml
+++ b/autotests/helm/values.yaml
@@ -25,7 +25,7 @@ global:
   activeDeadlineSeconds: 3600 # 1h
   env:
-    PARTICIPANT_NAME:
+    PARTICIPANT_NAME: Viktor Kupko
     api_host: http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/
     # K6, do not edit!
diff --git a/solution/Dockerfile b/solution/Dockerfile
new file mode 100644
index 0000000..3704a10
--- /dev/null
+++ b/solution/Dockerfile
@@ -0,0 +1,47 @@
+# Use the PyTorch image with CUDA and cuDNN support as the base image
+FROM pytorch/pytorch:latest
+# FROM nvidia/cuda:12.1.1-cudnn8-runtime-ubi8
+
+# Upgrade pip
+RUN pip install --upgrade pip
+
+# Set environment variables
+ARG REDIS_HOST
+ENV REDIS_HOST=$REDIS_HOST
+ARG REDIS_PASSWORD
+ENV REDIS_PASSWORD=$REDIS_PASSWORD
+
+# Set CUDA environment variables
+ENV NVIDIA_VISIBLE_DEVICES all
+ENV NVIDIA_DRIVER_CAPABILITIES compute,utility
+
+RUN apt-get update && apt-get install -y \
+    wget \
+    curl \
+    make \
+    gcc \
+    procps \
+    lsof \
+    vim \
+    supervisor \
+    nginx \
+    linux-headers-$(uname -r)
+
+COPY default /etc/nginx/sites-available/
+COPY default /etc/nginx/sites-enabled/
+
+WORKDIR /solution
+
+COPY requirements.txt .
+
+RUN pip install -r requirements.txt
+
+COPY /dev_parallel/ .
+
+COPY /gunicorn_conf.py .
+
+# Copy the supervisord configuration file into the container
+COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf
+
+# Use CMD to start supervisord
+CMD ["/usr/bin/supervisord"]
diff --git a/solution/README.md b/solution/README.md
new file mode 100644
index 0000000..6886e45
--- /dev/null
+++ b/solution/README.md
@@ -0,0 +1,17 @@
+# app tornado-redis-asyncio v0.1b
+
+The app is packaged as a service in a Docker environment. docker-compose starts two containers: one with the app and one with Redis.
+All settings can be configured in config.py.
+
+... to be continued
+
+P.S. The response structure does not yet match the example; it will be changed shortly so that the tests pass.
+
+## How to run?
+
+You can run this app as a service using docker-compose.
+
+bash$
+
+1. docker build -t <image-name> .
+2. docker-compose up
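For reference, a minimal sketch of calling the service once the docker-compose stack is up (hypothetical client script; it assumes port 8000 is published as in docker-compose.yml and that the aggregated response is keyed by the model namespaces that worker.py derives via model_name.split('/')[0]):

    # query_example.py — illustrative client, not part of the solution
    import json
    import urllib.request

    payload = json.dumps({"data": "This is a test sentence."}).encode("utf-8")
    req = urllib.request.Request(
        "http://localhost:8000/process",  # nginx listens on 8000 and proxies to the app on 8001
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        result = json.loads(resp.read())

    # Assumed shape: one entry per worker, e.g.
    # {"cardiffnlp": {"score": 0.97, "label": "NEUTRAL"}, "ivanlau": {...}, "svalabs": {...}, ...}
    print(result)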
diff --git a/solution/default b/solution/default
new file mode 100644
index 0000000..b3b4775
--- /dev/null
+++ b/solution/default
@@ -0,0 +1,13 @@
+#default
+upstream app_servers {
+    server 0.0.0.0:8001;
+
+}
+
+server {
+    listen 8000;
+
+    location / {
+        proxy_pass http://app_servers;
+    }
+}
diff --git a/solution/dev_parallel/config.py b/solution/dev_parallel/config.py
new file mode 100644
index 0000000..7cd470a
--- /dev/null
+++ b/solution/dev_parallel/config.py
@@ -0,0 +1,37 @@
+config = {
+
+
+    "workers": [
+
+        {
+            "queue": 1,
+            "worker_name": "worker1",
+            "model_name": "cardiffnlp/twitter-xlm-roberta-base-sentiment",
+            "model_labels": ["NEGATIVE", "NEUTRAL", "POSITIVE"]  # Labels for sentiment model
+        },
+        {
+            "queue": 2,
+            "worker_name": "worker2",
+            "model_name": "ivanlau/language-detection-fine-tuned-on-xlm-roberta-base",
+            "model_labels": ["Arabic", "Basque", "Breton", "Catalan", "Chinese_China", "Chinese_Hongkong", "Chinese_Taiwan", "Chuvash", "Czech", "Dhivehi", "Dutch", "English", "Esperanto", "Estonian", "French", "Frisian", "Georgian", "German", "Greek", "Hakha_Chin", "Indonesian", "Interlingua", "Italian", "Japanese", "Kabyle", "Kinyarwanda", "Kyrgyz", "Latvian", "Maltese", "Mongolian", "Persian", "Polish", "Portuguese", "Romanian", "Romansh_Sursilvan", "Russian", "Sakha", "Slovenian", "Spanish", "Swedish", "Tamil", "Tatar", "Turkish", "Ukrainian", "Welsh"]  # Labels for language detection model
+        },
+        {
+            "queue": 3,
+            "worker_name": "worker3",
+            "model_name": "svalabs/twitter-xlm-roberta-crypto-spam",
+            "model_labels": ["HAM", "SPAM"]  # Labels for crypto spam model
+        },
+        {
+            "queue": 4,
+            "worker_name": "worker4",
+            "model_name": "EIStakovskii/xlm_roberta_base_multilingual_toxicity_classifier_plus",
+            "model_labels": ["LABEL_0", "LABEL_1"]  # LABEL_1 means TOXIC, LABEL_0 means NOT TOXIC.
+        },
+        {
+            "queue": 5,
+            "worker_name": "worker5",
+            "model_name": "jy46604790/Fake-News-Bert-Detect",
+            "model_labels": ["LABEL_0", "LABEL_1"]  # LABEL_0: Fake news, LABEL_1: Real news
+        }
+    ]
+}
diff --git a/solution/dev_parallel/main_handler.py b/solution/dev_parallel/main_handler.py
new file mode 100644
index 0000000..e114ab3
--- /dev/null
+++ b/solution/dev_parallel/main_handler.py
@@ -0,0 +1,51 @@
+# main_handler.py
+import uuid
+import logging
+from typing import Union, List, Dict
+from fastapi import HTTPException
+import asyncio
+
+async def process_data(data: Union[str, List[Dict[str, str]], Dict[str, str]], queues, futures_dict, logger):
+    if data is None or data == '':
+        # Empty input: build a dict with the key 'data' and the value "Empty request"
+        data = {'data': "Empty request"}
+    elif isinstance(data, list):
+        # A list of dicts: wrap it in a dict under the 'data' key
+        data = {'data': data}
+    elif isinstance(data, str):
+        # A plain string: wrap it in a dict under the 'data' key
+        data = {'data': data}
+    elif isinstance(data, dict) and 'data' in data:
+        # A dict that already contains the 'data' key: use it as is
+        pass
+    else:
+        # None of the conditions matched: report an error
+        raise HTTPException(status_code=400, detail="Invalid format. Expected JSON with a 'data' key or a simple text.")
+
+    if not isinstance(data['data'], (str, list)):
+        # 'data' is neither a string nor a list: report an error
+        raise HTTPException(status_code=400, detail="Invalid 'data' format. Expected a string or a list.")
+
+    correlation_id = str(uuid.uuid4())
+    logging.info(f'Received data: {data}. Assigned correlation_id: {correlation_id}.')
+
+    # Create the Future and register it before fanning out, so the result cannot arrive first
+    future = asyncio.Future()
+    futures_dict[correlation_id] = future
+    logging.info(f'Future created for correlation_id: {correlation_id}.')
+
+    # Build the task message for the workers
+    message = {
+        "correlation_id": correlation_id,
+        "data": data,
+    }
+
+    # Push the message to every worker queue
+    for queue in queues:
+        await queue.put(message)
+        logging.info(f'Pushed to {id(queue)} : {message}')
+
+    # Wait for the aggregated result and return it in the response
+    result = await future
+    logging.info(f'Received result for correlation_id: {correlation_id}. Result: {result}')
+    return result
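main_handler.py fans each request out to every worker queue and then awaits a single Future that the Responder resolves once all workers have answered. A self-contained sketch of that fan-out/aggregate pattern (hypothetical names, everything on one event loop for brevity, unlike the threaded setup in server.py):

    # fanout_sketch.py — standalone illustration of the queue/Future flow
    import asyncio
    import uuid

    async def fake_worker(name, in_q, out_q):
        while True:
            msg = await in_q.get()
            await out_q.put({"correlation_id": msg["correlation_id"], "result": {name: "ok"}})

    async def main():
        worker_queues = [asyncio.Queue() for _ in range(3)]
        result_queue = asyncio.Queue()
        futures = {}

        for i, q in enumerate(worker_queues):
            asyncio.create_task(fake_worker(f"worker{i}", q, result_queue))

        async def aggregator():
            counts, merged = {}, {}
            while True:
                res = await result_queue.get()
                cid = res["correlation_id"]
                merged.setdefault(cid, {}).update(res["result"])
                counts[cid] = counts.get(cid, 0) + 1
                if counts[cid] == len(worker_queues):  # every worker has answered
                    futures.pop(cid).set_result(merged.pop(cid))
                    del counts[cid]

        asyncio.create_task(aggregator())

        cid = str(uuid.uuid4())
        fut = asyncio.get_running_loop().create_future()
        futures[cid] = fut  # register the Future before fanning out
        for q in worker_queues:
            await q.put({"correlation_id": cid, "data": "hello"})
        print(await fut)  # {'worker0': 'ok', 'worker1': 'ok', 'worker2': 'ok'}

    asyncio.run(main())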
diff --git a/solution/dev_parallel/responder.py b/solution/dev_parallel/responder.py
new file mode 100644
index 0000000..f60b273
--- /dev/null
+++ b/solution/dev_parallel/responder.py
@@ -0,0 +1,38 @@
+import json
+import asyncio
+
+class Responder:
+    def __init__(self, futures_dict, result_queue, logger):
+        self.logger = logger
+        self.futures_dict = futures_dict
+        self.result_queue = result_queue
+        self.tasks = {}
+
+    async def fetch_results(self):
+        while True:
+            result_string = await self.result_queue.get()
+            result = json.loads(result_string)
+            correlation_id = result.get('correlation_id')
+            worker_result = result.get('result')
+
+            if correlation_id not in self.tasks:
+                self.tasks[correlation_id] = [0, {}]
+
+            self.tasks[correlation_id][1].update(worker_result)
+            self.tasks[correlation_id][0] += 1
+
+            if self.tasks[correlation_id][0] >= 5:
+                asyncio.create_task(self.process_results(correlation_id))
+
+    async def process_results(self, correlation_id):
+        aggregated_results = self.tasks[correlation_id][1]
+        final_message = aggregated_results
+        future = self.futures_dict.pop(correlation_id, None)
+        if future is not None:
+            future.set_result(final_message)
+        del self.tasks[correlation_id]
+
+    async def start(self):
+        asyncio.create_task(self.fetch_results())
+        while True:
+            await asyncio.sleep(0.025)
diff --git a/solution/dev_parallel/server.py b/solution/dev_parallel/server.py
new file mode 100644
index 0000000..382c002
--- /dev/null
+++ b/solution/dev_parallel/server.py
@@ -0,0 +1,62 @@
+# server.py
+import asyncio
+import threading
+import logging
+from typing import Union, List, Dict
+from fastapi import FastAPI
+from worker import Worker
+from responder import Responder
+from main_handler import process_data
+from config import config  # Import the configuration
+
+app = FastAPI()
+
+# Configure logging
+logger = logging.getLogger('server')
+logger.setLevel(logging.DEBUG)
+
+futures_dict = {}
+worker_queues = [asyncio.Queue() for _ in range(5)]  # Create 5 worker queues
+result_queue = asyncio.Queue()  # Create a single result queue
+
+def start_worker(worker_instance):
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    loop.run_until_complete(worker_instance.start())
+    loop.close()
+
+def start_responder(responder_instance):
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    loop.run_until_complete(responder_instance.start())
+    loop.close()
+
+@app.on_event("startup")
+async def startup_event():
+    # Start the workers in separate threads
+    worker_threads = []
+    worker_events = []
+    for i, queue in enumerate(worker_queues):
+        logging.info(f'Starting worker with queue {id(queue)}.')
+        worker_config = config["workers"][i]  # Get the configuration for this worker
+        worker_event = threading.Event()
+        worker_instance = Worker(worker_config, queue, result_queue, worker_event, logger)
+        worker_thread = threading.Thread(target=start_worker, args=(worker_instance,))
+        worker_thread.start()
+        worker_threads.append(worker_thread)
+        worker_events.append(worker_event)
+
+    # Wait for all workers to finish loading their models
+    for worker_event in worker_events:
+        worker_event.wait()
+
+    # Start the Responder in a separate thread
+    responder_instance = Responder(futures_dict, result_queue, logger)
+    responder_thread = threading.Thread(target=start_responder, args=(responder_instance,))
+    responder_thread.start()
+
+    logging.info('Server started, running result listener')
+
+@app.post("/process")
+async def process_endpoint(data: Union[str, List[Dict[str, str]], Dict[str, str]]):
+    return await process_data(data, worker_queues, futures_dict, logger)
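server.py runs each Worker and the Responder on its own event loop in a separate thread, while the Future awaited in main_handler.py belongs to the main FastAPI loop. Setting a Future's result from another thread generally has to be scheduled onto the loop that owns it; a minimal sketch of that pattern with call_soon_threadsafe (a general illustration, not code taken from this solution):

    # cross_thread_future_sketch.py — resolving a Future owned by another thread's loop
    import asyncio
    import threading

    def resolver(owner_loop, future, value):
        # set_result must run on the loop that owns the Future, so schedule it thread-safely
        owner_loop.call_soon_threadsafe(future.set_result, value)

    async def main():
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        threading.Thread(target=resolver, args=(loop, future, {"worker1": "ok"})).start()
        print(await future)  # {'worker1': 'ok'}

    asyncio.run(main())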
diff --git a/solution/dev_parallel/worker.py b/solution/dev_parallel/worker.py
new file mode 100644
index 0000000..352608b
--- /dev/null
+++ b/solution/dev_parallel/worker.py
@@ -0,0 +1,110 @@
+import logging
+import json
+import asyncio
+from transformers import AutoTokenizer, AutoModelForSequenceClassification
+import torch
+import os
+
+# Logging setup
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+class Worker:
+    def __init__(self, worker_config, message_queue, results_queue, event, logger):
+        self.logger = logger
+        # Take the worker name from the configuration
+        self.worker_name = worker_config["worker_name"]
+        self.model_name = worker_config["model_name"]
+        self.model_labels = worker_config["model_labels"]
+        self.event = event
+
+        self.message_queue = message_queue
+        self.results_queue = results_queue
+
+        # Load the model
+        logger.info(f"Loading model {self.model_name}...")
+        self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name)
+        if torch.cuda.is_available():
+            self.model = self.model.to('cuda')
+            logger.info("Using GPU for model computations.")
+        else:
+            logger.info("Using CPU for model computations.")
+        logger.info(f"Model {self.model_name} loaded.")
+
+        # Load the tokenizer
+        logger.info(f"Loading tokenizer...")
+        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
+        logger.info(f"Tokenizer for {self.model_name} loaded.")
+
+        # Run the model once on test data
+        self.test_model()
+
+        # Signal that the worker has fully loaded
+        self.event.set()
+        logger.info(f"Worker {self.worker_name} started.")
+
+    def test_model(self):
+        test_data = "This is a test sentence."
+        inputs = self.tokenizer(test_data, return_tensors='pt')
+        if torch.cuda.is_available():
+            inputs = inputs.to('cuda')
+
+        while True:
+            try:
+                with torch.no_grad():
+                    outputs = self.model(**inputs)
+
+                predictions = outputs.logits.argmax(dim=-1).item()
+                logger.info(f"Test data processed successfully. Predicted label: {predictions}")
+                break  # The test passed, leave the loop
+            except Exception as e:
+                logger.error(f"Test failed with error: {e}. Retrying...")
+                continue
+
+    BATCH_SIZE = 5  # Batch size for a single inference pass
+
+    async def start(self):
+        batch = []
+        while True:
+            if not self.message_queue.empty() and len(batch) < self.BATCH_SIZE:
+                message = await self.message_queue.get()
+                batch.append(message)
+            elif batch:
+                await self.process_batch(batch)
+                batch = []
+            else:
+                await asyncio.sleep(0.01)
+
+
+    async def process_batch(self, batch):
+        bodies = [json.loads(message) if isinstance(message, str) else message for message in batch]
+        texts = [body['data']['data'] for body in bodies]
+        correlation_ids = [body['correlation_id'] for body in bodies]
+
+        inputs = self.tokenizer(texts, return_tensors='pt', padding=True, truncation=True)
+
+        if torch.cuda.is_available():
+            inputs = inputs.to('cuda')
+
+        with torch.no_grad():
+            outputs = self.model(**inputs)
+
+        predictions = outputs.logits.argmax(dim=-1)
+        probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
+
+        for i, prediction in enumerate(predictions):
+            score = probabilities[i][prediction].item()
+            label = self.model_labels[prediction.item()]
+            result_key = self.model_name.split('/')[0]
+            result = {result_key: {"score": score, "label": label}}
+
+            results_dict = {
+                "correlation_id": correlation_ids[i],
+                "worker": self.worker_name,
+                "result": result
+            }
+            results_dict = json.dumps(results_dict)
+            await self.results_queue.put(results_dict)
+
Retrying...") + continue + + BATCH_SIZE = 5 # Подходящий размер пакета + + async def start(self): + batch = [] + while True: + if not self.message_queue.empty() and len(batch) < self.BATCH_SIZE: + message = await self.message_queue.get() + batch.append(message) + elif batch: + await self.process_batch(batch) + batch = [] + else: + await asyncio.sleep(0.01) + + + + async def process_batch(self, batch): + bodies = [json.loads(message) if isinstance(message, str) else message for message in batch] + texts = [body['data']['data'] for body in bodies] + correlation_ids = [body['correlation_id'] for body in bodies] + + inputs = self.tokenizer(texts, return_tensors='pt', padding=True, truncation=True) + + if torch.cuda.is_available(): + inputs = inputs.to('cuda') # + + with torch.no_grad(): + outputs = self.model(**inputs) + + predictions = outputs.logits.argmax(dim=-1) + probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1) + + for i, prediction in enumerate(predictions): + score = probabilities[i][prediction].item() + label = self.model_labels[prediction.item()] + result_key = self.model_name.split('/')[0] + result = {result_key: {"score": score, "label": label}} + + results_dict = { + "correlation_id": correlation_ids[i], + "worker": self.worker_name, + "result": result + } + results_dict = json.dumps(results_dict) + await self.results_queue.put(results_dict) + + diff --git a/solution/docker-compose.yml b/solution/docker-compose.yml new file mode 100644 index 0000000..055b686 --- /dev/null +++ b/solution/docker-compose.yml @@ -0,0 +1,42 @@ +version: '3.8' +services: + mlops-app: + build: + context: . + dockerfile: Dockerfile + ports: + - "8000:8000" + environment: + - REDIS_HOST=redis + - REDIS_PASSWORD= + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [gpu] + k6: + build: + context: ./autotests/app + dockerfile: k6.Dockerfile + volumes: + - ./autotests/app/src:/app/src + environment: + - API_HOST=http://mlops-app:8000/process + command: /bin/bash -c "sleep 180 && k6 run -o xk6-prometheus-rw /app/main.js" + # command: run -o xk6-prometheus-rw /app/main.js + grafana: + image: grafana/grafana + environment: + - GF_LOG_LEVEL=error + ports: + - 3000:3000 + prometheus: + image: prom/prometheus + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + command: + - '--config.file=/etc/prometheus/prometheus.yml' + ports: + - 9090:9090 \ No newline at end of file diff --git a/solution/gunicorn_conf.py b/solution/gunicorn_conf.py new file mode 100644 index 0000000..df308f5 --- /dev/null +++ b/solution/gunicorn_conf.py @@ -0,0 +1,22 @@ +import multiprocessing + +# Server Socket +bind = '0.0.0.0:8000' + +# Worker Processes +workers = 2 +# multiprocessing.cpu_count() * 2 + 1 +worker_class = 'gthread' + +# The maximum number of simultaneous clients. 
diff --git a/solution/prometheus.yml b/solution/prometheus.yml
new file mode 100644
index 0000000..d3a9178
--- /dev/null
+++ b/solution/prometheus.yml
@@ -0,0 +1,8 @@
+global:
+  scrape_interval: 15s
+
+scrape_configs:
+  - job_name: 'prometheus'
+    scrape_interval: 5s
+    static_configs:
+      - targets: ['0.0.0.0:9090']
diff --git a/solution/requirements.txt b/solution/requirements.txt
new file mode 100644
index 0000000..f0fc15f
--- /dev/null
+++ b/solution/requirements.txt
@@ -0,0 +1,15 @@
+fastapi==0.96.0
+uvicorn==0.22.0
+gunicorn==20.1.0
+yappi==1.4.0
+async-timeout==4.0.2
+idna==3.4
+multidict==6.0.4
+pamqp==3.2.1
+typing_extensions==4.6.2
+yarl==1.9.2
+redis==3.5.3
+transformers==4.29.2
+torch==2.0.1
+sentencepiece==0.1.99
+protobuf==3.20.1
diff --git a/solution/supervisord.conf b/solution/supervisord.conf
new file mode 100644
index 0000000..40df976
--- /dev/null
+++ b/solution/supervisord.conf
@@ -0,0 +1,15 @@
+[supervisord]
+nodaemon=true
+
+[program:server1]
+command=bash -c "uvicorn server:app --host 0.0.0.0 --port 8001 --reload"
+directory=/solution
+autostart=true
+autorestart=true
+
+[program:nginx]
+command=bash -c "service nginx restart"
+directory=/solution
+autostart=true