Skip to content

Commit

Permalink
Issue #278: refactor _pg logic in DataCube/VectorCube/MlModel to co…
Browse files Browse the repository at this point in the history
…mmon base class
  • Loading branch information
soxofaan committed Mar 18, 2022
1 parent 38d4ea2 commit 5688088
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 143 deletions.
81 changes: 81 additions & 0 deletions openeo/rest/_datacube.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
.. data:: THIS
Symbolic reference to the current data cube, to be used as argument in
:py:meth:`DataCube.process()` and related calls
"""
import json
import logging
import typing
from typing import Optional

from openeo.internal.graph_building import PGNode, _FromNodeMixin
from openeo.util import legacy_alias

if hasattr(typing, 'TYPE_CHECKING') and typing.TYPE_CHECKING:
# Imports for type checking only (circular import issue at runtime). `hasattr` is Python 3.5 workaround #210
from openeo.rest.connection import Connection

log = logging.getLogger(__name__)

# Sentinel object to refer to "current" cube in chained cube processing expressions.
THIS = object()


class _ProcessGraphAbstraction(_FromNodeMixin):
"""
Base class for client-side abstractions/wrappers
for structures that are represented by a openEO process graph:
raster data cubes, vector cubes, ML models, ...
"""

def __init__(self, pgnode: PGNode, connection: "Connection"):
self._pg = pgnode
self._connection = connection

def __str__(self):
return "{t}({pg})".format(t=self.__class__.__name__, pg=self._pg)

def flat_graph(self) -> dict:
"""
Get the process graph in flat dict representation
.. note:: This method is mainly for internal use, subject to change and not recommended for general usage.
Instead, use :py:meth:`to_json()` to get a JSON representation of the process graph.
"""
# TODO: wrap in {"process_graph":...} by default/optionally?
return self._pg.flat_graph()

flatten = legacy_alias(flat_graph, name="flatten")

def to_json(self, indent=2, separators=None) -> str:
"""
Get JSON representation of (flat dict) process graph.
"""
pg = {"process_graph": self.flat_graph()}
return json.dumps(pg, indent=indent, separators=separators)

@property
def _api_version(self):
return self._connection.capabilities().api_version_check

@property
def connection(self) -> "Connection":
return self._connection

def from_node(self):
return self._pg

def _build_pgnode(self, process_id: str, arguments: dict, namespace: Optional[str], **kwargs) -> PGNode:
"""
Helper to build a PGNode from given argument dict and/or kwargs,
and possibly resolving the `THIS` reference.
"""
arguments = {**(arguments or {}), **kwargs}
for k, v in arguments.items():
if v is THIS:
arguments[k] = self
# TODO: also necessary to traverse lists/dictionaries?
return PGNode(process_id=process_id, arguments=arguments, namespace=namespace)
58 changes: 7 additions & 51 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
import openeo.processes
from openeo.api.process import Parameter
from openeo.imagecollection import ImageCollection
from openeo.internal.graph_building import PGNode, ReduceNode, _FromNodeMixin
from openeo.internal.graph_building import PGNode, ReduceNode
from openeo.metadata import CollectionMetadata, Band, BandDimension
from openeo.processes import ProcessBuilder
from openeo.rest import BandMathException, OperatorException, OpenEoClientException
from openeo.rest._datacube import _ProcessGraphAbstraction, THIS
from openeo.rest.job import RESTJob
from openeo.rest.mlmodel import MlModel
from openeo.rest.service import Service
Expand All @@ -47,11 +48,8 @@

log = logging.getLogger(__name__)

# Sentinel object to refer to "current" cube in chained cube processing expressions.
THIS = object()


class DataCube(_FromNodeMixin):
class DataCube(_ProcessGraphAbstraction):
"""
Class representing a openEO Data Cube. Data loaded from the backend is returned as an object of this class.
Various processing methods can be invoked to build a complete workflow.
Expand All @@ -61,14 +59,9 @@ class DataCube(_FromNodeMixin):
"""

def __init__(self, graph: PGNode, connection: 'openeo.Connection', metadata: CollectionMetadata = None):
# Process graph
self._pg = graph
self._connection = connection
super().__init__(pgnode=graph, connection=connection)
self.metadata = CollectionMetadata.get_or_create(metadata)

def __str__(self):
return "DataCube({pg})".format(pg=self._pg)

@property
@deprecated(reason="Use :py:meth:`DataCube.flat_graph()` instead.", version="0.9.0")
def graph(self) -> dict:
Expand All @@ -80,37 +73,6 @@ def graph(self) -> dict:
# TODO: is it feasible to just remove this property?
return self.flat_graph()

def flat_graph(self) -> dict:
"""
Get the process graph in flat dict representation
.. note:: This method is mainly for internal use, subject to change and not recommended for general usage.
Instead, use :py:meth:`DataCube.to_json()` to get a JSON representation of the process graph.
"""
# TODO: wrap in {"process_graph":...} by default/optionally?
return self._pg.flat_graph()

flatten = legacy_alias(flat_graph, name="flatten")

def to_json(self, indent=2, separators=None) -> str:
"""
Get JSON representation of (flat dict) process graph.
"""
pg = {"process_graph": self.flat_graph()}
return json.dumps(pg, indent=indent, separators=separators)

@property
def _api_version(self):
return self._connection.capabilities().api_version_check

@property
def connection(self) -> 'openeo.Connection':
return self._connection

def from_node(self) -> PGNode:
# _FromNodeMixin API
return self._pg

def process(
self,
process_id: str,
Expand All @@ -128,15 +90,8 @@ def process(
:param namespace: optional: process namespace
:return: new DataCube instance
"""
arguments = {**(arguments or {}), **kwargs}
for k, v in arguments.items():
if v is THIS:
arguments[k] = self
return self.process_with_node(PGNode(
process_id=process_id,
arguments=arguments,
namespace=namespace,
), metadata=metadata)
pg = self._build_pgnode(process_id=process_id, arguments=arguments, namespace=namespace, **kwargs)
return DataCube(graph=pg, connection=self._connection, metadata=metadata or self.metadata)

