From ded4c4d2ff8b19c243c87f4f3f6cc65fd7c0fe74 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Feb 2026 11:18:43 -0800 Subject: [PATCH 1/5] fix: preserve row count when converting pandas DataFrame with 0 columns to Arrow --- python/pyspark/sql/pandas/conversion.py | 52 +++++++++++++++-------- python/pyspark/sql/tests/test_creation.py | 32 ++++++++++++++ 2 files changed, 67 insertions(+), 17 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index cdcfcc872bbe2..24f9f1d56805f 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -861,6 +861,12 @@ def convert_timestamp(value: Any) -> Any: ser.dt.to_pytimedelta(), index=ser.index, dtype="object", name=ser.name ) + # Handle the 0-column case separately to preserve row count + if len(pdf.columns) == 0: + from pyspark.sql import Row + + return [Row()] * len(pdf) + # Convert pandas.DataFrame to list of numpy records np_records = pdf.set_axis( [f"col_{i}" for i in range(len(pdf.columns))], axis="columns" @@ -998,16 +1004,26 @@ def _create_from_pandas_with_arrow( step = step if step > 0 else len(pdf) pdf_slices = (pdf.iloc[start : start + step] for start in range(0, len(pdf), step)) - # Create Arrow batches directly using the standalone function - arrow_batches = [ - create_arrow_batch_from_pandas( - [(c, t) for (_, c), t in zip(pdf_slice.items(), spark_types)], - timezone=timezone, - safecheck=safecheck, - prefers_large_types=prefers_large_var_types, - ) - for pdf_slice in pdf_slices - ] + # Handle the 0-column case separately to preserve row count + if len(pdf.columns) == 0: + # Use struct array to preserve row count for 0-column DataFrames + arrow_batches = [ + pa.RecordBatch.from_struct_array( + pa.array([{}] * len(pdf_slice), type=pa.struct([])) + ) + for pdf_slice in pdf_slices + ] + else: + # Create Arrow batches directly using the standalone function + arrow_batches = [ + create_arrow_batch_from_pandas( + [(c, t) for (_, c), t in zip(pdf_slice.items(), spark_types)], + timezone=timezone, + safecheck=safecheck, + prefers_large_types=prefers_large_var_types, + ) + for pdf_slice in pdf_slices + ] jsparkSession = self._jsparkSession @@ -1074,14 +1090,16 @@ def _create_from_arrow_table( if not isinstance(schema, StructType): schema = from_arrow_schema(table.schema, prefer_timestamp_ntz=prefer_timestamp_ntz) - table = _check_arrow_table_timestamps_localize(table, schema, True, timezone).cast( - to_arrow_schema( - schema, - error_on_duplicated_field_names_in_struct=True, - timezone="UTC", - prefers_large_types=prefers_large_var_types, + # Skip cast for 0-column tables as it loses row count + if len(schema.fields) > 0: + table = _check_arrow_table_timestamps_localize(table, schema, True, timezone).cast( + to_arrow_schema( + schema, + error_on_duplicated_field_names_in_struct=True, + timezone="UTC", + prefers_large_types=prefers_large_var_types, + ) ) - ) # Chunk the Arrow Table into RecordBatches chunk_size = arrow_batch_size diff --git a/python/pyspark/sql/tests/test_creation.py b/python/pyspark/sql/tests/test_creation.py index 906dab9692011..96abd8d1ffba1 100644 --- a/python/pyspark/sql/tests/test_creation.py +++ b/python/pyspark/sql/tests/test_creation.py @@ -261,6 +261,38 @@ def test_empty_schema(self): sdf = self.spark.createDataFrame(data, schema) assertDataFrameEqual(sdf, data) + @unittest.skipIf( + not have_pandas or not have_pyarrow, + pandas_requirement_message or pyarrow_requirement_message, + ) + def test_from_pandas_dataframe_with_zero_columns(self): + """SPARK-55600: Test that row count is preserved when creating DataFrame from + pandas with 0 columns but with explicit schema in classic Spark.""" + import pandas as pd + + # Create a pandas DataFrame with 5 rows but 0 columns + pdf = pd.DataFrame(index=range(5)) + schema = StructType([]) + + # Test with Arrow optimization enabled + with self.sql_conf( + { + "spark.sql.execution.arrow.pyspark.enabled": True, + "spark.sql.execution.arrow.pyspark.fallback.enabled": False, + } + ): + df = self.spark.createDataFrame(pdf, schema=schema) + self.assertEqual(df.schema, schema) + self.assertEqual(df.count(), 5) + self.assertEqual(len(df.collect()), 5) + + # Test with Arrow optimization disabled + with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}): + df = self.spark.createDataFrame(pdf, schema=schema) + self.assertEqual(df.schema, schema) + self.assertEqual(df.count(), 5) + self.assertEqual(len(df.collect()), 5) + class DataFrameCreationTests( DataFrameCreationTestsMixin, From 3ae38fcd2b93963094ecd1608570a5f52e7e991e Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 19 Feb 2026 11:23:00 -0800 Subject: [PATCH 2/5] trigger ci From 66a89db62a0c4ca2e85a23f99dc09b9dfb3f8d50 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 20 Feb 2026 11:16:57 -0800 Subject: [PATCH 3/5] retrigger ci From af89044c9c06e0e35fdc50dec937d6d38974e482 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 23 Feb 2026 13:58:26 -0800 Subject: [PATCH 4/5] refactor: use pa.RecordBatch.from_pandas for 0-column arrow batch creation --- python/pyspark/sql/pandas/conversion.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 24f9f1d56805f..5c4b6d14b24db 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -1004,15 +1004,10 @@ def _create_from_pandas_with_arrow( step = step if step > 0 else len(pdf) pdf_slices = (pdf.iloc[start : start + step] for start in range(0, len(pdf), step)) - # Handle the 0-column case separately to preserve row count + # Handle the 0-column case separately to preserve row count. + # pa.RecordBatch.from_pandas preserves num_rows via pandas index metadata. if len(pdf.columns) == 0: - # Use struct array to preserve row count for 0-column DataFrames - arrow_batches = [ - pa.RecordBatch.from_struct_array( - pa.array([{}] * len(pdf_slice), type=pa.struct([])) - ) - for pdf_slice in pdf_slices - ] + arrow_batches = [pa.RecordBatch.from_pandas(pdf_slice) for pdf_slice in pdf_slices] else: # Create Arrow batches directly using the standalone function arrow_batches = [ From 1e31b772e4f5d183ab2bab1b9cda4426982f2113 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 23 Feb 2026 16:50:06 -0800 Subject: [PATCH 5/5] ci: trigger CI re-run