Skip to content

Commit

Permalink
Create a VLenUTF8Codec
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Jul 11, 2024
1 parent c7ad572 commit 18704d4
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 8 deletions.
25 changes: 18 additions & 7 deletions bio2zarr/vcf2zarr/vcz.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from zarr.codecs import BloscCodec, BytesCodec

from .. import constants, core, provenance
from ..zarr_v3_utils import VLenUTF8Codec
from . import icf

logger = logging.getLogger(__name__)
Expand All @@ -34,6 +35,8 @@ def inspect(path):

DEFAULT_ZARR_COMPRESSOR = numcodecs.Blosc(cname="zstd", clevel=7)
DEFAULT_ZARR_CODECS = [BytesCodec(), BloscCodec(cname="lz4", clevel=7)]
STRING_ZARR_CODECS = [VLenUTF8Codec(), BloscCodec(cname="lz4", clevel=7)]


_fixed_field_descriptions = {
"variant_contig": "An identifier from the reference genome or an angle-bracketed ID"
Expand Down Expand Up @@ -574,27 +577,28 @@ def init(
def encode_samples(self, root):
if self.schema.samples != self.icf.metadata.samples:
raise ValueError("Subsetting or reordering samples not supported currently")
data = np.array([sample.id for sample in self.schema.samples], dtype=str)
data = np.array([sample.id for sample in self.schema.samples], dtype=object)
array = root.create_array(
"sample_id",
shape=data.shape,
dtype=data.dtype,
codecs=DEFAULT_ZARR_CODECS,
codecs=STRING_ZARR_CODECS,
chunks=(self.schema.samples_chunk_size,),
)
array[...] = data
array.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
logger.debug("Samples done")

def encode_contig_id(self, root):
data = np.array([contig.id for contig in self.schema.contigs], dtype=str)
data = np.array([contig.id for contig in self.schema.contigs], dtype=object)
array = root.create_array(
"contig_id",
shape=data.shape,
dtype=data.dtype,
codecs=DEFAULT_ZARR_CODECS,
codecs=STRING_ZARR_CODECS,
chunks=data.shape, # no chunking
)
array[...] = data
array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
if all(contig.length is not None for contig in self.schema.contigs):
data = np.array(
Expand All @@ -604,9 +608,10 @@ def encode_contig_id(self, root):
"contig_length",
shape=data.shape,
dtype=data.dtype,
compressor=DEFAULT_ZARR_CODECS,
codecs=DEFAULT_ZARR_CODECS,
chunks=data.shape, # no chunking
)
array[...] = data
array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]

def encode_filter_id(self, root):
Expand All @@ -617,23 +622,29 @@ def encode_filter_id(self, root):
"filter_id",
shape=data.shape,
dtype=data.dtype,
codecs=DEFAULT_ZARR_CODECS,
codecs=STRING_ZARR_CODECS,
chunks=data.shape, # no chunking
)
array[...] = data
array.attrs["_ARRAY_DIMENSIONS"] = ["filters"]

def init_array(self, root, array_spec, variants_dim_size):
object_codec = None
if array_spec.dtype == "O":
object_codec = numcodecs.VLenUTF8()
codecs = STRING_ZARR_CODECS
else:
codecs = DEFAULT_ZARR_CODECS
shape = list(array_spec.shape)
# Truncate the variants dimension is max_variant_chunks was specified
shape[0] = variants_dim_size
compressor = numcodecs.get_codec(array_spec.compressor)
a = root.create_array( # empty raises NotImplemented
array_spec.name,
shape=shape,
chunks=array_spec.chunks,
dtype=array_spec.dtype,
codecs=codecs,
# TODO
# compressor=numcodecs.get_codec(array_spec.compressor),
# filters=[numcodecs.get_codec(filt) for filt in array_spec.filters],
Expand Down Expand Up @@ -915,7 +926,7 @@ def finalise(self, show_progress=False):
logger.debug(f"Removing {self.wip_path}")
shutil.rmtree(self.wip_path)
logger.info("Consolidating Zarr metadata")
zarr.consolidate_metadata(self.path)
# zarr.consolidate_metadata(self.path)

######################
# encode_all_partitions
Expand Down
2 changes: 1 addition & 1 deletion bio2zarr/vcf2zarr/verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def verify(vcf_path, zarr_path, show_progress=False):

format_fields = {}
info_fields = {}
for colname in root.keys():
for colname in root.group_keys():
if colname.startswith("call") and not colname.startswith("call_genotype"):
vcf_name = colname.split("_", 1)[1]
vcf_type = format_headers[vcf_name]["Type"]
Expand Down
68 changes: 68 additions & 0 deletions bio2zarr/zarr_v3_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from dataclasses import dataclass

import numcodecs
from numcodecs.compat import ensure_bytes, ensure_ndarray
from zarr.abc.codec import ArrayBytesCodec
from zarr.array_spec import ArraySpec
from zarr.buffer import Buffer, NDBuffer
from zarr.codecs.registry import register_codec
from zarr.common import JSON, to_thread


@dataclass(frozen=True)
class VLenUTF8Codec(ArrayBytesCodec):
is_fixed_size = False

def __init__(self, *args, **kwargs) -> None:
pass

def to_dict(self) -> dict[str, JSON]:
return {"name": "vlen-utf8", "compressor": {"id": "vlen-utf8"}}

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
compressor = numcodecs.get_codec(dict(id="vlen-utf8"))
chunk_numpy_array = ensure_ndarray(
await to_thread(compressor.decode, chunk_bytes.as_array_like())
)

# ensure correct dtype
if str(chunk_numpy_array.dtype) != chunk_spec.dtype:
chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype)

# ensure correct chunk shape
if chunk_numpy_array.shape != chunk_spec.shape:
chunk_numpy_array = chunk_numpy_array.reshape(
chunk_spec.shape,
)

return NDBuffer.from_numpy_array(chunk_numpy_array)

async def _encode_single(
self,
chunk_array: NDBuffer,
_chunk_spec: ArraySpec,
) -> Buffer | None:
chunk_numpy_array = chunk_array.as_numpy_array()
compressor = numcodecs.get_codec(dict(id="vlen-utf8"))
if (
not chunk_numpy_array.flags.c_contiguous
and not chunk_numpy_array.flags.f_contiguous
):
chunk_numpy_array = chunk_numpy_array.copy(order="A")
encoded_chunk_bytes = ensure_bytes(
await to_thread(compressor.encode, chunk_numpy_array)
)

return Buffer.from_bytes(encoded_chunk_bytes)

def compute_encoded_size(
self, _input_byte_length: int, _chunk_spec: ArraySpec
) -> int:
raise NotImplementedError


register_codec("vlen-utf8", VLenUTF8Codec)

0 comments on commit 18704d4

Please sign in to comment.