graph_add_node = legacy_alias(process, "graph_add_node")

Expand All @@ -150,6 +105,7 @@ def process_with_node(self, pg: PGNode, metadata: Optional[CollectionMetadata] =
"""
# TODO: deep copy `self.metadata` instead of using same instance?
# TODO: cover more cases where metadata has to be altered
# TODO: deprecate `process_with_node``: little added value over just calling DataCube() directly
return DataCube(graph=pg, connection=self._connection, metadata=metadata or self.metadata)

@classmethod
Expand Down
47 changes: 6 additions & 41 deletions openeo/rest/mlmodel.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,28 @@
import json

import pathlib
from typing import Union, Optional
import typing
from typing import Union, Optional

from openeo.internal.graph_building import PGNode
from openeo.rest._datacube import _ProcessGraphAbstraction
from openeo.rest.job import RESTJob
from openeo.util import legacy_alias


if hasattr(typing, 'TYPE_CHECKING') and typing.TYPE_CHECKING:
# Imports for type checking only (circular import issue at runtime). `hasattr` is Python 3.5 workaround #210
from openeo import Connection


class MlModel:
class MlModel(_ProcessGraphAbstraction):
"""
A machine learning model accompanied with STAC metadata, including the ml-model extension.
"""
def __init__(self, graph: PGNode, connection: 'Connection'):
super().__init__()
self._pg = graph
self._connection = connection

def __str__(self):
return "MlModel({pg})".format(pg=self._pg)

@property
def graph(self) -> dict:
"""Get the process graph in flat dict representation"""
return self.flat_graph()

def flat_graph(self) -> dict:
"""Get the process graph in flat dict representation"""
return self._pg.flat_graph()

flatten = legacy_alias(flat_graph, name="flatten")

def to_json(self, indent=2, separators=None) -> str:
"""
Get JSON representation of (flat dict) process graph.
"""
pg = {"process_graph": self.flat_graph()}
return json.dumps(pg, indent=indent, separators=separators)

@property
def _api_version(self):
return self._connection.capabilities().api_version_check

@property
def connection(self):
return self._connection
super().__init__(pgnode=graph, connection=connection)

def save_ml_model(self, options: Optional[dict] = None):
pgnode = PGNode(
process_id="save_ml_model",
arguments={
"data": {'from_node': self._pg},
"options": options or {}
}
arguments={"data": self, "options": options or {}}
)
return MlModel(graph=pgnode, connection=self._connection)

Expand Down
72 changes: 22 additions & 50 deletions openeo/rest/vectorcube.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import json
import pathlib
from typing import Union
from typing import Union, Optional
import typing

from openeo.internal.graph_building import PGNode
from openeo.internal.graph_building import PGNode, _FromNodeMixin
from openeo.metadata import CollectionMetadata
from openeo.rest._datacube import _ProcessGraphAbstraction, THIS
from openeo.rest.job import RESTJob
from openeo.util import legacy_alias


if hasattr(typing, 'TYPE_CHECKING') and typing.TYPE_CHECKING:
# Imports for type checking only (circular import issue at runtime). `hasattr` is Python 3.5 workaround #210
from openeo import Connection


class VectorCube:
class VectorCube(_ProcessGraphAbstraction):
"""
A Vector Cube, or 'Vector Collection' is a data structure containing 'Features':
https://www.w3.org/TR/sdw-bp/#dfn-feature
Expand All @@ -24,53 +24,26 @@ class VectorCube:
"""

def __init__(self, graph: PGNode, connection: 'Connection', metadata: CollectionMetadata = None):
super().__init__()
# Process graph
self._pg = graph
self._connection = connection
super().__init__(pgnode=graph, connection=connection)
# TODO: does VectorCube need CollectionMetadata?
self.metadata = metadata

def __str__(self):
return "DataCube({pg})".format(pg=self._pg)

@property
def graph(self) -> dict:
"""Get the process graph in flat dict representation"""
return self.flat_graph()

def flat_graph(self) -> dict:
"""Get the process graph in flat dict representation"""
return self._pg.flat_graph()

flatten = legacy_alias(flat_graph, name="flatten")

def to_json(self, indent=2, separators=None) -> str:
"""
Get JSON representation of (flat dict) process graph.
"""
pg = {"process_graph": self.flat_graph()}
return json.dumps(pg, indent=indent, separators=separators)

@property
def _api_version(self):
return self._connection.capabilities().api_version_check

@property
def connection(self):
return self._connection

def process(self, process_id: str, args: dict = None, metadata: CollectionMetadata = None, **kwargs) -> 'VectorCube':
def process(
self,
process_id: str,
arguments: dict = None,
metadata: Optional[CollectionMetadata] = None,
namespace: Optional[str] = None,
**kwargs) -> 'VectorCube':
"""
Generic helper to create a new DataCube by applying a process.
:param process_id: process id of the process.
:param args: argument dictionary for the process.
:return: new DataCube instance
"""
return self.process_with_node(PGNode(
process_id=process_id,
arguments=args, **kwargs
), metadata=metadata)
pg = self._build_pgnode(process_id=process_id, arguments=arguments, namespace=namespace, **kwargs)
return VectorCube(graph=pg, connection=self._connection, metadata=metadata or self.metadata)

def process_with_node(self, pg: PGNode, metadata: CollectionMetadata = None) -> 'VectorCube':
"""
Expand All @@ -80,22 +53,21 @@ def process_with_node(self, pg: PGNode, metadata: CollectionMetadata = None) ->
:param metadata: (optional) metadata to override original cube metadata (e.g. when reducing dimensions)
:return: new DataCube instance
"""
from openeo.rest.datacube import DataCube, THIS
arguments = pg.arguments
for k, v in arguments.items():
if isinstance(v, DataCube) or isinstance(v, VectorCube):
arguments[k] = {"from_node": v._pg}
elif v is THIS:
arguments[k] = {"from_node": self._pg}
# TODO: it's against intended flow to resolve THIS and _FromNodeMixin at this point (should be done before building PGNode)
if v is THIS:
v = self
if isinstance(v, _FromNodeMixin):
arguments[k] = {"from_node": v.from_node()}
# TODO: deep copy `self.metadata` instead of using same instance?
# TODO: cover more cases where metadata has to be altered
return VectorCube(graph=pg, connection=self._connection, metadata=metadata or self.metadata)

def save_result(self, format: str = "GeoJson", options: dict = None):
return self.process(
process_id="save_result",
args={
"data": {"from_node": self._pg},
arguments={
"data": self,
"format": format,
"options": options or {}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/rest/datacube/test_vectorcube.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def test_raster_to_vector(con100):
vector_cube = img.raster_to_vector()
vector_cube_tranformed = vector_cube.process_with_node(openeo.UDF("python source code", "Python"))

assert vector_cube_tranformed.graph == {
assert vector_cube_tranformed.flat_graph() == {
'loadcollection1': {
'arguments': {
'id': 'S2',
Expand Down

0 comments on commit 5688088

Please sign in to comment.