
multiple results via multiple save_result nodes #424

Closed
jdries opened this issue May 17, 2023 · 16 comments · Fixed by Open-EO/openeo-python-driver#299 or #828

@jdries
Contributor

jdries commented May 17, 2023

According to the spec, it should be allowed to have multiple save_result nodes, for instance to write images for different branches of the process graph.

To implement this, one idea is to add a dummy 'noop' node on the backend side, so that the multi-output process graph is converted back into a graph with a single output node.
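
For illustration only, a minimal sketch of what the backend could end up evaluating after injecting such a node; the 'noop' process, the node ids and the two branches below are made up, not actual openEO or backend API:

# Hypothetical flat process graph fragment (as a Python dict): two save_result
# branches are wrapped under one artificial end node, so evaluation can again
# start from a single node.
graph_with_noop = {
    "save_highres": {
        "process_id": "save_result",
        "arguments": {"data": {"from_node": "resample_highres"}, "format": "GTiff"},
    },
    "save_lowres": {
        "process_id": "save_result",
        "arguments": {"data": {"from_node": "resample_lowres"}, "format": "GTiff"},
    },
    "noop1": {
        "process_id": "noop",  # backend-internal helper, not a standard openEO process
        "arguments": {"data": [{"from_node": "save_highres"}, {"from_node": "save_lowres"}]},
        "result": True,
    },
}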

@jdries
Contributor Author

jdries commented Jun 28, 2024

@VictorVerhaert this is the feature we need for LCFM to write outputs at multiple resolutions.

The current behaviour is non-standard, so we can replace it; we may only need to check with the CORSA team, who recently started using it, and find a good rollout approach if needed.

@bossie
Collaborator

bossie commented Jun 28, 2024

This is about having multiple nodes with "result": true within the same process graph, right?

I find this highly confusing as this example is not accepted by the web editor (I'm assuming the web editor complies with the spec):

{
  "process_graph": {
    "load1": {
      "process_id": "load_collection",
      "arguments": {
        "id": "AGERA5",
        "spatial_extent": {
          "west": -14.697125210864408,
          "east": 26.11928103913559,
          "south": 46.96632122789427,
          "north": 57.64623333077631
        },
        "temporal_extent": [
          "2024-06-05T00:00:00Z",
          null
        ]
      }
    },
    "load2": {
      "process_id": "load_collection",
      "arguments": {
        "id": "SENTINEL3_SLSTR",
        "spatial_extent": {
          "west": -17.158692149008044,
          "east": 23.657714100991956,
          "south": 47.38452737005318,
          "north": 57.97398258202165
        },
        "temporal_extent": [
          "2024-06-18T00:00:00Z",
          null
        ]
      }
    },
    "merge3": {
      "process_id": "merge_cubes",
      "arguments": {
        "cube1": {
          "from_node": "load1"
        },
        "cube2": {
          "from_node": "load2"
        }
      }
    },
    "save4": {
      "process_id": "save_result",
      "arguments": {
        "data": {
          "from_node": "merge3"
        },
        "format": "PARQUET"
      },
      "result": true
    },
    "save5": {
      "process_id": "save_result",
      "arguments": {
        "data": {
          "from_node": "load1"
        },
        "format": "COVJSON"
      },
      "result": true
    }
  },
  "parameters": []
}

The web editor rejects it with: "Multiple result nodes specified for the process."

@jdries
Contributor Author

jdries commented Jun 28, 2024

@bossie the spec explains it:
https://api.openeo.org/#section/Processes/Process-Graphs

One of the nodes in a map of processes (the final one) MUST have the result flag set to true, all the other nodes can omit it as the default value is false. Having such a node is important as multiple end nodes are possible, but in most use cases it is important to exactly specify the return value to be used by other processes.

So your example is indeed not valid, and my description was confusing; I will update it.

@jdries jdries changed the title multiple results via multiple end nodes multiple results via multiple save_result nodes Jun 28, 2024
@soxofaan
Member

soxofaan commented Jul 1, 2024

Some more insights from Open-EO/openeo-processes#279 and Open-EO/openeo-api#427 about different kinds of terminal nodes in the openEO API:

  • a process graph can have multiple "end nodes": leaf nodes that are not input to another node
  • a process graph must have one "result node": node with "result": True.
    • The result node is actually just important in the context of a "callback" (in a "parent" process like apply, reduce_dimension, ...) to determine the value that has to be used by the parent process
    • The "result" property has little meaning for "top-level" graphs
  • the result node does not have to be an "end node". Result nodes can be input to subsequent nodes
  • all "end nodes" are expected to be evaluated/executed
  • a "save_result" node does not have to be an "end node" or a "result node" to be executed

I guess in our current implementation, we have some mismatching assumptions about the equivalence between "end node", "result node" and "save_result"

(Note that some of these mismatching assumptions are also present in the Python client I'm afraid -> Open-EO/openeo-python-client#583)
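
To make the end-node/result-node distinction above concrete, here is a minimal hypothetical flat graph (node ids and collection id invented) in which the result node is not an end node:

# "filter1" carries "result": True but is not an end node, because "save1"
# consumes its output via from_node; "save1" has no "result" flag but is an
# end node (nothing references it) and is still expected to be executed.
graph = {
    "load1": {
        "process_id": "load_collection",
        "arguments": {"id": "SENTINEL2_L2A", "spatial_extent": None, "temporal_extent": None},
    },
    "filter1": {
        "process_id": "filter_bands",
        "arguments": {"data": {"from_node": "load1"}, "bands": ["B04", "B08"]},
        "result": True,
    },
    "save1": {
        "process_id": "save_result",
        "arguments": {"data": {"from_node": "filter1"}, "format": "GTiff"},
    },
}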

@bossie
Collaborator

bossie commented Jul 1, 2024

Thanks @soxofaan. I have yet to go through it in detail, but it already seems to answer some of my questions with regard to end/result node nuances and how the "result" property matters for parent/child process graphs rather than processes within the same graph (which is not at all what the API spec says, IMO) 👍

@HansVRP

HansVRP commented Jul 2, 2024

TODO:

Analyze and split up into smaller parts.

There are already plan Bs in place for LCFM.

@bossie
Collaborator

bossie commented Jul 8, 2024

Related:

#295: implemented multiple save_results in a non-standard way
Open-EO/openeo-python-driver#261: a limitation of #295

@bossie
Collaborator

bossie commented Jul 8, 2024

The current implementation in the back-end is wrong in that:

process graph evaluation starts from the single result node and therefore only considers this result node and its dependencies (including transitive ones); other end nodes are not evaluated.

The API spec words it like this:

Please be aware that the result node (result set to true) is not necessarily the last node that is executed. The author of the process graph may choose to set a non-end node to the result node!
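
For reference, a rough standalone sketch (not the actual driver code) of how the end nodes of a flat process graph could be determined, i.e. the nodes that no other node references via from_node:

from typing import Set


def find_end_nodes(process_graph: dict) -> Set[str]:
    """Return the ids of nodes that no other node references via 'from_node'."""
    referenced = set()

    def walk(value):
        # Recursively collect {"from_node": ...} references from the arguments.
        # (A real implementation would need to take care not to mix in references
        # that live inside nested child process graphs.)
        if isinstance(value, dict):
            if "from_node" in value:
                referenced.add(value["from_node"])
            for v in value.values():
                walk(v)
        elif isinstance(value, list):
            for v in value:
                walk(v)

    for node in process_graph.values():
        walk(node.get("arguments", {}))

    return set(process_graph) - referenced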

@bossie
Collaborator

bossie commented Jul 10, 2024

Random ramblings:

  • to elaborate on my previous comment, we shouldn't consider a process graph a tree anymore with the result node as the root, but rather several sub-trees that all have to be evaluated (in theory these sub-trees don't even have to be connected);
  • we can turn these sub-trees into a single tree again by introducing an artificial "collect" node with the end nodes as dependencies;
  • then the process graph can be evaluated again with a single call to evaluate() that returns the value of the result node;
  • the value of the result node is not necessarily a SaveResult instance;
  • the result node is not necessarily an end node so we have to keep its value somewhere as we evaluate the graph (at least in the short term it could be put into the EvalEnv);
  • similarly, SaveResult instances could be collected and put into the EvalEnv as we evaluate the graph, then materialized/written to output afterwards (TBC);
  • the implementation of collect() could then be as simple as this:
def collect(args: ProcessArgs, env: EvalEnv):  # args contain values of end nodes
    return env[ENV_RESULT]  # the value of the "result" node
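
A possible shape for that injection step, as a sketch only; the node id "_collect1" and the "collect" process are artificial, and the end nodes could come from something like the find_end_nodes sketch in the previous comment:

def inject_collect_node(process_graph: dict, end_nodes: list) -> dict:
    """Return a copy of the flat graph plus one artificial end node that depends
    on every original end node, so evaluation can again start from a single node."""
    graph = dict(process_graph)
    graph["_collect1"] = {
        "process_id": "collect",  # backend-internal process, not part of the openEO spec
        "arguments": {"data": [{"from_node": node_id} for node_id in end_nodes]},
    }
    return graph

Evaluation would then start from "_collect1" while the original "result": true flag stays untouched, and collect() itself would just hand back the result node's value as sketched above.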

@soxofaan
Member

Some feedback ramblings:

with a single call to evaluate() that returns the value of the result node;

I think we should also consider stepping away from the idea that evaluate() returns something directly node related. I think it should return some kind of result set object where all save_result assets are collected.

However, returning something completely different from evaluate() is probably quite challenging to do without breakage across projects. Alternative approach:

  • keep return behavior for now to stay backward compatible
  • add this new result set object as an argument, e.g. evaluate(result_set), and pass it through (e.g. through the EvalEnv), so that save_result can append results to it (see the sketch below)

This way we can gradually migrate from the old way to the new way.
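
As a very rough sketch of that shape; the SaveResultSet class and the handle_save_result function below are made up for illustration and are not existing driver API:

from typing import Any, List


class SaveResultSet:
    """Hypothetical container collecting everything the save_result nodes produce."""

    def __init__(self):
        self._results: List[Any] = []

    def append(self, result: Any) -> None:
        self._results.append(result)

    def __iter__(self):
        return iter(self._results)


def handle_save_result(data: Any, fmt: str, options: dict, result_set: SaveResultSet) -> Any:
    # The result_set would be passed along (e.g. via the EvalEnv) rather than
    # returned from evaluate(), keeping the current return behavior intact.
    save_result = {"data": data, "format": fmt, "options": options}  # stand-in for a SaveResult object
    result_set.append(save_result)
    return save_result  # the node's return value stays as before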

the result node is not necessarily an end node so we have to keep its value somewhere as we evaluate the graph (at least in the short term it could be put into the EvalEnv);

At the top level of a process graph, the "result node" (the single node with "result": True) is actually not important, it turns out (the "result" flag is only relevant for "child" process graphs), so I don't think we have to keep track of it.

It's more important to collect all save_result results in a result set, as described above.

similarly, SaveResult instances could be collected and put into the EvalEnv as we evaluate the graph, then materialized/written to output afterwards

The practicalities are a bit cloudy to me now, so I might be wrong here, but if I'm not mistaken, the results are currently materialized just in time: just before sending a sync request response, or at the end of a batch job. Maybe the materialization should happen at the time of executing save_result, with the results collected in the result set object I mentioned above.

@bossie
Collaborator

bossie commented Jul 10, 2024

I think we should also consider stepping away from the idea that evaluate() returns something directly node related. I think it should return some kind of result set object where all save_result assets are collected.

Maybe it has to do with the way child process graphs are implemented (I still have to look into that) but surely a process graph should evaluate to its result, especially because this result is used by the parent process graph.

At the top level of a process graph, the "result node" (the single node with "result": True) is actually not important, it turns out (the "result" flag is only relevant for "child" process graphs), so I don't think we have to keep track of it.

It's more important to collect all save_result results in a result set, as described above.

It depends on whether we treat the top-level process graph the same as child process graphs. It's true that we don't actually use the result of the top-level PG and rightly return save_result results but I see no reason to evaluate them differently.

The practicalities are a bit cloudy to me now, so I might be wrong here, but if I'm not mistaken, the results are currently materialized just in time: just before sending a sync request response, or at the end of a batch job. Maybe the materialization should happen at the time of executing save_result, with the results collected in the result set object I mentioned above.

You are correct, and this behavior might have to change: in the context of things related to export_workspace, save_result will return STAC that can be modified by other processes, IIRC. But that's another ticket.

@bossie
Collaborator

bossie commented Jul 16, 2024

@VictorVerhaert do you have an example process graph I can test this feature with?

@soxofaan
Member

I think we should also consider stepping away from the idea that evaluate() returns something directly node related. I think it should return some kind of result set object where all save_result assets are collected.

Maybe it has to do with the way child process graphs are implemented (I still have to look into that) but surely a process graph should evaluate to its result, especially because this result is used by the parent process graph.

I think openeo_driver.ProcessGraphDeserializer.evaluate() is only called on top-level process graphs anyway (if you look at arguments and implementation, it looks to be full of assumptions about that in any case). Child process graphs are typically handled with some custom process graph visitor, which builds a "callable" in "scala space". So I don't think there is a problem there

bossie added a commit that referenced this issue Jul 18, 2024
Traceback (most recent call last):
  File "/home/bossie/PycharmProjects/openeo/venv38/lib/python3.8/site-packages/flask/app.py", line 880, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/bossie/PycharmProjects/openeo/venv38/lib/python3.8/site-packages/flask/app.py", line 865, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)  # type: ignore[no-any-return]
  File "/home/bossie/PycharmProjects/openeo/openeo-python-driver/openeo_driver/users/auth.py", line 95, in decorated
    return f(*args, **kwargs)
  File "/home/bossie/PycharmProjects/openeo/openeo-python-driver/openeo_driver/views.py", line 952, in queue_job
    backend_implementation.batch_jobs.start_job(job_id=job_id, user=user)
  File "/home/bossie/PycharmProjects/openeo/openeo-geopyspark-driver/openeogeotrellis/backend.py", line 1721, in start_job
    self._start_job(job_id, user, _get_vault_token)
  File "/home/bossie/PycharmProjects/openeo/openeo-geopyspark-driver/openeogeotrellis/backend.py", line 1769, in _start_job
    job_dependencies = self._schedule_and_get_dependencies(
  File "/home/bossie/PycharmProjects/openeo/openeo-geopyspark-driver/openeogeotrellis/backend.py", line 2247, in _schedule_and_get_dependencies
    convert_node(result_node, env=env.push({ENV_DRY_RUN_TRACER: dry_run_tracer, ENV_SAVE_RESULT:[],"node_caching":False}))
  File "/home/bossie/PycharmProjects/openeo/openeo-python-driver/openeo_driver/ProcessGraphDeserializer.py", line 458, in convert_node
    env[ENV_FINAL_RESULT][0] = process_result
  File "/home/bossie/PycharmProjects/openeo/openeo-python-driver/openeo_driver/utils.py", line 49, in __getitem__
    return self._parent[key]
  File "/home/bossie/PycharmProjects/openeo/openeo-python-driver/openeo_driver/utils.py", line 49, in __getitem__
    return self._parent[key]
  File "/home/bossie/PycharmProjects/openeo/openeo-python-driver/openeo_driver/utils.py", line 51, in __getitem__
    raise KeyError(key)
KeyError: 'final_result'
bossie added a commit that referenced this issue Jul 18, 2024
bossie added a commit that referenced this issue Jul 18, 2024
@bossie
Collaborator

bossie commented Jul 18, 2024

Supports multiple save_result nodes (also: inspect) in batch jobs* by evaluating the whole process graph, regardless of whether nodes are dependencies of the "result" node.

Note that you might have to make sure that these save_results don't overwrite each other's output files by passing the filename_prefix format option; for example, to save intermediate results to a file called intermediate.tif, the save_result node might look like this:

{
  "process_id": "save_result",
  "arguments": {
      "data": {"from_node": "loadcollection1"},
      "format": "GTiff",
      "options": {"filename_prefix": "intermediate"}
  }
}

*there was never support for multiple save_results in sync requests

@bossie
Collaborator

bossie commented Aug 5, 2024

TODO: add/adapt an integration test.

@bossie bossie reopened this Aug 5, 2024
@bossie
Collaborator

bossie commented Aug 6, 2024

Added test_batch.test_multiple_save_results to k8s integration tests.
