Skip to content

Commit

Permalink
Cherry picked fixes for #793 and DWCA export functionality to ease Ai…
Browse files Browse the repository at this point in the history
…rflow deployment

Work is related to: AtlasOfLivingAustralia/preingestion#103
  • Loading branch information
djtfmartin committed Nov 23, 2022
1 parent cb93adf commit e103451
Show file tree
Hide file tree
Showing 52 changed files with 976 additions and 78 deletions.
2 changes: 1 addition & 1 deletion examples/metrics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>examples</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>pipelines-parent</artifactId>
<groupId>org.gbif.pipelines</groupId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion examples/transform/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>examples</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/coordinator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>gbif</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/coordinator/tasks-integration-tests-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>coordinator</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/coordinator/tasks-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>coordinator</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/coordinator/tasks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>coordinator</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/diagnostics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>gbif</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/keygen/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>gbif</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/pipelines/clustering-gbif/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>pipelines</artifactId>
<groupId>org.gbif.pipelines</groupId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion gbif/pipelines/export-gbif-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>pipelines</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/pipelines/ingest-gbif-beam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>pipelines</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/pipelines/ingest-gbif-fragmenter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>pipelines</artifactId>
<groupId>org.gbif.pipelines</groupId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/pipelines/ingest-gbif-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>pipelines</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/pipelines/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>gbif</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/pipelines/pre-backbone-release/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>pipelines</artifactId>
<groupId>org.gbif.pipelines</groupId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion gbif/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>pipelines-parent</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/validator/pipelines-validator-ws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>validator</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/validator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>gbif</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/validator/validator-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>validator</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/validator/validator-checklists/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>validator</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/validator/validator-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>validator</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/validator/validator-eml/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>validator</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/validator/validator-it-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>validator</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/validator/validator-mail/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>validator</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion gbif/validator/validator-ws-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>validator</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion livingatlas/bitmap/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>au.org.ala</groupId>
<artifactId>livingatlas</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
12 changes: 12 additions & 0 deletions livingatlas/configs/la-pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,18 @@ image-load-sh-args:
executor-memory: 7G
driver-memory: 1G

export-sh-args:
local:
jvm: -Xmx8g -XX:+UseG1GC
spark-embedded:
jvm: -Xmx8g -XX:+UseG1GC
spark-cluster:
conf: spark.default.parallelism=48
num-executors: 6
executor-cores: 8
executor-memory: 20G
driver-memory: 4G

uuid-sh-args:
local:
jvm: -Xmx8g -XX:+UseG1GC
Expand Down
2 changes: 1 addition & 1 deletion livingatlas/migration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>au.org.ala</groupId>
<artifactId>livingatlas</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion livingatlas/pipelines/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>au.org.ala</groupId>
<artifactId>livingatlas</artifactId>
<version>2.11.12.2</version>
<version>2.11.12.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import static org.gbif.pipelines.common.beam.utils.PathBuilder.buildDatasetAttemptPath;
import static org.gbif.pipelines.common.beam.utils.PathBuilder.buildPath;

import au.org.ala.kvs.ALAPipelinesConfig;
import au.org.ala.kvs.ALAPipelinesConfigFactory;
Expand All @@ -14,13 +13,18 @@
import au.org.ala.utils.ALAFsUtils;
import au.org.ala.utils.CombinedYamlConfiguration;
import au.org.ala.utils.ValidationUtils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.nio.file.Files;
import java.util.List;
import java.util.Scanner;
import java.util.function.UnaryOperator;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.Pipeline;
Expand All @@ -31,7 +35,12 @@
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.codehaus.plexus.util.FileUtils;
import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory;
import org.gbif.pipelines.core.factory.FileSystemFactory;
import org.gbif.pipelines.core.utils.FsUtils;
import org.gbif.pipelines.io.avro.IndexRecord;
import org.slf4j.MDC;
Expand All @@ -52,10 +61,11 @@ public static void main(String[] args) throws Exception {
}

@SneakyThrows
public static void run(DwCAExportPipelineOptions options) {
public static void run(DwCAExportPipelineOptions options) throws Exception {

UnaryOperator<String> pathFn =
fileName -> buildPath(buildDatasetAttemptPath(options, "dwca", false), fileName).toString();
fileName ->
String.join(Path.SEPARATOR, buildDatasetAttemptPath(options, "dwca", false), fileName);

Pipeline p = Pipeline.create(options);

Expand Down Expand Up @@ -101,6 +111,56 @@ public List<String> apply(IndexRecord indexRecord) {

// Write the eml.xml
writeEML(options, pathFn);

// create output dir
FileUtils.forceMkdir(new File(options.getLocalExportPath()));

// copy from HDFS to /tmp
boolean originalInputIsHdfs = options.getInputPath().startsWith("hdfs://");

// if inputPath is "hdfs://", then copy to local
if (originalInputIsHdfs) {

FileSystem fs =
FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
.getFs(options.getInputPath());

Path inputPathHdfs = new Path(options.getInputPath());

if (!fs.exists(inputPathHdfs)) {
throw new RuntimeException("Input file not available: " + options.getInputPath());
}

String dwcaHdfsOutputPath = buildDatasetAttemptPath(options, "dwca", false);
String dwcaOutputPath = options.getLocalExportPath() + "/" + options.getDatasetId() + "/";
RemoteIterator<LocatedFileStatus> iter =
fs.listFiles(new Path(dwcaHdfsOutputPath + "/"), false);

while (iter.hasNext()) {
LocatedFileStatus locatedFileStatus = iter.next();
Path path = locatedFileStatus.getPath();
if (fs.isFile(path)) {
log.info("Transferring " + path.toString() + " to " + dwcaOutputPath);
fs.copyToLocalFile(false, path, new Path(dwcaOutputPath + "/" + path.getName()), false);
}
}
}

String zipPath = options.getLocalExportPath() + "/" + options.getDatasetId() + ".zip";
String dwcaOutputPath = options.getLocalExportPath() + "/" + options.getDatasetId();
File[] filePaths =
originalInputIsHdfs
? new File(dwcaOutputPath).listFiles()
: new File(buildDatasetAttemptPath(options, "dwca", false)).listFiles();

try (ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(zipPath))) {
for (File fileToZip : filePaths) {
log.info("Adding to Zip file: " + fileToZip.getName());
zipOut.putNextEntry(new ZipEntry(fileToZip.getName()));
Files.copy(fileToZip.toPath(), zipOut);
}
}
log.info("Zip file written to: " + zipPath);
}

@SneakyThrows
Expand Down
Loading

0 comments on commit e103451

Please sign in to comment.