Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run format #397

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/saturn_engine/core/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ def __init__(
config: dict[str, dict[str, t.Optional[t.Any]]] = None, # type: ignore[assignment]
metadata: dict[str, dict[str, t.Optional[t.Any]]] = None, # type: ignore[assignment]
expire_after: t.Optional[datetime.timedelta | float] = None,
) -> None:
...
) -> None: ...

def __post_init__(self) -> None:
if isinstance(self.expire_after, datetime.timedelta):
Expand Down
6 changes: 3 additions & 3 deletions src/saturn_engine/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ def drop_all() -> None:

def engine() -> Engine:
init()
database_connection_creator: Optional[
str
] = current_app.saturn.config.database_connection_creator
database_connection_creator: Optional[str] = (
current_app.saturn.config.database_connection_creator
)
extra_args = {}
if database_connection_creator:
extra_args["creator"] = import_name(database_connection_creator)
Expand Down
12 changes: 6 additions & 6 deletions src/saturn_engine/utils/asyncutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ def __init__(self, func: AsyncFNone, *, delay: float) -> None:
self.delay = delay
self.flush_event = asyncio.Event()
self.delayed_task: t.Optional[asyncio.Task] = None
self.delayed_params: t.Optional[
tuple[tuple[t.Any, ...], dict[str, t.Any]]
] = None
self.current_params: t.Optional[
tuple[tuple[t.Any, ...], dict[str, t.Any]]
] = None
self.delayed_params: t.Optional[tuple[tuple[t.Any, ...], dict[str, t.Any]]] = (
None
)
self.current_params: t.Optional[tuple[tuple[t.Any, ...], dict[str, t.Any]]] = (
None
)

self._call_fut: t.Optional[asyncio.Future] = None

Expand Down
3 changes: 1 addition & 2 deletions src/saturn_engine/utils/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ class Options:
pass

@abstractmethod
def __init__(self, *args: object, options: Options, **kwargs: object) -> None:
...
def __init__(self, *args: object, options: Options, **kwargs: object) -> None: ...

@classmethod
def from_options(
Expand Down
6 changes: 2 additions & 4 deletions src/saturn_engine/utils/tester/json_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@


@overload
def normalize_json(obj: dict) -> dict:
...
def normalize_json(obj: dict) -> dict: ...


@overload
def normalize_json(obj: list) -> list:
...
def normalize_json(obj: list) -> list: ...


def normalize_json(obj: object) -> object:
Expand Down
3 changes: 1 addition & 2 deletions src/saturn_engine/worker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@


class WorkManagerInit(Protocol):
def __call__(self, *, services: Services) -> WorkManager:
...
def __call__(self, *, services: Services) -> WorkManager: ...


class Broker:
Expand Down
6 changes: 3 additions & 3 deletions src/saturn_engine/worker/executors/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ async def scope(
# Transfer the message context to the results
# processing scope.
context=processable._context.pop_all(),
context_error=error
if error and not error.handled
else None,
context_error=(
error if error and not error.handled else None
),
)
)

Expand Down
3 changes: 1 addition & 2 deletions src/saturn_engine/worker/inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ def __init__(
metadata: dict[str, t.Any] = None, # type: ignore[assignment]
config: dict[str, t.Any] = None, # type: ignore[assignment]
context: AsyncExitStack = None, # type: ignore[assignment]
) -> None:
...
) -> None: ...

def __post_init__(self) -> None:
if self.cursor is MISSING:
Expand Down
14 changes: 8 additions & 6 deletions src/saturn_engine/worker/resources/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@ async def add(self, item: ProvidedResource) -> None:
type=self.definition.resource_type,
data=item.data,
default_delay=item.default_delay,
rate_limit=ResourceRateLimit(
rate_limits=item.rate_limit.rate_limits,
strategy=item.rate_limit.strategy,
)
if item.rate_limit
else None,
rate_limit=(
ResourceRateLimit(
rate_limits=item.rate_limit.rate_limits,
strategy=item.rate_limit.strategy,
)
if item.rate_limit
else None
),
)

self._managed_resources.add(resource.name)
Expand Down
3 changes: 1 addition & 2 deletions src/saturn_engine/worker/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class BaseServices:

class ConfigContainer(t.Protocol):
@property
def config(self) -> t.Union[LazyConfig, BaseConfig]:
...
def config(self) -> t.Union[LazyConfig, BaseConfig]: ...


T = TypeVar("T")
Expand Down
12 changes: 6 additions & 6 deletions src/saturn_engine/worker/services/usage_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ class StagesState:
executing: StageState[SubmittingMessage, ExecutingMessage] = dataclasses.field(
default_factory=StageState
)
processing_results: StageState[
ExecutableMessage, ProcessingMessage
] = dataclasses.field(default_factory=StageState)
processing_results: StageState[ExecutableMessage, ProcessingMessage] = (
dataclasses.field(default_factory=StageState)
)
publishing: StageState[ProcessingMessage, PublishingMessage] = dataclasses.field(
default_factory=StageState
)
waiting_publish: StageState[
PublishingMessage, ExecutableMessage
] = dataclasses.field(default_factory=StageState)
waiting_publish: StageState[PublishingMessage, ExecutableMessage] = (
dataclasses.field(default_factory=StageState)
)


