diff --git a/autotests/helm/values.yaml b/autotests/helm/values.yaml
index cda6a5e..0b053f6 100644
--- a/autotests/helm/values.yaml
+++ b/autotests/helm/values.yaml
@@ -25,8 +25,8 @@ global:
   activeDeadlineSeconds: 3600 # 1h
   env:
-    PARTICIPANT_NAME:
-    api_host: http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/
+    PARTICIPANT_NAME: electriclizard
+    api_host: http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/process
     # K6, do not edit!
     K6_PROMETHEUS_RW_SERVER_URL: http://kube-prometheus-stack-prometheus.monitoring.svc.cluster.local:9090/api/v1/write
diff --git a/solution/Dockerfile b/solution/Dockerfile
new file mode 100644
index 0000000..f8b2b3a
--- /dev/null
+++ b/solution/Dockerfile
@@ -0,0 +1,14 @@
+FROM ghcr.io/els-rd/transformer-deploy:0.4.0
+
+ARG DEBIAN_FRONTEND=noninteractive
+
+WORKDIR /src
+ENV PYTHONPATH="${PYTHONPATH}:${WORKDIR}"
+
+COPY requirements.txt $WORKDIR
+
+RUN pip install -U --no-cache-dir -r requirements.txt
+
+COPY . $WORKDIR
+
+ENTRYPOINT [ "uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "1" ]
diff --git a/solution/app.py b/solution/app.py
new file mode 100644
index 0000000..90e4645
--- /dev/null
+++ b/solution/app.py
@@ -0,0 +1,113 @@
+from typing import List
+import asyncio
+
+import uvicorn
+from fastapi import FastAPI, APIRouter
+from fastapi.openapi.docs import get_swagger_ui_html
+from fastapi.openapi.utils import get_openapi
+from fastapi.responses import HTMLResponse
+from starlette.requests import Request
+
+from configs.config import AppConfig, ModelConfig
+from infrastructure.models import TrtTransformerTextClassificationModel
+from service.recognition import TextClassificationService
+from handlers.recognition import PredictionHandler
+from handlers.data_models import ResponseSchema
+
+
+config = AppConfig.parse_file("./configs/app_config.yaml")
+models = [
+    TrtTransformerTextClassificationModel(conf.model, conf.model_path, conf.tokenizer)
+    for conf in config.models
+]
+
+recognition_service = TextClassificationService(models)
+recognition_handler = PredictionHandler(recognition_service, config.timeout)
+
+app = FastAPI()
+router = APIRouter()
+
+
+@app.on_event("startup")
+def count_max_batch_size():
+    print("Calculating Max batch size")
+    app.batch_size = 100
+
+    try:
+        while True:
+            text = ["this is simple text"]*app.batch_size
+            inputs = [model.tokenize_texts(text) for model in models]
+            outputs = [model(m_inputs) for model, m_inputs in zip(models, inputs)]
+            app.batch_size += 100
+
+    except RuntimeError as err:
+        if "CUDA out of memory" in str(err):
+            app.batch_size -= 100
+            print(f"Max batch size calculated = {app.batch_size}")
+
+
+@app.on_event("startup")
+def create_queues():
+    app.models_queues = {}
+    for md in models:
+        task_queue = asyncio.Queue()
+        app.models_queues[md.name] = task_queue
+        asyncio.create_task(recognition_handler.handle(md.name, task_queue, app.batch_size))
+
+
+@router.post("/process", response_model=ResponseSchema)
+async def process(request: Request):
+    text = (await request.body()).decode()
+
+    results = []
+    response_q = asyncio.Queue()  # init a response queue for every request, one for all models
+    for model_name, model_queue in app.models_queues.items():
+        await model_queue.put((text, response_q))
+        model_res = await response_q.get()
+        results.append(model_res)
+    return recognition_handler.serialize_answer(results)
+
+
+app.include_router(router)
+
+
+@app.get("/healthcheck")
+async def main():
+    return {"message": "I am alive"}
+
+
+def custom_openapi():
+    if app.openapi_schema:
+        return app.openapi_schema
+    openapi_schema = get_openapi(
+        title="NLP Model Service",
+        version="0.1.0",
+        description="Inca test task",
+        routes=app.routes,
+    )
+    app.openapi_schema = openapi_schema
+    return app.openapi_schema
+
+
+@app.get(
+    "/documentation/swagger-ui/",
+    response_class=HTMLResponse,
+)
+async def swagger_ui_html():
+    return get_swagger_ui_html(
+        openapi_url="/documentation/openapi.json",
+        title="API documentation"
+    )
+
+
+@app.get(
+    "/documentation/openapi.json",
+    response_model_exclude_unset=True,
+    response_model_exclude_none=True,
+)
+async def openapi_endpoint():
+    return custom_openapi()
+
+
+if __name__ == "__main__":
+    uvicorn.run("app:app", host="0.0.0.0", port=config.port, workers=config.workers)
diff --git a/solution/configs/app_config.yaml b/solution/configs/app_config.yaml
new file mode 100644
index 0000000..aea7f52
--- /dev/null
+++ b/solution/configs/app_config.yaml
@@ -0,0 +1,22 @@
+models:
+  - model: "cardiffnlp"
+    model_path: "cardiffnlp/twitter-xlm-roberta-base-sentiment"
+    tokenizer: "cardiffnlp/twitter-xlm-roberta-base-sentiment"
+  - model: "ivanlau"
+    model_path: "ivanlau/language-detection-fine-tuned-on-xlm-roberta-base"
+    tokenizer: "ivanlau/language-detection-fine-tuned-on-xlm-roberta-base"
+  - model: "svalabs"
+    model_path: "svalabs/twitter-xlm-roberta-crypto-spam"
+    tokenizer: "svalabs/twitter-xlm-roberta-crypto-spam"
+  - model: "EIStakovskii"
+    model_path: "EIStakovskii/xlm_roberta_base_multilingual_toxicity_classifier_plus"
+    tokenizer: "EIStakovskii/xlm_roberta_base_multilingual_toxicity_classifier_plus"
+  - model: "jy46604790"
+    model_path: "jy46604790/Fake-News-Bert-Detect"
+    tokenizer: "jy46604790/Fake-News-Bert-Detect"
+
+port: 8080
+workers: 1
+
+timeout: 0.01
+
diff --git a/solution/configs/config.py b/solution/configs/config.py
new file mode 100644
index 0000000..b4ebadc
--- /dev/null
+++ b/solution/configs/config.py
@@ -0,0 +1,20 @@
+from typing import List
+
+from pydantic_yaml import YamlModel
+
+
+class ModelConfig(YamlModel):
+    model: str
+    model_path: str
+    tokenizer: str
+
+
+class AppConfig(YamlModel):
+    # model parameters
+    models: List[ModelConfig]
+    # app parameters
+    port: int
+    workers: int
+    # async queues parameters
+    timeout: float
+
diff --git a/solution/handlers/data_models.py b/solution/handlers/data_models.py
new file mode 100644
index 0000000..a132a27
--- /dev/null
+++ b/solution/handlers/data_models.py
@@ -0,0 +1,17 @@
+from typing import List
+
+from pydantic import BaseModel, validator
+
+
+class RecognitionSchema(BaseModel):
+    score: float
+    label: str
+
+
+class ResponseSchema(BaseModel):
+    cardiffnlp: RecognitionSchema
+    ivanlau: RecognitionSchema
+    svalabs: RecognitionSchema
+    EIStakovskii: RecognitionSchema
+    jy46604790: RecognitionSchema
+
diff --git a/solution/handlers/recognition.py b/solution/handlers/recognition.py
new file mode 100644
index 0000000..a92e303
--- /dev/null
+++ b/solution/handlers/recognition.py
@@ -0,0 +1,55 @@
+from typing import List
+import asyncio
+
+from pydantic import ValidationError
+
+from infrastructure.models import TextClassificationModelData
+from service.recognition import TextClassificationService
+from handlers.data_models import ResponseSchema, RecognitionSchema
+
+
+class PredictionHandler:
+
+    def __init__(self, recognition_service: TextClassificationService, timeout: float):
+        self.recognition_service = recognition_service
+        self.timeout = timeout
+
+    async def handle(self, model_name, model_queue, max_batch_size: int):
+        while True:
+            texts = []
+            queues = []
+
+            try:
+                while True:
+                    (text, response_queue) = await asyncio.wait_for(model_queue.get(), timeout=self.timeout)
+                    texts.append(text)
+                    queues.append(response_queue)
+            except asyncio.exceptions.TimeoutError:
+                pass
+
+            if texts:
+                model = next(
+                    (model for model in self.recognition_service.service_models if model.name == model_name),
+                    None
+                )
+                if model:
+                    outs = []
+                    for text_batch in self._perform_batches(texts, max_batch_size):
+                        inputs = model.tokenize_texts(text_batch)
+                        outs.extend(model(inputs))
+                    for rq, out in zip(queues, outs):
+                        await rq.put(out)
+
+    def serialize_answer(self, results: List[TextClassificationModelData]) -> ResponseSchema:
+        res_model = {rec.model_name: self._recognitions_to_schema(rec) for rec in results}
+        return ResponseSchema(**res_model)
+
+    def _recognitions_to_schema(self, recognition: TextClassificationModelData) -> RecognitionSchema:
+        if recognition.model_name != "ivanlau":
+            recognition.label = recognition.label.upper()
+        return RecognitionSchema(score=recognition.score, label=recognition.label)
+
+    def _perform_batches(self, texts: List[str], max_batch_size):
+        for i in range(0, len(texts), max_batch_size):
+            yield texts[i:i + max_batch_size]
+
diff --git a/solution/helm/envs/electriclizard.yaml b/solution/helm/envs/electriclizard.yaml
new file mode 100644
index 0000000..6d3c75c
--- /dev/null
+++ b/solution/helm/envs/electriclizard.yaml
@@ -0,0 +1,13 @@
+global:
+  # add any variables you need in format `key: value`
+  # variables will be available in the container as environment variables
+
+  # change 8000 to your application target port
+  pod:
+    ports:
+      - name: http
+        containerPort: 8080
+        protocol: TCP
+  service:
+    targetPort: 8080
+
diff --git a/solution/infrastructure/models.py b/solution/infrastructure/models.py
new file mode 100644
index 0000000..d5241a8
--- /dev/null
+++ b/solution/infrastructure/models.py
@@ -0,0 +1,107 @@
+from abc import ABC, abstractmethod
+from collections.abc import Callable
+from dataclasses import dataclass
+from typing import List
+
+import torch
+import onnxruntime
+from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
+from optimum.onnxruntime import ORTModelForSequenceClassification
+
+
+session_options = onnxruntime.SessionOptions()
+session_options.log_severity_level = 1
+
+
+@dataclass
+class TextClassificationModelData:
+    model_name: str
+    label: str
+    score: float
+
+
+class BaseTextClassificationModel(ABC):
+
+    def __init__(self, name: str, model_path: str, tokenizer: str):
+        self.name = name
+        self.model_path = model_path
+        self.tokenizer = tokenizer
+        self.device = 0 if torch.cuda.is_available() else -1
+        self._load_model()
+
+    @abstractmethod
+    def _load_model(self):
+        ...
+
+    @abstractmethod
+    def __call__(self, inputs) -> List[TextClassificationModelData]:
+        ...
+
+
+class TransformerTextClassificationModel(BaseTextClassificationModel):
+
+    def _load_model(self):
+        self.tokenizer = AutoTokenizer.from_pretrained(self.tokenizer)
+        self.model = AutoModelForSequenceClassification.from_pretrained(self.model_path)
+        self.model = self.model.to(self.device)
+
+    def tokenize_texts(self, texts: List[str]):
+        inputs = self.tokenizer.batch_encode_plus(
+            texts,
+            add_special_tokens=True,
+            padding='longest',
+            truncation=True,
+            return_token_type_ids=True,
+            return_tensors='pt'
+        )
+        inputs = {k: v.to(self.device) for k, v in inputs.items()}  # Move inputs to GPU
+        return inputs
+
+    def _results_from_logits(self, logits: torch.Tensor):
+        id2label = self.model.config.id2label
+
+        label_ids = logits.argmax(dim=1)
+        scores = logits.softmax(dim=-1)
+        results = [
+            {
+                "label": id2label[label_id.item()],
+                "score": score[label_id.item()].item()
+            }
+            for label_id, score in zip(label_ids, scores)
+        ]
+        return results
+
+    def __call__(self, inputs) -> List[TextClassificationModelData]:
+        logits = self.model(**inputs).logits
+        predictions = self._results_from_logits(logits)
+        predictions = [TextClassificationModelData(self.name, **prediction) for prediction in predictions]
+        return predictions
+
+
+class TrtTransformerTextClassificationModel(TransformerTextClassificationModel):
+
+    def _load_model(self):
+        provider_options = {
+            "trt_engine_cache_enable": True,
+            "trt_engine_cache_path": f"tmp/{self.name}"
+        }
+
+        self.tokenizer = AutoTokenizer.from_pretrained(self.tokenizer)
+        self.model = ORTModelForSequenceClassification.from_pretrained(
+            self.model_path,
+            export=True,
+            provider="TensorrtExecutionProvider",
+            provider_options=provider_options,
+            session_options=session_options,
+        )
+
+    def tokenize_texts(self, texts: List[str]):
+        inputs = self.tokenizer(
+            texts,
+            add_special_tokens=True,
+            padding=True,
+            truncation=True,
+            return_tensors="pt"
+        ).to("cuda")
+        return inputs
+
diff --git a/solution/infrastructure/optimizers.py b/solution/infrastructure/optimizers.py
new file mode 100644
index 0000000..5794ab5
--- /dev/null
+++ b/solution/infrastructure/optimizers.py
@@ -0,0 +1,43 @@
+from pathlib import Path
+
+from optimum.onnxruntime import ORTOptimizer, ORTModelForSequenceClassification
+from optimum.onnxruntime.configuration import OptimizationConfig
+
+
+class OnnxModelOptimizer:
+
+    def __init__(
+        self,
+        model,
+        optimization_level: int = 99
+    ):
+        self.model = model
+        self.onnx_path: Path = Path("onnx")
+        self.onnx_path.mkdir(parents=True, exist_ok=True)
+        self.optimizer = ORTOptimizer.from_pretrained(model)
+        self.optimization_config = OptimizationConfig(optimization_level=optimization_level)
+
+    def graph_optimization(self, model_name: str, model: ORTModelForSequenceClassification):
+        # save weights
+        optimized_model_path = self.onnx_path/model_name
+        if not optimized_model_path.exists():
+            self.model.save_pretrained(optimized_model_path)
+
+            self.optimizer.optimize(
+                save_dir=optimized_model_path,
+                optimization_config=self.optimization_config,
+            )
+
+        model = ORTModelForSequenceClassification.from_pretrained(
+            optimized_model_path,
+            file_name="model_optimized.onnx",
+            provider="CUDAExecutionProvider"
+        )
+        return model
+
+    def quantinization(self):
+        """
+        dynamic quantization is currently only supported for CPUs
+        """
+        ...
+
diff --git a/solution/requirements.txt b/solution/requirements.txt
new file mode 100644
index 0000000..30cc77d
--- /dev/null
+++ b/solution/requirements.txt
@@ -0,0 +1,9 @@
+onnxruntime-gpu==1.15.0
+optimum[onnxruntime-gpu]==1.8.6
+torch==1.13.0
+numpy==1.23.5
+fastapi[all]==0.95.1
+uvicorn==0.22.0
+pydantic==1.10.7
+pydantic-yaml==0.11.2
+
diff --git a/solution/service/recognition.py b/solution/service/recognition.py
new file mode 100644
index 0000000..d72cbb1
--- /dev/null
+++ b/solution/service/recognition.py
@@ -0,0 +1,16 @@
+from abc import ABC, abstractmethod
+from typing import List
+from dataclasses import dataclass
+
+from infrastructure.models import BaseTextClassificationModel, TextClassificationModelData
+
+
+class TextClassificationService:
+
+    def __init__(self, models: List[BaseTextClassificationModel]):
+        self.service_models = models
+
+    def get_results(self, input_texts: List[str]) -> List[List[TextClassificationModelData]]:
+        results = [model(input_texts) for model in self.service_models]
+        return results
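A quick way to exercise the service once the container is running. This is a minimal sketch, not part of the patch: the localhost address and the sample sentence are assumptions (inside the cluster the autotests call the api_host configured in values.yaml above). The /process endpoint reads the raw request body as plain text and returns one {label, score} object per model, keyed exactly as in ResponseSchema.

import requests

# Assumed local address; in-cluster the autotests use
# http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/process
url = "http://localhost:8080/process"

# /process reads the raw request body, so the text is sent as-is, not as JSON
resp = requests.post(url, data="this is simple text".encode("utf-8"))
resp.raise_for_status()

# Keys mirror ResponseSchema: cardiffnlp, ivanlau, svalabs, EIStakovskii, jy46604790
for model_name, prediction in resp.json().items():
    print(f"{model_name}: {prediction['label']} ({prediction['score']:.3f})")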