Skip to content

Commit

Permalink
Merge pull request #24 from issamansur/add-typings
Browse files Browse the repository at this point in the history
  • Loading branch information
pjcalvo authored Oct 4, 2023
2 parents bad5223 + bea4eb0 commit f4af465
Showing 1 changed file with 94 additions and 41 deletions.
135 changes: 94 additions & 41 deletions locust_influxdb_listener/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from typing import Optional, Dict

import atexit
import gevent
import logging
import sys
import traceback
from types import TracebackType
from typing import Optional, Dict, Union
from datetime import datetime
import traceback
import logging

import atexit
import gevent
from influxdb import InfluxDBClient
from requests.exceptions import HTTPError
import locust.env
from urllib3 import HTTPConnectionPool
from locust.env import Environment
from locust import User

log = logging.getLogger('locust_influxdb_listener')

Expand All @@ -21,18 +22,18 @@ class InfluxDBSettings:
"""

def __init__(
self,
host: str = 'localhost',
port: int = 8086,
user: str = 'admin',
self,
host: str = 'localhost',
port: int = 8086,
user: str = 'admin',
pwd: str = 'pass',
database: str = 'default',
interval_ms: int = 1000,
ssl: bool = False,
verify_ssl: bool = False,
additional_tags: dict = {},
influx_host: Optional[str] = 'localhost',
influx_port: Optional[int] = 8086,
additional_tags: Optional[Dict[str,str]] = {},
influx_host: Optional[str] = 'localhost',
influx_port: Optional[int] = 8086,
):
"""
Initialize the InfluxDBSettings object with provided or default settings.
Expand All @@ -56,17 +57,17 @@ def __init__(
self.ssl = ssl
self.verify_ssl = verify_ssl
self.additional_tags = additional_tags


class InfluxDBListener:

class InfluxDBListener:
"""
Events listener that writes locust events to the given influxdb connection
"""
tags : dict
tags : dict = {}

def __init__(
self,
env: locust.env.Environment,
env: Environment,
influxDbSettings: InfluxDBSettings
):
"""
Expand All @@ -75,13 +76,13 @@ def __init__(
:param env: The Locust environment to listen for events in.
:param influxDbSettings: Settings for the InfluxDB connection.
"""

self.env = env
self.cache = []
self.stop_flag = False
self.interval_ms = influxDbSettings.interval_ms
self.additional_tags = influxDbSettings.additional_tags
# influxdb settings
# influxdb settings
try:
# try to connect create the database and switch to it
self.influxdb_client = InfluxDBClient(
Expand All @@ -95,13 +96,14 @@ def __init__(
# database is mandatory so we should always try to create it
self.influxdb_client.create_database(influxDbSettings.database)
self.influxdb_client.switch_database(influxDbSettings.database)



except Exception as ex:
logging.error(f'Unexpected error: {ex}')
return

# determine if worker or master
self.node_id = 'local'
self.node_id: str = 'local'
if '--master' in sys.argv:
self.node_id = 'master'
if '--worker' in sys.argv:
Expand All @@ -111,43 +113,65 @@ def __init__(
# start background event to push data to influxdb
self.flush_worker = gevent.spawn(self.__flush_cached_points_worker)
self.test_start(0)

events = env.events

# requests
events.request.add_listener(self.request)
# events
# events
events.test_stop.add_listener(self.test_stop)
events.user_error.add_listener(self.user_error)
events.spawning_complete.add_listener(self.spawning_complete)
events.quitting.add_listener(self.quitting)
# complete
atexit.register(self.quitting)

def request(self, request_type, name, response_time, response_length, response,
context, exception, start_time=None, url=None) -> None:
def request(
self,
request_type: str,
name: str,
response_time: int,
response_length: int,
response: any,
context: any,
exception: Union[Exception, HTTPError],
start_time: Optional[datetime] = None,
url: Optional[str] = None
) -> None:
self.__listen_for_requests_events(
self.node_id, 'locust_requests', request_type, name, response_time,
response_length, response, context, exception, start_time, url)

def spawning_complete(self, user_count) -> None:
def spawning_complete(self, user_count: int) -> None:
self.__register_event(self.node_id, user_count, 'spawning_complete')
return True

def test_start(self, user_count) -> None:
def test_start(self, user_count: int) -> None:
self.__register_event(self.node_id, 0, 'test_started')

def test_stop(self, user_count=None, environment=None) -> None:
def test_stop(self, user_count: int = 0, environment: Environment = None) -> None:
self.__register_event(self.node_id, 0, 'test_stopped')

def user_error(self, user_instance, exception, tb, **_kwargs) -> None:
def user_error(self,
# need review
user_instance: User,
exception: Exception,
tb: TracebackType,
**_kwargs
) -> None:
self.__listen_for_locust_errors(self.node_id, user_instance, exception, tb)

def quitting(self, **_kwargs) -> None:
self.__register_event(self.node_id, 0, 'quitting')
self.last_flush_on_quitting()

def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs) -> None:
def __register_event(
self,
node_id: str,
user_count: int,
event: str,
**_kwargs
) -> None:
"""
Persist locust event such as hatching started or stopped to influxdb.
Append user_count in case that it exists
Expand All @@ -165,9 +189,21 @@ def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs)
point = self.__make_data_point('locust_events', fields, time)
self.cache.append(point)

def __listen_for_requests_events(self, node_id, measurement, request_type, name,
response_time, response_length, response,
context, exception, start_time, url) -> None:
# TODO: `start_time` and `url` don't used
def __listen_for_requests_events(
self,
node_id: str,
measurement: str,
request_type: str,
name: str,
response_time: int,
response_length: int,
response: any,
context: any,
exception: Union[Exception, HTTPError],
start_time: datetime,
url: str
) -> None:
"""
Persist request information to influxdb.
:param node_id: The id of the node reporting the event.
Expand All @@ -185,7 +221,7 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name,
'success': was_successful,
'exception': repr(exception),
}
if context and type(context) == dict:
if context and isinstance(context, dict):
tags.update(context)

if isinstance(exception, HTTPError):
Expand All @@ -200,7 +236,13 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name,
point = self.__make_data_point(measurement, fields, time, tags=tags)
self.cache.append(point)

def __listen_for_locust_errors(self, node_id, user_instance, exception: Exception = None, tb=None) -> None:
def __listen_for_locust_errors(
self,
node_id: str,
user_instance: User,
exception: Exception = None,
tb: TracebackType = None
) -> None:
"""
Persist locust errors to InfluxDB.
:param node_id: The id of the node reporting the error.
Expand Down Expand Up @@ -231,15 +273,26 @@ def __flush_cached_points_worker(self) -> None:
self.__flush_points(self.influxdb_client)
gevent.sleep(self.interval_ms / 1000)

def __make_data_point(self, measurement: str, fields: dict, time: datetime, tags: Optional[Dict[str,str]] = {}) -> dict:
def __make_data_point(
self,
measurement: str,
fields: dict,
time: datetime,
tags: Optional[Dict[str,str]] = {}
) -> dict:
"""
Create a list with a single point to be saved to influxdb.
:param measurement: The measurement where to save this point.
:param fields: Dictionary of field to be saved to measurement.
:param time: The time os this point.
:param tags: Dictionary of tags to be saved in the measurement default to None.
"""
return {"measurement": measurement, "tags": {**tags, **self.additional_tags}, "time": time, "fields": fields}
return {
"measurement": measurement,
"tags": {**tags, **self.additional_tags},
"time": time,
"fields": fields
}


def last_flush_on_quitting(self):
Expand All @@ -261,4 +314,4 @@ def __flush_points(self, influxdb_client: InfluxDBClient) -> None:
if not success:
log.error('Failed to write points to influxdb.')
# If failed for any reason put back into the beginning of cache
self.cache.insert(0, to_be_flushed)
self.cache.insert(0, to_be_flushed)

0 comments on commit f4af465

Please sign in to comment.