Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature - data fusion support #91

Merged
merged 23 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4693b61
Set multiple inputs and mark what still needs to be fixed
zansinergise Nov 3, 2023
1a20864
Fix stuff mostly related to input bands
zansinergise Nov 8, 2023
1f00d93
Fix names if length of nodes is 0
zansinergise Nov 8, 2023
1aca9e0
fix bug
zansinergise Nov 8, 2023
14a196e
add safety check for scenes which are undefined (probably wrong)
zansinergise Nov 9, 2023
8d75ab4
run linting
zansinergise Nov 9, 2023
e6774a3
fix bug where there were no input bands set
zansinergise Nov 10, 2023
5b2655d
add comment what to fix
zansinergise Nov 10, 2023
b67ec31
fix updateOutput function if multiple load collections
zansinergise Nov 14, 2023
a42c324
run linting
zansinergise Nov 14, 2023
604de6a
should work for both basic and fusion
zansinergise Nov 15, 2023
40d5580
fix failing tests, so results match the processes https://processes.openeo.org
zansinergise Nov 15, 2023
c7bf92b
fix wrong concatenation
zansinergise Nov 15, 2023
e40aaad
fix so correct samples are used for datacube creation
zansinergise Nov 15, 2023
ee911c9
make bands_metadata use the new correct format
zansinergise Nov 15, 2023
3531d7b
fix failing tests so they match new format of input bands
zansinergise Nov 15, 2023
0215634
revert changes in order test
zansinergise Nov 15, 2023
d248eec
fix bug
zansinergise Nov 15, 2023
f0c4f57
remove comment as execution order is ok
zansinergise Nov 17, 2023
128d18d
fix for even more advanced PGs
zansinergise Nov 17, 2023
c026d61
make code more readable
zansinergise Nov 22, 2023
ab606b6
rename load collection nodes in tests
zansinergise Nov 22, 2023
78947f7
remove comment
zansinergise Nov 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions src/pg_to_evalscript/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ def convert_from_process_graph(
bands_dimension_name="bands",
temporal_dimension_name="t",
encode_result=True,
bands_metadata=[],
bands_metadata={},
):
all_nodes_valid, subgraphs = check_validity_and_subgraphs(
process_graph, temporal_dimension_name, bands_dimension_name, user_defined_processes=user_defined_processes
)
if all_nodes_valid:
nodes, input_bands, initial_data_name = generate_nodes_from_process_graph(
nodes, input_bands, initial_data_names = generate_nodes_from_process_graph(
process_graph,
bands_dimension_name,
temporal_dimension_name,
Expand All @@ -102,7 +102,7 @@ def convert_from_process_graph(
evalscript = Evalscript(
input_bands,
nodes,
initial_data_name,
initial_data_names,
n_output_bands=n_output_bands,
sample_type=sample_type,
units=units,
Expand Down Expand Up @@ -165,17 +165,19 @@ def generate_nodes_from_process_graph(
)

nodes = []
input_bands = None
initial_data_name = None
input_bands = []
initial_data_names = {}

for node_id in execution_order:
process_id = process_graph[node_id]["process_id"]
arguments = process_graph[node_id]["arguments"]
child_nodes = None

if process_id == "load_collection":
input_bands = arguments.get("bands")
bands_for_datasource = arguments.get("bands")
initial_data_name = "node_" + node_id
input_bands.append({"datasource": initial_data_name, "bands": bands_for_datasource})
initial_data_names[node_id] = initial_data_name
continue
elif process_id == "save_result":
continue
Expand Down Expand Up @@ -257,4 +259,4 @@ def generate_nodes_from_process_graph(
bands_dimension_name=bands_dimension_name,
)
nodes.append(node)
return nodes, input_bands, initial_data_name
return nodes, input_bands, initial_data_names
47 changes: 36 additions & 11 deletions src/pg_to_evalscript/evalscript.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(
self,
input_bands,
nodes,
initial_data_name,
initial_data_names,
n_output_bands=1,
sample_type="AUTO",
units=None,
Expand All @@ -30,11 +30,11 @@ def __init__(
datacube_definition_directory="javascript_datacube",
output_dimensions=None,
encode_result=True,
bands_metadata=[],
bands_metadata={},
):
self.input_bands = input_bands
self.nodes = nodes
self.initial_data_name = initial_data_name
self.initial_data_names = initial_data_names
self.n_output_bands = n_output_bands
self.sample_type = sample_type
self.units = units
Expand All @@ -47,15 +47,15 @@ def __init__(
self.bands_metadata = bands_metadata

def write(self):
if self.input_bands is None:
if any(datasource_with_bands["bands"] is None for datasource_with_bands in self.input_bands):
raise Exception("input_bands must be set!")
newline = "\n"
tab = "\t"
return f"""
//VERSION=3
function setup() {{
return {{
input: [{",".join([f"'{band}'" for band in self.input_bands])}],
input: [{",".join([f"{datasource_with_bands}" for datasource_with_bands in self.input_bands])}],
output: {{ bands: {self.n_output_bands}, sampleType: "{self.sample_type}"{f", units: '{self.units}'" if self.units is not None else ''} }},
mosaicking: "{self.mosaicking}"
}};
Expand Down Expand Up @@ -103,10 +103,24 @@ def write_ndarray_definition(self):
return pkgutil.get_data("pg_to_evalscript", f"javascript_datacube/ndarray.js").decode("utf-8")

def write_datacube_creation(self):
return f"let {self.initial_data_name} = new DataCube(samples, '{self.bands_dimension_name}', '{self.temporal_dimension_name}', true, {json.dumps(self.bands_metadata)}, scenes)"
datacube_creation = ""
if len(self.input_bands) > 1:
for datasource_with_bands in self.input_bands:
datasource_name = datasource_with_bands["datasource"]
datacube_creation += f"let {datasource_name} = new DataCube(samples.{datasource_name}, '{self.bands_dimension_name}', '{self.temporal_dimension_name}', true, {json.dumps(self.bands_metadata[datasource_name] if self.bands_metadata is not None and len(self.bands_metadata) > 0 else None)}, scenes)\n\t"
else:
datasource_with_bands = self.input_bands[0]
datasource_name = datasource_with_bands["datasource"]
datacube_creation += f"let {datasource_name} = new DataCube(samples, '{self.bands_dimension_name}', '{self.temporal_dimension_name}', true, {json.dumps(self.bands_metadata[datasource_name] if self.bands_metadata is not None and len(self.bands_metadata) > 0 else None)}, scenes)\n\t"

return datacube_creation

def write_runtime_global_constants(self):
return f"const INPUT_BANDS = {self.input_bands};"
all_bands = []
for datasource_with_bands in self.input_bands:
all_bands.extend(datasource_with_bands["bands"])

return f"const INPUT_BANDS = {list(set(all_bands))};"
zcernigoj marked this conversation as resolved.
Show resolved Hide resolved

def write_update_output(self):
if self._output_dimensions is None:
Expand All @@ -120,7 +134,13 @@ def write_update_output(self):
size_without_original_temporal_dimensions = reduce(
lambda x, y: x * y, sizes_without_original_temporal_dimensions, 1
)
collection_scenes_length = "* collection.scenes.length" * number_of_original_temporal_dimensions
if len(self.input_bands) > 1:
collection_scenes_length = (
"* Object.values(collection).reduce((acc, val) => acc + val.scenes.length, 0)"
* number_of_original_temporal_dimensions
)
else:
collection_scenes_length = "* collection.scenes.length" * number_of_original_temporal_dimensions
number_of_final_dimensions = len(self._output_dimensions) + 1 if self.encode_result else 0
return f"""
function updateOutput(outputs, collection) {{
Expand All @@ -131,13 +151,18 @@ def write_update_output(self):

def write_output_variable(self):
if len(self.nodes) == 0:
return self.initial_data_name
return "_".join(self.initial_data_names.values())
return self.nodes[-1].node_varname_prefix + self.nodes[-1].node_id

def determine_output_dimensions(self):
dimensions_of_inputs_per_node = defaultdict(list)
all_bands = []
for datasource_with_bands in self.input_bands:
zcernigoj marked this conversation as resolved.
Show resolved Hide resolved
if datasource_with_bands is not None and datasource_with_bands["bands"] is not None:
all_bands.extend(datasource_with_bands["bands"])

initial_output_dimensions = [
{"name": self.bands_dimension_name, "size": len(self.input_bands) if self.input_bands is not None else 0},
{"name": self.bands_dimension_name, "size": len(set(all_bands)) if self.input_bands is not None else 0},
{"name": self.temporal_dimension_name, "size": None, "original_temporal": True},
]

Expand All @@ -147,7 +172,7 @@ def determine_output_dimensions(self):
dimensions_of_inputs_per_node[self.nodes[0].node_id].append(initial_output_dimensions)

for node in self.nodes:
output_dimensions = node.get_dimensions_change(dimensions_of_inputs_per_node[node.node_id])
output_dimensions = node.get_dimensions_change(dimensions_of_inputs_per_node[node.node_id] if len(dimensions_of_inputs_per_node[node.node_id]) > 0 else [initial_output_dimensions])
for dependent in node.dependents:
dimensions_of_inputs_per_node[dependent].append(output_dimensions)

Expand Down
Loading