From 0c3fed9b371a319a8f9146737c6b2998ff8a6669 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 24 Dec 2024 13:27:05 +0800 Subject: [PATCH 01/16] fix: update default compression to ZSTD and improve documentation for write_parquet method --- python/datafusion/dataframe.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 0b38db924..8879178b8 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -620,16 +620,24 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None def write_parquet( self, path: str | pathlib.Path, - compression: str = "uncompressed", + compression: str = "ZSTD", compression_level: int | None = None, ) -> None: """Execute the :py:class:`DataFrame` and write the results to a Parquet file. Args: - path: Path of the Parquet file to write. - compression: Compression type to use. - compression_level: Compression level to use. - """ + path (str | pathlib.Path): The file path to write the Parquet file. + compression (str): The compression algorithm to use. Default is "ZSTD". + compression_level (int | None): The compression level to use. For ZSTD, the + recommended range is 1 to 22, with the default being 3. Higher levels + provide better compression but slower speed. + """ + # default compression level to 3 for ZSTD + if compression == "ZSTD": + if compression_level is None: + compression_level = 3 + elif not (1 <= compression_level <= 22): + raise ValueError("Compression level for ZSTD must be between 1 and 22") self.df.write_parquet(str(path), compression, compression_level) def write_json(self, path: str | pathlib.Path) -> None: From b1db46c4c24590d74c7c9379561ce1406e4e66b1 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 27 Dec 2024 09:51:56 +0800 Subject: [PATCH 02/16] fix: clarify compression level documentation for ZSTD in write_parquet method --- python/datafusion/dataframe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 8879178b8..1cab61caa 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -632,9 +632,10 @@ def write_parquet( recommended range is 1 to 22, with the default being 3. Higher levels provide better compression but slower speed. """ - # default compression level to 3 for ZSTD if compression == "ZSTD": if compression_level is None: + # Default compression level for ZSTD is 3 as per + # https://facebook.github.io/zstd/zstd_manual.html compression_level = 3 elif not (1 <= compression_level <= 22): raise ValueError("Compression level for ZSTD must be between 1 and 22") From 819de0d41e633b5c725b4cdfd7f2cbf49cc3db7d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Jan 2025 08:57:49 +0800 Subject: [PATCH 03/16] fix: update default compression level for ZSTD to 4 in write_parquet method --- python/datafusion/dataframe.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 1cab61caa..b2a4d027f 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -629,14 +629,14 @@ def write_parquet( path (str | pathlib.Path): The file path to write the Parquet file. compression (str): The compression algorithm to use. Default is "ZSTD". compression_level (int | None): The compression level to use. 
For ZSTD, the - recommended range is 1 to 22, with the default being 3. Higher levels + recommended range is 1 to 22, with the default being 4. Higher levels provide better compression but slower speed. """ if compression == "ZSTD": if compression_level is None: - # Default compression level for ZSTD is 3 as per - # https://facebook.github.io/zstd/zstd_manual.html - compression_level = 3 + # Default compression level for ZSTD is 4 like in delta-rs + # https://github.com/apache/datafusion-python/pull/981#discussion_r1899871918 + compression_level = 4 elif not (1 <= compression_level <= 22): raise ValueError("Compression level for ZSTD must be between 1 and 22") self.df.write_parquet(str(path), compression, compression_level) From 56965f46ac3b4add70a8220fbc79bfe4fc749151 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Jan 2025 09:01:57 +0800 Subject: [PATCH 04/16] fix: improve docstring formatting for DataFrame parquet writing method --- python/datafusion/dataframe.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index b2a4d027f..65ae2942a 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -626,11 +626,11 @@ def write_parquet( """Execute the :py:class:`DataFrame` and write the results to a Parquet file. Args: - path (str | pathlib.Path): The file path to write the Parquet file. - compression (str): The compression algorithm to use. Default is "ZSTD". - compression_level (int | None): The compression level to use. For ZSTD, the - recommended range is 1 to 22, with the default being 4. Higher levels - provide better compression but slower speed. + path: Path of the Parquet file to write. + compression: Compression type to use. Default is "ZSTD". + compression_level: Compression level to use. For ZSTD, the + recommended range is 1 to 22, with the default being 4. Higher levels + provide better compression but slower speed. """ if compression == "ZSTD": if compression_level is None: From df7d65e2ec4d990db714da39d20d5f57699820ad Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Jan 2025 11:39:13 +0800 Subject: [PATCH 05/16] feat: implement Compression enum and update write_parquet method to use it --- python/datafusion/dataframe.py | 57 +++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 65ae2942a..4b2dab2cc 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -35,6 +35,39 @@ from datafusion._internal import DataFrame as DataFrameInternal from datafusion.expr import Expr, SortExpr, sort_or_default +from enum import Enum +from typing import Tuple + + +class Compression(Enum): + UNCOMPRESSED = "uncompressed" + SNAPPY = "snappy" + GZIP = "gzip" + BROTLI = "brotli" + LZ4 = "lz4" + LZ0 = "lz0" + ZSTD = "zstd" + LZ4_RAW = "lz4_raw" + + @classmethod + def from_str(cls, value: str) -> "Compression": + try: + return cls(value.lower()) + except ValueError: + raise ValueError( + f"{value} is not a valid Compression. 
Valid values are: {[item.value for item in Compression]}" + ) + + def get_default_level(self) -> int: + if self == Compression.GZIP: + DEFAULT = 6 + elif self == Compression.BROTLI: + DEFAULT = 1 + elif self == Compression.ZSTD: + DEFAULT = 4 + else: + raise KeyError(f"{self.value} does not have a compression level.") + return DEFAULT class DataFrame: @@ -620,7 +653,7 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None def write_parquet( self, path: str | pathlib.Path, - compression: str = "ZSTD", + compression: str = Compression.ZSTD.value, compression_level: int | None = None, ) -> None: """Execute the :py:class:`DataFrame` and write the results to a Parquet file. @@ -628,18 +661,26 @@ def write_parquet( Args: path: Path of the Parquet file to write. compression: Compression type to use. Default is "ZSTD". + Available compression types are: + - "UNCOMPRESSED": No compression. + - "SNAPPY": Snappy compression. + - "GZIP": Gzip compression. + - "BROTLI": Brotli compression. + - "LZ0": LZ0 compression. + - "LZ4": LZ4 compression. + - "LZ4_RAW": LZ4_RAW compression. + - "ZSTD": Zstandard compression. compression_level: Compression level to use. For ZSTD, the recommended range is 1 to 22, with the default being 4. Higher levels provide better compression but slower speed. """ - if compression == "ZSTD": + compression_enum = Compression.from_str(compression) + + if compression_enum in {Compression.GZIP, Compression.BROTLI, Compression.ZSTD}: if compression_level is None: - # Default compression level for ZSTD is 4 like in delta-rs - # https://github.com/apache/datafusion-python/pull/981#discussion_r1899871918 - compression_level = 4 - elif not (1 <= compression_level <= 22): - raise ValueError("Compression level for ZSTD must be between 1 and 22") - self.df.write_parquet(str(path), compression, compression_level) + compression_level = compression_enum.get_default_level() + + self.df.write_parquet(str(path), compression_enum.value, compression_level) def write_json(self, path: str | pathlib.Path) -> None: """Execute the :py:class:`DataFrame` and write the results to a JSON file. 
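For illustration, a minimal usage sketch of write_parquet as of the patch above (PATCH 05/16). The SessionContext setup and the file names are assumptions for the example, not part of the change:

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.read_csv("data.csv")  # hypothetical input file

    # No arguments: defaults to ZSTD at its default level (4).
    df.write_parquet("out_zstd.parquet")

    # Explicit codec without a level falls back to the codec default
    # (gzip -> 6, brotli -> 1, zstd -> 4).
    df.write_parquet("out_gzip.parquet", compression="gzip")

    # Unknown codec names raise ValueError via Compression.from_str:
    # df.write_parquet("out.parquet", compression="wrong")

Keeping the default-level lookup on the Compression enum leaves the write_parquet body free of codec-specific branching.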
From f62a7a8f953287e8f1f0db3a6d7e30ad767bd578 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 8 Jan 2025 12:10:44 +0800
Subject: [PATCH 06/16] add test

---
 python/tests/test_dataframe.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index b82f95e35..aad88bc99 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -731,9 +731,7 @@ def test_optimized_logical_plan(aggregate_df):
 def test_execution_plan(aggregate_df):
     plan = aggregate_df.execution_plan()
 
-    expected = (
-        "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n"  # noqa: E501
-    )
+    expected = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n"  # noqa: E501
 
     assert expected == plan.display()
 
@@ -1107,7 +1105,15 @@ def test_write_compressed_parquet_wrong_compression_level(
     )
 
 
-@pytest.mark.parametrize("compression", ["brotli", "zstd", "wrong"])
+# test write_parquet with zstd, brotli default compression level, should complete without error
+@pytest.mark.parametrize("compression", ["zstd", "brotli"])
+def test_write_compressed_parquet_default_compression_level(df, tmp_path, compression):
+    path = tmp_path
+
+    df.write_parquet(str(path), compression=compression)
+
+
+@pytest.mark.parametrize("compression", ["wrong"])
 def test_write_compressed_parquet_missing_compression_level(df, tmp_path, compression):
     path = tmp_path
 
     with pytest.raises(ValueError):
         df.write_parquet(str(path), compression=compression)

From b5b3c476e15637bc7cbd2dee36873881e1eb2540 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 8 Jan 2025 12:13:12 +0800
Subject: [PATCH 07/16] fix: remove unused import and update default
 compression to ZSTD in the Rust write_parquet method

---
 python/datafusion/dataframe.py | 1 -
 src/dataframe.rs               | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 4b2dab2cc..5944c0deb 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -36,7 +36,6 @@
 from datafusion._internal import DataFrame as DataFrameInternal
 from datafusion.expr import Expr, SortExpr, sort_or_default
 from enum import Enum
-from typing import Tuple
 
 
 class Compression(Enum):
diff --git a/src/dataframe.rs b/src/dataframe.rs
index e7d6ca6d6..9bdc2a327 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -463,7 +463,7 @@ impl PyDataFrame {
     /// Write a `DataFrame` to a Parquet file.
     #[pyo3(signature = (
         path,
-        compression="uncompressed",
+        compression="zstd",
         compression_level=None
     ))]
     fn write_parquet(

From 23629927aceebf58c0cf8a503710ca5ec3792b78 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 8 Jan 2025 13:03:45 +0800
Subject: [PATCH 08/16] fix: update compression type strings to lowercase in
 DataFrame parquet writing method doc

---
 python/datafusion/dataframe.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 5944c0deb..db7b07e62 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -661,14 +661,14 @@ def write_parquet(
         path: Path of the Parquet file to write.
         compression: Compression type to use. Default is "ZSTD".
             Available compression types are:
-            - "UNCOMPRESSED": No compression.
-            - "SNAPPY": Snappy compression.
-            - "GZIP": Gzip compression.
-            - "BROTLI": Brotli compression.
-            - "LZ0": LZ0 compression.
-            - "LZ4": LZ4 compression.
-            - "LZ4_RAW": LZ4_RAW compression.
-            - "ZSTD": Zstandard compression.
+ - "uncompressed": No compression. + - "snappy": Snappy compression. + - "gzip": Gzip compression. + - "brotli": Brotli compression. + - "lz0": LZ0 compression. + - "lz4": LZ4 compression. + - "lz4_raw": LZ4_RAW compression. + - "zstd": Zstandard compression. compression_level: Compression level to use. For ZSTD, the recommended range is 1 to 22, with the default being 4. Higher levels provide better compression but slower speed. From b86b14212b05e0907e9ccad969cfdf19a269713d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Jan 2025 13:44:05 +0800 Subject: [PATCH 09/16] test: update parquet compression tests to validate invalid and default compression levels --- python/tests/test_dataframe.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index aad88bc99..c13cc6fdd 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -1105,20 +1105,20 @@ def test_write_compressed_parquet_wrong_compression_level( ) -# test write_parquet with zstd, brotli default compression level, should complete without error -@pytest.mark.parametrize("compression", ["zstd", "brotli"]) -def test_write_compressed_parquet_default_compression_level(df, tmp_path, compression): +@pytest.mark.parametrize("compression", ["wrong"]) +def test_write_compressed_parquet_invalid_compression(df, tmp_path, compression): path = tmp_path - df.write_parquet(str(path), compression=compression) + with pytest.raises(ValueError): + df.write_parquet(str(path), compression=compression) -@pytest.mark.parametrize("compression", ["wrong"]) -def test_write_compressed_parquet_missing_compression_level(df, tmp_path, compression): +# test write_parquet with zstd, brotli default compression level, should complete without error +@pytest.mark.parametrize("compression", ["zstd", "brotli"]) +def test_write_compressed_parquet_default_compression_level(df, tmp_path, compression): path = tmp_path - with pytest.raises(ValueError): - df.write_parquet(str(path), compression=compression) + df.write_parquet(str(path), compression=compression) def test_dataframe_export(df) -> None: From 41e1742be60341bb79182a1e717a148580e8207e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Jan 2025 14:01:55 +0800 Subject: [PATCH 10/16] add comment on source of Compression --- python/datafusion/dataframe.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index db7b07e62..0fd034eea 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -38,6 +38,8 @@ from enum import Enum +# excerpt from deltalake +# https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163 class Compression(Enum): UNCOMPRESSED = "uncompressed" SNAPPY = "snappy" @@ -58,11 +60,15 @@ def from_str(cls, value: str) -> "Compression": ) def get_default_level(self) -> int: + # GZIP, BROTLI defaults from deltalake + # https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163 if self == Compression.GZIP: DEFAULT = 6 elif self == Compression.BROTLI: DEFAULT = 1 elif self == Compression.ZSTD: + # ZSTD default from delta-rs + # https://github.com/apache/datafusion-python/pull/981#discussion_r1904789223 DEFAULT = 4 else: raise KeyError(f"{self.value} does not have a compression level.") From fe502e84c760afb2ba0754a4d39fa20bfabd9271 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Jan 2025 14:14:58 +0800 Subject: [PATCH 11/16] docs: enhance 
Compression enum documentation and add default level method --- python/datafusion/dataframe.py | 21 +++++++++++++++++++++ python/tests/test_dataframe.py | 4 +++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 0fd034eea..d7489d2d7 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -41,6 +41,8 @@ # excerpt from deltalake # https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163 class Compression(Enum): + """Enum representing the available compression types for Parquet files.""" + UNCOMPRESSED = "uncompressed" SNAPPY = "snappy" GZIP = "gzip" @@ -52,6 +54,17 @@ class Compression(Enum): @classmethod def from_str(cls, value: str) -> "Compression": + """Convert a string to a Compression enum value. + + Args: + value (str): The string representation of the compression type. + + Returns: + Compression: The corresponding Compression enum value. + + Raises: + ValueError: If the string does not match any Compression enum value. + """ try: return cls(value.lower()) except ValueError: @@ -60,6 +73,14 @@ def from_str(cls, value: str) -> "Compression": ) def get_default_level(self) -> int: + """Get the default compression level for the compression type. + + Returns: + int: The default compression level. + + Raises: + KeyError: If the compression type does not have a default level. + """ # GZIP, BROTLI defaults from deltalake # https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163 if self == Compression.GZIP: diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index c13cc6fdd..248af0a21 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -1113,7 +1113,9 @@ def test_write_compressed_parquet_invalid_compression(df, tmp_path, compression) df.write_parquet(str(path), compression=compression) -# test write_parquet with zstd, brotli default compression level, should complete without error +# Test write_parquet with zstd, brotli default compression level, +# ie don't specify compression level +# should complete without error @pytest.mark.parametrize("compression", ["zstd", "brotli"]) def test_write_compressed_parquet_default_compression_level(df, tmp_path, compression): path = tmp_path From 67529b897496bcf769296e046dfd75bb483e39c2 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Jan 2025 14:46:15 +0800 Subject: [PATCH 12/16] test: include gzip in default compression level tests for write_parquet --- python/tests/test_dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index 248af0a21..a9f54efca 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -1113,10 +1113,10 @@ def test_write_compressed_parquet_invalid_compression(df, tmp_path, compression) df.write_parquet(str(path), compression=compression) -# Test write_parquet with zstd, brotli default compression level, +# Test write_parquet with zstd, brotli, gzip default compression level, # ie don't specify compression level # should complete without error -@pytest.mark.parametrize("compression", ["zstd", "brotli"]) +@pytest.mark.parametrize("compression", ["zstd", "brotli", "gzip"]) def test_write_compressed_parquet_default_compression_level(df, tmp_path, compression): path = tmp_path From 811f633f490d43dcfcadb571e95b9da9cd33fd97 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 9 Jan 2025 10:00:10 +0800 Subject: 
[PATCH 13/16] refactor: simplify Compression enum methods and improve type handling in DataFrame.write_parquet --- python/datafusion/dataframe.py | 55 +++++++++++++++++----------------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index d7489d2d7..cb5bdd940 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -21,7 +21,16 @@ from __future__ import annotations import warnings -from typing import Any, Iterable, List, TYPE_CHECKING, Literal, overload +from typing import ( + Any, + Iterable, + List, + TYPE_CHECKING, + Literal, + overload, + Optional, + Union, +) from datafusion.record_batch import RecordBatchStream from typing_extensions import deprecated from datafusion.plan import LogicalPlan, ExecutionPlan @@ -57,10 +66,7 @@ def from_str(cls, value: str) -> "Compression": """Convert a string to a Compression enum value. Args: - value (str): The string representation of the compression type. - - Returns: - Compression: The corresponding Compression enum value. + value: The string representation of the compression type. Raises: ValueError: If the string does not match any Compression enum value. @@ -72,28 +78,19 @@ def from_str(cls, value: str) -> "Compression": f"{value} is not a valid Compression. Valid values are: {[item.value for item in Compression]}" ) - def get_default_level(self) -> int: - """Get the default compression level for the compression type. - - Returns: - int: The default compression level. - - Raises: - KeyError: If the compression type does not have a default level. - """ - # GZIP, BROTLI defaults from deltalake + def get_default_level(self) -> Optional[int]: + """Get the default compression level for the compression type.""" + # GZIP, BROTLI default values from deltalake repo # https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163 + # ZSTD default value from delta-rs + # https://github.com/apache/datafusion-python/pull/981#discussion_r1904789223 if self == Compression.GZIP: - DEFAULT = 6 + return 6 elif self == Compression.BROTLI: - DEFAULT = 1 + return 1 elif self == Compression.ZSTD: - # ZSTD default from delta-rs - # https://github.com/apache/datafusion-python/pull/981#discussion_r1904789223 - DEFAULT = 4 - else: - raise KeyError(f"{self.value} does not have a compression level.") - return DEFAULT + return 4 + return None class DataFrame: @@ -679,7 +676,7 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None def write_parquet( self, path: str | pathlib.Path, - compression: str = Compression.ZSTD.value, + compression: Union[str, Compression] = Compression.ZSTD, compression_level: int | None = None, ) -> None: """Execute the :py:class:`DataFrame` and write the results to a Parquet file. @@ -700,13 +697,15 @@ def write_parquet( recommended range is 1 to 22, with the default being 4. Higher levels provide better compression but slower speed. 
""" - compression_enum = Compression.from_str(compression) + # Convert string to Compression enum if necessary + if isinstance(compression, str): + compression = Compression.from_str(compression) - if compression_enum in {Compression.GZIP, Compression.BROTLI, Compression.ZSTD}: + if compression in {Compression.GZIP, Compression.BROTLI, Compression.ZSTD}: if compression_level is None: - compression_level = compression_enum.get_default_level() + compression_level = compression.get_default_level() - self.df.write_parquet(str(path), compression_enum.value, compression_level) + self.df.write_parquet(str(path), compression.value, compression_level) def write_json(self, path: str | pathlib.Path) -> None: """Execute the :py:class:`DataFrame` and write the results to a JSON file. From 50a58b3d65acd38e84ecb4ce4767addf65e7ffef Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 9 Jan 2025 10:25:53 +0800 Subject: [PATCH 14/16] docs: update Compression enum methods to include return type descriptions --- python/datafusion/dataframe.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index cb5bdd940..f8aef0c91 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -68,6 +68,9 @@ def from_str(cls, value: str) -> "Compression": Args: value: The string representation of the compression type. + Returns: + The Compression enum lowercase value. + Raises: ValueError: If the string does not match any Compression enum value. """ @@ -79,7 +82,11 @@ def from_str(cls, value: str) -> "Compression": ) def get_default_level(self) -> Optional[int]: - """Get the default compression level for the compression type.""" + """Get the default compression level for the compression type. + + Returns: + The default compression level for the compression type. 
+        """

From 55fc97eb0ad773b40d39975762bd904837f14147 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Thu, 9 Jan 2025 12:46:22 +0800
Subject: [PATCH 15/16] move comment to within test

---
 python/tests/test_dataframe.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index a9f54efca..dcb86cea3 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -1113,11 +1113,11 @@ def test_write_compressed_parquet_invalid_compression(df, tmp_path, compression)
     df.write_parquet(str(path), compression=compression)
 
 
-# Test write_parquet with zstd, brotli, gzip default compression level,
-# ie don't specify compression level
-# should complete without error
 @pytest.mark.parametrize("compression", ["zstd", "brotli", "gzip"])
 def test_write_compressed_parquet_default_compression_level(df, tmp_path, compression):
+    # Test write_parquet with zstd, brotli, and gzip default compression levels,
+    # i.e., without specifying a compression level;
+    # the writes should complete without error.
    path = tmp_path
 
     df.write_parquet(str(path), compression=compression)

From 73519fe65a38851574e2abb0027350b1cb988188 Mon Sep 17 00:00:00 2001
From: Tim Saucer
Date: Thu, 9 Jan 2025 06:51:06 -0500
Subject: [PATCH 16/16] Ruff format

---
 python/tests/test_dataframe.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index dcb86cea3..41a96ae6b 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -731,7 +731,9 @@ def test_optimized_logical_plan(aggregate_df):
 def test_execution_plan(aggregate_df):
     plan = aggregate_df.execution_plan()
 
-    expected = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n"  # noqa: E501
+    expected = (
+        "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n"  # noqa: E501
+    )
 
     assert expected == plan.display()
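With the full series applied, write_parquet accepts either a string or a Compression enum value. A minimal sketch of the end state of the API; the from_pydict data and the output file names are assumptions for the example, not part of the patches:

    from datafusion import SessionContext
    from datafusion.dataframe import Compression

    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3]})  # hypothetical example data

    # String and enum forms are equivalent; strings are normalized
    # through Compression.from_str.
    df.write_parquet("by_name.parquet", compression="zstd")
    df.write_parquet("by_enum.parquet", compression=Compression.ZSTD)

    # An explicit level overrides the codec default (ZSTD defaults to 4).
    df.write_parquet(
        "max_zstd.parquet", compression=Compression.ZSTD, compression_level=22
    )

    # Codecs without a tunable level report no default.
    assert Compression.SNAPPY.get_default_level() is None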