Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
import org.apache.fluss.record.MemoryLogRecordsArrowBuilder;
import org.apache.fluss.record.bytesview.BytesView;
import org.apache.fluss.row.InternalRow;
Expand Down Expand Up @@ -55,11 +56,13 @@ public ArrowLogWriteBatch(
int schemaId,
ArrowWriter arrowWriter,
AbstractPagedOutputView outputView,
long createdMs) {
long createdMs,
LogRecordBatchStatisticsCollector statisticsCollector) {
super(bucketId, physicalTablePath, createdMs);
this.outputView = outputView;
this.recordsBuilder =
MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, outputView, true);
MemoryLogRecordsArrowBuilder.builder(
schemaId, arrowWriter, outputView, true, statisticsCollector);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
import org.apache.fluss.row.arrow.ArrowWriter;
import org.apache.fluss.row.arrow.ArrowWriterPool;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
Expand Down Expand Up @@ -628,13 +629,20 @@ private WriteBatch createWriteBatch(
outputView.getPreAllocatedSize(),
tableInfo.getRowType(),
tableInfo.getTableConfig().getArrowCompressionInfo());
LogRecordBatchStatisticsCollector statisticsCollector = null;
if (tableInfo.isStatisticsEnabled()) {
statisticsCollector =
new LogRecordBatchStatisticsCollector(
tableInfo.getRowType(), tableInfo.getStatsIndexMapping());
}
return new ArrowLogWriteBatch(
bucketId,
physicalTablePath,
tableInfo.getSchemaId(),
arrowWriter,
outputView,
clock.milliseconds());
clock.milliseconds(),
statisticsCollector);

case COMPACTED_LOG:
return new CompactedLogWriteBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ void testAppendWithPreAllocatedMemorySegments() throws Exception {
DATA1_ROW_TYPE,
DEFAULT_COMPRESSION),
new PreAllocatedPagedOutputView(memorySegmentList),
System.currentTimeMillis());
System.currentTimeMillis(),
null);
assertThat(arrowLogWriteBatch.pooledMemorySegments()).isEqualTo(memorySegmentList);

int count = 0;
Expand Down Expand Up @@ -210,7 +211,8 @@ void testArrowCompressionRatioEstimated() throws Exception {
DATA1_TABLE_INFO.getSchemaId(),
arrowWriter,
new PreAllocatedPagedOutputView(memorySegmentList),
System.currentTimeMillis());
System.currentTimeMillis(),
null);

int recordCount = 0;
while (arrowLogWriteBatch.tryAppend(
Expand Down Expand Up @@ -310,7 +312,8 @@ private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int maxSizeI
DATA1_ROW_TYPE,
DEFAULT_COMPRESSION),
new UnmanagedPagedOutputView(128),
System.currentTimeMillis());
System.currentTimeMillis(),
null);
}

private WriteCallback newWriteCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1579,6 +1579,18 @@ public class ConfigOptions {
+ "This mode reduces storage and transmission costs but loses the ability to track previous values. "
+ "This option only affects primary key tables.");

public static final ConfigOption<String> TABLE_STATISTICS_COLUMNS =
key("table.statistics.columns")
.stringType()
.defaultValue("*")
.withDescription(
"Configures statistics collection for the table. "
+ "Empty string ('') (default) means disable statistics collection completely. "
+ "The value '*' means collect statistics for all non-binary columns. "
+ "Comma-separated list of column names means collect statistics only for the specified columns. "
+ "Binary and bytes columns are not supported for statistics collection. "
+ "Example: 'id,name,timestamp' to collect statistics only for specified columns.");

