-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pawel plesniak/drunc op mon #18
base: develop
Are you sure you want to change the base?
Changes from all commits
cdc2efd
9295cbf
159ea5b
7c80680
b96025a
64d3d3a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
class OpMonFunction : | ||
def __init__(self, | ||
function, | ||
opmon_id : re.Pattern, | ||
measurement : re.Pattern) : | ||
self.function = function | ||
self.opmon_id = opmon_id | ||
self.measurement = measurement | ||
|
||
def match(self, key : str) -> bool : | ||
opmon_id,measure = key.split('/',1) | ||
if not self.opmon_id.match(opmon_id) : return False | ||
if not self.measurement.match(measure) : return False | ||
return True | ||
|
||
def execute(self, e : entry.OpMonEntry ) : | ||
self.function(e) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import os | ||
import socket | ||
import inspect | ||
from opmonlib.opmon_entry_pb2 import OpMonValue, OpMonId, OpMonEntry | ||
import google.protobuf.message as msg | ||
from google.protobuf.descriptor import FieldDescriptor as fd | ||
from google.protobuf.timestamp_pb2 import Timestamp | ||
from datetime import datetime | ||
from kafka import KafkaProducer | ||
from typing import Optional | ||
import logging | ||
|
||
class OpMonPublisher: | ||
def __init__( | ||
self, | ||
topic:str, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would like this to be the |
||
bootstrap:str = "monkafka.cern.ch:30092", # Removed for if we don't want to use OpMon (i.e. for ssh-standalone) | ||
application_name:str = "python", | ||
package_name:str = "unknown" | ||
) -> None: | ||
## Options from configurations | ||
self.application_name = application_name | ||
self.package_name = package_name | ||
self.bootstrap = bootstrap | ||
if not topic.startswith('monitoring.'): | ||
topic = 'monitoring.' + topic | ||
self.topic = topic | ||
|
||
## runtime options | ||
self.log = logging.getLogger("OpMonPublisher") | ||
self.log.info("Starting Kafka producer") | ||
self.producer = KafkaProducer( | ||
bootstrap_servers=self.bootstrap, | ||
value_serializer=lambda v: v.SerializeToString(), | ||
key_serializer=lambda k: str(k).encode('utf-8') | ||
) | ||
self.log.info("Initialized Kafka producer") | ||
|
||
def publish( | ||
self, | ||
session:str, | ||
application:str, | ||
message:msg, | ||
custom_origin:Optional[dict[str,str]] = {"": ""}, | ||
substructure:Optional[str] = "", | ||
): | ||
"""Create an OpMonEntry and send it to Kafka.""" | ||
data_dict = self.map_message(message) | ||
opmon_id = OpMonId( | ||
session = session, | ||
application = application, | ||
substructure = substructure | ||
) | ||
t = Timestamp() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The timestamp should be the first thing happening. |
||
opmon_metric = OpMonEntry( | ||
time = t.GetCurrentTime(), | ||
origin = opmon_id, | ||
custom_origin = custom_origin, | ||
measurement = message.DESCRIPTOR.name, | ||
data = data_dict, | ||
) | ||
return self.producer.send(opmon_metric) | ||
|
||
def map_message(self, message:msg): | ||
message_dict = {} | ||
for name, descriptor in message.DESCRIPTOR.fields_by_name.items(): | ||
message_dict[name] = self.map_entry(getattr(message, name), descriptor.cpp_type) | ||
return message_dict | ||
|
||
def map_entry(self, value, field_type:int): | ||
formatted_OpMonValue = OpMonValue() | ||
match field_type: | ||
case fd.CPPTYPE_INT32: | ||
formatted_OpMonValue.int4_value = value | ||
case fd.CPPTYPE_INT64: | ||
formatted_OpMonValue.int8_value = value | ||
case fd.CPPTYPE_UINT32: | ||
formatted_OpMonValue.uint4_value = value | ||
case fd.CPPTYPE_UINT64: | ||
formatted_OpMonValue.uint8_value = value | ||
case fd.CPPTYPE_DOUBLE: | ||
formatted_OpMonValue.double_value = value | ||
case fd.CPPTYPE_FLOAT: | ||
formatted_OpMonValue.float_value = value | ||
case fd.CPPTYPE_BOOL: | ||
formatted_OpMonValue.boolean_value = value | ||
case fd.CPPTYPE_STRING: | ||
formatted_OpMonValue.string_value = value | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to handle the case in which the type is a message itself. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The equivalent C++ is https://github.com/DUNE-DAQ/opmonlib/blob/721d3fcb21fe087cddf14cb877ff6da514ae26d2/src/Utils.cpp#L84-L90 The rest of the code has to be handled accordingly. |
||
case _: | ||
raise ValueError("Value is of a non-supported type.") | ||
return formatted_OpMonValue |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,25 +11,7 @@ | |
|
||
import opmonlib.opmon_entry_pb2 as entry | ||
import google.protobuf.message as msg | ||
|
||
class OpMonFunction : | ||
def __init__(self, | ||
function, | ||
opmon_id : re.Pattern, | ||
measurement : re.Pattern) : | ||
self.function = function | ||
self.opmon_id = opmon_id | ||
self.measurement = measurement | ||
|
||
def match(self, key : str) -> bool : | ||
opmon_id,measure = key.split('/',1); | ||
if not self.opmon_id.match(opmon_id) : return False | ||
if not self.measurement.match(measure) : return False | ||
return True | ||
|
||
def execute(self, e : entry.OpMonEntry ) : | ||
self.function(e) | ||
|
||
from kafkaopmon import OpMonFunction | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See previous note on this extraction |
||
|
||
class OpMonSubscriber: | ||
def __init__(self, bootstrap, group_id=None, timeout_ms=500, topics=["opmon_stream"]) : | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
__version__ = '0.0.1' |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
[metadata] | ||
name = kafkaopmon | ||
version = attr: kafkaopmon.__version__ | ||
url = https://github.com/DUNE-DAQ/kafkaopmon | ||
long_description = file: docs/README.md | ||
long_description_content_type = text/markdown | ||
|
||
[options] | ||
packages = find_namespace: | ||
package_dir = =python | ||
include_package_data = true | ||
python_requires = >= 3.6 | ||
# Dependencies are in setup.py for GitHub's dependency graph. | ||
|
||
[options.packages.find] | ||
where = python |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
from setuptools import setup | ||
|
||
# Metadata goes in setup.cfg. These are here for GitHub's dependency graph. | ||
setup( | ||
name="kafkaopmon", | ||
install_requires=[ | ||
"os", | ||
"re", | ||
"socket", | ||
"threading", | ||
"logging", | ||
"getpass", | ||
"sys", | ||
"inspect", | ||
"datetime", | ||
"time", | ||
"typing", | ||
"googleapis-common-protos", | ||
"kafka-python", | ||
"rich", | ||
"sh" | ||
|
||
], | ||
extras_require={"develop": [ | ||
"ipdb", | ||
"ipython" | ||
]} | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why this was extracted into a separate file?
Otherwise I would prefer to restore as it was. The reason being that the file was supposed to be unique to simplify the construction of the microservice: https://github.com/DUNE-DAQ/microservices/blob/5a8182e9459e7f451eff6aed4da4dd06ccdc2eb5/dockerfiles/microservices-dependencies.dockerfile#L44