Skip to content

Commit

Permalink
Make graph model independent
Browse files Browse the repository at this point in the history
  • Loading branch information
wysockipiotr committed May 16, 2024
1 parent 444b975 commit 56eab51
Show file tree
Hide file tree
Showing 23 changed files with 413 additions and 92 deletions.
23 changes: 0 additions & 23 deletions smartschedule/parallelization/stage.py

This file was deleted.

27 changes: 0 additions & 27 deletions smartschedule/parallelization/stage_parallelization.py

This file was deleted.

4 changes: 4 additions & 0 deletions smartschedule/planning/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
__all__ = ["ResourceName", "Stage", "stage_parallelization_of"]

from .parallelization.stage import ResourceName, Stage
from .parallelization.stage_parallelization import stage_parallelization_of
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from attrs import frozen

from smartschedule.parallelization.stage import Stage
from smartschedule.planning.parallelization.stage import Stage


@frozen
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from typing import Self
from attrs import field, frozen

from attrs import frozen, field

from smartschedule.parallelization.parallel_stages import ParallelStages
from smartschedule.planning.parallelization.parallel_stages import ParallelStages


@frozen
Expand All @@ -12,6 +10,6 @@ class ParallelStagesList:
def __str__(self) -> str:
return " | ".join(str(parallel_stages) for parallel_stages in self.all)

def add(self, new_parallel_stages: ParallelStages) -> Self:
def add(self, new_parallel_stages: ParallelStages) -> "ParallelStagesList":
result = [*self.all, new_parallel_stages]
return ParallelStagesList(all=result)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from smartschedule.planning.parallelization.parallel_stages import ParallelStages
from smartschedule.planning.parallelization.parallel_stages_list import (
ParallelStagesList,
)
from smartschedule.planning.parallelization.stage import Stage
from smartschedule.sorter import SortedNodes


def sorted_nodes_to_parallelized_stages(
sorted_nodes: SortedNodes[Stage],
) -> ParallelStagesList:
return ParallelStagesList(
[
ParallelStages({node.content for node in nodes if node.content is not None})
for nodes in sorted_nodes
]
)
23 changes: 23 additions & 0 deletions smartschedule/planning/parallelization/stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from datetime import timedelta
from typing import Self

from attrs import evolve, field, frozen


@frozen
class ResourceName:
name: str


@frozen
class Stage:
name: str
dependencies: set["Stage"] = field(factory=set, eq=False)
resources: set[ResourceName] = field(factory=set, eq=False)
duration: timedelta = field(default=timedelta.min, eq=False)

def depends_on(self, stage: Self) -> Self:
return evolve(self, dependencies={*self.dependencies, stage})

def with_chosen_resource_capabilities(self, *resources: ResourceName) -> Self:
return evolve(self, resources={*self.resources, *resources})
15 changes: 15 additions & 0 deletions smartschedule/planning/parallelization/stage_parallelization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from smartschedule.planning.parallelization.parallel_stages_list import (
ParallelStagesList,
)
from smartschedule.planning.parallelization.sorted_nodes_to_parallelized_stages import (
sorted_nodes_to_parallelized_stages,
)
from smartschedule.planning.parallelization.stage import Stage
from smartschedule.planning.parallelization.stages_to_nodes import stages_to_nodes
from smartschedule.sorter import graph_topological_sort


def stage_parallelization_of(stages: set[Stage]) -> ParallelStagesList:
nodes = stages_to_nodes(stages)
sorted_nodes = graph_topological_sort(nodes)
return sorted_nodes_to_parallelized_stages(sorted_nodes)
44 changes: 44 additions & 0 deletions smartschedule/planning/parallelization/stages_to_nodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from collections.abc import Collection
from itertools import islice

from smartschedule.planning.parallelization.stage import Stage
from smartschedule.sorter import Node, Nodes

type StageName = str
type NodesByName = dict[StageName, Node[Stage]]


def stages_to_nodes(stages: Collection[Stage]) -> Nodes[Stage]:
nodes = {stage.name: Node(name=stage.name, content=stage) for stage in stages}

for n, stage in enumerate(stages):
_explicit_dependencies(stage, nodes)
_shared_resources(stage, list(islice(stages, n + 1, None)), nodes)

return Nodes(nodes.values())


def _explicit_dependencies(stage: Stage, nodes: NodesByName) -> NodesByName:
for other in stage.dependencies:
nodes[stage.name] = nodes[stage.name].depends_on(nodes[other.name])

return nodes


def _shared_resources(
stage: Stage, with_stages: Collection[Stage], nodes: NodesByName
) -> NodesByName:
for other in with_stages:
if stage.name == other.name:
continue

# No shared resources.
if stage.resources.isdisjoint(other.resources):
continue

if len(other.resources) > len(stage.resources):
nodes[stage.name] = nodes[stage.name].depends_on(nodes[other.name])
else:
nodes[other.name] = nodes[other.name].depends_on(nodes[stage.name])

return nodes
6 changes: 6 additions & 0 deletions smartschedule/sorter/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
__all__ = ["graph_topological_sort", "Node", "Nodes", "SortedNodes"]

from .graph_topological_sort import graph_topological_sort
from .node import Node
from .nodes import Nodes
from .sorted_nodes import SortedNodes
45 changes: 45 additions & 0 deletions smartschedule/sorter/feedback_arc_set_on_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from collections import defaultdict

from attrs import frozen

from smartschedule.sorter.node import Node

type AdjacencyList = defaultdict[int, list[int]]


@frozen
class Edge:
source: int
target: int

def __str__(self) -> str:
return f"({self.source} -> {self.target})"


