Skip to content

Commit fc8c057

Browse files
committed
Merge branch 'ld/zarrs_0.23.0' into ig/refactor_chunk_handling
2 parents faf922b + 3cbe4be commit fc8c057

File tree

6 files changed

+79
-75
lines changed

6 files changed

+79
-75
lines changed

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,10 +19,11 @@ numpy = "0.27.0"
1919
unsafe_cell_slice = "0.2.0"
2020
serde_json = "1.0.128"
2121
pyo3-stub-gen = "0.17.1"
22-
opendal = { version = "0.54.0", features = ["services-http"] }
22+
opendal = { version = "0.55.0", features = ["services-http"] }
2323
tokio = { version = "1.41.1", features = ["rt-multi-thread"] }
24-
zarrs_opendal = "0.9.0"
24+
zarrs_opendal = "0.10.0"
2525
itertools = "0.14.0"
26+
bytemuck = { version = "1.24.0", features = ["must_cast"] }
2627

2728
[profile.release]
2829
lto = true

src/chunk_item.rs

Lines changed: 37 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -41,11 +41,10 @@ pub fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResu
4141
#[gen_stub_pyclass]
4242
#[pyclass]
4343
pub(crate) struct WithSubset {
44-
pub key: StoreKey,
44+
key: StoreKey,
4545
pub chunk_subset: ArraySubset,
4646
pub subset: ArraySubset,
47-
pub chunk_shape_u64: Vec<u64>,
48-
pub chunk_shape: Vec<NonZeroU64>,
47+
shape: Vec<NonZeroU64>,
4948
pub num_elements: u64,
5049
}
5150

@@ -61,8 +60,29 @@ impl WithSubset {
6160
subset: Vec<Bound<'_, PySlice>>,
6261
shape: Vec<u64>,
6362
) -> PyResult<Self> {
64-
let chunk_subset = selection_to_array_subset(&chunk_subset, &chunk_shape)?;
65-
let subset = selection_to_array_subset(&subset, &shape)?;
63+
let num_elements = chunk_shape.iter().product();
64+
let shape_nonzero_u64: Vec<NonZeroU64> = shape
65+
.into_iter()
66+
.map(|dim| {
67+
NonZeroU64::new(dim).ok_or_else(|| {
68+
PyErr::new::<PyValueError, _>(
69+
"subset dimensions must be greater than zero".to_string(),
70+
)
71+
})
72+
})
73+
.collect::<PyResult<Vec<NonZeroU64>>>()?;
74+
let chunk_shape_nonzero_u64: Vec<NonZeroU64> = chunk_shape
75+
.into_iter()
76+
.map(|dim| {
77+
NonZeroU64::new(dim).ok_or_else(|| {
78+
PyErr::new::<PyValueError, _>(
79+
"subset dimensions must be greater than zero".to_string(),
80+
)
81+
})
82+
})
83+
.collect::<PyResult<Vec<NonZeroU64>>>()?;
84+
let chunk_subset = selection_to_array_subset(&chunk_subset, &chunk_shape_nonzero_u64)?;
85+
let subset = selection_to_array_subset(&subset, &shape_nonzero_u64)?;
6686
// Check that subset and chunk_subset have the same number of elements.
6787
// This permits broadcasting of a constant input.
6888
if subset.num_elements() != chunk_subset.num_elements() && subset.num_elements() > 1 {
@@ -75,15 +95,19 @@ impl WithSubset {
7595
key: StoreKey::new(key).map_py_err::<PyValueError>()?,
7696
chunk_subset,
7797
subset,
78-
chunk_shape: chunk_shape
79-
.iter()
80-
.map(|v| NonZeroU64::new(*v).unwrap())
81-
.collect(), // TODO: Unwrap
82-
num_elements: chunk_shape.iter().product(),
83-
chunk_shape_u64: chunk_shape,
98+
shape: chunk_shape_nonzero_u64,
99+
num_elements,
84100
})
85101
}
86102
}
103+
impl WithSubset {
104+
pub fn key(&self) -> &StoreKey {
105+
&self.key
106+
}
107+
pub fn shape(&self) -> &[NonZeroU64] {
108+
&self.shape
109+
}
110+
}
87111

