From 9fef2bf397d22a52f6005b7d57ce8bf44f62914c Mon Sep 17 00:00:00 2001 From: aviau Date: Wed, 20 Mar 2024 08:57:13 -0400 Subject: [PATCH] run format --- src/saturn_engine/core/topic.py | 3 +-- src/saturn_engine/database.py | 6 ++--- src/saturn_engine/utils/asyncutils.py | 12 +++++----- src/saturn_engine/utils/options.py | 3 +-- src/saturn_engine/utils/tester/json_utils.py | 6 ++--- src/saturn_engine/worker/broker.py | 3 +-- src/saturn_engine/worker/executors/queue.py | 6 ++--- src/saturn_engine/worker/inventory.py | 3 +-- .../worker/resources/provider.py | 14 ++++++----- src/saturn_engine/worker/services/__init__.py | 3 +-- .../worker/services/usage_metrics.py | 12 +++++----- src/saturn_engine/worker/topics/rabbitmq.py | 4 +++- src/saturn_engine/worker/work_manager.py | 14 ++++++----- .../worker_manager/config/declarative.py | 6 ++--- .../config/declarative_resource.py | 14 ++++++----- .../config/static_definitions.py | 6 ++--- tests/conftest.py | 3 +-- tests/worker/test_broker.py | 3 +-- tests/worker/test_executor.py | 3 +-- tests/worker/test_pipeline_info.py | 24 +++++++------------ tests/worker/topics/conftest.py | 3 +-- tests/worker/topics/test_batching_topic.py | 3 +-- 22 files changed, 71 insertions(+), 83 deletions(-) diff --git a/src/saturn_engine/core/topic.py b/src/saturn_engine/core/topic.py index 9a76f173..1dde7048 100644 --- a/src/saturn_engine/core/topic.py +++ b/src/saturn_engine/core/topic.py @@ -45,8 +45,7 @@ def __init__( config: dict[str, dict[str, t.Optional[t.Any]]] = None, # type: ignore[assignment] metadata: dict[str, dict[str, t.Optional[t.Any]]] = None, # type: ignore[assignment] expire_after: t.Optional[datetime.timedelta | float] = None, - ) -> None: - ... + ) -> None: ... def __post_init__(self) -> None: if isinstance(self.expire_after, datetime.timedelta): diff --git a/src/saturn_engine/database.py b/src/saturn_engine/database.py index b3fe3d18..04e234e8 100644 --- a/src/saturn_engine/database.py +++ b/src/saturn_engine/database.py @@ -44,9 +44,9 @@ def drop_all() -> None: def engine() -> Engine: init() - database_connection_creator: Optional[ - str - ] = current_app.saturn.config.database_connection_creator + database_connection_creator: Optional[str] = ( + current_app.saturn.config.database_connection_creator + ) extra_args = {} if database_connection_creator: extra_args["creator"] = import_name(database_connection_creator) diff --git a/src/saturn_engine/utils/asyncutils.py b/src/saturn_engine/utils/asyncutils.py index 4950f10a..b9d80f76 100644 --- a/src/saturn_engine/utils/asyncutils.py +++ b/src/saturn_engine/utils/asyncutils.py @@ -183,12 +183,12 @@ def __init__(self, func: AsyncFNone, *, delay: float) -> None: self.delay = delay self.flush_event = asyncio.Event() self.delayed_task: t.Optional[asyncio.Task] = None - self.delayed_params: t.Optional[ - tuple[tuple[t.Any, ...], dict[str, t.Any]] - ] = None - self.current_params: t.Optional[ - tuple[tuple[t.Any, ...], dict[str, t.Any]] - ] = None + self.delayed_params: t.Optional[tuple[tuple[t.Any, ...], dict[str, t.Any]]] = ( + None + ) + self.current_params: t.Optional[tuple[tuple[t.Any, ...], dict[str, t.Any]]] = ( + None + ) self._call_fut: t.Optional[asyncio.Future] = None diff --git a/src/saturn_engine/utils/options.py b/src/saturn_engine/utils/options.py index 703fe144..59f915e6 100644 --- a/src/saturn_engine/utils/options.py +++ b/src/saturn_engine/utils/options.py @@ -18,8 +18,7 @@ class Options: pass @abstractmethod - def __init__(self, *args: object, options: Options, **kwargs: object) -> None: - ... + def __init__(self, *args: object, options: Options, **kwargs: object) -> None: ... @classmethod def from_options( diff --git a/src/saturn_engine/utils/tester/json_utils.py b/src/saturn_engine/utils/tester/json_utils.py index 5c8d5b4a..8c81b632 100644 --- a/src/saturn_engine/utils/tester/json_utils.py +++ b/src/saturn_engine/utils/tester/json_utils.py @@ -5,13 +5,11 @@ @overload -def normalize_json(obj: dict) -> dict: - ... +def normalize_json(obj: dict) -> dict: ... @overload -def normalize_json(obj: list) -> list: - ... +def normalize_json(obj: list) -> list: ... def normalize_json(obj: object) -> object: diff --git a/src/saturn_engine/worker/broker.py b/src/saturn_engine/worker/broker.py index bfa2b59b..d452b6fe 100644 --- a/src/saturn_engine/worker/broker.py +++ b/src/saturn_engine/worker/broker.py @@ -13,8 +13,7 @@ class WorkManagerInit(Protocol): - def __call__(self, *, services: Services) -> WorkManager: - ... + def __call__(self, *, services: Services) -> WorkManager: ... class Broker: diff --git a/src/saturn_engine/worker/executors/queue.py b/src/saturn_engine/worker/executors/queue.py index 64b7ff60..24cff16d 100644 --- a/src/saturn_engine/worker/executors/queue.py +++ b/src/saturn_engine/worker/executors/queue.py @@ -100,9 +100,9 @@ async def scope( # Transfer the message context to the results # processing scope. context=processable._context.pop_all(), - context_error=error - if error and not error.handled - else None, + context_error=( + error if error and not error.handled else None + ), ) ) diff --git a/src/saturn_engine/worker/inventory.py b/src/saturn_engine/worker/inventory.py index 79c933fa..04f1ab5f 100644 --- a/src/saturn_engine/worker/inventory.py +++ b/src/saturn_engine/worker/inventory.py @@ -62,8 +62,7 @@ def __init__( metadata: dict[str, t.Any] = None, # type: ignore[assignment] config: dict[str, t.Any] = None, # type: ignore[assignment] context: AsyncExitStack = None, # type: ignore[assignment] - ) -> None: - ... + ) -> None: ... def __post_init__(self) -> None: if self.cursor is MISSING: diff --git a/src/saturn_engine/worker/resources/provider.py b/src/saturn_engine/worker/resources/provider.py index 0a67f07c..32540bd8 100644 --- a/src/saturn_engine/worker/resources/provider.py +++ b/src/saturn_engine/worker/resources/provider.py @@ -80,12 +80,14 @@ async def add(self, item: ProvidedResource) -> None: type=self.definition.resource_type, data=item.data, default_delay=item.default_delay, - rate_limit=ResourceRateLimit( - rate_limits=item.rate_limit.rate_limits, - strategy=item.rate_limit.strategy, - ) - if item.rate_limit - else None, + rate_limit=( + ResourceRateLimit( + rate_limits=item.rate_limit.rate_limits, + strategy=item.rate_limit.strategy, + ) + if item.rate_limit + else None + ), ) self._managed_resources.add(resource.name) diff --git a/src/saturn_engine/worker/services/__init__.py b/src/saturn_engine/worker/services/__init__.py index 62c893f2..19e16e24 100644 --- a/src/saturn_engine/worker/services/__init__.py +++ b/src/saturn_engine/worker/services/__init__.py @@ -31,8 +31,7 @@ class BaseServices: class ConfigContainer(t.Protocol): @property - def config(self) -> t.Union[LazyConfig, BaseConfig]: - ... + def config(self) -> t.Union[LazyConfig, BaseConfig]: ... T = TypeVar("T") diff --git a/src/saturn_engine/worker/services/usage_metrics.py b/src/saturn_engine/worker/services/usage_metrics.py index 4a191cc8..65adbf89 100644 --- a/src/saturn_engine/worker/services/usage_metrics.py +++ b/src/saturn_engine/worker/services/usage_metrics.py @@ -122,15 +122,15 @@ class StagesState: executing: StageState[SubmittingMessage, ExecutingMessage] = dataclasses.field( default_factory=StageState ) - processing_results: StageState[ - ExecutableMessage, ProcessingMessage - ] = dataclasses.field(default_factory=StageState) + processing_results: StageState[ExecutableMessage, ProcessingMessage] = ( + dataclasses.field(default_factory=StageState) + ) publishing: StageState[ProcessingMessage, PublishingMessage] = dataclasses.field( default_factory=StageState ) - waiting_publish: StageState[ - PublishingMessage, ExecutableMessage - ] = dataclasses.field(default_factory=StageState) + waiting_publish: StageState[PublishingMessage, ExecutableMessage] = ( + dataclasses.field(default_factory=StageState) + ) class UsageMetrics(MinimalService): diff --git a/src/saturn_engine/worker/topics/rabbitmq.py b/src/saturn_engine/worker/topics/rabbitmq.py index 0ba4cc06..d4f71916 100644 --- a/src/saturn_engine/worker/topics/rabbitmq.py +++ b/src/saturn_engine/worker/topics/rabbitmq.py @@ -261,7 +261,9 @@ async def channel(self) -> aio_pika.abc.AbstractChannel: connection = await self.services.s.rabbitmq.connections.get( self.options.connection_name ) - channel: aio_pika.abc.AbstractRobustChannel = await self.exit_stack.enter_async_context( + channel: ( + aio_pika.abc.AbstractRobustChannel + ) = await self.exit_stack.enter_async_context( connection.channel(on_return_raises=True) # type: ignore[arg-type] ) diff --git a/src/saturn_engine/worker/work_manager.py b/src/saturn_engine/worker/work_manager.py index a49229fb..c8b68a41 100644 --- a/src/saturn_engine/worker/work_manager.py +++ b/src/saturn_engine/worker/work_manager.py @@ -197,10 +197,12 @@ def build_resource_data(self, item: ResourceItem) -> ResourceData: type=item.type, data=item.data, default_delay=item.default_delay, - rate_limit=ResourceRateLimit( - rate_limits=item.rate_limit.rate_limits, - strategy=item.rate_limit.strategy, - ) - if item.rate_limit - else None, + rate_limit=( + ResourceRateLimit( + rate_limits=item.rate_limit.rate_limits, + strategy=item.rate_limit.strategy, + ) + if item.rate_limit + else None + ), ) diff --git a/src/saturn_engine/worker_manager/config/declarative.py b/src/saturn_engine/worker_manager/config/declarative.py index d01e4525..bdf5e2e8 100644 --- a/src/saturn_engine/worker_manager/config/declarative.py +++ b/src/saturn_engine/worker_manager/config/declarative.py @@ -96,9 +96,9 @@ def compile_static_definitions( uncompied_resources_provider.data, ResourcesProvider ) resources_provider_item = resources_provider.to_core_object() - definitions.resources_providers[ - resources_provider_item.name - ] = resources_provider_item + definitions.resources_providers[resources_provider_item.name] = ( + resources_provider_item + ) definitions.resources_by_type[resources_provider_item.resource_type].append( resources_provider_item ) diff --git a/src/saturn_engine/worker_manager/config/declarative_resource.py b/src/saturn_engine/worker_manager/config/declarative_resource.py index ecfdf57c..be85ffe6 100644 --- a/src/saturn_engine/worker_manager/config/declarative_resource.py +++ b/src/saturn_engine/worker_manager/config/declarative_resource.py @@ -32,12 +32,14 @@ def to_core_object(self) -> api.ResourceItem: type=self.spec.type, data=self.spec.data, default_delay=self.spec.default_delay, - rate_limit=api.ResourceRateLimitItem( - rate_limits=self.spec.rate_limit.rate_limits, - strategy=self.spec.rate_limit.strategy, - ) - if self.spec.rate_limit - else None, + rate_limit=( + api.ResourceRateLimitItem( + rate_limits=self.spec.rate_limit.rate_limits, + strategy=self.spec.rate_limit.strategy, + ) + if self.spec.rate_limit + else None + ), ) diff --git a/src/saturn_engine/worker_manager/config/static_definitions.py b/src/saturn_engine/worker_manager/config/static_definitions.py index 5ab7c62e..d542c92f 100644 --- a/src/saturn_engine/worker_manager/config/static_definitions.py +++ b/src/saturn_engine/worker_manager/config/static_definitions.py @@ -23,6 +23,6 @@ class StaticDefinitions: default_factory=dict ) resources: dict[str, ResourceItem] = dataclasses.field(default_factory=dict) - resources_by_type: dict[ - str, list[t.Union[ResourceItem, ResourcesProviderItem]] - ] = dataclasses.field(default_factory=lambda: defaultdict(list)) + resources_by_type: dict[str, list[t.Union[ResourceItem, ResourcesProviderItem]]] = ( + dataclasses.field(default_factory=lambda: defaultdict(list)) + ) diff --git a/tests/conftest.py b/tests/conftest.py index 3e01304f..f4ff18d6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -81,8 +81,7 @@ def event_loop( loop.close() -def pipeline() -> None: - ... +def pipeline() -> None: ... @pytest.fixture diff --git a/tests/worker/test_broker.py b/tests/worker/test_broker.py index e82f1c01..66bac18c 100644 --- a/tests/worker/test_broker.py +++ b/tests/worker/test_broker.py @@ -56,8 +56,7 @@ async def open(self) -> None: await self.add(ProvidedResource(name="fake-resource", data={"foo": "bar"})) -def pipeline(resource: FakeResource) -> None: - ... +def pipeline(resource: FakeResource) -> None: ... @pytest.fixture diff --git a/tests/worker/test_executor.py b/tests/worker/test_executor.py index 63d4cfbf..6b1f3131 100644 --- a/tests/worker/test_executor.py +++ b/tests/worker/test_executor.py @@ -94,8 +94,7 @@ async def test_base_executor( assert executor.processed == 10 -def pipeline(resource: FakeResource) -> None: - ... +def pipeline(resource: FakeResource) -> None: ... @pytest.mark.asyncio diff --git a/tests/worker/test_pipeline_info.py b/tests/worker/test_pipeline_info.py index 59839513..56386d57 100644 --- a/tests/worker/test_pipeline_info.py +++ b/tests/worker/test_pipeline_info.py @@ -16,45 +16,37 @@ class ResourceB(Resource): pass -def simple_pipeline() -> None: - ... +def simple_pipeline() -> None: ... -def pipeline_with_resources(a: ResourceA, b: "ResourceB") -> None: - ... +def pipeline_with_resources(a: ResourceA, b: "ResourceB") -> None: ... class Namespace: - def pipeline(self) -> None: - ... + def pipeline(self) -> None: ... -def deleted_pipeline() -> None: - ... +def deleted_pipeline() -> None: ... -def modified_pipeline() -> None: - ... +def modified_pipeline() -> None: ... def decorator(func: Callable) -> Callable: @wraps(func) - def wrapper() -> None: - ... + def wrapper() -> None: ... return wrapper @decorator -def wrapped_pipeline() -> None: - ... +def wrapped_pipeline() -> None: ... def test_pipeline_info_name() -> None: global deleted_pipeline, modified_pipeline - def local_pipeline() -> None: - ... + def local_pipeline() -> None: ... assert PipelineInfo.from_pipeline(simple_pipeline) == PipelineInfo( name="tests.worker.test_pipeline_info.simple_pipeline", resources={} diff --git a/tests/worker/topics/conftest.py b/tests/worker/topics/conftest.py index 9c6e04c9..f8c5083c 100644 --- a/tests/worker/topics/conftest.py +++ b/tests/worker/topics/conftest.py @@ -14,8 +14,7 @@ class RabbitMQTopicMaker(t.Protocol): - async def __call__(self, klass: t.Type[T], **kwargs: t.Any) -> T: - ... + async def __call__(self, klass: t.Type[T], **kwargs: t.Any) -> T: ... async def ensure_clean_queue( diff --git a/tests/worker/topics/test_batching_topic.py b/tests/worker/topics/test_batching_topic.py index 1d2f0dd8..5486d9af 100644 --- a/tests/worker/topics/test_batching_topic.py +++ b/tests/worker/topics/test_batching_topic.py @@ -83,8 +83,7 @@ async def test_batching_topic_flush_timeout() -> None: class NestedTestTopic(Topic): @dataclasses.dataclass - class Options: - ... + class Options: ... def __init__(self, options: Options, services: Services, **kwargs: object) -> None: self.entered_context_managers: list[int] = []