
Xenium transcript dask DataFrame has non-unique index; transcript_id is the natural unique key #397

@LucaMarconato

Description


Disclaimer: This document was written with Claude (AI assistant) and manually reviewed for correctness. The prose and analysis have been verified, but the code changes are only partially reviewed and would require a full review before implementation.


TL;DR

When xenium() loads transcripts with dask, each partition gets a local 0-based index → duplicate labels globally. This silently breaks transform() and any index-aligned operation. The raw parquet files have a transcript_id column that is the true unique key, but it is not set as the index. A metadata-based fix that reconstructs the correct global RangeIndex at ~1 ms cost has been prototyped but not merged, pending a decision on whether to use transcript_id as the index instead (which would be the right approach once element linking by named key is supported).


Background

This issue documents a known behaviour in the xenium reader and its interaction with dask's parquet loading, and sketches a fix that has been deferred pending a broader design decision around element linking.

The problem

When xenium() calls dd.read_parquet() on transcripts.parquet, the resulting dask DataFrame has non-unique, per-partition 0-based indices. This happens regardless of whether transcripts.parquet is a single file (common in all currently tested datasets) or a partitioned directory (xenium v2+ multi-file format). The root cause is in dask's parquet reader:

  • Parquet stores each row group with its own RangeIndex(0, n) in the pandas metadata.
  • When dask splits a parquet file into multiple partitions (grouping row groups to fill its blocksize, default 256 MiB), each partition inherits the per-row-group 0-based index independently.
  • The result: a dask DataFrame where every partition has index [0..n_i], so the global index is non-unique.

This is invisible to a user loading with pandas (pq.read_table().to_pandas() concatenates row groups and produces a correct global RangeIndex), but affects all dask-based operations that rely on index alignment — most notably spatialdata.transform().

The issue surfaced in scverse/spatialdata#1105, where transform() raised ValueError: cannot reindex on an axis with duplicate labels. That was fixed on the spatialdata side (see below), but the root cause is here.
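The failure mode can be reproduced in plain pandas, without any xenium data. The following synthetic sketch mimics two partitions that each restart their index at 0:

```python
import pandas as pd

# Synthetic illustration (not xenium data): two "partitions" that each carry
# a local 0-based index, concatenated the way dask exposes them globally.
part0 = pd.DataFrame({"x": [1.0, 2.0]}, index=pd.RangeIndex(2))
part1 = pd.DataFrame({"x": [3.0, 4.0]}, index=pd.RangeIndex(2))
combined = pd.concat([part0, part1])  # global index is [0, 1, 0, 1]

assert not combined.index.is_unique

# Any operation that needs to reindex on this axis now fails with the same
# ValueError seen in scverse/spatialdata#1105:
try:
    combined.reindex(range(4))
    raised = False
except ValueError:
    raised = True
assert raised
```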

The natural unique key: transcript_id

The xenium parquet files include a transcript_id column (uint64) that is globally unique and monotonically increasing across row groups. This is the intended unique identifier for transcripts. It is not stored as the parquet index — it is a regular data column.

import pyarrow.parquet as pq
df = pq.read_table("transcripts.parquet").to_pandas()
assert df["transcript_id"].is_unique   # True
assert df.index.is_unique              # True (pandas loads the whole file at once)

import dask.dataframe as dd
ddf = dd.read_parquet("transcripts.parquet")
assert ddf.index.compute().is_unique   # False — each partition restarts at 0

Why not just use transcript_id as the index now

Setting transcript_id as the dask index would solve uniqueness, but it changes semantics: the index becomes a heavy uint64 index instead of a simple RangeIndex, and it would need to be sorted (dask requires a monotonic index for known divisions). This is the right long-term direction (see the design note below), but it warrants a deliberate decision rather than a quiet change.

Relationship to future element linking

The motivation for leaving this as-is: a planned improvement to spatialdata will allow linking elements not only by integer index but also by named keys (e.g. transcript_id). When that is implemented, setting transcript_id as the index in the xenium reader would be the natural companion change, making transcript-level joins with other elements (e.g. cell assignments) explicit and robust.

Deferred fix: global RangeIndex reindexing

In the interim, a metadata-based fix was developed and validated but deliberately not merged. It reconstructs the correct global RangeIndex (equivalent to what pandas gives for a full file read) using only pyarrow footer metadata — no data is read, cost is ~1 ms regardless of transcript count.

⚠️ The code below is Claude-assisted, has not been fully reviewed, and has not been tested beyond the datasets listed. It should be treated as a starting point, not production-ready code. A full review is required before implementation.

Tested datasets

All currently available datasets have a single-file transcripts.parquet (directory format was not observed in any tested dataset).

Dataset                         XOA version  Transcripts  Partitions  Fix correct
xenium_3.0.0_io                 3.0.0.15     74 M         26          ✓
xenium_2.0.0_io                 2.0.0.6      ~5 M         4           ✓
xenium_hlungcancer              2.0.0.6      12 M         4           ✓
Xenium_V1_human_Breast_2fov     2.0.0.10     ~1 M         1           ✓ (no-op)
Xenium_Prime_MultiCellSeg_tiny  3.0.0.15     tiny         1           ✓ (no-op)

Performance

The helper reads only parquet footer metadata (a few KB per file), not the transcript data itself:

Dataset                  Added overhead
Any single-file dataset  ~0.3–1.3 ms
Directory, 4 files       ~0.1 ms
Directory, 50 files      ~1.6 ms

Code changes

The fix adds one helper function and ~5 lines to _get_points() in src/spatialdata_io/readers/xenium.py:

+def _parquet_partition_row_counts(transcripts_path: Path) -> list[int]:
+    """Return the number of rows in each dask partition without reading any data.
+
+    dask splits parquet sources into partitions by grouping consecutive row groups to fill a target
+    block size (default 256 MiB, read from dask config).  For a partitioned directory each file
+    becomes one or more partitions; for a single file the row groups are grouped similarly.
+    Reading only parquet footer metadata keeps this O(num_row_groups) in I/O cost (~1 ms).
+    """
+    import dask.utils
+
+    blocksize = dask.utils.parse_bytes(dask.config.get("dataframe.parquet.read.blocksize", "256 MiB"))
+
+    if transcripts_path.is_dir():
+        parquet_files = sorted(f for f in transcripts_path.iterdir() if f.suffix == ".parquet")
+        sources: list[Path] = parquet_files
+    else:
+        sources = [transcripts_path]
+
+    partition_lengths: list[int] = []
+    for source in sources:
+        meta = pq.read_metadata(str(source))
+        cur_rows, cur_bytes = 0, 0
+        for i in range(meta.num_row_groups):
+            rg = meta.row_group(i)
+            if cur_bytes > 0 and cur_bytes + rg.total_byte_size > blocksize:
+                partition_lengths.append(cur_rows)
+                cur_rows, cur_bytes = 0, 0
+            cur_rows += rg.num_rows
+            cur_bytes += rg.total_byte_size
+        if cur_rows:
+            partition_lengths.append(cur_rows)
+
+    return partition_lengths
+
+
 def _get_points(path: Path, specs: dict[str, Any]) -> pa.Table:
     transcripts_path = path / XeniumKeys.TRANSCRIPTS_FILE
     table = read_parquet(transcripts_path)
 
-    # When transcripts.parquet is a partitioned directory (xenium v2+), dask assigns a per-file
-    # 0-based index to each partition, producing duplicate index labels globally.  We fix this
-    # eagerly using pyarrow file metadata (no data loading) so that the resulting PointsModel has a
-    # proper globally-unique RangeIndex.
-    if transcripts_path.is_dir():
-        parquet_files = sorted(f for f in transcripts_path.iterdir() if f.suffix == ".parquet")
-        row_counts = [pq.read_metadata(str(f)).num_rows for f in parquet_files]
+    # dask assigns a per-partition 0-based index when reading parquet, regardless of whether the
+    # source is a single file (split into partitions by row groups) or a partitioned directory (one
+    # file per partition).  Both cases produce duplicate index labels globally.  We fix this using
+    # only pyarrow footer metadata (no data read, O(num_row_groups) metadata bytes) so that the
+    # resulting PointsModel has a globally-unique RangeIndex.
+    row_counts = _parquet_partition_row_counts(transcripts_path)
+    if len(row_counts) > 1:
         offsets = np.cumsum([0] + row_counts)
 
         def _assign_global_index(
             partition: pd.DataFrame,
             partition_info: dict[str, Any] | None = None,
             _offsets: np.ndarray = offsets,
         ) -> pd.DataFrame:
             i = partition_info["number"]  # type: ignore[index]
             partition = partition.copy()
             partition.index = pd.RangeIndex(int(_offsets[i]), int(_offsets[i + 1]))
             return partition
 
         table = table.map_partitions(_assign_global_index, meta=table._meta)

Open questions before merging the fix

  1. Dask version stability. The fix replicates dask's internal greedy row-group grouping using dask.config.get("dataframe.parquet.read.blocksize"). This is correct for current dask but could drift if dask changes its default blocksize or grouping algorithm.
  2. Use transcript_id as the index instead? If the element-linking work proceeds soon, it may be better to skip the RangeIndex fix entirely and go directly to set_index("transcript_id").
  3. Directory-format datasets. No directory-format transcripts.parquet was available for testing. The directory branch of _parquet_partition_row_counts follows the same logic (one file = one partition) but should be verified on a real v2+ multi-file dataset.

Related

  • scverse/spatialdata#1105: transform() crash caused by this non-unique index, fixed on the spatialdata side independently by making transform() robust to duplicate-index points DataFrames.
