Skip to content

Commit

Permalink
Merge pull request #72 from aodn/features/5522-bbox-bug
Browse files Browse the repository at this point in the history
Add retry to due with timeout on geonetwork elastic
  • Loading branch information
vietnguyengit authored May 24, 2024
2 parents 3e8b23c + d19fffd commit d9269c7
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 171 deletions.
8 changes: 8 additions & 0 deletions indexer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;

@Configuration
@EnableRetry
public class IndexerConfig {
/**
* We need to create component here because we do not want to run test with real http connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import au.org.aodn.esindexer.utils.UrlUtils;
import au.org.aodn.stac.model.LinkModel;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldSort;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchAllQuery;

import co.elastic.clients.elasticsearch.core.SearchRequest;
Expand All @@ -16,14 +19,22 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.*;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
public class GeoNetworkServiceImpl implements GeoNetworkService {

public static final String SUGGEST_LOGOS = "suggest_logos";
Expand All @@ -33,11 +44,17 @@ public class GeoNetworkServiceImpl implements GeoNetworkService {
@Autowired
protected UrlUtils urlUtils;

@Lazy
@Autowired
protected GeoNetworkServiceImpl self;

@Value("${elasticsearch.query.pageSize:1500}")
protected int ES_PAGE_SIZE;

protected static final Logger logger = LogManager.getLogger(GeoNetworkServiceImpl.class);

protected RestTemplate indexerRestTemplate;
protected ElasticsearchClient gn4ElasticClient;
protected final SearchRequest GEONETWORK_ALL_UUID;
protected String indexName;
protected String server;
protected HttpEntity<String> defaultRequestEntity = getRequestEntity(MediaType.APPLICATION_JSON, null);
Expand All @@ -64,20 +81,6 @@ public GeoNetworkServiceImpl(
this.gn4ElasticClient = gn4ElasticClient;
this.indexerRestTemplate = indexerRestTemplate;

GEONETWORK_ALL_UUID = new SearchRequest.Builder()
.index(indexName)
.query(new MatchAllQuery.Builder().build()._toQuery()) // Match all
.source(s -> s
.filter(f -> f.includes(UUID)))// Only select uuid field
// TODO: redesign the iterator to be more efficient
/* by default ES will return just 10 top hits (10 records of the thousands available records),
the iterator implementation in getAllMetadataRecords() method will help saving memory but process those 10 records only,
need to temporarily increase the size of returning hits
*/
.size(2000)
.build();
logger.info("GEONETWORK_ALL_UUID -> {}", GEONETWORK_ALL_UUID);

setIndexName(indexName);
setServer(server);
}
Expand Down Expand Up @@ -126,7 +129,7 @@ public String findGroupById(String uuid) throws IOException {
*/
@Override
public Optional<LinkModel> getThumbnail(String uuid) {
Optional<Map<String, ?>> optRelated = getRecordRelated(uuid);
Optional<Map<String, ?>> optRelated = self.getRecordRelated(uuid);
if(optRelated.isPresent()) {
Map<String, ?> node = optRelated.get();
if(node.containsKey(THUMB_NAILS) && node.get(THUMB_NAILS) instanceof List<?> thumbnails) {
Expand Down Expand Up @@ -306,12 +309,21 @@ protected Optional<Map<String, Object>> getRecordExtraInfo(String uuid) {
* }
* ]
* }
*
* The need of retryable is because the geonetwork elastic instance is too small, often it memory usage is
* 75%, it will throw BadRequest exception if we push it too hard, so we need to retry on bad request
*/
@Retryable(
retryFor = HttpClientErrorException.BadRequest.class,
maxAttempts = 10,
backoff = @Backoff(delay = 1500L)
)
protected Optional<Map<String, ?>> getRecordRelated(String uuid) {
try {
Map<String, Object> params = new HashMap<>();
params.put(UUID, uuid);

logger.debug("Get related record for {}", uuid);
ResponseEntity<Map<String, ?>> responseEntity = indexerRestTemplate.exchange(
getGeoNetworkRelatedEndpoint(),
HttpMethod.GET,
Expand Down Expand Up @@ -415,24 +427,51 @@ public boolean isMetadataRecordsCountLessThan(int c) {

return true;
}

/**
* The need of retryable is because the geonetwork elastic instance is too small, often it memory usage is
* 75%, it will throw BadRequest exception if we push it too hard, so we need to retry on bad request
*/
@Retryable(
retryFor = HttpClientErrorException.BadRequest.class,
maxAttempts = 10,
backoff = @Backoff(delay = 1500L)
)
@Override
public Iterable<String> getAllMetadataRecords() {

try {
// TODO: Can the elastic index not update after insert dataset into GeoNetwork?
final SearchResponse<ObjectNode> response = gn4ElasticClient.search(GEONETWORK_ALL_UUID, ObjectNode.class);
final AtomicReference<String> lastUUID = new AtomicReference<>(null);
final AtomicReference<SearchResponse<ObjectNode>> response =
new AtomicReference<>(gn4ElasticClient.search(createSearchAllUUID(null), ObjectNode.class));

if(response.get().hits() != null
&& response.get().hits().hits() != null
&& !response.get().hits().hits().isEmpty()) {

if(response.hits() != null && response.hits().hits() != null && !response.hits().hits().isEmpty()) {
// Use iterator so that we can get record by record, otherwise we need to store all record
// in memory which use up lots of memory
return () -> new Iterator<>() {

// int is enough because we paged query
private int index = 0;

@Override
public boolean hasNext() {
return index < response.hits().hits().size();
// If we hit the end, that means we have iterated to end of page.
if (index < response.get().hits().hits().size()) {
return true;
}
else {
// Check if we have next page
try {
response.set(gn4ElasticClient.search(createSearchAllUUID(lastUUID.get()), ObjectNode.class));
// Reset counter from start
index = 0;
return index < response.get().hits().hits().size();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
/**
* There is a problem with query the elastic directly because the index maybe outdated (not reindexed)
Expand All @@ -443,16 +482,33 @@ public boolean hasNext() {
*/
@Override
public String next() {
// TODO: Potential problem with edge case where the list is bigger then size set Integer.MAX
String uuid = response.hits().hits().get(index++).source().get(UUID).asText();
try {
return GeoNetworkServiceImpl.this.searchRecordBy(uuid);
}
catch(MetadataNotFoundException me) {
// Should be a very rare case where someone deleted the doc in geonetwork
// but the index is not refresh yet, so you will get document not found
String uuid = getUUID(index++);

if(uuid == null) {
return null;
}
else {
// Remember the last UUID
lastUUID.set(uuid);

try {
return GeoNetworkServiceImpl.this.searchRecordBy(uuid);
}
catch(MetadataNotFoundException me) {
// Should be a very rare case where someone deleted the doc in geonetwork
// but the index is not refresh yet, so you will get document not found
return null;
}
}
}

private String getUUID(int index) {
if(response.get().hits().hits().get(index).source() != null
&& response.get().hits().hits().get(index).source().has(UUID)) {

return response.get().hits().hits().get(index++).source().get(UUID).asText();
}
return null;
}
};
}
Expand All @@ -461,7 +517,7 @@ public String next() {
}
}
catch(IOException e) {
throw new RuntimeException("Failed to fetch data from the API");
throw new RuntimeException("Failed to fetch data from GeoNetwork Elastic API, too busy?");
}
}

Expand Down Expand Up @@ -496,4 +552,40 @@ protected String getGeoNetworkGroupsEndpoint() {
protected String getReIndexEndpoint() {
return getServer() + "/geonetwork/srv/api/site/index?reset=false&asynchronous=false";
}
/**
* According to ElasticSearch Doc:
* Avoid using from and size to page too deeply or request too many results at once. Search requests usually
* span multiple shards. Each shard must load its requested hits and the hits for any previous pages into memory.
* For deep pages or large sets of results, these operations can significantly increase memory and CPU usage,
* resulting in degraded performance or node failures.
*
* You can use the search_after parameter to retrieve the next page of hits using a set of sort values
* from the previous page.
*
* Noted that the search must always sort the same way, in this search it is UUID, there will be a very small
* chance that new record added in between calls and therefore sort order changed and some record may skip,
* but not much we can do because it is non-transactional operation.
*
* @param searchAfterUUID The UUID found at the end of the previous search, aka you want result after this UUID
* @return - Another page of UUID
*/
protected SearchRequest createSearchAllUUID(String searchAfterUUID) {

SearchRequest.Builder builder = new SearchRequest.Builder()
.index(indexName)
.query(new MatchAllQuery.Builder().build()._toQuery()) // Match all
.source(s -> s
.filter(f -> f.includes(UUID)))// Only select uuid field
.sort(so -> so.field(FieldSort.of(f -> f.field(UUID).order(SortOrder.Asc))))
.size(ES_PAGE_SIZE);

// Since we only sort by UUID, therefore we only need to searchAfter UUID only.
if(searchAfterUUID != null) {
builder = builder.searchAfter(
List.of(FieldValue.of(searchAfterUUID))
);
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ public ResponseEntity<String> deleteDocumentByUUID(String uuid) throws IOExcepti
}

public ResponseEntity<String> indexAllMetadataRecordsFromGeoNetwork(boolean confirm) throws IOException {
BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
if (!confirm) {
throw new IndexAllRequestNotConfirmedException("Please confirm that you want to index all metadata records from GeoNetwork");
}
Expand All @@ -234,15 +233,28 @@ public ResponseEntity<String> indexAllMetadataRecordsFromGeoNetwork(boolean conf

logger.info("Indexing all metadata records from GeoNetwork");

BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
List<BulkResponse> results = new ArrayList<>();

long dataSize = 0;
for (String metadataRecord : geoNetworkResourceService.getAllMetadataRecords()) {
if(metadataRecord != null) {
try {
// get mapped metadata values from GeoNetwork to STAC collection schema
final StacCollectionModel mappedMetadataValues = this.getMappedMetadataValues(metadataRecord);

// convert mapped values to binary data
logger.debug("Ingested json is {}", indexerObjectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(mappedMetadataValues));

int size = indexerObjectMapper.writeValueAsBytes(mappedMetadataValues).length;

// We need to split the batch into smaller size to avoid data too large error in ElasticSearch,
// the limit is 10mb, so to make check before document add and push batch if size is too big
//
// dataSize = 0 is init case, just in case we have a very big doc that exceed the limit
// and we have not add it to the bulkRequest
if(dataSize + size > 100000 && dataSize != 0) {
logger.info("Execute batch as bulk request is big enough {}", dataSize + size);
results.add(executeBulk(bulkRequest));
dataSize = 0;
bulkRequest = new BulkRequest.Builder();
}
// send bulk request to Elasticsearch
bulkRequest.operations(op -> op
.index(idx -> idx
Expand All @@ -251,6 +263,7 @@ public ResponseEntity<String> indexAllMetadataRecordsFromGeoNetwork(boolean conf
.document(mappedMetadataValues)
)
);
dataSize += size;

} catch (FactoryException | JAXBException | TransformException e) {
/* it will reach here if cannot extract values of all the keys in GeoNetwork metadata JSON
Expand All @@ -261,6 +274,16 @@ public ResponseEntity<String> indexAllMetadataRecordsFromGeoNetwork(boolean conf
}
}

// In case there are residual
results.add(executeBulk(bulkRequest));

// TODO now processing for record_suggestions index
logger.info("Finished execute bulk indexing records to index: {}",indexName);

return ResponseEntity.status(HttpStatus.OK).body(results.toString());
}

protected BulkResponse executeBulk(BulkRequest.Builder bulkRequest) throws IOException {
BulkResponse result = portalElasticsearchClient.bulk(bulkRequest.build());

// Flush after insert, otherwise you need to wait for next auto-refresh. It is
Expand Down Expand Up @@ -291,12 +314,7 @@ public ResponseEntity<String> indexAllMetadataRecordsFromGeoNetwork(boolean conf
}
}
}
} else {
logger.info("Finished bulk indexing records to index: " + indexName);
}

// TODO now processing for record_suggestions index

return ResponseEntity.status(HttpStatus.OK).body(result.toString());
return result;
}
}
Loading

0 comments on commit d9269c7

Please sign in to comment.