Skip to content

Commit

Permalink
Add array storage helpers (#2065)
Browse files Browse the repository at this point in the history
* implement store.list_prefix and store._set_dict

* simplify string handling

* add nchunks_initialized, and necessary additions for it

* rename _iter_chunks to _iter_chunk_coords

* fix test name

* bring in correct store list_dir implementations

* bump numcodecs to dodge zstd exception

* remove store._set_dict, and add _set_many and get_many instead

* update deprecation warning template

* add a type annotation

* refactor chunk iterators. they are not properties any more, just methods, and they can take an origin kwarg

* _get_many returns tuple[str, buffer]

* stricter store types

* fix types

* lint

* remove deprecation warnings

* fix zip list_prefix

* tests for nchunks_initialized, chunks_initialized; add selection_shape kwarg to grid iteration; make chunk grid iterators consistent for array and async array

* add nchunks test

* fix docstrings

* fix docstring

* revert unnecessary changes to project config
  • Loading branch information
d-v-b authored Sep 26, 2024
1 parent 19ed733 commit f0443db
Show file tree
Hide file tree
Showing 16 changed files with 578 additions and 52 deletions.
47 changes: 34 additions & 13 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from asyncio import gather
from collections.abc import AsyncGenerator, Iterable
from types import TracebackType
from typing import Any, NamedTuple, Protocol, runtime_checkable
from typing import TYPE_CHECKING, Any, NamedTuple, Protocol, runtime_checkable

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Iterable
from types import TracebackType
from typing import Any, TypeAlias

from typing_extensions import Self
from typing_extensions import Self

from zarr.core.buffer import Buffer, BufferPrototype
from zarr.core.common import AccessModeLiteral, BytesLike
from zarr.core.buffer import Buffer, BufferPrototype
from zarr.core.common import AccessModeLiteral, BytesLike

__all__ = ["Store", "AccessMode", "ByteGetter", "ByteSetter", "set_or_delete"]

ByteRangeRequest: TypeAlias = tuple[int | None, int | None]


class AccessMode(NamedTuple):
str: AccessModeLiteral
Expand Down Expand Up @@ -100,14 +108,14 @@ async def get(
self,
key: str,
prototype: BufferPrototype,
byte_range: tuple[int | None, int | None] | None = None,
byte_range: ByteRangeRequest | None = None,
) -> Buffer | None:
"""Retrieve the value associated with a given key.
Parameters
----------
key : str
byte_range : tuple[int, Optional[int]], optional
byte_range : tuple[int | None, int | None], optional
Returns
-------
Expand All @@ -119,13 +127,13 @@ async def get(
async def get_partial_values(
self,
prototype: BufferPrototype,
key_ranges: list[tuple[str, tuple[int | None, int | None]]],
key_ranges: Iterable[tuple[str, ByteRangeRequest]],
) -> list[Buffer | None]:
"""Retrieve possibly partial values from given key_ranges.
Parameters
----------
key_ranges : list[tuple[str, tuple[int, int]]]
key_ranges : Iterable[tuple[str, tuple[int | None, int | None]]]
Ordered set of key, range pairs, a key may occur multiple times with different ranges
Returns
Expand Down Expand Up @@ -195,7 +203,9 @@ def supports_partial_writes(self) -> bool:
...

@abstractmethod
async def set_partial_values(self, key_start_values: list[tuple[str, int, BytesLike]]) -> None:
async def set_partial_values(
self, key_start_values: Iterable[tuple[str, int, BytesLike]]
) -> None:
"""Store values at a given key, starting at byte range_start.
Parameters
Expand Down Expand Up @@ -259,21 +269,32 @@ def close(self) -> None:
"""Close the store."""
self._is_open = False

async def _get_many(
self, requests: Iterable[tuple[str, BufferPrototype, ByteRangeRequest | None]]
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
"""
Retrieve a collection of objects from storage. In general this method does not guarantee
that objects will be retrieved in the order in which they were requested, so this method
yields tuple[str, Buffer | None] instead of just Buffer | None
"""
for req in requests:
yield (req[0], await self.get(*req))


@runtime_checkable
class ByteGetter(Protocol):
async def get(
self, prototype: BufferPrototype, byte_range: tuple[int, int | None] | None = None
self, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None
) -> Buffer | None: ...


@runtime_checkable
class ByteSetter(Protocol):
async def get(
self, prototype: BufferPrototype, byte_range: tuple[int, int | None] | None = None
self, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None
) -> Buffer | None: ...

async def set(self, value: Buffer, byte_range: tuple[int, int] | None = None) -> None: ...
async def set(self, value: Buffer, byte_range: ByteRangeRequest | None = None) -> None: ...

async def delete(self) -> None: ...

Expand Down
6 changes: 3 additions & 3 deletions src/zarr/codecs/sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
Codec,
CodecPipeline,
)
from zarr.abc.store import ByteGetter, ByteSetter
from zarr.abc.store import ByteGetter, ByteRangeRequest, ByteSetter
from zarr.codecs.bytes import BytesCodec
from zarr.codecs.crc32c_ import Crc32cCodec
from zarr.core.array_spec import ArraySpec
Expand Down Expand Up @@ -78,7 +78,7 @@ class _ShardingByteGetter(ByteGetter):
chunk_coords: ChunkCoords

async def get(
self, prototype: BufferPrototype, byte_range: tuple[int, int | None] | None = None
self, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None
) -> Buffer | None:
assert byte_range is None, "byte_range is not supported within shards"
assert (
Expand All @@ -91,7 +91,7 @@ async def get(
class _ShardingByteSetter(_ShardingByteGetter, ByteSetter):
shard_dict: ShardMutableMapping

async def set(self, value: Buffer, byte_range: tuple[int, int] | None = None) -> None:
async def set(self, value: Buffer, byte_range: ByteRangeRequest | None = None) -> None:
assert byte_range is None, "byte_range is not supported within shards"
self.shard_dict[self.chunk_coords] = value

Expand Down
Loading

0 comments on commit f0443db

Please sign in to comment.