From f1993eb01fbe28d2ca7312f9e4ce25632d7c0a0e Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Wed, 11 Aug 2021 15:52:36 +0200 Subject: [PATCH] Basic process namespace support and debug process --- src/api/processes.js | 2 +- src/models/logs.js | 2 +- src/processes/add_dimension.js | 2 +- src/processes/aggregate_temporal_frequency.js | 6 +- src/processes/apply.js | 2 +- src/processes/array_element.js | 2 +- src/processes/debug.js | 18 ++++++ src/processes/debug.json | 57 +++++++++++++++++++ src/processes/drop_dimension.js | 4 +- src/processes/filter_bbox.js | 2 +- src/processes/filter_spatial.js | 2 +- src/processes/load_collection.js | 4 +- src/processes/reduce_dimension.js | 2 +- src/processes/rename_dimension.js | 4 +- src/processes/rename_labels.js | 4 +- src/processgraph/processgraph.js | 5 ++ src/processgraph/registry.js | 5 +- src/utils.js | 8 +++ storage/errors/errors.json | 10 ++-- 19 files changed, 114 insertions(+), 27 deletions(-) create mode 100644 src/processes/debug.js create mode 100644 src/processes/debug.json diff --git a/src/api/processes.js b/src/api/processes.js index 04a6ebf..7929575 100644 --- a/src/api/processes.js +++ b/src/api/processes.js @@ -12,7 +12,7 @@ module.exports = class Processes { getProcesses(req, res, next) { res.json({ - processes: this.registry.toJSON(), + processes: this.registry.namespace('backend'), links: [] }); return next(); diff --git a/src/models/logs.js b/src/models/logs.js index f20d745..c0b6b4a 100644 --- a/src/models/logs.js +++ b/src/models/logs.js @@ -89,7 +89,7 @@ module.exports = class Logs { this.add(message, 'error', data, trace, code, links, error.id); } else { - this.add(message, 'error', data, trace, code, links); + this.add(error, 'error', data, trace, code, links); } } diff --git a/src/processes/add_dimension.js b/src/processes/add_dimension.js index ce7b5b5..197a02d 100644 --- a/src/processes/add_dimension.js +++ b/src/processes/add_dimension.js @@ -11,7 +11,7 @@ module.exports = class add_dimension extends BaseProcess { if (dc.hasDimension(name)) { throw new Errors.DimensionExists({ - process: this.spec.id, + process: this.id, argument: 'name' }); } diff --git a/src/processes/aggregate_temporal_frequency.js b/src/processes/aggregate_temporal_frequency.js index c314870..46e43d1 100644 --- a/src/processes/aggregate_temporal_frequency.js +++ b/src/processes/aggregate_temporal_frequency.js @@ -12,14 +12,14 @@ module.exports = class aggregate_temporal_frequency extends BaseProcess { if (!(callback instanceof ProcessGraph)) { throw new Errors.ProcessArgumentInvalid({ - process: this.spec.id, + process: this.id, argument: 'reducer', reason: 'No reducer specified.' }); } else if (callback.getNodeCount() !== 1) { throw new Errors.ProcessArgumentInvalid({ - process: this.spec.id, + process: this.id, argument: 'reducer', reason: "No complex reducer supported at the moment" }); @@ -30,7 +30,7 @@ module.exports = class aggregate_temporal_frequency extends BaseProcess { var process = callback.getProcess(childNode); if (typeof process.geeReducer !== 'function') { throw new Errors.ProcessArgumentInvalid({ - process: this.spec.id, + process: this.id, argument: 'reducer', reason: 'The specified reducer is invalid.' }); diff --git a/src/processes/apply.js b/src/processes/apply.js index bd72347..8d60fe2 100644 --- a/src/processes/apply.js +++ b/src/processes/apply.js @@ -10,7 +10,7 @@ module.exports = class apply extends BaseProcess { var callback = node.getArgument("process"); if (!(callback instanceof ProcessGraph)) { throw new Errors.ProcessArgumentInvalid({ - process: this.spec.id, + process: this.id, argument: 'process', reason: 'No process specified.' }); diff --git a/src/processes/array_element.js b/src/processes/array_element.js index 6652de1..b2c734f 100644 --- a/src/processes/array_element.js +++ b/src/processes/array_element.js @@ -31,7 +31,7 @@ module.exports = class array_element extends BaseProcess { // ToDo: only bands is currently supported if (dimension.type !== "bands") { throw new Errors.ProcessArgumentInvalid({ - process: this.spec.id, + process: this.id, argument: 'dimension', reason: 'Only dimension "bands" is currently supported.' }); diff --git a/src/processes/debug.js b/src/processes/debug.js new file mode 100644 index 0000000..e943fa3 --- /dev/null +++ b/src/processes/debug.js @@ -0,0 +1,18 @@ +const { BaseProcess } = require('@openeo/js-processgraphs'); +const Commons = require('../processgraph/commons'); + +module.exports = class debug extends BaseProcess { + + async execute(node) { + var dc = node.getArgument('data'); + var code = node.getArgument('data'); + var level = node.getArgument('data', 'info'); + var message = node.getArgument('data'); + + var logger = node.getLogger(); + logger[level](message, data, code); + + return dc; + } + +}; \ No newline at end of file diff --git a/src/processes/debug.json b/src/processes/debug.json new file mode 100644 index 0000000..eedafa4 --- /dev/null +++ b/src/processes/debug.json @@ -0,0 +1,57 @@ +{ + "id": "debug", + "summary": "Publish debugging information", + "description": "Sends debugging information about the data to the log output. Passes the data through.", + "categories": [ + "development" + ], + "experimental": true, + "parameters": [ + { + "name": "data", + "description": "Data to publish.", + "schema": { + "description": "Any data type is allowed." + } + }, + { + "name": "code", + "description": "An identifier to help identify the log entry in a bunch of other log entries.", + "schema": { + "type": "string" + }, + "default": "", + "optional": true + }, + { + "name": "level", + "description": "The severity level of this message, defaults to `info`. Note that the level `error` forces the computation to be stopped!", + "schema": { + "type": "string", + "enum": [ + "error", + "warning", + "info", + "debug" + ] + }, + "default": "info", + "optional": true + }, + { + "name": "message", + "description": "A message to send in addition to the data.", + "schema": { + "type": "string" + }, + "default": "", + "optional": true + } + ], + "returns": { + "description": "The data as passed to the `data` parameter without any modification.", + "schema": { + "description": "Any data type is allowed." + } + } +} \ No newline at end of file diff --git a/src/processes/drop_dimension.js b/src/processes/drop_dimension.js index cbf3645..11c0229 100644 --- a/src/processes/drop_dimension.js +++ b/src/processes/drop_dimension.js @@ -10,7 +10,7 @@ module.exports = class drop_dimension extends BaseProcess { if (dc.hasDimension(dimensionName) === false) { throw new Errors.DimensionNotAvailable({ - process: this.spec.id, + process: this.id, argument: 'name' }); } @@ -19,7 +19,7 @@ module.exports = class drop_dimension extends BaseProcess { if (dimension.values.length > 1) { throw new Errors.DimensionLabelCountMismatch({ - process: this.spec.id, + process: this.id, argument: 'name' }); } diff --git a/src/processes/filter_bbox.js b/src/processes/filter_bbox.js index a04306b..f5499d3 100644 --- a/src/processes/filter_bbox.js +++ b/src/processes/filter_bbox.js @@ -4,7 +4,7 @@ const Commons = require('../processgraph/commons'); module.exports = class filter_bbox extends BaseProcess { async execute(node) { - return Commons.filterBbox(node.getDataCube("data"), node.getArgument("extent"), this.spec.id, 'extent'); + return Commons.filterBbox(node.getDataCube("data"), node.getArgument("extent"), this.id, 'extent'); } }; \ No newline at end of file diff --git a/src/processes/filter_spatial.js b/src/processes/filter_spatial.js index ee9b2ce..1cb7625 100644 --- a/src/processes/filter_spatial.js +++ b/src/processes/filter_spatial.js @@ -4,7 +4,7 @@ const Commons = require('../processgraph/commons'); module.exports = class filter_spatial extends BaseProcess { async execute(node) { - return Commons.filterGeoJSON(node.getData("data"), node.getArgument("geometries"), this.spec.id, 'geometries'); + return Commons.filterGeoJSON(node.getData("data"), node.getArgument("geometries"), this.id, 'geometries'); } }; \ No newline at end of file diff --git a/src/processes/load_collection.js b/src/processes/load_collection.js index e10b791..574a7ae 100644 --- a/src/processes/load_collection.js +++ b/src/processes/load_collection.js @@ -32,10 +32,10 @@ module.exports = class load_collection extends BaseProcess { var spatial_extent = node.getArgument("spatial_extent"); if (spatial_extent !== null) { if (spatial_extent.type) { // GeoJSON - has been validated before so `type` should be a safe indicator for GeoJSON - dc = Commons.filterGeoJSON(dc, spatial_extent, this.spec.id, 'spatial_extent'); + dc = Commons.filterGeoJSON(dc, spatial_extent, this.id, 'spatial_extent'); } else { // Bounding box - dc = Commons.filterBbox(dc, spatial_extent, this.spec.id, 'spatial_extent'); + dc = Commons.filterBbox(dc, spatial_extent, this.id, 'spatial_extent'); } } diff --git a/src/processes/reduce_dimension.js b/src/processes/reduce_dimension.js index 323c60f..097ee86 100644 --- a/src/processes/reduce_dimension.js +++ b/src/processes/reduce_dimension.js @@ -5,7 +5,7 @@ module.exports = class reduce_dimension extends BaseProcess { async execute(node) { var dc = node.getDataCube("data"); - dc = await Commons.reduce(node, dc, this.spec.id); + dc = await Commons.reduce(node, dc, this.id); // ToDo: We don't know at this point how the bands in the GEE images/imagecollections are called. var dimensionName = node.getArgument("dimension"); dc.dropDimension(dimensionName); diff --git a/src/processes/rename_dimension.js b/src/processes/rename_dimension.js index f04d9b8..a108521 100644 --- a/src/processes/rename_dimension.js +++ b/src/processes/rename_dimension.js @@ -9,13 +9,13 @@ module.exports = class rename_dimension extends BaseProcess { if (dc.hasDimension(srcName)) { throw new Errors.DimensionNotAvailable({ - process: this.spec.id, + process: this.id, argument: 'source' }); } else if (dc.hasDimension(trgName)) { throw new Errors.DimensionExists({ - process: this.spec.id, + process: this.id, argument: 'target' }); } diff --git a/src/processes/rename_labels.js b/src/processes/rename_labels.js index ecad3ec..1b6145f 100644 --- a/src/processes/rename_labels.js +++ b/src/processes/rename_labels.js @@ -18,7 +18,7 @@ module.exports = class rename_labels extends BaseProcess { if (!dc.hasDimension(dimensionName)) { throw new Errors.ProcessArgumentInvalid({ - process: this.spec.id, + process: this.id, argument: 'dimension', reason: 'Dimension "' + dimension + '" does not exist.' }); @@ -28,7 +28,7 @@ module.exports = class rename_labels extends BaseProcess { var dimension = dc.getDimension(dimensionName); if (dimension.type !== "bands") { throw new Errors.ProcessArgumentInvalid({ - process: this.spec.id, + process: this.id, argument: 'dimension', reason: 'Only dimension "bands" is currently supported.' }); diff --git a/src/processgraph/processgraph.js b/src/processgraph/processgraph.js index cc262a9..78d8f35 100644 --- a/src/processgraph/processgraph.js +++ b/src/processgraph/processgraph.js @@ -47,6 +47,11 @@ module.exports = class GeeProcessGraph extends ProcessGraph { return pg; } + createProcessInstance(process) { + var impl = require('../processes/' + process.id + '.js'); + return new impl(process); + } + async validateNode(node) { var process = this.getProcess(node); return await process.validate(node, this.context); diff --git a/src/processgraph/registry.js b/src/processgraph/registry.js index c67ffec..910d8d8 100644 --- a/src/processgraph/registry.js +++ b/src/processgraph/registry.js @@ -17,7 +17,7 @@ module.exports = class GeeProcessRegistry extends ProcessRegistry { this.addFromFile(id); } }); - var num = Utils.size(this.processes); + var num = Utils.size(this.namespace('backend')); console.info("Loaded " + num + " processes."); return Promise.resolve(num); } @@ -25,8 +25,7 @@ module.exports = class GeeProcessRegistry extends ProcessRegistry { addFromFile(id) { var spec = require('../processes/' + id + '.json'); delete spec.process_graph; - var impl = require('../processes/' + id + '.js'); - this.processes[id.toLowerCase()] = new impl(spec); + this.add(spec, 'backend'); } getServerContext() { diff --git a/src/utils.js b/src/utils.js index 9ebcb0e..b96c588 100644 --- a/src/utils.js +++ b/src/utils.js @@ -72,6 +72,14 @@ var Utils = { size(obj) { return CommonUtils.size(obj); }, + + omitFromObject(obj, omit) { + return CommonUtils.omitFromObject(obj, omit); + }, + + pickFromObject(obj, pick) { + return CommonUtils.pickFromObject(obj, pick); + }, loadDB(name, folder = './storage/database/') { var db = new Datastore({ filename: folder + name + '.db', autoload: true }); diff --git a/storage/errors/errors.json b/storage/errors/errors.json index 9f111af..c105573 100644 --- a/storage/errors/errors.json +++ b/storage/errors/errors.json @@ -221,7 +221,7 @@ }, "ProcessGraphMissing": { "description": null, - "message": "No valid process graph specified.", + "message": "No process graph specified", "http": 400, "tags": [ "Process Graph Management", @@ -231,7 +231,7 @@ }, "ProcessUnsupported": { "description": null, - "message": "Process '{process}' is not supported.", + "message": "Process '{process}' (namespace: {namespace}) is not supported.", "http": 400, "tags": [ "Processes" @@ -239,7 +239,7 @@ }, "ProcessArgumentUnsupported": { "description": null, - "message": "Process '{process}' does not support argument '{argument}'.", + "message": "Process '{process}' (namespace: {namespace}) does not support the following arguments: {arguments}", "http": 400, "tags": [ "Processes" @@ -247,7 +247,7 @@ }, "ProcessArgumentInvalid": { "description": null, - "message": "The argument '{argument}' in process '{process}' is invalid: {reason}", + "message": "The argument '{argument}' in process '{process}' (namespace: {namespace}) is invalid: {reason}", "http": 400, "tags": [ "Processes" @@ -255,7 +255,7 @@ }, "ProcessArgumentRequired": { "description": null, - "message": "Process '{process}' requires argument '{argument}'.", + "message": "Process '{process}' (namespace: {namespace}) requires argument '{argument}'.", "http": 400, "tags": [ "Processes"