From 5cb1c0e740a86d8b6bf1354106eba2a5ccb3fb8f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 13 Jan 2025 11:28:52 +0800 Subject: [PATCH 1/7] fix: correct LZ0 to LZO in compression options --- python/datafusion/dataframe.py | 4 ++-- python/tests/test_dataframe.py | 4 +--- src/dataframe.rs | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index f8aef0c91..a2231a61e 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -57,7 +57,7 @@ class Compression(Enum): GZIP = "gzip" BROTLI = "brotli" LZ4 = "lz4" - LZ0 = "lz0" + LZO = "lzo" ZSTD = "zstd" LZ4_RAW = "lz4_raw" @@ -696,7 +696,7 @@ def write_parquet( - "snappy": Snappy compression. - "gzip": Gzip compression. - "brotli": Brotli compression. - - "lz0": LZ0 compression. + - "lzo": LZO compression. - "lz4": LZ4 compression. - "lz4_raw": LZ4_RAW compression. - "zstd": Zstandard compression. diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index fa5f4e8c5..0f531a5c8 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -731,9 +731,7 @@ def test_optimized_logical_plan(aggregate_df): def test_execution_plan(aggregate_df): plan = aggregate_df.execution_plan() - expected = ( - "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n" # noqa: E501 - ) + expected = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n" # noqa: E501 assert expected == plan.display() diff --git a/src/dataframe.rs b/src/dataframe.rs index 71a6fe60f..b875480a7 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -491,7 +491,7 @@ impl PyDataFrame { ZstdLevel::try_new(verify_compression_level(compression_level)? as i32) .map_err(|e| PyValueError::new_err(format!("{e}")))?, ), - "lz0" => Compression::LZO, + "lzo" => Compression::LZO, "lz4" => Compression::LZ4, "lz4_raw" => Compression::LZ4_RAW, "uncompressed" => Compression::UNCOMPRESSED, From 0a00bd3a70db6c68e58e5cb835d6d11363708010 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 13 Jan 2025 15:10:09 +0800 Subject: [PATCH 2/7] fix: disable LZO compression option and update tests to reflect its unavailability --- python/datafusion/dataframe.py | 5 ++++- python/tests/test_dataframe.py | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index a2231a61e..60ce3ef05 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -57,7 +57,10 @@ class Compression(Enum): GZIP = "gzip" BROTLI = "brotli" LZ4 = "lz4" - LZO = "lzo" + # TODO + # lzo is not implemented yet + # https://github.com/apache/arrow-rs/issues/6970 + # LZO = "lzo" ZSTD = "zstd" LZ4_RAW = "lz4_raw" diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index 0f531a5c8..4cbf09c62 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -1113,6 +1113,9 @@ def test_write_compressed_parquet_invalid_compression(df, tmp_path, compression) df.write_parquet(str(path), compression=compression) +# TODO +# not testing lzo because it it not implemented yet +# https://github.com/apache/arrow-rs/issues/6970 @pytest.mark.parametrize("compression", ["zstd", "brotli", "gzip"]) def test_write_compressed_parquet_default_compression_level(df, tmp_path, compression): # Test write_parquet with zstd, brotli, gzip default compression level, From e53844bca54b76539ed233a6922f068a29777632 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 13 Jan 2025 15:17:53 +0800 Subject: [PATCH 3/7] fix: ruff format expected string in test_execution_plan --- python/tests/test_dataframe.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index 4cbf09c62..31bf5099b 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -731,7 +731,9 @@ def test_optimized_logical_plan(aggregate_df): def test_execution_plan(aggregate_df): plan = aggregate_df.execution_plan() - expected = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n" # noqa: E501 + expected = ( + "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n" # noqa: E501 + ) assert expected == plan.display() From bceef4173e4b3cb9828b07d6932a452d89043e70 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 13 Jan 2025 15:23:36 +0800 Subject: [PATCH 4/7] fix: update test for execution plan and add validation for invalid LZO compression --- python/tests/test_dataframe.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index 31bf5099b..e61f8749e 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -731,9 +731,7 @@ def test_optimized_logical_plan(aggregate_df): def test_execution_plan(aggregate_df): plan = aggregate_df.execution_plan() - expected = ( - "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n" # noqa: E501 - ) + expected = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n" # noqa: E501 assert expected == plan.display() @@ -1128,6 +1126,16 @@ def test_write_compressed_parquet_default_compression_level(df, tmp_path, compre df.write_parquet(str(path), compression=compression) +# lzo is not a valid Compression yet +# https://github.com/apache/arrow-rs/issues/6970 +# Test write_parquet with lzo compression, should raise an error +def test_write_compressed_parquet_lzo(df, tmp_path): + path = tmp_path / "test.parquet" + + with pytest.raises(ValueError, match="lzo is not a valid Compression"): + df.write_parquet(str(path), compression="lzo") + + def test_dataframe_export(df) -> None: # Guarantees that we have the canonical implementation # reading our dataframe export From 3c7b68a0a30b7e16a2b8999c7e475c0aecc9510c Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 13 Jan 2025 15:27:32 +0800 Subject: [PATCH 5/7] fix: remove LZO compression option and related test cases --- python/datafusion/dataframe.py | 2 +- python/tests/test_dataframe.py | 14 +++----------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 60ce3ef05..7d4cd0150 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -699,10 +699,10 @@ def write_parquet( - "snappy": Snappy compression. - "gzip": Gzip compression. - "brotli": Brotli compression. - - "lzo": LZO compression. - "lz4": LZ4 compression. - "lz4_raw": LZ4_RAW compression. - "zstd": Zstandard compression. + Note: LZO is not yet implemented in arrow-rs and is therefore excluded. compression_level: Compression level to use. For ZSTD, the recommended range is 1 to 22, with the default being 4. Higher levels provide better compression but slower speed. diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index e61f8749e..31bf5099b 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -731,7 +731,9 @@ def test_optimized_logical_plan(aggregate_df): def test_execution_plan(aggregate_df): plan = aggregate_df.execution_plan() - expected = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n" # noqa: E501 + expected = ( + "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[sum(test.c2)]\n" # noqa: E501 + ) assert expected == plan.display() @@ -1126,16 +1128,6 @@ def test_write_compressed_parquet_default_compression_level(df, tmp_path, compre df.write_parquet(str(path), compression=compression) -# lzo is not a valid Compression yet -# https://github.com/apache/arrow-rs/issues/6970 -# Test write_parquet with lzo compression, should raise an error -def test_write_compressed_parquet_lzo(df, tmp_path): - path = tmp_path / "test.parquet" - - with pytest.raises(ValueError, match="lzo is not a valid Compression"): - df.write_parquet(str(path), compression="lzo") - - def test_dataframe_export(df) -> None: # Guarantees that we have the canonical implementation # reading our dataframe export From af67813622f847ffd28d73c08a3d0eae8ccc55bf Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 14 Jan 2025 11:01:11 +0800 Subject: [PATCH 6/7] ruff autoformat --- python/tests/test_functions.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/python/tests/test_functions.py b/python/tests/test_functions.py index 01c6c9cef..add170c17 100644 --- a/python/tests/test_functions.py +++ b/python/tests/test_functions.py @@ -790,9 +790,9 @@ def test_hash_functions(df): ) assert result.column(2) == pa.array( [ - b("185F8DB32271FE25F561A6FC938B2E26" "4306EC304EDA518007D1764826381969"), - b("78AE647DC5544D227130A0682A51E30B" "C7777FBB6D8A8F17007463A3ECD1D524"), - b("BB7208BC9B5D7C04F1236A82A0093A5E" "33F40423D5BA8D4266F7092C3BA43B62"), + b("185F8DB32271FE25F561A6FC938B2E264306EC304EDA518007D1764826381969"), + b("78AE647DC5544D227130A0682A51E30BC7777FBB6D8A8F17007463A3ECD1D524"), + b("BB7208BC9B5D7C04F1236A82A0093A5E33F40423D5BA8D4266F7092C3BA43B62"), ] ) assert result.column(3) == pa.array( @@ -838,16 +838,16 @@ def test_hash_functions(df): ) assert result.column(5) == pa.array( [ - b("F73A5FBF881F89B814871F46E26AD3FA" "37CB2921C5E8561618639015B3CCBB71"), - b("B792A0383FB9E7A189EC150686579532" "854E44B71AC394831DAED169BA85CCC5"), - b("27988A0E51812297C77A433F63523334" "6AEE29A829DCF4F46E0F58F402C6CFCB"), + b("F73A5FBF881F89B814871F46E26AD3FA37CB2921C5E8561618639015B3CCBB71"), + b("B792A0383FB9E7A189EC150686579532854E44B71AC394831DAED169BA85CCC5"), + b("27988A0E51812297C77A433F635233346AEE29A829DCF4F46E0F58F402C6CFCB"), ] ) assert result.column(6) == pa.array( [ - b("FBC2B0516EE8744D293B980779178A35" "08850FDCFE965985782C39601B65794F"), - b("BF73D18575A736E4037D45F9E316085B" "86C19BE6363DE6AA789E13DEAACC1C4E"), - b("C8D11B9F7237E4034ADBCD2005735F9B" "C4C597C75AD89F4492BEC8F77D15F7EB"), + b("FBC2B0516EE8744D293B980779178A3508850FDCFE965985782C39601B65794F"), + b("BF73D18575A736E4037D45F9E316085B86C19BE6363DE6AA789E13DEAACC1C4E"), + b("C8D11B9F7237E4034ADBCD2005735F9BC4C597C75AD89F4492BEC8F77D15F7EB"), ] ) assert result.column(7) == result.column(1) # SHA-224 From 1437ea020e963d7dbe342b0e548ab93ba939f8c8 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 14 Jan 2025 11:02:28 +0800 Subject: [PATCH 7/7] fix: remove TODO comment regarding LZO compression implementation --- python/datafusion/dataframe.py | 1 - python/tests/test_dataframe.py | 1 - 2 files changed, 2 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 7d4cd0150..b0c1abdad 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -57,7 +57,6 @@ class Compression(Enum): GZIP = "gzip" BROTLI = "brotli" LZ4 = "lz4" - # TODO # lzo is not implemented yet # https://github.com/apache/arrow-rs/issues/6970 # LZO = "lzo" diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index 31bf5099b..a1a871e9a 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -1115,7 +1115,6 @@ def test_write_compressed_parquet_invalid_compression(df, tmp_path, compression) df.write_parquet(str(path), compression=compression) -# TODO # not testing lzo because it it not implemented yet # https://github.com/apache/arrow-rs/issues/6970 @pytest.mark.parametrize("compression", ["zstd", "brotli", "gzip"])