Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pawel plesniak/drunc op mon #18

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions python/kafkaopmon/OpMonFunction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
class OpMonFunction :
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why this was extracted into a separate file?
Otherwise I would prefer to restore as it was. The reason being that the file was supposed to be unique to simplify the construction of the microservice: https://github.com/DUNE-DAQ/microservices/blob/5a8182e9459e7f451eff6aed4da4dd06ccdc2eb5/dockerfiles/microservices-dependencies.dockerfile#L44

def __init__(self,
function,
opmon_id : re.Pattern,
measurement : re.Pattern) :
self.function = function
self.opmon_id = opmon_id
self.measurement = measurement

def match(self, key : str) -> bool :
opmon_id,measure = key.split('/',1)
if not self.opmon_id.match(opmon_id) : return False
if not self.measurement.match(measure) : return False
return True

def execute(self, e : entry.OpMonEntry ) :
self.function(e)
93 changes: 93 additions & 0 deletions python/kafkaopmon/OpMonPublisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/env python3

import os
import socket
import inspect
from opmonlib.opmon_entry_pb2 import OpMonValue, OpMonId, OpMonEntry
import google.protobuf.message as msg
from google.protobuf.descriptor import FieldDescriptor as fd
from google.protobuf.timestamp_pb2 import Timestamp
from datetime import datetime
from kafka import KafkaProducer
from typing import Optional
import logging

class OpMonPublisher:
def __init__(
self,
topic:str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like this to be the default_topic rather than just topic. The reason being that we envision the possibility of using multiple topics at some point so the algorithm that selects the topic given a metric at the moment is just returning the default, but it might not be the case for long

bootstrap:str = "monkafka.cern.ch:30092", # Removed for if we don't want to use OpMon (i.e. for ssh-standalone)
application_name:str = "python",
package_name:str = "unknown"
) -> None:
## Options from configurations
self.application_name = application_name
self.package_name = package_name
self.bootstrap = bootstrap
if not topic.startswith('monitoring.'):
topic = 'monitoring.' + topic
self.topic = topic

## runtime options
self.log = logging.getLogger("OpMonPublisher")
self.log.info("Starting Kafka producer")
self.producer = KafkaProducer(
bootstrap_servers=self.bootstrap,
value_serializer=lambda v: v.SerializeToString(),
key_serializer=lambda k: str(k).encode('utf-8')
)
self.log.info("Initialized Kafka producer")

def publish(
self,
session:str,
application:str,
message:msg,
custom_origin:Optional[dict[str,str]] = {"": ""},
substructure:Optional[str] = "",
):
"""Create an OpMonEntry and send it to Kafka."""
data_dict = self.map_message(message)
opmon_id = OpMonId(
session = session,
application = application,
substructure = substructure
)
t = Timestamp()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timestamp should be the first thing happening.

opmon_metric = OpMonEntry(
time = t.GetCurrentTime(),
origin = opmon_id,
custom_origin = custom_origin,
measurement = message.DESCRIPTOR.name,
data = data_dict,
)
return self.producer.send(opmon_metric)

def map_message(self, message:msg):
message_dict = {}
for name, descriptor in message.DESCRIPTOR.fields_by_name.items():
message_dict[name] = self.map_entry(getattr(message, name), descriptor.cpp_type)
return message_dict

def map_entry(self, value, field_type:int):
formatted_OpMonValue = OpMonValue()
match field_type:
case fd.CPPTYPE_INT32:
formatted_OpMonValue.int4_value = value
case fd.CPPTYPE_INT64:
formatted_OpMonValue.int8_value = value
case fd.CPPTYPE_UINT32:
formatted_OpMonValue.uint4_value = value
case fd.CPPTYPE_UINT64:
formatted_OpMonValue.uint8_value = value
case fd.CPPTYPE_DOUBLE:
formatted_OpMonValue.double_value = value
case fd.CPPTYPE_FLOAT:
formatted_OpMonValue.float_value = value
case fd.CPPTYPE_BOOL:
formatted_OpMonValue.boolean_value = value
case fd.CPPTYPE_STRING:
formatted_OpMonValue.string_value = value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to handle the case in which the type is a message itself.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The equivalent C++ is https://github.com/DUNE-DAQ/opmonlib/blob/721d3fcb21fe087cddf14cb877ff6da514ae26d2/src/Utils.cpp#L84-L90 The rest of the code has to be handled accordingly.

case _:
raise ValueError("Value is of a non-supported type.")
return formatted_OpMonValue
20 changes: 1 addition & 19 deletions python/kafkaopmon/OpMonSubscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,7 @@

import opmonlib.opmon_entry_pb2 as entry
import google.protobuf.message as msg

class OpMonFunction :
def __init__(self,
function,
opmon_id : re.Pattern,
measurement : re.Pattern) :
self.function = function
self.opmon_id = opmon_id
self.measurement = measurement

def match(self, key : str) -> bool :
opmon_id,measure = key.split('/',1);
if not self.opmon_id.match(opmon_id) : return False
if not self.measurement.match(measure) : return False
return True

def execute(self, e : entry.OpMonEntry ) :
self.function(e)

from kafkaopmon import OpMonFunction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See previous note on this extraction


class OpMonSubscriber:
def __init__(self, bootstrap, group_id=None, timeout_ms=500, topics=["opmon_stream"]) :
Expand Down
1 change: 1 addition & 0 deletions python/kafkaopmon/__init__ .py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = '0.0.1'
16 changes: 16 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[metadata]
name = kafkaopmon
version = attr: kafkaopmon.__version__
url = https://github.com/DUNE-DAQ/kafkaopmon
long_description = file: docs/README.md
long_description_content_type = text/markdown

[options]
packages = find_namespace:
package_dir = =python
include_package_data = true
python_requires = >= 3.6
# Dependencies are in setup.py for GitHub's dependency graph.

[options.packages.find]
where = python
28 changes: 28 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from setuptools import setup

# Metadata goes in setup.cfg. These are here for GitHub's dependency graph.
setup(
name="kafkaopmon",
install_requires=[
"os",
"re",
"socket",
"threading",
"logging",
"getpass",
"sys",
"inspect",
"datetime",
"time",
"typing",
"googleapis-common-protos",
"kafka-python",
"rich",
"sh"

],
extras_require={"develop": [
"ipdb",
"ipython"
]}
)
Loading