Skip to content

Commit

Permalink
Issue #182 add namespace support to DataCube.process and related
Browse files Browse the repository at this point in the history
bumps version to 0.5.0a1  because the `ProcessGraphVisitor` changed a bit
  • Loading branch information
soxofaan committed Mar 5, 2021
1 parent 05ab328 commit 4e58c05
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add namespace support to `DataCube.process`, `PGNode`, `ProcessGraphVisitor` (minor API breaking change) and related. Allows building process graphs with processes from non-"backend" namespaces ([#182](https://github.com/Open-EO/openeo-python-client/issues/182))

### Changed

### Removed
Expand Down
2 changes: 1 addition & 1 deletion openeo/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.4.11a1'
__version__ = '0.5.0a1'
26 changes: 16 additions & 10 deletions openeo/internal/graph_building.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from openeo.api.process import Parameter
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.util import legacy_alias
from openeo.util import legacy_alias, dict_no_none


class PGNode:
Expand All @@ -24,7 +24,7 @@ class PGNode:
"""

def __init__(self, process_id: str, arguments: dict = None, **kwargs):
def __init__(self, process_id: str, arguments: dict = None, namespace: Union[str, None] = None, **kwargs):
self._process_id = process_id
# Merge arguments dict and kwargs
arguments = dict(**(arguments or {}), **kwargs)
Expand All @@ -34,6 +34,7 @@ def __init__(self, process_id: str, arguments: dict = None, **kwargs):
arguments[arg] = {"from_node": value}
# TODO: use a frozendict of some sort to ensure immutability?
self._arguments = arguments
self._namespace = namespace

def __repr__(self):
return "<{c} {p!r} at 0x{m:x}>".format(c=self.__class__.__name__, p=self.process_id, m=id(self))
Expand All @@ -46,6 +47,10 @@ def process_id(self) -> str:
def arguments(self) -> dict:
return self._arguments

@property
def namespace(self) -> Union[str, None]:
return self._namespace

def to_dict(self) -> dict:
"""
Convert process graph to a nested dictionary structure.
Expand All @@ -55,7 +60,7 @@ def to_dict(self) -> dict:
def _deep_copy(x):
"""PGNode aware deep copy helper"""
if isinstance(x, PGNode):
return {"process_id": x.process_id, "arguments": _deep_copy(x.arguments)}
return dict_no_none(process_id=x.process_id, arguments=_deep_copy(x.arguments), namespace=x.namespace)
if isinstance(x, Parameter):
return {"from_parameter": x.name}
elif isinstance(x, dict):
Expand Down Expand Up @@ -201,20 +206,21 @@ def accept_node(self, node: PGNode):
# Process reused nodes only first time and remember node id.
node_id = id(node)
if node_id not in self._node_cache:
super()._accept_process(process_id=node.process_id, arguments=node.arguments)
super()._accept_process(process_id=node.process_id, arguments=node.arguments, namespace=node.namespace)
self._node_cache[node_id] = self._last_node_id
else:
self._last_node_id = self._node_cache[node_id]

def enterProcess(self, process_id: str, arguments: dict):
def enterProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
self._argument_stack.append({})

def leaveProcess(self, process_id: str, arguments: dict):
def leaveProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
node_id = self._node_id_generator.generate(process_id)
self._flattened[node_id] = {
"process_id": process_id,
"arguments": self._argument_stack.pop()
}
self._flattened[node_id] = dict_no_none(
process_id=process_id,
arguments=self._argument_stack.pop(),
namespace=namespace,
)
self._last_node_id = node_id

def _store_argument(self, argument_id: str, value):
Expand Down
14 changes: 8 additions & 6 deletions openeo/internal/process_graph_visitor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC
from typing import Union

from deprecated import deprecated

Expand Down Expand Up @@ -75,11 +76,12 @@ def accept(self, node: dict):
def accept_node(self, node: dict):
pid = node['process_id']
arguments = node.get('arguments', {})
self._accept_process(process_id=pid, arguments=arguments)
namespace = node.get("namespace", None)
self._accept_process(process_id=pid, arguments=arguments, namespace=namespace)

def _accept_process(self, process_id: str, arguments: dict):
def _accept_process(self, process_id: str, arguments: dict, namespace: Union[str, None]):
self.process_stack.append(process_id)
self.enterProcess(process_id=process_id, arguments=arguments)
self.enterProcess(process_id=process_id, arguments=arguments, namespace=namespace)
for arg_id, value in sorted(arguments.items()):
if isinstance(value, list):
self.enterArray(argument_id=arg_id)
Expand All @@ -91,7 +93,7 @@ def _accept_process(self, process_id: str, arguments: dict):
self.leaveArgument(argument_id=arg_id, value=value)
else:
self.constantArgument(argument_id=arg_id, value=value)
self.leaveProcess(process_id=process_id, arguments=arguments)
self.leaveProcess(process_id=process_id, arguments=arguments, namespace=namespace)
assert self.process_stack.pop() == process_id

def _accept_argument_list(self, elements: list):
Expand Down Expand Up @@ -121,10 +123,10 @@ def _accept_dict(self, value: dict):
def from_parameter(self,parameter_id:str):
pass

def enterProcess(self, process_id: str, arguments: dict):
def enterProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
pass

def leaveProcess(self, process_id: str, arguments: dict):
def leaveProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
pass

def enterArgument(self, argument_id: str, value):
Expand Down
16 changes: 13 additions & 3 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,21 @@ def _api_version(self):
def connection(self) -> 'openeo.Connection':
return self._connection

def process(self, process_id: str, arguments: dict = None, metadata: CollectionMetadata = None, **kwargs) -> 'DataCube':
def process(
self,
process_id: str,
arguments: dict = None,
metadata: Optional[CollectionMetadata] = None,
namespace: Optional[str] = None,
**kwargs
) -> 'DataCube':
"""
Generic helper to create a new DataCube by applying a process.
:param process_id: process id of the process.
:param arguments: argument dictionary for the process.
:param metadata: optional: metadata to override original cube metadata (e.g. when reducing dimensions)
:param namespace: optional: process namespace
:return: new DataCube instance
"""
arguments = {**(arguments or {}), **kwargs}
Expand All @@ -110,16 +119,17 @@ def process(self, process_id: str, arguments: dict = None, metadata: CollectionM
return self.process_with_node(PGNode(
process_id=process_id,
arguments=arguments,
namespace=namespace,
), metadata=metadata)

graph_add_node = legacy_alias(process, "graph_add_node")

def process_with_node(self, pg: PGNode, metadata: CollectionMetadata = None) -> 'DataCube':
def process_with_node(self, pg: PGNode, metadata: Optional[CollectionMetadata] = None) -> 'DataCube':
"""
Generic helper to create a new DataCube by applying a process (given as process graph node)
:param pg: process graph node (containing process id and arguments)
:param metadata: (optional) metadata to override original cube metadata (e.g. when reducing dimensions)
:param metadata: optional: metadata to override original cube metadata (e.g. when reducing dimensions)
:return: new DataCube instance
"""
# TODO: deep copy `self.metadata` instead of using same instance?
Expand Down
21 changes: 21 additions & 0 deletions tests/data/1.0.0/process_foo_namespaced.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"loadcollection1": {
"process_id": "load_collection",
"arguments": {
"id": "S2",
"spatial_extent": null,
"temporal_extent": null
}
},
"foo1": {
"process_id": "foo",
"namespace": "bar",
"arguments": {
"data": {
"from_node": "loadcollection1"
},
"bar": 123
},
"result": true
}
}
19 changes: 19 additions & 0 deletions tests/internal/test_graphbuilding.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ def test_pgnode_arguments():
PGNode("foo", arguments={"bar": 123}, bar=456)


def test_pgnode_namespace():
assert PGNode("foo").namespace is None
assert PGNode("foo", namespace="bar").namespace == "bar"


def test_pgnode_to_dict():
pg = PGNode(process_id="load_collection", arguments={"collection_id": "S2"})
assert pg.to_dict() == {
Expand All @@ -27,6 +32,15 @@ def test_pgnode_to_dict():
}


def test_pgnode_to_dict_namespace():
pg = PGNode(process_id="load_collection", arguments={"collection_id": "S2"}, namespace="bar")
assert pg.to_dict() == {
"process_id": "load_collection",
"namespace": "bar",
"arguments": {"collection_id": "S2"}
}


def test_pgnode_to_dict_nested():
pg = PGNode(
process_id="filter_bands",
Expand Down Expand Up @@ -88,6 +102,11 @@ def test_build_and_flatten_argument_dict():
assert node.flat_graph() == {"foo1": {"process_id": "foo", "arguments": {"bar": "red", "x": 3}, "result": True}}


def test_build_and_flatten_namespace():
node = PGNode("foo", namespace="bar")
assert node.flat_graph() == {"foo1": {"process_id": "foo", "namespace": "bar", "arguments": {}, "result": True}}


def test_pgnode_to_dict_subprocess_graphs():
load_collection = PGNode("load_collection", collection_id="S2")
band2 = PGNode("array_element", data={"from_argument": "data"}, index=2)
Expand Down
34 changes: 26 additions & 8 deletions tests/internal/test_process_graph_visitor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from unittest import TestCase
from unittest.mock import MagicMock, call, ANY

import pytest
Expand All @@ -16,7 +15,26 @@ def test_visit_node():
visitor.enterArgument = MagicMock()
visitor.accept_node(node)

assert visitor.enterProcess.call_args_list == [call(process_id="cos", arguments={"x": {"from_argument": "data"}})]
assert visitor.enterProcess.call_args_list == [
call(process_id="cos", arguments={"x": {"from_argument": "data"}}, namespace=None)
]
assert visitor.enterArgument.call_args_list == [call(argument_id="x", value={"from_argument": "data"})]


def test_visit_node_namespaced():
node = {
"process_id": "cos",
"namespace": "math",
"arguments": {"x": {"from_argument": "data"}}
}
visitor = ProcessGraphVisitor()
visitor.enterProcess = MagicMock()
visitor.enterArgument = MagicMock()
visitor.accept_node(node)

assert visitor.enterProcess.call_args_list == [
call(process_id="cos", arguments={"x": {"from_argument": "data"}}, namespace="math")
]
assert visitor.enterArgument.call_args_list == [call(argument_id="x", value={"from_argument": "data"})]


Expand Down Expand Up @@ -51,8 +69,8 @@ def test_visit_nodes():
visitor.accept_process_graph(graph)

assert visitor.leaveProcess.call_args_list == [
call(process_id="abs", arguments=ANY),
call(process_id="cos", arguments=ANY),
call(process_id="abs", arguments=ANY, namespace=None),
call(process_id="cos", arguments=ANY, namespace=None),
]
assert visitor.enterArgument.call_args_list == [
call(argument_id="data", value=ANY),
Expand Down Expand Up @@ -93,8 +111,8 @@ def test_visit_nodes_array():

visitor.accept_process_graph(graph)
assert visitor.leaveProcess.call_args_list == [
call(process_id='abs', arguments=ANY),
call(process_id='cos', arguments=ANY)
call(process_id='abs', arguments=ANY, namespace=None),
call(process_id='cos', arguments=ANY, namespace=None)
]
assert visitor.enterArgument.call_args_list == [
call(argument_id="data", value=ANY)
Expand Down Expand Up @@ -130,8 +148,8 @@ def test_visit_array_with_dereferenced_nodes():

visitor.accept_node(dereferenced)
assert visitor.leaveProcess.call_args_list == [
call(process_id='array_element', arguments=ANY),
call(process_id='product', arguments=ANY)
call(process_id='array_element', arguments=ANY, namespace=None),
call(process_id='product', arguments=ANY, namespace=None)
]
assert visitor.enterArgument.call_args_list == [
call(argument_id="data", value={'from_argument': 'data'})
Expand Down
16 changes: 14 additions & 2 deletions tests/rest/datacube/test_datacube100.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,12 +539,18 @@ def test_custom_process_kwargs_datacube_pg(con100: Connection):
assert res.graph == expected


def test_custom_process_kwargs_datacube_chained(con100: Connection):
def test_custom_process_kwargs_this(con100: Connection):
res = con100.load_collection("S2").process(process_id="foo", data=THIS, bar=123)
expected = load_json_resource('data/1.0.0/process_foo.json')
assert res.graph == expected


def test_custom_process_kwargs_namespaced(con100: Connection):
res = con100.load_collection("S2").process(process_id="foo", data=THIS, bar=123, namespace="bar")
expected = load_json_resource('data/1.0.0/process_foo_namespaced.json')
assert res.graph == expected


def test_custom_process_arguments_datacube(con100: Connection):
img = con100.load_collection("S2")
res = img.process(process_id="foo", arguments={"data": img, "bar": 123})
Expand All @@ -559,12 +565,18 @@ def test_custom_process_arguments_datacube_pg(con100: Connection):
assert res.graph == expected


def test_custom_process_arguments_datacube_chained(con100: Connection):
def test_custom_process_arguments_this(con100: Connection):
res = con100.load_collection("S2").process(process_id="foo", arguments={"data": THIS, "bar": 123})
expected = load_json_resource('data/1.0.0/process_foo.json')
assert res.graph == expected


def test_custom_process_arguments_namespacd(con100: Connection):
res = con100.load_collection("S2").process(process_id="foo", arguments={"data": THIS, "bar": 123}, namespace="bar")
expected = load_json_resource('data/1.0.0/process_foo_namespaced.json')
assert res.graph == expected


def test_save_user_defined_process(con100, requests_mock):
requests_mock.get(API_URL + "/processes", json={"processes": [{"id": "add"}]})

Expand Down

0 comments on commit 4e58c05

Please sign in to comment.