diff --git a/locust_influxdb_listener/__init__.py b/locust_influxdb_listener/__init__.py index 81d1a07..c846914 100644 --- a/locust_influxdb_listener/__init__.py +++ b/locust_influxdb_listener/__init__.py @@ -1,16 +1,17 @@ -from typing import Optional, Dict - -import atexit -import gevent -import logging import sys -import traceback +from types import TracebackType +from typing import Optional, Dict, Union from datetime import datetime +import traceback +import logging +import atexit +import gevent from influxdb import InfluxDBClient from requests.exceptions import HTTPError -import locust.env from urllib3 import HTTPConnectionPool +from locust.env import Environment +from locust import User log = logging.getLogger('locust_influxdb_listener') @@ -21,18 +22,18 @@ class InfluxDBSettings: """ def __init__( - self, - host: str = 'localhost', - port: int = 8086, - user: str = 'admin', + self, + host: str = 'localhost', + port: int = 8086, + user: str = 'admin', pwd: str = 'pass', database: str = 'default', interval_ms: int = 1000, ssl: bool = False, verify_ssl: bool = False, - additional_tags: dict = {}, - influx_host: Optional[str] = 'localhost', - influx_port: Optional[int] = 8086, + additional_tags: Optional[Dict[str,str]] = {}, + influx_host: Optional[str] = 'localhost', + influx_port: Optional[int] = 8086, ): """ Initialize the InfluxDBSettings object with provided or default settings. @@ -56,17 +57,17 @@ def __init__( self.ssl = ssl self.verify_ssl = verify_ssl self.additional_tags = additional_tags - -class InfluxDBListener: + +class InfluxDBListener: """ Events listener that writes locust events to the given influxdb connection """ - tags : dict - + tags : dict = {} + def __init__( self, - env: locust.env.Environment, + env: Environment, influxDbSettings: InfluxDBSettings ): """ @@ -75,13 +76,13 @@ def __init__( :param env: The Locust environment to listen for events in. :param influxDbSettings: Settings for the InfluxDB connection. """ - + self.env = env self.cache = [] self.stop_flag = False self.interval_ms = influxDbSettings.interval_ms self.additional_tags = influxDbSettings.additional_tags - # influxdb settings + # influxdb settings try: # try to connect create the database and switch to it self.influxdb_client = InfluxDBClient( @@ -95,13 +96,14 @@ def __init__( # database is mandatory so we should always try to create it self.influxdb_client.create_database(influxDbSettings.database) self.influxdb_client.switch_database(influxDbSettings.database) - + + except Exception as ex: logging.error(f'Unexpected error: {ex}') return # determine if worker or master - self.node_id = 'local' + self.node_id: str = 'local' if '--master' in sys.argv: self.node_id = 'master' if '--worker' in sys.argv: @@ -111,12 +113,12 @@ def __init__( # start background event to push data to influxdb self.flush_worker = gevent.spawn(self.__flush_cached_points_worker) self.test_start(0) - + events = env.events - + # requests events.request.add_listener(self.request) - # events + # events events.test_stop.add_listener(self.test_stop) events.user_error.add_listener(self.user_error) events.spawning_complete.add_listener(self.spawning_complete) @@ -124,30 +126,52 @@ def __init__( # complete atexit.register(self.quitting) - def request(self, request_type, name, response_time, response_length, response, - context, exception, start_time=None, url=None) -> None: + def request( + self, + request_type: str, + name: str, + response_time: int, + response_length: int, + response: any, + context: any, + exception: Union[Exception, HTTPError], + start_time: Optional[datetime] = None, + url: Optional[str] = None + ) -> None: self.__listen_for_requests_events( self.node_id, 'locust_requests', request_type, name, response_time, response_length, response, context, exception, start_time, url) - def spawning_complete(self, user_count) -> None: + def spawning_complete(self, user_count: int) -> None: self.__register_event(self.node_id, user_count, 'spawning_complete') return True - def test_start(self, user_count) -> None: + def test_start(self, user_count: int) -> None: self.__register_event(self.node_id, 0, 'test_started') - def test_stop(self, user_count=None, environment=None) -> None: + def test_stop(self, user_count: int = 0, environment: Environment = None) -> None: self.__register_event(self.node_id, 0, 'test_stopped') - def user_error(self, user_instance, exception, tb, **_kwargs) -> None: + def user_error(self, + # need review + user_instance: User, + exception: Exception, + tb: TracebackType, + **_kwargs + ) -> None: self.__listen_for_locust_errors(self.node_id, user_instance, exception, tb) def quitting(self, **_kwargs) -> None: self.__register_event(self.node_id, 0, 'quitting') self.last_flush_on_quitting() - def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs) -> None: + def __register_event( + self, + node_id: str, + user_count: int, + event: str, + **_kwargs + ) -> None: """ Persist locust event such as hatching started or stopped to influxdb. Append user_count in case that it exists @@ -165,9 +189,21 @@ def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs) point = self.__make_data_point('locust_events', fields, time) self.cache.append(point) - def __listen_for_requests_events(self, node_id, measurement, request_type, name, - response_time, response_length, response, - context, exception, start_time, url) -> None: + # TODO: `start_time` and `url` don't used + def __listen_for_requests_events( + self, + node_id: str, + measurement: str, + request_type: str, + name: str, + response_time: int, + response_length: int, + response: any, + context: any, + exception: Union[Exception, HTTPError], + start_time: datetime, + url: str + ) -> None: """ Persist request information to influxdb. :param node_id: The id of the node reporting the event. @@ -185,7 +221,7 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name, 'success': was_successful, 'exception': repr(exception), } - if context and type(context) == dict: + if context and isinstance(context, dict): tags.update(context) if isinstance(exception, HTTPError): @@ -200,7 +236,13 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name, point = self.__make_data_point(measurement, fields, time, tags=tags) self.cache.append(point) - def __listen_for_locust_errors(self, node_id, user_instance, exception: Exception = None, tb=None) -> None: + def __listen_for_locust_errors( + self, + node_id: str, + user_instance: User, + exception: Exception = None, + tb: TracebackType = None + ) -> None: """ Persist locust errors to InfluxDB. :param node_id: The id of the node reporting the error. @@ -231,7 +273,13 @@ def __flush_cached_points_worker(self) -> None: self.__flush_points(self.influxdb_client) gevent.sleep(self.interval_ms / 1000) - def __make_data_point(self, measurement: str, fields: dict, time: datetime, tags: Optional[Dict[str,str]] = {}) -> dict: + def __make_data_point( + self, + measurement: str, + fields: dict, + time: datetime, + tags: Optional[Dict[str,str]] = {} + ) -> dict: """ Create a list with a single point to be saved to influxdb. :param measurement: The measurement where to save this point. @@ -239,7 +287,12 @@ def __make_data_point(self, measurement: str, fields: dict, time: datetime, tags :param time: The time os this point. :param tags: Dictionary of tags to be saved in the measurement default to None. """ - return {"measurement": measurement, "tags": {**tags, **self.additional_tags}, "time": time, "fields": fields} + return { + "measurement": measurement, + "tags": {**tags, **self.additional_tags}, + "time": time, + "fields": fields + } def last_flush_on_quitting(self): @@ -261,4 +314,4 @@ def __flush_points(self, influxdb_client: InfluxDBClient) -> None: if not success: log.error('Failed to write points to influxdb.') # If failed for any reason put back into the beginning of cache - self.cache.insert(0, to_be_flushed) \ No newline at end of file + self.cache.insert(0, to_be_flushed)