diff --git a/CHANGELOG.md b/CHANGELOG.md index 8da75d3b1..b793bedeb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Add namespace support to `DataCube.process`, `PGNode`, `ProcessGraphVisitor` (minor API breaking change) and related. Allows building process graphs with processes from non-"backend" namespaces ([#182](https://github.com/Open-EO/openeo-python-client/issues/182)) + ### Changed ### Removed diff --git a/openeo/_version.py b/openeo/_version.py index 0d7fdf989..2198e590a 100644 --- a/openeo/_version.py +++ b/openeo/_version.py @@ -1 +1 @@ -__version__ = '0.4.11a1' +__version__ = '0.5.0a1' diff --git a/openeo/internal/graph_building.py b/openeo/internal/graph_building.py index 15d699353..eb83edaa9 100644 --- a/openeo/internal/graph_building.py +++ b/openeo/internal/graph_building.py @@ -8,7 +8,7 @@ from openeo.api.process import Parameter from openeo.internal.process_graph_visitor import ProcessGraphVisitor -from openeo.util import legacy_alias +from openeo.util import legacy_alias, dict_no_none class PGNode: @@ -24,7 +24,7 @@ class PGNode: """ - def __init__(self, process_id: str, arguments: dict = None, **kwargs): + def __init__(self, process_id: str, arguments: dict = None, namespace: Union[str, None] = None, **kwargs): self._process_id = process_id # Merge arguments dict and kwargs arguments = dict(**(arguments or {}), **kwargs) @@ -34,6 +34,7 @@ def __init__(self, process_id: str, arguments: dict = None, **kwargs): arguments[arg] = {"from_node": value} # TODO: use a frozendict of some sort to ensure immutability? 
self._arguments = arguments + self._namespace = namespace def __repr__(self): return "<{c} {p!r} at 0x{m:x}>".format(c=self.__class__.__name__, p=self.process_id, m=id(self)) @@ -46,6 +47,10 @@ def process_id(self) -> str: def arguments(self) -> dict: return self._arguments + @property + def namespace(self) -> Union[str, None]: + return self._namespace + def to_dict(self) -> dict: """ Convert process graph to a nested dictionary structure. @@ -55,7 +60,7 @@ def to_dict(self) -> dict: def _deep_copy(x): """PGNode aware deep copy helper""" if isinstance(x, PGNode): - return {"process_id": x.process_id, "arguments": _deep_copy(x.arguments)} + return dict_no_none(process_id=x.process_id, arguments=_deep_copy(x.arguments), namespace=x.namespace) if isinstance(x, Parameter): return {"from_parameter": x.name} elif isinstance(x, dict): @@ -201,20 +206,21 @@ def accept_node(self, node: PGNode): # Process reused nodes only first time and remember node id. node_id = id(node) if node_id not in self._node_cache: - super()._accept_process(process_id=node.process_id, arguments=node.arguments) + super()._accept_process(process_id=node.process_id, arguments=node.arguments, namespace=node.namespace) self._node_cache[node_id] = self._last_node_id else: self._last_node_id = self._node_cache[node_id] - def enterProcess(self, process_id: str, arguments: dict): + def enterProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]): self._argument_stack.append({}) - def leaveProcess(self, process_id: str, arguments: dict): + def leaveProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]): node_id = self._node_id_generator.generate(process_id) - self._flattened[node_id] = { - "process_id": process_id, - "arguments": self._argument_stack.pop() - } + self._flattened[node_id] = dict_no_none( + process_id=process_id, + arguments=self._argument_stack.pop(), + namespace=namespace, + ) self._last_node_id = node_id def _store_argument(self, argument_id: str, 
value): diff --git a/openeo/internal/process_graph_visitor.py b/openeo/internal/process_graph_visitor.py index e96575c98..9e529489a 100644 --- a/openeo/internal/process_graph_visitor.py +++ b/openeo/internal/process_graph_visitor.py @@ -1,4 +1,5 @@ from abc import ABC +from typing import Union from deprecated import deprecated @@ -75,11 +76,12 @@ def accept(self, node: dict): def accept_node(self, node: dict): pid = node['process_id'] arguments = node.get('arguments', {}) - self._accept_process(process_id=pid, arguments=arguments) + namespace = node.get("namespace", None) + self._accept_process(process_id=pid, arguments=arguments, namespace=namespace) - def _accept_process(self, process_id: str, arguments: dict): + def _accept_process(self, process_id: str, arguments: dict, namespace: Union[str, None]): self.process_stack.append(process_id) - self.enterProcess(process_id=process_id, arguments=arguments) + self.enterProcess(process_id=process_id, arguments=arguments, namespace=namespace) for arg_id, value in sorted(arguments.items()): if isinstance(value, list): self.enterArray(argument_id=arg_id) @@ -91,7 +93,7 @@ def _accept_process(self, process_id: str, arguments: dict): self.leaveArgument(argument_id=arg_id, value=value) else: self.constantArgument(argument_id=arg_id, value=value) - self.leaveProcess(process_id=process_id, arguments=arguments) + self.leaveProcess(process_id=process_id, arguments=arguments, namespace=namespace) assert self.process_stack.pop() == process_id def _accept_argument_list(self, elements: list): @@ -121,10 +123,10 @@ def _accept_dict(self, value: dict): def from_parameter(self,parameter_id:str): pass - def enterProcess(self, process_id: str, arguments: dict): + def enterProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]): pass - def leaveProcess(self, process_id: str, arguments: dict): + def leaveProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]): pass def enterArgument(self, 
argument_id: str, value): diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index 8496a6a99..3e5d7dd13 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -93,12 +93,21 @@ def _api_version(self): def connection(self) -> 'openeo.Connection': return self._connection - def process(self, process_id: str, arguments: dict = None, metadata: CollectionMetadata = None, **kwargs) -> 'DataCube': + def process( + self, + process_id: str, + arguments: dict = None, + metadata: Optional[CollectionMetadata] = None, + namespace: Optional[str] = None, + **kwargs + ) -> 'DataCube': """ Generic helper to create a new DataCube by applying a process. :param process_id: process id of the process. :param arguments: argument dictionary for the process. + :param metadata: optional: metadata to override original cube metadata (e.g. when reducing dimensions) + :param namespace: optional: process namespace :return: new DataCube instance """ arguments = {**(arguments or {}), **kwargs} @@ -110,16 +119,17 @@ def process(self, process_id: str, arguments: dict = None, metadata: CollectionM return self.process_with_node(PGNode( process_id=process_id, arguments=arguments, + namespace=namespace, ), metadata=metadata) graph_add_node = legacy_alias(process, "graph_add_node") - def process_with_node(self, pg: PGNode, metadata: CollectionMetadata = None) -> 'DataCube': + def process_with_node(self, pg: PGNode, metadata: Optional[CollectionMetadata] = None) -> 'DataCube': """ Generic helper to create a new DataCube by applying a process (given as process graph node) :param pg: process graph node (containing process id and arguments) - :param metadata: (optional) metadata to override original cube metadata (e.g. when reducing dimensions) + :param metadata: optional: metadata to override original cube metadata (e.g. when reducing dimensions) :return: new DataCube instance """ # TODO: deep copy `self.metadata` instead of using same instance? 
diff --git a/tests/data/1.0.0/process_foo_namespaced.json b/tests/data/1.0.0/process_foo_namespaced.json new file mode 100644 index 000000000..0a4227609 --- /dev/null +++ b/tests/data/1.0.0/process_foo_namespaced.json @@ -0,0 +1,21 @@ +{ + "loadcollection1": { + "process_id": "load_collection", + "arguments": { + "id": "S2", + "spatial_extent": null, + "temporal_extent": null + } + }, + "foo1": { + "process_id": "foo", + "namespace": "bar", + "arguments": { + "data": { + "from_node": "loadcollection1" + }, + "bar": 123 + }, + "result": true + } +} diff --git a/tests/internal/test_graphbuilding.py b/tests/internal/test_graphbuilding.py index 4bbdc8644..31027ec07 100644 --- a/tests/internal/test_graphbuilding.py +++ b/tests/internal/test_graphbuilding.py @@ -19,6 +19,11 @@ def test_pgnode_arguments(): PGNode("foo", arguments={"bar": 123}, bar=456) +def test_pgnode_namespace(): + assert PGNode("foo").namespace is None + assert PGNode("foo", namespace="bar").namespace == "bar" + + def test_pgnode_to_dict(): pg = PGNode(process_id="load_collection", arguments={"collection_id": "S2"}) assert pg.to_dict() == { @@ -27,6 +32,15 @@ def test_pgnode_to_dict(): } +def test_pgnode_to_dict_namespace(): + pg = PGNode(process_id="load_collection", arguments={"collection_id": "S2"}, namespace="bar") + assert pg.to_dict() == { + "process_id": "load_collection", + "namespace": "bar", + "arguments": {"collection_id": "S2"} + } + + def test_pgnode_to_dict_nested(): pg = PGNode( process_id="filter_bands", @@ -88,6 +102,11 @@ def test_build_and_flatten_argument_dict(): assert node.flat_graph() == {"foo1": {"process_id": "foo", "arguments": {"bar": "red", "x": 3}, "result": True}} +def test_build_and_flatten_namespace(): + node = PGNode("foo", namespace="bar") + assert node.flat_graph() == {"foo1": {"process_id": "foo", "namespace": "bar", "arguments": {}, "result": True}} + + def test_pgnode_to_dict_subprocess_graphs(): load_collection = PGNode("load_collection", collection_id="S2") band2 
= PGNode("array_element", data={"from_argument": "data"}, index=2) diff --git a/tests/internal/test_process_graph_visitor.py b/tests/internal/test_process_graph_visitor.py index 52fd8d981..43b3bd6e1 100644 --- a/tests/internal/test_process_graph_visitor.py +++ b/tests/internal/test_process_graph_visitor.py @@ -1,4 +1,3 @@ -from unittest import TestCase from unittest.mock import MagicMock, call, ANY import pytest @@ -16,7 +15,26 @@ def test_visit_node(): visitor.enterArgument = MagicMock() visitor.accept_node(node) - assert visitor.enterProcess.call_args_list == [call(process_id="cos", arguments={"x": {"from_argument": "data"}})] + assert visitor.enterProcess.call_args_list == [ + call(process_id="cos", arguments={"x": {"from_argument": "data"}}, namespace=None) + ] + assert visitor.enterArgument.call_args_list == [call(argument_id="x", value={"from_argument": "data"})] + + +def test_visit_node_namespaced(): + node = { + "process_id": "cos", + "namespace": "math", + "arguments": {"x": {"from_argument": "data"}} + } + visitor = ProcessGraphVisitor() + visitor.enterProcess = MagicMock() + visitor.enterArgument = MagicMock() + visitor.accept_node(node) + + assert visitor.enterProcess.call_args_list == [ + call(process_id="cos", arguments={"x": {"from_argument": "data"}}, namespace="math") + ] assert visitor.enterArgument.call_args_list == [call(argument_id="x", value={"from_argument": "data"})] @@ -51,8 +69,8 @@ def test_visit_nodes(): visitor.accept_process_graph(graph) assert visitor.leaveProcess.call_args_list == [ - call(process_id="abs", arguments=ANY), - call(process_id="cos", arguments=ANY), + call(process_id="abs", arguments=ANY, namespace=None), + call(process_id="cos", arguments=ANY, namespace=None), ] assert visitor.enterArgument.call_args_list == [ call(argument_id="data", value=ANY), @@ -93,8 +111,8 @@ def test_visit_nodes_array(): visitor.accept_process_graph(graph) assert visitor.leaveProcess.call_args_list == [ - call(process_id='abs', arguments=ANY), - 
call(process_id='cos', arguments=ANY) + call(process_id='abs', arguments=ANY, namespace=None), + call(process_id='cos', arguments=ANY, namespace=None) ] assert visitor.enterArgument.call_args_list == [ call(argument_id="data", value=ANY) @@ -130,8 +148,8 @@ def test_visit_array_with_dereferenced_nodes(): visitor.accept_node(dereferenced) assert visitor.leaveProcess.call_args_list == [ - call(process_id='array_element', arguments=ANY), - call(process_id='product', arguments=ANY) + call(process_id='array_element', arguments=ANY, namespace=None), + call(process_id='product', arguments=ANY, namespace=None) ] assert visitor.enterArgument.call_args_list == [ call(argument_id="data", value={'from_argument': 'data'}) diff --git a/tests/rest/datacube/test_datacube100.py b/tests/rest/datacube/test_datacube100.py index e7fbba79f..4e675e051 100644 --- a/tests/rest/datacube/test_datacube100.py +++ b/tests/rest/datacube/test_datacube100.py @@ -539,12 +539,18 @@ def test_custom_process_kwargs_datacube_pg(con100: Connection): assert res.graph == expected -def test_custom_process_kwargs_datacube_chained(con100: Connection): +def test_custom_process_kwargs_this(con100: Connection): res = con100.load_collection("S2").process(process_id="foo", data=THIS, bar=123) expected = load_json_resource('data/1.0.0/process_foo.json') assert res.graph == expected +def test_custom_process_kwargs_namespaced(con100: Connection): + res = con100.load_collection("S2").process(process_id="foo", data=THIS, bar=123, namespace="bar") + expected = load_json_resource('data/1.0.0/process_foo_namespaced.json') + assert res.graph == expected + + def test_custom_process_arguments_datacube(con100: Connection): img = con100.load_collection("S2") res = img.process(process_id="foo", arguments={"data": img, "bar": 123}) @@ -559,12 +565,18 @@ def test_custom_process_arguments_datacube_pg(con100: Connection): assert res.graph == expected -def test_custom_process_arguments_datacube_chained(con100: Connection): +def 
test_custom_process_arguments_this(con100: Connection): res = con100.load_collection("S2").process(process_id="foo", arguments={"data": THIS, "bar": 123}) expected = load_json_resource('data/1.0.0/process_foo.json') assert res.graph == expected +def test_custom_process_arguments_namespaced(con100: Connection): + res = con100.load_collection("S2").process(process_id="foo", arguments={"data": THIS, "bar": 123}, namespace="bar") + expected = load_json_resource('data/1.0.0/process_foo_namespaced.json') + assert res.graph == expected + + def test_save_user_defined_process(con100, requests_mock): requests_mock.get(API_URL + "/processes", json={"processes": [{"id": "add"}]})