
BUG: Data corruption when writing partial shards with dask (concurrent writes to same shard) #3682

@kkollsga

Summary

When using dask to write to a sharded zarr array with dask chunks smaller than the shard size, data corruption occurs. Multiple dask tasks concurrently write to different inner chunks within the same shard, and because each partial-shard write is a read-modify-write of the whole shard, the tasks overwrite each other's updates.

This issue was originally discovered via xarray: pydata/xarray#10831

Reproduction

import tempfile
import numpy as np
import dask.array as da
import zarr

# Create test data
np_data = np.arange(100 * 100 * 100, dtype=np.float64).reshape(100, 100, 100)
original_sum = np_data.sum()

# Create dask array with chunks SMALLER than zarr shards
dask_data = da.from_array(np_data, chunks=(25, 25, 25))

# Create zarr array with sharding
store_path = tempfile.mkdtemp() + "/test.zarr"
arr = zarr.create_array(
    store=store_path,
    shape=(100, 100, 100),
    dtype=np.float64,
    chunks=(25, 25, 25),   # Inner chunk size
    shards=(50, 50, 50),   # Shard size (2x inner chunks per dimension)
)

# Write using dask - this causes corruption
da.store(dask_data, arr, lock=False)

# Verify data
arr_read = zarr.open_array(store_path, mode='r')
read_sum = arr_read[:].sum()

print(f"Original sum: {original_sum}")
print(f"Read sum: {read_sum}")
print(f"Ratio: {read_sum / original_sum}")
# Output shows ~30% of data retained, ~70% corrupted

Test Results

| Scenario | Dask Chunks | Shard Size | Data Retained |
| --- | --- | --- | --- |
| Mismatched chunks | (25, 25, 25) | (50, 50, 50) | ~30% |
| xarray scenario | (255, 255, 255) | (510, 255, 255) | ~37.5% |
| Parallel writes | (25, 25, 25) | (50, 50, 50) | ~50% |

Expected Behavior

One of the following:

  1. Data should be written correctly regardless of dask chunk alignment with shards
  2. Or zarr should raise an error/warning when detecting that write regions don't align with shard boundaries
  3. Or documentation should clearly state that dask chunks must align with shard boundaries

Actual Behavior

Data is silently corrupted. When multiple dask tasks write to different inner chunks within the same shard concurrently, they appear to overwrite each other's changes to the shard.

Analysis

The corruption pattern from the xarray scenario test shows which regions are affected:

Number of corrupted elements: 66325500 / 132651000 (50%)
Corrupted regions:
  [0:255, 0:255, 255:510]
  [255:510, 0:255, 0:255]
  [255:510, 255:510, 0:255]
  [255:510, 255:510, 255:510]

This suggests a race condition where:

  1. Task A reads shard, modifies inner chunk A, writes shard
  2. Task B reads shard (before A writes), modifies inner chunk B, writes shard
  3. Task B's write overwrites Task A's changes
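
A minimal sketch of this lost-update pattern without dask, using two threads that write different inner chunks of the same shard through ordinary indexing. The race is timing-dependent, so a given run may or may not lose data; the shape, values, and helper below are illustrative, not taken from the report above.

import tempfile
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import zarr

# A single (50, 50, 50) shard holding eight (25, 25, 25) inner chunks.
store_path = tempfile.mkdtemp() + "/race.zarr"
arr = zarr.create_array(
    store=store_path,
    shape=(50, 50, 50),
    dtype=np.float64,
    chunks=(25, 25, 25),
    shards=(50, 50, 50),
)

def write_block(offset, value):
    # Each call touches a different inner chunk, but both live in the same
    # shard, so each write is a read-modify-write of the entire shard.
    sel = tuple(slice(o, o + 25) for o in offset)
    arr[sel] = value

with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(write_block, [(0, 0, 0), (25, 0, 0)], [1.0, 2.0]))

# If the two writes interleave, one block reads back as the fill value (0.0)
# because the other task's shard write clobbered it.
print(arr[0:25, 0:25, 0:25].mean(), arr[25:50, 0:25, 0:25].mean())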

Environment

  • Zarr version: 3.1.6.dev30+g9c47b6d44 (current main branch)
  • Dask version: 2026.1.2
  • Python: 3.12
  • OS: macOS

Related Issues

  • pydata/xarray#10831 - the xarray issue where this corruption was originally reported

Possible Solutions

  1. Implement shard-level locking: Coordinate writes to the same shard to prevent race conditions
  2. Read-modify-write atomicity: Ensure partial shard updates are atomic
  3. Validation/warning: Detect misaligned writes and warn users or raise an error
  4. Documentation: Clearly document that dask chunks should align with shard boundaries when using sharding

Recommendation

We recommend Option 3 (Validation/warning) as an immediate fix for the following reasons:

  1. Prevents silent data corruption - The worst outcome is users losing data without knowing. An error makes the problem visible immediately.

  2. Quick to implement - Can be shipped fast as a stopgap while a proper fix is developed.

  3. Guides correct usage - If you're using sharding, you should align write chunks with shards anyway for performance: writing partial shards requires read-modify-write cycles, which defeats the purpose of sharding. A sketch of this workaround follows the list.

  4. Low risk - Doesn't change internal behavior, just adds a guard rail.
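
A sketch of the workaround from point 3, assuming the diagnosis above is correct (names reuse the reproduction snippet; pick one of the two options):

# Option A: rechunk so every dask task writes whole shards -
# no two tasks then touch the same shard.
aligned = dask_data.rechunk((50, 50, 50))
da.store(aligned, arr, lock=False)

# Option B: keep the mismatched chunks but let da.store serialize
# writes with a lock (correct, but gives up parallel writing).
da.store(dask_data, arr, lock=True)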

Why NOT the others as first choice?

| Solution | Concern |
| --- | --- |
| Shard-level locking | Hard to implement correctly in distributed settings (S3, cloud storage); locks don't work well across processes/machines; performance impact. |
| Atomic read-modify-write | Complex to implement; requires transactional semantics or copy-on-write. Good long-term but not a quick fix. |
| Documentation only | Users don't always read docs; silent corruption is unacceptable. |

Suggested error message

ValueError: "Write region (0:25, 0:25, 0:25) does not align with shard boundaries (50, 50, 50). 
Concurrent writes to partial shards may cause data corruption. 
Either align your chunks with shards, or use lock=True."
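
A rough sketch of such a check; check_shard_alignment below is a hypothetical helper, not an existing zarr function, and where exactly zarr would call it is an open question:

def check_shard_alignment(region, shards, shape):
    # region: tuple of slices being written; shards: shard shape; shape: array shape.
    for sl, shard_len, dim_len in zip(region, shards, shape):
        start = 0 if sl.start is None else sl.start
        stop = dim_len if sl.stop is None else sl.stop
        starts_on_boundary = start % shard_len == 0
        # A trailing partial shard at the end of the array is unavoidable, so allow it.
        stops_on_boundary = stop % shard_len == 0 or stop == dim_len
        if not (starts_on_boundary and stops_on_boundary):
            raise ValueError(
                f"Write region {region} does not align with shard boundaries {shards}. "
                "Concurrent writes to partial shards may cause data corruption. "
                "Either align your chunks with shards, or use lock=True."
            )

# The (0:25, 0:25, 0:25) region from the reproduction would raise:
# check_shard_alignment((slice(0, 25), slice(0, 25), slice(0, 25)), (50, 50, 50), (100, 100, 100))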

Ideal long-term solution

A combination approach:

  1. Always warn when writes don't align with shard boundaries
  2. Still handle it correctly via atomic partial updates for cases where misalignment is unavoidable
