Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,21 @@ private void initReader() throws IOException {

private int[] getProjected() {
return Arrays.stream(fields).mapToInt(fieldName -> {
int index = paimonAllFieldNames.indexOf(fieldName);
int index = getFieldIndex(paimonAllFieldNames, fieldName);
Preconditions.checkArgument(index >= 0, "RequiredField %s not found in schema", fieldName);
return index;
}).toArray();
}

/**
 * Finds the position of {@code fieldName} in {@code fieldNames}.
 *
 * <p>An exact (case-sensitive) match always wins, so that columns whose names differ only by
 * case are resolved to the right one. Only when no exact match exists does the lookup fall
 * back to the first case-insensitive match.
 *
 * @param fieldNames the field names to search, in schema order
 * @param fieldName the name to look for
 * @return the zero-based index of the matching field, or {@code -1} when nothing matches
 */
static int getFieldIndex(List<String> fieldNames, String fieldName) {
    int exactIndex = fieldNames.indexOf(fieldName);
    if (exactIndex >= 0) {
        return exactIndex;
    }
    for (int i = 0; i < fieldNames.size(); i++) {
        if (fieldNames.get(i).equalsIgnoreCase(fieldName)) {
            return i;
        }
    }
    return -1;
}

private List<Predicate> getPredicates() {
List<Predicate> predicates = PaimonUtils.deserialize(paimonPredicate);
if (LOG.isDebugEnabled()) {
Expand All @@ -154,7 +163,7 @@ private void resetDatetimeV2Precision() {
if (types[i].isDateTimeV2()) {
// paimon support precision > 6, but it has been reset as 6 in FE
// try to get the right precision for datetimev2
int index = paimonAllFieldNames.indexOf(fields[i]);
int index = getFieldIndex(paimonAllFieldNames, fields[i]);
if (index != -1) {
DataType dataType = table.rowType().getTypeAt(index);
if (dataType instanceof TimestampType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.doris.paimon;

import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -33,4 +35,14 @@ public void testConstructorAcceptsEmptyProjection() {

new PaimonJniScanner(128, params);
}

// Verifies that getFieldIndex resolves column names regardless of letter case and
// reports -1 for a name that is not present.
@Test
public void testGetFieldIndexMatchesMixedCaseColumns() {
    String[] lookups = {"mixed_col", "part", "missing_col"};
    int[] expectedIndexes = {1, 2, -1};
    for (int caseNo = 0; caseNo < lookups.length; caseNo++) {
        int actualIndex = PaimonJniScanner.getFieldIndex(
                Arrays.asList("data", "mIxEd_COL", "PART"), lookups[caseNo]);
        Assert.assertEquals(expectedIndexes[caseNo], actualIndex);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.util.Collections;
import java.util.List;


Expand All @@ -37,14 +38,21 @@
*/
public class DorisTypeToIcebergType extends DorisTypeVisitor<Type> {
private final StructType root;
private final List<String> rootFieldNames;
private int nextId = 0;

/**
 * Creates a converter without a root struct and without root field name overrides.
 */
public DorisTypeToIcebergType() {
    this.root = null;
    this.rootFieldNames = Collections.emptyList();
}

/**
 * Creates a converter for the given root struct that keeps the struct's own field names.
 *
 * @param root the root struct being converted
 */
public DorisTypeToIcebergType(StructType root) {
    this(root, Collections.emptyList());
}

/**
 * Creates a converter for the given root struct with optional name overrides for the
 * root-level fields.
 *
 * @param root the root struct being converted
 * @param rootFieldNames names to use for the root struct's fields, positionally aligned with
 *        {@code root.getFields()}; an empty list means "use the struct's own field names"
 * @throws IllegalArgumentException if {@code rootFieldNames} is non-empty but its size differs
 *         from the number of root fields
 */
public DorisTypeToIcebergType(StructType root, List<String> rootFieldNames) {
    int rootFieldCount = root.getFields().size();
    // Names are looked up by field position, so a size mismatch would otherwise surface later
    // as an index error (too few names) or be silently ignored (too many names).
    if (!rootFieldNames.isEmpty() && rootFieldNames.size() != rootFieldCount) {
        throw new IllegalArgumentException("rootFieldNames has " + rootFieldNames.size()
                + " entries but the root struct has " + rootFieldCount + " fields");
    }
    this.root = root;
    this.rootFieldNames = rootFieldNames;
    // the root struct's fields use the first ids
    this.nextId = rootFieldCount;
}
Expand All @@ -65,10 +73,11 @@ public Type struct(StructType struct, List<Type> types) {
Type type = types.get(i);

int id = isRoot ? i : getNextId();
String fieldName = isRoot && !rootFieldNames.isEmpty() ? rootFieldNames.get(i) : field.getName();
if (field.getContainsNull()) {
newFields.add(Types.NestedField.optional(id, field.getName(), type, field.getComment()));
newFields.add(Types.NestedField.optional(id, fieldName, type, field.getComment()));
} else {
newFields.add(Types.NestedField.required(id, field.getName(), type, field.getComment()));
newFields.add(Types.NestedField.required(id, fieldName, type, field.getComment()));
}
}
return Types.StructType.of(newFields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ public boolean performCreateTable(CreateTableInfo createTableInfo) throws UserEx
.map(col -> new StructField(col.getName(), col.getType(), col.getComment(), col.isAllowNull()))
.collect(Collectors.toList());
StructType structType = new StructType(new ArrayList<>(collect));
Type visit =
DorisTypeVisitor.visit(structType, new DorisTypeToIcebergType(structType));
List<String> rootFieldNames = columns.stream().map(Column::getName).collect(Collectors.toList());
Type visit = DorisTypeVisitor.visit(structType, new DorisTypeToIcebergType(structType, rootFieldNames));
Comment thread
Gabriel39 marked this conversation as resolved.
Schema schema = new Schema(visit.asNestedType().asStructType().fields());
Map<String, String> properties = createTableInfo.getProperties();
properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
Expand Down Expand Up @@ -1348,7 +1348,7 @@ private org.apache.iceberg.SortOrder buildSortOrder(List<SortFieldInfo> sortFiel

org.apache.iceberg.SortOrder.Builder builder = org.apache.iceberg.SortOrder.builderFor(schema);
for (SortFieldInfo sortField : sortFields) {
String columnName = sortField.getColumnName();
String columnName = getIcebergColumnName(schema, sortField.getColumnName());
if (sortField.isAscending()) {
if (sortField.isNullFirst()) {
builder.asc(columnName, org.apache.iceberg.NullOrder.NULLS_FIRST);
Expand All @@ -1365,4 +1365,9 @@ private org.apache.iceberg.SortOrder buildSortOrder(List<SortFieldInfo> sortFiel
}
return builder.build();
}

/**
 * Resolves a column name, matched case-insensitively, to the name stored in the Iceberg schema.
 *
 * <p>The schema's full column name is looked up by field id rather than taken from
 * {@code field.name()}, because a nested field's {@code name()} is only its leaf name.
 *
 * @param schema the Iceberg schema to search
 * @param columnName the column name to resolve
 * @return the schema's column name, or {@code columnName} unchanged when no field matches
 */
private static String getIcebergColumnName(Schema schema, String columnName) {
    NestedField field = schema.caseInsensitiveFindField(columnName);
    if (field == null) {
        return columnName;
    }
    String fullName = schema.findColumnName(field.fieldId());
    return fullName == null ? field.name() : fullName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -585,37 +584,43 @@ public static PartitionSpec solveIcebergPartitionSpec(PartitionDesc partitionDes
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
for (Expr expr : partitionExprs) {
if (expr instanceof SlotRef) {
builder.identity(((SlotRef) expr).getColumnName());
builder.identity(getIcebergColumnName(schema, ((SlotRef) expr).getColumnName()));
} else if (expr instanceof FunctionCallExpr) {
String exprName = expr.accept(ExprToExprNameVisitor.INSTANCE, null);
List<Expr> params = ((FunctionCallExpr) expr).getParams().exprs();
switch (exprName.toLowerCase()) {
case "bucket":
builder.bucket(
params.get(1).accept(ExprToExprNameVisitor.INSTANCE, null),
getIcebergColumnName(schema,
params.get(1).accept(ExprToExprNameVisitor.INSTANCE, null)),
Integer.parseInt(params.get(0).getStringValue()));
break;
case "year":
case "years":
builder.year(params.get(0).accept(ExprToExprNameVisitor.INSTANCE, null));
builder.year(getIcebergColumnName(schema,
params.get(0).accept(ExprToExprNameVisitor.INSTANCE, null)));
break;
case "month":
case "months":
builder.month(params.get(0).accept(ExprToExprNameVisitor.INSTANCE, null));
builder.month(getIcebergColumnName(schema,
params.get(0).accept(ExprToExprNameVisitor.INSTANCE, null)));
break;
case "date":
case "day":
case "days":
builder.day(params.get(0).accept(ExprToExprNameVisitor.INSTANCE, null));
builder.day(getIcebergColumnName(schema,
params.get(0).accept(ExprToExprNameVisitor.INSTANCE, null)));
break;
case "date_hour":
case "hour":
case "hours":
builder.hour(params.get(0).accept(ExprToExprNameVisitor.INSTANCE, null));
builder.hour(getIcebergColumnName(schema,
params.get(0).accept(ExprToExprNameVisitor.INSTANCE, null)));
break;
case "truncate":
builder.truncate(
params.get(1).accept(ExprToExprNameVisitor.INSTANCE, null),
getIcebergColumnName(schema,
params.get(1).accept(ExprToExprNameVisitor.INSTANCE, null)),
Integer.parseInt(params.get(0).getStringValue()));
break;
default:
Expand All @@ -626,6 +631,11 @@ public static PartitionSpec solveIcebergPartitionSpec(PartitionDesc partitionDes
return builder.build();
}

/**
 * Resolves a column name, matched case-insensitively, to the name stored in the Iceberg schema.
 *
 * <p>The schema's full column name is looked up by field id rather than taken from
 * {@code field.name()}, because a nested field's {@code name()} is only its leaf name.
 *
 * @param schema the Iceberg schema to search
 * @param columnName the column name to resolve
 * @return the schema's column name, or {@code columnName} unchanged when no field matches
 */
private static String getIcebergColumnName(Schema schema, String columnName) {
    Types.NestedField field = schema.caseInsensitiveFindField(columnName);
    if (field == null) {
        return columnName;
    }
    String fullName = schema.findColumnName(field.fieldId());
    return fullName == null ? field.name() : fullName;
}

private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive,
boolean enableMappingVarbinary, boolean enableMappingTimestampTz) {
switch (primitive.typeId()) {
Expand Down Expand Up @@ -771,7 +781,7 @@ public static List<String> getIdentityPartitionColumns(Table table) {
}
String columnName = table.schema().findColumnName(partitionField.sourceId());
if (columnName != null) {
partitionColumns.add(columnName.toLowerCase(Locale.ROOT));
partitionColumns.add(columnName);
}
}
}
Expand Down Expand Up @@ -803,8 +813,7 @@ public static Map<String, String> getIdentityPartitionInfoMap(PartitionData part
}
Object value = partitionData.get(i);
try {
partitionInfoMap.put(columnName.toLowerCase(Locale.ROOT),
serializePartitionValue(field.type(), value, timeZone));
partitionInfoMap.put(columnName, serializePartitionValue(field.type(), value, timeZone));
} catch (UnsupportedOperationException e) {
LOG.warn("Failed to serialize Iceberg table partition value for field {}: {}", field.name(),
e.getMessage());
Expand Down Expand Up @@ -1168,7 +1177,7 @@ public static List<Column> parseSchema(Schema schema, boolean enableMappingVarbi
List<Types.NestedField> columns = schema.columns();
List<Column> resSchema = Lists.newArrayListWithCapacity(columns.size());
for (Types.NestedField field : columns) {
Column column = new Column(field.name().toLowerCase(Locale.ROOT),
Column column = new Column(field.name(),
IcebergUtils.icebergTypeToDorisType(field.type(), enableMappingVarbinary, enableMappingTimestampTz),
true, null,
true, field.doc(), true, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
Set<String> partitionColumnNames = Sets.newHashSet(tableSchema.partitionKeys());
List<Column> partitionColumns = Lists.newArrayList();
for (DataField field : columns) {
Column column = new Column(field.name().toLowerCase(),
Column column = new Column(field.name(),
Comment thread
Gabriel39 marked this conversation as resolved.
PaimonUtil.paimonTypeToDorisType(field.type(), getCatalog().getEnableMappingVarbinary(),
getCatalog().getEnableMappingTimestampTz()),
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -218,7 +219,9 @@ public boolean performCreateTable(CreateTableInfo createTableInfo) throws UserEx
col.getComment(), col.isNullable()))
.collect(Collectors.toList());
StructType structType = new StructType(new ArrayList<>(collect));
Schema schema = toPaimonSchema(structType, createTableInfo.getPartitionDesc(), createTableInfo.getProperties());
List<String> rootFieldNames = columns.stream().map(ColumnDefinition::getName).collect(Collectors.toList());
Schema schema = toPaimonSchema(structType, rootFieldNames, createTableInfo.getPartitionDesc(),
createTableInfo.getProperties());
try {
catalog.createTable(new Identifier(createTableInfo.getDbName(), createTableInfo.getTableName()),
schema, createTableInfo.isIfNotExists());
Expand All @@ -228,7 +231,8 @@ public boolean performCreateTable(CreateTableInfo createTableInfo) throws UserEx
return false;
}

private Schema toPaimonSchema(StructType structType, PartitionDesc partitionDesc, Map<String, String> properties) {
private Schema toPaimonSchema(StructType structType, List<String> rootFieldNames, PartitionDesc partitionDesc,
Map<String, String> properties) {
Map<String, String> normalizedProperties = new HashMap<>(properties);
normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
normalizedProperties.remove(PROP_COMMENT);
Expand All @@ -242,19 +246,31 @@ private Schema toPaimonSchema(StructType structType, PartitionDesc partitionDesc
.map(String::trim)
.collect(Collectors.toList());
List<String> partitionKeys = partitionDesc == null ? new ArrayList<>() : partitionDesc.getPartitionColNames();
primaryKeys = getPaimonColumnNames(rootFieldNames, primaryKeys);
partitionKeys = getPaimonColumnNames(rootFieldNames, partitionKeys);
Schema.Builder schemaBuilder = Schema.newBuilder()
.options(normalizedProperties)
.primaryKey(primaryKeys)
.partitionKeys(partitionKeys)
.comment(properties.getOrDefault(PROP_COMMENT, null));
for (StructField field : structType.getFields()) {
schemaBuilder.column(field.getName(),
List<StructField> fields = structType.getFields();
for (int i = 0; i < fields.size(); i++) {
StructField field = fields.get(i);
schemaBuilder.column(rootFieldNames.get(i),
toPaimontype(field.getType()).copy(field.getContainsNull()),
field.getComment());
}
return schemaBuilder.build();
}

/**
 * Maps column names to the spelling used by the declared Paimon columns.
 *
 * <p>A name that already matches a declared column exactly is kept as is. Otherwise the name is
 * matched case-insensitively; when several declared columns differ only by case, the first one
 * declared wins. Names that match nothing are returned unchanged.
 *
 * @param paimonColumnNames the declared column names, in declaration order
 * @param dorisColumnNames the names to resolve (for example primary or partition keys)
 * @return the resolved names, in the same order as {@code dorisColumnNames}
 */
private List<String> getPaimonColumnNames(List<String> paimonColumnNames, List<String> dorisColumnNames) {
    // putIfAbsent keeps the first declaration per lower-cased key; Collectors.toMap without a
    // merge function would throw IllegalStateException for columns that differ only by case.
    Map<String, String> paimonColumnNameMap = new HashMap<>();
    for (String paimonName : paimonColumnNames) {
        paimonColumnNameMap.putIfAbsent(paimonName.toLowerCase(Locale.ROOT), paimonName);
    }
    return dorisColumnNames.stream()
            .map(name -> paimonColumnNames.contains(name)
                    ? name
                    : paimonColumnNameMap.getOrDefault(name.toLowerCase(Locale.ROOT), name))
            .collect(Collectors.toList());
}

/**
 * Converts a Doris type to a Paimon data type by visiting it with a
 * {@code DorisToPaimonTypeVisitor}.
 *
 * @param type the Doris type to convert
 * @return the result produced by the visitor
 */
private DataType toPaimontype(Type type) {
    DorisToPaimonTypeVisitor paimonTypeVisitor = new DorisToPaimonTypeVisitor();
    return DorisTypeVisitor.visit(type, paimonTypeVisitor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ private List<Column> buildFullSchema() {

for (DataField field : fields) {
Column column = new Column(
field.name().toLowerCase(),
field.name(),
PaimonUtil.paimonTypeToDorisType(
field.type(),
getCatalog().getEnableMappingVarbinary(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -504,7 +503,7 @@ public static List<Column> parseSchema(RowType rowType, List<String> primaryKeys
boolean enableTimestampTzMapping) {
List<Column> resSchema = Lists.newArrayListWithCapacity(rowType.getFields().size());
rowType.getFields().forEach(field -> {
resSchema.add(new Column(field.name().toLowerCase(),
resSchema.add(new Column(field.name(),
PaimonUtil.paimonTypeToDorisType(field.type(), enableVarbinaryMapping, enableTimestampTzMapping),
primaryKeys.contains(field.name()),
null,
Expand Down Expand Up @@ -553,7 +552,7 @@ public static Map<String, String> getPartitionInfoMap(Table table, BinaryRow par
try {
String partitionValue = serializePartitionValue(partitionType.getFields().get(i).type(),
partitionValuesArray[i], timeZone);
partitionInfoMap.put(partitionKeys.get(i).toLowerCase(Locale.ROOT), partitionValue);
partitionInfoMap.put(partitionKeys.get(i), partitionValue);
} catch (UnsupportedOperationException e) {
LOG.warn("Failed to serialize table {} partition value for key {}: {}", table.name(),
partitionKeys.get(i), e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class PaimonPredicateConverter {

public PaimonPredicateConverter(RowType rowType) {
this.builder = new PredicateBuilder(rowType);
this.fieldNames = rowType.getFields().stream().map(f -> f.name().toLowerCase()).collect(Collectors.toList());
this.fieldNames = rowType.getFields().stream().map(DataField::name).collect(Collectors.toList());
this.paimonFieldTypes = rowType.getFields().stream().map(DataField::type).collect(Collectors.toList());
}

Expand Down Expand Up @@ -100,7 +100,7 @@ private Predicate doInPredicate(InPredicate predicate) {
return null;
}
String colName = slotRef.getColumnName();
int idx = fieldNames.indexOf(colName);
int idx = getFieldIndex(colName);
DataType dataType = paimonFieldTypes.get(idx);
List<Object> valueList = new ArrayList<>();
for (int i = 1; i < predicate.getChildren().size(); i++) {
Expand Down Expand Up @@ -132,7 +132,7 @@ private Predicate binaryExprDesc(Expr dorisExpr) {
return null;
}
String colName = slotRef.getColumnName();
int idx = fieldNames.indexOf(colName);
int idx = getFieldIndex(colName);
DataType dataType = paimonFieldTypes.get(idx);
Object value = dataType.accept(new PaimonValueConverter(literalExpr));
if (value == null) {
Expand Down Expand Up @@ -174,6 +174,15 @@ private Predicate binaryExprDesc(Expr dorisExpr) {
return null;
}

/**
 * Finds the position of {@code colName} in {@code fieldNames}.
 *
 * <p>An exact (case-sensitive) match is preferred so that fields whose names differ only by
 * case resolve to the right one; otherwise the first case-insensitive match is used.
 *
 * @param colName the column name to look up
 * @return the zero-based field index, or {@code -1} when no field matches
 */
private int getFieldIndex(String colName) {
    int exactIndex = fieldNames.indexOf(colName);
    if (exactIndex >= 0) {
        return exactIndex;
    }
    for (int i = 0; i < fieldNames.size(); i++) {
        if (fieldNames.get(i).equalsIgnoreCase(colName)) {
            return i;
        }
    }
    return -1;
}


public static SlotRef convertDorisExprToSlotRef(Expr expr) {
SlotRef slotRef = null;
Expand Down
Loading
Loading