feat: support LEAD and LAG window functions with IGNORE NULLS#3876
feat: support LEAD and LAG window functions with IGNORE NULLS#3876viirya wants to merge 7 commits intoapache:mainfrom
Conversation
- Add ignore_nulls field to WindowExpr proto message - Serialize Lag window function with its ignoreNulls flag in CometWindowExec - Extend find_df_window_function to also look up WindowUDFs (not just AggregateUDFs) - Pass ignore_nulls to DataFusion's create_window_expr - Enable previously-ignored LAG tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
ORDER BY b alone has ties, causing Spark and DataFusion to produce different but both-valid row orderings. Add c as a secondary sort key so tie-breaking is deterministic and results are comparable. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
CometWindowExec is marked Incompatible by default. Add allowIncompatible=true config so LAG tests actually run via Comet and checkSparkAnswerAndOperator can verify native execution. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
LAG/LEAD (FrameLessOffsetWindowFunction) support arbitrary partition and order specs. The existing validatePartitionAndSortSpecsForWindowFunc check (which requires partition columns == order columns) is only needed for aggregate window functions, not offset functions. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| spark.read.parquet(dir.toString).createOrReplaceTempView("window_test") | ||
| val df = sql(""" | ||
| SELECT a, b, c, | ||
| LAG(c) OVER (PARTITION BY a ORDER BY b, c) as lag_c |
There was a problem hiding this comment.
Oops, missed it. Added another test with IGNORE NULLs now.
| } else { | ||
| (None, exprToProto(windowExpr.windowFunction, output)) | ||
| windowExpr.windowFunction match { | ||
| case lag: Lag => |
There was a problem hiding this comment.
Should Lead also be handled the same way?
There was a problem hiding this comment.
Yes. Since you asked, I just added Lead in this PR too.
| } | ||
|
|
||
| if (op.partitionSpec.nonEmpty && op.orderSpec.nonEmpty && | ||
| val hasOnlyOffsetFunctions = winExprs.nonEmpty && |
There was a problem hiding this comment.
Could you add a comment explaining the logic here?
- Handle Lead case in windowExprToProto alongside Lag - Add comment explaining hasOnlyOffsetFunctions guard - Add test for LAG IGNORE NULLS - Enable LEAD tests with allowIncompatible config and deterministic ORDER BY Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| FROM window_test | ||
| """) | ||
| checkSparkAnswerAndOperator(df) | ||
| test("window: LAG with offset 2 and default value") { |
There was a problem hiding this comment.
oh nice, the PR also fixes this test although it is not related to IGNORE NULLS
| } | ||
|
|
||
| if (op.partitionSpec.nonEmpty && op.orderSpec.nonEmpty && | ||
| // Offset window functions (LAG, LEAD) support arbitrary partition and order specs, so skip |
There was a problem hiding this comment.
wondering if FIRST_VALUE, LAST_VALUE, NTH are also offset window function, cause they also access the data within frame by some offset (FiRST_VALUE by 1, etc) ?
There was a problem hiding this comment.
No, in Spark
Lag / Lead → inherit FrameLessOffsetWindowFunction
NthValue → inherit AggregateWindowFunction with OffsetWindowFunction
First / Last → inherit DeclarativeAggregate
Only FrameLessOffsetWindowFunction doesn't require frame, currently in Spark only Lag / Lead are FrameLessOffsetWindowFunction.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Thank you @comphead |
Which issue does this PR close?
Closes #.
Rationale for this change
Support Lead and Lag window function with its ignoreNulls flag.
What changes are included in this PR?
How are these changes tested?