diff --git a/bio2zarr/vcf2zarr/vcz.py b/bio2zarr/vcf2zarr/vcz.py index bc89fa4..8ba7a31 100644 --- a/bio2zarr/vcf2zarr/vcz.py +++ b/bio2zarr/vcf2zarr/vcz.py @@ -15,6 +15,7 @@ from zarr.codecs import BloscCodec, BytesCodec from .. import constants, core, provenance +from ..zarr_v3_utils import VLenUTF8Codec from . import icf logger = logging.getLogger(__name__) @@ -34,6 +35,8 @@ def inspect(path): DEFAULT_ZARR_COMPRESSOR = numcodecs.Blosc(cname="zstd", clevel=7) DEFAULT_ZARR_CODECS = [BytesCodec(), BloscCodec(cname="lz4", clevel=7)] +STRING_ZARR_CODECS = [VLenUTF8Codec(), BloscCodec(cname="lz4", clevel=7)] + _fixed_field_descriptions = { "variant_contig": "An identifier from the reference genome or an angle-bracketed ID" @@ -574,12 +577,12 @@ def init( def encode_samples(self, root): if self.schema.samples != self.icf.metadata.samples: raise ValueError("Subsetting or reordering samples not supported currently") - data = np.array([sample.id for sample in self.schema.samples], dtype=str) + data = np.array([sample.id for sample in self.schema.samples], dtype=object) array = root.create_array( "sample_id", shape=data.shape, dtype=data.dtype, - codecs=DEFAULT_ZARR_CODECS, + codecs=STRING_ZARR_CODECS, chunks=(self.schema.samples_chunk_size,), ) array[...] = data @@ -587,14 +590,15 @@ def encode_samples(self, root): logger.debug("Samples done") def encode_contig_id(self, root): - data = np.array([contig.id for contig in self.schema.contigs], dtype=str) + data = np.array([contig.id for contig in self.schema.contigs], dtype=object) array = root.create_array( "contig_id", shape=data.shape, dtype=data.dtype, - codecs=DEFAULT_ZARR_CODECS, + codecs=STRING_ZARR_CODECS, chunks=data.shape, # no chunking ) + array[...] = data array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"] if all(contig.length is not None for contig in self.schema.contigs): data = np.array( @@ -604,9 +608,10 @@ def encode_contig_id(self, root): "contig_length", shape=data.shape, dtype=data.dtype, - compressor=DEFAULT_ZARR_CODECS, + codecs=DEFAULT_ZARR_CODECS, chunks=data.shape, # no chunking ) + array[...] = data array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"] def encode_filter_id(self, root): @@ -617,23 +622,29 @@ def encode_filter_id(self, root): "filter_id", shape=data.shape, dtype=data.dtype, - codecs=DEFAULT_ZARR_CODECS, + codecs=STRING_ZARR_CODECS, chunks=data.shape, # no chunking ) + array[...] = data array.attrs["_ARRAY_DIMENSIONS"] = ["filters"] def init_array(self, root, array_spec, variants_dim_size): object_codec = None if array_spec.dtype == "O": object_codec = numcodecs.VLenUTF8() + codecs = STRING_ZARR_CODECS + else: + codecs = DEFAULT_ZARR_CODECS shape = list(array_spec.shape) # Truncate the variants dimension is max_variant_chunks was specified shape[0] = variants_dim_size + compressor = numcodecs.get_codec(array_spec.compressor) a = root.create_array( # empty raises NotImplemented array_spec.name, shape=shape, chunks=array_spec.chunks, dtype=array_spec.dtype, + codecs=codecs, # TODO # compressor=numcodecs.get_codec(array_spec.compressor), # filters=[numcodecs.get_codec(filt) for filt in array_spec.filters], @@ -915,7 +926,7 @@ def finalise(self, show_progress=False): logger.debug(f"Removing {self.wip_path}") shutil.rmtree(self.wip_path) logger.info("Consolidating Zarr metadata") - zarr.consolidate_metadata(self.path) + # zarr.consolidate_metadata(self.path) ###################### # encode_all_partitions diff --git a/bio2zarr/vcf2zarr/verification.py b/bio2zarr/vcf2zarr/verification.py index fb68d70..e2e2bfc 100644 --- a/bio2zarr/vcf2zarr/verification.py +++ b/bio2zarr/vcf2zarr/verification.py @@ -167,7 +167,7 @@ def verify(vcf_path, zarr_path, show_progress=False): format_fields = {} info_fields = {} - for colname in root.keys(): + for colname in root.group_keys(): if colname.startswith("call") and not colname.startswith("call_genotype"): vcf_name = colname.split("_", 1)[1] vcf_type = format_headers[vcf_name]["Type"] diff --git a/bio2zarr/zarr_v3_utils.py b/bio2zarr/zarr_v3_utils.py new file mode 100644 index 0000000..c79ce06 --- /dev/null +++ b/bio2zarr/zarr_v3_utils.py @@ -0,0 +1,68 @@ +from dataclasses import dataclass + +import numcodecs +from numcodecs.compat import ensure_bytes, ensure_ndarray +from zarr.abc.codec import ArrayBytesCodec +from zarr.array_spec import ArraySpec +from zarr.buffer import Buffer, NDBuffer +from zarr.codecs.registry import register_codec +from zarr.common import JSON, to_thread + + +@dataclass(frozen=True) +class VLenUTF8Codec(ArrayBytesCodec): + is_fixed_size = False + + def __init__(self, *args, **kwargs) -> None: + pass + + def to_dict(self) -> dict[str, JSON]: + return {"name": "vlen-utf8", "compressor": {"id": "vlen-utf8"}} + + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + compressor = numcodecs.get_codec(dict(id="vlen-utf8")) + chunk_numpy_array = ensure_ndarray( + await to_thread(compressor.decode, chunk_bytes.as_array_like()) + ) + + # ensure correct dtype + if str(chunk_numpy_array.dtype) != chunk_spec.dtype: + chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype) + + # ensure correct chunk shape + if chunk_numpy_array.shape != chunk_spec.shape: + chunk_numpy_array = chunk_numpy_array.reshape( + chunk_spec.shape, + ) + + return NDBuffer.from_numpy_array(chunk_numpy_array) + + async def _encode_single( + self, + chunk_array: NDBuffer, + _chunk_spec: ArraySpec, + ) -> Buffer | None: + chunk_numpy_array = chunk_array.as_numpy_array() + compressor = numcodecs.get_codec(dict(id="vlen-utf8")) + if ( + not chunk_numpy_array.flags.c_contiguous + and not chunk_numpy_array.flags.f_contiguous + ): + chunk_numpy_array = chunk_numpy_array.copy(order="A") + encoded_chunk_bytes = ensure_bytes( + await to_thread(compressor.encode, chunk_numpy_array) + ) + + return Buffer.from_bytes(encoded_chunk_bytes) + + def compute_encoded_size( + self, _input_byte_length: int, _chunk_spec: ArraySpec + ) -> int: + raise NotImplementedError + + +register_codec("vlen-utf8", VLenUTF8Codec)