Add optional source-field data type fix during ingestion #18816
Jackie-Jiang wants to merge 2 commits
Rename the enum constants `PinotDataType.INTEGER` -> `INT` and `INTEGER_ARRAY` ->
`INT_ARRAY` so the spelling matches `FieldSpec.DataType.INT`, `ColumnDataType.INT`,
and the SQL type name. `PinotDataType` is a transient conversion helper and is never
persisted by name into segment metadata, ZooKeeper, table config, or the wire protocol,
so the rename carries no serialization / mixed-version compatibility risk.
The only name-based reader is the `cast` scalar function, which accepts a type literal:
`CAST(x AS INTEGER)` previously resolved via `PinotDataType.valueOf("INTEGER")`. An
explicit `case "INTEGER"` alias is added so that literal keeps working (alongside `INT`).
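The alias handling described above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `SketchDataType` is a hypothetical stand-in for the renamed enum, and the trimming and upper-casing of the literal are assumptions made so the example is self-contained.

```java
import java.util.Locale;

final class CastTypeLiteralSketch {
  // Hypothetical stand-in for the renamed enum; only a few constants are shown.
  enum SketchDataType { INT, LONG, STRING }

  // Resolves a CAST type literal, keeping "INTEGER" as an alias of INT.
  static SketchDataType resolve(String literal) {
    // Normalizing the literal is an assumption of this sketch.
    String upper = literal.trim().toUpperCase(Locale.ROOT);
    switch (upper) {
      case "INT":
      case "INTEGER": // legacy spelling that previously matched the enum constant name
        return SketchDataType.INT;
      default:
        // Falls back to the enum constant name; throws IllegalArgumentException if unknown.
        return SketchDataType.valueOf(upper);
    }
  }

  public static void main(String[] args) {
    System.out.println(resolve("INTEGER"));
    System.out.println(resolve("INT"));
    System.out.println(resolve("LONG"));
  }
}
```

Without the explicit alias, a name-based lookup such as `valueOf("INTEGER")` would fail after the rename, because no constant with that name exists any more.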
Pull request overview
This PR introduces a new optional ingestion-time mechanism to coerce configured source/input fields to specific PinotDataTypes (either pre- or post-complex-type transform), so downstream enrichers/expressions can consume values in the expected type. It also includes a stacked rename change (PinotDataType.INTEGER → INT) with associated reference/test updates.
Changes:
- Add `SourceFieldConfig` + `ingestionConfig.sourceFieldConfigs` to configure per-source-field type coercion, with optional pre/post complex-type phase selection.
- Insert optional pre- and post-complex-type `DataTypeTransformer` instances into the ingestion transformer chain based on `sourceFieldConfigs`, and validate duplicate configs per phase.
- Rename `PinotDataType.INTEGER`/`INTEGER_ARRAY` to `INT`/`INT_ARRAY` and update affected code/tests, including keeping `CAST(... AS INTEGER)` working.
Reviewed changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| pinot-spi/src/test/java/org/apache/pinot/spi/utils/PinotDataTypeTest.java | Updates tests for PinotDataType.INT rename. |
| pinot-spi/src/test/java/org/apache/pinot/spi/config/table/ingestion/SourceFieldConfigTest.java | Adds unit tests for SourceFieldConfig JSON + validation behavior. |
| pinot-spi/src/main/java/org/apache/pinot/spi/utils/PinotDataType.java | Renames enum constants and updates conversion/type-detection logic accordingly. |
| pinot-spi/src/main/java/org/apache/pinot/spi/data/OpenStructTypeInference.java | Updates inferred INT mapping to use PinotDataType.INT. |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SourceFieldConfig.java | New config object for per-source-field type fixes (pre/post complex-type). |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java | Adds sourceFieldConfigs to ingestion config and adjusts schema-conforming V2 setter handling. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java | Adds validation test coverage for duplicate SourceFieldConfig entries per phase. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java | Adds tests for transformer ordering and type conversion for source-field coercion. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java | Validates sourceFieldConfigs do not duplicate the same field within a phase. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java | Updates INT mapping to use PinotDataType.INT. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerUtils.java | Inserts optional source-field DataTypeTransformers pre/post complex-type into pipeline. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java | Adds Map<String, PinotDataType> constructor to reuse conversion logic for source fields. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java | Updates no-op checks for renamed PinotDataType.INT(_ARRAY). |
| pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOrderByTest.java | Updates expected result schema strings from INTEGER to INT. |
| pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ModeAggregationFunctionTest.java | Simplifies PinotDataType selection now that enum name matches FieldSpec.DataType. |
| pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunctionTest.java | Same as above (INT rename alignment). |
| pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunctionTest.java | Same as above (INT rename alignment). |
| pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunctionTest.java | Same as above (INT rename alignment). |
| pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/DistinctSumAggregationFunctionTest.java | Updates expected result schema strings to INT. |
| pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapMVAggregationFunctionTest.java | Updates expected result schema strings to INT. |
| pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunctionTest.java | Updates expected result schema strings to INT. |
| pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/DistinctAvgAggregationFunctionTest.java | Updates expected result schema strings to INT. |
| pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunctionTest.java | Updates expected result schema strings to INT. |
| pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunctionTest.java | Updates expected result schema strings to INT. |
| pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunctionTest.java | Updates expected result schema strings to INT. |
| pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java | Updates literal conversion and switch cases to use PinotDataType.INT. |
| pinot-common/src/test/java/org/apache/pinot/common/function/scalar/DataTypeConversionFunctionsTest.java | Adds regression test ensuring CAST(... AS INTEGER) still resolves to INT. |
| pinot-common/src/test/java/org/apache/pinot/common/function/FunctionUtilsTest.java | Updates argument/parameter type expectations to PinotDataType.INT(_ARRAY). |
| pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java | Updates ColumnDataType.INT → PinotDataType.INT mapping. |
| pinot-common/src/main/java/org/apache/pinot/common/request/context/LiteralContext.java | Updates literal PinotDataType mapping for INT. |
| pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DataTypeConversionFunctions.java | Adds explicit "INTEGER" cast literal handling to preserve backwards compatibility. |
| pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java | Updates parameter-type mapping to PinotDataType.INT. |
@JsonCreator
public SourceFieldConfig(@JsonProperty(value = "name", required = true) String name,
    @JsonProperty(value = "dataType", required = true) PinotDataType dataType,
    @JsonProperty("preComplexTypeTransform") boolean preComplexTypeTransform) {
  Preconditions.checkArgument(StringUtils.isNotEmpty(name), "'name' must be set in SourceFieldConfig");
  Preconditions.checkArgument(dataType != null, "'dataType' must be set in SourceFieldConfig for source field: %s",
Done — switched to StringUtils.isNotBlank(name) and added a whitespace-only case to SourceFieldConfigTest.
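The difference between an empty check and a blank check can be sketched with plain JDK calls. This is an illustrative stand-in only: the PR uses `StringUtils` and `Preconditions`, while the helper name `requireNonBlankName` below is hypothetical and `String.isBlank()` assumes Java 11 or newer.

```java
final class SourceFieldNameCheckSketch {
  // Rejects null, empty, and whitespace-only names.
  static String requireNonBlankName(String name) {
    if (name == null || name.isBlank()) {
      throw new IllegalArgumentException("'name' must be set in SourceFieldConfig");
    }
    return name;
  }

  public static void main(String[] args) {
    System.out.println(requireNonBlankName("ts"));
    try {
      requireNonBlankName("   "); // passes an isEmpty-style check, fails a blank check
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

A whitespace-only name is not empty, so an empty check alone would let it through; the blank check closes that gap.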
/**
 * The {@code DataTypeTransformer} class will convert the values to follow the data types in {@link FieldSpec}.
 * <p>NOTE: should put this after all the values has been generated by other transformers (such as
 * {@link ExpressionTransformer}). After this, all values should be of the desired data types.
 */
Done — updated the class Javadoc to document both usages: schema columns (run after other transformers such as ExpressionTransformer) and source fields (run before them to fix input types).
public class IngestionConfig extends BaseJsonConfig {

  @JsonPropertyDescription("Config related to the batch data sources")
  private BatchIngestionConfig _batchIngestionConfig;

  @JsonPropertyDescription("Config related to the stream data sources")
  private StreamIngestionConfig _streamIngestionConfig;

  @JsonPropertyDescription("Configs to fix the data types of the source fields before applying other transforms")
  private List<SourceFieldConfig> _sourceFieldConfigs;
That constructor was already @Deprecated and has no in-tree callers (IngestionConfig is built via setters everywhere). Removing it is an intentional cleanup; downstream code on the deprecated constructor can migrate to the no-arg constructor + setters.
public DataTypeTransformer(TableConfig tableConfig, Map<String, PinotDataType> dataTypes) {
  _dataTypes = dataTypes;
  IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
  _continueOnError = ingestionConfig != null && ingestionConfig.isContinueOnError();
  _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
}
Both call sites build a fresh map that they neither retain nor mutate after construction, so direct assignment is safe and avoids an extra per-segment copy. getInputColumns() is consumed read-only by the transform pipeline.
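The trade-off behind that reply can be illustrated with a small sketch. The holder classes below are hypothetical and use `String` values instead of `PinotDataType`. Direct assignment shares the caller's map, which is only safe if the caller never mutates it afterwards, while `Map.copyOf` (Java 10+) takes an immutable snapshot at the cost of one extra copy.

```java
import java.util.HashMap;
import java.util.Map;

final class MapOwnershipSketch {
  // Keeps a reference to the caller's map: no copy, but later caller mutations are visible.
  static final class DirectHolder {
    private final Map<String, String> _dataTypes;

    DirectHolder(Map<String, String> dataTypes) {
      _dataTypes = dataTypes;
    }

    int size() {
      return _dataTypes.size();
    }
  }

  // Takes an immutable snapshot: one extra copy, but isolated from the caller.
  static final class CopyingHolder {
    private final Map<String, String> _dataTypes;

    CopyingHolder(Map<String, String> dataTypes) {
      _dataTypes = Map.copyOf(dataTypes);
    }

    int size() {
      return _dataTypes.size();
    }
  }

  public static void main(String[] args) {
    Map<String, String> types = new HashMap<>();
    types.put("ts", "LONG");
    DirectHolder direct = new DirectHolder(types);
    CopyingHolder copying = new CopyingHolder(types);
    types.put("userId", "INT"); // mutation after construction
    System.out.println(direct.size());  // the shared map now has two entries
    System.out.println(copying.size()); // the snapshot still has one entry
  }
}
```

When every call site builds a fresh map and drops its own reference, as the reply states, the two variants behave the same and the copy buys nothing.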
Introduce `SourceFieldConfig` (`name`, `dataType`, `preComplexTypeTransform`) listed in `IngestionConfig.sourceFieldConfigs`, used to coerce the data type of a source/input field during ingestion before other transformers consume it (e.g. an epoch timestamp arriving as a String that a transform expression expects as LONG).

`RecordTransformerUtils` builds the fix as a `DataTypeTransformer`:
- `preComplexTypeTransform = true` -> before the ComplexTypeTransformer and pre-complex-type RecordEnrichers.
- `preComplexTypeTransform = false` (default) -> after the ComplexTypeTransformer, before the post-complex-type RecordEnrichers and the ExpressionTransformer.

`DataTypeTransformer` gains a `Map<String, PinotDataType>` constructor so the source-field fix reuses the existing conversion logic. `TableConfigUtils` validates that a field is configured at most once per phase (the same field may appear once pre- and once post-complex-type).
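The ordering described in this commit message can be sketched as a list of stage names. This is a simplified illustration, assuming the stages named in the message are the only ones that matter here; the real chain contains further transformers that are omitted, and the labels and the `buildChain` helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class TransformerOrderSketch {
  // Returns stage labels in execution order; the optional source-field fixes are
  // added only when at least one field is configured for that phase.
  static List<String> buildChain(boolean hasPreComplexTypeFix, boolean hasPostComplexTypeFix) {
    List<String> chain = new ArrayList<>();
    if (hasPreComplexTypeFix) {
      chain.add("SourceFieldDataTypeFix(pre)");
    }
    chain.add("PreComplexTypeRecordEnrichers");
    chain.add("ComplexTypeTransformer");
    if (hasPostComplexTypeFix) {
      chain.add("SourceFieldDataTypeFix(post)");
    }
    chain.add("PostComplexTypeRecordEnrichers");
    chain.add("ExpressionTransformer");
    // The schema-driven DataTypeTransformer runs after the ExpressionTransformer.
    chain.add("SchemaDataTypeTransformer");
    return chain;
  }

  public static void main(String[] args) {
    buildChain(true, true).forEach(System.out::println);
  }
}
```

With neither flag set, the chain in this sketch is the same as before the feature, which matches the fix being optional.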
Codecov Report
@@             Coverage Diff              @@
## master #18816 +/- ##
============================================
+ Coverage 64.80% 64.82% +0.02%
Complexity 1319 1319
============================================
Files 3388 3389 +1
Lines 210228 210265 +37
Branches 32948 32958 +10
============================================
+ Hits 136229 136310 +81
+ Misses 63028 62985 -43
+ Partials 10971 10970 -1
Summary
Solve #16317
Adds an optional per-source-field data type fix during ingestion, configured via a new `SourceFieldConfig` listed under `ingestionConfig.sourceFieldConfigs`. It coerces the data type of a source/input field before other transformers consume it, which is useful when a source field arrives with a type that a downstream enricher or transform expression does not expect (e.g. an epoch timestamp arriving as a `String` that `toEpochDays(ts)` expects as `LONG`).

Placement in the transformer chain
The fix is applied as a `DataTypeTransformer`, with `preComplexTypeTransform` selecting the phase (mirroring how `RecordEnricher` distinguishes pre/post complex-type):
- `true` → before the `ComplexTypeTransformer` and the pre-complex-type `RecordEnricher`s, so the corrected value can feed complex-type flattening and pre-complex-type enrichment.
- `false` (default) → after the `ComplexTypeTransformer`, before the post-complex-type `RecordEnricher`s and the `ExpressionTransformer`, so flattened/unnested fields can be fixed before expressions run.

Source fields that are not schema columns are extracted automatically (the transformer's `getInputColumns()` flows into `IngestionUtils.getFieldsForRecordExtractor` via the `TransformPipeline`).

Implementation notes
- `DataTypeTransformer` gains a `Map<String, PinotDataType>` constructor so the source-field fix reuses the existing conversion logic.
- `TableConfigUtils` validates that a field is configured at most once per phase; the same field may legitimately appear once pre- and once post-complex-type.
- `SourceFieldConfig` requires `name` and `dataType` (validated in the constructor).

Stacked on #18815 (the `PinotDataType.INTEGER` → `INT` rename). The first commit here is that rename; review/merge this after #18815, and it will reduce to the feature commit once #18815 lands and this branch is rebased.
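The per-phase uniqueness rule from the implementation notes can be sketched as follows. This is a minimal sketch under stated assumptions, not the validation in `TableConfigUtils`: `FieldFix` is a hypothetical stand-in for `SourceFieldConfig`, and the exception type and message are illustrative.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SourceFieldDuplicateCheckSketch {
  // Hypothetical stand-in carrying only the two properties the check needs.
  static final class FieldFix {
    final String name;
    final boolean preComplexTypeTransform;

    FieldFix(String name, boolean preComplexTypeTransform) {
      this.name = name;
      this.preComplexTypeTransform = preComplexTypeTransform;
    }
  }

  // Allows a field once per phase; rejects a second entry for the same field and phase.
  static void validate(List<FieldFix> configs) {
    Set<String> preNames = new HashSet<>();
    Set<String> postNames = new HashSet<>();
    for (FieldFix config : configs) {
      Set<String> seen = config.preComplexTypeTransform ? preNames : postNames;
      if (!seen.add(config.name)) {
        String phase = config.preComplexTypeTransform ? "pre" : "post";
        throw new IllegalStateException(
            "Duplicate source field config for '" + config.name + "' in " + phase + "-complex-type phase");
      }
    }
  }

  public static void main(String[] args) {
    // Same field once per phase: accepted.
    validate(List.of(new FieldFix("ts", true), new FieldFix("ts", false)));
    try {
      // Same field twice in the post-complex-type phase: rejected.
      validate(List.of(new FieldFix("ts", false), new FieldFix("ts", false)));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Tracking names in one set per phase keeps the check linear in the number of configs and makes the "once pre, once post" allowance explicit.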