diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index 0e4e76f..4af635a 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -8,8 +8,15 @@ import numpy.typing import zarr.abc.store @typing.final -class Basic: - def __new__(cls, byte_interface: typing.Any, chunk_spec: typing.Any) -> Basic: ... +class ChunkItem: + def __new__( + cls, + key: builtins.str, + chunk_subset: typing.Sequence[slice], + chunk_shape: typing.Sequence[builtins.int], + subset: typing.Sequence[slice], + shape: typing.Sequence[builtins.int], + ) -> ChunkItem: ... @typing.final class CodecPipelineImpl: @@ -26,22 +33,12 @@ class CodecPipelineImpl: ) -> CodecPipelineImpl: ... def retrieve_chunks_and_apply_index( self, - chunk_descriptions: typing.Sequence[WithSubset], + chunk_descriptions: typing.Sequence[ChunkItem], value: numpy.typing.NDArray[typing.Any], ) -> None: ... def store_chunks_with_indices( self, - chunk_descriptions: typing.Sequence[WithSubset], + chunk_descriptions: typing.Sequence[ChunkItem], value: numpy.typing.NDArray[typing.Any], write_empty_chunks: builtins.bool, ) -> None: ... - -@typing.final -class WithSubset: - def __new__( - cls, - item: Basic, - chunk_subset: typing.Sequence[slice], - subset: typing.Sequence[slice], - shape: typing.Sequence[builtins.int], - ) -> WithSubset: ... diff --git a/python/zarrs/utils.py b/python/zarrs/utils.py index 114d30c..2f483de 100644 --- a/python/zarrs/utils.py +++ b/python/zarrs/utils.py @@ -10,7 +10,7 @@ from zarr.core.array_spec import ArraySpec from zarr.core.indexing import SelectorTuple, is_integer -from zarrs._internal import Basic, WithSubset +from zarrs._internal import ChunkItem if TYPE_CHECKING: from collections.abc import Iterable @@ -148,7 +148,7 @@ def get_implicit_fill_value(dtype: ZDType, fill_value: Any) -> Any: @dataclass(frozen=True) class RustChunkInfo: - chunk_info_with_indices: list[WithSubset] + chunk_info_with_indices: list[ChunkItem] write_empty_chunks: bool @@ -160,7 +160,7 @@ def make_chunk_info_for_rust_with_indices( shape: tuple[int, ...], ) -> RustChunkInfo: shape = shape if shape else (1,) # constant array - chunk_info_with_indices: list[WithSubset] = [] + chunk_info_with_indices: list[ChunkItem] = [] write_empty_chunks: bool = True for ( byte_getter, @@ -178,7 +178,6 @@ def make_chunk_info_for_rust_with_indices( chunk_spec.config, chunk_spec.prototype, ) - chunk_info = Basic(byte_getter, chunk_spec) out_selection_as_slices = selector_tuple_to_slice_selection(out_selection) chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection) shape_chunk_selection_slices = get_shape_for_selector( @@ -195,9 +194,10 @@ def make_chunk_info_for_rust_with_indices( f"{shape_chunk_selection} != {shape_chunk_selection_slices}" ) chunk_info_with_indices.append( - WithSubset( - chunk_info, + ChunkItem( + key=byte_getter.path, chunk_subset=chunk_selection_as_slices, + chunk_shape=chunk_spec.shape, subset=out_selection_as_slices, shape=shape, ) diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 133703b..b759120 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -1,124 +1,56 @@ use std::num::NonZeroU64; use pyo3::{ - Bound, PyAny, PyErr, PyResult, - exceptions::{PyIndexError, PyRuntimeError, PyValueError}, + Bound, PyErr, PyResult, + exceptions::{PyIndexError, PyValueError}, pyclass, pymethods, - types::{PyAnyMethods, PyBytes, PyBytesMethods, PyInt, PySlice, PySliceMethods as _}, + types::{PySlice, PySliceMethods as _}, }; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; -use zarrs::{ - array::{ArraySubset, ChunkShape, DataType, FillValue}, - metadata::v3::MetadataV3, - storage::StoreKey, -}; +use zarrs::{array::ArraySubset, storage::StoreKey}; use crate::utils::PyErrExt; -pub(crate) trait ChunksItem { - fn key(&self) -> &StoreKey; - fn shape(&self) -> &[NonZeroU64]; - fn data_type(&self) -> &DataType; - fn fill_value(&self) -> &FillValue; -} - -#[derive(Clone)] -#[gen_stub_pyclass] -#[pyclass] -pub(crate) struct Basic { - key: StoreKey, - shape: ChunkShape, - data_type: DataType, - fill_value: FillValue, -} - -fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult> { - if dtype == "string" { - // Match zarr-python 2.x.x string fill value behaviour with a 0 fill value - // See https://github.com/zarr-developers/zarr-python/issues/2792#issuecomment-2644362122 - if let Ok(fill_value_downcast) = fill_value.cast::() { - let fill_value_usize: usize = fill_value_downcast.extract()?; - if fill_value_usize == 0 { - return Ok(vec![]); - } - Err(PyErr::new::(format!( - "Cannot understand non-zero integer {fill_value_usize} fill value for dtype {dtype}" - )))?; - } - } - - if let Ok(fill_value_downcast) = fill_value.cast::() { - Ok(fill_value_downcast.as_bytes().to_vec()) - } else if fill_value.hasattr("tobytes")? { - Ok(fill_value.call_method0("tobytes")?.extract()?) - } else { - Err(PyErr::new::(format!( - "Unsupported fill value {fill_value:?}" - ))) - } -} - -#[gen_stub_pymethods] -#[pymethods] -impl Basic { - #[new] - fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult { - let path: String = byte_interface.getattr("path")?.extract()?; - - let shape: Vec = chunk_spec.getattr("shape")?.extract()?; - - let mut dtype: String = chunk_spec - .getattr("dtype")? - .call_method0("to_native_dtype")? - .call_method0("__str__")? - .extract()?; - if dtype == "object" { - // zarrs doesn't understand `object` which is the output of `np.dtype("|O").__str__()` - // but maps it to "string" internally https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L288 - dtype = String::from("string"); - } - let data_type = get_data_type_from_dtype(&dtype)?; - let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?; - let fill_value = FillValue::new(fill_value_to_bytes(&dtype, &fill_value)?); - Ok(Self { - key: StoreKey::new(path).map_py_err::()?, - shape, - data_type, - fill_value, +fn to_nonzero_u64_vec(v: Vec) -> PyResult> { + v.into_iter() + .map(|dim| { + NonZeroU64::new(dim).ok_or_else(|| { + PyErr::new::( + "subset dimensions must be greater than zero".to_string(), + ) + }) }) - } + .collect::>>() } #[derive(Clone)] #[gen_stub_pyclass] #[pyclass] -pub(crate) struct WithSubset { - pub item: Basic, +pub(crate) struct ChunkItem { + pub key: StoreKey, pub chunk_subset: ArraySubset, pub subset: ArraySubset, + pub shape: Vec, + pub num_elements: u64, } #[gen_stub_pymethods] #[pymethods] -impl WithSubset { +impl ChunkItem { #[new] #[allow(clippy::needless_pass_by_value)] fn new( - item: Basic, + key: String, chunk_subset: Vec>, + chunk_shape: Vec, subset: Vec>, shape: Vec, ) -> PyResult { - let chunk_subset = selection_to_array_subset(&chunk_subset, &item.shape)?; - let shape: Vec = shape - .into_iter() - .map(|dim| { - NonZeroU64::new(dim) - .ok_or("subset dimensions must be greater than zero") - .map_py_err::() - }) - .collect::>>()?; - let subset = selection_to_array_subset(&subset, &shape)?; + let num_elements = chunk_shape.iter().product(); + let shape_nonzero_u64 = to_nonzero_u64_vec(shape)?; + let chunk_shape_nonzero_u64 = to_nonzero_u64_vec(chunk_shape)?; + let chunk_subset = selection_to_array_subset(&chunk_subset, &chunk_shape_nonzero_u64)?; + let subset = selection_to_array_subset(&subset, &shape_nonzero_u64)?; // Check that subset and chunk_subset have the same number of elements. // This permits broadcasting of a constant input. if subset.num_elements() != chunk_subset.num_elements() && subset.num_elements() > 1 { @@ -126,50 +58,17 @@ impl WithSubset { "the size of the chunk subset {chunk_subset} and input/output subset {subset} are incompatible", ))); } + Ok(Self { - item, + key: StoreKey::new(key).map_py_err::()?, chunk_subset, subset, + shape: chunk_shape_nonzero_u64, + num_elements, }) } } -impl ChunksItem for Basic { - fn key(&self) -> &StoreKey { - &self.key - } - fn shape(&self) -> &[NonZeroU64] { - &self.shape - } - fn data_type(&self) -> &DataType { - &self.data_type - } - fn fill_value(&self) -> &FillValue { - &self.fill_value - } -} - -impl ChunksItem for WithSubset { - fn key(&self) -> &StoreKey { - &self.item.key - } - fn shape(&self) -> &[NonZeroU64] { - &self.item.shape - } - fn data_type(&self) -> &DataType { - &self.item.data_type - } - fn fill_value(&self) -> &FillValue { - &self.item.fill_value - } -} - -fn get_data_type_from_dtype(dtype: &str) -> PyResult { - let data_type = - DataType::from_metadata(&MetadataV3::new(dtype)).map_py_err::()?; - Ok(data_type) -} - fn slice_to_range(slice: &Bound<'_, PySlice>, length: isize) -> PyResult> { let indices = slice.indices(length)?; if indices.start < 0 { diff --git a/src/concurrency.rs b/src/concurrency.rs index bece382..08b9dca 100644 --- a/src/concurrency.rs +++ b/src/concurrency.rs @@ -4,7 +4,7 @@ use zarrs::array::{ concurrency::calc_concurrency_outer_inner, }; -use crate::{CodecPipelineImpl, chunk_item::ChunksItem, utils::PyCodecErrExt as _}; +use crate::{CodecPipelineImpl, chunk_item::ChunkItem, utils::PyCodecErrExt as _}; pub trait ChunkConcurrentLimitAndCodecOptions { fn get_chunk_concurrent_limit_and_codec_options( @@ -13,22 +13,19 @@ pub trait ChunkConcurrentLimitAndCodecOptions { ) -> PyResult>; } -impl ChunkConcurrentLimitAndCodecOptions for Vec -where - T: ChunksItem, -{ +impl ChunkConcurrentLimitAndCodecOptions for Vec { fn get_chunk_concurrent_limit_and_codec_options( &self, codec_pipeline_impl: &CodecPipelineImpl, ) -> PyResult> { let num_chunks = self.len(); - let Some(chunk_descriptions0) = self.first() else { + let Some(item) = self.first() else { return Ok(None); }; let codec_concurrency = codec_pipeline_impl .codec_chain - .recommended_concurrency(chunk_descriptions0.shape(), chunk_descriptions0.data_type()) + .recommended_concurrency(&item.shape, &codec_pipeline_impl.data_type) .map_codec_err()?; let min_concurrent_chunks = diff --git a/src/lib.rs b/src/lib.rs index df3f669..1b7c08e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ use std::collections::HashMap; use std::ptr::NonNull; use std::sync::Arc; -use chunk_item::WithSubset; +use chunk_item::ChunkItem; use itertools::Itertools; use numpy::npyffi::PyArrayObject; use numpy::{PyArrayDescrMethods, PyUntypedArray, PyUntypedArrayMethods}; @@ -20,15 +20,12 @@ use unsafe_cell_slice::UnsafeCellSlice; use utils::is_whole_chunk; use zarrs::array::{ ArrayBytes, ArrayBytesDecodeIntoTarget, ArrayBytesFixedDisjointView, ArrayMetadata, - ArrayPartialDecoderTraits, ArraySubset, ArrayToBytesCodecTraits, ChunkShapeTraits, CodecChain, - CodecOptions, FillValue, StoragePartialDecoder, copy_fill_value_into, update_array_bytes, + ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecChain, CodecOptions, DataType, + FillValue, StoragePartialDecoder, copy_fill_value_into, update_array_bytes, }; use zarrs::config::global_config; -use zarrs::convert::{ - ArrayMetadataV2ToV3Error, codec_metadata_v2_to_v3, data_type_metadata_v2_to_v3, -}; -use zarrs::metadata::v2::data_type_metadata_v2_to_endianness; -use zarrs::metadata::v3::MetadataV3; +use zarrs::convert::array_metadata_v2_to_v3; +use zarrs::plugin::ZarrVersion; use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey}; mod chunk_item; @@ -39,7 +36,6 @@ mod store; mod tests; mod utils; -use crate::chunk_item::ChunksItem; use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; use crate::store::StoreConfig; use crate::utils::{PyCodecErrExt, PyErrExt as _, PyUntypedArrayExt as _}; @@ -54,58 +50,56 @@ pub struct CodecPipelineImpl { pub(crate) chunk_concurrent_minimum: usize, pub(crate) chunk_concurrent_maximum: usize, pub(crate) num_threads: usize, + pub(crate) fill_value: FillValue, + pub(crate) data_type: DataType, } impl CodecPipelineImpl { - fn retrieve_chunk_bytes<'a, I: ChunksItem>( + fn retrieve_chunk_bytes<'a>( &self, - item: &I, + item: &ChunkItem, codec_chain: &CodecChain, codec_options: &CodecOptions, ) -> PyResult> { - let value_encoded = self.store.get(item.key()).map_py_err::()?; + let value_encoded = self.store.get(&item.key).map_py_err::()?; let value_decoded = if let Some(value_encoded) = value_encoded { let value_encoded: Vec = value_encoded.into(); // zero-copy in this case codec_chain .decode( value_encoded.into(), - item.shape(), - item.data_type(), - item.fill_value(), + &item.shape, + &self.data_type, + &self.fill_value, codec_options, ) .map_codec_err()? } else { - ArrayBytes::new_fill_value( - item.data_type(), - item.shape().num_elements_u64(), - item.fill_value(), - ) - .map_py_err::()? + ArrayBytes::new_fill_value(&self.data_type, item.num_elements, &self.fill_value) + .map_py_err::()? }; Ok(value_decoded) } - fn store_chunk_bytes( + fn store_chunk_bytes( &self, - item: &I, + item: &ChunkItem, codec_chain: &CodecChain, value_decoded: ArrayBytes, codec_options: &CodecOptions, ) -> PyResult<()> { value_decoded - .validate(item.shape().num_elements_u64(), item.data_type()) + .validate(item.num_elements, &self.data_type) .map_codec_err()?; - if value_decoded.is_fill_value(item.fill_value()) { - self.store.erase(item.key()).map_py_err::() + if value_decoded.is_fill_value(&self.fill_value) { + self.store.erase(&item.key).map_py_err::() } else { let value_encoded = codec_chain .encode( value_decoded, - item.shape(), - item.data_type(), - item.fill_value(), + &item.shape, + &self.data_type, + &self.fill_value, codec_options, ) .map(Cow::into_owned) @@ -113,36 +107,34 @@ impl CodecPipelineImpl { // Store the encoded chunk self.store - .set(item.key(), value_encoded.into()) + .set(&item.key, value_encoded.into()) .map_py_err::() } } - fn store_chunk_subset_bytes( + fn store_chunk_subset_bytes( &self, - item: &I, + item: &ChunkItem, codec_chain: &CodecChain, chunk_subset_bytes: ArrayBytes, - chunk_subset: &ArraySubset, codec_options: &CodecOptions, ) -> PyResult<()> { - let array_shape = item.shape(); + let array_shape = &item.shape; + let chunk_subset = &item.chunk_subset; if !chunk_subset.inbounds_shape(bytemuck::must_cast_slice(array_shape)) { return Err(PyErr::new::(format!( "chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})" ))); } - let data_type_size = item.data_type().size(); + let data_type_size = self.data_type.size(); - if chunk_subset.start().iter().all(|&o| o == 0) - && chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(array_shape) - { + if is_whole_chunk(item) { // Fast path if the chunk subset spans the entire chunk, no read required self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) } else { // Validate the chunk subset bytes chunk_subset_bytes - .validate(chunk_subset.num_elements(), item.data_type()) + .validate(chunk_subset.num_elements(), &self.data_type) .map_codec_err()?; // Retrieve the chunk @@ -214,28 +206,6 @@ impl CodecPipelineImpl { } } -fn array_metadata_to_codec_metadata_v3( - metadata: ArrayMetadata, -) -> Result, ArrayMetadataV2ToV3Error> { - match metadata { - ArrayMetadata::V3(metadata) => Ok(metadata.codecs), - ArrayMetadata::V2(metadata) => { - let endianness = data_type_metadata_v2_to_endianness(&metadata.dtype) - .map_err(ArrayMetadataV2ToV3Error::InvalidEndianness)?; - let data_type = data_type_metadata_v2_to_v3(&metadata.dtype)?; - - codec_metadata_v2_to_v3( - metadata.order, - metadata.shape.len(), - &data_type, - endianness, - &metadata.filters, - &metadata.compressor, - ) - } - } -} - #[gen_stub_pymethods] #[pymethods] impl CodecPipelineImpl { @@ -247,7 +217,7 @@ impl CodecPipelineImpl { chunk_concurrent_minimum=None, chunk_concurrent_maximum=None, num_threads=None, - direct_io=false + direct_io=false, ))] #[new] fn new( @@ -260,13 +230,15 @@ impl CodecPipelineImpl { direct_io: bool, ) -> PyResult { store_config.direct_io(direct_io); - let metadata: ArrayMetadata = - serde_json::from_str(array_metadata).map_py_err::()?; - let codec_metadata = - array_metadata_to_codec_metadata_v3(metadata).map_py_err::()?; + let metadata = serde_json::from_str(array_metadata).map_py_err::()?; + let metadata_v3 = match &metadata { + ArrayMetadata::V2(v2) => { + Cow::Owned(array_metadata_v2_to_v3(v2).map_py_err::()?) + } + ArrayMetadata::V3(v3) => Cow::Borrowed(v3), + }; let codec_chain = - Arc::new(CodecChain::from_metadata(&codec_metadata).map_py_err::()?); - + Arc::new(CodecChain::from_metadata(&metadata_v3.codecs).map_py_err::()?); let codec_options = CodecOptions::default().with_validate_checksums(validate_checksums); let chunk_concurrent_minimum = @@ -278,6 +250,24 @@ impl CodecPipelineImpl { let store: ReadableWritableListableStorage = (&store_config).try_into().map_py_err::()?; + let data_type = + DataType::from_metadata(&metadata_v3.data_type).map_py_err::()?; + let fill_value = data_type + .fill_value(&metadata_v3.fill_value, ZarrVersion::V3) + .or_else(|_| { + Err(match &metadata { + ArrayMetadata::V2(metadata) => format!( + "incompatible fill value metadata: dtype={}, fill_value={}", + metadata.dtype, metadata.fill_value + ), + ArrayMetadata::V3(metadata) => format!( + "incompatible fill value metadata: data_type={}, fill_value={}", + metadata.data_type, metadata.fill_value + ), + }) + }) + .map_py_err::()?; + Ok(Self { store, codec_chain, @@ -285,13 +275,15 @@ impl CodecPipelineImpl { chunk_concurrent_minimum, chunk_concurrent_maximum, num_threads, + fill_value, + data_type, }) } fn retrieve_chunks_and_apply_index( &self, py: Python, - chunk_descriptions: Vec, // FIXME: Ref / iterable? + chunk_descriptions: Vec, // FIXME: Ref / iterable? value: &Bound<'_, PyUntypedArray>, ) -> PyResult<()> { // Get input array @@ -306,37 +298,32 @@ impl CodecPipelineImpl { }; // Assemble partial decoders ahead of time and in parallel - let partial_chunk_descriptions = chunk_descriptions + let partial_chunk_items = chunk_descriptions .iter() .filter(|item| !(is_whole_chunk(item))) - .unique_by(|item| item.key()) + .unique_by(|item| item.key.clone()) .collect::>(); let mut partial_decoder_cache: HashMap> = HashMap::new(); - if !partial_chunk_descriptions.is_empty() { - let key_decoder_pairs = iter_concurrent_limit!( - chunk_concurrent_limit, - partial_chunk_descriptions, - map, - |item| { + if !partial_chunk_items.is_empty() { + let key_decoder_pairs = + iter_concurrent_limit!(chunk_concurrent_limit, partial_chunk_items, map, |item| { let storage_handle = Arc::new(StorageHandle::new(self.store.clone())); - let input_handle = - StoragePartialDecoder::new(storage_handle, item.key().clone()); + let input_handle = StoragePartialDecoder::new(storage_handle, item.key.clone()); let partial_decoder = self .codec_chain .clone() .partial_decoder( Arc::new(input_handle), - item.shape(), - item.data_type(), - item.fill_value(), + &item.shape, + &self.data_type, + &self.fill_value, &codec_options, ) .map_codec_err()?; - Ok((item.key().clone(), partial_decoder)) - } - ) - .collect::>>()?; + Ok((item.key.clone(), partial_decoder)) + }) + .collect::>>()?; partial_decoder_cache.extend(key_decoder_pairs); } @@ -344,12 +331,7 @@ impl CodecPipelineImpl { // FIXME: the `decode_into` methods only support fixed length data types. // For variable length data types, need a codepath with non `_into` methods. // Collect all the subsets and copy into value on the Python side? - let update_chunk_subset = |item: chunk_item::WithSubset| { - let chunk_item::WithSubset { - item, - subset, - chunk_subset, - } = item; + let update_chunk_subset = |item: ChunkItem| { let mut output_view = unsafe { // TODO: Is the following correct? // can we guarantee that when this function is called from Python with arbitrary arguments? @@ -357,52 +339,42 @@ impl CodecPipelineImpl { ArrayBytesFixedDisjointView::new( output, // TODO: why is data_type in `item`, it should be derived from `output`, no? - item.data_type() + self.data_type .fixed_size() .ok_or("variable length data type not supported") .map_py_err::()?, &output_shape, - subset, + item.subset.clone(), ) .map_py_err::()? }; - + let target = ArrayBytesDecodeIntoTarget::Fixed(&mut output_view); // See zarrs::array::Array::retrieve_chunk_subset_into - if chunk_subset.start().iter().all(|&o| o == 0) - && chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(item.shape()) - { + if is_whole_chunk(&item) { // See zarrs::array::Array::retrieve_chunk_into if let Some(chunk_encoded) = - self.store.get(item.key()).map_py_err::()? + self.store.get(&item.key).map_py_err::()? { // Decode the encoded data into the output buffer let chunk_encoded: Vec = chunk_encoded.into(); self.codec_chain.decode_into( Cow::Owned(chunk_encoded), - item.shape(), - item.data_type(), - item.fill_value(), - ArrayBytesDecodeIntoTarget::Fixed(&mut output_view), + &item.shape, + &self.data_type, + &self.fill_value, + target, &codec_options, ) } else { // The chunk is missing, write the fill value - copy_fill_value_into( - item.data_type(), - item.fill_value(), - ArrayBytesDecodeIntoTarget::Fixed(&mut output_view), - ) + copy_fill_value_into(&self.data_type, &self.fill_value, target) } } else { - let key = item.key(); + let key = &item.key; let partial_decoder = partial_decoder_cache.get(key).ok_or_else(|| { PyRuntimeError::new_err(format!("Partial decoder not found for key: {key}")) })?; - partial_decoder.partial_decode_into( - &chunk_subset, - ArrayBytesDecodeIntoTarget::Fixed(&mut output_view), - &codec_options, - ) + partial_decoder.partial_decode_into(&item.chunk_subset, target, &codec_options) } .map_codec_err() }; @@ -421,7 +393,7 @@ impl CodecPipelineImpl { fn store_chunks_with_indices( &self, py: Python, - chunk_descriptions: Vec, + chunk_descriptions: Vec, value: &Bound<'_, PyUntypedArray>, write_empty_chunks: bool, ) -> PyResult<()> { @@ -449,22 +421,21 @@ impl CodecPipelineImpl { codec_options.set_store_empty_chunks(write_empty_chunks); py.detach(move || { - let store_chunk = |item: chunk_item::WithSubset| match &input { + let store_chunk = |item: ChunkItem| match &input { InputValue::Array(input) => { let chunk_subset_bytes = input - .extract_array_subset(&item.subset, &input_shape, item.item.data_type()) + .extract_array_subset(&item.subset, &input_shape, &self.data_type) .map_codec_err()?; self.store_chunk_subset_bytes( &item, &self.codec_chain, chunk_subset_bytes, - &item.chunk_subset, &codec_options, ) } InputValue::Constant(constant_value) => { let chunk_subset_bytes = ArrayBytes::new_fill_value( - item.data_type(), + &self.data_type, item.chunk_subset.num_elements(), constant_value, ) @@ -474,7 +445,6 @@ impl CodecPipelineImpl { &item, &self.codec_chain, chunk_subset_bytes, - &item.chunk_subset, &codec_options, ) } @@ -497,8 +467,7 @@ impl CodecPipelineImpl { fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_class::()?; - m.add_class::()?; - m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index 6674cbe..d663b5c 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -4,7 +4,7 @@ use numpy::{PyUntypedArray, PyUntypedArrayMethods}; use pyo3::{Bound, PyErr, PyResult, PyTypeInfo}; use zarrs::array::CodecError; -use crate::{ChunksItem, WithSubset}; +use crate::ChunkItem; pub(crate) trait PyErrExt { fn map_py_err(self) -> PyResult; @@ -55,7 +55,7 @@ impl PyUntypedArrayExt for Bound<'_, PyUntypedArray> { } } -pub fn is_whole_chunk(item: &WithSubset) -> bool { +pub fn is_whole_chunk(item: &ChunkItem) -> bool { item.chunk_subset.start().iter().all(|&o| o == 0) - && item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(item.shape()) + && item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(&item.shape) } diff --git a/tests/test_v2.py b/tests/test_v2.py index 49fbe60..f2b5c4b 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -296,12 +296,8 @@ def test_parse_structured_fill_value_valid( @pytest.mark.filterwarnings( - # TODO: Permit this in zarrs? - "ignore:Array is unsupported by ZarrsCodecPipeline. unsupported Zarr V2 array. unsupported fill value Null for data type bytes:UserWarning" -) -@pytest.mark.filterwarnings( - # TODO: Fix handling of string fill values for Zarr v2 bytes data - "ignore:Array is unsupported by ZarrsCodecPipeline. incompatible fill value .eAAAAAAAAA==. for data type bytes:UserWarning" + # TODO: Fix handling of Null fill values for Zarr v2 bytes data: this warning is raised by no_fill + "ignore:Array is unsupported by ZarrsCodecPipeline. incompatible fill value metadata:UserWarning" ) @pytest.mark.parametrize("fill_value", [None, b"x"], ids=["no_fill", "fill"]) def test_other_dtype_roundtrip(fill_value, tmp_path) -> None: