[FIP-37] Add bitmap aggregate functions: rb_build_agg, rb_or_agg, rb_and_agg and register via FlussCatalog#3398
Open
Prajwal-banakar wants to merge 2 commits into
Open
Conversation
Contributor
Author
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Purpose
Linked issue: Part of #3289
This PR adds the three Phase 1 bitmap aggregate functions and registers them as built-in catalog functions in FlussCatalog, building on the infrastructure from PR #3319
After
USE CATALOG fluss_catalog, users can call these functions directly in Flink SQL without anyCREATE TEMPORARY FUNCTIONstatement.Brief change log
New files in
fluss-flink/fluss-flink-common:RbBuildAggFunction.java—rb_build_agg(INT) -> BYTES. Aggregates a stream of 32-bit integers into a serialized RoaringBitmap. ExtendsAbstractRbAggFunctionfrom PR [FIP-37] Add bitmap infrastructure: BitmapUtils, RoaringBitmapSerializer, AbstractRbAggFunction #3319.RbOrAggFunction.java—rb_or_agg(BYTES) -> BYTES. Unions multiple serialized RoaringBitmaps via bitwise OR across rows. ExtendsAbstractRbAggFunction.RbAndAggFunction.java—rb_and_agg(BYTES) -> BYTES. Intersects multiple serialized RoaringBitmaps via bitwise AND across rows. Unlike the OR/build variants, AND requires a customAccumulatorPOJO with aninitializedflag because an empty bitmap cannot serve as a "not yet initialized" sentinel — once the AND result becomes empty, it must remain empty. The innerAccumulatorSerializerfollows the same pattern asRoaringBitmapSerializer.Modified file:
FlinkCatalog.javaOverrides
listFunctions(),functionExists(), andgetFunction()to serve the three aggregate functions as built-in catalog functions via a staticBUILTIN_BITMAP_FUNCTIONSmap usingCatalogFunctionImplfromflink-table-api-java.Test file:
RbAggFunctionsTest.javaUnit tests covering:
rb_build_agg: basic accumulation, null handling, duplicate deduplication, merge, retract throwsrb_or_agg: union correctness, null/empty input handling, mergerb_and_agg: intersection correctness, null inputs ignored, empty intersection returns null, merge, reset, Accumulator serializer round-tripCatalog registration verified in
FlinkCatalogTest.testBitmapFunctionsRegisteredNote:
AccumulatorSerializerinRbAndAggFunctionusesTypeSerializerSingleton, which is deprecated in Flink 1.20 but is the same approach used inRoaringBitmapSerializer(PR #3319, reviewed and approved). This will be revisited when upgrading to Flink 2.x.The scalar functions (rb_cardinality, rb_build, rb_contains, rb_to_array, rb_or, rb_and) will follow in the next PR.
Tests
Unit tests:
RbAggFunctionsTest— 14 tests, all passing.Catalog integration:
FlinkCatalogTest.testBitmapFunctionsRegisteredandtestViewsAndFunctions— verified against a live Fluss cluster.Verified locally:
./mvnw spotless:apply— 0 violations./mvnw test -pl fluss-flink/fluss-flink-common -Dtest="BitmapUtilsTest,RbAggFunctionsTest,RoaringBitmapSerializerTest,AbstractRbAggFunctionITCase,FlinkCatalogTest"— BUILD SUCCESSFull test run: Tests run: 68, Failures: 0, Errors: 0, Skipped: 0
API and Format
This change does not affect any storage format or wire protocol. The functions operate on existing
BYTEScolumns using the standard RoaringBitmap serialization format already used by the server-sideFieldRoaringBitmap32Agg.Documentation
No user-facing documentation yet. Documentation will be added after the scalar functions are implemented and the full Phase 1 function set is available.