def calculate_feedback_arc_set_on_graph(initial_nodes: list[Node[str]]) -> list[Edge]:
adjacency_list = _create_adjacency_list(initial_nodes)
feedback_edges: list[Edge] = []
visited: list[int] = [0] * (len(adjacency_list) + 1)

for i in adjacency_list:
neighbours = adjacency_list[i]
visited[i] = 1
for neighbour in neighbours:
if visited[neighbour] == 1:
feedback_edges.append(Edge(i, neighbour))
else:
visited[neighbour] = 1

return feedback_edges


def _create_adjacency_list(initial_nodes: list[Node[str]]) -> AdjacencyList:
adjacency_list: AdjacencyList = defaultdict(list)

for i, node in enumerate(initial_nodes):
dependencies = [
initial_nodes.index(dependency) + 1 for dependency in node.dependencies
]
adjacency_list[i + 1] = dependencies

return adjacency_list
22 changes: 22 additions & 0 deletions smartschedule/sorter/graph_topological_sort.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from graphlib import CycleError, TopologicalSorter

from smartschedule.sorter.nodes import Nodes
from smartschedule.sorter.sorted_nodes import SortedNodes


def graph_topological_sort[T](nodes: Nodes[T]) -> SortedNodes[T]:
sorted_nodes = SortedNodes.empty()

sorter = TopologicalSorter(graph={node: node.dependencies for node in nodes})

try:
sorter.prepare()
except CycleError:
return sorted_nodes

while sorter.is_active():
group = sorter.get_ready()
sorted_nodes = sorted_nodes.add(Nodes(group))
sorter.done(*group)

return sorted_nodes
17 changes: 17 additions & 0 deletions smartschedule/sorter/node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from itertools import chain
from typing import Self

from attrs import field, frozen


@frozen
class Node[T]:
name: str
dependencies: set["Node[T]"] = field(factory=set, eq=False)
content: T | None = field(eq=False, default=None)

def depends_on(self, node: Self) -> "Node[T]":
return Node(self.name, set(chain(self.dependencies, {node})), self.content)

def __str__(self) -> str:
return self.name
17 changes: 17 additions & 0 deletions smartschedule/sorter/nodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from typing import Collection

from smartschedule.sorter.node import Node


class Nodes[T]:
def __init__(self, nodes: Collection[Node[T]]) -> None:
self._nodes = nodes if isinstance(nodes, set) else set(nodes)

def __iter__(self):
return iter(self._nodes)

def __contains__(self, item):
return item in self._nodes

def __len__(self):
return len(self._nodes)
26 changes: 26 additions & 0 deletions smartschedule/sorter/sorted_nodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Self

from attrs import evolve, field, frozen

from smartschedule.sorter.nodes import Nodes


@frozen
class SortedNodes[T]:
_groups: list[Nodes[T]] = field(converter=lambda x: list(x))

def add(self, nodes: Nodes[T], /) -> Self:
return evolve(self, groups=[*self._groups, nodes])

def __iter__(self):
return iter(self._groups)

def __len__(self) -> int:
return len(self._groups)

def __getitem__(self, item):
return self._groups[item]

@classmethod
def empty(cls) -> Self:
return cls([])
Empty file added tests/planning/__init__.py
Empty file.
Empty file.
54 changes: 54 additions & 0 deletions tests/planning/parallelization/test_parallelization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from smartschedule.planning import ResourceName, Stage, stage_parallelization_of

LEON = ResourceName("Leon")
SLAWEK = ResourceName("Sławek")
ERYK = ResourceName("Eric")
KUBA = ResourceName("Kuba")


def test_everything_can_be_done_in_parallel_when_there_are_no_deps() -> None:
stage_1 = Stage("stage_1")
stage_2 = Stage("stage_2")

sorted_stages = stage_parallelization_of({stage_1, stage_2})

assert len(sorted_stages.all) == 1


def test_simple_deps() -> None:
stage_1 = Stage("stage_1")
stage_2 = Stage("stage_2")
stage_3 = Stage("stage_3")
stage_4 = Stage("stage_4")
stage_2 = stage_2.depends_on(stage_1)
stage_3 = stage_3.depends_on(stage_1)
stage_4 = stage_4.depends_on(stage_2)

sorted_stages = stage_parallelization_of({stage_1, stage_2, stage_3, stage_4})

assert str(sorted_stages) == "stage_1 | stage_2, stage_3 | stage_4"


def test_cannot_be_done_when_there_is_a_cycle() -> None:
stage_1 = Stage("stage_1")
stage_2 = Stage("stage_2")
stage_2 = stage_2.depends_on(stage_1)
stage_1 = stage_1.depends_on(stage_2) # making it cyclic

sorted_stages = stage_parallelization_of({stage_1, stage_2})

assert len(sorted_stages.all) == 0


def test_takes_into_account_shared_resources() -> None:
stage_1 = Stage("stage_1").with_chosen_resource_capabilities(LEON)
stage_2 = Stage("stage_2").with_chosen_resource_capabilities(ERYK, LEON)
stage_3 = Stage("stage_3").with_chosen_resource_capabilities(SLAWEK)
stage_4 = Stage("stage_4").with_chosen_resource_capabilities(SLAWEK, KUBA)

parallel_stages = stage_parallelization_of({stage_1, stage_2, stage_3, stage_4})

assert str(parallel_stages) in [
"stage_1, stage_3 | stage_2, stage_4",
"stage_2, stage_4 | stage_1, stage_3",
]
Empty file added tests/sorter/__init__.py
Empty file.
Loading

0 comments on commit 56eab51

Please sign in to comment.