diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcessScriptBuilder.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcessScriptBuilder.scala index 9f4f603e1..55ca5066a 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcessScriptBuilder.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcessScriptBuilder.scala @@ -37,6 +37,7 @@ object OpenEOProcessScriptBuilder{ private val logger = LoggerFactory.getLogger(classOf[OpenEOProcessScriptBuilder]) type OpenEOProcess = Map[String,Any] => (Seq[Tile] => Seq[Tile] ) + type AnyProcess = Map[String,Any] => (Any => Any ) private def wrapSimpleProcess(operator: Seq[Tile] => Seq[Tile]): OpenEOProcess = { def wrapper(context:Map[String,Any])(tiles: Seq[Tile]): Seq[Tile] = operator(tiles) @@ -435,20 +436,25 @@ class OpenEOProcessScriptBuilder { val argNames: mutable.Stack[String] = new mutable.Stack[String]() val contextStack: mutable.Stack[mutable.Map[String,Object]] = new mutable.Stack[mutable.Map[String, Object]]() var arrayCounter : Int = 0 - var inputFunction: OpenEOProcess = null + var inputFunction: Object = null + var resultingDataType: CellType = FloatConstantNoDataCellType val defaultDataParameterName:String = "data" def generateFunction(context: Map[String,Any] = Map.empty): Seq[Tile] => Seq[Tile] = { - inputFunction(context) + if(inputFunction.isInstanceOf[OpenEOProcess]) { + inputFunction.asInstanceOf[OpenEOProcess](context) + }else{ + throw new IllegalArgumentException(s"The openEO callback resulted into an unsupported function: $inputFunction") + } } def generateFunction(context: util.Map[String, Any]): Seq[Tile] => Seq[Tile] = { - inputFunction(context.toMap) + this.generateFunction(context.toMap) } def generateFunction(): Seq[Tile] => Seq[Tile] = { - wrapProcessWithDefaultContext(inputFunction) + wrapProcessWithDefaultContext(inputFunction.asInstanceOf[OpenEOProcess]) } /** @@ -478,7 +484,18 @@ class OpenEOProcessScriptBuilder { } private def getProcessArg(name:String):OpenEOProcess = { - contextStack.head.getOrElse(name,throw new IllegalArgumentException(s"Process [${processStack.head}] expects a value argument. These arguments were found: " + contextStack.head.keys.mkString(", ") + s"function tree: ${processStack.reverse.mkString("->")}")).asInstanceOf[OpenEOProcess] + contextStack.head.getOrElse(name,throw new IllegalArgumentException(s"Process [${processStack.head}] expects a $name argument. These arguments were found: " + contextStack.head.keys.mkString(", ") + s"function tree: ${processStack.reverse.mkString("->")}")).asInstanceOf[OpenEOProcess] + } + + private def getAnyProcessArg(name: String,arguments: java.util.Map[String, Object]): AnyProcess = { + if(arguments.get(name).isInstanceOf[String]) { + val theArg:Any = arguments.get(name) + (context: Map[String,Any]) => (inputArg: Any) => { + theArg + } + }else{ + contextStack.head.getOrElse(name, throw new IllegalArgumentException(s"Process [${processStack.head}] expects a $name argument. These arguments were found: " + contextStack.head.keys.mkString(", ") + s"function tree: ${processStack.reverse.mkString("->")}")).asInstanceOf[AnyProcess] + } } private def optionalArg(name: String): OpenEOProcess = { @@ -546,9 +563,42 @@ class OpenEOProcessScriptBuilder { ifElseProcess(value, accept, reject) } + private def dateComponentReplace(arguments: java.util.Map[String, Object]): AnyProcess = { + val date = arguments.get("date") + val value = arguments.get("value") + val component = arguments.get("component") + if (!value.isInstanceOf[Integer]) { + throw new IllegalArgumentException("date_replace_component: The 'value' argument should be an integer, but got: " + value) + } + + val dateProcess = (context: Map[String, Any]) => { + def normalize(argument: Any): String = { + if (argument.isInstanceOf[String]) { + argument.asInstanceOf[String] + } else if (argument.isInstanceOf[util.Map[String, Any]] && argument.asInstanceOf[util.Map[String, Any]].containsKey("from_parameter")) { + val paramName = argument.asInstanceOf[util.Map[String, Any]].get("from_parameter").asInstanceOf[String] + context.getOrElse(paramName, throw new IllegalArgumentException(s"date_difference: Parameter $paramName not found in context: $context")).asInstanceOf[String] + } else { + throw new IllegalArgumentException(s"date_difference got unexpected argument: $argument") + } + } + + val parsedDate = ZonedDateTime.parse(normalize(date)) + val theFunction = (arg:Any) => { + if("day" == component) { + parsedDate.withDayOfMonth(value.asInstanceOf[Integer]).toString + }else{ + throw new IllegalArgumentException(s"date_replace_component: unsupported component $component") + } + } + theFunction + } + return dateProcess + } + private def dateDifferenceProcess(arguments: java.util.Map[String, Object]): OpenEOProcess = { - val date1 = arguments.get("date1") - val date2 = arguments.get("date2") + val date1: AnyProcess = getAnyProcessArg("date1",arguments) + val date2 = getAnyProcessArg("date2",arguments) val dateDiffProcess = (context: Map[String, Any]) => { @@ -562,7 +612,9 @@ class OpenEOProcessScriptBuilder { throw new IllegalArgumentException(s"date_difference got unexpected argument: $argument") } } - val diff = Duration.between(ZonedDateTime.parse(normalize(date1)),ZonedDateTime.parse(normalize(date2))).toDays + val date1Evaluated = date1(context)(null) + val date2Evaluated = date2(context)(null) + val diff = Duration.between(ZonedDateTime.parse(normalize(date1Evaluated)),ZonedDateTime.parse(normalize(date2Evaluated))).toDays createConstantTileFunction(diff) } dateDiffProcess @@ -625,9 +677,12 @@ class OpenEOProcessScriptBuilder { val defaultName = defaultDataParameterName inputFunction = (context:Map[String,Any]) => (tiles: Seq[Tile]) => { if(context.contains(parameterName)) { - if(context.get(parameterName).isInstanceOf[Seq[Tile]]) { + if(context(parameterName).isInstanceOf[Seq[Tile]]) { context.getOrElse(parameterName,tiles).asInstanceOf[Seq[Tile]] - }else{ + } else if(context(parameterName).isInstanceOf[String]){ + context(parameterName) + } + else{ null } }else if(parameterName == defaultName) { @@ -734,6 +789,8 @@ class OpenEOProcessScriptBuilder { val hasTrueCondition = Try(arguments.get("condition").toString.toBoolean).getOrElse(false) val hasConditionExpression = arguments.get("condition") != null && !arguments.get("condition").isInstanceOf[Boolean] + + val operation: OpenEOProcess = operator match { case "date_difference" => dateDifferenceProcess(arguments) case "if" => ifProcess(arguments) @@ -839,12 +896,13 @@ class OpenEOProcessScriptBuilder { resultingDataType = FloatConstantNoDataCellType } } + inputFunction = operation val expectedOperator = processStack.pop() assert(expectedOperator.equals(operator)) contextStack.pop() - inputFunction = operation + } private def inspectFunction(arguments:java.util.Map[String,Object]): OpenEOProcess = { diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala index 8f313dfcf..53e90b314 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala @@ -156,7 +156,7 @@ class OpenEOProcesses extends Serializable { } logger.info(s"Applying callback on time dimension of cube with partitioner: ${datacube.partitioner.getOrElse("no partitioner")} - index: ${index.getOrElse("no index")} and metadata ${datacube.metadata}") val expectedCellType = datacube.metadata.cellType - val applyToTimeseries: Iterable[(SpaceTimeKey, MultibandTile)] => mutable.Map[SpaceTimeKey, MultibandTile] = createTemporalCallback(scriptBuilder.inputFunction,context.asScala.toMap, expectedCellType) + val applyToTimeseries: Iterable[(SpaceTimeKey, MultibandTile)] => mutable.Map[SpaceTimeKey, MultibandTile] = createTemporalCallback(scriptBuilder.inputFunction.asInstanceOf[OpenEOProcess],context.asScala.toMap, expectedCellType) val rdd = if(index.isDefined && index.get.isInstanceOf[SparseSpaceOnlyPartitioner]) { @@ -411,7 +411,7 @@ class OpenEOProcesses extends Serializable { } - val function = scriptBuilder.inputFunction + val function = scriptBuilder.inputFunction.asInstanceOf[OpenEOProcess] def aggregateTiles(tiles: Iterable[(SpaceTimeKey,MultibandTile)]) = { val theContext = context.asScala.toMap + ("array_labels"->tiles.map(_._1.time)) val tilesFunction = function.apply(theContext)