Commit: Update tests
vietnguyengit committed Sep 26, 2024
1 parent 1a391e1 commit 6ad1ffa
Showing 6 changed files with 106 additions and 64 deletions.
@@ -10,7 +10,8 @@
 public interface VocabService {
     List<String> extractVocabLabelsFromThemes(List<ThemesModel> themes, String vocabType) throws IOException;
 
-    void populateVocabsData();
+    void populateVocabsData() throws IOException;
+    void populateVocabsDataAsync();
     void clearParameterVocabCache();
     void clearPlatformVocabCache();
     void clearOrganisationVocabCache();
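
The interface now has two entry points: a blocking populateVocabsData() that declares IOException, and a fire-and-forget populateVocabsDataAsync(). As a rough sketch of the intended call sites (the wrapper class below is hypothetical, not part of this commit), a caller that must have the index fully populated before it continues uses the synchronous method, while normal startup can kick off the asynchronous one:

import java.io.IOException;

// Hypothetical caller, for illustration only; VocabService is the interface changed above.
public class VocabBootstrapExample {

    private final VocabService vocabService;

    public VocabBootstrapExample(VocabService vocabService) {
        this.vocabService = vocabService;
    }

    // Tests and the nightly refresh need the index populated before anything asserts or
    // caches are cleared, so they block and let IOException propagate to the caller.
    public void populateBlocking() throws IOException {
        vocabService.populateVocabsData();
    }

    // Application startup only needs the work started; this returns immediately.
    public void populateInBackground() {
        vocabService.populateVocabsDataAsync();
    }
}
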
@@ -227,66 +227,99 @@ protected void indexAllVocabs(List<VocabModel> parameterVocabs,
     }
 
     protected void bulkIndexVocabs(List<VocabDto> vocabs) throws IOException {
-        // count portal index documents, or create index if not found from defined mapping JSON file
-        BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
-
-        for (VocabDto vocab : vocabs) {
-            try {
-                // convert vocab values to binary data
-                log.debug("Ingested json is {}", indexerObjectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(vocab));
-                // send bulk request to Elasticsearch
-                bulkRequest.operations(op -> op
-                        .index(idx -> idx
-                                .index(vocabsIndexName)
-                                .document(vocab)
-                        )
-                );
-            } catch (JsonProcessingException e) {
-                log.error("Failed to ingest parameterVocabs to {}", vocabsIndexName);
-                throw new RuntimeException(e);
-            }
-        }
+        if (!vocabs.isEmpty()) {
+            BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
+
+            for (VocabDto vocab : vocabs) {
+                try {
+                    // convert vocab values to binary data
+                    log.debug("Ingested json is {}", indexerObjectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(vocab));
+                    // send bulk request to Elasticsearch
+                    bulkRequest.operations(op -> op
+                            .index(idx -> idx
+                                    .index(vocabsIndexName)
+                                    .document(vocab)
+                            )
+                    );
+                } catch (JsonProcessingException e) {
+                    log.error("Failed to ingest parameterVocabs to {}", vocabsIndexName);
+                    throw new RuntimeException(e);
+                }
+            }
 
-        BulkResponse result = portalElasticsearchClient.bulk(bulkRequest.build());
+            BulkResponse result = portalElasticsearchClient.bulk(bulkRequest.build());
 
-        // Flush after insert, otherwise you need to wait for next auto-refresh. It is
-        // especially a problem with autotest, where assert happens very fast.
-        portalElasticsearchClient.indices().refresh();
+            // Flush after insert, otherwise you need to wait for next auto-refresh. It is
+            // especially a problem with autotest, where assert happens very fast.
+            portalElasticsearchClient.indices().refresh();
 
-        // Log errors, if any
-        if (result.errors()) {
-            log.error("Bulk had errors");
-            for (BulkResponseItem item: result.items()) {
-                if (item.error() != null) {
-                    log.error("{} {}", item.error().reason(), item.error().causedBy());
+            // Log errors, if any
+            if (result.errors()) {
+                log.error("Bulk had errors");
+                for (BulkResponseItem item: result.items()) {
+                    if (item.error() != null) {
+                        log.error("{} {}", item.error().reason(), item.error().causedBy());
+                    }
                 }
+            } else {
+                log.info("Finished bulk indexing items to index: {}", vocabsIndexName);
             }
+            log.info("Total documents in index: {} is {}", vocabsIndexName, elasticSearchIndexService.getDocumentsCount(vocabsIndexName));
         } else {
-            log.info("Finished bulk indexing items to index: {}", vocabsIndexName);
+            log.error("No vocabs to be indexed, nothing to index");
         }
-        log.info("Total documents in index: {} is {}", vocabsIndexName, elasticSearchIndexService.getDocumentsCount(vocabsIndexName));
     }
 
-    public void populateVocabsData() {
+    public void populateVocabsData() throws IOException {
+        log.info("Starting fetching vocabs data process synchronously...");
+
+        List<VocabModel> parameterVocabs = ardcVocabService.getVocabTreeFromArdcByType(VocabApiPaths.PARAMETER_VOCAB);
+        List<VocabModel> platformVocabs = ardcVocabService.getVocabTreeFromArdcByType(VocabApiPaths.PLATFORM_VOCAB);
+        List<VocabModel> organisationVocabs = ardcVocabService.getVocabTreeFromArdcByType(VocabApiPaths.ORGANISATION_VOCAB);
+
+        indexAllVocabs(parameterVocabs, platformVocabs, organisationVocabs);
+    }
+
+    public void populateVocabsDataAsync() {
         log.info("Starting async vocabs data fetching process...");
 
         ExecutorService executorService = Executors.newFixedThreadPool(3);
         List<Callable<List<VocabModel>>> vocabTasks = List.of(
                 createVocabFetchTask(VocabApiPaths.PARAMETER_VOCAB, "parameter"),
                 createVocabFetchTask(VocabApiPaths.PLATFORM_VOCAB, "platform"),
                 createVocabFetchTask(VocabApiPaths.ORGANISATION_VOCAB, "organisation")
         );
-        processVocabTasks(executorService, vocabTasks);
-    }
-
-    private void processVocabTasks(ExecutorService executorService, List<Callable<List<VocabModel>>> tasks) {
-        try {
-            List<Future<List<VocabModel>>> completed = executorService.invokeAll(tasks);
-            log.info("Indexing fetched vocabs to {}", vocabsIndexName);
-            indexAllVocabs(completed.get(0).get(), completed.get(1).get(), completed.get(2).get());
-        } catch (Exception e) {
-            log.error("Error processing vocabs data", e);
-        } finally {
-            executorService.shutdown();
-        }
+
+        CompletableFuture.runAsync(() -> {
+            try {
+                // Invoke all tasks and wait for completion
+                List<Future<List<VocabModel>>> completedFutures = executorService.invokeAll(vocabTasks);
+
+                // Ensure all tasks are completed and check for exceptions
+                List<List<VocabModel>> allResults = new ArrayList<>();
+                for (Future<List<VocabModel>> future : completedFutures) {
+                    try {
+                        allResults.add(future.get()); // Blocks until the task is completed and retrieves the result
+                    } catch (Exception taskException) {
+                        log.error("Task failed with an exception", taskException);
+                        // Handle failure for this particular task
+                        allResults.add(Collections.emptyList()); // add empty result for failed task
+                    }
+                }
+
+                // Call indexAllVocabs only after all tasks are completed
+                log.info("Indexing fetched vocabs to {}", vocabsIndexName);
+                indexAllVocabs(allResults.get(0), allResults.get(1), allResults.get(2));
+
+            } catch (InterruptedException | IOException e) {
+                Thread.currentThread().interrupt(); // Restore interrupt status
+                log.error("Thread was interrupted while processing vocab tasks", e);
+            } finally {
+                executorService.shutdown();
+            }
+        });
+
+        log.info("Vocabs data fetching process started in the background.");
     }
 
     private Callable<List<VocabModel>> createVocabFetchTask(VocabApiPaths vocabType, String vocabName) {
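
Two behavioural changes sit in this hunk: bulkIndexVocabs() now skips the Elasticsearch bulk call entirely when the list is empty and logs an error instead, and the former processVocabTasks() helper is folded into populateVocabsDataAsync(), which runs invokeAll() inside CompletableFuture.runAsync() so the caller returns immediately while the three fetches run in parallel and a failed fetch degrades to an empty list. A minimal, self-contained sketch of that concurrency pattern, using plain strings in place of VocabModel and hypothetical task bodies:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllSketch {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        List<Callable<List<String>>> tasks = List.of(
                () -> List.of("parameter-a", "parameter-b"),                          // stand-in for the parameter vocab fetch
                () -> List.of("platform-a"),                                          // stand-in for the platform vocab fetch
                () -> { throw new IllegalStateException("simulated fetch failure"); } // stand-in for a failing fetch
        );

        CompletableFuture<Void> background = CompletableFuture.runAsync(() -> {
            try {
                // invokeAll blocks this background thread until every task has finished
                List<Future<List<String>>> futures = executor.invokeAll(tasks);

                List<List<String>> results = new ArrayList<>();
                for (Future<List<String>> future : futures) {
                    try {
                        results.add(future.get());
                    } catch (Exception taskException) {
                        // a failed task contributes an empty list instead of aborting the whole run
                        results.add(Collections.emptyList());
                    }
                }
                System.out.println("Collected results: " + results);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                executor.shutdown();
            }
        });

        System.out.println("Returned to caller while fetches continue in the background");
        background.join(); // only so this demo JVM waits for the output; the real method returns without joining
    }
}
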
@@ -5,6 +5,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.core.env.Environment;
 import org.springframework.scheduling.annotation.Scheduled;
 
 import java.io.IOException;
@@ -25,30 +26,45 @@ public void setVocabService(VocabService vocabService) {
         this.vocabService = vocabService;
     }
 
+    @Autowired
+    private Environment environment;
+
     @PostConstruct
-    public void init() {
-        // this could take a few minutes to complete, in development, you can skip it with -Dapp.initialiseVocabsIndex=false
-        // you can call /api/v1/indexer/ext/vocabs/populate endpoint to manually refresh the vocabs index, without waiting for the scheduled task
+    public void init() throws IOException {
+        // Check if the initialiseVocabsIndex flag is enabled
         if (initialiseVocabsIndex) {
-            log.info("Initialising {}", vocabsIndexName);
-            // non-blocking async method for populating vocabs index data when starting the app
-            log.info("Starting async vocabs data fetching process...");
-            CompletableFuture.runAsync(() -> {
+            if (isTestProfileActive()) {
+                log.info("Initialising {} synchronously for test profile", vocabsIndexName);
                 vocabService.populateVocabsData();
-            });
-            log.info("Vocabs data fetching process started in the background.");
+            } else {
+                log.info("Initialising {} asynchronously", vocabsIndexName);
+                vocabService.populateVocabsDataAsync();
+            }
         }
     }
 
+    private boolean isTestProfileActive() {
+        String[] activeProfiles = environment.getActiveProfiles();
+        for (String profile : activeProfiles) {
+            if ("test".equalsIgnoreCase(profile)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     @Scheduled(cron = "0 0 0 * * *")
     public void scheduledRefreshVocabsData() throws IOException {
         log.info("Refreshing ARDC vocabularies data");
 
+        // call synchronous populating method, otherwise existing vocab caches will be emptied while new data hasn't been fully processed yet.
+        vocabService.populateVocabsData();
+
         // clear existing caches
         vocabService.clearParameterVocabCache();
         vocabService.clearPlatformVocabCache();
         vocabService.clearOrganisationVocabCache();
-        // populate latest vocabs
-        vocabService.populateVocabsData();
 
        // update the caches
        vocabService.getParameterVocabs();
        vocabService.getPlatformVocabs();
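
The new isTestProfileActive() helper iterates Environment.getActiveProfiles() looking for a profile named "test", so the test profile populates the vocabs index synchronously before any test runs while every other profile keeps the non-blocking startup. For reference, Spring's Environment can express the same check directly; a sketch assuming Spring Framework 5.1+, not part of this commit (note acceptsProfiles also matches a default profile when no profile is active):

import org.springframework.core.env.Environment;
import org.springframework.core.env.Profiles;

// Sketch of an equivalent profile check, for illustration only.
public class ProfileCheckSketch {

    private final Environment environment;

    public ProfileCheckSketch(Environment environment) {
        this.environment = environment;
    }

    public boolean isTestProfileActive() {
        // true when the "test" profile is active (or is a default profile and none are active)
        return environment.acceptsProfiles(Profiles.of("test"));
    }
}
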
@@ -1,8 +1,6 @@
package au.org.aodn.esindexer;

import au.org.aodn.ardcvocabs.service.ArdcVocabService;
import au.org.aodn.esindexer.configuration.GeoNetworkSearchTestConfig;
import au.org.aodn.esindexer.service.VocabServiceImpl;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
@@ -42,11 +42,6 @@ public class VocabServiceTest extends BaseTestClass
     @Autowired
     protected ObjectMapper indexerObjectMapper;
 
-    @BeforeAll
-    public void setup() {
-        vocabService.populateVocabsData();
-    }
-
     @Test
     void testExtractParameterVocabLabelsFromThemes() throws IOException {
         // Prepare themes
@@ -130,7 +125,6 @@ void testExtractPOrganisationVocabLabelsFromThemes() throws IOException {
     @Test
     void testProcessParameterVocabs() throws IOException, JSONException {
         // read from ARDC
-
         List<VocabModel> parameterVocabsFromArdc = ardcVocabService.getVocabTreeFromArdcByType(VocabApiPaths.PARAMETER_VOCAB);
 
         // read from Elastic search
2 changes: 1 addition & 1 deletion indexer/src/test/resources/application-test.yaml
@@ -1,7 +1,7 @@
 # Client calling the Indexer API must provide this token in the Authorization header
 # diff core / max value should not impact the run, this is just to verify this is correct
 app:
-  initialiseVocabsIndex: false
+  initialiseVocabsIndex: true
   geometry:
     # This value will affect the size of grid to divide a spatial extents which is used to calculate the
     # centroid point in the summaries, test case needs to change if you change this value
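
With initialiseVocabsIndex flipped to true, the test profile now populates the vocabs index during application startup (synchronously, per the init() change above), which lines up with the removal of the @BeforeAll setup from VocabServiceTest. The comment removed from init() notes the flag can still be turned off for a local run; for example, with a profile-specific override (the file name here is illustrative):

# e.g. a local application-dev.yaml: skip vocabs population on startup
app:
  initialiseVocabsIndex: false

Equivalently, the property can be passed on the JVM command line as -Dapp.initialiseVocabsIndex=false, as the removed comment describes.
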
