Skip to content

Commit

Permalink
Fail on any error
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonThordal committed Sep 2, 2024
1 parent a2dd8cf commit 0513ed6
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 22 deletions.
4 changes: 1 addition & 3 deletions yente/provider/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ async def search(
"""Search for entities in the index."""
raise NotImplementedError

async def bulk_index(
self, entities: AsyncIterator[Dict[str, Any]]
) -> Tuple[int, int]:
async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> int:
"""Index a list of entities into the search index."""
raise NotImplementedError
13 changes: 4 additions & 9 deletions yente/provider/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,28 +229,23 @@ async def search(
)
raise YenteIndexError(f"Could not search index: {ae}") from ae

async def bulk_index(
self, entities: AsyncIterator[Dict[str, Any]]
) -> Tuple[int, int]:
async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> int:
"""Index a list of entities into the search index.
Args:
entities: An async iterator of entities to index.
Returns:
A tuple of the number of entities indexed and the number of errors.
The number of entities indexed.
"""
try:
n, errors = await async_bulk(
n, _ = await async_bulk(
self.client(),
entities,
chunk_size=1000,
raise_on_error=False,
)
errors = cast(List[Any], errors)
for error in errors:
log.error("Bulk index error", error=error)
return n, len(errors)
return n

except BulkIndexError as exc:
raise YenteIndexError(f"Could not index entities: {exc}") from exc
12 changes: 3 additions & 9 deletions yente/provider/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,22 +219,16 @@ async def search(
)
raise YenteIndexError(f"Could not search index: {ae}") from ae

async def bulk_index(
self, entities: AsyncIterator[Dict[str, Any]]
) -> Tuple[int, int]:
async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> int:
"""Index a list of entities into the search index."""
try:
n, errors = await async_bulk(
n, _ = await async_bulk(
self.client,
entities,
chunk_size=1000,
max_retries=3,
initial_backoff=2,
raise_on_error=False,
)
errors = cast(List[Any], errors)
for error in errors:
log.error("Bulk index error", error=error)
return n, len(errors)
return n
except BulkIndexError as exc:
raise YenteIndexError(f"Could not index entities: {exc}") from exc
7 changes: 6 additions & 1 deletion yente/search/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async def index_entities(
await provider.create_index(next_index)
try:
docs = iter_entity_docs(updater, next_index)
n_changed, _ = await provider.bulk_index(docs)
n_changed = await provider.bulk_index(docs)
except (
YenteIndexError,
KeyboardInterrupt,
Expand All @@ -155,6 +155,11 @@ async def index_entities(
log.warn("Deleting partial index", index=next_index)
await provider.delete_index(next_index)
if not force:
log.exception(
"Delta indexing error, retrying with full re-index: %r" % exc,
dataset=dataset.name,
index=next_index,
)
return await index_entities(provider, dataset, force=True)
raise exc

Expand Down

0 comments on commit 0513ed6

Please sign in to comment.