class UsageMetrics(MinimalService):
Expand Down
4 changes: 3 additions & 1 deletion src/saturn_engine/worker/topics/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ async def channel(self) -> aio_pika.abc.AbstractChannel:
connection = await self.services.s.rabbitmq.connections.get(
self.options.connection_name
)
channel: aio_pika.abc.AbstractRobustChannel = await self.exit_stack.enter_async_context(
channel: (
aio_pika.abc.AbstractRobustChannel
) = await self.exit_stack.enter_async_context(
connection.channel(on_return_raises=True) # type: ignore[arg-type]
)

Expand Down
14 changes: 8 additions & 6 deletions src/saturn_engine/worker/work_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,12 @@ def build_resource_data(self, item: ResourceItem) -> ResourceData:
type=item.type,
data=item.data,
default_delay=item.default_delay,
rate_limit=ResourceRateLimit(
rate_limits=item.rate_limit.rate_limits,
strategy=item.rate_limit.strategy,
)
if item.rate_limit
else None,
rate_limit=(
ResourceRateLimit(
rate_limits=item.rate_limit.rate_limits,
strategy=item.rate_limit.strategy,
)
if item.rate_limit
else None
),
)
6 changes: 3 additions & 3 deletions src/saturn_engine/worker_manager/config/declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ def compile_static_definitions(
uncompied_resources_provider.data, ResourcesProvider
)
resources_provider_item = resources_provider.to_core_object()
definitions.resources_providers[
resources_provider_item.name
] = resources_provider_item
definitions.resources_providers[resources_provider_item.name] = (
resources_provider_item
)
definitions.resources_by_type[resources_provider_item.resource_type].append(
resources_provider_item
)
Expand Down
14 changes: 8 additions & 6 deletions src/saturn_engine/worker_manager/config/declarative_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ def to_core_object(self) -> api.ResourceItem:
type=self.spec.type,
data=self.spec.data,
default_delay=self.spec.default_delay,
rate_limit=api.ResourceRateLimitItem(
rate_limits=self.spec.rate_limit.rate_limits,
strategy=self.spec.rate_limit.strategy,
)
if self.spec.rate_limit
else None,
rate_limit=(
api.ResourceRateLimitItem(
rate_limits=self.spec.rate_limit.rate_limits,
strategy=self.spec.rate_limit.strategy,
)
if self.spec.rate_limit
else None
),
)


Expand Down
6 changes: 3 additions & 3 deletions src/saturn_engine/worker_manager/config/static_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ class StaticDefinitions:
default_factory=dict
)
resources: dict[str, ResourceItem] = dataclasses.field(default_factory=dict)
resources_by_type: dict[
str, list[t.Union[ResourceItem, ResourcesProviderItem]]
] = dataclasses.field(default_factory=lambda: defaultdict(list))
resources_by_type: dict[str, list[t.Union[ResourceItem, ResourcesProviderItem]]] = (
dataclasses.field(default_factory=lambda: defaultdict(list))
)
3 changes: 1 addition & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ def event_loop(
loop.close()


def pipeline() -> None:
...
def pipeline() -> None: ...


@pytest.fixture
Expand Down
3 changes: 1 addition & 2 deletions tests/worker/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ async def open(self) -> None:
await self.add(ProvidedResource(name="fake-resource", data={"foo": "bar"}))


def pipeline(resource: FakeResource) -> None:
...
def pipeline(resource: FakeResource) -> None: ...


@pytest.fixture
Expand Down
3 changes: 1 addition & 2 deletions tests/worker/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ async def test_base_executor(
assert executor.processed == 10


def pipeline(resource: FakeResource) -> None:
...
def pipeline(resource: FakeResource) -> None: ...


@pytest.mark.asyncio
Expand Down
24 changes: 8 additions & 16 deletions tests/worker/test_pipeline_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,37 @@ class ResourceB(Resource):
pass


def simple_pipeline() -> None:
...
def simple_pipeline() -> None: ...


def pipeline_with_resources(a: ResourceA, b: "ResourceB") -> None:
...
def pipeline_with_resources(a: ResourceA, b: "ResourceB") -> None: ...


class Namespace:
def pipeline(self) -> None:
...
def pipeline(self) -> None: ...


def deleted_pipeline() -> None:
...
def deleted_pipeline() -> None: ...


def modified_pipeline() -> None:
...
def modified_pipeline() -> None: ...


def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper() -> None:
...
def wrapper() -> None: ...

return wrapper


@decorator
def wrapped_pipeline() -> None:
...
def wrapped_pipeline() -> None: ...


def test_pipeline_info_name() -> None:
global deleted_pipeline, modified_pipeline

def local_pipeline() -> None:
...
def local_pipeline() -> None: ...

assert PipelineInfo.from_pipeline(simple_pipeline) == PipelineInfo(
name="tests.worker.test_pipeline_info.simple_pipeline", resources={}
Expand Down
3 changes: 1 addition & 2 deletions tests/worker/topics/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@


class RabbitMQTopicMaker(t.Protocol):
async def __call__(self, klass: t.Type[T], **kwargs: t.Any) -> T:
...
async def __call__(self, klass: t.Type[T], **kwargs: t.Any) -> T: ...


async def ensure_clean_queue(
Expand Down
3 changes: 1 addition & 2 deletions tests/worker/topics/test_batching_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ async def test_batching_topic_flush_timeout() -> None:

class NestedTestTopic(Topic):
@dataclasses.dataclass
class Options:
...
class Options: ...

def __init__(self, options: Options, services: Services, **kwargs: object) -> None:
self.entered_context_managers: list[int] = []
Expand Down
Loading