// ------------------------------------------------------------------------
// ConfigOptions for Kv
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.config;

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.RowType;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Utility class for validating table statistics configuration.
*
* <p>This provides simple validation methods that can be called during CREATE TABLE operations to
* ensure statistics configuration is valid and compatible with the table schema.
*/
@Internal
public class StatisticsConfigUtils {

private StatisticsConfigUtils() {}

/**
* Validates statistics configuration for a table descriptor.
*
* @param tableDescriptor the table descriptor to validate
* @throws InvalidConfigException if the statistics configuration is invalid
*/
public static void validateStatisticsConfig(TableDescriptor tableDescriptor) {
Map<String, String> properties = tableDescriptor.getProperties();
String statisticsColumns =
properties.getOrDefault(ConfigOptions.TABLE_STATISTICS_COLUMNS.key(), "*");

// Empty string means statistics disabled - no validation needed
if (statisticsColumns.isEmpty()) {
return;
}

RowType rowType = tableDescriptor.getSchema().getRowType();

// Wildcard means all non-binary columns - no validation needed
if ("*".equals(statisticsColumns.trim())) {
return;
}

// Parse and validate specific column names
List<String> columnNames = parseColumnNames(statisticsColumns);
if (columnNames.isEmpty()) {
throw new InvalidConfigException(
"Statistics columns configuration cannot be empty. "
+ "Use '*' to collect statistics for all non-binary columns, "
+ "or use empty string '' to disable statistics collection.");
}

validateColumns(rowType, columnNames);
}

/**
* Parses comma-separated column names from the configuration string.
*
* @param columnsConfig the configuration string
* @return list of parsed column names
*/
private static List<String> parseColumnNames(String columnsConfig) {
return Arrays.stream(columnsConfig.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
}

/**
* Validates that the specified columns exist in the schema and are of supported types.
*
* @param rowType the table schema
* @param statisticsColumns the list of column names to validate
* @throws InvalidConfigException if validation fails
*/
private static void validateColumns(RowType rowType, List<String> statisticsColumns) {
Map<String, DataType> columnTypeMap = buildColumnTypeMap(rowType);

for (String columnName : statisticsColumns) {
// Check if column exists
if (!columnTypeMap.containsKey(columnName)) {
throw new InvalidConfigException(
String.format(
"Column '%s' specified in statistics collection does not exist in table schema",
columnName));
}

// Check if column type is supported
DataType dataType = columnTypeMap.get(columnName);
if (DataTypeChecks.isBinaryType(dataType)) {
throw new InvalidConfigException(
String.format(
"Binary column '%s' cannot be included in statistics collection. "
+ "Binary and bytes columns are not supported for statistics collection.",
columnName));
}
}
}

/**
* Builds a map from column name to data type for quick lookup.
*
* @param rowType the table schema
* @return map of column name to data type
*/
private static Map<String, DataType> buildColumnTypeMap(RowType rowType) {
return rowType.getFields().stream()
.collect(Collectors.toMap(DataField::getName, DataField::getType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import org.apache.fluss.utils.AutoPartitionStrategy;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Helper class to get table configs (prefixed with "table.*" properties).
Expand Down Expand Up @@ -154,4 +157,42 @@ public AutoPartitionStrategy getAutoPartitionStrategy() {
public long getAutoIncrementCacheSize() {
return config.get(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE);
}

/** Gets whether statistics collection is enabled for the table. */
public boolean isStatisticsEnabled() {
String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS);
return !columnsStr.isEmpty();
}

/**
* Gets the statistics columns configuration of the table.
*
* @return Optional containing the list of column names if specific columns are configured,
* empty if all non-binary columns should be collected ("*" configuration), null if
* statistics collection is disabled (empty string configuration)
*/
public Optional<List<String>> getStatisticsColumns() {
String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS);
if (columnsStr.isEmpty()) {
return null; // null means statistics collection is disabled
}
if ("*".equals(columnsStr)) {
return Optional.empty(); // Empty means collect all non-binary columns
}
List<String> columns =
Arrays.stream(columnsStr.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
return Optional.of(columns);
}

/**
* Checks whether the table is configured to collect statistics for all non-binary columns.
*
* @return true if configured with "*" (collect all non-binary columns), false otherwise
*/
public boolean isCollectAllNonBinaryColumns() {
return "*".equals(config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS));
}
}
Loading