From 2eec8720954265637ac0f5ca9e38673458119139 Mon Sep 17 00:00:00 2001
From: vzip
Date: Tue, 30 May 2023 02:27:45 -0600
Subject: [PATCH 01/14] v1.0b

---
 solution/helm/envs/example.yaml          |  4 +-
 solution/mlops/Dockerfile                | 27 +++++++
 solution/mlops/README.md                 | 17 +++++
 solution/mlops/dev_parallel/config.py    | 42 +++++++++++
 solution/mlops/dev_parallel/responder.py | 62 ++++++++++++++++
 solution/mlops/dev_parallel/server.py    | 54 ++++++++++++++
 solution/mlops/dev_parallel/tasker.py    | 35 +++++++++
 solution/mlops/dev_parallel/worker1.py   | 94 ++++++++++++++++++++++++
 solution/mlops/dev_parallel/worker2.py   | 94 ++++++++++++++++++++++++
 solution/mlops/dev_parallel/worker3.py   | 94 ++++++++++++++++++++++++
 solution/mlops/dev_parallel/worker4.py   | 94 ++++++++++++++++++++++++
 solution/mlops/dev_parallel/worker5.py   | 94 ++++++++++++++++++++++++
 solution/mlops/docker-compose.yml        | 11 +++
 solution/mlops/requirements.txt          | 15 ++++
 solution/mlops/supervisord.conf          | 44 +++++++++++
 15 files changed, 779 insertions(+), 2 deletions(-)
 create mode 100644 solution/mlops/Dockerfile
 create mode 100644 solution/mlops/README.md
 create mode 100644 solution/mlops/dev_parallel/config.py
 create mode 100644 solution/mlops/dev_parallel/responder.py
 create mode 100644 solution/mlops/dev_parallel/server.py
 create mode 100644 solution/mlops/dev_parallel/tasker.py
 create mode 100644 solution/mlops/dev_parallel/worker1.py
 create mode 100644 solution/mlops/dev_parallel/worker2.py
 create mode 100644 solution/mlops/dev_parallel/worker3.py
 create mode 100644 solution/mlops/dev_parallel/worker4.py
 create mode 100644 solution/mlops/dev_parallel/worker5.py
 create mode 100644 solution/mlops/docker-compose.yml
 create mode 100644 solution/mlops/requirements.txt
 create mode 100644 solution/mlops/supervisord.conf

diff --git a/solution/helm/envs/example.yaml b/solution/helm/envs/example.yaml
index 4ef5b0c..e9e2c46 100644
--- a/solution/helm/envs/example.yaml
+++ b/solution/helm/envs/example.yaml
@@ -8,10 +8,10 @@ global:
 pod:
   ports:
     - name: http
-      containerPort: 8000
+      containerPort: 5000
       protocol: TCP
 service:
-  targetPort: 8000
+  targetPort: 5000
 
 # add any configmap data you need
 # configmaps will be mounted to /workspace/
diff --git a/solution/mlops/Dockerfile b/solution/mlops/Dockerfile
new file mode 100644
index 0000000..671e133
--- /dev/null
+++ b/solution/mlops/Dockerfile
@@ -0,0 +1,27 @@
+FROM python:3.9.1-slim
+
+# Обновление pip
+RUN pip install --upgrade pip
+
+RUN apt-get update && apt-get install -y \
+    wget \
+    make \
+    gcc \
+    procps \
+    lsof \
+    vim \
+    supervisor
+
+WORKDIR /app
+
+COPY requirements.txt .
+
+RUN pip install -r requirements.txt
+
+COPY /dev_parallel/ .
+
+# Копируйте файл конфигурации supervisord в контейнер
+COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf
+
+# Используйте CMD для запуска supervisord
+CMD ["/usr/bin/supervisord"]
diff --git a/solution/mlops/README.md b/solution/mlops/README.md
new file mode 100644
index 0000000..f7374bc
--- /dev/null
+++ b/solution/mlops/README.md
@@ -0,0 +1,17 @@
+# app tornado-redis-asyncio v0.1b
+
+App as a service packed in docker env. Using docker-compose for run 2 vm's one with app and one with redis.
+All sttings can be configured in config.py
+
+... to be continued
+
+p.s. now respond structure is not the same as example and will be changed in early future for pass tests.
+
+## How to run ?
+
+You can run this app as service using docker-compose.
+
+bash$
+
+1. docker build -t name
+2. docker-compose up
diff --git a/solution/mlops/dev_parallel/config.py b/solution/mlops/dev_parallel/config.py
new file mode 100644
index 0000000..79657a1
--- /dev/null
+++ b/solution/mlops/dev_parallel/config.py
@@ -0,0 +1,42 @@
+config = {
+    "redis": {
+        "host": "redis",
+        "port": 6379,
+    },
+    "results_db": {
+        "db": "6"
+    },
+
+    "workers": [
+        {
+            "db": 1,
+            "worker_name": "worker1",
+            "model_name": "cardiffnlp/twitter-xlm-roberta-base-sentiment",
+            "model_labels": ["NEGATIVE", "NEUTRAL", "POSITIVE"] # Labels for sentiment model
+        },
+        {
+            "db": 2,
+            "worker_name": "worker2",
+            "model_name": "ivanlau/language-detection-fine-tuned-on-xlm-roberta-base",
+            "model_labels": ["Arabic", "Basque", "Breton", "Catalan", "Chinese_China", "Chinese_Hongkong", "Chinese_Taiwan", "Chuvash", "Czech", "Dhivehi", "Dutch", "English", "Esperanto", "Estonian", "French", "Frisian", "Georgian", "German", "Greek", "Hakha_Chin", "Indonesian", "Interlingua", "Italian", "Japanese", "Kabyle", "Kinyarwanda", "Kyrgyz", "Latvian", "Maltese", "Mongolian", "Persian", "Polish", "Portuguese", "Romanian", "Romansh_Sursilvan", "Russian", "Sakha", "Slovenian", "Spanish", "Swedish", "Tamil", "Tatar", "Turkish", "Ukrainian", "Welsh"] # Labels for language detection model
+        },
+        {
+            "db": 3,
+            "worker_name": "worker3",
+            "model_name": "svalabs/twitter-xlm-roberta-crypto-spam",
+            "model_labels": ["HAM", "SPAM"] # Labels for crypto spam model
+        },
+        {
+            "db": 4,
+            "worker_name": "worker4",
+            "model_name": "EIStakovskii/xlm_roberta_base_multilingual_toxicity_classifier_plus",
+            "model_labels": ["LABEL_0", "LABEL_1"] # Label_1 means TOXIC, Label_0 means NOT TOXIC.
+        },
+        {
+            "db": 5,
+            "worker_name": "worker5",
+            "model_name": "jy46604790/Fake-News-Bert-Detect",
+            "model_labels": ["LABEL_0", "LABEL_1"] # LABEL_0: Fake news, LABEL_1: Real news
+        }
+    ]
+}
diff --git a/solution/mlops/dev_parallel/responder.py b/solution/mlops/dev_parallel/responder.py
new file mode 100644
index 0000000..40b196c
--- /dev/null
+++ b/solution/mlops/dev_parallel/responder.py
@@ -0,0 +1,62 @@
+import tornado.ioloop
+import tornado.gen
+import json
+import redis
+import logging
+
+# Настройка логирования
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+# Подключение к Redis
+r = redis.Redis(host='redis', port=6379, db=6)
+
+async def result_listener(futures_dict):
+    while True:
+        pipeline = r.pipeline()
+        logger.info('Connected to Redis Pipeline.')
+
+        valid_sets = [] # Здесь мы будем хранить все наборы данных, которые удовлетворяют нашим условиям
+
+        for key in r.scan_iter():
+            # Здесь добавляем команду в пайплайн, но еще не выполняем ее
+            pipeline.lrange(key, 0, -1)
+
+        # Теперь выполняем все команды, добавленные в пайплайн
+        values_list = pipeline.execute()
+
+        for key, values in zip(r.scan_iter(), values_list):
+            # Проверяем, что количество элементов в списке больше или равно 5
+            if len(values) < 5:
+                continue
+
+            workers = set(json.loads(value.decode())['worker'] for value in values)
+
+            # Проверяем, что результаты от всех воркеров присутствуют
+            if len(workers) < 5:
+                continue
+
+            valid_sets.append((key, values))
+
+        if valid_sets:
+            for key, values in valid_sets:
+                key = key.decode()
+                final_message = {"correlation_id": key, "results": []}
+
+                for value in values:
+                    message = json.loads(value.decode())
+                    final_message["results"].append({"worker": message["worker"], "result": message["result"]})
+
+                # Добавляем команду в пайплайн и сразу выполняем
+
pipeline.ltrim(key, len(values), -1).execute() + + future = futures_dict.pop(final_message["correlation_id"], None) + if future is not None: + future.set_result(final_message) + logger.info(f'Successfully returned result for key: {key}') + else: + logger.warning(f"No Future found for key: {key}. Current futures_dict: {futures_dict}") + else: + logger.info('No sets with 5 values found, skipping connection attempt.') + + await tornado.gen.sleep(0.05) diff --git a/solution/mlops/dev_parallel/server.py b/solution/mlops/dev_parallel/server.py new file mode 100644 index 0000000..84fa0e7 --- /dev/null +++ b/solution/mlops/dev_parallel/server.py @@ -0,0 +1,54 @@ +import tornado.ioloop +import tornado.web +import tornado.gen +import json +import redis +import uuid +import logging + +from responder import result_listener + +# Configure logging +logging.basicConfig(level=logging.DEBUG) + +r = redis.Redis(host='redis', port=6379, db=0) + +futures_dict = {} + +class MainHandler(tornado.web.RequestHandler): + async def post(self): + data = json.loads(self.request.body) + correlation_id = str(uuid.uuid4()) + + logging.info(f'Received data: {data}. Assigned correlation_id: {correlation_id}.') + + # Назначение задачи для воркера + message = { + "correlation_id": correlation_id, + "data": data, + } + r.rpush('dispatcher', json.dumps(message)) + logging.info(f'Pushed to Redis: {message}') + + # Создание Future и его сохранение в словаре + future = tornado.gen.Future() + futures_dict[correlation_id] = future + logging.info(f'Future created for correlation_id: {correlation_id}.') + + # Ожидание результата и его запись в ответ + result = await future + logging.info(f'Received result for correlation_id: {correlation_id}. Result: {result}') + self.write(json.dumps(result)) + +def make_app(): + return tornado.web.Application([ + (r"/process", MainHandler), + ]) + +if __name__ == "__main__": + app = make_app() + app.listen(5000, address='0.0.0.0') + logging.info('Server started.') + tornado.ioloop.IOLoop.current().spawn_callback(result_listener, futures_dict) + logging.info('Server started, running result listener') + tornado.ioloop.IOLoop.current().start() diff --git a/solution/mlops/dev_parallel/tasker.py b/solution/mlops/dev_parallel/tasker.py new file mode 100644 index 0000000..bc4c056 --- /dev/null +++ b/solution/mlops/dev_parallel/tasker.py @@ -0,0 +1,35 @@ +import asyncio +import aioredis +import json +import logging + +logging.basicConfig(level=logging.INFO) + +class QueueManager: + def __init__(self): + self.redis_host = 'redis' + self.redis_port = 6379 + self.redis_password = '' # set password if any + + async def start(self): + logging.info('Starting Queue Manager...') + self.redis_source = aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}/0", password=self.redis_password) + self.redis_targets = [aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}/{i}", password=self.redis_password) for i in range(1, 6)] + logging.info('Connected to Redis.') + await self.transfer_messages_from_redis_to_redis() + + async def publish_message_to_redis(self, message_body, redis_db): + await redis_db.rpush('dispatcher', message_body) + + async def transfer_messages_from_redis_to_redis(self, redis_queue_name='dispatcher'): + while True: + logging.info('Waiting for message in Redis queue...') + _, message_body = await self.redis_source.blpop(redis_queue_name) + logging.info(f'Received message from Redis queue: {message_body}') + publish_tasks = [self.publish_message_to_redis(message_body, redis_target) 
for redis_target in self.redis_targets] + await asyncio.gather(*publish_tasks) + logging.info('Published message to all Redis targets.') + +loop = asyncio.get_event_loop() +manager = QueueManager() +loop.run_until_complete(manager.start()) diff --git a/solution/mlops/dev_parallel/worker1.py b/solution/mlops/dev_parallel/worker1.py new file mode 100644 index 0000000..fd210a0 --- /dev/null +++ b/solution/mlops/dev_parallel/worker1.py @@ -0,0 +1,94 @@ +import logging +import json +import redis +from transformers import AutoTokenizer, AutoModelForSequenceClassification +import torch +import os + +# Import config from config.py +from config import config + +# Настройка логирования +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class Worker: + def __init__(self, worker_config, connection_config): + self.redis_config = connection_config["redis"] + self.redis_result_config = connection_config["results_db"] + + # Взять имя воркера из имени файла + self.queue_name = os.path.splitext(os.path.basename(__file__))[0] + self.model_name = worker_config["model_name"] + self.model_labels = worker_config["model_labels"] + + self.redis_incoming = redis.Redis(host=self.redis_config['host'], + port=self.redis_config['port'], + db=worker_config['db']) + + self.redis_outgoing = redis.Redis(host=self.redis_config['host'], + port=self.redis_config['port'], + db=self.redis_result_config['db']) + + # Загрузка модели + logger.info(f"Loading model {self.model_name}...") + self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name) + if torch.cuda.is_available(): + self.model = self.model.to('cuda') + logger.info(f"Model {self.model_name} loaded.") + + # Загрузка токенизатора + logger.info(f"Loading tokenizer...") + self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) + logger.info(f"Tokenizer for {self.model_name} loaded.") + + def start(self): + logger.info('Worker started.') + while True: + _, message = self.redis_incoming.blpop('dispatcher') + message_body = json.loads(message) + self.process_message(message_body) + + def process_message(self, body): + correlation_id = body['correlation_id'] + text = body['data']['data'] + logger.info(f"Processing text: {text}") + inputs = self.tokenizer(text, return_tensors='pt') + + if torch.cuda.is_available(): + inputs = inputs.to('cuda') + + outputs = self.model(**inputs) + + predictions = outputs.logits.argmax(dim=-1).item() + probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1) + score = probabilities[0][predictions].item() + + label = self.model_labels[predictions] + + result_key = self.model_name.split('/')[-1] + result = {result_key: {"score": score, "label": label}} + logger.info(f"Received task results {result}") + + results_dict = { + "correlation_id": correlation_id, + "worker": self.queue_name, + "result": result[self.model_name.split('/')[-1]] + } + results_dict = json.dumps(results_dict) + self.redis_outgoing.rpush(correlation_id, results_dict) + logger.info(f"Saved result to Redis with key {correlation_id} : {results_dict}") + +# Загрузка конфигурации подключений из файла config.py +connection_config = config + +worker_name = os.path.splitext(os.path.basename(__file__))[0] +worker_config = next((worker for worker in connection_config["workers"] if worker["worker_name"] == worker_name), None) + +if not worker_config: + raise ValueError(f"No configuration found for worker {worker_name}") + +# Создание и запуск воркера +if 
__name__ == "__main__": + worker = Worker(worker_config, connection_config) + worker.start() diff --git a/solution/mlops/dev_parallel/worker2.py b/solution/mlops/dev_parallel/worker2.py new file mode 100644 index 0000000..fd210a0 --- /dev/null +++ b/solution/mlops/dev_parallel/worker2.py @@ -0,0 +1,94 @@ +import logging +import json +import redis +from transformers import AutoTokenizer, AutoModelForSequenceClassification +import torch +import os + +# Import config from config.py +from config import config + +# Настройка логирования +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class Worker: + def __init__(self, worker_config, connection_config): + self.redis_config = connection_config["redis"] + self.redis_result_config = connection_config["results_db"] + + # Взять имя воркера из имени файла + self.queue_name = os.path.splitext(os.path.basename(__file__))[0] + self.model_name = worker_config["model_name"] + self.model_labels = worker_config["model_labels"] + + self.redis_incoming = redis.Redis(host=self.redis_config['host'], + port=self.redis_config['port'], + db=worker_config['db']) + + self.redis_outgoing = redis.Redis(host=self.redis_config['host'], + port=self.redis_config['port'], + db=self.redis_result_config['db']) + + # Загрузка модели + logger.info(f"Loading model {self.model_name}...") + self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name) + if torch.cuda.is_available(): + self.model = self.model.to('cuda') + logger.info(f"Model {self.model_name} loaded.") + + # Загрузка токенизатора + logger.info(f"Loading tokenizer...") + self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) + logger.info(f"Tokenizer for {self.model_name} loaded.") + + def start(self): + logger.info('Worker started.') + while True: + _, message = self.redis_incoming.blpop('dispatcher') + message_body = json.loads(message) + self.process_message(message_body) + + def process_message(self, body): + correlation_id = body['correlation_id'] + text = body['data']['data'] + logger.info(f"Processing text: {text}") + inputs = self.tokenizer(text, return_tensors='pt') + + if torch.cuda.is_available(): + inputs = inputs.to('cuda') + + outputs = self.model(**inputs) + + predictions = outputs.logits.argmax(dim=-1).item() + probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1) + score = probabilities[0][predictions].item() + + label = self.model_labels[predictions] + + result_key = self.model_name.split('/')[-1] + result = {result_key: {"score": score, "label": label}} + logger.info(f"Received task results {result}") + + results_dict = { + "correlation_id": correlation_id, + "worker": self.queue_name, + "result": result[self.model_name.split('/')[-1]] + } + results_dict = json.dumps(results_dict) + self.redis_outgoing.rpush(correlation_id, results_dict) + logger.info(f"Saved result to Redis with key {correlation_id} : {results_dict}") + +# Загрузка конфигурации подключений из файла config.py +connection_config = config + +worker_name = os.path.splitext(os.path.basename(__file__))[0] +worker_config = next((worker for worker in connection_config["workers"] if worker["worker_name"] == worker_name), None) + +if not worker_config: + raise ValueError(f"No configuration found for worker {worker_name}") + +# Создание и запуск воркера +if __name__ == "__main__": + worker = Worker(worker_config, connection_config) + worker.start() diff --git a/solution/mlops/dev_parallel/worker3.py 
b/solution/mlops/dev_parallel/worker3.py new file mode 100644 index 0000000..fd210a0 --- /dev/null +++ b/solution/mlops/dev_parallel/worker3.py @@ -0,0 +1,94 @@ +import logging +import json +import redis +from transformers import AutoTokenizer, AutoModelForSequenceClassification +import torch +import os + +# Import config from config.py +from config import config + +# Настройка логирования +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class Worker: + def __init__(self, worker_config, connection_config): + self.redis_config = connection_config["redis"] + self.redis_result_config = connection_config["results_db"] + + # Взять имя воркера из имени файла + self.queue_name = os.path.splitext(os.path.basename(__file__))[0] + self.model_name = worker_config["model_name"] + self.model_labels = worker_config["model_labels"] + + self.redis_incoming = redis.Redis(host=self.redis_config['host'], + port=self.redis_config['port'], + db=worker_config['db']) + + self.redis_outgoing = redis.Redis(host=self.redis_config['host'], + port=self.redis_config['port'], + db=self.redis_result_config['db']) + + # Загрузка модели + logger.info(f"Loading model {self.model_name}...") + self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name) + if torch.cuda.is_available(): + self.model = self.model.to('cuda') + logger.info(f"Model {self.model_name} loaded.") + + # Загрузка токенизатора + logger.info(f"Loading tokenizer...") + self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) + logger.info(f"Tokenizer for {self.model_name} loaded.") + + def start(self): + logger.info('Worker started.') + while True: + _, message = self.redis_incoming.blpop('dispatcher') + message_body = json.loads(message) + self.process_message(message_body) + + def process_message(self, body): + correlation_id = body['correlation_id'] + text = body['data']['data'] + logger.info(f"Processing text: {text}") + inputs = self.tokenizer(text, return_tensors='pt') + + if torch.cuda.is_available(): + inputs = inputs.to('cuda') + + outputs = self.model(**inputs) + + predictions = outputs.logits.argmax(dim=-1).item() + probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1) + score = probabilities[0][predictions].item() + + label = self.model_labels[predictions] + + result_key = self.model_name.split('/')[-1] + result = {result_key: {"score": score, "label": label}} + logger.info(f"Received task results {result}") + + results_dict = { + "correlation_id": correlation_id, + "worker": self.queue_name, + "result": result[self.model_name.split('/')[-1]] + } + results_dict = json.dumps(results_dict) + self.redis_outgoing.rpush(correlation_id, results_dict) + logger.info(f"Saved result to Redis with key {correlation_id} : {results_dict}") + +# Загрузка конфигурации подключений из файла config.py +connection_config = config + +worker_name = os.path.splitext(os.path.basename(__file__))[0] +worker_config = next((worker for worker in connection_config["workers"] if worker["worker_name"] == worker_name), None) + +if not worker_config: + raise ValueError(f"No configuration found for worker {worker_name}") + +# Создание и запуск воркера +if __name__ == "__main__": + worker = Worker(worker_config, connection_config) + worker.start() diff --git a/solution/mlops/dev_parallel/worker4.py b/solution/mlops/dev_parallel/worker4.py new file mode 100644 index 0000000..fd210a0 --- /dev/null +++ b/solution/mlops/dev_parallel/worker4.py @@ 
-0,0 +1,94 @@ +import logging +import json +import redis +from transformers import AutoTokenizer, AutoModelForSequenceClassification +import torch +import os + +# Import config from config.py +from config import config + +# Настройка логирования +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class Worker: + def __init__(self, worker_config, connection_config): + self.redis_config = connection_config["redis"] + self.redis_result_config = connection_config["results_db"] + + # Взять имя воркера из имени файла + self.queue_name = os.path.splitext(os.path.basename(__file__))[0] + self.model_name = worker_config["model_name"] + self.model_labels = worker_config["model_labels"] + + self.redis_incoming = redis.Redis(host=self.redis_config['host'], + port=self.redis_config['port'], + db=worker_config['db']) + + self.redis_outgoing = redis.Redis(host=self.redis_config['host'], + port=self.redis_config['port'], + db=self.redis_result_config['db']) + + # Загрузка модели + logger.info(f"Loading model {self.model_name}...") + self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name) + if torch.cuda.is_available(): + self.model = self.model.to('cuda') + logger.info(f"Model {self.model_name} loaded.") + + # Загрузка токенизатора + logger.info(f"Loading tokenizer...") + self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) + logger.info(f"Tokenizer for {self.model_name} loaded.") + + def start(self): + logger.info('Worker started.') + while True: + _, message = self.redis_incoming.blpop('dispatcher') + message_body = json.loads(message) + self.process_message(message_body) + + def process_message(self, body): + correlation_id = body['correlation_id'] + text = body['data']['data'] + logger.info(f"Processing text: {text}") + inputs = self.tokenizer(text, return_tensors='pt') + + if torch.cuda.is_available(): + inputs = inputs.to('cuda') + + outputs = self.model(**inputs) + + predictions = outputs.logits.argmax(dim=-1).item() + probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1) + score = probabilities[0][predictions].item() + + label = self.model_labels[predictions] + + result_key = self.model_name.split('/')[-1] + result = {result_key: {"score": score, "label": label}} + logger.info(f"Received task results {result}") + + results_dict = { + "correlation_id": correlation_id, + "worker": self.queue_name, + "result": result[self.model_name.split('/')[-1]] + } + results_dict = json.dumps(results_dict) + self.redis_outgoing.rpush(correlation_id, results_dict) + logger.info(f"Saved result to Redis with key {correlation_id} : {results_dict}") + +# Загрузка конфигурации подключений из файла config.py +connection_config = config + +worker_name = os.path.splitext(os.path.basename(__file__))[0] +worker_config = next((worker for worker in connection_config["workers"] if worker["worker_name"] == worker_name), None) + +if not worker_config: + raise ValueError(f"No configuration found for worker {worker_name}") + +# Создание и запуск воркера +if __name__ == "__main__": + worker = Worker(worker_config, connection_config) + worker.start() diff --git a/solution/mlops/dev_parallel/worker5.py b/solution/mlops/dev_parallel/worker5.py new file mode 100644 index 0000000..fd210a0 --- /dev/null +++ b/solution/mlops/dev_parallel/worker5.py @@ -0,0 +1,94 @@ +import logging +import json +import redis +from transformers import AutoTokenizer, AutoModelForSequenceClassification +import torch 
+import os + +# Import config from config.py +from config import config + +# Настройка логирования +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class Worker: + def __init__(self, worker_config, connection_config): + self.redis_config = connection_config["redis"] + self.redis_result_config = connection_config["results_db"] + + # Взять имя воркера из имени файла + self.queue_name = os.path.splitext(os.path.basename(__file__))[0] + self.model_name = worker_config["model_name"] + self.model_labels = worker_config["model_labels"] + + self.redis_incoming = redis.Redis(host=self.redis_config['host'], + port=self.redis_config['port'], + db=worker_config['db']) + + self.redis_outgoing = redis.Redis(host=self.redis_config['host'], + port=self.redis_config['port'], + db=self.redis_result_config['db']) + + # Загрузка модели + logger.info(f"Loading model {self.model_name}...") + self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name) + if torch.cuda.is_available(): + self.model = self.model.to('cuda') + logger.info(f"Model {self.model_name} loaded.") + + # Загрузка токенизатора + logger.info(f"Loading tokenizer...") + self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) + logger.info(f"Tokenizer for {self.model_name} loaded.") + + def start(self): + logger.info('Worker started.') + while True: + _, message = self.redis_incoming.blpop('dispatcher') + message_body = json.loads(message) + self.process_message(message_body) + + def process_message(self, body): + correlation_id = body['correlation_id'] + text = body['data']['data'] + logger.info(f"Processing text: {text}") + inputs = self.tokenizer(text, return_tensors='pt') + + if torch.cuda.is_available(): + inputs = inputs.to('cuda') + + outputs = self.model(**inputs) + + predictions = outputs.logits.argmax(dim=-1).item() + probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1) + score = probabilities[0][predictions].item() + + label = self.model_labels[predictions] + + result_key = self.model_name.split('/')[-1] + result = {result_key: {"score": score, "label": label}} + logger.info(f"Received task results {result}") + + results_dict = { + "correlation_id": correlation_id, + "worker": self.queue_name, + "result": result[self.model_name.split('/')[-1]] + } + results_dict = json.dumps(results_dict) + self.redis_outgoing.rpush(correlation_id, results_dict) + logger.info(f"Saved result to Redis with key {correlation_id} : {results_dict}") + +# Загрузка конфигурации подключений из файла config.py +connection_config = config + +worker_name = os.path.splitext(os.path.basename(__file__))[0] +worker_config = next((worker for worker in connection_config["workers"] if worker["worker_name"] == worker_name), None) + +if not worker_config: + raise ValueError(f"No configuration found for worker {worker_name}") + +# Создание и запуск воркера +if __name__ == "__main__": + worker = Worker(worker_config, connection_config) + worker.start() diff --git a/solution/mlops/docker-compose.yml b/solution/mlops/docker-compose.yml new file mode 100644 index 0000000..5961f69 --- /dev/null +++ b/solution/mlops/docker-compose.yml @@ -0,0 +1,11 @@ +version: '3' +services: + mlops-app: + build: + context: . 
+      dockerfile: Dockerfile
+    ports:
+      - "5000:5000"
+  redis:
+    image: "redis:latest"
+
diff --git a/solution/mlops/requirements.txt b/solution/mlops/requirements.txt
new file mode 100644
index 0000000..3dd088f
--- /dev/null
+++ b/solution/mlops/requirements.txt
@@ -0,0 +1,15 @@
+tornado==6.3.2
+aio-pika==9.0.7
+aioredis==2.0.1
+aiormq==6.7.6
+async-timeout==4.0.2
+idna==3.4
+multidict==6.0.4
+pamqp==3.2.1
+typing_extensions==4.6.2
+yarl==1.9.2
+redis==3.5.3
+transformers==4.29.2
+torch==2.0.1
+sentencepiece==0.1.99
+protobuf==3.20.1
diff --git a/solution/mlops/supervisord.conf b/solution/mlops/supervisord.conf
new file mode 100644
index 0000000..0903610
--- /dev/null
+++ b/solution/mlops/supervisord.conf
@@ -0,0 +1,44 @@
+[supervisord]
+nodaemon=true
+
+[program:server]
+command=bash -c "sleep 0 && python server.py"
+directory=/app
+autostart=true
+autorestart=true
+
+[program:tasker]
+command=bash -c "sleep 2 && python tasker.py"
+directory=/app
+autostart=true
+autorestart=true
+
+[program:worker1]
+command=bash -c "sleep 5 && python worker1.py"
+directory=/app
+autostart=true
+autorestart=true
+
+[program:worker2]
+command=bash -c "sleep 10 && python worker2.py"
+directory=/app
+autostart=true
+autorestart=true
+
+[program:worker3]
+command=bash -c "sleep 15 && python worker3.py"
+directory=/app
+autostart=true
+autorestart=true
+
+[program:worker4]
+command=bash -c "sleep 20 && python worker4.py"
+directory=/app
+autostart=true
+autorestart=true
+
+[program:worker5]
+command=bash -c "sleep 25 && python worker5.py"
+directory=/app
+autostart=true
+autorestart=true

From 5f94401eea60111d948ac9dd194c9703a3dc5bc2 Mon Sep 17 00:00:00 2001
From: vzip
Date: Tue, 30 May 2023 03:18:15 -0600
Subject: [PATCH 02/14] change source app

---
 solution/{mlops => }/Dockerfile                | 0
 solution/{mlops => }/README.md                 | 0
 solution/{mlops => }/dev_parallel/config.py    | 0
 solution/{mlops => }/dev_parallel/responder.py | 0
 solution/{mlops => }/dev_parallel/server.py    | 0
 solution/{mlops => }/dev_parallel/tasker.py    | 0
 solution/{mlops => }/dev_parallel/worker1.py   | 0
 solution/{mlops => }/dev_parallel/worker2.py   | 0
 solution/{mlops => }/dev_parallel/worker3.py   | 0
 solution/{mlops => }/dev_parallel/worker4.py   | 0
 solution/{mlops => }/dev_parallel/worker5.py   | 0
 solution/{mlops => }/docker-compose.yml        | 0
 solution/{mlops => }/requirements.txt          | 0
 solution/{mlops => }/supervisord.conf          | 0
 14 files changed, 0 insertions(+), 0 deletions(-)
 rename solution/{mlops => }/Dockerfile (100%)
 rename solution/{mlops => }/README.md (100%)
 rename solution/{mlops => }/dev_parallel/config.py (100%)
 rename solution/{mlops => }/dev_parallel/responder.py (100%)
 rename solution/{mlops => }/dev_parallel/server.py (100%)
 rename solution/{mlops => }/dev_parallel/tasker.py (100%)
 rename solution/{mlops => }/dev_parallel/worker1.py (100%)
 rename solution/{mlops => }/dev_parallel/worker2.py (100%)
 rename solution/{mlops => }/dev_parallel/worker3.py (100%)
 rename solution/{mlops => }/dev_parallel/worker4.py (100%)
 rename solution/{mlops => }/dev_parallel/worker5.py (100%)
 rename solution/{mlops => }/docker-compose.yml (100%)
 rename solution/{mlops => }/requirements.txt (100%)
 rename solution/{mlops => }/supervisord.conf (100%)

diff --git a/solution/mlops/Dockerfile b/solution/Dockerfile
similarity index 100%
rename from solution/mlops/Dockerfile
rename to solution/Dockerfile
diff --git a/solution/mlops/README.md b/solution/README.md
similarity index 100%
rename from solution/mlops/README.md
rename to solution/README.md
diff --git
a/solution/mlops/dev_parallel/config.py b/solution/dev_parallel/config.py similarity index 100% rename from solution/mlops/dev_parallel/config.py rename to solution/dev_parallel/config.py diff --git a/solution/mlops/dev_parallel/responder.py b/solution/dev_parallel/responder.py similarity index 100% rename from solution/mlops/dev_parallel/responder.py rename to solution/dev_parallel/responder.py diff --git a/solution/mlops/dev_parallel/server.py b/solution/dev_parallel/server.py similarity index 100% rename from solution/mlops/dev_parallel/server.py rename to solution/dev_parallel/server.py diff --git a/solution/mlops/dev_parallel/tasker.py b/solution/dev_parallel/tasker.py similarity index 100% rename from solution/mlops/dev_parallel/tasker.py rename to solution/dev_parallel/tasker.py diff --git a/solution/mlops/dev_parallel/worker1.py b/solution/dev_parallel/worker1.py similarity index 100% rename from solution/mlops/dev_parallel/worker1.py rename to solution/dev_parallel/worker1.py diff --git a/solution/mlops/dev_parallel/worker2.py b/solution/dev_parallel/worker2.py similarity index 100% rename from solution/mlops/dev_parallel/worker2.py rename to solution/dev_parallel/worker2.py diff --git a/solution/mlops/dev_parallel/worker3.py b/solution/dev_parallel/worker3.py similarity index 100% rename from solution/mlops/dev_parallel/worker3.py rename to solution/dev_parallel/worker3.py diff --git a/solution/mlops/dev_parallel/worker4.py b/solution/dev_parallel/worker4.py similarity index 100% rename from solution/mlops/dev_parallel/worker4.py rename to solution/dev_parallel/worker4.py diff --git a/solution/mlops/dev_parallel/worker5.py b/solution/dev_parallel/worker5.py similarity index 100% rename from solution/mlops/dev_parallel/worker5.py rename to solution/dev_parallel/worker5.py diff --git a/solution/mlops/docker-compose.yml b/solution/docker-compose.yml similarity index 100% rename from solution/mlops/docker-compose.yml rename to solution/docker-compose.yml diff --git a/solution/mlops/requirements.txt b/solution/requirements.txt similarity index 100% rename from solution/mlops/requirements.txt rename to solution/requirements.txt diff --git a/solution/mlops/supervisord.conf b/solution/supervisord.conf similarity index 100% rename from solution/mlops/supervisord.conf rename to solution/supervisord.conf From c45656a26398941e61ec2b2b19574b26e38a7f88 Mon Sep 17 00:00:00 2001 From: vzip Date: Tue, 30 May 2023 15:51:40 -0600 Subject: [PATCH 03/14] make redis connection arguments env-based & change for pytorch/pytorch:*-cuda*-cudnn*-runtime & update Dockerfile --- solution/Dockerfile | 5 +++-- solution/README.md | 2 +- solution/dev_parallel/config.py | 12 +++++++++--- solution/dev_parallel/responder.py | 6 +++++- solution/dev_parallel/server.py | 9 ++++++--- solution/dev_parallel/tasker.py | 7 ++++--- solution/dev_parallel/worker1.py | 20 +++++++++++++------- solution/dev_parallel/worker2.py | 20 +++++++++++++------- solution/dev_parallel/worker3.py | 20 +++++++++++++------- solution/dev_parallel/worker4.py | 20 +++++++++++++------- solution/dev_parallel/worker5.py | 20 +++++++++++++------- solution/docker-compose.yml | 5 ++++- solution/helm/envs/example.yaml | 4 ++-- 13 files changed, 99 insertions(+), 51 deletions(-) diff --git a/solution/Dockerfile b/solution/Dockerfile index 671e133..7f7e2f5 100644 --- a/solution/Dockerfile +++ b/solution/Dockerfile @@ -1,4 +1,5 @@ -FROM python:3.9.1-slim +# Использование PyTorch образа с поддержкой CUDA и cuDNN в качестве базового +FROM 
pytorch/pytorch:latest-cuda-cudnn-runtime # Обновление pip RUN pip install --upgrade pip @@ -12,7 +13,7 @@ RUN apt-get update && apt-get install -y \ vim \ supervisor -WORKDIR /app +WORKDIR /solution COPY requirements.txt . diff --git a/solution/README.md b/solution/README.md index f7374bc..6886e45 100644 --- a/solution/README.md +++ b/solution/README.md @@ -1,7 +1,7 @@ # app tornado-redis-asyncio v0.1b App as a service packed in docker env. Using docker-compose for run 2 vm's one with app and one with redis. -All sttings can be configured in config.py +All settings can be configured in config.py ... to be continued diff --git a/solution/dev_parallel/config.py b/solution/dev_parallel/config.py index 79657a1..cb5fc46 100644 --- a/solution/dev_parallel/config.py +++ b/solution/dev_parallel/config.py @@ -1,11 +1,17 @@ config = { - "redis": { - "host": "redis", - "port": 6379, + "requests_db": { + "db": "0" }, "results_db": { "db": "6" }, + # settings for farm/cluster of workers ! cannot use 0 or 6 reserved for req/res and max 16 for one redis instance + "farm_1": { + "db": ["1", "2", "3", "4", "5"] + }, + "farm_2": { + "db": ["7", "8", "9", "10", "11"] + }, "workers": [ { diff --git a/solution/dev_parallel/responder.py b/solution/dev_parallel/responder.py index 40b196c..ef90a54 100644 --- a/solution/dev_parallel/responder.py +++ b/solution/dev_parallel/responder.py @@ -3,13 +3,17 @@ import json import redis import logging +from os import environ # Настройка логирования logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) +REDIS_PASSWORD = environ.get("REDIS_PASSWORD") +REDIS_HOST = environ.get("REDIS_HOST") + # Подключение к Redis -r = redis.Redis(host='redis', port=6379, db=6) +r = redis.Redis(host=REDIS_HOST, password=REDIS_PASSWORD, port=6379, db=6) async def result_listener(futures_dict): while True: diff --git a/solution/dev_parallel/server.py b/solution/dev_parallel/server.py index 84fa0e7..d5d6cfd 100644 --- a/solution/dev_parallel/server.py +++ b/solution/dev_parallel/server.py @@ -5,13 +5,16 @@ import redis import uuid import logging - from responder import result_listener +from os import environ # Configure logging logging.basicConfig(level=logging.DEBUG) -r = redis.Redis(host='redis', port=6379, db=0) +REDIS_PASSWORD = environ.get("REDIS_PASSWORD") +REDIS_HOST = environ.get("REDIS_HOST") + +r = redis.Redis(host=REDIS_HOST, password=REDIS_PASSWORD, port=6379, db=0) futures_dict = {} @@ -47,7 +50,7 @@ def make_app(): if __name__ == "__main__": app = make_app() - app.listen(5000, address='0.0.0.0') + app.listen(8000, address='0.0.0.0') logging.info('Server started.') tornado.ioloop.IOLoop.current().spawn_callback(result_listener, futures_dict) logging.info('Server started, running result listener') diff --git a/solution/dev_parallel/tasker.py b/solution/dev_parallel/tasker.py index bc4c056..9fcc7e1 100644 --- a/solution/dev_parallel/tasker.py +++ b/solution/dev_parallel/tasker.py @@ -2,14 +2,15 @@ import aioredis import json import logging +import os logging.basicConfig(level=logging.INFO) class QueueManager: def __init__(self): - self.redis_host = 'redis' - self.redis_port = 6379 - self.redis_password = '' # set password if any + self.redis_host = os.environ.get('REDIS_HOST') + self.redis_port = 6379 + self.redis_password = os.environ.get('REDIS_PASSWORD') async def start(self): logging.info('Starting Queue Manager...') diff --git a/solution/dev_parallel/worker1.py 
b/solution/dev_parallel/worker1.py index fd210a0..e7156b0 100644 --- a/solution/dev_parallel/worker1.py +++ b/solution/dev_parallel/worker1.py @@ -14,7 +14,11 @@ class Worker: def __init__(self, worker_config, connection_config): - self.redis_config = connection_config["redis"] + # получение значений переменных окружения + redis_host = os.environ.get('REDIS_HOST') + redis_password = os.environ.get('REDIS_PASSWORD') + redis_port = int(os.environ.get('REDIS_PORT', '6379')) + self.redis_result_config = connection_config["results_db"] # Взять имя воркера из имени файла @@ -22,13 +26,15 @@ def __init__(self, worker_config, connection_config): self.model_name = worker_config["model_name"] self.model_labels = worker_config["model_labels"] - self.redis_incoming = redis.Redis(host=self.redis_config['host'], - port=self.redis_config['port'], - db=worker_config['db']) + self.redis_incoming = redis.Redis(host=redis_host, + port=redis_port, + db=worker_config['db'], + password=redis_password) - self.redis_outgoing = redis.Redis(host=self.redis_config['host'], - port=self.redis_config['port'], - db=self.redis_result_config['db']) + self.redis_outgoing = redis.Redis(host=redis_host, + port=redis_port, + db=self.redis_result_config['db'], + password=redis_password) # Загрузка модели logger.info(f"Loading model {self.model_name}...") diff --git a/solution/dev_parallel/worker2.py b/solution/dev_parallel/worker2.py index fd210a0..e7156b0 100644 --- a/solution/dev_parallel/worker2.py +++ b/solution/dev_parallel/worker2.py @@ -14,7 +14,11 @@ class Worker: def __init__(self, worker_config, connection_config): - self.redis_config = connection_config["redis"] + # получение значений переменных окружения + redis_host = os.environ.get('REDIS_HOST') + redis_password = os.environ.get('REDIS_PASSWORD') + redis_port = int(os.environ.get('REDIS_PORT', '6379')) + self.redis_result_config = connection_config["results_db"] # Взять имя воркера из имени файла @@ -22,13 +26,15 @@ def __init__(self, worker_config, connection_config): self.model_name = worker_config["model_name"] self.model_labels = worker_config["model_labels"] - self.redis_incoming = redis.Redis(host=self.redis_config['host'], - port=self.redis_config['port'], - db=worker_config['db']) + self.redis_incoming = redis.Redis(host=redis_host, + port=redis_port, + db=worker_config['db'], + password=redis_password) - self.redis_outgoing = redis.Redis(host=self.redis_config['host'], - port=self.redis_config['port'], - db=self.redis_result_config['db']) + self.redis_outgoing = redis.Redis(host=redis_host, + port=redis_port, + db=self.redis_result_config['db'], + password=redis_password) # Загрузка модели logger.info(f"Loading model {self.model_name}...") diff --git a/solution/dev_parallel/worker3.py b/solution/dev_parallel/worker3.py index fd210a0..e7156b0 100644 --- a/solution/dev_parallel/worker3.py +++ b/solution/dev_parallel/worker3.py @@ -14,7 +14,11 @@ class Worker: def __init__(self, worker_config, connection_config): - self.redis_config = connection_config["redis"] + # получение значений переменных окружения + redis_host = os.environ.get('REDIS_HOST') + redis_password = os.environ.get('REDIS_PASSWORD') + redis_port = int(os.environ.get('REDIS_PORT', '6379')) + self.redis_result_config = connection_config["results_db"] # Взять имя воркера из имени файла @@ -22,13 +26,15 @@ def __init__(self, worker_config, connection_config): self.model_name = worker_config["model_name"] self.model_labels = worker_config["model_labels"] - self.redis_incoming = 
redis.Redis(host=self.redis_config['host'], - port=self.redis_config['port'], - db=worker_config['db']) + self.redis_incoming = redis.Redis(host=redis_host, + port=redis_port, + db=worker_config['db'], + password=redis_password) - self.redis_outgoing = redis.Redis(host=self.redis_config['host'], - port=self.redis_config['port'], - db=self.redis_result_config['db']) + self.redis_outgoing = redis.Redis(host=redis_host, + port=redis_port, + db=self.redis_result_config['db'], + password=redis_password) # Загрузка модели logger.info(f"Loading model {self.model_name}...") diff --git a/solution/dev_parallel/worker4.py b/solution/dev_parallel/worker4.py index fd210a0..e7156b0 100644 --- a/solution/dev_parallel/worker4.py +++ b/solution/dev_parallel/worker4.py @@ -14,7 +14,11 @@ class Worker: def __init__(self, worker_config, connection_config): - self.redis_config = connection_config["redis"] + # получение значений переменных окружения + redis_host = os.environ.get('REDIS_HOST') + redis_password = os.environ.get('REDIS_PASSWORD') + redis_port = int(os.environ.get('REDIS_PORT', '6379')) + self.redis_result_config = connection_config["results_db"] # Взять имя воркера из имени файла @@ -22,13 +26,15 @@ def __init__(self, worker_config, connection_config): self.model_name = worker_config["model_name"] self.model_labels = worker_config["model_labels"] - self.redis_incoming = redis.Redis(host=self.redis_config['host'], - port=self.redis_config['port'], - db=worker_config['db']) + self.redis_incoming = redis.Redis(host=redis_host, + port=redis_port, + db=worker_config['db'], + password=redis_password) - self.redis_outgoing = redis.Redis(host=self.redis_config['host'], - port=self.redis_config['port'], - db=self.redis_result_config['db']) + self.redis_outgoing = redis.Redis(host=redis_host, + port=redis_port, + db=self.redis_result_config['db'], + password=redis_password) # Загрузка модели logger.info(f"Loading model {self.model_name}...") diff --git a/solution/dev_parallel/worker5.py b/solution/dev_parallel/worker5.py index fd210a0..e7156b0 100644 --- a/solution/dev_parallel/worker5.py +++ b/solution/dev_parallel/worker5.py @@ -14,7 +14,11 @@ class Worker: def __init__(self, worker_config, connection_config): - self.redis_config = connection_config["redis"] + # получение значений переменных окружения + redis_host = os.environ.get('REDIS_HOST') + redis_password = os.environ.get('REDIS_PASSWORD') + redis_port = int(os.environ.get('REDIS_PORT', '6379')) + self.redis_result_config = connection_config["results_db"] # Взять имя воркера из имени файла @@ -22,13 +26,15 @@ def __init__(self, worker_config, connection_config): self.model_name = worker_config["model_name"] self.model_labels = worker_config["model_labels"] - self.redis_incoming = redis.Redis(host=self.redis_config['host'], - port=self.redis_config['port'], - db=worker_config['db']) + self.redis_incoming = redis.Redis(host=redis_host, + port=redis_port, + db=worker_config['db'], + password=redis_password) - self.redis_outgoing = redis.Redis(host=self.redis_config['host'], - port=self.redis_config['port'], - db=self.redis_result_config['db']) + self.redis_outgoing = redis.Redis(host=redis_host, + port=redis_port, + db=self.redis_result_config['db'], + password=redis_password) # Загрузка модели logger.info(f"Loading model {self.model_name}...") diff --git a/solution/docker-compose.yml b/solution/docker-compose.yml index 5961f69..aeecfa6 100644 --- a/solution/docker-compose.yml +++ b/solution/docker-compose.yml @@ -5,7 +5,10 @@ services: context: . 
dockerfile: Dockerfile ports: - - "5000:5000" + - "8000:8000" + environment: + - REDIS_HOST='redis' + - REDIS_PASSWORD='' redis: image: "redis:latest" diff --git a/solution/helm/envs/example.yaml b/solution/helm/envs/example.yaml index e9e2c46..4ef5b0c 100644 --- a/solution/helm/envs/example.yaml +++ b/solution/helm/envs/example.yaml @@ -8,10 +8,10 @@ global: pod: ports: - name: http - containerPort: 5000 + containerPort: 8000 protocol: TCP service: - targetPort: 5000 + targetPort: 8000 # add any configmap data you need # configmaps will be mounted to /workspace/ From bb911657ab198c48aca688328bb1f425287864ed Mon Sep 17 00:00:00 2001 From: vzip Date: Tue, 30 May 2023 15:58:55 -0600 Subject: [PATCH 04/14] Dockerfile change FROM for pytorch/pytorch:latest --- solution/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solution/Dockerfile b/solution/Dockerfile index 7f7e2f5..938ec39 100644 --- a/solution/Dockerfile +++ b/solution/Dockerfile @@ -1,5 +1,5 @@ # Использование PyTorch образа с поддержкой CUDA и cuDNN в качестве базового -FROM pytorch/pytorch:latest-cuda-cudnn-runtime +FROM pytorch/pytorch:latest # Обновление pip RUN pip install --upgrade pip From bd4964a877162624e315910afa73f68ac6d97af5 Mon Sep 17 00:00:00 2001 From: vzip Date: Tue, 30 May 2023 20:11:37 -0600 Subject: [PATCH 05/14] change format responds for autotest --- solution/dev_parallel/server.py | 13 ++++++++++++- solution/dev_parallel/worker1.py | 2 +- solution/dev_parallel/worker2.py | 2 +- solution/dev_parallel/worker3.py | 2 +- solution/dev_parallel/worker4.py | 2 +- solution/dev_parallel/worker5.py | 2 +- 6 files changed, 17 insertions(+), 6 deletions(-) diff --git a/solution/dev_parallel/server.py b/solution/dev_parallel/server.py index d5d6cfd..3649112 100644 --- a/solution/dev_parallel/server.py +++ b/solution/dev_parallel/server.py @@ -41,7 +41,18 @@ async def post(self): # Ожидание результата и его запись в ответ result = await future logging.info(f'Received result for correlation_id: {correlation_id}. 
Result: {result}') - self.write(json.dumps(result)) + # Извлечение данных под ключом 'results' + results_list = result['results'] + + # Сортировка списка словарей по имени воркера + sorted_results = sorted(results_list, key=lambda k: k['worker']) + + # Объединение всех результатов в единый словарь + final_result = {} + for res in sorted_results: + final_result.update(res['result']) + + self.write(json.dumps(final_result)) def make_app(): return tornado.web.Application([ diff --git a/solution/dev_parallel/worker1.py b/solution/dev_parallel/worker1.py index e7156b0..4254cb2 100644 --- a/solution/dev_parallel/worker1.py +++ b/solution/dev_parallel/worker1.py @@ -79,7 +79,7 @@ def process_message(self, body): results_dict = { "correlation_id": correlation_id, "worker": self.queue_name, - "result": result[self.model_name.split('/')[-1]] + "result": result #[self.model_name.split('/')[-1]] } results_dict = json.dumps(results_dict) self.redis_outgoing.rpush(correlation_id, results_dict) diff --git a/solution/dev_parallel/worker2.py b/solution/dev_parallel/worker2.py index e7156b0..4254cb2 100644 --- a/solution/dev_parallel/worker2.py +++ b/solution/dev_parallel/worker2.py @@ -79,7 +79,7 @@ def process_message(self, body): results_dict = { "correlation_id": correlation_id, "worker": self.queue_name, - "result": result[self.model_name.split('/')[-1]] + "result": result #[self.model_name.split('/')[-1]] } results_dict = json.dumps(results_dict) self.redis_outgoing.rpush(correlation_id, results_dict) diff --git a/solution/dev_parallel/worker3.py b/solution/dev_parallel/worker3.py index e7156b0..4254cb2 100644 --- a/solution/dev_parallel/worker3.py +++ b/solution/dev_parallel/worker3.py @@ -79,7 +79,7 @@ def process_message(self, body): results_dict = { "correlation_id": correlation_id, "worker": self.queue_name, - "result": result[self.model_name.split('/')[-1]] + "result": result #[self.model_name.split('/')[-1]] } results_dict = json.dumps(results_dict) self.redis_outgoing.rpush(correlation_id, results_dict) diff --git a/solution/dev_parallel/worker4.py b/solution/dev_parallel/worker4.py index e7156b0..4254cb2 100644 --- a/solution/dev_parallel/worker4.py +++ b/solution/dev_parallel/worker4.py @@ -79,7 +79,7 @@ def process_message(self, body): results_dict = { "correlation_id": correlation_id, "worker": self.queue_name, - "result": result[self.model_name.split('/')[-1]] + "result": result #[self.model_name.split('/')[-1]] } results_dict = json.dumps(results_dict) self.redis_outgoing.rpush(correlation_id, results_dict) diff --git a/solution/dev_parallel/worker5.py b/solution/dev_parallel/worker5.py index e7156b0..4254cb2 100644 --- a/solution/dev_parallel/worker5.py +++ b/solution/dev_parallel/worker5.py @@ -79,7 +79,7 @@ def process_message(self, body): results_dict = { "correlation_id": correlation_id, "worker": self.queue_name, - "result": result[self.model_name.split('/')[-1]] + "result": result #[self.model_name.split('/')[-1]] } results_dict = json.dumps(results_dict) self.redis_outgoing.rpush(correlation_id, results_dict) From 787a866b1c1c8691bdf79b01b889b855ef080a71 Mon Sep 17 00:00:00 2001 From: vzip Date: Wed, 31 May 2023 12:20:23 -0600 Subject: [PATCH 06/14] added to Dockerfile env vars for redis host --- autotests/helm/values.yaml | 2 +- solution/Dockerfile | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/autotests/helm/values.yaml b/autotests/helm/values.yaml index cda6a5e..1102397 100644 --- a/autotests/helm/values.yaml +++ b/autotests/helm/values.yaml @@ -25,7 
+25,7 @@ global: activeDeadlineSeconds: 3600 # 1h env: - PARTICIPANT_NAME: + PARTICIPANT_NAME: Viktor Kupko api_host: http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/ # K6, do not edit! diff --git a/solution/Dockerfile b/solution/Dockerfile index 938ec39..4233fc9 100644 --- a/solution/Dockerfile +++ b/solution/Dockerfile @@ -1,6 +1,12 @@ # Использование PyTorch образа с поддержкой CUDA и cuDNN в качестве базового FROM pytorch/pytorch:latest +# Set environment variables +ARG REDIS_HOST +ENV REDIS_HOST=$REDIS_HOST +ARG REDIS_PASSWORD +ENV REDIS_PASSWORD=$REDIS_PASSWORD + # Обновление pip RUN pip install --upgrade pip From edf714cb1df35789ad0eacf122fd5a7d5cb9c0eb Mon Sep 17 00:00:00 2001 From: vzip Date: Wed, 31 May 2023 12:26:49 -0600 Subject: [PATCH 07/14] added logging into server.py to check that env got redis host name and try ping --- solution/dev_parallel/server.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/solution/dev_parallel/server.py b/solution/dev_parallel/server.py index 3649112..f3d180e 100644 --- a/solution/dev_parallel/server.py +++ b/solution/dev_parallel/server.py @@ -11,10 +11,20 @@ # Configure logging logging.basicConfig(level=logging.DEBUG) -REDIS_PASSWORD = environ.get("REDIS_PASSWORD") + REDIS_HOST = environ.get("REDIS_HOST") +# Log the Redis host and password +logging.info(f"Redis Host: {REDIS_HOST}") + +REDIS_PASSWORD = environ.get("REDIS_PASSWORD") r = redis.Redis(host=REDIS_HOST, password=REDIS_PASSWORD, port=6379, db=0) +# Try to connect to Redis +try: + r.ping() + logging.info("Successfully connected to Redis") +except redis.ConnectionError: + logging.error("Failed to connect to Redis") futures_dict = {} From a88c1986d3ac9c85c9e50aefb3bc6513517aca4d Mon Sep 17 00:00:00 2001 From: vzip Date: Wed, 31 May 2023 13:39:39 -0600 Subject: [PATCH 08/14] change dir app in supervisord-conf this cause make problem to start py files --- solution/supervisord.conf | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/solution/supervisord.conf b/solution/supervisord.conf index 0903610..d5e529a 100644 --- a/solution/supervisord.conf +++ b/solution/supervisord.conf @@ -3,42 +3,42 @@ nodaemon=true [program:server] command=bash -c "sleep 0 && python server.py" -directory=/app +directory=/solution autostart=true autorestart=true [program:tasker] command=bash -c "sleep 2 && python tasker.py" -directory=/app +directory=/solution autostart=true autorestart=true [program:worker1] command=bash -c "sleep 5 && python worker1.py" -directory=/app +directory=/solution autostart=true autorestart=true [program:worker2] command=bash -c "sleep 10 && python worker2.py" -directory=/app +directory=/solution autostart=true autorestart=true [program:worker3] command=bash -c "sleep 15 && python worker3.py" -directory=/app +directory=/solution autostart=true autorestart=true [program:worker4] command=bash -c "sleep 20 && python worker4.py" -directory=/app +directory=/solution autostart=true autorestart=true [program:worker5] command=bash -c "sleep 25 && python worker5.py" -directory=/app +directory=/solution autostart=true autorestart=true From b981b7d24417a08253a0ea3750d29b5955000b2b Mon Sep 17 00:00:00 2001 From: vzip Date: Thu, 1 Jun 2023 01:06:57 -0600 Subject: [PATCH 09/14] set CUDA env vars in Dockerfile - all ready for run test --- solution/Dockerfile | 4 ++++ solution/helm/envs/{example.yaml => vzip.yaml} | 8 +++++--- 2 files changed, 9 insertions(+), 3 deletions(-) rename solution/helm/envs/{example.yaml => 
vzip.yaml} (89%) diff --git a/solution/Dockerfile b/solution/Dockerfile index 4233fc9..6f6567e 100644 --- a/solution/Dockerfile +++ b/solution/Dockerfile @@ -7,6 +7,10 @@ ENV REDIS_HOST=$REDIS_HOST ARG REDIS_PASSWORD ENV REDIS_PASSWORD=$REDIS_PASSWORD +# Set CUDA environment variables +ENV NVIDIA_VISIBLE_DEVICES all +ENV NVIDIA_DRIVER_CAPABILITIES compute,utility + # Обновление pip RUN pip install --upgrade pip diff --git a/solution/helm/envs/example.yaml b/solution/helm/envs/vzip.yaml similarity index 89% rename from solution/helm/envs/example.yaml rename to solution/helm/envs/vzip.yaml index 4ef5b0c..28976d4 100644 --- a/solution/helm/envs/example.yaml +++ b/solution/helm/envs/vzip.yaml @@ -1,9 +1,11 @@ global: # add any variables you need in format `key: value` # variables will be available in the container as environment variables - env: - EXAMPLE: "example" - + # env: + # EXAMPLE: "example" + resources: + limits: + nvidia.com/gpu: 1 # change 8000 to your application target port pod: ports: From 60b55e4798170d47c1415ac7f33d7577420f4014 Mon Sep 17 00:00:00 2001 From: vzip Date: Thu, 1 Jun 2023 15:36:31 -0600 Subject: [PATCH 10/14] added validation of input data in incoming requests --- solution/dev_parallel/responder.py | 2 +- solution/dev_parallel/server.py | 21 +++++++++++++++++++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/solution/dev_parallel/responder.py b/solution/dev_parallel/responder.py index ef90a54..b1e7800 100644 --- a/solution/dev_parallel/responder.py +++ b/solution/dev_parallel/responder.py @@ -63,4 +63,4 @@ async def result_listener(futures_dict): else: logger.info('No sets with 5 values found, skipping connection attempt.') - await tornado.gen.sleep(0.05) + await tornado.gen.sleep(0.01) diff --git a/solution/dev_parallel/server.py b/solution/dev_parallel/server.py index f3d180e..6b00c4c 100644 --- a/solution/dev_parallel/server.py +++ b/solution/dev_parallel/server.py @@ -30,9 +30,26 @@ class MainHandler(tornado.web.RequestHandler): async def post(self): - data = json.loads(self.request.body) - correlation_id = str(uuid.uuid4()) + request_body = json.loads(self.request.body) + if isinstance(request_body, str): + # Если входные данные - просто строка, преобразуем её в словарь с ключом 'data' + data = {'data': request_body} + elif isinstance(request_body, dict) and 'data' in request_body: + # Если входные данные - словарь и содержат ключ 'data', используем его + data = request_body + else: + # Если ни одно из условий не выполнилось, возвращаем сообщение об ошибке + self.set_status(400) # код статуса HTTP для "Bad Request" + self.write({"error": "Invalid format. Expected JSON with a 'data' key or a simple text."}) + return + + if not isinstance(data['data'], str): + # Если 'data' не является строкой, возвращаем сообщение об ошибке + self.set_status(400) + self.write({"error": "Invalid 'data' format. Expected a string."}) + return + correlation_id = str(uuid.uuid4()) logging.info(f'Received data: {data}. 
From 60b55e4798170d47c1415ac7f33d7577420f4014 Mon Sep 17 00:00:00 2001
From: vzip
Date: Thu, 1 Jun 2023 15:36:31 -0600
Subject: [PATCH 10/14] added validation of input data in incoming requests

---
 solution/dev_parallel/responder.py |  2 +-
 solution/dev_parallel/server.py    | 21 +++++++++++++++++++--
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/solution/dev_parallel/responder.py b/solution/dev_parallel/responder.py
index ef90a54..b1e7800 100644
--- a/solution/dev_parallel/responder.py
+++ b/solution/dev_parallel/responder.py
@@ -63,4 +63,4 @@ async def result_listener(futures_dict):
         else:
             logger.info('No sets with 5 values found, skipping connection attempt.')

-        await tornado.gen.sleep(0.05)
+        await tornado.gen.sleep(0.01)

diff --git a/solution/dev_parallel/server.py b/solution/dev_parallel/server.py
index f3d180e..6b00c4c 100644
--- a/solution/dev_parallel/server.py
+++ b/solution/dev_parallel/server.py
@@ -30,9 +30,26 @@

 class MainHandler(tornado.web.RequestHandler):
     async def post(self):
-        data = json.loads(self.request.body)
-        correlation_id = str(uuid.uuid4())
+        request_body = json.loads(self.request.body)

+        if isinstance(request_body, str):
+            # If the input is a plain string, wrap it in a dict under the 'data' key
+            data = {'data': request_body}
+        elif isinstance(request_body, dict) and 'data' in request_body:
+            # If the input is a dict that contains a 'data' key, use it as-is
+            data = request_body
+        else:
+            # If neither condition holds, return an error message
+            self.set_status(400)  # HTTP status code for "Bad Request"
+            self.write({"error": "Invalid format. Expected JSON with a 'data' key or a simple text."})
+            return
+
+        if not isinstance(data['data'], str):
+            # If 'data' is not a string, return an error message
+            self.set_status(400)
+            self.write({"error": "Invalid 'data' format. Expected a string."})
+            return
+
+        correlation_id = str(uuid.uuid4())
         logging.info(f'Received data: {data}. Assigned correlation_id: {correlation_id}.')

         # Assign the task to a worker

From f3e68d17606ec70b3b6c14dd697df39db7efeebb Mon Sep 17 00:00:00 2001
From: vzip
Date: Thu, 1 Jun 2023 19:35:57 -0600
Subject: [PATCH 11/14] added more validation on incoming requests

---
 solution/dev_parallel/server.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/solution/dev_parallel/server.py b/solution/dev_parallel/server.py
index 6b00c4c..9ebb456 100644
--- a/solution/dev_parallel/server.py
+++ b/solution/dev_parallel/server.py
@@ -31,18 +31,24 @@ class MainHandler(tornado.web.RequestHandler):
     async def post(self):
         request_body = json.loads(self.request.body)

-        if isinstance(request_body, str):
+        if isinstance(request_body, list):
+            # If the input is a list of dicts, serialize it into a single string
+            data = {'data': json.dumps(request_body)}
+            await self.process_data(data)
+        elif isinstance(request_body, str):
             # If the input is a plain string, wrap it in a dict under the 'data' key
             data = {'data': request_body}
+            await self.process_data(data)
         elif isinstance(request_body, dict) and 'data' in request_body:
             # If the input is a dict that contains a 'data' key, use it as-is
-            data = request_body
+            await self.process_data(request_body)
         else:
             # If neither condition holds, return an error message
-            self.set_status(400)  # HTTP status code for "Bad Request"
+            self.set_status(400)
             self.write({"error": "Invalid format. Expected JSON with a 'data' key or a simple text."})
             return

+    async def process_data(self, data):
         if not isinstance(data['data'], str):
             # If 'data' is not a string, return an error message
             self.set_status(400)
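
The accepted and rejected request shapes are easiest to see with a tiny
client. A hedged sketch of such a smoke test (the host, port, and endpoint
path are assumptions, not taken from these patches):

# validate_smoke.py - illustrative client; URL is an assumption
import json
import urllib.error
import urllib.request

URL = "http://localhost:5000/"  # assumed service address

def post(payload):
    req = urllib.request.Request(URL, data=json.dumps(payload).encode(),
                                 headers={"Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req) as resp:
            return resp.status, resp.read().decode()
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode()

print(post("plain text"))                    # wrapped as {'data': 'plain text'}
print(post({"data": "some text"}))           # accepted as-is
print(post([{"data": "a"}, {"data": "b"}]))  # list: serialized to one string
print(post({"other": 1}))                    # 400: no 'data' key
print(post({"data": 42}))                    # 400: 'data' is not a string
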
From 667c52ce6879453d5e10e8bb8969b7f16b7be152 Mon Sep 17 00:00:00 2001
From: vzip
Date: Fri, 2 Jun 2023 08:45:26 -0600
Subject: [PATCH 12/14] change key in response to the model's author instead
 of the model's name

---
 solution/dev_parallel/worker1.py | 4 ++--
 solution/dev_parallel/worker2.py | 4 ++--
 solution/dev_parallel/worker3.py | 4 ++--
 solution/dev_parallel/worker4.py | 4 ++--
 solution/dev_parallel/worker5.py | 2 +-
 5 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/solution/dev_parallel/worker1.py b/solution/dev_parallel/worker1.py
index 4254cb2..c8c517b 100644
--- a/solution/dev_parallel/worker1.py
+++ b/solution/dev_parallel/worker1.py
@@ -72,14 +72,14 @@ def process_message(self, body):

         label = self.model_labels[predictions]

-        result_key = self.model_name.split('/')[-1]
+        result_key = self.model_name.split('/')[0]
         result = {result_key: {"score": score, "label": label}}
         logger.info(f"Received task results {result}")

         results_dict = {
             "correlation_id": correlation_id,
             "worker": self.queue_name,
-            "result": result #[self.model_name.split('/')[-1]]
+            "result": result
         }
         results_dict = json.dumps(results_dict)
         self.redis_outgoing.rpush(correlation_id, results_dict)

diff --git a/solution/dev_parallel/worker2.py b/solution/dev_parallel/worker2.py
index 4254cb2..c8c517b 100644
--- a/solution/dev_parallel/worker2.py
+++ b/solution/dev_parallel/worker2.py
@@ -72,14 +72,14 @@ def process_message(self, body):

         label = self.model_labels[predictions]

-        result_key = self.model_name.split('/')[-1]
+        result_key = self.model_name.split('/')[0]
         result = {result_key: {"score": score, "label": label}}
         logger.info(f"Received task results {result}")

         results_dict = {
             "correlation_id": correlation_id,
             "worker": self.queue_name,
-            "result": result #[self.model_name.split('/')[-1]]
+            "result": result
         }
         results_dict = json.dumps(results_dict)
         self.redis_outgoing.rpush(correlation_id, results_dict)

diff --git a/solution/dev_parallel/worker3.py b/solution/dev_parallel/worker3.py
index 4254cb2..6fa55ee 100644
--- a/solution/dev_parallel/worker3.py
+++ b/solution/dev_parallel/worker3.py
@@ -72,14 +72,14 @@ def process_message(self, body):

         label = self.model_labels[predictions]

-        result_key = self.model_name.split('/')[-1]
+        result_key = self.model_name.split('/')[0]
         result = {result_key: {"score": score, "label": label}}
         logger.info(f"Received task results {result}")

         results_dict = {
             "correlation_id": correlation_id,
             "worker": self.queue_name,
-            "result": result #[self.model_name.split('/')[-1]]
+            "result": result
         }
         results_dict = json.dumps(results_dict)
         self.redis_outgoing.rpush(correlation_id, results_dict)

diff --git a/solution/dev_parallel/worker4.py b/solution/dev_parallel/worker4.py
index 4254cb2..6fa55ee 100644
--- a/solution/dev_parallel/worker4.py
+++ b/solution/dev_parallel/worker4.py
@@ -72,14 +72,14 @@ def process_message(self, body):

         label = self.model_labels[predictions]

-        result_key = self.model_name.split('/')[-1]
+        result_key = self.model_name.split('/')[0]
         result = {result_key: {"score": score, "label": label}}
         logger.info(f"Received task results {result}")

         results_dict = {
             "correlation_id": correlation_id,
             "worker": self.queue_name,
-            "result": result #[self.model_name.split('/')[-1]]
+            "result": result
         }
         results_dict = json.dumps(results_dict)
         self.redis_outgoing.rpush(correlation_id, results_dict)

diff --git a/solution/dev_parallel/worker5.py b/solution/dev_parallel/worker5.py
index 4254cb2..4e7470f 100644
--- a/solution/dev_parallel/worker5.py
+++ b/solution/dev_parallel/worker5.py
@@ -72,7 +72,7 @@ def process_message(self, body):

         label = self.model_labels[predictions]

-        result_key = self.model_name.split('/')[-1]
+        result_key = self.model_name.split('/')[0]
         result = {result_key: {"score": score, "label": label}}
         logger.info(f"Received task results {result}")
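
After this change each worker keys its result by the model author rather than
the model name. For worker1 ("cardiffnlp/twitter-xlm-roberta-base-sentiment")
a single result entry now looks like this (score and label are illustrative):

# shape of one worker's result after patch 12 (values made up)
result = {"cardiffnlp": {"score": 0.97, "label": "POSITIVE"}}
# before this patch the key would have been "twitter-xlm-roberta-base-sentiment"
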
["Arabic", "Basque", "Breton", "Catalan", "Chinese_China", "Chinese_Hongkong", "Chinese_Taiwan", "Chuvash", "Czech", "Dhivehi", "Dutch", "English", "Esperanto", "Estonian", "French", "Frisian", "Georgian", "German", "Greek", "Hakha_Chin", "Indonesian", "Interlingua", "Italian", "Japanese", "Kabyle", "Kinyarwanda", "Kyrgyz", "Latvian", "Maltese", "Mongolian", "Persian", "Polish", "Portuguese", "Romanian", "Romansh_Sursilvan", "Russian", "Sakha", "Slovenian", "Spanish", "Swedish", "Tamil", "Tatar", "Turkish", "Ukrainian", "Welsh"] # Labels for language detection model + }, + { + "db": 9, + "worker_name": "worker9", + "model_name": "svalabs/twitter-xlm-roberta-crypto-spam", + "model_labels": ["HAM", "SPAM"] # Labels for crypto spam model + }, + { + "db": 10, + "worker_name": "worker10", + "model_name": "EIStakovskii/xlm_roberta_base_multilingual_toxicity_classifier_plus", + "model_labels": ["LABEL_0", "LABEL_1"] # Label_1 means TOXIC, Label_0 means NOT TOXIC. + }, + { + "db": 11, + "worker_name": "worker11", + "model_name": "jy46604790/Fake-News-Bert-Detect", + "model_labels": ["LABEL_0", "LABEL_1"] # LABEL_0: Fake news, LABEL_1: Real news } ] } diff --git a/solution/dev_parallel/tasker.py b/solution/dev_parallel/tasker.py index 9fcc7e1..d12c0f2 100644 --- a/solution/dev_parallel/tasker.py +++ b/solution/dev_parallel/tasker.py @@ -3,19 +3,33 @@ import json import logging import os +from config import config # импортируем конфигурационный файл logging.basicConfig(level=logging.INFO) class QueueManager: def __init__(self): - self.redis_host = os.environ.get('REDIS_HOST') - self.redis_port = 6379 - self.redis_password = os.environ.get('REDIS_PASSWORD') + self.redis_host = os.environ.get('REDIS_HOST') + self.redis_port = 6379 + self.redis_password = os.environ.get('REDIS_PASSWORD') + self.group_settings = self.load_group_settings() # загружаем настройки групп + self.lock = asyncio.Lock() + + def load_group_settings(self): + # загружаем конфигурацию групп только один раз при инициализации + group_settings = [] + for key in config.keys(): + if 'farm_' in key: + group_settings.append({'db_numbers': config[key]['db']}) + return group_settings async def start(self): logging.info('Starting Queue Manager...') self.redis_source = aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}/0", password=self.redis_password) - self.redis_targets = [aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}/{i}", password=self.redis_password) for i in range(1, 6)] + self.target_groups = [ + [aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}/{i}", password=self.redis_password) for i in group['db_numbers']] + for group in self.group_settings + ] logging.info('Connected to Redis.') await self.transfer_messages_from_redis_to_redis() @@ -23,13 +37,22 @@ async def publish_message_to_redis(self, message_body, redis_db): await redis_db.rpush('dispatcher', message_body) async def transfer_messages_from_redis_to_redis(self, redis_queue_name='dispatcher'): + current_group_index = 0 # Индекс текущей группы while True: - logging.info('Waiting for message in Redis queue...') - _, message_body = await self.redis_source.blpop(redis_queue_name) - logging.info(f'Received message from Redis queue: {message_body}') - publish_tasks = [self.publish_message_to_redis(message_body, redis_target) for redis_target in self.redis_targets] - await asyncio.gather(*publish_tasks) - logging.info('Published message to all Redis targets.') + async with self.lock: + logging.info('Waiting for message in Redis queue...') + 
diff --git a/solution/dev_parallel/tasker.py b/solution/dev_parallel/tasker.py
index 9fcc7e1..d12c0f2 100644
--- a/solution/dev_parallel/tasker.py
+++ b/solution/dev_parallel/tasker.py
@@ -3,19 +3,33 @@
 import json
 import logging
 import os
+from config import config  # import the configuration file

 logging.basicConfig(level=logging.INFO)

 class QueueManager:
     def __init__(self):
-        self.redis_host = os.environ.get('REDIS_HOST')
-        self.redis_port = 6379
-        self.redis_password = os.environ.get('REDIS_PASSWORD')
+        self.redis_host = os.environ.get('REDIS_HOST')
+        self.redis_port = 6379
+        self.redis_password = os.environ.get('REDIS_PASSWORD')
+        self.group_settings = self.load_group_settings()  # load the farm group settings
+        self.lock = asyncio.Lock()
+
+    def load_group_settings(self):
+        # load the group configuration only once, at initialization
+        group_settings = []
+        for key in config.keys():
+            if 'farm_' in key:
+                group_settings.append({'db_numbers': config[key]['db']})
+        return group_settings

     async def start(self):
         logging.info('Starting Queue Manager...')
         self.redis_source = aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}/0", password=self.redis_password)
-        self.redis_targets = [aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}/{i}", password=self.redis_password) for i in range(1, 6)]
+        self.target_groups = [
+            [aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}/{i}", password=self.redis_password) for i in group['db_numbers']]
+            for group in self.group_settings
+        ]
         logging.info('Connected to Redis.')
         await self.transfer_messages_from_redis_to_redis()

@@ -23,13 +37,22 @@ async def publish_message_to_redis(self, message_body, redis_db):
         await redis_db.rpush('dispatcher', message_body)

     async def transfer_messages_from_redis_to_redis(self, redis_queue_name='dispatcher'):
+        current_group_index = 0  # index of the current group
         while True:
-            logging.info('Waiting for message in Redis queue...')
-            _, message_body = await self.redis_source.blpop(redis_queue_name)
-            logging.info(f'Received message from Redis queue: {message_body}')
-            publish_tasks = [self.publish_message_to_redis(message_body, redis_target) for redis_target in self.redis_targets]
-            await asyncio.gather(*publish_tasks)
-            logging.info('Published message to all Redis targets.')
+            async with self.lock:
+                logging.info('Waiting for message in Redis queue...')
+                _, message_body = await self.redis_source.blpop(redis_queue_name)
+                logging.info(f'Received message from Redis queue: {message_body}')
+                if message_body:
+                    target_group = self.target_groups[current_group_index]  # pick the current group
+                    logging.info(f'Sending message to group {current_group_index}.')
+                    publish_tasks = [self.publish_message_to_redis(message_body, redis_target) for redis_target in target_group]
+                    await asyncio.gather(*publish_tasks)
+                    logging.info(f'Published message to group {current_group_index}.')
+                    current_group_index = (current_group_index + 1) % len(self.target_groups)  # move on to the next group
+                else:
+                    logging.error('Received empty message from Redis queue.')
+            await asyncio.sleep(0.01)  # small delay to reduce CPU load

 loop = asyncio.get_event_loop()
 manager = QueueManager()
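
The dispatch pattern above is round-robin over farms: each incoming message is
fanned out to every database of one farm, and consecutive messages alternate
between farms. A minimal standalone sketch of that control flow, with print
standing in for the Redis RPUSH:

# round_robin_demo.py - illustrative only
farms = [["1", "2", "3", "4", "5"], ["1", "8", "9", "4", "11"]]
current = 0

def dispatch(message):
    global current
    for db in farms[current]:  # fan the message out to every db in the farm
        print(f"RPUSH dispatcher (db {db}): {message}")
    current = (current + 1) % len(farms)  # next message goes to the other farm

dispatch("msg-1")  # delivered to dbs 1-5 (farm_1)
dispatch("msg-2")  # delivered to dbs 1, 8, 9, 4, 11 (farm_2)
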
{result}") + + results_dict = { + "correlation_id": correlation_id, + "worker": self.queue_name, + "result": result #[self.model_name.split('/')[-1]] + } + results_dict = json.dumps(results_dict) + self.redis_outgoing.rpush(correlation_id, results_dict) + logger.info(f"Saved result to Redis with key {correlation_id} : {results_dict}") + +# Загрузка конфигурации подключений из файла config.py +connection_config = config + +worker_name = os.path.splitext(os.path.basename(__file__))[0] +worker_config = next((worker for worker in connection_config["workers"] if worker["worker_name"] == worker_name), None) + +if not worker_config: + raise ValueError(f"No configuration found for worker {worker_name}") + +# Создание и запуск воркера +if __name__ == "__main__": + worker = Worker(worker_config, connection_config) + worker.start() diff --git a/solution/dev_parallel/worker8.py b/solution/dev_parallel/worker8.py new file mode 100644 index 0000000..4e7470f --- /dev/null +++ b/solution/dev_parallel/worker8.py @@ -0,0 +1,100 @@ +import logging +import json +import redis +from transformers import AutoTokenizer, AutoModelForSequenceClassification +import torch +import os + +# Import config from config.py +from config import config + +# Настройка логирования +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class Worker: + def __init__(self, worker_config, connection_config): + # получение значений переменных окружения + redis_host = os.environ.get('REDIS_HOST') + redis_password = os.environ.get('REDIS_PASSWORD') + redis_port = int(os.environ.get('REDIS_PORT', '6379')) + + self.redis_result_config = connection_config["results_db"] + + # Взять имя воркера из имени файла + self.queue_name = os.path.splitext(os.path.basename(__file__))[0] + self.model_name = worker_config["model_name"] + self.model_labels = worker_config["model_labels"] + + self.redis_incoming = redis.Redis(host=redis_host, + port=redis_port, + db=worker_config['db'], + password=redis_password) + + self.redis_outgoing = redis.Redis(host=redis_host, + port=redis_port, + db=self.redis_result_config['db'], + password=redis_password) + + # Загрузка модели + logger.info(f"Loading model {self.model_name}...") + self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name) + if torch.cuda.is_available(): + self.model = self.model.to('cuda') + logger.info(f"Model {self.model_name} loaded.") + + # Загрузка токенизатора + logger.info(f"Loading tokenizer...") + self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) + logger.info(f"Tokenizer for {self.model_name} loaded.") + + def start(self): + logger.info('Worker started.') + while True: + _, message = self.redis_incoming.blpop('dispatcher') + message_body = json.loads(message) + self.process_message(message_body) + + def process_message(self, body): + correlation_id = body['correlation_id'] + text = body['data']['data'] + logger.info(f"Processing text: {text}") + inputs = self.tokenizer(text, return_tensors='pt') + + if torch.cuda.is_available(): + inputs = inputs.to('cuda') + + outputs = self.model(**inputs) + + predictions = outputs.logits.argmax(dim=-1).item() + probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1) + score = probabilities[0][predictions].item() + + label = self.model_labels[predictions] + + result_key = self.model_name.split('/')[0] + result = {result_key: {"score": score, "label": label}} + logger.info(f"Received task results {result}") + + 
results_dict = { + "correlation_id": correlation_id, + "worker": self.queue_name, + "result": result #[self.model_name.split('/')[-1]] + } + results_dict = json.dumps(results_dict) + self.redis_outgoing.rpush(correlation_id, results_dict) + logger.info(f"Saved result to Redis with key {correlation_id} : {results_dict}") + +# Загрузка конфигурации подключений из файла config.py +connection_config = config + +worker_name = os.path.splitext(os.path.basename(__file__))[0] +worker_config = next((worker for worker in connection_config["workers"] if worker["worker_name"] == worker_name), None) + +if not worker_config: + raise ValueError(f"No configuration found for worker {worker_name}") + +# Создание и запуск воркера +if __name__ == "__main__": + worker = Worker(worker_config, connection_config) + worker.start() diff --git a/solution/dev_parallel/worker9.py b/solution/dev_parallel/worker9.py new file mode 100644 index 0000000..4e7470f --- /dev/null +++ b/solution/dev_parallel/worker9.py @@ -0,0 +1,100 @@ +import logging +import json +import redis +from transformers import AutoTokenizer, AutoModelForSequenceClassification +import torch +import os + +# Import config from config.py +from config import config + +# Настройка логирования +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class Worker: + def __init__(self, worker_config, connection_config): + # получение значений переменных окружения + redis_host = os.environ.get('REDIS_HOST') + redis_password = os.environ.get('REDIS_PASSWORD') + redis_port = int(os.environ.get('REDIS_PORT', '6379')) + + self.redis_result_config = connection_config["results_db"] + + # Взять имя воркера из имени файла + self.queue_name = os.path.splitext(os.path.basename(__file__))[0] + self.model_name = worker_config["model_name"] + self.model_labels = worker_config["model_labels"] + + self.redis_incoming = redis.Redis(host=redis_host, + port=redis_port, + db=worker_config['db'], + password=redis_password) + + self.redis_outgoing = redis.Redis(host=redis_host, + port=redis_port, + db=self.redis_result_config['db'], + password=redis_password) + + # Загрузка модели + logger.info(f"Loading model {self.model_name}...") + self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name) + if torch.cuda.is_available(): + self.model = self.model.to('cuda') + logger.info(f"Model {self.model_name} loaded.") + + # Загрузка токенизатора + logger.info(f"Loading tokenizer...") + self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) + logger.info(f"Tokenizer for {self.model_name} loaded.") + + def start(self): + logger.info('Worker started.') + while True: + _, message = self.redis_incoming.blpop('dispatcher') + message_body = json.loads(message) + self.process_message(message_body) + + def process_message(self, body): + correlation_id = body['correlation_id'] + text = body['data']['data'] + logger.info(f"Processing text: {text}") + inputs = self.tokenizer(text, return_tensors='pt') + + if torch.cuda.is_available(): + inputs = inputs.to('cuda') + + outputs = self.model(**inputs) + + predictions = outputs.logits.argmax(dim=-1).item() + probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1) + score = probabilities[0][predictions].item() + + label = self.model_labels[predictions] + + result_key = self.model_name.split('/')[0] + result = {result_key: {"score": score, "label": label}} + logger.info(f"Received task results {result}") + + results_dict = { + 
"correlation_id": correlation_id, + "worker": self.queue_name, + "result": result #[self.model_name.split('/')[-1]] + } + results_dict = json.dumps(results_dict) + self.redis_outgoing.rpush(correlation_id, results_dict) + logger.info(f"Saved result to Redis with key {correlation_id} : {results_dict}") + +# Загрузка конфигурации подключений из файла config.py +connection_config = config + +worker_name = os.path.splitext(os.path.basename(__file__))[0] +worker_config = next((worker for worker in connection_config["workers"] if worker["worker_name"] == worker_name), None) + +if not worker_config: + raise ValueError(f"No configuration found for worker {worker_name}") + +# Создание и запуск воркера +if __name__ == "__main__": + worker = Worker(worker_config, connection_config) + worker.start() diff --git a/solution/supervisord.conf b/solution/supervisord.conf index d5e529a..12f5270 100644 --- a/solution/supervisord.conf +++ b/solution/supervisord.conf @@ -42,3 +42,21 @@ command=bash -c "sleep 25 && python worker5.py" directory=/solution autostart=true autorestart=true + +[program:worker8] +command=bash -c "sleep 45 && python worker8.py" +directory=/solution +autostart=true +autorestart=true + +[program:worker9] +command=bash -c "sleep 55 && python worker9.py" +directory=/solution +autostart=true +autorestart=true + +[program:worker11] +command=bash -c "sleep 65 && python worker11.py" +directory=/solution +autostart=true +autorestart=true From 93c0d1f6d61f1c2ad21d64eccf54347df1c4fe0c Mon Sep 17 00:00:00 2001 From: vzip Date: Mon, 5 Jun 2023 07:53:36 -0600 Subject: [PATCH 14/14] swap some workers --- solution/dev_parallel/config.py | 2 +- solution/dev_parallel/{worker11.py => worker10.py} | 0 solution/supervisord.conf | 8 ++++---- 3 files changed, 5 insertions(+), 5 deletions(-) rename solution/dev_parallel/{worker11.py => worker10.py} (100%) diff --git a/solution/dev_parallel/config.py b/solution/dev_parallel/config.py index c862fef..93f20de 100644 --- a/solution/dev_parallel/config.py +++ b/solution/dev_parallel/config.py @@ -10,7 +10,7 @@ "db": ["1", "2", "3", "4", "5"] }, "farm_2": { - "db": ["1", "8", "9", "4", "11"] + "db": ["1", "8", "9", "10", "5"] }, "workers": [ diff --git a/solution/dev_parallel/worker11.py b/solution/dev_parallel/worker10.py similarity index 100% rename from solution/dev_parallel/worker11.py rename to solution/dev_parallel/worker10.py diff --git a/solution/supervisord.conf b/solution/supervisord.conf index 12f5270..1425ee3 100644 --- a/solution/supervisord.conf +++ b/solution/supervisord.conf @@ -44,19 +44,19 @@ autostart=true autorestart=true [program:worker8] -command=bash -c "sleep 45 && python worker8.py" +command=bash -c "sleep 30 && python worker8.py" directory=/solution autostart=true autorestart=true [program:worker9] -command=bash -c "sleep 55 && python worker9.py" +command=bash -c "sleep 35 && python worker9.py" directory=/solution autostart=true autorestart=true -[program:worker11] -command=bash -c "sleep 65 && python worker11.py" +[program:worker10] +command=bash -c "sleep 65 && python worker10.py" directory=/solution autostart=true autorestart=true