
Initial commit #28

Open · wants to merge 4 commits into main

4 changes: 2 additions & 2 deletions autotests/helm/values.yaml
@@ -25,8 +25,8 @@ global:
activeDeadlineSeconds: 3600 # 1h

env:
-PARTICIPANT_NAME: <REPLACE_WITH_USERNAME>
-api_host: http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/<REPLACE_WITH_ENDPOINT>
+PARTICIPANT_NAME: Sergey Rudenko
+api_host: http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/

# K6, do not edit!
K6_PROMETHEUS_RW_SERVER_URL: http://kube-prometheus-stack-prometheus.monitoring.svc.cluster.local:9090/api/v1/write
19 changes: 19 additions & 0 deletions solution/Dockerfile
@@ -0,0 +1,19 @@
FROM pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime

LABEL authors="sergeyrudenko"

WORKDIR /code

COPY ./requirements.txt /code/requirements.txt

RUN pip install --upgrade pip

RUN pip install torch
RUN pip install xformers

RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt

COPY ./main.py /code/
COPY start.sh .

ENTRYPOINT ["./start.sh"]
194 changes: 194 additions & 0 deletions solution/main.py
@@ -0,0 +1,194 @@
import asyncio
import logging.config
import os
import time
import onnx
import torch
import onnxruntime as ort
import uvicorn
from fastapi import FastAPI, Body, HTTPException
from optimum.onnxruntime import ORTModelForSequenceClassification
from starlette.responses import HTMLResponse
from transformers import AutoTokenizer
import onnxoptimizer
#from fastapi.concurrency import run_in_threadpool

# Global variables

g_logger = logging.getLogger(__name__)
g_current_path = os.getcwd() + "/models"

# dict {model_name: local path}. TODO: save to a config file to make this more framework-like
G_MODELS = {"cardiffnlp": g_current_path + "/cardiffnlp/twitter-xlm-roberta-base-sentiment",
"ivanlau": g_current_path + "/ivanlau/language-detection-fine-tuned-on-xlm-roberta-base",
"svalabs": g_current_path + "/svalabs/twitter-xlm-roberta-crypto-spam",
"EIStakovskii": g_current_path + "/EIStakovskii/xlm_roberta_base_multilingual_toxicity_classifier_plus",
"jy46604790": g_current_path + "/jy46604790/Fake-News-Bert-Detect"}

# dict {model_name: (model, tokenizer)}
g_model_pipelines: dict = {}

# dict returning API data {model_name: {score:xx, label: xx} }
g_data: dict = {}

# if GPU available use it
g_device = "cuda:0" if torch.cuda.is_available() else "cpu"
if g_device == "cpu":
raise Exception("Не тестируем на cpu, нужен гпу!")

# spin up the API and set the log level to critical for speed. TODO: set log level via env vars
# app = FastAPI(title='Inference', debug=False)
app = FastAPI(title='Inference')


# Helper functions
def register_models(models_dict: dict[str, str]) -> None:
"""helper function to register locally stored models by populating g_model_pipelines dict"""
g_logger.warning("Start registering models.")
for model_name, model_path in models_dict.items():
try:
g_logger.warning("Registering: %s" % model_name)
g_model_pipelines[model_name] = (
ort.InferenceSession(model_path + "/model_opt.onnx", providers=["CUDAExecutionProvider","CPUExecutionProvider"]), AutoTokenizer.from_pretrained(model_path))
# model.eval()
except Exception as e:
err = "Error while registering: %s" % model_name
raise RuntimeError(err) from e
g_logger.warning("Finished registering models.")


@app.get("/load", response_class=HTMLResponse)
def download_models() -> HTMLResponse:
"""helper download models to local folder if not downoaded before"""
# can be invoked manually forcefully by visiting page at localhost:9000/load
g_logger.warning("Start downloading models from huggingfaces.")
for mdl in G_MODELS.values():
model_name = "/".join(mdl.split("/")[-2:])
save_path = os.getcwd() + "/" + "/".join(mdl.split("/")[-3:])
g_logger.warning("Loading: %s" % model_name)

try:
model = ORTModelForSequenceClassification.from_pretrained(model_name, export=True)
tokenizer = AutoTokenizer.from_pretrained(model_name)
# model.eval()
model.save_pretrained(save_directory=save_path)
tokenizer.save_pretrained(save_directory=save_path)

# optimize model
original_model = onnx.load(save_path+"/model.onnx")
#available_passes = onnxoptimizer.get_available_passes()
#g_logger.warning(available_passes)
#optimized_model = onnxoptimizer.optimize(original_model, passes=available_passes) - specifying passes generates error
optimized_model = onnxoptimizer.optimize(original_model)
onnx.save(optimized_model, save_path+"/model_opt.onnx")
os.remove(save_path+"/model.onnx")
except Exception as e:
err = "Error while loading: %s" % model_name
raise RuntimeError(err) from e

g_logger.warning("Finished downloading models from huggingfaces.")
return """
<html>
<head>
<title>Models downloaded</title>
</head>
<body>
<h1>Models downloaded</h1>
</body>
</html>
"""


