From 13b828f3d87529d67f3fa44ce7b56ebe24ac574d Mon Sep 17 00:00:00 2001 From: Rick Zamora Date: Mon, 15 May 2023 08:54:08 -0500 Subject: [PATCH 01/11] start experimenting with parquet statistics --- dask_expr/collection.py | 10 ++++- dask_expr/expr.py | 12 ++++- dask_expr/io/parquet.py | 99 +++++++++++++++++++++++++++++++++-------- 3 files changed, 98 insertions(+), 23 deletions(-) diff --git a/dask_expr/collection.py b/dask_expr/collection.py index e1ab97b27..093043fa0 100644 --- a/dask_expr/collection.py +++ b/dask_expr/collection.py @@ -73,6 +73,12 @@ def _meta(self): def size(self): return new_collection(self.expr.size) + def __len__(self): + _len = self.expr._len + if isinstance(_len, expr.Expr): + _len = new_collection(_len).compute() + return _len + def __reduce__(self): return new_collection, (self._expr,) @@ -546,7 +552,7 @@ def read_parquet( index=None, storage_options=None, dtype_backend=None, - calculate_divisions=False, + gather_statistics=True, ignore_metadata_file=False, metadata_task_size=None, split_row_groups="infer", @@ -571,7 +577,7 @@ def read_parquet( categories=categories, index=index, storage_options=storage_options, - calculate_divisions=calculate_divisions, + gather_statistics=gather_statistics, ignore_metadata_file=ignore_metadata_file, metadata_task_size=metadata_task_size, split_row_groups=split_row_groups, diff --git a/dask_expr/expr.py b/dask_expr/expr.py index 192aee52a..0b6169984 100644 --- a/dask_expr/expr.py +++ b/dask_expr/expr.py @@ -57,6 +57,11 @@ def ndim(self): except AttributeError: return 0 + @functools.cached_property + def _len(self): + # TODO: Use single column + return Len(self) + def __str__(self): s = ", ".join( str(param) + "=" + str(operand) @@ -724,7 +729,10 @@ class Elemwise(Blockwise): optimizations, like `len` will care about which operations preserve length """ - pass + @property + def _len(self): + # Length must be equal to parent + return self.frame._len class AsType(Elemwise): @@ -1318,4 +1326,4 @@ def _execute_task(graph, name, *deps): from dask_expr.io import BlockwiseIO -from dask_expr.reductions import Count, Max, Mean, Min, Mode, Size, Sum +from dask_expr.reductions import Count, Len, Max, Mean, Min, Mode, Size, Sum diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index c5fc0510b..6e74a3063 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -3,11 +3,13 @@ import operator from functools import cached_property +import pandas as pd from dask.dataframe.io.parquet.core import ( ParquetFunctionWrapper, + aggregate_row_groups, get_engine, - process_statistics, set_index_columns, + sorted_columns, ) from dask.dataframe.io.parquet.utils import _split_user_options from dask.utils import natural_sort_key @@ -27,6 +29,60 @@ def _list_columns(columns): return columns +def _align_statistics(parts, statistics): + # Make sure parts and statistics are aligned + # (if statistics is not empty) + if statistics and len(parts) != len(statistics): + statistics = [] + if statistics: + result = list( + zip( + *[ + (part, stats) + for part, stats in zip(parts, statistics) + if stats["num-rows"] > 0 + ] + ) + ) + parts, statistics = result or [[], []] + return parts, statistics + + +def _aggregate_row_groups(parts, statistics, dataset_info): + # Aggregate parts/statistics if we are splitting by row-group + blocksize = ( + dataset_info["blocksize"] if dataset_info["split_row_groups"] is True else None + ) + split_row_groups = dataset_info["split_row_groups"] + fs = dataset_info["fs"] + aggregation_depth = 
dataset_info["aggregation_depth"] + + if statistics: + if blocksize or (split_row_groups and int(split_row_groups) > 1): + parts, statistics = aggregate_row_groups( + parts, statistics, blocksize, split_row_groups, fs, aggregation_depth + ) + return parts, statistics + + +def _calculate_divisions(statistics, dataset_info, npartitions): + # Use statistics to define divisions + divisions = None + if statistics: + calculate_divisions = dataset_info["kwargs"].get("calculate_divisions", None) + index = dataset_info["index"] + process_columns = index if index and len(index) == 1 else None + if (calculate_divisions is not False) and process_columns: + for sorted_column_info in sorted_columns( + statistics, columns=process_columns + ): + if sorted_column_info["name"] in index: + divisions = sorted_column_info["divisions"] + break + + return divisions or (None,) * (npartitions + 1) + + class ReadParquet(PartitionsFiltered, BlockwiseIO): """Read a parquet dataset""" @@ -37,7 +93,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO): "categories", "index", "storage_options", - "calculate_divisions", + "gather_statistics", "ignore_metadata_file", "metadata_task_size", "split_row_groups", @@ -55,7 +111,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO): "categories": None, "index": None, "storage_options": None, - "calculate_divisions": False, + "gather_statistics": True, "ignore_metadata_file": False, "metadata_task_size": None, "split_row_groups": "infer", @@ -172,7 +228,7 @@ def _dataset_info(self): fs, self.categories, index, - self.calculate_divisions, + self.gather_statistics, self.filters, self.split_row_groups, blocksize, @@ -189,6 +245,7 @@ def _dataset_info(self): # Infer meta, accounting for index and columns arguments. meta = self.engine._create_dd_meta(dataset_info) + index = dataset_info["index"] index = [index] if isinstance(index, str) else index meta, index, columns = set_index_columns( meta, index, self.operand("columns"), auto_index_allowed @@ -216,21 +273,14 @@ def _plan(self): dataset_info ) - # Parse dataset statistics from metadata (if available) - parts, divisions, _ = process_statistics( - parts, - stats, - dataset_info["filters"], - dataset_info["index"], - ( - dataset_info["blocksize"] - if dataset_info["split_row_groups"] is True - else None - ), - dataset_info["split_row_groups"], - dataset_info["fs"], - dataset_info["aggregation_depth"], - ) + # Make sure parts and stats are aligned + parts, stats = _align_statistics(parts, stats) + + # Use statistics to aggregate partitions + parts, stats = _aggregate_row_groups(parts, stats, dataset_info) + + # Use statistics to calculate divisions + divisions = _calculate_divisions(stats, dataset_info, len(parts)) meta = dataset_info["meta"] if len(divisions) < 2: @@ -254,6 +304,7 @@ def _plan(self): return { "func": io_func, "parts": parts, + "statistics": stats, "divisions": divisions, } @@ -265,3 +316,13 @@ def _filtered_task(self, index: int): if self._series: return (operator.getitem, tsk, self.columns[0]) return tsk + + @property + def _statistics(self): + return self._plan["statistics"] + + @property + def _len(self): + if self._statistics and not self.filters: + return pd.DataFrame(self._statistics)["num-rows"].sum() + return super()._len From 990ba4cb78a8dc606a0c2b887ce7f5be6cf76314 Mon Sep 17 00:00:00 2001 From: Rick Zamora Date: Mon, 15 May 2023 19:50:37 -0500 Subject: [PATCH 02/11] adopt parts of #40 --- dask_expr/expr.py | 40 ++++++++++++++++++++++++++++++++++++---- dask_expr/io/io.py | 7 +++++++ 
dask_expr/io/parquet.py | 12 +++++------- 3 files changed, 48 insertions(+), 11 deletions(-) diff --git a/dask_expr/expr.py b/dask_expr/expr.py index 4ccf2ec70..3fbb42a20 100644 --- a/dask_expr/expr.py +++ b/dask_expr/expr.py @@ -59,6 +59,9 @@ def ndim(self): @functools.cached_property def _len(self): + stats = self.statistics() + if "row_count" in stats: + return sum(stats["row_count"]) # TODO: Use single column return Len(self) @@ -211,6 +214,30 @@ def _layer(self) -> dict: return {(self._name, i): self._task(i) for i in range(self.npartitions)} + def _statistics(self): + return {} + + def statistics(self) -> dict: + """Known quantities of an expression, like length or min/max + + To define this on a class create a `._statistics` method that returns a + dictionary of new statistics known by that class. If nothing is known it + is ok to return None. Superclasses will also be consulted. + + Examples + -------- + >>> df.statistics() + {"length": 1000000} + """ + out = {} + for typ in type(self).mro()[::-1]: + if not issubclass(typ, Expr): + continue + d = typ._statistics(self) # TODO: maybe this should be cached + if d: + out.update(d) # TODO: this is fragile + return out + def simplify(self): """Simplify expression @@ -738,10 +765,10 @@ class Elemwise(Blockwise): optimizations, like `len` will care about which operations preserve length """ - @property - def _len(self): - # Length must be equal to parent - return self.frame._len + def _statistics(self): + for dep in self.dependencies(): + if "row_count" in dep.statistics(): + return {"row_count": dep.statistics()["row_count"]} class AsType(Elemwise): @@ -1052,6 +1079,11 @@ def _simplify_down(self): def _node_label_args(self): return [self.frame, self.partitions] + def _statistics(self): + if "row_count" in self.frame.statistics(): + row_counts = self.frame.statistics()["row_count"] + return {"row_count": tuple(row_counts[p] for p in self.partitions)} + class PartitionsFiltered(Expr): """Mixin class for partition filtering diff --git a/dask_expr/io/io.py b/dask_expr/io/io.py index 18927d8d8..40941002e 100644 --- a/dask_expr/io/io.py +++ b/dask_expr/io/io.py @@ -68,6 +68,13 @@ def _divisions_and_locations(self): divisions = (None,) * len(locations) return divisions, locations + def _statistics(self): + locations = self._locations() + row_counts = tuple( + offset - locations[i] for i, offset in enumerate(locations[1:]) + ) + return {"row_count": row_counts} + def _divisions(self): return self._divisions_and_locations[0] diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 3c76c689d..f617b1c99 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -4,7 +4,6 @@ import operator from functools import cached_property -import pandas as pd from dask.dataframe.io.parquet.core import ( ParquetFunctionWrapper, aggregate_row_groups, @@ -297,15 +296,14 @@ def _filtered_task(self, index: int): return (operator.getitem, tsk, self.columns[0]) return tsk - @property def _statistics(self): - return self._plan["statistics"] + if self._pq_statistics and not self.filters: + row_count = tuple(stat["num-rows"] for stat in self._pq_statistics) + return {"row_count": row_count} @property - def _len(self): - if self._statistics and not self.filters: - return pd.DataFrame(self._statistics)["num-rows"].sum() - return super()._len + def _pq_statistics(self): + return self._plan["statistics"] # From 1c62f4c5e89f05a4ebbe79eb46f9b2731333ca94 Mon Sep 17 00:00:00 2001 From: Rick Zamora Date: Tue, 16 May 2023 14:18:22 -0500 Subject: [PATCH 
03/11] experimenting with dedicated Metadata class structure --- dask_expr/expr.py | 49 ++++++++++++++++++++++------------------- dask_expr/io/io.py | 5 +++-- dask_expr/io/parquet.py | 5 +++-- 3 files changed, 32 insertions(+), 27 deletions(-) diff --git a/dask_expr/expr.py b/dask_expr/expr.py index 3fbb42a20..55e7495fa 100644 --- a/dask_expr/expr.py +++ b/dask_expr/expr.py @@ -59,10 +59,9 @@ def ndim(self): @functools.cached_property def _len(self): - stats = self.statistics() - if "row_count" in stats: - return sum(stats["row_count"]) - # TODO: Use single column + metadata = self.metadata() + if "row_count" in metadata: + return metadata["row_count"].sum() return Len(self) def __str__(self): @@ -214,26 +213,38 @@ def _layer(self) -> dict: return {(self._name, i): self._task(i) for i in range(self.npartitions)} - def _statistics(self): - return {} - - def statistics(self) -> dict: - """Known quantities of an expression, like length or min/max + def _metadata(self): + """New metadata to add for this expression""" + from dask_expr.metadata import Metadata - To define this on a class create a `._statistics` method that returns a - dictionary of new statistics known by that class. If nothing is known it + # Inherit metadata from dependencies + metadata = {} + for dep in self.dependencies(): + for k, v in dep.metadata().items(): + assert isinstance(v, Metadata) + if k not in metadata: + val = v.inherit(self) + if val: + metadata[k] = val + return metadata + + def metadata(self) -> dict: + """Known metadata of an expression, like partition statistics + + To define this on a class create a `._metadata` method that returns a + dictionary of new metadata known by that class. If nothing is known it is ok to return None. Superclasses will also be consulted. Examples -------- - >>> df.statistics() - {"length": 1000000} + >>> df.metadata() + {'row_count': RowCountMetadata(data=(1000000,))} """ out = {} for typ in type(self).mro()[::-1]: if not issubclass(typ, Expr): continue - d = typ._statistics(self) # TODO: maybe this should be cached + d = typ._metadata(self) or {} if d: out.update(d) # TODO: this is fragile return out @@ -765,10 +776,7 @@ class Elemwise(Blockwise): optimizations, like `len` will care about which operations preserve length """ - def _statistics(self): - for dep in self.dependencies(): - if "row_count" in dep.statistics(): - return {"row_count": dep.statistics()["row_count"]} + pass class AsType(Elemwise): @@ -1079,11 +1087,6 @@ def _simplify_down(self): def _node_label_args(self): return [self.frame, self.partitions] - def _statistics(self): - if "row_count" in self.frame.statistics(): - row_counts = self.frame.statistics()["row_count"] - return {"row_count": tuple(row_counts[p] for p in self.partitions)} - class PartitionsFiltered(Expr): """Mixin class for partition filtering diff --git a/dask_expr/io/io.py b/dask_expr/io/io.py index 40941002e..500c74f10 100644 --- a/dask_expr/io/io.py +++ b/dask_expr/io/io.py @@ -4,6 +4,7 @@ from dask.dataframe.io.io import sorted_division_locations from dask_expr.expr import Blockwise, Expr, PartitionsFiltered +from dask_expr.metadata import RowCountMetadata class IO(Expr): @@ -68,12 +69,12 @@ def _divisions_and_locations(self): divisions = (None,) * len(locations) return divisions, locations - def _statistics(self): + def _metadata(self): locations = self._locations() row_counts = tuple( offset - locations[i] for i, offset in enumerate(locations[1:]) ) - return {"row_count": row_counts} + return {"row_count": RowCountMetadata(row_counts)} def 
_divisions(self): return self._divisions_and_locations[0] diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index f617b1c99..cccfc9ff2 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -16,6 +16,7 @@ from dask_expr.expr import EQ, GE, GT, LE, LT, NE, And, Expr, Filter, Or, Projection from dask_expr.io import BlockwiseIO, PartitionsFiltered +from dask_expr.metadata import RowCountMetadata NONE_LABEL = "__null_dask_index__" @@ -296,10 +297,10 @@ def _filtered_task(self, index: int): return (operator.getitem, tsk, self.columns[0]) return tsk - def _statistics(self): + def _metadata(self): if self._pq_statistics and not self.filters: row_count = tuple(stat["num-rows"] for stat in self._pq_statistics) - return {"row_count": row_count} + return {"row_count": RowCountMetadata(row_count)} @property def _pq_statistics(self): From afd59d7e9fe7ce3fbd12eb575d7f58a0e31a924b Mon Sep 17 00:00:00 2001 From: Rick Zamora Date: Tue, 16 May 2023 14:25:32 -0500 Subject: [PATCH 04/11] add missing file --- dask_expr/metadata.py | 86 ++++++++++++++++++++++++++++++ dask_expr/tests/test_collection.py | 12 +++++ 2 files changed, 98 insertions(+) create mode 100644 dask_expr/metadata.py diff --git a/dask_expr/metadata.py b/dask_expr/metadata.py new file mode 100644 index 000000000..dd5f1f2a5 --- /dev/null +++ b/dask_expr/metadata.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +from collections.abc import Iterable +from dataclasses import dataclass +from functools import singledispatchmethod +from typing import Any + +from dask_expr.expr import Elemwise, Expr, Partitions + + +@dataclass(frozen=True) +class Metadata: + """Abstract expression-metadata class + + See Also + -------- + StaticMetadata + PartitionMetadata + """ + + data: Any + + @singledispatchmethod + def inherit(self, child: Expr) -> Metadata | None: + """New `Metadata` object that a "child" Expr mayinherit + + A return value of `None` means that `type(Expr)` is + not eligable to inherit this kind of metadata. + """ + return None + + +@dataclass(frozen=True) +class StaticMetadata(Metadata): + """A static metadata object + + This metadata is not partition-specific, and can be + inherited by any child `Expr`. 
+ """ + + def inherit(self, child: Expr) -> StaticMetadata: + return self + + +@dataclass(frozen=True) +class PartitionMetadata(Metadata): + """Metadata containing a distinct value for every partition + + See Also + -------- + RowCountMetadata + """ + + data: Iterable + + +@PartitionMetadata.inherit.register +def _partitionmetadata_partitions(self, child: Partitions): + # A `Partitions` expression may inherit metadata + # from the selected partitions + return type(self)( + type(self.data)( + part for i, part in enumerate(self.data) if i in child.partitions + ) + ) + + +# +# PartitionMetadata sub-classes +# + + +@dataclass(frozen=True) +class RowCountMetadata(PartitionMetadata): + """Tracks the row count of each partition""" + + def sum(self): + """Return the total row-count of all partitions""" + return sum(self.data) + + +@RowCountMetadata.inherit.register +def _rowcount_elemwise(self, child: Elemwise): + # All Element-wise operations may inherit + # row-count metadata "as is" + return self diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py index 21dfe370a..cae821b7b 100644 --- a/dask_expr/tests/test_collection.py +++ b/dask_expr/tests/test_collection.py @@ -429,3 +429,15 @@ def test_repartition_divisions(df, opt): if len(part): assert part.min() >= df2.divisions[p] assert part.max() < df2.divisions[p + 1] + + +def test_statistics(df, pdf): + df2 = df[["x"]] + 1 + assert len(df2) == len(pdf) + assert df2.metadata().get("row_count").sum() == len(pdf) + assert df[df.x > 5].metadata().get("row_count") is None + + # Check `partitions` + first = df2.partitions[0].compute() + assert len(df2.partitions[0]) == len(first) + assert df2.partitions[0].metadata().get("row_count").sum() == len(first) From 830230587f3f55dc1107b65195d1a107bf4f5839 Mon Sep 17 00:00:00 2001 From: Rick Zamora Date: Tue, 16 May 2023 14:55:23 -0500 Subject: [PATCH 05/11] go back to and remove sub-class for now --- dask_expr/expr.py | 40 ++++++++++----------- dask_expr/io/io.py | 6 ++-- dask_expr/io/parquet.py | 6 ++-- dask_expr/{metadata.py => statistics.py} | 45 +++++++++--------------- dask_expr/tests/test_collection.py | 6 ++-- 5 files changed, 45 insertions(+), 58 deletions(-) rename dask_expr/{metadata.py => statistics.py} (50%) diff --git a/dask_expr/expr.py b/dask_expr/expr.py index 55e7495fa..c0c77b8f9 100644 --- a/dask_expr/expr.py +++ b/dask_expr/expr.py @@ -59,9 +59,9 @@ def ndim(self): @functools.cached_property def _len(self): - metadata = self.metadata() - if "row_count" in metadata: - return metadata["row_count"].sum() + statistics = self.statistics() + if "row_count" in statistics: + return statistics["row_count"].sum() return Len(self) def __str__(self): @@ -213,38 +213,38 @@ def _layer(self) -> dict: return {(self._name, i): self._task(i) for i in range(self.npartitions)} - def _metadata(self): - """New metadata to add for this expression""" - from dask_expr.metadata import Metadata + def _statistics(self): + """New statistics to add for this expression""" + from dask_expr.statistics import Statistics - # Inherit metadata from dependencies - metadata = {} + # Inherit statistics from dependencies + statistics = {} for dep in self.dependencies(): - for k, v in dep.metadata().items(): - assert isinstance(v, Metadata) - if k not in metadata: + for k, v in dep.statistics().items(): + assert isinstance(v, Statistics) + if k not in statistics: val = v.inherit(self) if val: - metadata[k] = val - return metadata + statistics[k] = val + return statistics - def metadata(self) -> dict: 
- """Known metadata of an expression, like partition statistics + def statistics(self) -> dict: + """Known statistics of an expression, like partition statistics - To define this on a class create a `._metadata` method that returns a - dictionary of new metadata known by that class. If nothing is known it + To define this on a class create a `._statistics` method that returns a + dictionary of new statistics known by that class. If nothing is known it is ok to return None. Superclasses will also be consulted. Examples -------- - >>> df.metadata() - {'row_count': RowCountMetadata(data=(1000000,))} + >>> df.statistics() + {'row_count': RowCountStatistics(data=(1000000,))} """ out = {} for typ in type(self).mro()[::-1]: if not issubclass(typ, Expr): continue - d = typ._metadata(self) or {} + d = typ._statistics(self) or {} if d: out.update(d) # TODO: this is fragile return out diff --git a/dask_expr/io/io.py b/dask_expr/io/io.py index 500c74f10..8c451efc6 100644 --- a/dask_expr/io/io.py +++ b/dask_expr/io/io.py @@ -4,7 +4,7 @@ from dask.dataframe.io.io import sorted_division_locations from dask_expr.expr import Blockwise, Expr, PartitionsFiltered -from dask_expr.metadata import RowCountMetadata +from dask_expr.statistics import RowCountStatistics class IO(Expr): @@ -69,12 +69,12 @@ def _divisions_and_locations(self): divisions = (None,) * len(locations) return divisions, locations - def _metadata(self): + def _statistics(self): locations = self._locations() row_counts = tuple( offset - locations[i] for i, offset in enumerate(locations[1:]) ) - return {"row_count": RowCountMetadata(row_counts)} + return {"row_count": RowCountStatistics(row_counts)} def _divisions(self): return self._divisions_and_locations[0] diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index cccfc9ff2..dd1c18627 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -16,7 +16,7 @@ from dask_expr.expr import EQ, GE, GT, LE, LT, NE, And, Expr, Filter, Or, Projection from dask_expr.io import BlockwiseIO, PartitionsFiltered -from dask_expr.metadata import RowCountMetadata +from dask_expr.statistics import RowCountStatistics NONE_LABEL = "__null_dask_index__" @@ -297,10 +297,10 @@ def _filtered_task(self, index: int): return (operator.getitem, tsk, self.columns[0]) return tsk - def _metadata(self): + def _statistics(self): if self._pq_statistics and not self.filters: row_count = tuple(stat["num-rows"] for stat in self._pq_statistics) - return {"row_count": RowCountMetadata(row_count)} + return {"row_count": RowCountStatistics(row_count)} @property def _pq_statistics(self): diff --git a/dask_expr/metadata.py b/dask_expr/statistics.py similarity index 50% rename from dask_expr/metadata.py rename to dask_expr/statistics.py index dd5f1f2a5..02f00b66d 100644 --- a/dask_expr/metadata.py +++ b/dask_expr/statistics.py @@ -9,54 +9,41 @@ @dataclass(frozen=True) -class Metadata: - """Abstract expression-metadata class +class Statistics: + """Abstract expression-statistics class See Also -------- - StaticMetadata - PartitionMetadata + PartitionStatistics """ data: Any @singledispatchmethod - def inherit(self, child: Expr) -> Metadata | None: - """New `Metadata` object that a "child" Expr mayinherit + def inherit(self, child: Expr) -> Statistics | None: + """New `Statistics` object that a "child" Expr mayinherit A return value of `None` means that `type(Expr)` is - not eligable to inherit this kind of metadata. + not eligable to inherit this kind of statistics. 
""" return None @dataclass(frozen=True) -class StaticMetadata(Metadata): - """A static metadata object - - This metadata is not partition-specific, and can be - inherited by any child `Expr`. - """ - - def inherit(self, child: Expr) -> StaticMetadata: - return self - - -@dataclass(frozen=True) -class PartitionMetadata(Metadata): - """Metadata containing a distinct value for every partition +class PartitionStatistics(Statistics): + """Statistics containing a distinct value for every partition See Also -------- - RowCountMetadata + RowCountStatistics """ data: Iterable -@PartitionMetadata.inherit.register -def _partitionmetadata_partitions(self, child: Partitions): - # A `Partitions` expression may inherit metadata +@PartitionStatistics.inherit.register +def _partitionstatistics_partitions(self, child: Partitions): + # A `Partitions` expression may inherit statistics # from the selected partitions return type(self)( type(self.data)( @@ -66,12 +53,12 @@ def _partitionmetadata_partitions(self, child: Partitions): # -# PartitionMetadata sub-classes +# PartitionStatistics sub-classes # @dataclass(frozen=True) -class RowCountMetadata(PartitionMetadata): +class RowCountStatistics(PartitionStatistics): """Tracks the row count of each partition""" def sum(self): @@ -79,8 +66,8 @@ def sum(self): return sum(self.data) -@RowCountMetadata.inherit.register +@RowCountStatistics.inherit.register def _rowcount_elemwise(self, child: Elemwise): # All Element-wise operations may inherit - # row-count metadata "as is" + # row-count statistics "as is" return self diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py index cae821b7b..b77a431da 100644 --- a/dask_expr/tests/test_collection.py +++ b/dask_expr/tests/test_collection.py @@ -434,10 +434,10 @@ def test_repartition_divisions(df, opt): def test_statistics(df, pdf): df2 = df[["x"]] + 1 assert len(df2) == len(pdf) - assert df2.metadata().get("row_count").sum() == len(pdf) - assert df[df.x > 5].metadata().get("row_count") is None + assert df2.statistics().get("row_count").sum() == len(pdf) + assert df[df.x > 5].statistics().get("row_count") is None # Check `partitions` first = df2.partitions[0].compute() assert len(df2.partitions[0]) == len(first) - assert df2.partitions[0].metadata().get("row_count").sum() == len(first) + assert df2.partitions[0].statistics().get("row_count").sum() == len(first) From a3c5f2c353b8b424ec584a13ec8bd44f15c07802 Mon Sep 17 00:00:00 2001 From: Rick Zamora Date: Tue, 16 May 2023 15:09:16 -0500 Subject: [PATCH 06/11] add parquet test --- dask_expr/io/tests/test_io.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/dask_expr/io/tests/test_io.py b/dask_expr/io/tests/test_io.py index 60e85394f..0f149750d 100644 --- a/dask_expr/io/tests/test_io.py +++ b/dask_expr/io/tests/test_io.py @@ -204,3 +204,14 @@ def test_parquet_complex_filters(tmpdir): assert_eq(got, expect) assert_eq(got.optimize(), expect) + + +def test_parquet_row_count_statistics(tmpdir): + # NOTE: We should no longer need to set `index` + # or `calculate_divisions` to gather row-count + # statistics after dask#10290 + df = read_parquet(_make_file(tmpdir), index="a", calculate_divisions=True) + pdf = df.compute() + + s = (df["b"] + 1).astype("Int32") + assert s.statistics().get("row_count").sum() == len(pdf) From cbced80d005bd5e219e93389625a35123043a706 Mon Sep 17 00:00:00 2001 From: Rick Zamora Date: Tue, 16 May 2023 15:21:22 -0500 Subject: [PATCH 07/11] use assume vs inherit --- dask_expr/statistics.py | 22 
+++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/dask_expr/statistics.py b/dask_expr/statistics.py index 02f00b66d..0116ba5ff 100644 --- a/dask_expr/statistics.py +++ b/dask_expr/statistics.py @@ -20,11 +20,11 @@ class Statistics: data: Any @singledispatchmethod - def inherit(self, child: Expr) -> Statistics | None: - """New `Statistics` object that a "child" Expr mayinherit + def assume(self, parent: Expr) -> Statistics | None: + """Statistics that a "parent" Expr may assume A return value of `None` means that `type(Expr)` is - not eligable to inherit this kind of statistics. + not eligable to assume these kind of statistics. """ return None @@ -41,13 +41,13 @@ class PartitionStatistics(Statistics): data: Iterable -@PartitionStatistics.inherit.register -def _partitionstatistics_partitions(self, child: Partitions): - # A `Partitions` expression may inherit statistics +@PartitionStatistics.assume.register +def _partitionstatistics_partitions(self, parent: Partitions): + # A `Partitions` expression may assume statistics # from the selected partitions return type(self)( type(self.data)( - part for i, part in enumerate(self.data) if i in child.partitions + part for i, part in enumerate(self.data) if i in parent.partitions ) ) @@ -66,8 +66,8 @@ def sum(self): return sum(self.data) -@RowCountStatistics.inherit.register -def _rowcount_elemwise(self, child: Elemwise): - # All Element-wise operations may inherit - # row-count statistics "as is" +@RowCountStatistics.assume.register +def _rowcount_elemwise(self, parent: Elemwise): + # All Element-wise operations may assume + # row-count statistics return self From 5fe58629feb93aae8e39ba5f59946a9abf5d3174 Mon Sep 17 00:00:00 2001 From: Rick Zamora Date: Tue, 16 May 2023 15:21:57 -0500 Subject: [PATCH 08/11] use assume vs inherit --- dask_expr/expr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/expr.py b/dask_expr/expr.py index c0c77b8f9..46851d188 100644 --- a/dask_expr/expr.py +++ b/dask_expr/expr.py @@ -223,7 +223,7 @@ def _statistics(self): for k, v in dep.statistics().items(): assert isinstance(v, Statistics) if k not in statistics: - val = v.inherit(self) + val = v.assume(self) if val: statistics[k] = val return statistics From b0946f8432ea327182218f10b9c482f85dce7e18 Mon Sep 17 00:00:00 2001 From: Rick Zamora Date: Tue, 16 May 2023 15:26:22 -0500 Subject: [PATCH 09/11] split test --- dask_expr/tests/test_collection.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py index b77a431da..5c1ceb67e 100644 --- a/dask_expr/tests/test_collection.py +++ b/dask_expr/tests/test_collection.py @@ -431,13 +431,18 @@ def test_repartition_divisions(df, opt): assert part.max() < df2.divisions[p + 1] -def test_statistics(df, pdf): +def test_len(df, pdf): df2 = df[["x"]] + 1 assert len(df2) == len(pdf) - assert df2.statistics().get("row_count").sum() == len(pdf) - assert df[df.x > 5].statistics().get("row_count") is None - # Check `partitions` first = df2.partitions[0].compute() assert len(df2.partitions[0]) == len(first) - assert df2.partitions[0].statistics().get("row_count").sum() == len(first) + + +def test_row_count_statistics(df, pdf): + df2 = df[["x"]] + 1 + assert df2.statistics().get("row_count").sum() == len(pdf) + assert df[df.x > 5].statistics().get("row_count") is None + assert df2.partitions[0].statistics().get("row_count").sum() == len( + df2.partitions[0] + ) From 
bfd87105ede7b3bece84c21aae1e475c118ce8b8 Mon Sep 17 00:00:00 2001
From: Rick Zamora 
Date: Tue, 16 May 2023 15:36:14 -0500
Subject: [PATCH 10/11] fix doc-string

---
 dask_expr/statistics.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/dask_expr/statistics.py b/dask_expr/statistics.py
index 0116ba5ff..3fbba5b0e 100644
--- a/dask_expr/statistics.py
+++ b/dask_expr/statistics.py
@@ -10,7 +10,7 @@

 @dataclass(frozen=True)
 class Statistics:
-    """Abstract expression-statistics class
+    """Abstract class for expression statistics

     See Also
     --------
@@ -23,8 +23,9 @@ class Statistics:
     def assume(self, parent: Expr) -> Statistics | None:
         """Statistics that a "parent" Expr may assume

-        A return value of `None` means that `type(Expr)` is
-        not eligable to assume these kind of statistics.
+        A return value of `None` (the default) means that
+        `type(Expr)` is not eligible to assume these kind
+        of statistics.
         """
         return None

From 2d343c71334fef84aaa381b2c45441c92f02acc1 Mon Sep 17 00:00:00 2001
From: Rick Zamora 
Date: Tue, 16 May 2023 15:50:21 -0500
Subject: [PATCH 11/11] fix typos

---
 dask_expr/expr.py       | 2 +-
 dask_expr/statistics.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/dask_expr/expr.py b/dask_expr/expr.py
index 46851d188..7ea7d9268 100644
--- a/dask_expr/expr.py
+++ b/dask_expr/expr.py
@@ -217,7 +217,7 @@ def _statistics(self):
         """New statistics to add for this expression"""
         from dask_expr.statistics import Statistics

-        # Inherit statistics from dependencies
+        # Assume statistics from dependencies
         statistics = {}
         for dep in self.dependencies():
             for k, v in dep.statistics().items():
diff --git a/dask_expr/statistics.py b/dask_expr/statistics.py
index 3fbba5b0e..ae631fc6c 100644
--- a/dask_expr/statistics.py
+++ b/dask_expr/statistics.py
@@ -24,8 +24,8 @@ def assume(self, parent: Expr) -> Statistics | None:
         """Statistics that a "parent" Expr may assume

         A return value of `None` (the default) means that
-        `type(Expr)` is not eligible to assume these kind
-        of statistics.
+        `parent` is not eligible to assume this kind of
+        statistics.
         """
         return None
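
The series converges on a small protocol: IO expressions seed a `row_count`
entry, `Expr.statistics()` walks the class MRO collecting each class's
`_statistics()` dictionary, and every `Statistics` object carried by a
dependency decides, via single dispatch on the parent expression type, whether
the parent may assume it. `Expr._len` then answers `len(df)` from
`statistics()["row_count"].sum()` when that entry survives, and falls back to
a `Len` reduction otherwise. A minimal, self-contained sketch of the dispatch
pattern follows; the `Expr`, `Elemwise`, and `Filter` stubs stand in for the
real dask_expr classes, and the block illustrates the mechanism rather than
reproducing the library's actual code:

    from dataclasses import dataclass
    from functools import singledispatchmethod


    class Expr:
        """Stand-in for dask_expr.expr.Expr."""


    class Elemwise(Expr):
        """Stand-in for a length-preserving operation, e.g. ``df + 1``."""


    class Filter(Expr):
        """Stand-in for a length-changing operation, e.g. ``df[df.x > 5]``."""


    @dataclass(frozen=True)
    class RowCountStatistics:
        """Per-partition row counts, modeled on dask_expr/statistics.py."""

        data: tuple

        @singledispatchmethod
        def assume(self, parent):
            # Default: a parent expression may not assume these statistics.
            return None

        def sum(self):
            # Total row count across partitions; this is what ``Expr._len``
            # uses so that ``len(df)`` is answered without computing a graph.
            return sum(self.data)


    @RowCountStatistics.assume.register
    def _rowcount_elemwise(self, parent: Elemwise):
        # Element-wise operations preserve partition lengths, so the
        # row counts carry over unchanged.
        return self


    stats = RowCountStatistics((100, 250))
    print(stats.assume(Elemwise()))  # RowCountStatistics(data=(100, 250))
    print(stats.assume(Filter()))    # None: row count unknown after a filter

Note how the filter case needs no special handling: a parent type with no
registered overload falls through to the base implementation and returns
`None`, which is why `df[df.x > 5].statistics().get("row_count")` comes back
empty in `test_row_count_statistics` above.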