Adds intermediate dataType to schema and use it for ingestion aggregation #16868

Open
noob-se7en wants to merge 13 commits into apache:master from noob-se7en:fix_aggregation

@noob-se7en noob-se7en commented Sep 22, 2025

Contributor

Problem
Related to #16317. TL;DR: when an ingestion aggregation or transformation runs on a source column that is not present in the schema, data type conversion can throw exceptions, because Pinot has no type information for a column the schema does not declare.
Example: for the ingestion aggregation sum(price), if the price column is not part of the schema, Pinot assumes it is a Number, but it can be a String in the source.

PR
Add a new intermediate field type to the schema, as shown below, and use its data type during ingestion aggregation.

  "intermediateFieldSpecs": [
    {
      "name": "price",
      "dataType": "STRING"
    }
  ],
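
To illustrate the failure mode and the intent of the fix, here is a minimal, hypothetical sketch. It is not the code in this PR: `IntermediateTypeSketch`, `SourceType`, `toDouble`, and `sum` are illustrative names, and the real change lives in the aggregators and `MutableSegmentImpl`. The sketch assumes that a declared STRING type makes the aggregation parse the raw value, and that a missing declaration falls back to treating the value as a Number.

```java
import java.util.List;

// Hypothetical sketch only: none of these names are Pinot APIs.
final class IntermediateTypeSketch {
  // Stand-in for a declared source data type.
  enum SourceType { INT, LONG, DOUBLE, STRING }

  // Converts a raw source value to double, using the declared type when one is given.
  static double toDouble(Object raw, SourceType declared) {
    if (declared == SourceType.STRING) {
      // The schema says the source column holds strings, so parse instead of casting.
      return Double.parseDouble(raw.toString());
    }
    // No declaration (or a numeric one): assume the value is already a Number.
    return ((Number) raw).doubleValue();
  }

  // Sums raw values of a single column, e.g. sum(price).
  static double sum(List<?> rawValues, SourceType declared) {
    double total = 0.0;
    for (Object raw : rawValues) {
      total += toDouble(raw, declared);
    }
    return total;
  }

  public static void main(String[] args) {
    List<String> prices = List.of("10.5", "4.5");
    // With the declared type, string prices aggregate correctly.
    System.out.println(sum(prices, SourceType.STRING)); // prints 15.0
    try {
      // Without it, the "assume Number" path fails on a String value.
      sum(prices, null);
    } catch (ClassCastException e) {
      System.out.println("conversion failed without a declared type");
    }
  }
}
```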

Pending
Adding more tests. Opening this PR to get early reviews.

codecov-commenter commented Sep 23, 2025

Codecov Report

❌ Patch coverage is 52.66667% with 71 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.23%. Comparing base (4ef8e3e) to head (c926962).
⚠️ Report is 332 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...t/local/aggregator/MinMaxRangeValueAggregator.java | 19.04% | 12 Missing and 5 partials ⚠️ |
| ...t/segment/local/aggregator/AvgValueAggregator.java | 15.78% | 11 Missing and 5 partials ⚠️ |
| ...rc/main/java/org/apache/pinot/spi/data/Schema.java | 18.75% | 12 Missing and 1 partial ⚠️ |
| ...local/indexsegment/mutable/MutableSegmentImpl.java | 75.00% | 9 Missing and 3 partials ⚠️ |
| ...g/apache/pinot/spi/data/IntermediateFieldSpec.java | 0.00% | 3 Missing ⚠️ |
| ...t/segment/local/aggregator/MaxValueAggregator.java | 75.00% | 1 Missing and 1 partial ⚠️ |
| ...t/segment/local/aggregator/MinValueAggregator.java | 75.00% | 1 Missing and 1 partial ⚠️ |
| .../local/aggregator/SumPrecisionValueAggregator.java | 80.00% | 1 Missing and 1 partial ⚠️ |
| ...t/segment/local/aggregator/SumValueAggregator.java | 75.00% | 1 Missing and 1 partial ⚠️ |
| ...segment/local/aggregator/ValueAggregatorUtils.java | 66.66% | 1 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #16868      +/-   ##
============================================
- Coverage     63.25%   63.23%   -0.02%     
  Complexity     1499     1499              
============================================
  Files          3174     3176       +2     
  Lines        190323   190430     +107     
  Branches      29080    29096      +16     
============================================
+ Hits         120381   120422      +41     
- Misses        60606    60654      +48     
- Partials       9336     9354      +18     
| Flag | Coverage Δ |
|---|---|
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-11 | 63.18% <52.66%> (-0.03%) ⬇️ |
| java-21 | 63.21% <52.66%> (-0.01%) ⬇️ |
| temurin | 63.23% <52.66%> (-0.02%) ⬇️ |
| unittests | 63.23% <52.66%> (-0.02%) ⬇️ |
| unittests1 | 55.58% <25.33%> (-0.05%) ⬇️ |
| unittests2 | 34.10% <50.66%> (+0.03%) ⬆️ |

@noob-se7en

Contributor Author

@Jackie-Jiang I added an intermediate field spec to the schema, like:

  "intermediateFieldSpecs": [
    {
      "name": "random",
      "dataType": "STRING"
    }
  ],

@noob-se7en noob-se7en changed the title from "Adds source dataType for aggregation" to "Adds intermediate dataType to schema and use it in ingestion aggregation" Oct 30, 2025
@noob-se7en noob-se7en marked this pull request as ready for review October 30, 2025 06:18
@noob-se7en noob-se7en changed the title from "Adds intermediate dataType to schema and use it in ingestion aggregation" to "Adds intermediate dataType to schema and use it for ingestion aggregation" Oct 30, 2025
9aman commented Nov 3, 2025

Contributor

@noob-se7en

  1. Will it impact segment reload (due to the schema change), etc.?
    • Its impact on existing segments: given that these are transformations at ingestion time, were we failing the segment build in such scenarios (referring to the issues mentioned above)?
    • Its impact on pauseless ingestion, i.e. continued ingestion without a segment build: will we rely on DR here?
  2. How are we handling transformations in such scenarios? Is the expectation that the column being transformed is part of the schema?

9aman commented Nov 3, 2025

Contributor

@noob-se7en

  1. Will it impact segment reload (due to the schema change), etc.?

    • Its impact on existing segments: given that these are transformations at ingestion time, were we failing the segment build in such scenarios (referring to the issues mentioned above)?
    • Its impact on pauseless ingestion, i.e. continued ingestion without a segment build: will we rely on DR here?
  2. How are we handling transformations in such scenarios? Is the expectation that the column being transformed is part of the schema?

I guess that for transformations, ingestion itself will throw exceptions at the row level, so we won't wait until the segment build?

@noob-se7en

Contributor Author

> @noob-se7en
>
>   1. Will it impact segment reload (due to the schema change), etc.?
>
>     • Its impact on existing segments: given that these are transformations at ingestion time, were we failing the segment build in such scenarios (referring to the issues mentioned above)?
>     • Its impact on pauseless ingestion, i.e. continued ingestion without a segment build: will we rely on DR here?
>   2. How are we handling transformations in such scenarios? Is the expectation that the column being transformed is part of the schema?

I don't fully understand the questions. The code changes are only in MutableSegmentImpl.
They should not impact segment reload, right?

This PR is only meant to support realtime ingestion aggregation (which happens during indexing of mutable segments).

@Jackie-Jiang Jackie-Jiang added the enhancement (Improvement to existing functionality) and ingestion (Related to data ingestion pipeline) labels Nov 19, 2025

@Jackie-Jiang Jackie-Jiang left a comment

Contributor

Well done.

Given that the field type name cannot be changed in the future, do you see "intermediate" used as a common field type name in other DBs?

@@ -49,11 +49,28 @@ public interface ValueAggregator<R, A> {
A getInitialAggregatedValue(@Nullable R rawValue);

Contributor

Seems we can deprecate this method as long as A applyRawValue(A value, R rawValue);

* Returns the initial aggregated value with the optional source data type provided for correct raw value handling.
* Default implementation delegates to {@link #getInitialAggregatedValue(Object)} for backward compatibility.
*/
default A getInitialAggregatedValue(@Nullable R rawValue, @Nullable DataType sourceDataType) {

Contributor

Star-tree builder can also be switched to use the new set of methods
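
For readers following this thread, the backward-compatible overload pattern under review can be sketched roughly as below. This is a simplified, hypothetical shape and not the actual Pinot interface: `DefaultOverloadSketch`, `Aggregator`, `SourceType`, `LegacySum`, and `TypeAwareSum` are illustrative names, and only the idea of a default method delegating to the existing single-argument method is taken from the Javadoc in the diff.

```java
// Hypothetical sketch of the default-overload pattern; not the real ValueAggregator.
final class DefaultOverloadSketch {
  // Stand-in for the source data type passed to the new overload.
  enum SourceType { DOUBLE, STRING }

  interface Aggregator<R, A> {
    // Existing method that current implementations already provide.
    A getInitialAggregatedValue(R rawValue);

    // New overload: by default it ignores the type and delegates to the
    // existing method, so implementations that do not override it keep working.
    default A getInitialAggregatedValue(R rawValue, SourceType sourceType) {
      return getInitialAggregatedValue(rawValue);
    }
  }

  // An untouched implementation: inherits the default overload.
  static final class LegacySum implements Aggregator<Object, Double> {
    @Override
    public Double getInitialAggregatedValue(Object rawValue) {
      return ((Number) rawValue).doubleValue();
    }
  }

  // An updated implementation: uses the declared type to parse string input.
  static final class TypeAwareSum implements Aggregator<Object, Double> {
    @Override
    public Double getInitialAggregatedValue(Object rawValue) {
      return ((Number) rawValue).doubleValue();
    }

    @Override
    public Double getInitialAggregatedValue(Object rawValue, SourceType sourceType) {
      if (sourceType == SourceType.STRING) {
        return Double.parseDouble(rawValue.toString());
      }
      return getInitialAggregatedValue(rawValue);
    }
  }
}
```

Under this pattern, a caller such as a segment builder could switch to the two-argument method everywhere, and only the aggregators that care about the source type would need to override it.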

@xiangfu0 xiangfu0 added the schema (Related to table schema definitions or changes) label Mar 20, 2026
@Jackie-Jiang

Contributor

Taking a different approach in #18816, where users can add an optional data type conversion for any source field.