# Core logic functions
@app.on_event("startup")
async def startup_trigger():
pid = os.getpid()
os.environ['TRANSFORMERS_CACHE'] = os.getcwd() + '/models'
g_logger.setLevel(logging.WARNING)
g_logger.warning("Starting app, worker: " + str(pid))
g_logger.warning("Inference device: %s" % g_device)
if pid % 2 == 0: # synchronize the two workers: only one worker downloads models from Hugging Face
if os.path.exists(os.getcwd() + "/signal.txt"):
os.remove(os.getcwd() + "/signal.txt")
if not os.path.exists("models/"): # naive way to check whether models need to be downloaded and sync workers todo
download_models()
register_models(G_MODELS)
with open(os.getcwd() + "/signal.txt", "w") as f: # signal to the other worker that the models have been downloaded and it can start registering them
f.write("done")
else:
while not os.path.exists(os.getcwd() + "/signal.txt"):
time.sleep(0.1)
# register locally downloaded models
register_models(G_MODELS)

# hopefully a hair faster
# torch.set_float32_matmul_precision('medium')

g_logger.warning("API ready for use, worker: " + str(pid))

async def inference(sentence: str, model_name: str, model: tuple) -> None:
"""perform inference"""

# Tokenize and convert the tensors to CPU numpy arrays, since ONNX Runtime expects numpy inputs
encoded_sentence = {k: v.cpu().numpy() for k, v in model[1](sentence, return_tensors="pt").items()}

# Run the ONNX session; the first output holds the logits
logits = model[0].run(None, encoded_sentence)[0]

# Softmax over the logits to get class probabilities
probs = torch.nn.functional.softmax(torch.from_numpy(logits), dim=-1)

# Get the index of max prob and prob itself
pred_index = torch.argmax(probs, dim=-1).item()
confid_score = probs[0, pred_index].item()

# an id2label mapping is not available from the ONNX session, so it is defined manually
# TODO: store it in a config file or DB to make this more framework-like
match model_name:
case "cardiffnlp":
id2label = {0: "NEGATIVE", 1: "NEUTRAL", 2: "POSITIVE"}
case "ivanlau":
id2label = {0: "Arabic", 1: "Basque", 2: "Breton", 3: "Catalan", 4: "Chinese_China", 5: "Chinese_Hongkong",
6: "Chinese_Taiwan", 7: "Chuvash", 8: "Czech", 9: "Dhivehi", 10: "Dutch", 11: "English",
12: "Esperanto", 13: "Estonian", 14: "French", 15: "Frisian", 16: "Georgian", 17: "German",
18: "Greek", 19: "Hakha_Chin", 20: "Indonesian", 21: "Interlingua", 22: "Italian",
23: "Japanese", 24: "Kabyle", 25: "Kinyarwanda", 26: "Kyrgyz", 27: "Latvian", 28: "Maltese",
29: "Mongolian", 30: "Persian", 31: "Polish", 32: "Portuguese", 33: "Romanian",
34: "Romansh_Sursilvan", 35: "Russian", 36: "Sakha", 37: "Slovenian", 38: "Spanish",
39: "Swedish", 40: "Tamil", 41: "Tatar", 42: "Turkish", 43: "Ukranian", 44: "Welsh"}
case "svalabs":
id2label = {0: "HAM", 1: "SPAM"}
case "EIStakovskii":
id2label = {0: "LABEL_0", 1: "LABEL_1"}
case "jy46604790":
id2label = {0: "LABEL_0", 1: "LABEL_1"}
case _:
err = "Error wrong model name: %s" % model_name
raise RuntimeError(err)

# Get predicted label
pred_label = id2label[pred_index]

g_data[model_name] = {"score": confid_score, "label": pred_label}


@app.post("/")
async def all_infers(sentence: str = Body(...)) -> dict:
"""inference endpoint"""
"""if not sentence: # check whether input request text is empty
g_logger.warning('Request text is empty')
raise HTTPException(status_code=400, detail="Request text is empty")"""

# create inference tasks per model
tasks = [inference(sentence, model_name, model) for model_name, model in
g_model_pipelines.items()]

# run inference tasks and populate resulting dict
await asyncio.gather(*tasks)
return g_data


if __name__ == '__main__':
uvicorn.run("main:app", port=9000, reload=True, log_level="critical")
9 changes: 9 additions & 0 deletions solution/requirements.txt
@@ -0,0 +1,9 @@
optimum[onnxruntime-gpu]~=1.8.6
onnx
transformers~=4.29.2
uvicorn~=0.22.0
fastapi~=0.95.2
starlette~=0.27.0
numpy~=1.24.3
onnxoptimizer~=0.3.13
gunicorn==20.1.0
2 changes: 2 additions & 0 deletions solution/start.sh
@@ -0,0 +1,2 @@
#!/bin/bash
gunicorn main:app --workers=2 --bind=:9000 --worker-class=uvicorn.workers.UvicornWorker --timeout 900 --log-level critical