88112
fn slice_to_range(slice: &Bound<'_, PySlice>, length: isize) -> PyResult<std::ops::Range<u64>> {
89113
let indices = slice.indices(length)?;
@@ -106,15 +130,15 @@ fn slice_to_range(slice: &Bound<'_, PySlice>, length: isize) -> PyResult<std::op
106130

107131
fn selection_to_array_subset(
108132
selection: &[Bound<'_, PySlice>],
109-
shape: &[u64],
133+
shape: &[NonZeroU64],
110134
) -> PyResult<ArraySubset> {
111135
if selection.is_empty() {
112136
Ok(ArraySubset::new_with_shape(vec![1; shape.len()]))
113137
} else {
114138
let chunk_ranges = selection
115139
.iter()
116140
.zip(shape)
117-
.map(|(selection, &shape)| slice_to_range(selection, isize::try_from(shape)?))
141+
.map(|(selection, &shape)| slice_to_range(selection, isize::try_from(shape.get())?))
118142
.collect::<PyResult<Vec<_>>>()?;
119143
Ok(ArraySubset::new_with_ranges(&chunk_ranges))
120144
}

src/concurrency.rs

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
use pyo3::PyResult;
22
use zarrs::array::{
3-
ArrayCodecTraits, RecommendedConcurrency, codec::CodecOptions,
3+
RecommendedConcurrency, codec::ArrayCodecTraits, codec::CodecOptions,
44
concurrency::calc_concurrency_outer_inner,
55
};
66

@@ -25,7 +25,7 @@ impl ChunkConcurrentLimitAndCodecOptions for Vec<WithSubset> {
2525

2626
let codec_concurrency = codec_pipeline_impl
2727
.codec_chain
28-
.recommended_concurrency(&item.chunk_shape, &codec_pipeline_impl.data_type)
28+
.recommended_concurrency(item.shape(), &codec_pipeline_impl.data_type)
2929
.map_codec_err()?;
3030

3131
let min_concurrent_chunks =
@@ -37,8 +37,9 @@ impl ChunkConcurrentLimitAndCodecOptions for Vec<WithSubset> {
3737
&RecommendedConcurrency::new(min_concurrent_chunks..max_concurrent_chunks),
3838
&codec_concurrency,
3939
);
40-
let mut codec_options = codec_pipeline_impl.codec_options.clone();
41-
codec_options.set_concurrent_target(codec_concurrent_limit);
40+
let codec_options = codec_pipeline_impl
41+
.codec_options
42+
.with_concurrent_target(codec_concurrent_limit);
4243
Ok(Some((chunk_concurrent_limit, codec_options)))
4344
}
4445
}

src/lib.rs

Lines changed: 31 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -18,9 +18,9 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator};
1818
use rayon_iter_concurrent_limit::iter_concurrent_limit;
1919
use unsafe_cell_slice::UnsafeCellSlice;
2020
use utils::is_whole_chunk;
21+
use zarrs::array::codec::ArrayBytesDecodeIntoTarget;
2122
use zarrs::array::codec::{
22-
ArrayBytesDecodeIntoTarget, ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions,
23-
StoragePartialDecoder,
23+
ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, StoragePartialDecoder,
2424
};
2525
use zarrs::array::{
2626
ArrayBytes, ArrayBytesFixedDisjointView, ArrayMetadata, CodecChain, DataType, DataTypeExt,
@@ -70,13 +70,13 @@ impl CodecPipelineImpl {
7070
codec_chain: &CodecChain,
7171
codec_options: &CodecOptions,
7272
) -> PyResult<ArrayBytes<'a>> {
73-
let value_encoded = self.store.get(&item.key).map_py_err::<PyRuntimeError>()?;
73+
let value_encoded = self.store.get(item.key()).map_py_err::<PyRuntimeError>()?;
7474
let value_decoded = if let Some(value_encoded) = value_encoded {
7575
let value_encoded: Vec<u8> = value_encoded.into(); // zero-copy in this case
7676
codec_chain
7777
.decode(
7878
value_encoded.into(),
79-
&item.chunk_shape,
79+
item.shape(),
8080
&self.data_type,
8181
&self.fill_value,
8282
codec_options,
@@ -101,12 +101,12 @@ impl CodecPipelineImpl {
101101
.map_codec_err()?;
102102

103103
if value_decoded.is_fill_value(&self.fill_value) {
104-
self.store.erase(&item.key).map_py_err::<PyRuntimeError>()
104+
self.store.erase(item.key()).map_py_err::<PyRuntimeError>()
105105
} else {
106106
let value_encoded = codec_chain
107107
.encode(
108108
value_decoded,
109-
&item.chunk_shape,
109+
item.shape(),
110110
&self.data_type,
111111
&self.fill_value,
112112
codec_options,
@@ -116,7 +116,7 @@ impl CodecPipelineImpl {
116116

117117
// Store the encoded chunk
118118
self.store
119-
.set(&item.key, value_encoded.into())
119+
.set(item.key(), value_encoded.into())
120120
.map_py_err::<PyRuntimeError>()
121121
}
122122
}
@@ -129,20 +129,17 @@ impl CodecPipelineImpl {
129129
chunk_subset: &ArraySubset,
130130
codec_options: &CodecOptions,
131131
) -> PyResult<()> {
132-
let chunk_shape = item
133-
.chunk_shape
134-
.clone()
135-
.into_iter()
136-
.map(|v| u64::from(v))
137-
.collect::<Vec<u64>>();
138-
if !chunk_subset.inbounds_shape(&chunk_shape) {
132+
let array_shape = item.shape();
133+
if !chunk_subset.inbounds_shape(bytemuck::must_cast_slice(array_shape)) {
139134
return Err(PyErr::new::<PyValueError, _>(format!(
140-
"chunk subset ({chunk_subset}) is out of bounds for array shape ({chunk_shape:?})"
135+
"chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})"
141136
)));
142137
}
143138
let data_type_size = self.data_type.size();
144139

145-
if chunk_subset.start().iter().all(|&o| o == 0) && chunk_subset.shape() == chunk_shape {
140+
if chunk_subset.start().iter().all(|&o| o == 0)
141+
&& chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(array_shape)
142+
{
146143
// Fast path if the chunk subset spans the entire chunk, no read required
147144
self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options)
148145
} else {
@@ -157,7 +154,7 @@ impl CodecPipelineImpl {
157154
// Update the chunk
158155
let chunk_bytes_new = update_array_bytes(
159156
chunk_bytes_old,
160-
&chunk_shape,
157+
bytemuck::must_cast_slice(array_shape),
161158
chunk_subset,
162159
&chunk_subset_bytes,
163160
data_type_size,
@@ -251,12 +248,10 @@ impl CodecPipelineImpl {
251248
let codec_metadata = metadata.codecs;
252249
let codec_chain =
253250
Arc::new(CodecChain::from_metadata(&codec_metadata).map_py_err::<PyTypeError>()?);
254-
let mut codec_options = CodecOptions::default();
251+
let codec_options = CodecOptions::default().with_validate_checksums(validate_checksums);
255252

256-
codec_options.set_validate_checksums(validate_checksums);
257-
258-
let chunk_concurrent_minimum = chunk_concurrent_minimum
259-
.unwrap_or(zarrs::config::global_config().chunk_concurrent_minimum());
253+
let chunk_concurrent_minimum =
254+
chunk_concurrent_minimum.unwrap_or(global_config().chunk_concurrent_minimum());
260255
let chunk_concurrent_maximum =
261256
chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads());
262257
let num_threads = num_threads.unwrap_or(rayon::current_num_threads());
@@ -302,7 +297,7 @@ impl CodecPipelineImpl {
302297
let partial_chunk_descriptions_with_representations = chunk_descriptions
303298
.iter()
304299
.filter(|item| !(is_whole_chunk(item)))
305-
.unique_by(|item| item.key.clone())
300+
.unique_by(|item| item.key().clone())
306301
.collect::<Vec<_>>();
307302
let mut partial_decoder_cache: HashMap<StoreKey, Arc<dyn ArrayPartialDecoderTraits>> =
308303
HashMap::new();
@@ -313,19 +308,20 @@ impl CodecPipelineImpl {
313308
map,
314309
|item| {
315310
let storage_handle = Arc::new(StorageHandle::new(self.store.clone()));
316-
let input_handle = StoragePartialDecoder::new(storage_handle, item.key.clone());
311+
let input_handle =
312+
StoragePartialDecoder::new(storage_handle, item.key().clone());
317313
let partial_decoder = self
318314
.codec_chain
319315
.clone()
320316
.partial_decoder(
321317
Arc::new(input_handle),
322-
&item.chunk_shape,
318+
item.shape(),
323319
&self.data_type,
324320
&self.fill_value,
325321
&codec_options,
326322
)
327323
.map_codec_err()?;
328-
Ok((item.key.clone(), partial_decoder))
324+
Ok((item.key().clone(), partial_decoder))
329325
}
330326
)
331327
.collect::<PyResult<Vec<_>>>()?;
@@ -336,14 +332,8 @@ impl CodecPipelineImpl {
336332
// FIXME: the `decode_into` methods only support fixed length data types.
337333
// For variable length data types, need a codepath with non `_into` methods.
338334
// Collect all the subsets and copy into value on the Python side?
339-
let update_chunk_subset = |item| {
340-
let chunk_item::WithSubset {
341-
key,
342-
subset,
343-
chunk_subset,
344-
chunk_shape_u64,
345-
..
346-
} = item;
335+
let update_chunk_subset = |item: WithSubset| {
336+
let shape = item.shape();
347337
let mut output_view = unsafe {
348338
// TODO: Is the following correct?
349339
// can we guarantee that when this function is called from Python with arbitrary arguments?
@@ -356,24 +346,24 @@ impl CodecPipelineImpl {
356346
.ok_or("variable length data type not supported")
357347
.map_py_err::<PyTypeError>()?,
358348
&output_shape,
359-
subset,
349+
item.subset.clone(),
360350
)
361351
.map_py_err::<PyRuntimeError>()?
362352
};
363353
let target = ArrayBytesDecodeIntoTarget::Fixed(&mut output_view);
364354
// See zarrs::array::Array::retrieve_chunk_subset_into
365-
if chunk_subset.start().iter().all(|&o| o == 0)
366-
&& chunk_subset.shape() == chunk_shape_u64
355+
if item.chunk_subset.start().iter().all(|&o| o == 0)
356+
&& item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(shape)
367357
{
368358
// See zarrs::array::Array::retrieve_chunk_into
369359
if let Some(chunk_encoded) =
370-
self.store.get(&key).map_py_err::<PyRuntimeError>()?
360+
self.store.get(item.key()).map_py_err::<PyRuntimeError>()?
371361
{
372362
// Decode the encoded data into the output buffer
373363
let chunk_encoded: Vec<u8> = chunk_encoded.into();
374364
self.codec_chain.decode_into(
375365
Cow::Owned(chunk_encoded),
376-
&item.chunk_shape,
366+
item.shape(),
377367
&self.data_type,
378368
&self.fill_value,
379369
target,
@@ -384,11 +374,11 @@ impl CodecPipelineImpl {
384374
copy_fill_value_into(&self.data_type, &self.fill_value, target)
385375
}
386376
} else {
387-
let key = &key;
377+
let key = item.key();
388378
let partial_decoder = partial_decoder_cache.get(key).ok_or_else(|| {
389379
PyRuntimeError::new_err(format!("Partial decoder not found for key: {key}"))
390380
})?;
391-
partial_decoder.partial_decode_into(&chunk_subset, target, &codec_options)
381+
partial_decoder.partial_decode_into(&item.chunk_subset, target, &codec_options)
392382
}
393383
.map_codec_err()
394384
};

src/utils.rs

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -57,11 +57,5 @@ impl PyUntypedArrayExt for Bound<'_, PyUntypedArray> {
5757

5858
pub fn is_whole_chunk(item: &WithSubset) -> bool {
5959
item.chunk_subset.start().iter().all(|&o| o == 0)
60-
&& item.chunk_subset.shape()
61-
== item
62-
.chunk_shape
63-
.clone()
64-
.into_iter()
65-
.map(|v| u64::from(v))
66-
.collect::<Vec<u64>>() // TODO: Remove copy
60+
&& item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(item.shape())
6761
}

tests/test_v2.py

Lines changed: 2 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -60,12 +60,9 @@ def test_fill_single_value(store: Store) -> None:
6060
np.testing.assert_array_equal(result, expected)
6161

6262

63-
@pytest.mark.filterwarnings(
64-
"ignore:Array is unsupported by ZarrsCodecPipeline. data type |S1 is not supported:UserWarning"
65-
)
6663
@pytest.mark.filterwarnings(
6764
# TODO: Fix handling of string fill values for Zarr v2 bytes data
68-
"ignore:Array is unsupported by ZarrsCodecPipeline. incompatible fill value ..+. for data type bytes:UserWarning"
65+
"ignore:Array is unsupported by ZarrsCodecPipeline. unsupported data type .+:UserWarning"
6966
)
7067
@pytest.mark.parametrize(
7168
("dtype", "expected_dtype", "fill_value", "fill_value_json"),
@@ -111,10 +108,7 @@ async def test_v2_encode_decode(
111108

112109

113110
@pytest.mark.filterwarnings(
114-
"ignore:Array is unsupported by ZarrsCodecPipeline. data type |U1 is not supported:UserWarning"
115-
)
116-
@pytest.mark.filterwarnings(
117-
"ignore:Array is unsupported by ZarrsCodecPipeline. data type |S1 is not supported:UserWarning"
111+
"ignore:Array is unsupported by ZarrsCodecPipeline. unsupported data type .+:UserWarning"
118112
)
119113
@pytest.mark.parametrize(
120114
("dtype", "value"),

0 commit comments

Comments
 (0)