Skip to content

Commit

Permalink
Merge pull request #73 from aodn/features/5522-long-running-query
Browse files Browse the repository at this point in the history
Features/5522 long running query
  • Loading branch information
utas-raymondng authored May 24, 2024
2 parents d9269c7 + ac1967b commit 7c62fc4
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.springframework.http.*;
import org.springframework.web.bind.annotation.*;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;

Expand Down Expand Up @@ -56,7 +57,47 @@ public ResponseEntity<ObjectNode> getDocumentByUUID(@PathVariable("uuid") String
@PostMapping(path="/all", consumes = "application/json", produces = "application/json")
@Operation(security = { @SecurityRequirement(name = "X-API-Key") }, description = "Index all metadata records from GeoNetwork")
public ResponseEntity<String> indexAllMetadataRecords(@RequestParam(value = "confirm", defaultValue = "false") Boolean confirm) throws IOException {
return indexerService.indexAllMetadataRecordsFromGeoNetwork(confirm);
return indexerService.indexAllMetadataRecordsFromGeoNetwork(confirm, null);
}

@PostMapping(path="/async/all", consumes = "application/json", produces = "application/json")
@Operation(security = { @SecurityRequirement(name = "X-API-Key") }, description = "Index all metadata records from GeoNetwork")
public SseEmitter indexAllMetadataRecordsAsync(@RequestParam(value = "confirm", defaultValue = "false") Boolean confirm) {
final SseEmitter emitter = new SseEmitter(0L); // 0L means no timeout;

IndexerService.Callback callback = new IndexerService.Callback() {
@Override
public void onProgress(Object update) {
try {
emitter.send(update.toString());
}
catch (IOException e) {
emitter.completeWithError(e);
}
}

@Override
public void onComplete(Object result) {
try {
emitter.send(result.toString());
emitter.complete();
}
catch (IOException e) {
emitter.completeWithError(e);
}
}
};

new Thread(() -> {
try {
indexerService.indexAllMetadataRecordsFromGeoNetwork(confirm, callback);
}
catch(IOException e) {
emitter.completeWithError(e);
}
}).start();

return emitter;
}

@PostMapping(path="/{uuid}", produces = "application/json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
import java.io.IOException;

public interface IndexerService {
interface Callback {
void onProgress(Object update);
void onComplete(Object result);
}
ResponseEntity<String> indexMetadata(String metadataValues) throws IOException, FactoryException, TransformException, JAXBException;
ResponseEntity<String> deleteDocumentByUUID(String uuid) throws IOException;
ResponseEntity<String> indexAllMetadataRecordsFromGeoNetwork(boolean confirm) throws IOException;
ResponseEntity<String> indexAllMetadataRecordsFromGeoNetwork(boolean confirm, Callback callback) throws IOException;
Hit<ObjectNode> getDocumentByUUID(String uuid) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public ResponseEntity<String> deleteDocumentByUUID(String uuid) throws IOExcepti
}
}

public ResponseEntity<String> indexAllMetadataRecordsFromGeoNetwork(boolean confirm) throws IOException {
public ResponseEntity<String> indexAllMetadataRecordsFromGeoNetwork(boolean confirm, Callback callback) throws IOException {
if (!confirm) {
throw new IndexAllRequestNotConfirmedException("Please confirm that you want to index all metadata records from GeoNetwork");
}
Expand Down Expand Up @@ -251,7 +251,13 @@ public ResponseEntity<String> indexAllMetadataRecordsFromGeoNetwork(boolean conf
// and we have not add it to the bulkRequest
if(dataSize + size > 100000 && dataSize != 0) {
logger.info("Execute batch as bulk request is big enough {}", dataSize + size);
results.add(executeBulk(bulkRequest));
BulkResponse temp = executeBulk(bulkRequest);
results.add(temp);

if(callback != null) {
callback.onProgress(temp);
}

dataSize = 0;
bulkRequest = new BulkRequest.Builder();
}
Expand All @@ -275,7 +281,12 @@ public ResponseEntity<String> indexAllMetadataRecordsFromGeoNetwork(boolean conf
}

// In case there are residual
results.add(executeBulk(bulkRequest));
BulkResponse temp = executeBulk(bulkRequest);
results.add(temp);

if(callback != null) {
callback.onComplete(temp);
}

// TODO now processing for record_suggestions index
logger.info("Finished execute bulk indexing records to index: {}",indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void verifyGetDocumentCount() throws IOException {
insertMetadataRecords(uuid1, "classpath:canned/sample2.xml");
insertMetadataRecords(uuid2, "classpath:canned/sample1.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true);
indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);

// The sample1 geometry have error [1:9695] failed to parse field [summaries.proj:geometry] of type [geo_shape]
// ErrorCause: {"type":"illegal_argument_exception","reason":"Polygon self-intersection at lat=57.0 lon=-66.0"}
Expand All @@ -114,7 +114,7 @@ public void verifyDeleteDocumentByUUID() throws IOException {
insertMetadataRecords(uuid1, "classpath:canned/sample2.xml");
insertMetadataRecords(uuid2, "classpath:canned/sample3.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true);
indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
assertEquals("Doc count correct", 2L, elasticSearchIndexService.getDocumentsCount(INDEX_NAME));

// Only 2 doc in elastic, if we delete it then should be zero
Expand All @@ -136,7 +136,7 @@ public void verifyGetDocumentByUUID() throws IOException {

insertMetadataRecords(uuid, "classpath:canned/sample4.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true);
indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
Hit<ObjectNode> objectNodeHit = indexerService.getDocumentByUUID(uuid);

String test = objectNodeHit.source().toPrettyString();
Expand All @@ -159,7 +159,7 @@ public void verifyLogoLinkAddedOnIndex() throws IOException {

insertMetadataRecords(uuid, "classpath:canned/sample5.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true);
indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
Hit<ObjectNode> objectNodeHit = indexerService.getDocumentByUUID(uuid);

String test = objectNodeHit.source().toPrettyString();
Expand All @@ -181,7 +181,7 @@ public void verifyThumbnailLinkAddedOnIndex() throws IOException {

insertMetadataRecords(uuid, "classpath:canned/sample6.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true);
indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
Hit<ObjectNode> objectNodeHit = indexerService.getDocumentByUUID(uuid);

String test = objectNodeHit.source().toPrettyString();
Expand All @@ -205,7 +205,7 @@ public void verifyThumbnailLinkNullAddedOnIndex() throws IOException {

insertMetadataRecords(uuid, "classpath:canned/sample7.xml");

indexerService.indexAllMetadataRecordsFromGeoNetwork(true);
indexerService.indexAllMetadataRecordsFromGeoNetwork(true, null);
Hit<ObjectNode> objectNodeHit = indexerService.getDocumentByUUID(uuid);

String test = objectNodeHit.source().toPrettyString();
Expand Down

0 comments on commit 7c62fc4

Please sign in to comment.