Skip to content

Commit

Permalink
Add concurrency to resources
Browse files Browse the repository at this point in the history
  • Loading branch information
KevGoDev committed Jul 20, 2022
1 parent 9c4dd70 commit 80e52aa
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/saturn_engine/worker_manager/config/declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,15 @@ def compile_static_definitions(
dict(),
).values():
resource: Resource = fromdict(uncompiled_resource.data, Resource)
resource_item = resource.to_core_object()
definitions.resources[resource.metadata.name] = resource_item
definitions.resources_by_type[resource_item.type].append(resource_item)
# If we have concurrency for our resource then
# we append an index at the end in order
# to differentiate the instances of said resource
for i in range(1, resource.spec.concurrency + 1):
resource_item = resource.to_core_object()
if resource.spec.concurrency > 1:
resource_item.name = f"{resource.metadata.name}-{i}"
definitions.resources[resource_item.name] = resource_item
definitions.resources_by_type[resource_item.type].append(resource_item)

for object_kind in objects_by_kind.keys():
raise Exception(f"Unsupported kind {object_kind}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class ResourceSpec:
type: str
data: dict[str, Any]
default_delay: float = 0
concurrency: int = 1


@dataclasses.dataclass
Expand Down
19 changes: 19 additions & 0 deletions tests/worker_manager/config/test_declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,22 @@ def test_load_executor() -> None:

assert len(static_definitions.executors) == 1
assert static_definitions.executors["test-executor"].options["pool_size"] == 2


def test_resource_concurrency() -> None:
concurrency_definition_str = """
apiVersion: saturn.flared.io/v1alpha1
kind: SaturnResource
metadata:
name: test-resource
spec:
type: TestApiKey
data:
key: "qwe"
default_delay: 10
concurrency: 5
"""
static_definitions = load_definitions_from_str(concurrency_definition_str)
assert len(static_definitions.resources) == 5
for i in range(1, 6):
assert f"test-resource-{i}" in static_definitions.resources

0 comments on commit 80e52aa

Please sign in to comment.