From 0932a46edd3477ab333bb145a8f9adeedb672d33 Mon Sep 17 00:00:00 2001 From: User#None <77580844+issamansur@users.noreply.github.com> Date: Tue, 26 Sep 2023 10:29:01 +0000 Subject: [PATCH 1/5] refactoring, add types --- locust_influxdb_listener/__init__.py | 128 +++++++++++++++++++-------- 1 file changed, 91 insertions(+), 37 deletions(-) diff --git a/locust_influxdb_listener/__init__.py b/locust_influxdb_listener/__init__.py index 81d1a07..07fc7f0 100644 --- a/locust_influxdb_listener/__init__.py +++ b/locust_influxdb_listener/__init__.py @@ -1,13 +1,14 @@ +import sys +from types import TracebackType from typing import Optional, Dict +from datetime import datetime +import traceback +import logging import atexit import gevent -import logging -import sys -import traceback -from datetime import datetime - from influxdb import InfluxDBClient +from requests import Response from requests.exceptions import HTTPError import locust.env from urllib3 import HTTPConnectionPool @@ -21,18 +22,18 @@ class InfluxDBSettings: """ def __init__( - self, - host: str = 'localhost', - port: int = 8086, - user: str = 'admin', + self, + host: str = 'localhost', + port: int = 8086, + user: str = 'admin', pwd: str = 'pass', database: str = 'default', interval_ms: int = 1000, ssl: bool = False, verify_ssl: bool = False, - additional_tags: dict = {}, - influx_host: Optional[str] = 'localhost', - influx_port: Optional[int] = 8086, + additional_tags: Optional[Dict[str,str]] = {}, + influx_host: Optional[str] = 'localhost', + influx_port: Optional[int] = 8086, ): """ Initialize the InfluxDBSettings object with provided or default settings. @@ -56,14 +57,14 @@ def __init__( self.ssl = ssl self.verify_ssl = verify_ssl self.additional_tags = additional_tags - -class InfluxDBListener: + +class InfluxDBListener: """ Events listener that writes locust events to the given influxdb connection """ - tags : dict - + tags : dict = {} + def __init__( self, env: locust.env.Environment, @@ -75,13 +76,13 @@ def __init__( :param env: The Locust environment to listen for events in. :param influxDbSettings: Settings for the InfluxDB connection. """ - + self.env = env self.cache = [] self.stop_flag = False self.interval_ms = influxDbSettings.interval_ms self.additional_tags = influxDbSettings.additional_tags - # influxdb settings + # influxdb settings try: # try to connect create the database and switch to it self.influxdb_client = InfluxDBClient( @@ -95,13 +96,15 @@ def __init__( # database is mandatory so we should always try to create it self.influxdb_client.create_database(influxDbSettings.database) self.influxdb_client.switch_database(influxDbSettings.database) - + + # Catching too general exception. + # Need more info about default exceptions to catch except Exception as ex: logging.error(f'Unexpected error: {ex}') return # determine if worker or master - self.node_id = 'local' + self.node_id: str = 'local' if '--master' in sys.argv: self.node_id = 'master' if '--worker' in sys.argv: @@ -111,12 +114,12 @@ def __init__( # start background event to push data to influxdb self.flush_worker = gevent.spawn(self.__flush_cached_points_worker) self.test_start(0) - + events = env.events - + # requests events.request.add_listener(self.request) - # events + # events events.test_stop.add_listener(self.test_stop) events.user_error.add_listener(self.user_error) events.spawning_complete.add_listener(self.spawning_complete) @@ -124,30 +127,52 @@ def __init__( # complete atexit.register(self.quitting) - def request(self, request_type, name, response_time, response_length, response, - context, exception, start_time=None, url=None) -> None: + def request( + self, + request_type: str, + name: str, + response_time: int, + response_length: int, + response: Response | HTTPConnectionPool, # need review + context: any, + exception: Exception | HTTPError, + start_time: Optional[datetime] = None, + url: Optional[str] = None + ) -> None: self.__listen_for_requests_events( self.node_id, 'locust_requests', request_type, name, response_time, response_length, response, context, exception, start_time, url) - def spawning_complete(self, user_count) -> None: + def spawning_complete(self, user_count: int) -> None: self.__register_event(self.node_id, user_count, 'spawning_complete') return True - def test_start(self, user_count) -> None: + def test_start(self, user_count: int) -> None: self.__register_event(self.node_id, 0, 'test_started') - def test_stop(self, user_count=None, environment=None) -> None: + def test_stop(self, user_count: int = 0, environment: any = None) -> None: self.__register_event(self.node_id, 0, 'test_stopped') - def user_error(self, user_instance, exception, tb, **_kwargs) -> None: + def user_error(self, + # need review + user_instance: locust.User, + exception: Exception, + tb: TracebackType, + **_kwargs + ) -> None: self.__listen_for_locust_errors(self.node_id, user_instance, exception, tb) def quitting(self, **_kwargs) -> None: self.__register_event(self.node_id, 0, 'quitting') self.last_flush_on_quitting() - def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs) -> None: + def __register_event( + self, + node_id: str, + user_count: int, + event: str, + **_kwargs + ) -> None: """ Persist locust event such as hatching started or stopped to influxdb. Append user_count in case that it exists @@ -165,9 +190,21 @@ def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs) point = self.__make_data_point('locust_events', fields, time) self.cache.append(point) - def __listen_for_requests_events(self, node_id, measurement, request_type, name, - response_time, response_length, response, - context, exception, start_time, url) -> None: + # TODO: `start_time` and `url` don't used + def __listen_for_requests_events( + self, + node_id: str, + measurement: str, + request_type: str, + name: str, + response_time: int, + response_length: int, + response: Response | HTTPConnectionPool, + context: any, + exception: Exception | HTTPError, + start_time: datetime, + url: str + ) -> None: """ Persist request information to influxdb. :param node_id: The id of the node reporting the event. @@ -185,7 +222,7 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name, 'success': was_successful, 'exception': repr(exception), } - if context and type(context) == dict: + if context and isinstance(context, dict): tags.update(context) if isinstance(exception, HTTPError): @@ -200,7 +237,13 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name, point = self.__make_data_point(measurement, fields, time, tags=tags) self.cache.append(point) - def __listen_for_locust_errors(self, node_id, user_instance, exception: Exception = None, tb=None) -> None: + def __listen_for_locust_errors( + self, + node_id: str, + user_instance: any, + exception: Exception = None, + tb: TracebackType = None + ) -> None: """ Persist locust errors to InfluxDB. :param node_id: The id of the node reporting the error. @@ -231,7 +274,13 @@ def __flush_cached_points_worker(self) -> None: self.__flush_points(self.influxdb_client) gevent.sleep(self.interval_ms / 1000) - def __make_data_point(self, measurement: str, fields: dict, time: datetime, tags: Optional[Dict[str,str]] = {}) -> dict: + def __make_data_point( + self, + measurement: str, + fields: dict, + time: datetime, + tags: Optional[Dict[str,str]] = {} + ) -> dict: """ Create a list with a single point to be saved to influxdb. :param measurement: The measurement where to save this point. @@ -239,7 +288,12 @@ def __make_data_point(self, measurement: str, fields: dict, time: datetime, tags :param time: The time os this point. :param tags: Dictionary of tags to be saved in the measurement default to None. """ - return {"measurement": measurement, "tags": {**tags, **self.additional_tags}, "time": time, "fields": fields} + return { + "measurement": measurement, + "tags": {**tags, **self.additional_tags}, + "time": time, + "fields": fields + } def last_flush_on_quitting(self): From 33a1c215fa794b831511bd1e4779729c7a785bdb Mon Sep 17 00:00:00 2001 From: User#None <77580844+issamansur@users.noreply.github.com> Date: Tue, 26 Sep 2023 12:07:39 +0000 Subject: [PATCH 2/5] refactor import --- locust_influxdb_listener/__init__.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/locust_influxdb_listener/__init__.py b/locust_influxdb_listener/__init__.py index 07fc7f0..d4c2928 100644 --- a/locust_influxdb_listener/__init__.py +++ b/locust_influxdb_listener/__init__.py @@ -10,8 +10,9 @@ from influxdb import InfluxDBClient from requests import Response from requests.exceptions import HTTPError -import locust.env from urllib3 import HTTPConnectionPool +from locust.env import Environment +from locust import User log = logging.getLogger('locust_influxdb_listener') @@ -67,7 +68,7 @@ class InfluxDBListener: def __init__( self, - env: locust.env.Environment, + env: Environment, influxDbSettings: InfluxDBSettings ): """ @@ -96,9 +97,8 @@ def __init__( # database is mandatory so we should always try to create it self.influxdb_client.create_database(influxDbSettings.database) self.influxdb_client.switch_database(influxDbSettings.database) - - # Catching too general exception. - # Need more info about default exceptions to catch + + except Exception as ex: logging.error(f'Unexpected error: {ex}') return @@ -150,12 +150,12 @@ def spawning_complete(self, user_count: int) -> None: def test_start(self, user_count: int) -> None: self.__register_event(self.node_id, 0, 'test_started') - def test_stop(self, user_count: int = 0, environment: any = None) -> None: + def test_stop(self, user_count: int = 0, environment: Environment = None) -> None: self.__register_event(self.node_id, 0, 'test_stopped') def user_error(self, # need review - user_instance: locust.User, + user_instance: User, exception: Exception, tb: TracebackType, **_kwargs @@ -315,4 +315,4 @@ def __flush_points(self, influxdb_client: InfluxDBClient) -> None: if not success: log.error('Failed to write points to influxdb.') # If failed for any reason put back into the beginning of cache - self.cache.insert(0, to_be_flushed) \ No newline at end of file + self.cache.insert(0, to_be_flushed) From 73eee205828aafa4756e376b026e1f970e91974c Mon Sep 17 00:00:00 2001 From: User#None <77580844+issamansur@users.noreply.github.com> Date: Tue, 26 Sep 2023 14:33:50 +0000 Subject: [PATCH 3/5] refactor `request` to `any` --- locust_influxdb_listener/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/locust_influxdb_listener/__init__.py b/locust_influxdb_listener/__init__.py index d4c2928..966b916 100644 --- a/locust_influxdb_listener/__init__.py +++ b/locust_influxdb_listener/__init__.py @@ -8,7 +8,6 @@ import atexit import gevent from influxdb import InfluxDBClient -from requests import Response from requests.exceptions import HTTPError from urllib3 import HTTPConnectionPool from locust.env import Environment @@ -133,7 +132,7 @@ def request( name: str, response_time: int, response_length: int, - response: Response | HTTPConnectionPool, # need review + response: any, context: any, exception: Exception | HTTPError, start_time: Optional[datetime] = None, @@ -199,8 +198,8 @@ def __listen_for_requests_events( name: str, response_time: int, response_length: int, - response: Response | HTTPConnectionPool, - context: any, + response: any, + context: any, exception: Exception | HTTPError, start_time: datetime, url: str From 7cd7e878da433f9e80fb5b699c8e19b98b8bad7e Mon Sep 17 00:00:00 2001 From: User#None <77580844+issamansur@users.noreply.github.com> Date: Tue, 26 Sep 2023 14:44:39 +0000 Subject: [PATCH 4/5] add type `User` to `user_instance` --- locust_influxdb_listener/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/locust_influxdb_listener/__init__.py b/locust_influxdb_listener/__init__.py index 966b916..087d2c6 100644 --- a/locust_influxdb_listener/__init__.py +++ b/locust_influxdb_listener/__init__.py @@ -239,7 +239,7 @@ def __listen_for_requests_events( def __listen_for_locust_errors( self, node_id: str, - user_instance: any, + user_instance: User, exception: Exception = None, tb: TracebackType = None ) -> None: From bea4eb066f40b695eb4b7975b9148d7e901dc5e7 Mon Sep 17 00:00:00 2001 From: User#None <77580844+issamansur@users.noreply.github.com> Date: Wed, 27 Sep 2023 12:21:39 +0000 Subject: [PATCH 5/5] use `Union[type1, type2]` instead of `type1|type2` --- locust_influxdb_listener/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/locust_influxdb_listener/__init__.py b/locust_influxdb_listener/__init__.py index 087d2c6..c846914 100644 --- a/locust_influxdb_listener/__init__.py +++ b/locust_influxdb_listener/__init__.py @@ -1,6 +1,6 @@ import sys from types import TracebackType -from typing import Optional, Dict +from typing import Optional, Dict, Union from datetime import datetime import traceback import logging @@ -134,7 +134,7 @@ def request( response_length: int, response: any, context: any, - exception: Exception | HTTPError, + exception: Union[Exception, HTTPError], start_time: Optional[datetime] = None, url: Optional[str] = None ) -> None: @@ -200,7 +200,7 @@ def __listen_for_requests_events( response_length: int, response: any, context: any, - exception: Exception | HTTPError, + exception: Union[Exception, HTTPError], start_time: datetime, url: str ) -> None: