Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/functions-aggregate/src/correlation.rs
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if mean1.is_nan() && mean2.is_nan() {
return Ok(ScalarValue::Float64(Some(f64::NAN)));
}

Same concern as in covariance.rs. There's a check in the result computation that treats "both internal averages are NaN" as meaning "the input contained NaN values":

An emptied sliding window also makes the averages NaN, so this check gets falsely triggered and corr returns NaN even for an empty window, where it should return NULL.

Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ impl Accumulator for CorrelationAccumulator {
self.stddev2.retract_batch(&values[1..2])?;
Ok(())
}

fn supports_retract_batch(&self) -> bool {
true
}
}

#[derive(Default)]
Expand Down
12 changes: 12 additions & 0 deletions datafusion/functions-aggregate/src/covariance.rs
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let new_count = self.count - 1;
let delta1 = self.mean1 - value1;
let new_mean1 = delta1 / new_count as f64 + self.mean1;
let delta2 = self.mean2 - value2;
let new_mean2 = delta2 / new_count as f64 + self.mean2;

When the window is holding a single row and that row leaves, the count drops to 0 and the division produces NaN. The internal running values then stay NaN forever, so every later window result silently comes out as NaN instead of the right number.

This is reachable with NULL gaps, e.g. a 2-row sliding window over 10.0, NULL, NULL, 30.0, 40.0, 50.0 — once the window slides onto the NULL section it empties, and all results after that are NaN.

My suggestion would be when the count is about to reach 0, reset the state back to its initial values (count 0, running values 0.0) and skip the division.

Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,14 @@ impl Accumulator for CovarianceAccumulator {
_ => continue,
};

if self.count <= 1 {
self.count = 0;
self.mean1 = 0.0;
self.mean2 = 0.0;
self.algo_const = 0.0;
continue;
}

let new_count = self.count - 1;
let delta1 = self.mean1 - value1;
let new_mean1 = delta1 / new_count as f64 + self.mean1;
Expand Down Expand Up @@ -373,4 +381,8 @@ impl Accumulator for CovarianceAccumulator {
fn size(&self) -> usize {
size_of_val(self)
}

fn supports_retract_batch(&self) -> bool {
true
}
}
7 changes: 7 additions & 0 deletions datafusion/functions-aggregate/src/variance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,13 @@ impl Accumulator for VarianceAccumulator {
fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let arr = as_float64_array(&values[0])?;
for value in arr.iter().flatten() {
if self.count <= 1 {
self.count = 0;
self.mean = 0.0;
self.m2 = 0.0;
continue;
}

let new_count = self.count - 1;
let delta1 = self.mean - value;
let new_mean = delta1 / new_count as f64 + self.mean;
Expand Down
136 changes: 136 additions & 0 deletions datafusion/sqllogictest/test_files/window.slt
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both tests pass the same column twice (covar_pop(column2, column2)). Covariance of a column with itself is just variance, and correlation of a column with itself is always 1 so the "two different columns" math is never really tested. Please use two distinct columns.

Across both tests, a row actually leaves the window only once (first test, last row). In the second test the window only ever grows, so nothing is ever removed.

There are no NULLs and no case where the window becomes empty in the middle of the data

Original file line number Diff line number Diff line change
Expand Up @@ -6604,6 +6604,142 @@ ORDER BY i;
3 1
4 NULL

# Covariance/correlation sliding-window regression test. Verifies correct
# results across row removals and a NULL-gap empty-frame transition.
query IRRR
SELECT
column1,
covar_pop(column2, column3) OVER (
ORDER BY column1
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
),
covar_samp(column2, column3) OVER (
ORDER BY column1
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
),
corr(column2, column3) OVER (
ORDER BY column1
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
)
FROM (
VALUES
(1, 10.0, 5.0),
(2, NULL, NULL),
(3, NULL, NULL),
(4, 30.0, 10.0),
(5, 40.0, 20.0),
(6, 50.0, 10.0)
);
----
1 0 NULL NULL
2 0 NULL NULL
3 NULL NULL NULL
4 0 NULL NULL
5 25 50 1
6 -25 -50 -1

# Multi-row covariance/correlation sliding-window regression test. Verifies
# correct accumulation when valid rows enter the frame after a reset.
query IRRR
SELECT
column1,
covar_pop(column2, column3) OVER (
ORDER BY column1
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
),
covar_samp(column2, column3) OVER (
ORDER BY column1
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
),
corr(column2, column3) OVER (
ORDER BY column1
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
)
FROM (
VALUES
(1, 10.0, 5.0),
(2, NULL, NULL),
(3, NULL, NULL),
(4, 30.0, 10.0),
(5, 40.0, 20.0),
(6, 50.0, 10.0)
);
----
1 0 NULL NULL
2 0 NULL NULL
3 0 NULL NULL
4 0 NULL NULL
5 25 50 1
6 0 0 0

# Covariance/correlation sliding-window regression test. Rows with NULL in
# either input column must not contribute to the aggregate state.
query IRRR
SELECT
column1,
covar_pop(column2, column3) OVER (
ORDER BY column1
ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
),
covar_samp(column2, column3) OVER (
ORDER BY column1
ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
),
corr(column2, column3) OVER (
ORDER BY column1
ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
)
FROM (
VALUES
(1, 10.0, 5.0),
(2, 20.0, NULL),
(3, NULL, 15.0),
(4, 30.0, 10.0),
(5, 40.0, 20.0)
);
----
1 0 NULL NULL
2 0 NULL NULL
3 0 NULL NULL
4 25 50 1
5 25 50 1

# Variance/stddev sliding-window regression test. Verifies that retracting
# the last valid row resets the aggregate state.
query IRRRR
SELECT
column1,
var_pop(column2) OVER (
ORDER BY column1
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
),
var_samp(column2) OVER (
ORDER BY column1
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
),
stddev_pop(column2) OVER (
ORDER BY column1
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
),
stddev_samp(column2) OVER (
ORDER BY column1
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
)
FROM (
VALUES
(1, 10.0),
(2, NULL),
(3, NULL),
(4, 30.0),
(5, 40.0)
);
----
1 0 NULL 0 NULL
2 0 NULL 0 NULL
3 NULL NULL NULL NULL
4 0 NULL 0 NULL
5 25 50 5 7.071067811865

# Decimal variant — the integer-division path would otherwise panic on an
# empty frame.
query IR
Expand Down
Loading