From e10345163b283af351ad5e958f73eefe8a716b5a Mon Sep 17 00:00:00 2001 From: Dave Martin Date: Wed, 23 Nov 2022 13:05:26 +0000 Subject: [PATCH] Cherry picked fixes for #793 and DWCA export functionality to ease Airflow deployment Work is related to: https://github.com/AtlasOfLivingAustralia/preingestion/issues/103 --- examples/metrics/pom.xml | 2 +- examples/pom.xml | 2 +- examples/transform/pom.xml | 2 +- gbif/coordinator/pom.xml | 2 +- .../tasks-integration-tests-hbase/pom.xml | 2 +- .../tasks-integration-tests/pom.xml | 2 +- gbif/coordinator/tasks/pom.xml | 2 +- gbif/diagnostics/pom.xml | 2 +- gbif/keygen/pom.xml | 2 +- gbif/pipelines/clustering-gbif/pom.xml | 2 +- gbif/pipelines/export-gbif-hbase/pom.xml | 2 +- gbif/pipelines/ingest-gbif-beam/pom.xml | 2 +- gbif/pipelines/ingest-gbif-fragmenter/pom.xml | 2 +- gbif/pipelines/ingest-gbif-java/pom.xml | 2 +- gbif/pipelines/pom.xml | 2 +- gbif/pipelines/pre-backbone-release/pom.xml | 2 +- gbif/pom.xml | 2 +- gbif/validator/pipelines-validator-ws/pom.xml | 2 +- gbif/validator/pom.xml | 2 +- gbif/validator/validator-api/pom.xml | 2 +- gbif/validator/validator-checklists/pom.xml | 2 +- gbif/validator/validator-core/pom.xml | 2 +- gbif/validator/validator-eml/pom.xml | 2 +- gbif/validator/validator-it-tests/pom.xml | 2 +- gbif/validator/validator-mail/pom.xml | 2 +- gbif/validator/validator-ws-client/pom.xml | 2 +- livingatlas/bitmap/pom.xml | 2 +- livingatlas/configs/la-pipelines.yaml | 12 + livingatlas/migration/pom.xml | 2 +- livingatlas/pipelines/pom.xml | 2 +- .../beam/IndexRecordToDwcaPipeline.java | 66 +- .../beam/IndexRecordToSolrPipeline.java | 21 +- ...dexRecordToSolrWithPartitionsPipeline.java | 695 ++++++++++++++++++ .../ala/pipelines/common/SolrFieldSchema.java | 45 ++ .../options/DwCAExportPipelineOptions.java | 7 + .../transforms/IndexRecordTransform.java | 108 ++- .../ala/outlier/DistributionOutlierTest.java | 2 + .../converters/CoreTsvConverterTest.java | 12 +- livingatlas/pom.xml | 2 +- pom.xml | 2 +- sdks/beam-common/pom.xml | 2 +- sdks/beam-transforms/pom.xml | 2 +- sdks/core/pom.xml | 2 +- sdks/models/pom.xml | 2 +- sdks/pom.xml | 2 +- sdks/variables/pom.xml | 2 +- tools/archives-converters/pom.xml | 2 +- tools/elasticsearch-tools/pom.xml | 2 +- .../extension-converter-maven-plugin/pom.xml | 2 +- tools/pipelines-maven-plugin/pom.xml | 2 +- tools/pom.xml | 2 +- tools/xml-to-avsc-maven-plugin/pom.xml | 2 +- 52 files changed, 976 insertions(+), 78 deletions(-) create mode 100644 livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrWithPartitionsPipeline.java create mode 100644 livingatlas/pipelines/src/main/java/au/org/ala/pipelines/common/SolrFieldSchema.java diff --git a/examples/metrics/pom.xml b/examples/metrics/pom.xml index 558674560c..8041114fd6 100644 --- a/examples/metrics/pom.xml +++ b/examples/metrics/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines examples - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/examples/pom.xml b/examples/pom.xml index b8ebba5b6f..c14afc0630 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -3,7 +3,7 @@ pipelines-parent org.gbif.pipelines - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/examples/transform/pom.xml b/examples/transform/pom.xml index 322ecb0d18..3375cb89a8 100644 --- a/examples/transform/pom.xml +++ b/examples/transform/pom.xml @@ -6,7 +6,7 @@ org.gbif.pipelines examples - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/coordinator/pom.xml b/gbif/coordinator/pom.xml index 85bba2f303..81b7ff8250 100644 --- 
a/gbif/coordinator/pom.xml +++ b/gbif/coordinator/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines gbif - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/coordinator/tasks-integration-tests-hbase/pom.xml b/gbif/coordinator/tasks-integration-tests-hbase/pom.xml index 135f991405..3cd2ad33cb 100644 --- a/gbif/coordinator/tasks-integration-tests-hbase/pom.xml +++ b/gbif/coordinator/tasks-integration-tests-hbase/pom.xml @@ -4,7 +4,7 @@ org.gbif.pipelines coordinator - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/coordinator/tasks-integration-tests/pom.xml b/gbif/coordinator/tasks-integration-tests/pom.xml index 4d7df48eb6..26dd45d08e 100644 --- a/gbif/coordinator/tasks-integration-tests/pom.xml +++ b/gbif/coordinator/tasks-integration-tests/pom.xml @@ -4,7 +4,7 @@ org.gbif.pipelines coordinator - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/coordinator/tasks/pom.xml b/gbif/coordinator/tasks/pom.xml index a705fa766f..7f4f6e7608 100644 --- a/gbif/coordinator/tasks/pom.xml +++ b/gbif/coordinator/tasks/pom.xml @@ -4,7 +4,7 @@ org.gbif.pipelines coordinator - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/diagnostics/pom.xml b/gbif/diagnostics/pom.xml index f9136d7009..62d99fae8c 100644 --- a/gbif/diagnostics/pom.xml +++ b/gbif/diagnostics/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines gbif - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/keygen/pom.xml b/gbif/keygen/pom.xml index 62de0c4be3..bd04b9edbc 100644 --- a/gbif/keygen/pom.xml +++ b/gbif/keygen/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines gbif - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/pipelines/clustering-gbif/pom.xml b/gbif/pipelines/clustering-gbif/pom.xml index 4cb5a42197..1c95640f68 100644 --- a/gbif/pipelines/clustering-gbif/pom.xml +++ b/gbif/pipelines/clustering-gbif/pom.xml @@ -3,7 +3,7 @@ pipelines org.gbif.pipelines - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gbif/pipelines/export-gbif-hbase/pom.xml b/gbif/pipelines/export-gbif-hbase/pom.xml index 40c55cd31e..0b82b41da4 100644 --- a/gbif/pipelines/export-gbif-hbase/pom.xml +++ b/gbif/pipelines/export-gbif-hbase/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines pipelines - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/pipelines/ingest-gbif-beam/pom.xml b/gbif/pipelines/ingest-gbif-beam/pom.xml index 1344c92316..c6d06af790 100644 --- a/gbif/pipelines/ingest-gbif-beam/pom.xml +++ b/gbif/pipelines/ingest-gbif-beam/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines pipelines - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/pipelines/ingest-gbif-fragmenter/pom.xml b/gbif/pipelines/ingest-gbif-fragmenter/pom.xml index 8728bac93f..a5a1a98120 100644 --- a/gbif/pipelines/ingest-gbif-fragmenter/pom.xml +++ b/gbif/pipelines/ingest-gbif-fragmenter/pom.xml @@ -5,7 +5,7 @@ pipelines org.gbif.pipelines - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/pipelines/ingest-gbif-java/pom.xml b/gbif/pipelines/ingest-gbif-java/pom.xml index d725e78a80..999ef920cd 100644 --- a/gbif/pipelines/ingest-gbif-java/pom.xml +++ b/gbif/pipelines/ingest-gbif-java/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines pipelines - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/pipelines/pom.xml b/gbif/pipelines/pom.xml index e992b35b03..678e406282 100644 --- a/gbif/pipelines/pom.xml +++ b/gbif/pipelines/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines gbif - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/pipelines/pre-backbone-release/pom.xml b/gbif/pipelines/pre-backbone-release/pom.xml 
index 01a0ae1c9e..b32f2d412c 100644 --- a/gbif/pipelines/pre-backbone-release/pom.xml +++ b/gbif/pipelines/pre-backbone-release/pom.xml @@ -3,7 +3,7 @@ pipelines org.gbif.pipelines - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gbif/pom.xml b/gbif/pom.xml index 00ebf2d431..ef3f82d47b 100644 --- a/gbif/pom.xml +++ b/gbif/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines pipelines-parent - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/validator/pipelines-validator-ws/pom.xml b/gbif/validator/pipelines-validator-ws/pom.xml index f0c305f6a9..8a73d676c3 100644 --- a/gbif/validator/pipelines-validator-ws/pom.xml +++ b/gbif/validator/pipelines-validator-ws/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines validator - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/validator/pom.xml b/gbif/validator/pom.xml index ac0e699f3a..63f21c7ad8 100644 --- a/gbif/validator/pom.xml +++ b/gbif/validator/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines gbif - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/validator/validator-api/pom.xml b/gbif/validator/validator-api/pom.xml index 9eea14ef77..0a74a565c5 100644 --- a/gbif/validator/validator-api/pom.xml +++ b/gbif/validator/validator-api/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines validator - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/validator/validator-checklists/pom.xml b/gbif/validator/validator-checklists/pom.xml index 00f3737b18..39def3f5c4 100644 --- a/gbif/validator/validator-checklists/pom.xml +++ b/gbif/validator/validator-checklists/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines validator - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/validator/validator-core/pom.xml b/gbif/validator/validator-core/pom.xml index c05f097a43..a6a1b3eb12 100644 --- a/gbif/validator/validator-core/pom.xml +++ b/gbif/validator/validator-core/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines validator - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/validator/validator-eml/pom.xml b/gbif/validator/validator-eml/pom.xml index f54e85db77..2bc0e5e70e 100644 --- a/gbif/validator/validator-eml/pom.xml +++ b/gbif/validator/validator-eml/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines validator - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/validator/validator-it-tests/pom.xml b/gbif/validator/validator-it-tests/pom.xml index 4501d1cb38..fd74482459 100644 --- a/gbif/validator/validator-it-tests/pom.xml +++ b/gbif/validator/validator-it-tests/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines validator - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/gbif/validator/validator-mail/pom.xml b/gbif/validator/validator-mail/pom.xml index c7ba34ce72..16cbc84ef3 100644 --- a/gbif/validator/validator-mail/pom.xml +++ b/gbif/validator/validator-mail/pom.xml @@ -3,7 +3,7 @@ org.gbif.pipelines validator - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gbif/validator/validator-ws-client/pom.xml b/gbif/validator/validator-ws-client/pom.xml index 546934a5dc..53b874ad4b 100644 --- a/gbif/validator/validator-ws-client/pom.xml +++ b/gbif/validator/validator-ws-client/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines validator - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/livingatlas/bitmap/pom.xml b/livingatlas/bitmap/pom.xml index 6f8bf8bbfa..3386fa99d7 100644 --- a/livingatlas/bitmap/pom.xml +++ b/livingatlas/bitmap/pom.xml @@ -3,7 +3,7 @@ au.org.ala livingatlas - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml 4.0.0 diff --git a/livingatlas/configs/la-pipelines.yaml b/livingatlas/configs/la-pipelines.yaml 
index b3b46871ef..4927630089 100644 --- a/livingatlas/configs/la-pipelines.yaml +++ b/livingatlas/configs/la-pipelines.yaml @@ -336,6 +336,18 @@ image-load-sh-args: executor-memory: 7G driver-memory: 1G +export-sh-args: + local: + jvm: -Xmx8g -XX:+UseG1GC + spark-embedded: + jvm: -Xmx8g -XX:+UseG1GC + spark-cluster: + conf: spark.default.parallelism=48 + num-executors: 6 + executor-cores: 8 + executor-memory: 20G + driver-memory: 4G + uuid-sh-args: local: jvm: -Xmx8g -XX:+UseG1GC diff --git a/livingatlas/migration/pom.xml b/livingatlas/migration/pom.xml index 8745b751cd..11137e6755 100644 --- a/livingatlas/migration/pom.xml +++ b/livingatlas/migration/pom.xml @@ -3,7 +3,7 @@ au.org.ala livingatlas - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml 4.0.0 diff --git a/livingatlas/pipelines/pom.xml b/livingatlas/pipelines/pom.xml index 28259fee98..a4e14c2f87 100644 --- a/livingatlas/pipelines/pom.xml +++ b/livingatlas/pipelines/pom.xml @@ -5,7 +5,7 @@ au.org.ala livingatlas - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToDwcaPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToDwcaPipeline.java index 5db6725ecd..ad686e1f1b 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToDwcaPipeline.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToDwcaPipeline.java @@ -2,7 +2,6 @@ import static org.apache.beam.sdk.values.TypeDescriptors.strings; import static org.gbif.pipelines.common.beam.utils.PathBuilder.buildDatasetAttemptPath; -import static org.gbif.pipelines.common.beam.utils.PathBuilder.buildPath; import au.org.ala.kvs.ALAPipelinesConfig; import au.org.ala.kvs.ALAPipelinesConfigFactory; @@ -14,13 +13,18 @@ import au.org.ala.utils.ALAFsUtils; import au.org.ala.utils.CombinedYamlConfiguration; import au.org.ala.utils.ValidationUtils; +import java.io.File; +import java.io.FileOutputStream; import java.io.InputStream; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.net.URL; +import java.nio.file.Files; import java.util.List; import java.util.Scanner; import java.util.function.UnaryOperator; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.beam.sdk.Pipeline; @@ -31,7 +35,12 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.codehaus.plexus.util.FileUtils; import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory; +import org.gbif.pipelines.core.factory.FileSystemFactory; import org.gbif.pipelines.core.utils.FsUtils; import org.gbif.pipelines.io.avro.IndexRecord; import org.slf4j.MDC; @@ -52,10 +61,11 @@ public static void main(String[] args) throws Exception { } @SneakyThrows - public static void run(DwCAExportPipelineOptions options) { + public static void run(DwCAExportPipelineOptions options) throws Exception { UnaryOperator pathFn = - fileName -> buildPath(buildDatasetAttemptPath(options, "dwca", false), fileName).toString(); + fileName -> + String.join(Path.SEPARATOR, buildDatasetAttemptPath(options, "dwca", false), fileName); Pipeline p = Pipeline.create(options); @@ -101,6 +111,56 @@ public List apply(IndexRecord indexRecord) { // Write the 
eml.xml writeEML(options, pathFn); + + // create output dir + FileUtils.forceMkdir(new File(options.getLocalExportPath())); + + // copy from HDFS to /tmp + boolean originalInputIsHdfs = options.getInputPath().startsWith("hdfs://"); + + // if inputPath is "hdfs://", then copy to local + if (originalInputIsHdfs) { + + FileSystem fs = + FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig()) + .getFs(options.getInputPath()); + + Path inputPathHdfs = new Path(options.getInputPath()); + + if (!fs.exists(inputPathHdfs)) { + throw new RuntimeException("Input file not available: " + options.getInputPath()); + } + + String dwcaHdfsOutputPath = buildDatasetAttemptPath(options, "dwca", false); + String dwcaOutputPath = options.getLocalExportPath() + "/" + options.getDatasetId() + "/"; + RemoteIterator iter = + fs.listFiles(new Path(dwcaHdfsOutputPath + "/"), false); + + while (iter.hasNext()) { + LocatedFileStatus locatedFileStatus = iter.next(); + Path path = locatedFileStatus.getPath(); + if (fs.isFile(path)) { + log.info("Transferring " + path.toString() + " to " + dwcaOutputPath); + fs.copyToLocalFile(false, path, new Path(dwcaOutputPath + "/" + path.getName()), false); + } + } + } + + String zipPath = options.getLocalExportPath() + "/" + options.getDatasetId() + ".zip"; + String dwcaOutputPath = options.getLocalExportPath() + "/" + options.getDatasetId(); + File[] filePaths = + originalInputIsHdfs + ? new File(dwcaOutputPath).listFiles() + : new File(buildDatasetAttemptPath(options, "dwca", false)).listFiles(); + + try (ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(zipPath))) { + for (File fileToZip : filePaths) { + log.info("Adding to Zip file: " + fileToZip.getName()); + zipOut.putNextEntry(new ZipEntry(fileToZip.getName())); + Files.copy(fileToZip.toPath(), zipOut); + } + } + log.info("Zip file written to: " + zipPath); } @SneakyThrows diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java index 461523313f..373b4a1a2a 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java @@ -4,6 +4,7 @@ import static au.org.ala.pipelines.transforms.IndexValues.*; import static org.gbif.pipelines.common.PipelinesVariables.Pipeline.AVRO_EXTENSION; +import au.org.ala.pipelines.common.SolrFieldSchema; import au.org.ala.pipelines.options.AllDatasetsPipelinesOptions; import au.org.ala.pipelines.options.SolrPipelineOptions; import au.org.ala.pipelines.transforms.IndexFields; @@ -58,7 +59,8 @@ @Slf4j public class IndexRecordToSolrPipeline { - static final SampleRecord nullSampling = SampleRecord.newBuilder().setLatLng("NO_VALUE").build(); + private static final SampleRecord nullSampling = + SampleRecord.newBuilder().setLatLng("NO_VALUE").build(); public static final String EMPTY = "EMPTY"; static final IndexRecord nullIndexRecord = IndexRecord.newBuilder().setId(EMPTY).build(); @@ -86,7 +88,7 @@ public static void main(String[] args) throws Exception { public static void run(SolrPipelineOptions options) { - final List schemaFields = getSchemaFields(options); + final Map schemaFields = getSchemaFields(options); final List dynamicFieldPrefixes = getSchemaDynamicFieldPrefixes(options); final int numOfPartitions = options.getNumOfPartitions(); @@ -177,15 +179,20 @@ public static 
boolean hasCoordinates(IndexRecord indexRecord) { } @NotNull - private static List getSchemaFields(SolrPipelineOptions options) { + private static Map getSchemaFields(SolrPipelineOptions options) { try (CloudSolrClient client = new CloudSolrClient.Builder(ImmutableList.of(options.getZkHost()), Optional.empty()) .build()) { SchemaRequest.Fields fields = new SchemaRequest.Fields(); SchemaResponse.FieldsResponse response = fields.process(client, options.getSolrCollection()); - return response.getFields().stream() - .map(f -> f.get("name").toString()) - .collect(Collectors.toList()); + Map schema = new HashMap(); + for (Map field : response.getFields()) { + schema.put( + (String) field.get("name"), + new SolrFieldSchema( + (String) field.get("type"), (boolean) field.getOrDefault("multiValued", false))); + } + return schema; } catch (Exception e) { throw new RuntimeException("Unable to retrieve schema fields: " + e.getMessage()); } @@ -229,7 +236,7 @@ private static void writeToSolr( SolrPipelineOptions options, PCollection indexRecords, SolrIO.ConnectionConfiguration conn, - final List schemaFields, + final Map schemaFields, final List dynamicFieldPrefixes) { if (options.getOutputAvroToFilePath() == null) { diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrWithPartitionsPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrWithPartitionsPipeline.java new file mode 100644 index 0000000000..6a9b9e834e --- /dev/null +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrWithPartitionsPipeline.java @@ -0,0 +1,695 @@ +package au.org.ala.pipelines.beam; + +import static au.org.ala.pipelines.transforms.IndexFields.*; +import static au.org.ala.pipelines.transforms.IndexValues.*; +import static org.gbif.pipelines.common.PipelinesVariables.Pipeline.AVRO_EXTENSION; + +import au.org.ala.pipelines.common.SolrFieldSchema; +import au.org.ala.pipelines.options.AllDatasetsPipelinesOptions; +import au.org.ala.pipelines.options.SolrPipelineOptions; +import au.org.ala.pipelines.transforms.IndexFields; +import au.org.ala.pipelines.transforms.IndexRecordTransform; +import au.org.ala.pipelines.transforms.IndexValues; +import au.org.ala.pipelines.util.VersionInfo; +import au.org.ala.utils.ALAFsUtils; +import au.org.ala.utils.CombinedYamlConfiguration; +import au.org.ala.utils.ValidationUtils; +import avro.shaded.com.google.common.collect.ImmutableMap; +import java.util.*; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.joinlibrary.Join; +import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.io.solr.SolrIO; +import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.transforms.Partition.PartitionFn; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.*; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.schema.SchemaRequest; +import org.apache.solr.client.solrj.response.schema.SchemaResponse; +import org.apache.solr.common.SolrInputDocument; +import org.gbif.dwc.terms.DwcTerm; +import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory; +import 
org.gbif.pipelines.common.beam.utils.PathBuilder; +import org.gbif.pipelines.io.avro.*; +import org.jetbrains.annotations.NotNull; +import org.joda.time.Duration; +import org.slf4j.MDC; + +/** + * Pipeline that joins sample data and index records and indexes to SOLR. This version of the index + * record to SOLR pipeline works better on non-EMR environments. + */ +@Slf4j +public class IndexRecordToSolrWithPartitionsPipeline { + + static final SampleRecord nullSampling = SampleRecord.newBuilder().setLatLng("NO_VALUE").build(); + public static final String EMPTY = "EMPTY"; + static final JackKnifeOutlierRecord nullJkor = + JackKnifeOutlierRecord.newBuilder().setId(EMPTY).setItems(new ArrayList<>()).build(); + static final Relationships nullClustering = + Relationships.newBuilder().setId(EMPTY).setRelationships(new ArrayList<>()).build(); + static final DistributionOutlierRecord nullOutlier = + DistributionOutlierRecord.newBuilder().setId(EMPTY).build(); + + public static void main(String[] args) throws Exception { + VersionInfo.print(); + MDC.put("step", "INDEX_RECORD_TO_SOLR"); + + String[] combinedArgs = new CombinedYamlConfiguration(args).toArgs("general", "solr"); + SolrPipelineOptions options = + PipelinesOptionsFactory.create(SolrPipelineOptions.class, combinedArgs); + MDC.put("datasetId", options.getDatasetId() != null ? options.getDatasetId() : "ALL_RECORDS"); + options.setMetaFileName(ValidationUtils.INDEXING_METRICS); + PipelinesOptionsFactory.registerHdfs(options); + run(options); + // FIXME: Issue logged here: https://github.com/AtlasOfLivingAustralia/la-pipelines/issues/105 + System.exit(0); + } + + public static boolean hasCoordinates(IndexRecord indexRecord) { + return indexRecord.getLatLng() != null; + } + + public static void run(SolrPipelineOptions options) { + + final Map schemaFields = getSchemaFields(options); + final List dynamicFieldPrefixes = getSchemaDynamicFieldPrefixes(options); + + Pipeline pipeline = Pipeline.create(options); + + // Load IndexRecords - keyed on UUID + PCollection> indexRecordsCollection = + loadIndexRecords(options, pipeline); + + // If configured to do so, include jack knife information + if (options.getIncludeJackKnife()) { + log.info("Adding jack knife to the index"); + indexRecordsCollection = addJacknifeInfo(options, pipeline, indexRecordsCollection); + } else { + log.info("Skipping adding jack knife to the index"); + } + + // If configured to do so, include clustering information + if (options.getIncludeClustering()) { + log.info("Adding clustering to the index"); + indexRecordsCollection = addClusteringInfo(options, pipeline, indexRecordsCollection); + } else { + log.info("Skipping adding clustering to the index"); + } + + if (options.getIncludeOutlier()) { + log.info("Adding outlier to the index"); + indexRecordsCollection = addOutlierInfo(options, pipeline, indexRecordsCollection); + } else { + log.info("Skipping adding outlier to the index"); + } + + PCollection> recordsWithoutCoordinates = null; + + if (options.getIncludeSampling()) { + + log.info("Adding sampling to the index"); + + // Load Samples - keyed on LatLng + PCollection> sampleRecords = loadSampleRecords(options, pipeline); + + // Split into records with coordinates and records without + // Filter records with coordinates - we will join these to samples + PCollection> recordsWithCoordinates = + indexRecordsCollection.apply( + Filter.by(indexRecord -> hasCoordinates(indexRecord.getValue()))); + + // Filter records without coordinates - we will index, but not sample these +
recordsWithoutCoordinates = + indexRecordsCollection.apply( + Filter.by(indexRecord -> !hasCoordinates(indexRecord.getValue()))); + + // Convert to KV + PCollection> recordsWithCoordinatesKeyedLatng = + recordsWithCoordinates.apply( + MapElements.via( + new SimpleFunction, KV>() { + @Override + public KV apply(KV input) { + String latLng = + input.getValue().getLatLng() == null + ? "NO_LAT_LNG" + : input.getValue().getLatLng(); + return KV.of(latLng, input.getValue()); + } + })); + + final Integer noOfPartitions = options.getNumOfPartitions(); + PCollection> pcs = null; + + if (noOfPartitions > 1) { + + PCollectionList> partitions = + recordsWithCoordinatesKeyedLatng.apply( + Partition.of( + noOfPartitions, + (PartitionFn>) + (elem, numPartitions) -> + (elem.getValue().getInts().get(DwcTerm.month.simpleName()) != null + ? elem.getValue().getInts().get(DwcTerm.month.simpleName()) + : 0) + % noOfPartitions)); + + PCollectionList> pcl = null; + for (int i = 0; i < noOfPartitions; i++) { + PCollection> p = + joinSampleRecord(sampleRecords, partitions.get(i)); + if (i == 0) { + pcl = PCollectionList.of(p); + } else { + pcl = pcl.and(p); + } + } + + pcs = pcl.apply(Flatten.pCollections()); + + } else { + pcs = joinSampleRecord(sampleRecords, recordsWithCoordinatesKeyedLatng); + } + + log.info("Adding step 4: Create SOLR connection"); + SolrIO.ConnectionConfiguration conn = + SolrIO.ConnectionConfiguration.create(options.getZkHost()); + + writeToSolr(options, pcs, conn, schemaFields, dynamicFieldPrefixes); + + if (recordsWithoutCoordinates != null) { + log.info("Adding step 5: Write records (without coordinates) to SOLR"); + writeToSolr(options, recordsWithoutCoordinates, conn, schemaFields, dynamicFieldPrefixes); + } + + } else { + log.info("Adding step 4: Create SOLR connection"); + SolrIO.ConnectionConfiguration conn = + SolrIO.ConnectionConfiguration.create(options.getZkHost()); + + writeToSolr(options, indexRecordsCollection, conn, schemaFields, dynamicFieldPrefixes); + } + + log.info("Starting pipeline"); + pipeline.run(options).waitUntilFinish(); + + log.info("Solr indexing pipeline complete"); + } + + @NotNull + private static Map getSchemaFields(SolrPipelineOptions options) { + try (CloudSolrClient client = + new CloudSolrClient.Builder(ImmutableList.of(options.getZkHost()), Optional.empty()) + .build()) { + SchemaRequest.Fields fields = new SchemaRequest.Fields(); + SchemaResponse.FieldsResponse response = fields.process(client, options.getSolrCollection()); + Map schema = new HashMap<>(); + for (Map field : response.getFields()) { + schema.put( + (String) field.get("name"), + new SolrFieldSchema( + (String) field.get("type"), (boolean) field.getOrDefault("multiValued", false))); + } + return schema; + } catch (Exception e) { + throw new RuntimeException("Unable to retrieve schema fields: " + e.getMessage()); + } + } + + @NotNull + private static List getSchemaDynamicFieldPrefixes(SolrPipelineOptions options) { + try { + CloudSolrClient client = + new CloudSolrClient.Builder().withZkHost(options.getZkHost()).build(); + SchemaRequest.DynamicFields fields = new SchemaRequest.DynamicFields(); + SchemaResponse.DynamicFieldsResponse response = + fields.process(client, options.getSolrCollection()); + return response.getDynamicFields().stream() + .map(f -> f.get("name").toString().replace("*", "")) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new RuntimeException("Unable to retrieve schema fields: " + e.getMessage()); + } + } + + private static PCollection> 
joinSampleRecord( + PCollection> sampleRecords, + PCollection> recordsWithCoordinatesKeyedLatng) { + PCollection> indexRecordsCollection; + // Co group IndexRecords with coordinates with Sample data + final TupleTag indexRecordTag = new TupleTag<>(); + final TupleTag samplingTag = new TupleTag<>(); + + // Join collections by LatLng string + PCollection> results = + KeyedPCollectionTuple.of(samplingTag, sampleRecords) + .and(indexRecordTag, recordsWithCoordinatesKeyedLatng) + .apply(CoGroupByKey.create()); + + // Create collection which contains samples keyed with indexRecord.id + return results.apply(ParDo.of(joinSampling(indexRecordTag, samplingTag))); + } + + private static PCollection> addJacknifeInfo( + SolrPipelineOptions options, + Pipeline pipeline, + PCollection> indexRecords) { + + // Load Jackknife, keyed on ID + PCollection> jackKnifeRecordsKeyedRecordID = + loadJackKnifeRecords(options, pipeline); + + // Join indexRecords and jackKnife + PCollection>> + indexRecordSamplingJoinJackKnife = + Join.leftOuterJoin(indexRecords, jackKnifeRecordsKeyedRecordID, nullJkor); + + // Add Jackknife information + return indexRecordSamplingJoinJackKnife.apply(ParDo.of(addJackknifeInfo())); + } + + private static PCollection> addClusteringInfo( + SolrPipelineOptions options, + Pipeline pipeline, + PCollection> indexRecords) { + + // Load clustering, keyed on ID + PCollection> clusteringRecordsKeyedRecordID = + loadClusteringRecords(options, pipeline); + + // Join indexRecords and clustering relationships + PCollection>> indexRecordJoinClustering = + Join.leftOuterJoin(indexRecords, clusteringRecordsKeyedRecordID, nullClustering); + + // Add clustering information + return indexRecordJoinClustering.apply(ParDo.of(addClusteringInfo())); + } + + private static PCollection> addOutlierInfo( + SolrPipelineOptions options, + Pipeline pipeline, + PCollection> indexRecords) { + + // Load outlier records, keyed on ID + PCollection> outlierRecords = + loadOutlierRecords(options, pipeline); + PCollection>> indexRecordJoinOutlier = + Join.leftOuterJoin(indexRecords, outlierRecords, nullOutlier); + + // Add outlier information + return indexRecordJoinOutlier.apply(ParDo.of(addOutlierInfo())); + } + + private static void writeToSolr( + SolrPipelineOptions options, + PCollection> kvIndexRecords, + SolrIO.ConnectionConfiguration conn, + final Map schemaFields, + final List dynamicFieldPrefixes) { + + if (options.getOutputAvroToFilePath() == null) { + + kvIndexRecords + .apply( + "IndexRecord to SOLR Document", + ParDo.of( + new DoFn, SolrInputDocument>() { + @DoFn.ProcessElement + public void processElement( + @DoFn.Element KV kvIndexRecord, + OutputReceiver out) { + SolrInputDocument solrInputDocument = + IndexRecordTransform.convertIndexRecordToSolrDoc( + kvIndexRecord.getValue(), schemaFields, dynamicFieldPrefixes); + out.output(solrInputDocument); + } + })) + .apply( + SolrIO.write() + .to(options.getSolrCollection()) + .withConnectionConfiguration(conn) + .withMaxBatchSize(options.getSolrBatchSize()) + .withRetryConfiguration( + SolrIO.RetryConfiguration.create( + options.getSolrRetryMaxAttempts(), + Duration.standardMinutes(options.getSolrRetryDurationInMins())))); + } else { + kvIndexRecords + .apply(Values.create()) + .apply(AvroIO.write(IndexRecord.class).to(options.getOutputAvroToFilePath())); + } + } + + private static DoFn>, KV> + addJackknifeInfo() { + + return new DoFn< + KV>, KV>() { + + @ProcessElement + public void processElement(ProcessContext c) { + + KV> e = c.element(); + IndexRecord
indexRecord = e.getValue().getKey(); + JackKnifeOutlierRecord jkor = e.getValue().getValue(); + Map ints = indexRecord.getInts(); + + if (ints == null) { + ints = new HashMap<>(); + indexRecord.setInts(ints); + } + + if (jkor != nullJkor && !EMPTY.equals(jkor.getId())) { + + ints.put(OUTLIER_LAYER_COUNT, jkor.getItems().size()); + + Map> multiValues = indexRecord.getMultiValues(); + if (multiValues == null) { + multiValues = new HashMap<>(); + indexRecord.setMultiValues(multiValues); + } + + multiValues.put(OUTLIER_LAYER, jkor.getItems()); + } else { + ints.put(OUTLIER_LAYER_COUNT, 0); + } + + c.output(KV.of(indexRecord.getId(), indexRecord)); + } + }; + } + + private static DoFn>, KV> + addClusteringInfo() { + + return new DoFn>, KV>() { + @ProcessElement + public void processElement(ProcessContext c) { + + KV> e = c.element(); + + IndexRecord indexRecord = e.getValue().getKey(); + String id = indexRecord.getId(); + + Relationships jkor = e.getValue().getValue(); + + Map> multiValues = indexRecord.getMultiValues(); + Map booleans = indexRecord.getBooleans(); + Map strings = indexRecord.getStrings(); + Map ints = indexRecord.getInts(); + + if (multiValues == null) { + multiValues = new HashMap<>(); + indexRecord.setMultiValues(multiValues); + } + + if (booleans == null) { + booleans = new HashMap<>(); + indexRecord.setBooleans(booleans); + } + + if (strings == null) { + strings = new HashMap<>(); + indexRecord.setStrings(strings); + } + + if (ints == null) { + ints = new HashMap<>(); + indexRecord.setInts(ints); + } + + if (jkor != nullClustering + && !EMPTY.equals(jkor.getId()) + && !jkor.getRelationships().isEmpty()) { + + booleans.put(IS_IN_CLUSTER, true); + + boolean isRepresentative = false; + + Set duplicateType = new HashSet<>(); + for (Relationship relationship : jkor.getRelationships()) { + + // A record may end up being marked as both associative + // and representative in two or more separate clusters. + // The important thing here is that if it is marked + // as representative in any relationship we mark it as such, + // as the sole purpose of representative/associative markers + // is to allow users to filter out duplicates.
+ boolean linkingError = false; + if (relationship.getRepId().equals(id)) { + isRepresentative = true; + } + + if (relationship.getDupDataset().equals(relationship.getRepDataset())) { + duplicateType.add(SAME_DATASET); + } else if (!linkingError) { + duplicateType.add(DIFFERENT_DATASET); + } else { + duplicateType.add(LINKING_ERROR); + } + } + + String duplicateStatus = IndexValues.ASSOCIATED; + if (isRepresentative) { + duplicateStatus = IndexValues.REPRESENTATIVE; + } + + // a record may be representative of several records + List isRepresentativeOf = + jkor.getRelationships().stream() + .map(Relationship::getDupId) + .distinct() + .filter(recordId -> !recordId.equals(id)) + .collect(Collectors.toList()); + + // a record is a duplicate of a single representative record + List isDuplicateOf = + jkor.getRelationships().stream() + .distinct() + .filter(relationship -> relationship.getDupId().equals(id)) + .collect(Collectors.toList()); + + if (!isRepresentativeOf.isEmpty()) { + multiValues.put(IndexFields.IS_REPRESENTATIVE_OF, isRepresentativeOf); + strings.put( + DwcTerm.associatedOccurrences.simpleName(), String.join("|", isRepresentativeOf)); + } + + if (!isDuplicateOf.isEmpty()) { + strings.put(IS_DUPLICATE_OF, isDuplicateOf.get(0).getRepId()); + String[] justification = isDuplicateOf.get(0).getJustification().split(","); + multiValues.put(DUPLICATE_JUSTIFICATION, Arrays.asList(justification)); + strings.put( + DwcTerm.associatedOccurrences.simpleName(), isDuplicateOf.get(0).getRepId()); + } + + // set the status + strings.put(DUPLICATE_STATUS, duplicateStatus); + + // add duplicate types + List duplicateTypeList = new ArrayList<>(duplicateType); + multiValues.put(DUPLICATE_TYPE, duplicateTypeList); + + } else { + booleans.put(IS_IN_CLUSTER, false); + } + c.output(KV.of(indexRecord.getId(), indexRecord)); + } + }; + } + + private static DoFn, KV> joinSampling( + TupleTag indexRecordTag, TupleTag samplingTag) { + + return new DoFn, KV>() { + @ProcessElement + public void processElement(ProcessContext c) { + + KV e = c.element(); + + SampleRecord sampleRecord = e.getValue().getOnly(samplingTag, nullSampling); + Iterable indexRecordIterable = e.getValue().getAll(indexRecordTag); + + if (sampleRecord.getStrings() == null && sampleRecord.getDoubles() == null) { + log.error("Sampling was empty for point: {}", e.getKey()); + } + + indexRecordIterable.forEach( + indexRecord -> { + Map strings = + indexRecord.getStrings() != null ? indexRecord.getStrings() : new HashMap<>(); + Map doubles = + indexRecord.getDoubles() != null ? 
indexRecord.getDoubles() : new HashMap<>(); + + Map stringsToPersist = + ImmutableMap.builder() + .putAll(strings) + .putAll(sampleRecord.getStrings()) + .build(); + + Map doublesToPersist = + ImmutableMap.builder() + .putAll(doubles) + .putAll(sampleRecord.getDoubles()) + .build(); + + IndexRecord ir = + IndexRecord.newBuilder() + .setId(indexRecord.getId()) + .setTaxonID(indexRecord.getTaxonID()) + .setLatLng(indexRecord.getLatLng()) + .setMultiValues(indexRecord.getMultiValues()) + .setDates(indexRecord.getDates()) + .setLongs(indexRecord.getLongs()) + .setBooleans(indexRecord.getBooleans()) + .setInts(indexRecord.getInts()) + .setStrings(stringsToPersist) + .setDoubles(doublesToPersist) + .setDynamicProperties(indexRecord.getDynamicProperties()) + .build(); + + c.output(KV.of(indexRecord.getId(), ir)); + }); + } + }; + } + + private static DoFn< + KV>, KV> + addOutlierInfo() { + + return new DoFn< + KV>, KV>() { + @ProcessElement + public void processElement(ProcessContext c) { + + KV> e = c.element(); + String id = e.getKey(); + + DistributionOutlierRecord outlierRecord = e.getValue().getValue(); + IndexRecord indexRecord = e.getValue().getKey(); + + if (outlierRecord != null) { + indexRecord + .getDoubles() + .put(DISTANCE_FROM_EXPERT_DISTRIBUTION, outlierRecord.getDistanceOutOfEDL()); + } + + c.output(KV.of(id, indexRecord)); + } + }; + } + + /** Load index records from AVRO. */ + private static PCollection> loadIndexRecords( + SolrPipelineOptions options, Pipeline p) { + return ALAFsUtils.loadIndexRecords(options, p) + .apply( + MapElements.via( + new SimpleFunction>() { + @Override + public KV apply(IndexRecord input) { + return KV.of(input.getId(), input); + } + })); + } + + private static PCollection> loadSampleRecords( + AllDatasetsPipelinesOptions options, Pipeline p) { + String samplingPath = + String.join("/", ALAFsUtils.buildPathSamplingUsingTargetPath(options), "*.avro"); + log.info("Loading sampling from {}", samplingPath); + return p.apply(AvroIO.read(SampleRecord.class).from(samplingPath)) + .apply( + MapElements.via( + new SimpleFunction>() { + @Override + public KV apply(SampleRecord input) { + return KV.of(input.getLatLng(), input); + } + })); + } + + private static PCollection> loadJackKnifeRecords( + SolrPipelineOptions options, Pipeline p) { + + if (!options.getIncludeJackKnife()) { + return p.apply(Create.empty(new TypeDescriptor>() {})); + } + + String jackknifePath = + PathBuilder.buildPath(options.getJackKnifePath(), "outliers", "*" + AVRO_EXTENSION) + .toString(); + log.info("Loading jackknife from {}", jackknifePath); + return p.apply(AvroIO.read(JackKnifeOutlierRecord.class).from(jackknifePath)) + .apply( + MapElements.via( + new SimpleFunction>() { + @Override + public KV apply(JackKnifeOutlierRecord input) { + return KV.of(input.getId(), input); + } + })); + } + + private static PCollection> loadClusteringRecords( + SolrPipelineOptions options, Pipeline p) { + + if (!options.getIncludeClustering()) { + return p.apply(Create.empty(new TypeDescriptor>() {})); + } + + String path = + PathBuilder.buildPath( + options.getClusteringPath() + "/relationships/", "relationships-*" + AVRO_EXTENSION) + .toString(); + log.info("Loading clustering from {}", path); + + return p.apply(AvroIO.read(Relationships.class).from(path)) + .apply( + MapElements.via( + new SimpleFunction>() { + @Override + public KV apply(Relationships input) { + return KV.of(input.getId(), input); + } + })); + } + + private static PCollection> loadOutlierRecords( + SolrPipelineOptions options, 
Pipeline p) { + + if (!options.getIncludeOutlier()) { + return p.apply(Create.empty(new TypeDescriptor>() {})); + } + + String dataResourceFolder = options.getDatasetId(); + if (dataResourceFolder == null || "all".equalsIgnoreCase(dataResourceFolder)) { + dataResourceFolder = "all"; + } + + String path = + PathBuilder.buildPath(options.getOutlierPath(), dataResourceFolder, "*" + AVRO_EXTENSION) + .toString(); + log.info("Loading outlier from {}", path); + + return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(path)) + .apply( + MapElements.via( + new SimpleFunction< + DistributionOutlierRecord, KV>() { + @Override + public KV apply( + DistributionOutlierRecord input) { + return KV.of(input.getId(), input); + } + })); + } +} diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/common/SolrFieldSchema.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/common/SolrFieldSchema.java new file mode 100644 index 0000000000..c3c620c45e --- /dev/null +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/common/SolrFieldSchema.java @@ -0,0 +1,45 @@ +package au.org.ala.pipelines.common; + +public class SolrFieldSchema implements java.io.Serializable { + public enum Types { + STRING("string"), + DOUBLE("double"), + INT("int"), + LONG("long"), + FLOAT("float"), + DATE("date"), + BOOLEAN("boolean"); + + private String type; + + Types(String type) { + this.type = type; + } + + public String getValue() { + return type; + } + + @Override + public String toString() { + return String.valueOf(type); + } + + public static Types valueOfType(String type) { + for (Types e : values()) { + if (e.type.equals(type)) { + return e; + } + } + return null; + } + } + + public Types type = Types.STRING; + public boolean multiple = false; + + public SolrFieldSchema(String type, boolean multiple) { + this.type = Types.valueOfType(type); + this.multiple = multiple; + } +} diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DwCAExportPipelineOptions.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DwCAExportPipelineOptions.java index 2f5ba97d9a..f73afe5f4e 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DwCAExportPipelineOptions.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DwCAExportPipelineOptions.java @@ -1,5 +1,6 @@ package au.org.ala.pipelines.options; +import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; /** Main pipeline options necessary for DwCA export for Living atlases */ @@ -9,4 +10,10 @@ public interface DwCAExportPipelineOptions extends IndexingPipelineOptions { String getImageServicePath(); void setImageServicePath(String imageServicePath); + + @Description("Get local export path to write archives") + @Default.String("/tmp/pipelines-export") + String getLocalExportPath(); + + void setLocalExportPath(String localExportPath); } diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexRecordTransform.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexRecordTransform.java index 74bb51d354..23e5ce2eb2 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexRecordTransform.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexRecordTransform.java @@ -5,6 +5,7 @@ import static org.apache.avro.Schema.Type.UNION; import static org.gbif.pipelines.common.PipelinesVariables.Metrics.AVRO_TO_JSON_COUNT; +import 
au.org.ala.pipelines.common.SolrFieldSchema; import au.org.ala.pipelines.interpreters.SensitiveDataInterpreter; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; @@ -16,6 +17,7 @@ import java.time.temporal.TemporalAccessor; import java.util.*; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; @@ -52,7 +54,12 @@ public class IndexRecordTransform implements Serializable, IndexFields { private static final TermFactory TERM_FACTORY = TermFactory.instance(); public static final String ISSUES = "issues"; public static final String CLASSS = "classs"; + public static final int YYYY_DD_MM_FORMAT_LENGTH = 10; + public static final int YYYY_MM_DDTHH_mm_ss_Z_LENGTH = 22; public static final String RAW_PREFIX = "raw_"; + public static final String MULTIPLE_VALUES_DELIM = "\\|"; + public static final String YYYY_DD_MM_FORMAT = "yyyy-MM-dd"; + public static final String YYYY_MM_DDTHH_mm_ss_Z_FORMAT = "yyyy-MM-dd'T'HH:mmXXX"; // Core @NonNull private TupleTag erTag; @@ -165,6 +172,9 @@ public static IndexRecord createIndexRecord( skipKeys.add("identifiedByIds"); // multi value field skipKeys.add("recordedByIds"); // multi value field skipKeys.add("machineTags"); // TODO review content + skipKeys.add( + "establishmentMeans"); // GBIF treats it as a JSON, but ALA needs a String which is defined + // in the latest DWC // multi valued fields skipKeys.add("identifiedByIds"); @@ -232,6 +242,8 @@ public static IndexRecord createIndexRecord( addToIndexRecord(br, indexRecord, skipKeys); if (br != null) { + addEstablishmentValueSafely( + indexRecord, DwcTerm.establishmentMeans.simpleName(), br.getEstablishmentMeans()); addTermWithAgentsSafely( indexRecord, DwcTerm.recordedByID.simpleName(), br.getRecordedByIds()); addMultiValueTermSafely(indexRecord, DwcTerm.typeStatus.simpleName(), br.getTypeStatus()); @@ -274,7 +286,7 @@ public static IndexRecord createIndexRecord( String occurrenceYear = tr.getYear() + "-01-01"; try { long occurrenceYearTime = - new SimpleDateFormat("yyyy-MM-dd").parse(occurrenceYear).getTime(); + new SimpleDateFormat(YYYY_DD_MM_FORMAT).parse(occurrenceYear).getTime(); indexRecord.getDates().put(OCCURRENCE_YEAR, occurrenceYearTime); } catch (ParseException ex) { // NOP @@ -583,6 +595,11 @@ public static IndexRecord createIndexRecord( // we carry on indexing ObjectMapper om = new ObjectMapper(); Map dynamicProperties = om.readValue(entry.getValue(), Map.class); + + // ensure the dynamic properties are maps of string, string to avoid serialisation + // issues + dynamicProperties.replaceAll((s, c) -> c != null ? 
c.toString() : ""); + indexRecord.setDynamicProperties(dynamicProperties); } catch (Exception e) { // NOP @@ -675,6 +692,13 @@ private static void addMultiValueTermSafely( } } + private static void addEstablishmentValueSafely( + IndexRecord.Builder indexRecord, String field, VocabularyConcept establishmentMeans) { + if (establishmentMeans != null) { + indexRecord.getStrings().put(field, establishmentMeans.getConcept()); + } + } + private static void addTermSafely( IndexRecord.Builder indexRecord, Map extension, DwcTerm dwcTerm) { String termValue = extension.get(dwcTerm.name()); @@ -836,7 +860,18 @@ public void processElement(ProcessContext c) { String k = c.element().getKey(); // ALA specific - ALAUUIDRecord ur = v.getOnly(urTag, null); + Iterable urs = v.getAll(urTag); + List result = + StreamSupport.stream(urs.spliterator(), false).collect(Collectors.toList()); + + if (result.size() == 0) { + throw new RuntimeException("AAR missing for record key " + k); + } + if (result.size() > 1) { + throw new RuntimeException("Multiple AARs for record key " + k); + } + + ALAUUIDRecord ur = result.get(0); if (ur != null && !ur.getId().startsWith(REMOVED_PREFIX_MARKER)) { @@ -912,14 +947,9 @@ public void processElement(ProcessContext c) { } } } else { - if (!ur.getId().startsWith(REMOVED_PREFIX_MARKER)) { - if (ur != null) { - log.error("UUID missing for record ID " + ur.getId()); - throw new RuntimeException("UUID missing for record ID " + ur.getId()); - } else { - log.error("UUID missing and ER empty"); - throw new RuntimeException("UUID missing and ER empty"); - } + if (ur != null && !ur.getId().startsWith(REMOVED_PREFIX_MARKER)) { + log.error("UUID missing and ER empty"); + throw new RuntimeException("UUID missing and ER empty"); } } } @@ -1051,21 +1081,71 @@ public static void addStringSafely(SolrInputDocument doc, String key, String val } public static SolrInputDocument convertIndexRecordToSolrDoc( - IndexRecord indexRecord, List schemaFields, List dynamicFieldPrefixes) { + IndexRecord indexRecord, + Map schemaFields, + List dynamicFieldPrefixes) { SolrInputDocument doc = new SolrInputDocument(); doc.setField(ID, indexRecord.getId()); // keep track of added dynamic properties for (Map.Entry s : indexRecord.getStrings().entrySet()) { - if (schemaFields.contains(s.getKey()) || startsWithPrefix(dynamicFieldPrefixes, s.getKey())) { + if (schemaFields.containsKey(s.getKey()) + || startsWithPrefix(dynamicFieldPrefixes, s.getKey())) { addStringSafely(doc, s.getKey(), s.getValue()); } else { // clean up field name before adding String key = s.getKey().replaceAll("[^A-Za-z0-9]", "_"); if (StringUtils.isNotEmpty(key) && doc.getFieldValue(DYNAMIC_PROPERTIES_PREFIX + key) == null) { - addStringSafely(doc, DYNAMIC_PROPERTIES_PREFIX + key, s.getValue()); + SolrFieldSchema fieldSchema = schemaFields.get(DYNAMIC_PROPERTIES_PREFIX + key); + if ((fieldSchema != null) && (fieldSchema.type != null)) { + if (fieldSchema.multiple) { + doc.addField( + DYNAMIC_PROPERTIES_PREFIX + key, s.getValue().split(MULTIPLE_VALUES_DELIM)); + } else { + switch (fieldSchema.type) { + case BOOLEAN: + doc.addField(DYNAMIC_PROPERTIES_PREFIX + key, Boolean.valueOf(s.getValue())); + break; + case DATE: + try { + Date date = null; + if ((s.getValue() != null) + && (s.getValue().length() == YYYY_MM_DDTHH_mm_ss_Z_LENGTH)) { + SimpleDateFormat sdf = new SimpleDateFormat(YYYY_MM_DDTHH_mm_ss_Z_FORMAT); + date = sdf.parse(s.getValue()); + } + if ((s.getValue() != null) + && (s.getValue().length() == YYYY_DD_MM_FORMAT_LENGTH)) { + 
SimpleDateFormat sdf = new SimpleDateFormat(YYYY_DD_MM_FORMAT); + date = sdf.parse(s.getValue()); + } + doc.addField(DYNAMIC_PROPERTIES_PREFIX + key, date); + } catch (ParseException e) { + log.error("Cannot parse date " + s.getValue()); + } + break; + case DOUBLE: + doc.addField(DYNAMIC_PROPERTIES_PREFIX + key, Double.valueOf(s.getValue())); + break; + case FLOAT: + doc.addField(DYNAMIC_PROPERTIES_PREFIX + key, Float.valueOf(s.getValue())); + break; + case INT: + doc.addField(DYNAMIC_PROPERTIES_PREFIX + key, Integer.valueOf(s.getValue())); + break; + case LONG: + doc.addField(DYNAMIC_PROPERTIES_PREFIX + key, Long.valueOf(s.getValue())); + break; + case STRING: + addStringSafely(doc, DYNAMIC_PROPERTIES_PREFIX + key, s.getValue()); + break; + } + } + } else { + addStringSafely(doc, DYNAMIC_PROPERTIES_PREFIX + key, s.getValue()); + } } } } @@ -1120,6 +1200,6 @@ public static SolrInputDocument convertIndexRecordToSolrDoc( } private static boolean isNotBlank(String s) { - return s != null && s.trim().isEmpty(); + return s != null && !s.trim().isEmpty(); } } diff --git a/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java b/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java index 5acb3a32c0..b150d222e4 100644 --- a/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java +++ b/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java @@ -5,8 +5,10 @@ import au.org.ala.distribution.DistributionLayer; import au.org.ala.distribution.DistributionServiceImpl; import java.util.*; +import org.junit.Ignore; import org.junit.Test; +@Ignore public class DistributionOutlierTest { // String spatial_url = "http://devt.ala.org.au:8080/ws/"; String spatial_url = "https://spatial-test.ala.org.au/ws/"; diff --git a/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/converters/CoreTsvConverterTest.java b/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/converters/CoreTsvConverterTest.java index b9a29f7ff1..0248e2f46e 100644 --- a/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/converters/CoreTsvConverterTest.java +++ b/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/converters/CoreTsvConverterTest.java @@ -104,7 +104,7 @@ public void converterTest() { "\"raw_er_disposition\"", // DwcTerm.disposition "\"raw_er_dynamicProperties\"", // DwcTerm.dynamicProperties "\"1111111\"", // DwcTerm.endDayOfYear - "\"{concept: br_establishmentMeans, lineage: [br_establishmentMeans]}\"", // DwcTerm.establishmentMeans + "\"br_establishmentMeans\"", // DwcTerm.establishmentMeans "\"raw_er_eventRemarks\"", // DwcTerm.eventRemarks "\"raw_er_eventTime\"", // DwcTerm.eventTime "\"raw_er_fieldNotes\"", // DwcTerm.fieldNotes @@ -631,7 +631,6 @@ public void converterTest() { BasicRecord.newBuilder() .setId(DwcTerm.occurrenceID.simpleName()) .setCreated(2L) - .setGbifId(22L) .setBasisOfRecord("br_basisOfRecord") .setSex("br_sex") .setLifeStage( @@ -668,7 +667,6 @@ public void converterTest() { .build())) .setRecordedBy(Arrays.asList("br_recordedBy_1", "br_recordedBy_2")) .setOccurrenceStatus("br_occurrenceStatus") - .setIsClustered(true) .setDatasetID(Collections.singletonList("br_datasetID")) .setDatasetName(Collections.singletonList("br_datasetName")) .setOtherCatalogNumbers(Collections.singletonList("br_otherCatalogNumbers")) @@ -785,14 +783,6 @@ public void converterTest() { .setVernacularName("atxr_VernacularName") .setSpeciesGroup(Collections.singletonList("atxr_SpeciesGroup")) 
.setSpeciesSubgroup(Collections.singletonList("atxr_SpeciesSubgroup")) - .setDiagnostics( - Diagnostic.newBuilder() - .setConfidence(5555) - .setStatus(Status.ACCEPTED) - .setNote("atxr_Diagnostic_Note") - .setMatchType(MatchType.EXACT) - .setLineage(Collections.singletonList("atxr_Diagnostic_Lineage")) - .build()) .build(); ALAAttributionRecord aar = diff --git a/livingatlas/pom.xml b/livingatlas/pom.xml index e14165aae8..12a17c370e 100644 --- a/livingatlas/pom.xml +++ b/livingatlas/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines pipelines-parent - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index a5e4321928..3e61062744 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ org.gbif.pipelines pipelines-parent - 2.11.12.2 + 2.11.12.3-SNAPSHOT pom diff --git a/sdks/beam-common/pom.xml b/sdks/beam-common/pom.xml index 8f866e0f7f..959f0cf7fe 100644 --- a/sdks/beam-common/pom.xml +++ b/sdks/beam-common/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines sdks - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/sdks/beam-transforms/pom.xml b/sdks/beam-transforms/pom.xml index b7ef99f8cf..0dad87ebcd 100644 --- a/sdks/beam-transforms/pom.xml +++ b/sdks/beam-transforms/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines sdks - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/sdks/core/pom.xml b/sdks/core/pom.xml index cc785cad80..74804e0330 100644 --- a/sdks/core/pom.xml +++ b/sdks/core/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines sdks - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/sdks/models/pom.xml b/sdks/models/pom.xml index 15d356e854..5389537a7a 100644 --- a/sdks/models/pom.xml +++ b/sdks/models/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines sdks - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/sdks/pom.xml b/sdks/pom.xml index 7e69312021..fe2d2142f6 100644 --- a/sdks/pom.xml +++ b/sdks/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines pipelines-parent - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/sdks/variables/pom.xml b/sdks/variables/pom.xml index 93ca8c5b3f..5faf9c494b 100644 --- a/sdks/variables/pom.xml +++ b/sdks/variables/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines sdks - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/tools/archives-converters/pom.xml b/tools/archives-converters/pom.xml index 0e9faef260..487e59161c 100644 --- a/tools/archives-converters/pom.xml +++ b/tools/archives-converters/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines tools - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/tools/elasticsearch-tools/pom.xml b/tools/elasticsearch-tools/pom.xml index 315ea7b450..c508207173 100644 --- a/tools/elasticsearch-tools/pom.xml +++ b/tools/elasticsearch-tools/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines tools - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/tools/extension-converter-maven-plugin/pom.xml b/tools/extension-converter-maven-plugin/pom.xml index 88794535d2..6ed3a034d3 100644 --- a/tools/extension-converter-maven-plugin/pom.xml +++ b/tools/extension-converter-maven-plugin/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines tools - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/tools/pipelines-maven-plugin/pom.xml b/tools/pipelines-maven-plugin/pom.xml index 15608e03c3..575a2ce5e5 100644 --- a/tools/pipelines-maven-plugin/pom.xml +++ b/tools/pipelines-maven-plugin/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines tools - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index 6606dfdb79..5a3571def6 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines 
pipelines-parent - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml diff --git a/tools/xml-to-avsc-maven-plugin/pom.xml b/tools/xml-to-avsc-maven-plugin/pom.xml index f5e1df149a..ab422ba18e 100644 --- a/tools/xml-to-avsc-maven-plugin/pom.xml +++ b/tools/xml-to-avsc-maven-plugin/pom.xml @@ -5,7 +5,7 @@ org.gbif.pipelines tools - 2.11.12.2 + 2.11.12.3-SNAPSHOT ../pom.xml
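
The DwC-A export change above copies the archive components out of HDFS (when inputPath starts with "hdfs://") and then zips them under localExportPath, so an Airflow task can collect a single artefact per dataset. A minimal standalone sketch of that packaging step, with hypothetical paths ("dr123" is an invented dataset ID; the pipeline derives the real ones from getLocalExportPath() and getDatasetId()):

import java.io.File;
import java.io.FileOutputStream;
import java.nio.file.Files;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/** Sketch of the archive-packaging step added to IndexRecordToDwcaPipeline. */
public class DwcaZipSketch {

  public static void main(String[] args) throws Exception {
    // Hypothetical locations; the pipeline builds these from its options.
    String exportDir = "/tmp/pipelines-export/dr123";
    String zipPath = "/tmp/pipelines-export/dr123.zip";

    File[] files = new File(exportDir).listFiles(File::isFile);
    if (files == null || files.length == 0) {
      throw new IllegalStateException("Nothing to export in: " + exportDir);
    }

    try (ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(zipPath))) {
      for (File fileToZip : files) {
        // One entry per archive component (e.g. meta.xml, eml.xml, occurrence data).
        zipOut.putNextEntry(new ZipEntry(fileToZip.getName()));
        Files.copy(fileToZip.toPath(), zipOut);
        zipOut.closeEntry();
      }
    }
  }
}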
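The typed dynamicProperties_* handling added to IndexRecordTransform.convertIndexRecordToSolrDoc coerces each string value to the type the live SOLR schema declares for the field, so numeric, boolean and date dynamic fields are indexed natively rather than as strings. A compact sketch of that coercion, assuming the SOLR type names mirrored by SolrFieldSchema.Types (the real code additionally splits multiValued fields on "|" and accepts the 22-character yyyy-MM-dd'T'HH:mmXXX date form):

import java.text.SimpleDateFormat;

/** Sketch of the schema-driven coercion applied to dynamicProperties_* fields. */
public class DynamicFieldCoercionSketch {

  static Object coerce(String solrType, String value) throws Exception {
    switch (solrType) {
      case "boolean": return Boolean.valueOf(value);
      case "int":     return Integer.valueOf(value);
      case "long":    return Long.valueOf(value);
      case "float":   return Float.valueOf(value);
      case "double":  return Double.valueOf(value);
      case "date":    return new SimpleDateFormat("yyyy-MM-dd").parse(value); // 10-char form only
      default:        return value; // strings (and unknown types) pass through unchanged
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(coerce("int", "42"));          // an Integer, not the string "42"
    System.out.println(coerce("date", "2022-11-23")); // a java.util.Date
  }
}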
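IndexRecordToSolrWithPartitionsPipeline splits the coordinate records into numOfPartitions collections before the sampling join, partitioning each record on its month (0 when absent) modulo the partition count, so each CoGroupByKey works on a smaller input. A sketch of just that partition function, under the same month-or-zero fallback the PartitionFn uses:

import java.util.Arrays;
import java.util.List;

/** Sketch of the month-modulo partitioner used before the sampling join. */
public class MonthPartitionSketch {

  static int partitionFor(Integer month, int numPartitions) {
    // Records without a month value land in partition 0, mirroring the pipeline's fallback.
    return (month != null ? month : 0) % numPartitions;
  }

  public static void main(String[] args) {
    List<Integer> months = Arrays.asList(1, 6, 12, null);
    months.forEach(m -> System.out.println(m + " -> partition " + partitionFor(m, 4)));
  }
}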