Skip to content

Commit

Permalink
Merge pull request #91 from openEOPlatform/feature/data_fusion_support
Browse files Browse the repository at this point in the history
Feature - data fusion support
  • Loading branch information
zansinergise committed Nov 23, 2023
2 parents 45e1eab + 78947f7 commit d7fda12
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 164 deletions.
16 changes: 9 additions & 7 deletions src/pg_to_evalscript/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ def convert_from_process_graph(
bands_dimension_name="bands",
temporal_dimension_name="t",
encode_result=True,
bands_metadata=[],
bands_metadata={},
):
all_nodes_valid, subgraphs = check_validity_and_subgraphs(
process_graph, temporal_dimension_name, bands_dimension_name, user_defined_processes=user_defined_processes
)
if all_nodes_valid:
nodes, input_bands, initial_data_name = generate_nodes_from_process_graph(
nodes, input_bands, initial_data_names = generate_nodes_from_process_graph(
process_graph,
bands_dimension_name,
temporal_dimension_name,
Expand All @@ -102,7 +102,7 @@ def convert_from_process_graph(
evalscript = Evalscript(
input_bands,
nodes,
initial_data_name,
initial_data_names,
n_output_bands=n_output_bands,
sample_type=sample_type,
units=units,
Expand Down Expand Up @@ -165,17 +165,19 @@ def generate_nodes_from_process_graph(
)

nodes = []
input_bands = None
initial_data_name = None
input_bands = []
initial_data_names = {}

for node_id in execution_order:
process_id = process_graph[node_id]["process_id"]
arguments = process_graph[node_id]["arguments"]
child_nodes = None

if process_id == "load_collection":
input_bands = arguments.get("bands")
bands_for_datasource = arguments.get("bands")
initial_data_name = "node_" + node_id
input_bands.append({"datasource": initial_data_name, "bands": bands_for_datasource})
initial_data_names[node_id] = initial_data_name
continue
elif process_id == "save_result":
continue
Expand Down Expand Up @@ -257,4 +259,4 @@ def generate_nodes_from_process_graph(
bands_dimension_name=bands_dimension_name,
)
nodes.append(node)
return nodes, input_bands, initial_data_name
return nodes, input_bands, initial_data_names
47 changes: 36 additions & 11 deletions src/pg_to_evalscript/evalscript.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(
self,
input_bands,
nodes,
initial_data_name,
initial_data_names,
n_output_bands=1,
sample_type="AUTO",
units=None,
Expand All @@ -30,11 +30,11 @@ def __init__(
datacube_definition_directory="javascript_datacube",
output_dimensions=None,
encode_result=True,
bands_metadata=[],
bands_metadata={},
):
self.input_bands = input_bands
self.nodes = nodes
self.initial_data_name = initial_data_name
self.initial_data_names = initial_data_names
self.n_output_bands = n_output_bands
self.sample_type = sample_type
self.units = units
Expand All @@ -47,15 +47,15 @@ def __init__(
self.bands_metadata = bands_metadata

def write(self):
if self.input_bands is None:
if any(datasource_with_bands["bands"] is None for datasource_with_bands in self.input_bands):
raise Exception("input_bands must be set!")
newline = "\n"
tab = "\t"
return f"""
//VERSION=3
function setup() {{
return {{
input: [{",".join([f"'{band}'" for band in self.input_bands])}],
input: [{",".join([f"{datasource_with_bands}" for datasource_with_bands in self.input_bands])}],
output: {{ bands: {self.n_output_bands}, sampleType: "{self.sample_type}"{f", units: '{self.units}'" if self.units is not None else ''} }},
mosaicking: "{self.mosaicking}"
}};
Expand Down Expand Up @@ -103,10 +103,24 @@ def write_ndarray_definition(self):
return pkgutil.get_data("pg_to_evalscript", f"javascript_datacube/ndarray.js").decode("utf-8")

def write_datacube_creation(self):
    """Return JS source that instantiates one DataCube per input datasource.

    With a single datasource the whole ``samples`` object feeds the cube;
    with several datasources (data fusion) each cube reads its own
    ``samples.<datasource>`` slot.  Per-datasource band metadata is looked
    up in ``self.bands_metadata`` (a dict keyed by datasource name); a
    missing entry yields ``null`` in the generated JS instead of raising.
    """
    # NOTE(review): the pasted block carried a leftover pre-change line
    # (old single-datasource `return` using `self.initial_data_name`);
    # it is removed here and the multi-datasource version reconstructed.
    lines = []
    multiple_sources = len(self.input_bands) > 1
    for datasource_with_bands in self.input_bands:
        datasource_name = datasource_with_bands["datasource"]
        # `samples` is namespaced per datasource only when fusing several.
        samples_expr = f"samples.{datasource_name}" if multiple_sources else "samples"
        # .get avoids a KeyError when metadata was supplied for only some sources.
        metadata = self.bands_metadata.get(datasource_name) if self.bands_metadata else None
        lines.append(
            f"let {datasource_name} = new DataCube({samples_expr}, "
            f"'{self.bands_dimension_name}', '{self.temporal_dimension_name}', "
            f"true, {json.dumps(metadata)}, scenes)"
        )
    # Each declaration is terminated with newline+tab, matching the
    # indentation of the surrounding generated evalscript body.
    return "\n\t".join(lines) + "\n\t" if lines else ""

def write_runtime_global_constants(self):
    """Return the JS constant declaring all distinct input band names.

    Bands from every datasource are merged and de-duplicated.
    ``dict.fromkeys`` keeps first-seen order so the generated evalscript is
    deterministic — the previous ``list(set(...))`` made the band order
    vary between interpreter runs.
    """
    # NOTE(review): a leftover pre-change line (old `return` emitting the raw
    # input_bands list) was interleaved in the pasted block; removed here.
    all_bands = []
    for datasource_with_bands in self.input_bands:
        # Guard against a datasource declared without explicit bands.
        all_bands.extend(datasource_with_bands["bands"] or [])
    unique_bands = list(dict.fromkeys(all_bands))
    return f"const INPUT_BANDS = {unique_bands};"

def write_update_output(self):
if self._output_dimensions is None:
Expand All @@ -120,7 +134,13 @@ def write_update_output(self):
size_without_original_temporal_dimensions = reduce(
lambda x, y: x * y, sizes_without_original_temporal_dimensions, 1
)
collection_scenes_length = "* collection.scenes.length" * number_of_original_temporal_dimensions
if len(self.input_bands) > 1:
collection_scenes_length = (
"* Object.values(collection).reduce((acc, val) => acc + val.scenes.length, 0)"
* number_of_original_temporal_dimensions
)
else:
collection_scenes_length = "* collection.scenes.length" * number_of_original_temporal_dimensions
number_of_final_dimensions = len(self._output_dimensions) + 1 if self.encode_result else 0
return f"""
function updateOutput(outputs, collection) {{
Expand All @@ -131,13 +151,18 @@ def write_update_output(self):

def write_output_variable(self):
    """Return the name of the JS variable holding the final result.

    With no processing nodes the output is the raw input datacube(s): their
    generated variable names are joined with '_' into one identifier.
    Otherwise it is the variable produced by the last node in execution
    order (``node_varname_prefix`` + ``node_id``).
    """
    # NOTE(review): a leftover pre-change line (`return self.initial_data_name`,
    # the removed single-datasource attribute) was interleaved here; dropped.
    if not self.nodes:
        return "_".join(self.initial_data_names.values())
    last_node = self.nodes[-1]
    return last_node.node_varname_prefix + last_node.node_id

def determine_output_dimensions(self):
dimensions_of_inputs_per_node = defaultdict(list)
all_bands = []
for datasource_with_bands in self.input_bands:
if datasource_with_bands is not None and datasource_with_bands["bands"] is not None:
all_bands.extend(datasource_with_bands["bands"])

initial_output_dimensions = [
{"name": self.bands_dimension_name, "size": len(self.input_bands) if self.input_bands is not None else 0},
{"name": self.bands_dimension_name, "size": len(set(all_bands)) if self.input_bands is not None else 0},
{"name": self.temporal_dimension_name, "size": None, "original_temporal": True},
]

Expand All @@ -147,7 +172,7 @@ def determine_output_dimensions(self):
dimensions_of_inputs_per_node[self.nodes[0].node_id].append(initial_output_dimensions)

for node in self.nodes:
output_dimensions = node.get_dimensions_change(dimensions_of_inputs_per_node[node.node_id])
output_dimensions = node.get_dimensions_change(dimensions_of_inputs_per_node[node.node_id] if len(dimensions_of_inputs_per_node[node.node_id]) > 0 else [initial_output_dimensions])
for dependent in node.dependents:
dimensions_of_inputs_per_node[dependent].append(output_dimensions)

Expand Down
Loading

0 comments on commit d7fda12

Please sign in to comment.