diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 8781cd6746..877cda29ac 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -48,11 +48,13 @@ import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogFunctionImpl; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.FunctionLanguage; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; @@ -134,6 +136,18 @@ public class FlinkCatalog extends AbstractCatalog { protected Connection connection; protected Admin admin; + private static final Map BUILTIN_BITMAP_FUNCTIONS; + + static { + BUILTIN_BITMAP_FUNCTIONS = new HashMap<>(); + BUILTIN_BITMAP_FUNCTIONS.put( + "rb_build_agg", "org.apache.fluss.flink.functions.bitmap.RbBuildAggFunction"); + BUILTIN_BITMAP_FUNCTIONS.put( + "rb_or_agg", "org.apache.fluss.flink.functions.bitmap.RbOrAggFunction"); + BUILTIN_BITMAP_FUNCTIONS.put( + "rb_and_agg", "org.apache.fluss.flink.functions.bitmap.RbAndAggFunction"); + } + public FlinkCatalog( String name, String defaultDatabase, @@ -746,19 +760,24 @@ public void alterPartition( } @Override - public List listFunctions(String s) throws DatabaseNotExistException, CatalogException { - return Collections.emptyList(); + public List listFunctions(String dbName) + throws DatabaseNotExistException, CatalogException { + return new ArrayList<>(BUILTIN_BITMAP_FUNCTIONS.keySet()); } @Override - public CatalogFunction getFunction(ObjectPath functionPath) - throws FunctionNotExistException, CatalogException { - throw new FunctionNotExistException(getName(), functionPath); + public boolean functionExists(ObjectPath objectPath) throws CatalogException { + return BUILTIN_BITMAP_FUNCTIONS.containsKey(objectPath.getObjectName().toLowerCase()); } @Override - public boolean functionExists(ObjectPath objectPath) throws CatalogException { - return false; + public CatalogFunction getFunction(ObjectPath functionPath) + throws FunctionNotExistException, CatalogException { + String className = BUILTIN_BITMAP_FUNCTIONS.get(functionPath.getObjectName().toLowerCase()); + if (className == null) { + throw new FunctionNotExistException(getName(), functionPath); + } + return new CatalogFunctionImpl(className, FunctionLanguage.JAVA); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbAndAggFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbAndAggFunction.java new file mode 100644 index 0000000000..dd19658b8d --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbAndAggFunction.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.fluss.exception.FlussRuntimeException; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.functions.AggregateFunction; +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import java.io.IOException; +import java.util.Objects; + +/** + * {@code rb_and_agg(bitmap BYTES) -> BYTES} + * + *

Intersects multiple serialized {@link RoaringBitmap} values using bitwise AND across rows. + * + *

Unlike {@link RbOrAggFunction}, this function requires a custom {@link Accumulator} that + * carries an {@code initialized} flag alongside the bitmap. This is necessary because an empty + * bitmap cannot be used as a sentinel for "no input received yet": once the AND result becomes the + * empty set, it must stay empty even if further inputs arrive, whereas a truly uninitialized + * accumulator must be seeded with the first input via OR before AND can proceed. + * + *

Note: there is no server-side {@code FieldRoaringBitmapAndAgg} counterpart. This function + * executes entirely in Flink. Users should be aware that combining it with {@code + * table.merge-engine=aggregation} may produce unexpected results during server-side compaction. + * + *

Returns {@code null} when all inputs are null or the input set is empty. + */ +@FunctionHint( + accumulator = @DataTypeHint(value = "RAW", bridgedTo = RbAndAggFunction.Accumulator.class)) +public class RbAndAggFunction extends AggregateFunction { + + // ------------------------------------------------------------------------- + // Accumulator + // ------------------------------------------------------------------------- + + /** Mutable accumulator that tracks initialization state for AND aggregation. */ + public static final class Accumulator { + + /** True after the first non-null input has been accumulated. */ + public boolean initialized = false; + + /** Current AND result; meaningless if {@code initialized} is false. */ + public RoaringBitmap value = new RoaringBitmap(); + } + + // ------------------------------------------------------------------------- + // AggregateFunction implementation + // ------------------------------------------------------------------------- + + @Override + public Accumulator createAccumulator() { + return new Accumulator(); + } + + /** + * Intersects the input bitmap into the accumulator. + * + * @param acc the running accumulator + * @param bitmapBytes serialized RoaringBitmap bytes; null and empty arrays are ignored + */ + public void accumulate(Accumulator acc, @Nullable byte[] bitmapBytes) throws IOException { + if (bitmapBytes == null || bitmapBytes.length == 0) { + return; + } + RoaringBitmap input = BitmapUtils.fromBytes(bitmapBytes); + if (!acc.initialized) { + acc.value.or(input); + acc.initialized = true; + } else { + acc.value.and(input); + } + } + + /** Merges partial accumulators, required for two-phase aggregation in the Flink Table API. */ + public void merge(Accumulator acc, Iterable it) { + for (Accumulator other : it) { + if (!other.initialized) { + continue; + } + if (!acc.initialized) { + acc.value.or(other.value); + acc.initialized = true; + } else { + acc.value.and(other.value); + } + } + } + + public void resetAccumulator(Accumulator acc) { + acc.initialized = false; + acc.value.clear(); + } + + @Override + @Nullable + public byte[] getValue(Accumulator acc) { + if (!acc.initialized || acc.value.isEmpty()) { + return null; + } + try { + return BitmapUtils.toBytes(acc.value); + } catch (IOException e) { + throw new FlussRuntimeException("Failed to serialize rb_and_agg accumulator.", e); + } + } + + @Override + public TypeInformation getAccumulatorType() { + return AccumulatorTypeInfo.INSTANCE; + } + + // ------------------------------------------------------------------------- + // TypeSerializer and TypeInformation for Accumulator + // ------------------------------------------------------------------------- + + /** {@link TypeInformation} for {@link Accumulator}. */ + @ThreadSafe + public static final class AccumulatorTypeInfo extends TypeInformation { + + public static final AccumulatorTypeInfo INSTANCE = new AccumulatorTypeInfo(); + + private static final long serialVersionUID = 1L; + + private AccumulatorTypeInfo() {} + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + public int getTotalFields() { + return 1; + } + + @Override + public Class getTypeClass() { + return Accumulator.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer createSerializer(ExecutionConfig config) { + return AccumulatorSerializer.INSTANCE; + } + + @Override + public String toString() { + return "RbAndAccumulatorTypeInfo"; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof AccumulatorTypeInfo && ((AccumulatorTypeInfo) obj).canEqual(this); + } + + @Override + public int hashCode() { + return Objects.hash(getTypeClass()); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof AccumulatorTypeInfo; + } + } + + /** {@link TypeSerializer} for {@link Accumulator}. */ + @ThreadSafe + public static final class AccumulatorSerializer extends TypeSerializerSingleton { + + public static final AccumulatorSerializer INSTANCE = new AccumulatorSerializer(); + + private static final long serialVersionUID = 1L; + + private AccumulatorSerializer() {} + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public Accumulator createInstance() { + return new Accumulator(); + } + + @Override + public Accumulator copy(Accumulator from) { + Accumulator copy = new Accumulator(); + copy.initialized = from.initialized; + copy.value = from.value.clone(); + return copy; + } + + @Override + public Accumulator copy(Accumulator from, Accumulator reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(Accumulator record, DataOutputView target) throws IOException { + target.writeBoolean(record.initialized); + if (record.initialized) { + byte[] bytes = BitmapUtils.toBytes(record.value); + target.writeInt(bytes.length); + target.write(bytes); + } + } + + @Override + public Accumulator deserialize(DataInputView source) throws IOException { + Accumulator acc = new Accumulator(); + acc.initialized = source.readBoolean(); + if (acc.initialized) { + int size = source.readInt(); + byte[] bytes = new byte[size]; + source.readFully(bytes); + acc.value = BitmapUtils.fromBytes(bytes); + } + return acc; + } + + @Override + public Accumulator deserialize(Accumulator reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + boolean initialized = source.readBoolean(); + target.writeBoolean(initialized); + if (initialized) { + int size = source.readInt(); + target.writeInt(size); + byte[] buffer = new byte[size]; + source.readFully(buffer); + target.write(buffer); + } + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new AccumulatorSerializerSnapshot(); + } + + /** Snapshot for {@link AccumulatorSerializer}. */ + public static final class AccumulatorSerializerSnapshot + extends SimpleTypeSerializerSnapshot { + + public AccumulatorSerializerSnapshot() { + super(() -> INSTANCE); + } + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbBuildAggFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbBuildAggFunction.java new file mode 100644 index 0000000000..547290986b --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbBuildAggFunction.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +/** + * {@code rb_build_agg(value INT) -> BYTES} + * + *

Aggregates a column of 32-bit integer values into a single serialized {@link RoaringBitmap}. + * Each non-null integer is added to the bitmap during accumulation. Duplicate values are silently + * ignored — the bitmap stores each integer at most once. + * + *

Returns {@code null} when the input set is empty. + */ +public class RbBuildAggFunction extends AbstractRbAggFunction { + + /** + * Adds a single integer value to the accumulator bitmap. + * + * @param acc the running bitmap accumulator + * @param value the integer value to add; null values are ignored + */ + public void accumulate(RoaringBitmap acc, @Nullable Integer value) { + if (value == null) { + return; + } + acc.add(value); + } + + /** + * Retraction is not supported for bitmap build — integer addition is not reversible. + * + * @throws UnsupportedOperationException always + */ + public void retract(RoaringBitmap acc, @Nullable Integer value) { + throw new UnsupportedOperationException( + "rb_build_agg does not support retraction. " + + "Use it only on append-only streams."); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbOrAggFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbOrAggFunction.java new file mode 100644 index 0000000000..87b4d98515 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RbOrAggFunction.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * {@code rb_or_agg(bitmap BYTES) -> BYTES} + * + *

Aggregates multiple serialized {@link RoaringBitmap} values using bitwise OR (union). Used in + * roll-up aggregation where per-group bitmaps need to be merged into a coarser granularity. + * + *

Null and empty inputs are ignored. Returns {@code null} when all inputs are null. + */ +public class RbOrAggFunction extends AbstractRbAggFunction { + + /** + * Unions the input bitmap into the accumulator. + * + * @param acc the running bitmap accumulator + * @param bitmapBytes serialized RoaringBitmap bytes; null and empty arrays are ignored + */ + public void accumulate(RoaringBitmap acc, @Nullable byte[] bitmapBytes) throws IOException { + if (bitmapBytes == null || bitmapBytes.length == 0) { + return; + } + acc.or(BitmapUtils.fromBytes(bitmapBytes)); + } + + /** + * Retraction is not supported for bitmap OR — union is not reversible. + * + * @throws UnsupportedOperationException always + */ + public void retract(RoaringBitmap acc, @Nullable byte[] bitmapBytes) { + throw new UnsupportedOperationException( + "rb_or_agg does not support retraction. " + "Use it only on append-only streams."); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index b941b04f35..8354fbf20c 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -1007,7 +1007,7 @@ void testViewsAndFunctions() throws Exception { // Test functions operations List functions = catalog.listFunctions(DEFAULT_DB); - assertThat(functions).isEmpty(); + assertThat(functions).contains("rb_build_agg", "rb_or_agg", "rb_and_agg"); ObjectPath functionPath = new ObjectPath(DEFAULT_DB, "testFunction"); assertThat(catalog.functionExists(functionPath)).isFalse(); @@ -1102,4 +1102,20 @@ void registerLakeTable(ObjectPath tablePath, CatalogTable table) catalog.createTable(tablePath, table, false); } } + + @Test + void testBitmapFunctionsRegistered() throws Exception { + + List functions = catalog.listFunctions(DEFAULT_DB); + + assertThat(functions).contains("rb_build_agg", "rb_or_agg", "rb_and_agg"); + + assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_build_agg"))).isTrue(); + + assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_or_agg"))).isTrue(); + + assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "rb_and_agg"))).isTrue(); + + assertThat(catalog.functionExists(new ObjectPath(DEFAULT_DB, "unknown_fn"))).isFalse(); + } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RbAggFunctionsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RbAggFunctionsTest.java new file mode 100644 index 0000000000..1f9e914963 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RbAggFunctionsTest.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.junit.jupiter.api.Test; +import org.roaringbitmap.RoaringBitmap; + +import java.io.IOException; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit tests for {@link RbBuildAggFunction}, {@link RbOrAggFunction}, {@link RbAndAggFunction}. */ +class RbAggFunctionsTest { + + // ------------------------------------------------------------------------- + // RbBuildAggFunction + // ------------------------------------------------------------------------- + + @Test + void testBuildAggBasic() throws IOException { + RbBuildAggFunction fn = new RbBuildAggFunction(); + RoaringBitmap acc = fn.createAccumulator(); + + fn.accumulate(acc, 1); + fn.accumulate(acc, 2); + fn.accumulate(acc, 1); // duplicate — ignored + fn.accumulate(acc, null); // null — ignored + + byte[] result = fn.getValue(acc); + assertThat(result).isNotNull(); + RoaringBitmap restored = BitmapUtils.fromBytes(result); + assertThat(restored.getLongCardinality()).isEqualTo(2L); + assertThat(restored.contains(1)).isTrue(); + assertThat(restored.contains(2)).isTrue(); + } + + @Test + void testBuildAggEmptyReturnsNull() { + RbBuildAggFunction fn = new RbBuildAggFunction(); + RoaringBitmap acc = fn.createAccumulator(); + assertThat(fn.getValue(acc)).isNull(); + } + + @Test + void testBuildAggMerge() throws IOException { + RbBuildAggFunction fn = new RbBuildAggFunction(); + RoaringBitmap acc1 = fn.createAccumulator(); + fn.accumulate(acc1, 1); + fn.accumulate(acc1, 2); + + RoaringBitmap acc2 = fn.createAccumulator(); + fn.accumulate(acc2, 3); + + fn.merge(acc1, Collections.singletonList(acc2)); + assertThat(acc1.getLongCardinality()).isEqualTo(3L); + assertThat(acc1.contains(3)).isTrue(); + } + + @Test + void testBuildAggRetractThrows() { + RbBuildAggFunction fn = new RbBuildAggFunction(); + assertThatThrownBy(() -> fn.retract(fn.createAccumulator(), 1)) + .isInstanceOf(UnsupportedOperationException.class); + } + + // ------------------------------------------------------------------------- + // RbOrAggFunction + // ------------------------------------------------------------------------- + + @Test + void testOrAggBasic() throws IOException { + RbOrAggFunction fn = new RbOrAggFunction(); + RoaringBitmap acc = fn.createAccumulator(); + + byte[] bitmap1 = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2, 3)); + byte[] bitmap2 = BitmapUtils.toBytes(RoaringBitmap.bitmapOf(3, 4, 5)); + + fn.accumulate(acc, bitmap1); + fn.accumulate(acc, bitmap2); + fn.accumulate(acc, null); // ignored + fn.accumulate(acc, new byte[0]); // ignored + + byte[] result = fn.getValue(acc); + assertThat(result).isNotNull(); + RoaringBitmap restored = BitmapUtils.fromBytes(result); + assertThat(restored.getLongCardinality()).isEqualTo(5L); + assertThat(restored.contains(1)).isTrue(); + assertThat(restored.contains(5)).isTrue(); + } + + @Test + void testOrAggEmptyReturnsNull() { + RbOrAggFunction fn = new RbOrAggFunction(); + assertThat(fn.getValue(fn.createAccumulator())).isNull(); + } + + @Test + void testOrAggMerge() throws IOException { + RbOrAggFunction fn = new RbOrAggFunction(); + RoaringBitmap acc1 = fn.createAccumulator(); + fn.accumulate(acc1, BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2))); + + RoaringBitmap acc2 = fn.createAccumulator(); + fn.accumulate(acc2, BitmapUtils.toBytes(RoaringBitmap.bitmapOf(3, 4))); + + fn.merge(acc1, Collections.singletonList(acc2)); + assertThat(acc1.getLongCardinality()).isEqualTo(4L); + } + + // ------------------------------------------------------------------------- + // RbAndAggFunction + // ------------------------------------------------------------------------- + + @Test + void testAndAggBasic() throws IOException { + RbAndAggFunction fn = new RbAndAggFunction(); + RbAndAggFunction.Accumulator acc = fn.createAccumulator(); + + fn.accumulate(acc, BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2, 3))); + fn.accumulate(acc, BitmapUtils.toBytes(RoaringBitmap.bitmapOf(2, 3, 4))); + fn.accumulate(acc, BitmapUtils.toBytes(RoaringBitmap.bitmapOf(3, 5, 6))); + + byte[] result = fn.getValue(acc); + assertThat(result).isNotNull(); + RoaringBitmap restored = BitmapUtils.fromBytes(result); + assertThat(restored.getLongCardinality()).isEqualTo(1L); + assertThat(restored.contains(3)).isTrue(); + } + + @Test + void testAndAggNullInputsIgnored() throws IOException { + RbAndAggFunction fn = new RbAndAggFunction(); + RbAndAggFunction.Accumulator acc = fn.createAccumulator(); + + fn.accumulate(acc, null); + fn.accumulate(acc, new byte[0]); + assertThat(fn.getValue(acc)).isNull(); // no real input + + fn.accumulate(acc, BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2))); + assertThat(fn.getValue(acc)).isNotNull(); + } + + @Test + void testAndAggEmptyIntersectionReturnsNull() throws IOException { + RbAndAggFunction fn = new RbAndAggFunction(); + RbAndAggFunction.Accumulator acc = fn.createAccumulator(); + + fn.accumulate(acc, BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2))); + fn.accumulate(acc, BitmapUtils.toBytes(RoaringBitmap.bitmapOf(3, 4))); + // Intersection of {1,2} and {3,4} is empty + assertThat(fn.getValue(acc)).isNull(); + } + + @Test + void testAndAggMerge() throws IOException { + RbAndAggFunction fn = new RbAndAggFunction(); + + RbAndAggFunction.Accumulator acc1 = fn.createAccumulator(); + fn.accumulate(acc1, BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2, 3))); + + RbAndAggFunction.Accumulator acc2 = fn.createAccumulator(); + fn.accumulate(acc2, BitmapUtils.toBytes(RoaringBitmap.bitmapOf(2, 3, 4))); + + fn.merge(acc1, Collections.singletonList(acc2)); + byte[] result = fn.getValue(acc1); + assertThat(result).isNotNull(); + RoaringBitmap restored = BitmapUtils.fromBytes(result); + assertThat(restored.getLongCardinality()).isEqualTo(2L); + assertThat(restored.contains(2)).isTrue(); + assertThat(restored.contains(3)).isTrue(); + } + + @Test + void testAndAggResetAccumulator() throws IOException { + RbAndAggFunction fn = new RbAndAggFunction(); + RbAndAggFunction.Accumulator acc = fn.createAccumulator(); + + fn.accumulate(acc, BitmapUtils.toBytes(RoaringBitmap.bitmapOf(1, 2))); + fn.resetAccumulator(acc); + assertThat(acc.initialized).isFalse(); + assertThat(fn.getValue(acc)).isNull(); + } + + @Test + void testAndAggAccumulatorSerializerRoundTrip() throws Exception { + RbAndAggFunction.AccumulatorSerializer ser = + RbAndAggFunction.AccumulatorSerializer.INSTANCE; + + RbAndAggFunction.Accumulator original = new RbAndAggFunction.Accumulator(); + original.initialized = true; + original.value = RoaringBitmap.bitmapOf(10, 20, 30); + + DataOutputSerializer out = new DataOutputSerializer(256); + ser.serialize(original, out); + + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RbAndAggFunction.Accumulator restored = ser.deserialize(in); + + assertThat(restored.initialized).isTrue(); + assertThat(restored.value).isEqualTo(original.value); + } + + @Test + void testAndAggAccumulatorSerializerUninitializedRoundTrip() throws Exception { + RbAndAggFunction.AccumulatorSerializer ser = + RbAndAggFunction.AccumulatorSerializer.INSTANCE; + + RbAndAggFunction.Accumulator original = new RbAndAggFunction.Accumulator(); + + DataOutputSerializer out = new DataOutputSerializer(64); + ser.serialize(original, out); + + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RbAndAggFunction.Accumulator restored = ser.deserialize(in); + assertThat(restored.initialized).isFalse(); + } + + // ------------------------------------------------------------------------- + // RbAndAggFunction inner class coverage + // ------------------------------------------------------------------------- + + @Test + void testAndAggAccumulatorTypeInfoProperties() { + RbAndAggFunction.AccumulatorTypeInfo info = RbAndAggFunction.AccumulatorTypeInfo.INSTANCE; + assertThat(info.getTypeClass()).isEqualTo(RbAndAggFunction.Accumulator.class); + assertThat(info.isBasicType()).isFalse(); + assertThat(info.isTupleType()).isFalse(); + assertThat(info.getArity()).isEqualTo(1); + assertThat(info.getTotalFields()).isEqualTo(1); + assertThat(info.isKeyType()).isFalse(); + assertThat(info.toString()).isEqualTo("RbAndAccumulatorTypeInfo"); + assertThat(info.hashCode()).isNotZero(); + assertThat(info.equals(info)).isTrue(); + assertThat(info.equals("other")).isFalse(); + assertThat(info.canEqual(info)).isTrue(); + assertThat(info.canEqual("other")).isFalse(); + } + + @Test + void testAndAggAccumulatorTypeInfoCreateSerializer() { + TypeSerializer s = + RbAndAggFunction.AccumulatorTypeInfo.INSTANCE.createSerializer( + new ExecutionConfig()); + assertThat(s).isInstanceOf(RbAndAggFunction.AccumulatorSerializer.class); + } + + @Test + void testAndAggAccumulatorSerializerCreateInstance() { + RbAndAggFunction.AccumulatorSerializer ser = + RbAndAggFunction.AccumulatorSerializer.INSTANCE; + RbAndAggFunction.Accumulator acc = ser.createInstance(); + assertThat(acc.initialized).isFalse(); + assertThat(acc.value.isEmpty()).isTrue(); + } + + @Test + void testAndAggAccumulatorSerializerIsNotImmutable() { + assertThat(RbAndAggFunction.AccumulatorSerializer.INSTANCE.isImmutableType()).isFalse(); + } + + @Test + void testAndAggAccumulatorSerializerGetLength() { + assertThat(RbAndAggFunction.AccumulatorSerializer.INSTANCE.getLength()).isEqualTo(-1); + } + + @Test + void testAndAggAccumulatorSerializerCopy() { + RbAndAggFunction.AccumulatorSerializer ser = + RbAndAggFunction.AccumulatorSerializer.INSTANCE; + RbAndAggFunction.Accumulator original = new RbAndAggFunction.Accumulator(); + original.initialized = true; + original.value = RoaringBitmap.bitmapOf(1, 2, 3); + + RbAndAggFunction.Accumulator copy = ser.copy(original); + assertThat(copy.initialized).isTrue(); + assertThat(copy.value).isEqualTo(original.value); + // Verify deep copy + copy.value.add(999); + assertThat(original.value.contains(999)).isFalse(); + } + + @Test + void testAndAggAccumulatorSerializerCopyWithReuse() { + RbAndAggFunction.AccumulatorSerializer ser = + RbAndAggFunction.AccumulatorSerializer.INSTANCE; + RbAndAggFunction.Accumulator original = new RbAndAggFunction.Accumulator(); + original.initialized = true; + original.value = RoaringBitmap.bitmapOf(10, 20); + + RbAndAggFunction.Accumulator reuse = new RbAndAggFunction.Accumulator(); + RbAndAggFunction.Accumulator copy = ser.copy(original, reuse); + assertThat(copy.initialized).isTrue(); + assertThat(copy.value).isEqualTo(original.value); + } + + @Test + void testAndAggAccumulatorSerializerCopyStream() throws Exception { + RbAndAggFunction.AccumulatorSerializer ser = + RbAndAggFunction.AccumulatorSerializer.INSTANCE; + RbAndAggFunction.Accumulator original = new RbAndAggFunction.Accumulator(); + original.initialized = true; + original.value = RoaringBitmap.bitmapOf(5, 10, 15); + + DataOutputSerializer out = new DataOutputSerializer(256); + ser.serialize(original, out); + + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + DataOutputSerializer copied = new DataOutputSerializer(256); + ser.copy(in, copied); + + DataInputDeserializer copiedIn = new DataInputDeserializer(copied.getCopyOfBuffer()); + RbAndAggFunction.Accumulator restored = ser.deserialize(copiedIn); + assertThat(restored.initialized).isTrue(); + assertThat(restored.value).isEqualTo(original.value); + } + + @Test + void testAndAggAccumulatorSerializerSnapshotNotNull() { + assertThat(RbAndAggFunction.AccumulatorSerializer.INSTANCE.snapshotConfiguration()) + .isNotNull(); + } +} diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 2f9fdba582..3af4f089bc 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -495,6 +495,9 @@ org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint org.apache.fluss.flink.tiering.FlussLakeTiering + + org.apache.fluss.flink.functions.bitmap.RbAndAggFunction.AccumulatorSerializer.AccumulatorSerializerSnapshot + org.apache.flink.table.catalog.*