Skip to content

Commit

Permalink
fix: retry with full rebuild on errors
Browse files Browse the repository at this point in the history
Report back unchanged datasets.
  • Loading branch information
SimonThordal committed Aug 14, 2024
1 parent ab7c7c7 commit 9ccbf76
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 19 deletions.
6 changes: 4 additions & 2 deletions yente/provider/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from asyncio import Semaphore
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
from typing import AsyncIterator

from yente import settings
Expand Down Expand Up @@ -60,6 +60,8 @@ async def search(
"""Search for entities in the index."""
raise NotImplementedError

async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> None:
async def bulk_index(
    self, entities: AsyncIterator[Dict[str, Any]]
) -> Tuple[int, int]:
    """Index a stream of entity documents into the search index.

    Args:
        entities: An async iterator yielding entity documents to index.

    Returns:
        A tuple of (number of entities indexed, number of errors).
    """
    # Abstract: concrete providers (Elasticsearch, OpenSearch) implement this.
    raise NotImplementedError
25 changes: 19 additions & 6 deletions yente/provider/elastic.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import asyncio
import warnings
from typing import Any, Dict, List, Optional, cast
from typing import Any, Dict, List, Optional, cast, Tuple
from typing import AsyncIterator
from elasticsearch import AsyncElasticsearch, ElasticsearchWarning
from elasticsearch.helpers import async_bulk, BulkIndexError
Expand Down Expand Up @@ -229,15 +229,28 @@ async def search(
)
raise YenteIndexError(f"Could not search index: {ae}") from ae

async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> None:
"""Index a list of entities into the search index."""
async def bulk_index(
    self, entities: AsyncIterator[Dict[str, Any]]
) -> Tuple[int, int]:
    """Index a stream of entities into the search index.

    Args:
        entities: An async iterator of entity documents to index.

    Returns:
        A tuple of the number of entities indexed and the number of errors.

    Raises:
        YenteIndexError: If the bulk indexing operation fails outright.
    """
    try:
        # NOTE: stats_only must stay False (the default). With stats_only=True
        # async_bulk returns an error *count* (int) instead of the error list,
        # which would make the cast below a lie and the iteration a TypeError.
        n, errors = await async_bulk(
            self.client(),
            entities,
            chunk_size=1000,
            yield_ok=False,
            raise_on_error=False,
        )
        errors = cast(List[Any], errors)
        # Surface each per-document failure; the caller only sees the count.
        for error in errors:
            log.error("Bulk index error", error=error)
        return n, len(errors)
    except BulkIndexError as exc:
        raise YenteIndexError(f"Could not index entities: {exc}") from exc
15 changes: 10 additions & 5 deletions yente/provider/opensearch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import asyncio
import logging
from typing import Any, Dict, List, Optional, cast
from typing import Any, Dict, List, Optional, cast, Tuple
from typing import AsyncIterator
from opensearchpy import AsyncOpenSearch, AWSV4SignerAuth
from opensearchpy.helpers import async_bulk, BulkIndexError
Expand Down Expand Up @@ -219,17 +219,22 @@ async def search(
)
raise YenteIndexError(f"Could not search index: {ae}") from ae

async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> None:
async def bulk_index(
    self, entities: AsyncIterator[Dict[str, Any]]
) -> Tuple[int, int]:
    """Index a stream of entities into the search index.

    Args:
        entities: An async iterator of entity documents to index.

    Returns:
        A tuple of the number of entities indexed and the number of errors.

    Raises:
        YenteIndexError: If the bulk indexing operation fails outright.
    """
    try:
        # NOTE: stats_only must stay False (the default). With stats_only=True
        # async_bulk returns an error *count* (int) instead of the error list,
        # which would make the cast below a lie and the iteration a TypeError.
        n, errors = await async_bulk(
            self.client,
            entities,
            chunk_size=1000,
            yield_ok=False,
            max_retries=3,
            initial_backoff=2,
            raise_on_error=False,
        )
        errors = cast(List[Any], errors)
        # Surface each per-document failure; the caller only sees the count.
        for error in errors:
            log.error("Bulk index error", error=error)
        return n, len(errors)
    except BulkIndexError as exc:
        raise YenteIndexError(f"Could not index entities: {exc}") from exc
15 changes: 9 additions & 6 deletions yente/search/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,14 @@ async def index_entities(
if not force and await provider.exists_index_alias(alias, next_index):
log.info("Index is up to date.", index=next_index)
return False

# await es.indices.delete(index=next_index)
if updater.is_incremental and not force:
base_index = construct_index_name(dataset.name, updater.base_version)
await provider.clone_index(base_index, next_index)
else:
await provider.create_index(next_index)

try:
docs = iter_entity_docs(updater, next_index)
await provider.bulk_index(docs)
n_changed, _ = await provider.bulk_index(docs)
except (
YenteIndexError,
KeyboardInterrupt,
Expand All @@ -162,18 +159,24 @@ async def index_entities(
if next_index not in aliases:
log.warn("Deleting partial index", index=next_index)
await provider.delete_index(next_index)
if not force:
return await index_entities(provider, dataset, force=True)
return False

await provider.refresh(index=next_index)
dataset_prefix = construct_index_name(dataset.name)
# FIXME: we're not actually deleting old indexes here any more!
await provider.rollover_index(
alias,
next_index,
prefix=dataset_prefix,
)
# Delete any old indices that are no longer aliased to the current index.
for index in await provider.get_all_indices():
if index.startswith(dataset_prefix) and index != next_index:
log.info("Deleting old index", index=index)
await provider.delete_index(index)
log.info("Index is now aliased to: %s" % alias, index=next_index)
return True
return n_changed > 0


async def delete_old_indices(provider: SearchProvider, catalog: Catalog) -> None:
Expand Down

0 comments on commit 9ccbf76

Please sign in to comment.