Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactoring, add types #24

Merged
merged 5 commits into from
Oct 4, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 91 additions & 37 deletions locust_influxdb_listener/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import sys
from types import TracebackType
from typing import Optional, Dict
from datetime import datetime
import traceback
import logging

import atexit
import gevent
import logging
import sys
import traceback
from datetime import datetime

from influxdb import InfluxDBClient
from requests import Response
from requests.exceptions import HTTPError
import locust.env
from urllib3 import HTTPConnectionPool
Expand All @@ -21,18 +22,18 @@ class InfluxDBSettings:
"""

def __init__(
self,
host: str = 'localhost',
port: int = 8086,
user: str = 'admin',
self,
host: str = 'localhost',
port: int = 8086,
user: str = 'admin',
pwd: str = 'pass',
database: str = 'default',
interval_ms: int = 1000,
ssl: bool = False,
verify_ssl: bool = False,
additional_tags: dict = {},
influx_host: Optional[str] = 'localhost',
influx_port: Optional[int] = 8086,
additional_tags: Optional[Dict[str,str]] = {},
influx_host: Optional[str] = 'localhost',
influx_port: Optional[int] = 8086,
):
"""
Initialize the InfluxDBSettings object with provided or default settings.
Expand All @@ -56,14 +57,14 @@ def __init__(
self.ssl = ssl
self.verify_ssl = verify_ssl
self.additional_tags = additional_tags


class InfluxDBListener:

class InfluxDBListener:
"""
Events listener that writes locust events to the given influxdb connection
"""
tags : dict
tags : dict = {}

def __init__(
self,
env: locust.env.Environment,
Expand All @@ -75,13 +76,13 @@ def __init__(
:param env: The Locust environment to listen for events in.
:param influxDbSettings: Settings for the InfluxDB connection.
"""

self.env = env
self.cache = []
self.stop_flag = False
self.interval_ms = influxDbSettings.interval_ms
self.additional_tags = influxDbSettings.additional_tags
# influxdb settings
# influxdb settings
try:
# try to connect create the database and switch to it
self.influxdb_client = InfluxDBClient(
Expand All @@ -95,13 +96,15 @@ def __init__(
# database is mandatory so we should always try to create it
self.influxdb_client.create_database(influxDbSettings.database)
self.influxdb_client.switch_database(influxDbSettings.database)


# Catching too general exception.
issamansur marked this conversation as resolved.
Show resolved Hide resolved
# Need more info about default exceptions to catch
except Exception as ex:
logging.error(f'Unexpected error: {ex}')
return

# determine if worker or master
self.node_id = 'local'
self.node_id: str = 'local'
if '--master' in sys.argv:
self.node_id = 'master'
if '--worker' in sys.argv:
Expand All @@ -111,43 +114,65 @@ def __init__(
# start background event to push data to influxdb
self.flush_worker = gevent.spawn(self.__flush_cached_points_worker)
self.test_start(0)

events = env.events

# requests
events.request.add_listener(self.request)
# events
# events
events.test_stop.add_listener(self.test_stop)
events.user_error.add_listener(self.user_error)
events.spawning_complete.add_listener(self.spawning_complete)
events.quitting.add_listener(self.quitting)
# complete
atexit.register(self.quitting)

def request(self, request_type, name, response_time, response_length, response,
context, exception, start_time=None, url=None) -> None:
def request(
self,
request_type: str,
name: str,
response_time: int,
response_length: int,
response: Response | HTTPConnectionPool, # need review
issamansur marked this conversation as resolved.
Show resolved Hide resolved
context: any,
exception: Exception | HTTPError,
start_time: Optional[datetime] = None,
url: Optional[str] = None
) -> None:
self.__listen_for_requests_events(
self.node_id, 'locust_requests', request_type, name, response_time,
response_length, response, context, exception, start_time, url)

def spawning_complete(self, user_count) -> None:
def spawning_complete(self, user_count: int) -> None:
self.__register_event(self.node_id, user_count, 'spawning_complete')
return True

def test_start(self, user_count) -> None:
def test_start(self, user_count: int) -> None:
self.__register_event(self.node_id, 0, 'test_started')

def test_stop(self, user_count=None, environment=None) -> None:
def test_stop(self, user_count: int = 0, environment: any = None) -> None:
issamansur marked this conversation as resolved.
Show resolved Hide resolved
self.__register_event(self.node_id, 0, 'test_stopped')

def user_error(self, user_instance, exception, tb, **_kwargs) -> None:
def user_error(self,
# need review
user_instance: locust.User,
issamansur marked this conversation as resolved.
Show resolved Hide resolved
exception: Exception,
tb: TracebackType,
**_kwargs
) -> None:
self.__listen_for_locust_errors(self.node_id, user_instance, exception, tb)

def quitting(self, **_kwargs) -> None:
self.__register_event(self.node_id, 0, 'quitting')
self.last_flush_on_quitting()

def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs) -> None:
def __register_event(
self,
node_id: str,
user_count: int,
event: str,
**_kwargs
) -> None:
"""
Persist locust event such as hatching started or stopped to influxdb.
Append user_count in case that it exists
Expand All @@ -165,9 +190,21 @@ def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs)
point = self.__make_data_point('locust_events', fields, time)
self.cache.append(point)

def __listen_for_requests_events(self, node_id, measurement, request_type, name,
response_time, response_length, response,
context, exception, start_time, url) -> None:
# TODO: `start_time` and `url` don't used
def __listen_for_requests_events(
self,
node_id: str,
measurement: str,
request_type: str,
name: str,
response_time: int,
response_length: int,
response: Response | HTTPConnectionPool,
context: any,
exception: Exception | HTTPError,
start_time: datetime,
url: str
) -> None:
"""
Persist request information to influxdb.
:param node_id: The id of the node reporting the event.
Expand All @@ -185,7 +222,7 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name,
'success': was_successful,
'exception': repr(exception),
}
if context and type(context) == dict:
if context and isinstance(context, dict):
tags.update(context)

if isinstance(exception, HTTPError):
Expand All @@ -200,7 +237,13 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name,
point = self.__make_data_point(measurement, fields, time, tags=tags)
self.cache.append(point)

def __listen_for_locust_errors(self, node_id, user_instance, exception: Exception = None, tb=None) -> None:
def __listen_for_locust_errors(
self,
node_id: str,
user_instance: any,
issamansur marked this conversation as resolved.
Show resolved Hide resolved
exception: Exception = None,
tb: TracebackType = None
) -> None:
"""
Persist locust errors to InfluxDB.
:param node_id: The id of the node reporting the error.
Expand Down Expand Up @@ -231,15 +274,26 @@ def __flush_cached_points_worker(self) -> None:
self.__flush_points(self.influxdb_client)
gevent.sleep(self.interval_ms / 1000)

def __make_data_point(self, measurement: str, fields: dict, time: datetime, tags: Optional[Dict[str,str]] = {}) -> dict:
def __make_data_point(
self,
measurement: str,
fields: dict,
time: datetime,
tags: Optional[Dict[str,str]] = {}
) -> dict:
"""
Create a list with a single point to be saved to influxdb.
:param measurement: The measurement where to save this point.
:param fields: Dictionary of field to be saved to measurement.
:param time: The time os this point.
:param tags: Dictionary of tags to be saved in the measurement default to None.
"""
return {"measurement": measurement, "tags": {**tags, **self.additional_tags}, "time": time, "fields": fields}
return {
"measurement": measurement,
"tags": {**tags, **self.additional_tags},
"time": time,
"fields": fields
}


def last_flush_on_quitting(self):
Expand Down