diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 673d988dabf7..c23038d70f34 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1494,6 +1494,24 @@ String The Variant shredding schema for writing. + +
vector-field
+ (none) + String + Specifies column names that should be stored as vector type. This is used when you want to treat an ARRAY column as a VECTOR. + + +
vector.file.format
+ (none) + String + Specify the vector store file format. + + +
vector.target-file-size
+ (none) + MemorySize + Target size of a vector-store file. Default is 10 * TARGET_FILE_SIZE. +
visibility-callback.check-interval
10 s diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index d5934b65dbfb..913a3740d1f9 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2304,6 +2304,31 @@ public InlineElement getDescription() { .withDescription( "The interval for checking visibility when visibility-callback enabled."); + public static final ConfigOption VECTOR_FILE_FORMAT = + key("vector.file.format") + .stringType() + .noDefaultValue() + .withDescription("Specify the vector store file format."); + + public static final ConfigOption VECTOR_FIELD = + key("vector-field") + .stringType() + .noDefaultValue() + .withDescription( + "Specifies column names that should be stored as vector type. " + + "This is used when you want to treat a ARRAY column as a VECTOR."); + + public static final ConfigOption VECTOR_TARGET_FILE_SIZE = + key("vector.target-file-size") + .memoryType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "Target size of a vector-store file." 
+ + " Default is 10 * TARGET_FILE_SIZE.") + .build()); + private final Options options; public CoreOptions(Map options) { @@ -3619,6 +3644,33 @@ public Duration visibilityCallbackCheckInterval() { return options.get(VISIBILITY_CALLBACK_CHECK_INTERVAL); } + public String vectorFileFormatString() { + return normalizeFileFormat(options.get(VECTOR_FILE_FORMAT)); + } + + public Set vectorField() { + String vectorFields = options.get(CoreOptions.VECTOR_FIELD); + if (vectorFields == null || vectorFields.trim().isEmpty()) { + return Collections.emptySet(); + } + return Arrays.stream(vectorFields.trim().split(",")).collect(Collectors.toSet()); + } + + public static Set vectorField(Map options) { + String vectorFields = options.getOrDefault(CoreOptions.VECTOR_FIELD.key(), null); + if (vectorFields == null || vectorFields.trim().isEmpty()) { + return Collections.emptySet(); + } + return Arrays.stream(vectorFields.trim().split(",")).collect(Collectors.toSet()); + } + + public long vectorTargetFileSize() { + // Since vectors are large, it would be better to set a larger target size for vectors. + return options.getOptional(VECTOR_TARGET_FILE_SIZE) + .map(MemorySize::getBytes) + .orElse(10 * targetFileSize(false)); + } + /** Specifies the merge engine for table with primary key. 
*/ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index 90f52099b4f9..b184b9d0aa0d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -107,6 +107,14 @@ public static FileFormat fileFormat(CoreOptions options) { return FileFormat.fromIdentifier(options.fileFormatString(), options.toConfiguration()); } + public static FileFormat vectorFileFormat(CoreOptions options) { + String vectorFileFormat = options.vectorFileFormatString(); + if (vectorFileFormat == null) { + return fileFormat(options); + } + return FileFormat.fromIdentifier(vectorFileFormat, options.toConfiguration()); + } + public static FileFormat manifestFormat(CoreOptions options) { return FileFormat.fromIdentifier(options.manifestFormatString(), options.toConfiguration()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index cd3f4f9c7052..104ce18b33ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -41,6 +41,7 @@ import org.apache.paimon.operation.BlobFileContext; import org.apache.paimon.options.MemorySize; import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BatchRecordWriter; import org.apache.paimon.utils.CommitIncrement; @@ -52,6 +53,7 @@ import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter; import org.apache.paimon.utils.SinkWriter.DirectSinkWriter; import org.apache.paimon.utils.StatsCollectorFactories; +import 
org.apache.paimon.utils.VectorStoreUtils; import javax.annotation.Nullable; @@ -59,8 +61,12 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.paimon.types.BlobType.fieldsInBlobFile; /** * A {@link RecordWriter} implementation that only accepts records which are always insert @@ -71,8 +77,10 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { private final FileIO fileIO; private final long schemaId; private final FileFormat fileFormat; + private final FileFormat vectorFileFormat; private final long targetFileSize; private final long blobTargetFileSize; + private final long vectorTargetFileSize; private final RowType writeSchema; @Nullable private final List writeCols; private final DataFilePathFactory pathFactory; @@ -103,8 +111,10 @@ public AppendOnlyWriter( @Nullable IOManager ioManager, long schemaId, FileFormat fileFormat, + FileFormat vectorFileFormat, long targetFileSize, long blobTargetFileSize, + long vectorTargetFileSize, RowType writeSchema, @Nullable List writeCols, long maxSequenceNumber, @@ -127,8 +137,10 @@ public AppendOnlyWriter( this.fileIO = fileIO; this.schemaId = schemaId; this.fileFormat = fileFormat; + this.vectorFileFormat = vectorFileFormat; this.targetFileSize = targetFileSize; this.blobTargetFileSize = blobTargetFileSize; + this.vectorTargetFileSize = vectorTargetFileSize; this.writeSchema = writeSchema; this.writeCols = writeCols; this.pathFactory = pathFactory; @@ -302,13 +314,38 @@ public void toBufferedWriter() throws Exception { } private RollingFileWriter createRollingRowWriter() { - if (blobContext != null) { - return new RollingBlobFileWriter( + boolean hasNormal, hasBlob, hasVectorStore; + { + hasBlob = (blobContext != null); + + List fieldsInVectorFile = + 
VectorStoreUtils.fieldsInVectorFile(writeSchema, fileFormat, vectorFileFormat); + Set vectorFieldNames = + fieldsInVectorFile.stream().map(DataField::name).collect(Collectors.toSet()); + hasVectorStore = !fieldsInVectorFile.isEmpty(); + + List fieldsInBlobFile = + hasBlob + ? fieldsInBlobFile(writeSchema, blobContext.blobDescriptorFields()) + : Collections.emptyList(); + Set blobFieldNames = + fieldsInBlobFile.stream().map(DataField::name).collect(Collectors.toSet()); + hasNormal = + writeSchema.getFields().stream() + .anyMatch( + f -> + !blobFieldNames.contains(f.name()) + && !vectorFieldNames.contains(f.name())); + } + if (hasBlob || (hasNormal && hasVectorStore)) { + return new DataEvolutionRollingFileWriter( fileIO, schemaId, fileFormat, + vectorFileFormat, targetFileSize, blobTargetFileSize, + vectorTargetFileSize, writeSchema, pathFactory, seqNumCounterProvider, @@ -319,11 +356,13 @@ private RollingFileWriter createRollingRowWriter() { statsDenseStore, blobContext); } + FileFormat realFileFormat = hasNormal ? fileFormat : vectorFileFormat; + long realTargetFileSize = hasNormal ? 
targetFileSize : vectorTargetFileSize; return new RowDataRollingFileWriter( fileIO, schemaId, - fileFormat, - targetFileSize, + realFileFormat, + realTargetFileSize, writeSchema, pathFactory, seqNumCounterProvider, diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/DataEvolutionRollingFileWriter.java similarity index 63% rename from paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java rename to paimon-core/src/main/java/org/apache/paimon/append/DataEvolutionRollingFileWriter.java index eb4ec0a0c11c..3b1b67aaca6c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/DataEvolutionRollingFileWriter.java @@ -27,15 +27,18 @@ import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.FileWriterAbortExecutor; import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.io.RollingFileWriterImpl; import org.apache.paimon.io.RowDataFileWriter; import org.apache.paimon.io.SingleFileWriter; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.operation.BlobFileContext; +import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.LongCounter; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StatsCollectorFactories; +import org.apache.paimon.utils.VectorStoreUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,9 +51,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Collectors; -import static org.apache.paimon.types.BlobType.fieldsNotInBlobFile; +import static org.apache.paimon.types.BlobType.fieldsInBlobFile; /** * A rolling file writer that handles both 
normal data and blob data. This writer creates separate @@ -72,9 +77,10 @@ * * */ -public class RollingBlobFileWriter implements RollingFileWriter { +public class DataEvolutionRollingFileWriter + implements RollingFileWriter { - private static final Logger LOG = LoggerFactory.getLogger(RollingBlobFileWriter.class); + private static final Logger LOG = LoggerFactory.getLogger(DataEvolutionRollingFileWriter.class); /** Constant for checking rolling condition periodically. */ private static final long CHECK_ROLLING_RECORD_CNT = 1000L; @@ -84,6 +90,10 @@ public class RollingBlobFileWriter implements RollingFileWriter, DataFileMeta>> writerFactory; private final Supplier blobWriterFactory; + private final Supplier< + ProjectedFileWriter< + RollingFileWriterImpl, List>> + vectorStoreWriterFactory; private final long targetFileSize; @Nullable private final ExternalStorageBlobWriter externalStorageBlobWriter; @@ -94,15 +104,20 @@ public class RollingBlobFileWriter implements RollingFileWriter, DataFileMeta> currentWriter; private MultipleBlobFileWriter blobWriter; + private ProjectedFileWriter< + RollingFileWriterImpl, List> + vectorStoreWriter; private long recordCount = 0; private boolean closed = false; - public RollingBlobFileWriter( + public DataEvolutionRollingFileWriter( FileIO fileIO, long schemaId, FileFormat fileFormat, + FileFormat vectorFileFormat, long targetFileSize, long blobTargetFileSize, + long vectorTargetFileSize, RowType writeSchema, DataFilePathFactory pathFactory, Supplier seqNumCounterSupplier, @@ -123,13 +138,33 @@ public RollingBlobFileWriter( // benefit from async write, but cost a lot. boolean asyncFileWrite = false; + // Split into normal, blob, and vector-store parts + List fieldsInBlobFile = + (context == null) + ? 
Collections.emptyList() + : fieldsInBlobFile(writeSchema, context.blobDescriptorFields()); + List fieldsInVectorFile = + VectorStoreUtils.fieldsInVectorFile(writeSchema, fileFormat, vectorFileFormat); + List fieldsInNormalFile = new ArrayList<>(); + { + Set fieldsNotInNormalFile = + fieldsInBlobFile.stream().map(DataField::name).collect(Collectors.toSet()); + fieldsNotInNormalFile.addAll( + fieldsInVectorFile.stream().map(DataField::name).collect(Collectors.toSet())); + for (DataField field : writeSchema.getFields()) { + if (!fieldsNotInNormalFile.contains(field.name())) { + fieldsInNormalFile.add(field); + } + } + } + // Initialize writer factory for normal data this.writerFactory = createNormalWriterFactory( fileIO, schemaId, fileFormat, - fieldsNotInBlobFile(writeSchema, context.blobDescriptorFields()), + fieldsInNormalFile, writeSchema, pathFactory, seqNumCounterSupplier, @@ -141,23 +176,27 @@ public RollingBlobFileWriter( statsDenseStore); // Initialize blob writer - this.blobWriterFactory = - () -> - new MultipleBlobFileWriter( - fileIO, - schemaId, - writeSchema, - pathFactory, - seqNumCounterSupplier, - fileSource, - asyncFileWrite, - statsDenseStore, - blobTargetFileSize, - context.blobConsumer(), - context.blobDescriptorFields()); + if (!fieldsInBlobFile.isEmpty()) { + this.blobWriterFactory = + () -> + new MultipleBlobFileWriter( + fileIO, + schemaId, + writeSchema, + pathFactory, + seqNumCounterSupplier, + fileSource, + asyncFileWrite, + statsDenseStore, + blobTargetFileSize, + context.blobConsumer(), + context.blobDescriptorFields()); + } else { + this.blobWriterFactory = null; + } // Initialize writer for descriptor fields backed by external storage if needed. 
- if (!context.blobExternalStorageFields().isEmpty()) { + if ((context != null) && !context.blobExternalStorageFields().isEmpty()) { this.externalStorageBlobWriter = new ExternalStorageBlobWriter( fileIO, @@ -174,6 +213,30 @@ public RollingBlobFileWriter( } else { this.externalStorageBlobWriter = null; } + + // Initialize vector-store writer + if (!fieldsInVectorFile.isEmpty()) { + List vectorFieldNames = + fieldsInVectorFile.stream().map(DataField::name).collect(Collectors.toList()); + this.vectorStoreWriterFactory = + () -> + createVectorStoreWriter( + fileIO, + schemaId, + vectorFileFormat, + fieldsInVectorFile, + writeSchema, + pathFactory, + seqNumCounterSupplier, + fileCompression, + statsCollectorFactories.statsCollectors(vectorFieldNames), + fileSource, + asyncFileWrite, + statsDenseStore, + vectorTargetFileSize); + } else { + this.vectorStoreWriterFactory = null; + } } /** Creates a factory for normal data writers. */ @@ -183,7 +246,7 @@ public RollingBlobFileWriter( FileIO fileIO, long schemaId, FileFormat fileFormat, - List fieldsNotInBlobFile, + List fieldsInNormalFile, RowType writeSchema, DataFilePathFactory pathFactory, Supplier seqNumCounterSupplier, @@ -193,7 +256,7 @@ public RollingBlobFileWriter( FileSource fileSource, boolean asyncFileWrite, boolean statsDenseStore) { - RowType normalRowType = new RowType(fieldsNotInBlobFile); + RowType normalRowType = new RowType(fieldsInNormalFile); List normalColumnNames = normalRowType.getFieldNames(); int[] projectionNormalFields = writeSchema.projectIndexes(normalColumnNames); @@ -220,6 +283,53 @@ public RollingBlobFileWriter( }; } + /** Creates a vector-store writer for handling vector-store data. 
*/ + private static ProjectedFileWriter< + RollingFileWriterImpl, List> + createVectorStoreWriter( + FileIO fileIO, + long schemaId, + FileFormat vectorFileFormat, + List fieldsInVectorStore, + RowType writeSchema, + DataFilePathFactory pathFactory, + Supplier seqNumCounterSupplier, + String fileCompression, + SimpleColStatsCollector.Factory[] statsCollectors, + FileSource fileSource, + boolean asyncFileWrite, + boolean statsDenseStore, + long targetFileSize) { + RowType vectorStoreRowType = new RowType(fieldsInVectorStore); + List vectorStoreColumnNames = vectorStoreRowType.getFieldNames(); + int[] vectorStoreProjection = writeSchema.projectIndexes(vectorStoreColumnNames); + + DataFilePathFactory vectorStorePathFactory = + pathFactory.vectorStorePathFactory(vectorFileFormat.getFormatIdentifier()); + return new ProjectedFileWriter<>( + new RollingFileWriterImpl<>( + () -> + new RowDataFileWriter( + fileIO, + RollingFileWriter.createFileWriterContext( + vectorFileFormat, + vectorStoreRowType, + statsCollectors, + fileCompression), + vectorStorePathFactory.newPath(), + vectorStoreRowType, + schemaId, + seqNumCounterSupplier, + new FileIndexOptions(), + fileSource, + asyncFileWrite, + statsDenseStore, + pathFactory.isExternalPath(), + vectorStoreColumnNames), + targetFileSize), + vectorStoreProjection); + } + /** * Writes a single row to both normal and blob writers. Automatically handles file rolling when * target size is reached. 
@@ -239,10 +349,18 @@ public void write(InternalRow row) throws IOException { if (currentWriter == null) { currentWriter = writerFactory.get(); } - if (blobWriter == null) { + if ((blobWriter == null) && (blobWriterFactory != null)) { blobWriter = blobWriterFactory.get(); } - blobWriter.write(transformedRow); + if ((vectorStoreWriter == null) && (vectorStoreWriterFactory != null)) { + vectorStoreWriter = vectorStoreWriterFactory.get(); + } + if (blobWriter != null) { + blobWriter.write(transformedRow); + } + if (vectorStoreWriter != null) { + vectorStoreWriter.write(transformedRow); + } currentWriter.write(transformedRow); recordCount++; @@ -303,6 +421,10 @@ public void abort() { blobWriter.abort(); blobWriter = null; } + if (vectorStoreWriter != null) { + vectorStoreWriter.abort(); + vectorStoreWriter = null; + } if (externalStorageBlobWriter != null) { externalStorageBlobWriter.abort(); } @@ -332,12 +454,16 @@ private void closeCurrentWriter() throws IOException { // Close blob writer and process blob metadata List blobMetas = closeBlobWriter(); + // Close vector-store writer and process vector-store metadata + List vectorStoreMetas = closeVectorStoreWriter(); + // Validate consistency between main and blob files - validateFileConsistency(mainDataFileMeta, blobMetas); + validateFileConsistency(mainDataFileMeta, blobMetas, vectorStoreMetas); // Add results to the results list results.add(mainDataFileMeta); results.addAll(blobMetas); + results.addAll(vectorStoreMetas); // Reset current writer currentWriter = null; @@ -361,9 +487,22 @@ private List closeBlobWriter() throws IOException { return results; } + /** Closes the vector-store writer and processes blob metadata with appropriate tags. 
*/ + private List closeVectorStoreWriter() throws IOException { + if (vectorStoreWriter == null) { + return Collections.emptyList(); + } + vectorStoreWriter.close(); + List results = vectorStoreWriter.result(); + vectorStoreWriter = null; + return results; + } + /** Validates that the row counts match between main and blob files. */ private void validateFileConsistency( - DataFileMeta mainDataFileMeta, List blobTaggedMetas) { + DataFileMeta mainDataFileMeta, + List blobTaggedMetas, + List vectorStoreMetas) { long mainRowCount = mainDataFileMeta.rowCount(); Map blobRowCounts = new HashMap<>(); @@ -371,6 +510,9 @@ private void validateFileConsistency( long count = file.rowCount(); blobRowCounts.compute(file.writeCols().get(0), (k, v) -> v == null ? count : v + count); } + long vectorStoreRowCount = + vectorStoreMetas.stream().mapToLong(DataFileMeta::rowCount).sum(); + for (String blobFieldName : blobRowCounts.keySet()) { long blobRowCount = blobRowCounts.get(blobFieldName); if (mainRowCount != blobRowCount) { @@ -381,6 +523,13 @@ private void validateFileConsistency( mainDataFileMeta, mainRowCount, blobFieldName, blobRowCount)); } } + if (!vectorStoreMetas.isEmpty() && (mainRowCount != vectorStoreRowCount)) { + throw new IllegalStateException( + String.format( + "This is a bug: The row count of main file and vector-store files does not match. 
" + + "Main file: %s (row count: %d), vector-store files: %s (total row count: %d)", + mainDataFileMeta, mainRowCount, vectorStoreMetas, vectorStoreRowCount)); + } } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java index 7f9d5ebe69d5..3633bc50797e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java @@ -48,6 +48,7 @@ import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile; /** Compact coordinator to compact data evolution table. 
*/ public class DataEvolutionCompactCoordinator { @@ -57,14 +58,16 @@ public class DataEvolutionCompactCoordinator { private final CompactScanner scanner; private final CompactPlanner planner; - public DataEvolutionCompactCoordinator(FileStoreTable table, boolean compactBlob) { - this(table, null, compactBlob); + public DataEvolutionCompactCoordinator( + FileStoreTable table, boolean compactBlob, boolean compactVector) { + this(table, null, compactBlob, compactVector); } public DataEvolutionCompactCoordinator( FileStoreTable table, @Nullable PartitionPredicate partitionPredicate, - boolean compactBlob) { + boolean compactBlob, + boolean compactVector) { CoreOptions options = table.coreOptions(); long targetFileSize = options.targetFileSize(false); long openFileCost = options.splitOpenFileCost(); @@ -75,7 +78,12 @@ public DataEvolutionCompactCoordinator( table.newSnapshotReader().withPartitionFilter(partitionPredicate), table.store().newScan()); this.planner = - new CompactPlanner(compactBlob, targetFileSize, openFileCost, compactMinFileNum); + new CompactPlanner( + compactBlob, + compactVector, + targetFileSize, + openFileCost, + compactMinFileNum); } public List plan() { @@ -131,16 +139,19 @@ List scan() { static class CompactPlanner { private final boolean compactBlob; + private final boolean compactVector; private final long targetFileSize; private final long openFileCost; private final long compactMinFileNum; CompactPlanner( boolean compactBlob, + boolean compactVector, long targetFileSize, long openFileCost, long compactMinFileNum) { this.compactBlob = compactBlob; + this.compactVector = compactVector; this.targetFileSize = targetFileSize; this.openFileCost = openFileCost; this.compactMinFileNum = compactMinFileNum; @@ -172,14 +183,19 @@ List compactPlan(List input) { for (List group : ranges) { List dataFiles = new ArrayList<>(); List blobFiles = new ArrayList<>(); + List vectorStoreFiles = new ArrayList<>(); TreeMap treeMap = new TreeMap<>(); Map> 
dataFileToBlobFiles = new HashMap<>(); + Map> dataFileToVectorStoreFiles = + new HashMap<>(); for (DataFileMeta f : group) { - if (!isBlobFile(f.fileName())) { + if (isBlobFile(f.fileName())) { + blobFiles.add(f); + } else if (isVectorStoreFile(f.fileName())) { + vectorStoreFiles.add(f); + } else { treeMap.put(f.nonNullFirstRowId(), f); dataFiles.add(f); - } else { - blobFiles.add(f); } } @@ -201,6 +217,25 @@ List compactPlan(List input) { } } } + if (compactVector) { + // associate vector-store files to data files + for (DataFileMeta vectorStoreFile : vectorStoreFiles) { + Long key = treeMap.floorKey(vectorStoreFile.nonNullFirstRowId()); + if (key != null) { + DataFileMeta dataFile = treeMap.get(key); + if (vectorStoreFile.nonNullFirstRowId() + >= dataFile.nonNullFirstRowId() + && vectorStoreFile.nonNullFirstRowId() + <= dataFile.nonNullFirstRowId() + + dataFile.rowCount() + - 1) { + dataFileToVectorStoreFiles + .computeIfAbsent(dataFile, k -> new ArrayList<>()) + .add(vectorStoreFile); + } + } + } + } RangeHelper rangeHelper2 = new RangeHelper<>(DataFileMeta::nonNullRowIdRange); @@ -220,10 +255,19 @@ List compactPlan(List input) { .sum(); if (currentGroupWeight > targetFileSize) { // compact current file group to merge field files - tasks.addAll(triggerTask(fileGroup, partition, dataFileToBlobFiles)); + tasks.addAll( + triggerTask( + fileGroup, + partition, + dataFileToBlobFiles, + dataFileToVectorStoreFiles)); // compact wait compact files tasks.addAll( - triggerTask(waitCompactFiles, partition, dataFileToBlobFiles)); + triggerTask( + waitCompactFiles, + partition, + dataFileToBlobFiles, + dataFileToVectorStoreFiles)); waitCompactFiles = new ArrayList<>(); weightSum = 0; } else { @@ -232,13 +276,21 @@ List compactPlan(List input) { if (weightSum > targetFileSize) { tasks.addAll( triggerTask( - waitCompactFiles, partition, dataFileToBlobFiles)); + waitCompactFiles, + partition, + dataFileToBlobFiles, + dataFileToVectorStoreFiles)); waitCompactFiles = new 
ArrayList<>(); weightSum = 0L; } } } - tasks.addAll(triggerTask(waitCompactFiles, partition, dataFileToBlobFiles)); + tasks.addAll( + triggerTask( + waitCompactFiles, + partition, + dataFileToBlobFiles, + dataFileToVectorStoreFiles)); } } return tasks; @@ -247,7 +299,8 @@ List compactPlan(List input) { private List triggerTask( List dataFiles, BinaryRow partition, - Map> dataFileToBlobFiles) { + Map> dataFileToBlobFiles, + Map> dataFileToVectorStoreFiles) { List tasks = new ArrayList<>(); if (dataFiles.size() >= compactMinFileNum) { tasks.add(new DataEvolutionCompactTask(partition, dataFiles, false)); @@ -263,6 +316,18 @@ private List triggerTask( tasks.add(new DataEvolutionCompactTask(partition, blobFiles, true)); } } + + if (compactVector) { + List vectorStoreFiles = new ArrayList<>(); + for (DataFileMeta dataFile : dataFiles) { + vectorStoreFiles.addAll( + dataFileToVectorStoreFiles.getOrDefault( + dataFile, Collections.emptyList())); + } + if (vectorStoreFiles.size() >= compactMinFileNum) { + tasks.add(new DataEvolutionCompactTask(partition, vectorStoreFiles, false)); + } + } return tasks; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java index abbc03f17baf..61e8acf0b2fb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java @@ -23,6 +23,7 @@ import org.apache.paimon.append.AppendCompactTask; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileFormat; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -32,15 +33,18 @@ import org.apache.paimon.table.sink.CommitMessage; import 
org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.RecordWriter; +import org.apache.paimon.utils.VectorStoreUtils; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -68,6 +72,18 @@ public CommitMessage doCompact(FileStoreTable table, String commitUser) throws E // TODO: support blob file compaction throw new UnsupportedOperationException("Blob task is not supported"); } + if (VectorStoreUtils.isVectorStoreFile(compactBefore.get(0).fileName())) { + // TODO: support vector-store file compaction + throw new UnsupportedOperationException("Vector-store task is not supported"); + } + Set separatedVectorStoreFields = + VectorStoreUtils.fieldsInVectorFile( + table.rowType(), + FileFormat.vectorFileFormat(table.coreOptions()), + FileFormat.fileFormat(table.coreOptions())) + .stream() + .map(DataField::name) + .collect(Collectors.toSet()); table = table.copy(DYNAMIC_WRITE_OPTIONS); long firstRowId = compactBefore.get(0).nonNullFirstRowId(); @@ -76,6 +92,7 @@ public CommitMessage doCompact(FileStoreTable table, String commitUser) throws E new RowType( table.rowType().getFields().stream() .filter(f -> f.type().getTypeRoot() != DataTypeRoot.BLOB) + .filter(f -> !separatedVectorStoreFields.contains(f.name())) .collect(Collectors.toList())); FileStorePathFactory pathFactory = table.store().pathFactory(); AppendOnlyFileStore store = (AppendOnlyFileStore) table.store(); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index 
608262bc1972..cad95a9f2ac6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -59,14 +59,36 @@ public DataFilePathFactory( boolean fileSuffixIncludeCompression, String fileCompression, @Nullable ExternalPathProvider externalPathProvider) { + this( + parent, + UUID.randomUUID().toString(), + new AtomicInteger(0), + formatIdentifier, + dataFilePrefix, + changelogFilePrefix, + fileSuffixIncludeCompression, + compressFileExtension(fileCompression), + externalPathProvider); + } + + private DataFilePathFactory( + Path parent, + String uuid, + AtomicInteger pathCount, + String formatIdentifier, + String dataFilePrefix, + String changelogFilePrefix, + boolean fileSuffixIncludeCompression, + @Nullable String compressExtension, + @Nullable ExternalPathProvider externalPathProvider) { this.parent = parent; - this.uuid = UUID.randomUUID().toString(); - this.pathCount = new AtomicInteger(0); + this.uuid = uuid; + this.pathCount = pathCount; this.formatIdentifier = formatIdentifier; this.dataFilePrefix = dataFilePrefix; this.changelogFilePrefix = changelogFilePrefix; this.fileSuffixIncludeCompression = fileSuffixIncludeCompression; - this.compressExtension = compressFileExtension(fileCompression); + this.compressExtension = compressExtension; this.externalPathProvider = externalPathProvider; } @@ -105,6 +127,10 @@ public Path newPath(String prefix) { } private String newFileName(String prefix) { + return newFileName(prefix, makeExtension(compressExtension, formatIdentifier)); + } + + protected String makeExtension(String compressExtension, String formatIdentifier) { String extension; if (compressExtension != null && isTextFormat(formatIdentifier)) { extension = "." + formatIdentifier + "." + compressExtension; @@ -113,7 +139,7 @@ private String newFileName(String prefix) { } else { extension = "." 
+ formatIdentifier; } - return newFileName(prefix, extension); + return extension; } public Path newPathFromExtension(String extension) { @@ -127,7 +153,7 @@ public Path newPathFromName(String fileName) { return new Path(parent, fileName); } - private String newFileName(String prefix, String extension) { + protected String newFileName(String prefix, String extension) { return prefix + uuid + "-" + pathCount.getAndIncrement() + extension; } @@ -217,4 +243,28 @@ private static String compressFileExtension(String compression) { } return compression; } + + public DataFilePathFactory vectorStorePathFactory(String formatIdentifier) { + return new VectorStoreWrapper(this, formatIdentifier); + } + + private static class VectorStoreWrapper extends DataFilePathFactory { + private VectorStoreWrapper(DataFilePathFactory base, String formatIdentifier) { + super( + base.parent, + base.uuid, + base.pathCount, + formatIdentifier, + base.dataFilePrefix, + base.changelogFilePrefix, + base.fileSuffixIncludeCompression, + base.compressExtension, + base.externalPathProvider); + } + + @Override + protected String makeExtension(String compressExtension, String formatIdentifier) { + return ".vector" + super.makeExtension(compressExtension, formatIdentifier); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java index a714658f2ce1..94620205a232 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java @@ -131,8 +131,10 @@ protected RecordWriter createWriter( ioManager, schemaId, fileFormat, + FileFormat.vectorFileFormat(options), options.targetFileSize(false), options.blobTargetFileSize(), + options.vectorTargetFileSize(), writeType, writeCols, restoredMaxSeqNumber, diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java index 7ac829be01d3..6cf335a35b6b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java @@ -59,6 +59,7 @@ import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId; import static org.apache.paimon.utils.Preconditions.checkNotNull; +import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile; /** {@link FileStoreScan} for data-evolution enabled table. */ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan { @@ -204,10 +205,11 @@ static EvolutionStats evolutionStats( TableSchema schema, Function scanTableSchema, List metas) { - // exclude blob files, useless for predicate eval + // exclude blob and vector-store files, useless for predicate eval metas = metas.stream() .filter(entry -> !isBlobFile(entry.file().fileName())) + .filter(entry -> !isVectorStoreFile(entry.file().fileName())) .collect(Collectors.toList()); ToLongFunction maxSeqFunc = e -> e.file().maxSequenceNumber(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index ec38b3bf1562..fb9d51080783 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -63,6 +63,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; import java.util.function.Function; import java.util.function.ToLongFunction; import java.util.stream.Collectors; @@ -73,6 +75,8 @@ import 
static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; +import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile; /** * A union {@link SplitRead} to read multiple inner files to merge columns, note that this class @@ -219,13 +223,10 @@ private DataEvolutionFileReader createUnionReader( needMergeFiles, file -> { checkArgument( - isBlobFile(file.fileName()), - "Only blob file need to call this method."); - return schemaFetcher - .apply(file.schemaId()) - .logicalRowType() - .getField(file.writeCols().get(0)) - .id(); + isBlobFile(file.fileName()) + || isVectorStoreFile(file.fileName()), + "Only blob/vector-store files need to call this method."); + return schemaFetcher.apply(file.schemaId()).logicalRowType(); }); long rowCount = fieldsFiles.get(0).rowCount(); @@ -416,25 +417,40 @@ private FileRecordReader createFileReader( @VisibleForTesting public static List splitFieldBunches( - List needMergeFiles, Function blobFileToFieldId) { - return splitFieldBunches(needMergeFiles, blobFileToFieldId, false); + List needMergeFiles, Function fileToRowType) { + return splitFieldBunches(needMergeFiles, fileToRowType, false); } @VisibleForTesting public static List splitFieldBunches( List needMergeFiles, - Function blobFileToFieldId, + Function fileToRowType, boolean rowIdPushDown) { List fieldsFiles = new ArrayList<>(); - Map blobBunchMap = new HashMap<>(); + Map blobBunchMap = new HashMap<>(); + Map vectorStoreBunchMap = new TreeMap<>(); long rowCount = -1; for (DataFileMeta file : needMergeFiles) { if (isBlobFile(file.fileName())) { - int fieldId = blobFileToFieldId.apply(file); + RowType rowType = fileToRowType.apply(file); + int fieldId = rowType.getField(file.writeCols().get(0)).id(); final long expectedRowCount = rowCount; blobBunchMap 
.computeIfAbsent( - fieldId, key -> new BlobBunch(expectedRowCount, rowIdPushDown)) + fieldId, + key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown)) + .add(file); + } else if (isVectorStoreFile(file.fileName())) { + RowType rowType = fileToRowType.apply(file); + String fileFormat = DataFilePathFactory.formatIdentifier(file.fileName()); + VectorStoreBunchKey vectorStoreKey = + new VectorStoreBunchKey( + file.schemaId(), fileFormat, file.writeCols(), rowType); + final long expectedRowCount = rowCount; + vectorStoreBunchMap + .computeIfAbsent( + vectorStoreKey, + key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown)) .add(file); } else { // Normal file, just add it to the current merge split @@ -443,6 +459,7 @@ public static List splitFieldBunches( } } fieldsFiles.addAll(blobBunchMap.values()); + fieldsFiles.addAll(vectorStoreBunchMap.values()); return fieldsFiles; } @@ -474,7 +491,7 @@ public List files() { } @VisibleForTesting - static class BlobBunch implements FieldBunch { + static class SpecialFieldBunch implements FieldBunch { final List files; final long expectedRowCount; @@ -485,7 +502,7 @@ static class BlobBunch implements FieldBunch { long latestMaxSequenceNumber = -1; long rowCount; - BlobBunch(long expectedRowCount, boolean rowIdPushDown) { + SpecialFieldBunch(long expectedRowCount, boolean rowIdPushDown) { this.files = new ArrayList<>(); this.rowCount = 0; this.expectedRowCount = expectedRowCount; @@ -493,14 +510,15 @@ static class BlobBunch implements FieldBunch { } void add(DataFileMeta file) { - if (!isBlobFile(file.fileName())) { - throw new IllegalArgumentException("Only blob file can be added to a blob bunch."); + if (!isBlobFile(file.fileName()) && !isVectorStoreFile(file.fileName())) { + throw new IllegalArgumentException( + "Only blob/vector-store file can be added to this bunch."); } if (file.nonNullFirstRowId() == latestFistRowId) { if (file.maxSequenceNumber() >= latestMaxSequenceNumber) { throw new IllegalArgumentException( 
- "Blob file with same first row id should have decreasing sequence number."); + "Blob/vector-store file with same first row id should have decreasing sequence number."); } return; } @@ -519,11 +537,11 @@ void add(DataFileMeta file) { if (firstRowId < expectedNextFirstRowId) { checkArgument( file.maxSequenceNumber() < latestMaxSequenceNumber, - "Blob file with overlapping row id should have decreasing sequence number."); + "Blob/vector-store file with overlapping row id should have decreasing sequence number."); return; } else if (firstRowId > expectedNextFirstRowId) { throw new IllegalArgumentException( - "Blob file first row id should be continuous, expect " + "Blob/vector-store file first row id should be continuous, expect " + expectedNextFirstRowId + " but got " + firstRowId); @@ -532,17 +550,17 @@ void add(DataFileMeta file) { if (!files.isEmpty()) { checkArgument( file.schemaId() == files.get(0).schemaId(), - "All files in a blob bunch should have the same schema id."); + "All files in this bunch should have the same schema id."); checkArgument( file.writeCols().equals(files.get(0).writeCols()), - "All files in a blob bunch should have the same write columns."); + "All files in this bunch should have the same write columns."); } } files.add(file); rowCount += file.rowCount(); checkArgument( rowCount <= expectedRowCount, - "Blob files row count exceed the expect " + expectedRowCount); + "Blob/vector-store files row count exceeds the expected " + expectedRowCount); this.latestMaxSequenceNumber = file.maxSequenceNumber(); this.latestFistRowId = file.nonNullFirstRowId(); this.expectedNextFirstRowId = latestFistRowId + file.rowCount(); @@ -565,14 +583,17 @@ public static List> mergeRangesAndSort(List fil RangeHelper rangeHelper = new RangeHelper<>(DataFileMeta::nonNullRowIdRange); List> result = rangeHelper.mergeOverlappingRanges(files); - // in group, sort by blob file and max_seq + // in group, sort by blob/vector-store file and max_seq for (List group : result) {
- // split to data files and blob files + // split to data files, blob files, vector-store files List dataFiles = new ArrayList<>(); List blobFiles = new ArrayList<>(); + List vectorStoreFiles = new ArrayList<>(); for (DataFileMeta f : group) { if (isBlobFile(f.fileName())) { blobFiles.add(f); + } else if (isVectorStoreFile(f.fileName())) { + vectorStoreFiles.add(f); } else { dataFiles.add(f); } @@ -590,12 +611,111 @@ public static List> mergeRangesAndSort(List fil comparingLong(DataFileMeta::nonNullFirstRowId) .thenComparing(reverseOrder(comparingLong(maxSeqF)))); - // concat data files and blob files + // vector-store files sort by first row id then by reversed max sequence number + vectorStoreFiles.sort( + comparingLong(DataFileMeta::nonNullFirstRowId) + .thenComparing(reverseOrder(comparingLong(maxSeqF)))); + + // concat data files, blob files, vector-store files group.clear(); group.addAll(dataFiles); group.addAll(blobFiles); + group.addAll(vectorStoreFiles); } return result; } + + static final class VectorStoreBunchKey implements Comparable { + public final long schemaId; + public final String formatIdentifier; + public final List writeCols; + + public VectorStoreBunchKey( + long schemaId, + String formatIdentifier, + List writeCols, + RowType preferredColOrder) { + this.schemaId = schemaId; + this.formatIdentifier = checkNotNull(formatIdentifier, "formatIdentifier"); + this.writeCols = normalizeWriteCols(writeCols, preferredColOrder); + } + + @Override + public int compareTo(VectorStoreBunchKey o) { + int c = Long.compare(this.schemaId, o.schemaId); + if (c != 0) { + return c; + } + + c = this.formatIdentifier.compareTo(o.formatIdentifier); + if (c != 0) { + return c; + } + + int n = Math.min(this.writeCols.size(), o.writeCols.size()); + for (int i = 0; i < n; i++) { + c = this.writeCols.get(i).compareTo(o.writeCols.get(i)); + if (c != 0) { + return c; + } + } + return Integer.compare(this.writeCols.size(), o.writeCols.size()); + } + + @Override + public 
boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof VectorStoreBunchKey)) { + return false; + } + VectorStoreBunchKey that = (VectorStoreBunchKey) o; + return schemaId == that.schemaId + && formatIdentifier.equals(that.formatIdentifier) + && writeCols.equals(that.writeCols); + } + + @Override + public int hashCode() { + return Objects.hash(schemaId, formatIdentifier, writeCols); + } + + @Override + public String toString() { + return "VectorStoreBunchKey{schemaId=" + + schemaId + + ", format=" + + formatIdentifier + + ", writeCols=" + + writeCols + + "}"; + } + + private static List normalizeWriteCols(List writeCols, RowType rowType) { + if (writeCols == null || writeCols.isEmpty()) { + return Collections.emptyList(); + } + + Map colPosMap = new HashMap<>(); + List namesInRowType = rowType.getFieldNames(); + for (int i = 0; i < namesInRowType.size(); i++) { + colPosMap.putIfAbsent(namesInRowType.get(i), i); + } + + ArrayList sorted = new ArrayList<>(writeCols); + sorted.sort( + (a, b) -> { + int ia = colPosMap.getOrDefault(a, Integer.MAX_VALUE); + int ib = colPosMap.getOrDefault(b, Integer.MAX_VALUE); + if (ia != ib) { + return Integer.compare(ia, ib); + } + return a.compareTo(b); + }); + + return sorted; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java index d2f3dc8851ef..fe66f00fd1e2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java @@ -30,6 +30,7 @@ import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.VectorStoreUtils.isVectorStoreFile; /** Utils for row tracking commit. 
*/ public class RowTrackingCommitUtils { @@ -68,6 +69,7 @@ private static long assignRowTrackingMeta( long start = firstRowIdStart; long blobStartDefault = firstRowIdStart; Map blobStarts = new HashMap<>(); + long vectorStoreStart = firstRowIdStart; for (ManifestEntry entry : deltaFiles) { Optional fileSource = entry.file().fileSource(); checkArgument( @@ -91,6 +93,15 @@ private static long assignRowTrackingMeta( } rowIdAssigned.add(entry.assignFirstRowId(blobStart)); blobStarts.put(blobFieldName, blobStart + rowCount); + } else if (isVectorStoreFile(entry.file().fileName())) { + if (vectorStoreStart >= start) { + throw new IllegalStateException( + String.format( + "This is a bug, vectorStoreStart %d should be less than start %d when assigning a vector-store entry file.", + vectorStoreStart, start)); + } + rowIdAssigned.add(entry.assignFirstRowId(vectorStoreStart)); + vectorStoreStart += rowCount; } else { rowIdAssigned.add(entry.assignFirstRowId(start)); blobStartDefault = start; diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index f353f62780a8..b80b64dd5fbc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -42,7 +42,9 @@ import org.apache.paimon.types.TimestampType; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; +import org.apache.paimon.utils.VectorStoreUtils; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -77,11 +79,13 @@ import static org.apache.paimon.table.PrimaryKeyTableUtils.createMergeFunctionFactory; import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX; import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES; +import static org.apache.paimon.types.BlobType.fieldsInBlobFile; import static 
org.apache.paimon.types.BlobType.fieldsNotInBlobFile; import static org.apache.paimon.types.DataTypeRoot.ARRAY; import static org.apache.paimon.types.DataTypeRoot.MAP; import static org.apache.paimon.types.DataTypeRoot.MULTISET; import static org.apache.paimon.types.DataTypeRoot.ROW; +import static org.apache.paimon.types.DataTypeRoot.VECTOR; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; @@ -161,8 +165,27 @@ public static void validateTableSchema(TableSchema schema) { RowType tableRowType = new RowType(schema.fields()); Set blobDescriptorFields = validateBlobDescriptorFields(tableRowType, options); validateBlobExternalStorageFields(tableRowType, options, blobDescriptorFields); - fileFormat.validateDataFields( - new RowType(fieldsNotInBlobFile(tableRowType, blobDescriptorFields))); + + List fieldsInNormalFile = new ArrayList<>(); + { + Set fieldsInBlobFile = + fieldsInBlobFile(tableRowType, blobDescriptorFields).stream() + .map(DataField::name) + .collect(Collectors.toSet()); + Set fieldsInVectorFile = + VectorStoreUtils.fieldsInVectorFile( + tableRowType, fileFormat, FileFormat.vectorFileFormat(options)) + .stream() + .map(DataField::name) + .collect(Collectors.toSet()); + for (DataField field : tableRowType.getFields()) { + if (!fieldsInBlobFile.contains(field.name()) + && !fieldsInVectorFile.contains(field.name())) { + fieldsInNormalFile.add(field); + } + } + } + fileFormat.validateDataFields(new RowType(fieldsInNormalFile)); // Check column names in schema schema.fieldNames() @@ -239,6 +262,24 @@ public static void validateTableSchema(TableSchema schema) { validateForDeletionVectors(options); } + // vector field names must point to vector type + Set fieldNamesSpecifiedAsVector = options.vectorField(); + schema.fields() + .forEach( + f -> + checkState( + !fieldNamesSpecifiedAsVector.contains(f.name()) + || VECTOR.equals(f.type().getTypeRoot()), + String.format( + "Field name[%s] 
is configured as vector-field so" + " the type must be vector, but it is %s", + f.name(), f.type()))); + // vector field names must exist in table schema + schema.fieldNames().forEach(fieldNamesSpecifiedAsVector::remove); + checkArgument( + fieldNamesSpecifiedAsVector.isEmpty(), + "Some of the columns specified as vector-field are unknown."); + validateMergeFunctionFactory(schema); validateRowTracking(schema, options); @@ -627,6 +668,31 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) blobNames.stream().noneMatch(schema.partitionKeys()::contains), "The BLOB type column can not be part of partition keys."); } + + FileFormat fileFormat = FileFormat.fileFormat(options); + FileFormat vectorFileFormat = FileFormat.vectorFileFormat(options); + if (VectorStoreUtils.isDifferentFormat(vectorFileFormat, fileFormat)) { + List vectorStoreFields = + VectorStoreUtils.fieldsInVectorFile( + schema.logicalRowType(), fileFormat, vectorFileFormat); + Set vectorStoreNames = + vectorStoreFields.stream().map(DataField::name).collect(Collectors.toSet()); + List fieldsNotInBlobFile = + fieldsNotInBlobFile(schema.logicalRowType(), options.blobDescriptorField()); + Set nonBlobNames = + fieldsNotInBlobFile.stream().map(DataField::name).collect(Collectors.toSet()); + checkArgument( + schema.partitionKeys().stream().noneMatch(vectorStoreNames::contains), + "The vector-store columns can not be part of partition keys."); + checkArgument( + options.dataEvolutionEnabled(), + "Data evolution config must be enabled for table with vector-store file format."); + + List fieldsInVectorFile = + VectorStoreUtils.fieldsInVectorFile( + schema.logicalRowType(), fileFormat, vectorFileFormat); + vectorFileFormat.validateDataFields(new RowType(fieldsInVectorFile)); + } } private static Set validateBlobDescriptorFields(RowType rowType, CoreOptions options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/VectorStoreUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/VectorStoreUtils.java new file mode 100644 index 000000000000..fab2aba88a56 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/VectorStoreUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** Utils for vector-store table. 
*/ +public class VectorStoreUtils { + public static boolean isDifferentFormat(FileFormat vectorStoreFormat, FileFormat normalFormat) { + return (vectorStoreFormat != null) + && !vectorStoreFormat + .getFormatIdentifier() + .equals(normalFormat.getFormatIdentifier()); + } + + public static boolean isVectorStoreFile(String fileName) { + return fileName.contains(".vector."); + } + + public static List fieldsInVectorFile( + RowType rowType, FileFormat normalFormat, FileFormat vectorStoreFormat) { + if (!isDifferentFormat(vectorStoreFormat, normalFormat)) { + return Collections.emptyList(); + } + List result = new ArrayList<>(); + rowType.getFields() + .forEach( + field -> { + DataTypeRoot type = field.type().getTypeRoot(); + if (type == DataTypeRoot.VECTOR) { + result.add(field); + } + }); + return result; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index b2d820adf846..731c27fd2375 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; import org.apache.paimon.data.BlobData; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -51,6 +52,7 @@ import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; @@ -61,6 +63,7 @@ import org.apache.paimon.utils.StatsCollectorFactories; import org.assertj.core.api.Assertions; +import org.junit.Ignore; import 
org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -74,6 +77,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -576,6 +580,30 @@ public void testNonSpillable() throws Exception { writer.close(); } + @Ignore // TODO this can be enabled after avro supports vector + @Test + public void testVectorStoreSameFormatUsesRowDataWriter() throws Exception { + RowType vectorStoreSchema = + RowType.builder() + .fields( + new DataType[] { + new IntType(), + new VarCharType(), + DataTypes.VECTOR(3, DataTypes.FLOAT()) + }, + new String[] {"id", "name", "embed"}) + .build(); + FileFormat format = FileFormat.fromIdentifier(AVRO, new Options()); + AppendOnlyWriter writer = createVectorStoreWriter(1024 * 1024L, format, vectorStoreSchema); + writer.write(rowWithVectors(1, "AAA", new float[] {1.0f, 2.0f, 3.0f})); + CommitIncrement increment = writer.prepareCommit(true); + writer.close(); + + assertThat(increment.newFilesIncrement().newFiles()).hasSize(1); + DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0); + assertThat(meta.fileName()).doesNotContain(".vector"); + } + private SimpleColStats initStats(Integer min, Integer max, long nullCount) { return new SimpleColStats(min, max, nullCount); } @@ -589,6 +617,16 @@ private InternalRow row(int id, String name, String dt) { return GenericRow.of(id, BinaryString.fromString(name), BinaryString.fromString(dt)); } + private InternalRow rowWithVectors(int id, String name, float[]... 
vectors) { + GenericRow row = new GenericRow(vectors.length + 2); + row.setField(0, id); + row.setField(1, BinaryString.fromString(name)); + for (int i = 0; i < vectors.length; ++i) { + row.setField(i + 2, BinaryVector.fromPrimitiveArray(vectors[i])); + } + return row; + } + private DataFilePathFactory createPathFactory() { return new DataFilePathFactory( new Path(tempDir + "/dt=" + PART + "/bucket-0"), @@ -666,6 +704,56 @@ private Pair> createWriter( boolean hasIoManager, List scannedFiles, CountDownLatch latch) { + Map options = new HashMap<>(); + options.put("metadata.stats-mode", "truncate(16)"); + return createWriterBase( + targetFileSize, + null, + AppendOnlyWriterTest.SCHEMA, + forceCompact, + useWriteBuffer, + spillable, + hasIoManager, + scannedFiles, + compactBefore -> { + latch.await(); + return compactBefore.isEmpty() + ? Collections.emptyList() + : Collections.singletonList(generateCompactAfter(compactBefore)); + }, + options); + } + + private AppendOnlyWriter createVectorStoreWriter( + long targetFileSize, FileFormat vectorFileFormat, RowType writeSchema) { + Map options = new HashMap<>(); + options.put("metadata.stats-mode", "truncate(16)"); + options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + return createWriterBase( + targetFileSize, + vectorFileFormat, + writeSchema, + false, + true, + false, + true, + Collections.emptyList(), + compactBefore -> Collections.emptyList(), + options) + .getKey(); + } + + private Pair> createWriterBase( + long targetFileSize, + FileFormat vectorFileFormat, + RowType writeSchema, + boolean forceCompact, + boolean useWriteBuffer, + boolean spillable, + boolean hasIoManager, + List scannedFiles, + BucketedAppendCompactManager.CompactRewriter rewriter, + Map optionsMap) { FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Options()); LinkedList toCompact = new LinkedList<>(scannedFiles); BucketedAppendCompactManager compactManager = @@ -678,25 +766,20 @@ private Pair> createWriter( targetFileSize, 
targetFileSize / 10 * 7, false, - compactBefore -> { - latch.await(); - return compactBefore.isEmpty() - ? Collections.emptyList() - : Collections.singletonList( - generateCompactAfter(compactBefore)); - }, + rewriter, null); - CoreOptions options = - new CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)")); + CoreOptions options = new CoreOptions(optionsMap); AppendOnlyWriter writer = new AppendOnlyWriter( LocalFileIO.create(), hasIoManager ? IOManager.create(tempDir.toString()) : null, SCHEMA_ID, fileFormat, + vectorFileFormat, + targetFileSize, targetFileSize, targetFileSize, - AppendOnlyWriterTest.SCHEMA, + writeSchema, null, getMaxSequenceNumber(toCompact), compactManager, @@ -714,7 +797,7 @@ private Pair> createWriter( true, false, options.dataEvolutionEnabled(), - BlobFileContext.create(AppendOnlyWriterTest.SCHEMA, options)); + BlobFileContext.create(writeSchema, options)); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return Pair.of(writer, compactManager.allFiles()); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java index dc0c6c0ac491..1dfc333f0536 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java @@ -38,7 +38,9 @@ import org.apache.paimon.table.TableTestBase; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Range; import org.apache.paimon.utils.UriReader; @@ -56,6 +58,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; import 
java.util.stream.Stream; @@ -112,7 +115,8 @@ public void testBasic() throws Exception { .collect(Collectors.toList()); List fieldGroups = - DataEvolutionSplitRead.splitFieldBunches(filesMetas, key -> 0); + DataEvolutionSplitRead.splitFieldBunches( + filesMetas, key -> makeBlobRowType(key.writeCols(), f -> 0)); assertThat(fieldGroups.size()).isEqualTo(2); assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); @@ -162,7 +166,8 @@ public void testMultiBatch() throws Exception { assertThat(batches.size()).isEqualTo(2); for (List batch : batches) { List fieldGroups = - DataEvolutionSplitRead.splitFieldBunches(batch, file -> 0); + DataEvolutionSplitRead.splitFieldBunches( + batch, file -> makeBlobRowType(file.writeCols(), f -> 0)); assertThat(fieldGroups.size()).isEqualTo(2); assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); assertThat(fieldGroups.get(1).files().size()).isEqualTo(10); @@ -574,6 +579,20 @@ protected InternalRow dataDefault(int time, int size) { RANDOM.nextInt(), BinaryString.fromBytes(randomBytes()), new BlobData(blobBytes)); } + private static RowType makeBlobRowType( + List fieldNames, Function fieldIdFunc) { + List fields = new ArrayList<>(); + if (fieldNames == null) { + fieldNames = Collections.emptyList(); + } + for (String fieldName : fieldNames) { + int fieldId = fieldIdFunc.apply(fieldName); + DataField blobField = new DataField(fieldId, fieldName, DataTypes.BLOB()); + fields.add(blobField); + } + return new RowType(fields); + } + @Override protected byte[] randomBytes() { byte[] binary = new byte[2 * 1024 * 124]; diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterTest.java similarity index 96% rename from paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java rename to paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterTest.java index 
11095cb7d0dd..9eea7cdd6d3e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterTest.java @@ -49,8 +49,8 @@ import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link RollingBlobFileWriter}. */ -public class RollingBlobFileWriterTest { +/** Tests for {@link DataEvolutionRollingFileWriter}. */ +public class DataEvolutionRollingFileWriterTest { private static final RowType SCHEMA = RowType.builder() @@ -65,7 +65,7 @@ public class RollingBlobFileWriterTest { @TempDir java.nio.file.Path tempDir; - private RollingBlobFileWriter writer; + private DataEvolutionRollingFileWriter writer; private DataFilePathFactory pathFactory; private LongCounter seqNumCounter; private byte[] testBlobData; @@ -93,10 +93,12 @@ public void setUp() throws IOException { // Initialize the writer CoreOptions options = new CoreOptions(new Options()); writer = - new RollingBlobFileWriter( + new DataEvolutionRollingFileWriter( fileIO, SCHEMA_ID, FileFormat.fromIdentifier("parquet", new Options()), + null, + TARGET_FILE_SIZE, TARGET_FILE_SIZE, TARGET_FILE_SIZE, SCHEMA, @@ -183,13 +185,15 @@ public void testBlobTargetFileSize() throws IOException { long blobTargetFileSize = 500 * 1024 * 1024L; // 2 MB for blob files // Create a new writer with different blob target file size - RollingBlobFileWriter blobSizeTestWriter = - new RollingBlobFileWriter( + DataEvolutionRollingFileWriter blobSizeTestWriter = + new DataEvolutionRollingFileWriter( LocalFileIO.create(), SCHEMA_ID, FileFormat.fromIdentifier("parquet", new Options()), + null, 128 * 1024 * 1024, blobTargetFileSize, // Different blob target size + 128 * 1024 * 1024, SCHEMA, new DataFilePathFactory( new Path(tempDir + "/blob-size-test"), @@ -267,13 +271,15 @@ public void testSchemaValidation() throws IOException { void testBlobFileNameFormatWithSharedUuid() throws IOException { long blobTargetFileSize 
= 2 * 1024 * 1024L; // 2 MB for blob files - RollingBlobFileWriter fileNameTestWriter = - new RollingBlobFileWriter( + DataEvolutionRollingFileWriter fileNameTestWriter = + new DataEvolutionRollingFileWriter( LocalFileIO.create(), SCHEMA_ID, FileFormat.fromIdentifier("parquet", new Options()), + null, 128 * 1024 * 1024, blobTargetFileSize, + 128 * 1024 * 1024, SCHEMA, pathFactory, // Use the same pathFactory to ensure shared UUID () -> new LongCounter(), @@ -345,13 +351,15 @@ void testBlobFileNameFormatWithSharedUuid() throws IOException { void testBlobFileNameFormatWithSharedUuidNonDescriptorMode() throws IOException { long blobTargetFileSize = 2 * 1024 * 1024L; // 2 MB for blob files - RollingBlobFileWriter fileNameTestWriter = - new RollingBlobFileWriter( + DataEvolutionRollingFileWriter fileNameTestWriter = + new DataEvolutionRollingFileWriter( LocalFileIO.create(), SCHEMA_ID, FileFormat.fromIdentifier("parquet", new Options()), + null, 128 * 1024 * 1024, blobTargetFileSize, + 128 * 1024 * 1024, SCHEMA, pathFactory, // Use the same pathFactory to ensure shared UUID () -> new LongCounter(), @@ -562,10 +570,12 @@ void testBlobStatsSchemaWithCustomColumnName() throws IOException { // Reinitialize writer with custom schema writer = - new RollingBlobFileWriter( + new DataEvolutionRollingFileWriter( LocalFileIO.create(), SCHEMA_ID, FileFormat.fromIdentifier("parquet", new Options()), + null, + TARGET_FILE_SIZE, TARGET_FILE_SIZE, TARGET_FILE_SIZE, customSchema, // Use custom schema diff --git a/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterWithVectorStoreTest.java b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterWithVectorStoreTest.java new file mode 100644 index 000000000000..68fffc08b1a1 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterWithVectorStoreTest.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; +import org.apache.paimon.data.BlobData; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.blob.BlobFileFormat; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.operation.BlobFileContext; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.StatsCollectorFactories; +import org.apache.paimon.utils.VectorStoreUtils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import 
java.util.List; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DataEvolutionRollingFileWriter} with vector-store. */ +public class DataEvolutionRollingFileWriterWithVectorStoreTest { + + private static final int VECTOR_DIM = 12; + private static final RowType SCHEMA = + RowType.builder() + .field("f0", DataTypes.INT()) + .field("f1", DataTypes.STRING()) + .field("f2", DataTypes.BLOB()) + .field("f3", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT())) + .field("f4", DataTypes.INT()) + .build(); + + private static final long TARGET_FILE_SIZE = 2 * 1024 * 1024L; // 2 MB + private static final long VECTOR_TARGET_FILE_SIZE = 4 * 1024 * 1024L; // 4 MB + private static final long SCHEMA_ID = 1L; + private static final String COMPRESSION = "none"; + private static final Random RANDOM = new Random(System.currentTimeMillis()); + + @TempDir java.nio.file.Path tempDir; + + private DataEvolutionRollingFileWriter writer; + private DataFilePathFactory pathFactory; + private LongCounter seqNumCounter; + + @BeforeEach + public void setUp() throws IOException { + // Setup file system and path factory + LocalFileIO fileIO = LocalFileIO.create(); + pathFactory = + new DataFilePathFactory( + new Path(tempDir + "/bucket-0"), + "parquet", + "data-", // dataFilePrefix should include the hyphen to match expected + // format: data-{uuid}-{count} + "changelog", + false, + null, + null); + seqNumCounter = new LongCounter(); + + // Initialize the writer + writer = + new DataEvolutionRollingFileWriter( + fileIO, + SCHEMA_ID, + FileFormat.fromIdentifier("parquet", new Options()), + FileFormat.fromIdentifier("json", new Options()), + TARGET_FILE_SIZE, + TARGET_FILE_SIZE, + VECTOR_TARGET_FILE_SIZE, + SCHEMA, + pathFactory, + () -> seqNumCounter, + COMPRESSION, + new StatsCollectorFactories(new CoreOptions(new Options())), + new FileIndexOptions(), + FileSource.APPEND, + false, + BlobFileContext.create(SCHEMA, new CoreOptions(new 
Options()))); + } + + @Test + public void testBasicWriting() throws IOException { + // Write a single row + writer.write(makeRows(1, 10).get(0)); + assertThat(writer.recordCount()).isEqualTo(1); + } + + @Test + public void testMultipleWrites() throws Exception { + // Write multiple rows + int rowNum = RANDOM.nextInt(64) + 1; + writer.write(makeRows(rowNum, 10).iterator()); + writer.close(); + List metasResult = writer.result(); + + assertThat(metasResult.size()).isEqualTo(3); // blob is small, normal/blob/vector 3 files + assertThat(metasResult.get(0).fileFormat()).isEqualTo("parquet"); + assertThat(metasResult.get(1).fileFormat()).isEqualTo("blob"); + assertThat(metasResult.get(2).fileFormat()).isEqualTo("json"); + assertThat(writer.recordCount()).isEqualTo(rowNum); + + assertThat(metasResult.get(0).rowCount()).isEqualTo(metasResult.get(1).rowCount()); + assertThat(metasResult.get(0).rowCount()).isEqualTo(metasResult.get(2).rowCount()); + } + + @Test + public void testVectorTargetFileSize() throws Exception { + // 100k vector-store data would create 1 normal, 1 blob, and 3 vector-store files + int rowNum = 100 * 1000; + writer.write(makeRows(rowNum, 1).iterator()); + writer.close(); + List results = writer.result(); + + // Verify that we have multiple files due to rolling + assertThat(results.size()).isGreaterThan(1); + + // Check that vector-store files meet the target size requirement + List vectorStoreFiles = + results.stream() + .filter(file -> VectorStoreUtils.isVectorStoreFile(file.fileName())) + .collect(java.util.stream.Collectors.toList()); + + assertThat(vectorStoreFiles.size()).isEqualTo(3); + + // Verify that vector-store files are close to the target size (within reasonable tolerance) + for (DataFileMeta file : vectorStoreFiles.subList(0, vectorStoreFiles.size() - 1)) { + long fileSize = file.fileSize(); + assertThat(fileSize) + .as("Vector-store file size should be close to target size") + .isGreaterThanOrEqualTo(VECTOR_TARGET_FILE_SIZE) + 
.isLessThanOrEqualTo(VECTOR_TARGET_FILE_SIZE + 256 * 1024); + } + + // Verify total record count + assertThat(writer.recordCount()).isEqualTo(rowNum); + } + + @Test + void testVectorStoreFileNameFormatWithSharedUuid() throws Exception { + // 100k vector-store data would create 1 normal, 1 blob, and 3 vector-store files + int rowNum = 100 * 1000; + writer.write(makeRows(rowNum, 1).iterator()); + writer.close(); + List results = writer.result(); + + // Get uuid from vector-store files. The pattern is data-{uuid}-{count}.vector.json + DataFileMeta oneVectorStoreFile = + results.stream() + .filter(file -> VectorStoreUtils.isVectorStoreFile(file.fileName())) + .findAny() + .get(); + String uuidAndCnt = oneVectorStoreFile.fileName().split(".vector.")[0]; + String prefix = uuidAndCnt.substring(0, uuidAndCnt.lastIndexOf('-') + 1); // data-{uuid}- + + // Verify all files use the same UUID and have sequential counters + for (int i = 0; i < results.size(); ++i) { + String fileName = results.get(i).fileName(); + assertThat(fileName).as("All files should use the same UUID").startsWith(prefix); + int counter = Integer.parseInt(fileName.substring(prefix.length()).split("\\.")[0]); + assertThat(counter).as("File counter should be sequential").isEqualTo(i); + } + } + + @Test + void testVectorStoreStatsMainPart() throws Exception { + // Write multiple rows + int rowNum = RANDOM.nextInt(64) + 1; + writer.write(makeRows(rowNum, 10).iterator()); + writer.close(); + List metasResult = writer.result(); + + // Check row count + for (DataFileMeta file : metasResult) { + assertThat(file.rowCount()).isEqualTo(rowNum); + assertThat(file.deleteRowCount().get()).isEqualTo(0); // There is no deleted rows + } + + // Check statistics + for (DataFileMeta file : metasResult) { + if (BlobFileFormat.isBlobFile(file.fileName())) { + assertThat(file.writeCols()).isEqualTo(Collections.singletonList("f2")); + } else if (VectorStoreUtils.isVectorStoreFile(file.fileName())) { + 
assertThat(file.writeCols()).isEqualTo(Arrays.asList("f3")); + } else { + assertThat(file.writeCols()).isEqualTo(Arrays.asList("f0", "f1", "f4")); + assertThat(file.valueStats().minValues().getInt(0)).isEqualTo(0); + assertThat(file.valueStats().maxValues().getInt(0)).isEqualTo(rowNum - 1); + } + } + + // Verify total record count + assertThat(writer.recordCount()).isEqualTo(rowNum); + } + + @Test + public void testVectorStoreNoBlob() throws Exception { + RowType schema = + RowType.builder() + .field("f0", DataTypes.INT()) + .field("f1", DataTypes.STRING()) + .field("f2", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT())) + .field("f3", DataTypes.INT()) + .build(); + writer = + new DataEvolutionRollingFileWriter( + LocalFileIO.create(), + SCHEMA_ID, + FileFormat.fromIdentifier("parquet", new Options()), + FileFormat.fromIdentifier("json", new Options()), + TARGET_FILE_SIZE, + TARGET_FILE_SIZE, + VECTOR_TARGET_FILE_SIZE, + schema, + pathFactory, + () -> seqNumCounter, + COMPRESSION, + new StatsCollectorFactories(new CoreOptions(new Options())), + new FileIndexOptions(), + FileSource.APPEND, + false, + null); + + // 100k vector-store data would create 1 normal and 3 vector-store files + int rowNum = 100 * 1000; + List rows = makeRows(rowNum, 1); + for (InternalRow row : rows) { + writer.write( + GenericRow.of( + row.getInt(0), row.getString(1), row.getVector(3), row.getInt(4))); + } + writer.close(); + List results = writer.result(); + + // Check normal, blob, and vector-store files + List normalFiles = new ArrayList<>(); + List blobFiles = new ArrayList<>(); + List vectorStoreFiles = new ArrayList<>(); + for (DataFileMeta file : results) { + if (BlobFileFormat.isBlobFile(file.fileName())) { + blobFiles.add(file); + } else if (VectorStoreUtils.isVectorStoreFile(file.fileName())) { + vectorStoreFiles.add(file); + } else { + normalFiles.add(file); + } + } + assertThat(normalFiles.size()).isEqualTo(1); + assertThat(blobFiles.size()).isEqualTo(0); + 
assertThat(vectorStoreFiles.size()).isEqualTo(3); + + // Verify total record count + assertThat(writer.recordCount()).isEqualTo(rowNum); + } + + @Test + public void testVectorStoreTheSameFormat() throws Exception { + // vector-store file format is the same as main part + writer = + new DataEvolutionRollingFileWriter( + LocalFileIO.create(), + SCHEMA_ID, + FileFormat.fromIdentifier("json", new Options()), + FileFormat.fromIdentifier("json", new Options()), + TARGET_FILE_SIZE, + TARGET_FILE_SIZE, + VECTOR_TARGET_FILE_SIZE, + SCHEMA, + pathFactory, + () -> seqNumCounter, + COMPRESSION, + new StatsCollectorFactories(new CoreOptions(new Options())), + new FileIndexOptions(), + FileSource.APPEND, + false, + BlobFileContext.create(SCHEMA, new CoreOptions(new Options()))); + + // This time we use large blob files + int rowNum = 10; + writer.write(makeRows(rowNum, 512 * 1024).iterator()); + writer.close(); + List results = writer.result(); + + // Check normal, blob, and vector-store files + List normalFiles = new ArrayList<>(); + List blobFiles = new ArrayList<>(); + List vectorStoreFiles = new ArrayList<>(); + for (DataFileMeta file : results) { + if (BlobFileFormat.isBlobFile(file.fileName())) { + blobFiles.add(file); + } else if (VectorStoreUtils.isVectorStoreFile(file.fileName())) { + vectorStoreFiles.add(file); + } else { + assertThat(file.writeCols()).isEqualTo(Arrays.asList("f0", "f1", "f3", "f4")); + normalFiles.add(file); + } + } + assertThat(normalFiles.size()).isEqualTo(1); + assertThat(blobFiles.size()).isEqualTo(3); + assertThat(vectorStoreFiles.size()).isEqualTo(0); + + // Verify total record count + assertThat(writer.recordCount()).isEqualTo(rowNum); + } + + private List makeRows(int rowNum, int blobDataSize) { + List rows = new ArrayList<>(rowNum); + byte[] blobData = new byte[blobDataSize]; + RANDOM.nextBytes(blobData); + for (int i = 0; i < rowNum; ++i) { + byte[] string = new byte[1]; + RANDOM.nextBytes(string); + byte[] buf = new byte[VECTOR_DIM]; + 
RANDOM.nextBytes(buf); + float[] vector = new float[VECTOR_DIM]; + for (int j = 0; j < VECTOR_DIM; ++j) { + vector[j] = buf[j]; + } + int label = RANDOM.nextInt(32) + 1; + rows.add( + GenericRow.of( + i, + BinaryString.fromBytes(string), + new BlobData(blobData), + BinaryVector.fromPrimitiveArray(vector), + label)); + } + return rows; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java index 97f621a72024..4b6d1c579a9c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java @@ -62,9 +62,7 @@ public void testBasic() throws Exception { .collect(Collectors.toList()); RowType rowType = table.schema().logicalRowType(); - List fieldGroups = - splitFieldBunches( - filesMetas, file -> rowType.getField(file.writeCols().get(0)).id()); + List fieldGroups = splitFieldBunches(filesMetas, file -> rowType); assertThat(fieldGroups.size()).isEqualTo(3); assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/VectorStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/VectorStoreTableTest.java new file mode 100644 index 000000000000..23904e869e02 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/VectorStoreTableTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; +import org.apache.paimon.data.BlobData; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.operation.DataEvolutionSplitRead; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Tests for table with vector-store and data evolution. 
*/ +public class VectorStoreTableTest extends TableTestBase { + + private static final int VECTOR_DIM = 12; + + private AtomicInteger uniqueIdGen = new AtomicInteger(0); + + private Map rowsWritten = new HashMap<>(); + + @Test + public void testBasic() throws Exception { + int rowNum = RANDOM.nextInt(64) + 1; + + createTableDefault(); + + commitDefault(writeDataDefault(rowNum, 1)); + + AtomicInteger counter = new AtomicInteger(0); + + List filesMetas = + getTableDefault().store().newScan().plan().files().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList()); + + List fieldGroups = + DataEvolutionSplitRead.splitFieldBunches( + filesMetas, key -> makeBlobRowType(key.writeCols(), f -> 0)); + + assertThat(fieldGroups.size()).isEqualTo(3); + assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(1).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(2).files().size()).isEqualTo(1); + + readDefault( + row -> { + counter.getAndIncrement(); + InternalRow expected = rowsWritten.get(row.getInt(0)); + assertThat(row.getString(1)).isEqualTo(expected.getString(1)); + assertThat(row.getBlob(2)).isEqualTo(expected.getBlob(2)); + assertThat(row.getVector(3).toFloatArray()) + .isEqualTo(expected.getVector(3).toFloatArray()); + assertThat(row.getInt(4)).isEqualTo(expected.getInt(4)); + }); + + assertThat(counter.get()).isEqualTo(rowNum); + } + + @Test + public void testMultiBatch() throws Exception { + int rowNum = (RANDOM.nextInt(64) + 1) * 2; + + createTableDefault(); + + commitDefault(writeDataDefault(rowNum / 2, 2)); + + AtomicInteger counter = new AtomicInteger(0); + + List filesMetas = + getTableDefault().store().newScan().plan().files().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList()); + + List> batches = DataEvolutionSplitRead.mergeRangesAndSort(filesMetas); + assertThat(batches.size()).isEqualTo(2); + for (List batch : batches) { + List fieldGroups = + DataEvolutionSplitRead.splitFieldBunches( + 
batch, file -> makeBlobRowType(file.writeCols(), f -> 0)); + assertThat(fieldGroups.size()).isEqualTo(3); + assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(1).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(2).files().size()).isEqualTo(1); + } + + readDefault( + row -> { + counter.getAndIncrement(); + InternalRow expected = rowsWritten.get(row.getInt(0)); + assertThat(row.getString(1)).isEqualTo(expected.getString(1)); + assertThat(row.getBlob(2)).isEqualTo(expected.getBlob(2)); + assertThat(row.getVector(3).toFloatArray()) + .isEqualTo(expected.getVector(3).toFloatArray()); + assertThat(row.getInt(4)).isEqualTo(expected.getInt(4)); + }); + assertThat(counter.get()).isEqualTo(rowNum); + } + + @Test + public void testRolling() throws Exception { + // 100k vector-store data would create 1 normal, 1 blob, and 3 vector-store files + int rowNum = 100 * 1000 * 3; + + createTableDefault(); + + commitDefault(writeDataDefault(rowNum / 3, 3)); + + AtomicInteger counter = new AtomicInteger(0); + + List filesMetas = + getTableDefault().store().newScan().plan().files().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList()); + + List> batches = DataEvolutionSplitRead.mergeRangesAndSort(filesMetas); + assertThat(batches.size()).isEqualTo(3); + for (List batch : batches) { + List fieldGroups = + DataEvolutionSplitRead.splitFieldBunches( + batch, file -> makeBlobRowType(file.writeCols(), f -> 0)); + assertThat(fieldGroups.size()).isEqualTo(3); + assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(1).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(2).files().size()).isEqualTo(3); + } + + readDefault( + row -> { + counter.getAndIncrement(); + InternalRow expected = rowsWritten.get(row.getInt(0)); + assertThat(row.getString(1)).isEqualTo(expected.getString(1)); + assertThat(row.getBlob(2)).isEqualTo(expected.getBlob(2)); + assertThat(row.getVector(3).toFloatArray()) + 
.isEqualTo(expected.getVector(3).toFloatArray()); + assertThat(row.getInt(4)).isEqualTo(expected.getInt(4)); + }); + + assertThat(counter.get()).isEqualTo(rowNum); + } + + @Test + public void testWithoutBlob() throws Exception { + // 100k vector-store data would create 1 normal and 3 vector-store files (no blob column) + int rowNum = 100 * 1000 * 3; + + catalog.createTable(identifier(), schemaWithoutBlob(), true); + + commitDefault(writeDataWithoutBlob(rowNum / 3, 3)); + + AtomicInteger counter = new AtomicInteger(0); + + List filesMetas = + getTableDefault().store().newScan().plan().files().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList()); + + List> batches = DataEvolutionSplitRead.mergeRangesAndSort(filesMetas); + assertThat(batches.size()).isEqualTo(3); + for (List batch : batches) { + List fieldGroups = + DataEvolutionSplitRead.splitFieldBunches( + batch, file -> makeBlobRowType(file.writeCols(), f -> 0)); + assertThat(fieldGroups.size()).isEqualTo(2); + assertThat(fieldGroups.get(0).files().size()).isEqualTo(1); + assertThat(fieldGroups.get(1).files().size()).isEqualTo(3); + } + + readDefault( + row -> { + counter.getAndIncrement(); + InternalRow expected = rowsWritten.get(row.getInt(0)); + assertThat(row.getString(1)).isEqualTo(expected.getString(1)); + assertThat(row.getVector(2).toFloatArray()) + .isEqualTo(expected.getVector(3).toFloatArray()); + assertThat(row.getInt(3)).isEqualTo(expected.getInt(4)); + }); + + assertThat(counter.get()).isEqualTo(rowNum); + } + + @Override + protected Schema schemaDefault() { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.STRING()); + schemaBuilder.column("f2", DataTypes.BLOB()); + schemaBuilder.column("f3", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT())); + schemaBuilder.column("f4", DataTypes.INT()); + schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "2 MB"); + 
schemaBuilder.option(CoreOptions.VECTOR_TARGET_FILE_SIZE.key(), "4 MB"); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.VECTOR_FIELD.key(), "f3"); + schemaBuilder.option(CoreOptions.VECTOR_FILE_FORMAT.key(), "json"); + schemaBuilder.option(CoreOptions.FILE_COMPRESSION.key(), "none"); + return schemaBuilder.build(); + } + + private Schema schemaWithoutBlob() { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.STRING()); + schemaBuilder.column("f2", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT())); + schemaBuilder.column("f3", DataTypes.INT()); + schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "2 MB"); + schemaBuilder.option(CoreOptions.VECTOR_TARGET_FILE_SIZE.key(), "4 MB"); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.VECTOR_FIELD.key(), "f2"); + schemaBuilder.option(CoreOptions.VECTOR_FILE_FORMAT.key(), "json"); + schemaBuilder.option(CoreOptions.FILE_COMPRESSION.key(), "none"); + return schemaBuilder.build(); + } + + protected List writeDataWithoutBlob(int size, int times) throws Exception { + Table table = getTableDefault(); + List messages = new ArrayList<>(); + for (int time = 0; time < times; time++) { + StreamWriteBuilder builder = table.newStreamWriteBuilder(); + builder.withCommitUser(commitUser); + try (StreamTableWrite streamTableWrite = builder.newWrite()) { + for (int j = 0; j < size; j++) { + InternalRow row = dataDefault(time, j); + streamTableWrite.write( + GenericRow.of( + row.getInt(0), + row.getString(1), + row.getVector(3), + row.getInt(4))); + } + messages.addAll(streamTableWrite.prepareCommit(false, Long.MAX_VALUE)); + } + } + return messages; + } + + @Override + protected 
InternalRow dataDefault(int time, int size) { + byte[] stringBytes = new byte[1]; + RANDOM.nextBytes(stringBytes); + byte[] blobBytes = new byte[1]; + RANDOM.nextBytes(blobBytes); + byte[] vectorBytes = new byte[VECTOR_DIM]; + RANDOM.nextBytes(vectorBytes); + float[] vector = new float[VECTOR_DIM]; + for (int i = 0; i < VECTOR_DIM; i++) { + vector[i] = vectorBytes[i]; + } + int id = uniqueIdGen.getAndIncrement(); + InternalRow row = + GenericRow.of( + id, + BinaryString.fromBytes(stringBytes), + new BlobData(blobBytes), + BinaryVector.fromPrimitiveArray(vector), + RANDOM.nextInt(32) + 1); + rowsWritten.put(id, row); + return row; + } + + private static RowType makeBlobRowType( + List fieldNames, Function fieldIdFunc) { + List fields = new ArrayList<>(); + if (fieldNames == null) { + fieldNames = Collections.emptyList(); + } + for (String fieldName : fieldNames) { + int fieldId = fieldIdFunc.apply(fieldName); + DataField blobField = new DataField(fieldId, fieldName, DataTypes.BLOB()); + fields.add(blobField); + } + return new RowType(fields); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java index 441ec93ebbab..ca8a5cbb1bfe 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java @@ -61,7 +61,7 @@ public void testCompactPlannerSingleFile() { DataEvolutionCompactCoordinator.CompactPlanner planner = new DataEvolutionCompactCoordinator.CompactPlanner( - false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2); + false, false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2); List tasks = planner.compactPlan(entries); @@ -78,7 +78,7 @@ public void testCompactPlannerContiguousFiles() { // Use small target file size to trigger compaction 
DataEvolutionCompactCoordinator.CompactPlanner planner = - new DataEvolutionCompactCoordinator.CompactPlanner(false, 199, 1, 2); + new DataEvolutionCompactCoordinator.CompactPlanner(false, false, 199, 1, 2); List tasks = planner.compactPlan(entries); @@ -86,7 +86,7 @@ public void testCompactPlannerContiguousFiles() { assertThat(tasks.get(0).compactBefore()) .containsExactly(entries.get(0).file(), entries.get(1).file()); - planner = new DataEvolutionCompactCoordinator.CompactPlanner(false, 200, 1, 2); + planner = new DataEvolutionCompactCoordinator.CompactPlanner(false, false, 200, 1, 2); tasks = planner.compactPlan(entries); assertThat(tasks).isNotEmpty(); assertThat(tasks.get(0).compactBefore()) @@ -107,7 +107,7 @@ public void testCompactPlannerWithRowIdGap() { // Use large target file size so compaction is triggered by gap, not size DataEvolutionCompactCoordinator.CompactPlanner planner = new DataEvolutionCompactCoordinator.CompactPlanner( - false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2); + false, false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2); List tasks = planner.compactPlan(entries); @@ -137,7 +137,7 @@ public void testCompactPlannerSkipsLargeFiles() { DataEvolutionCompactCoordinator.CompactPlanner planner = new DataEvolutionCompactCoordinator.CompactPlanner( - false, 100 * 1024 * 1024, 4 * 1024 * 1024, 2); + false, false, 100 * 1024 * 1024, 4 * 1024 * 1024, 2); List tasks = planner.compactPlan(entries); @@ -165,7 +165,7 @@ public void testCompactPlannerWithBlobFiles() { // Use small target to trigger compaction, with blob compaction enabled DataEvolutionCompactCoordinator.CompactPlanner planner = - new DataEvolutionCompactCoordinator.CompactPlanner(true, 1024, 1024, 2); + new DataEvolutionCompactCoordinator.CompactPlanner(true, false, 1024, 1024, 2); List tasks = planner.compactPlan(entries); @@ -244,7 +244,7 @@ public void testPlanWithNullManifestRowId() { .thenReturn(Arrays.asList(entry1, entry2).iterator()); DataEvolutionCompactCoordinator coordinator = - 
new DataEvolutionCompactCoordinator(table, false); + new DataEvolutionCompactCoordinator(table, false, false); List tasks = coordinator.plan(); assertThat(tasks).hasSize(1); diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 251e0b17e763..248f3b3ac486 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -272,6 +272,8 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception IOManager.create(tempDir.toString()), 0, fileFormat, + null, + 10, 10, 10, schema, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java index 452813d9d93c..4e85b1e3dd54 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java @@ -21,9 +21,12 @@ import org.apache.paimon.data.Timestamp; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileSource; -import org.apache.paimon.operation.DataEvolutionSplitRead.BlobBunch; import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch; +import org.apache.paimon.operation.DataEvolutionSplitRead.SpecialFieldBunch; import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -32,20 +35,21 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.Function; import static org.apache.paimon.operation.DataEvolutionSplitRead.splitFieldBunches; import static 
org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Tests for {@link BlobBunch}. */ +/** Tests for {@link SpecialFieldBunch}. */ public class DataEvolutionReadTest { - private BlobBunch blobBunch; + private SpecialFieldBunch blobBunch; @BeforeEach public void setUp() { - blobBunch = new BlobBunch(Long.MAX_VALUE, false); + blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, false); } @Test @@ -84,7 +88,7 @@ public void testAddNonBlobFileThrowsException() { assertThatThrownBy(() -> blobBunch.add(normalFile)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Only blob file can be added to a blob bunch."); + .hasMessage("Only blob/vector-store file can be added to this bunch."); } @Test @@ -97,7 +101,7 @@ public void testAddBlobFileWithSameFirstRowId() { assertThatThrownBy(() -> blobBunch.add(blobEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Blob file with same first row id should have decreasing sequence number."); + "Blob/vector-store file with same first row id should have decreasing sequence number."); } @Test @@ -136,7 +140,7 @@ public void testAddBlobFileWithOverlappingRowIdAndHigherSequenceNumber() { assertThatThrownBy(() -> blobBunch.add(blobEntry2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Blob file with overlapping row id should have decreasing sequence number."); + "Blob/vector-store file with overlapping row id should have decreasing sequence number."); } @Test @@ -148,7 +152,8 @@ public void testAddBlobFileWithNonContinuousRowId() { // Adding file with non-continuous row id should throw exception assertThatThrownBy(() -> blobBunch.add(blobEntry2)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Blob file first row id should be continuous, expect 100 but got 200"); + .hasMessage( + "Blob/vector-store file first row id should be continuous, expect 100 but got 200"); } 
@Test @@ -161,7 +166,7 @@ public void testAddBlobFileWithDifferentWriteCols() { // Adding file with different write columns should throw exception assertThatThrownBy(() -> blobBunch.add(blobEntry2)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("All files in a blob bunch should have the same write columns."); + .hasMessage("All files in this bunch should have the same write columns."); } @Test @@ -214,10 +219,11 @@ public void testComplexBlobBunchScenario2() { assertThat(batch.get(8).fileName()).contains("blob4"); // skip assertThat(batch.get(9).fileName()).contains("blob8"); // pick - List fieldBunches = splitFieldBunches(batch, file -> 0); + List fieldBunches = + splitFieldBunches(batch, file -> makeBlobRowType(file.writeCols(), f -> 0)); assertThat(fieldBunches.size()).isEqualTo(2); - BlobBunch blobBunch = (BlobBunch) fieldBunches.get(1); + SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1); assertThat(blobBunch.files).hasSize(4); assertThat(blobBunch.files.get(0).fileName()).contains("blob5"); assertThat(blobBunch.files.get(1).fileName()).contains("blob9"); @@ -265,17 +271,18 @@ public void testComplexBlobBunchScenario3() { List batch = batches.get(0); List fieldBunches = - splitFieldBunches(batch, file -> file.writeCols().get(0).hashCode()); + splitFieldBunches( + batch, file -> makeBlobRowType(file.writeCols(), String::hashCode)); assertThat(fieldBunches.size()).isEqualTo(3); - BlobBunch blobBunch = (BlobBunch) fieldBunches.get(1); + SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1); assertThat(blobBunch.files).hasSize(4); assertThat(blobBunch.files.get(0).fileName()).contains("blob5"); assertThat(blobBunch.files.get(1).fileName()).contains("blob9"); assertThat(blobBunch.files.get(2).fileName()).contains("blob7"); assertThat(blobBunch.files.get(3).fileName()).contains("blob8"); - blobBunch = (BlobBunch) fieldBunches.get(2); + blobBunch = (SpecialFieldBunch) fieldBunches.get(2); 
assertThat(blobBunch.files).hasSize(4); assertThat(blobBunch.files.get(0).fileName()).contains("blob15"); assertThat(blobBunch.files.get(1).fileName()).contains("blob19"); @@ -322,22 +329,22 @@ private DataFileMeta createBlobFileWithCols( @Test public void testRowIdPushDown() { - BlobBunch blobBunch = new BlobBunch(Long.MAX_VALUE, true); + SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true); DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1); DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1); blobBunch.add(blobEntry1); - BlobBunch finalBlobBunch = blobBunch; + SpecialFieldBunch finalBlobBunch = blobBunch; DataFileMeta finalBlobEntry = blobEntry2; assertThatCode(() -> finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException(); - blobBunch = new BlobBunch(Long.MAX_VALUE, true); + blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true); blobEntry1 = createBlobFile("blob1", 0, 100, 1); blobEntry2 = createBlobFile("blob2", 50, 200, 2); blobBunch.add(blobEntry1); blobBunch.add(blobEntry2); assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2); - BlobBunch finalBlobBunch2 = blobBunch; + SpecialFieldBunch finalBlobBunch2 = blobBunch; DataFileMeta blobEntry3 = createBlobFile("blob2", 250, 100, 2); assertThatCode(() -> finalBlobBunch2.add(blobEntry3)).doesNotThrowAnyException(); } @@ -371,4 +378,18 @@ private DataFileMeta createNormalFile( firstRowId, null); } + + private static RowType makeBlobRowType( + List fieldNames, Function fieldIdFunc) { + List fields = new ArrayList<>(); + if (fieldNames == null) { + fieldNames = Collections.emptyList(); + } + for (String fieldName : fieldNames) { + int fieldId = fieldIdFunc.apply(fieldName); + DataField blobField = new DataField(fieldId, fieldName, DataTypes.BLOB()); + fields.add(blobField); + } + return new RowType(fields); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java index 2f0b1b2f1b4d..95bae45a7d9c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionSplitReadTest.java @@ -115,6 +115,24 @@ public void testSplitWithMultipleBlobFilesPerGroup() { assertEquals(Arrays.asList(file4, file5, file6), result.get(1)); } + @Test + public void testSplitWithMultipleVectorStoreFilesPerGroup() { + DataFileMeta file1 = createFile("file1.parquet", 1L, 10, 1); + DataFileMeta file2 = createFile("file2.vector.json", 1L, 1, 1); + DataFileMeta file3 = createFile("file3.vector.json", 2L, 9, 1); + DataFileMeta file4 = createFile("file4.parquet", 20L, 10, 2); + DataFileMeta file5 = createFile("file5.vector.json", 20L, 5, 2); + DataFileMeta file6 = createFile("file6.vector.json", 25L, 5, 2); + DataFileMeta file7 = createFile("file7.parquet", 1L, 10, 3); + + List files = Arrays.asList(file1, file2, file3, file4, file5, file6, file7); + List> result = DataEvolutionSplitRead.mergeRangesAndSort(files); + + assertEquals(2, result.size()); + assertEquals(Arrays.asList(file7, file1, file2, file3), result.get(0)); + assertEquals(Arrays.asList(file4, file5, file6), result.get(1)); + } + private static DataFileMeta createFile( String name, long firstRowId, long rowCount, long maxSequence) { return DataFileMeta.create( diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index 9b789e246be2..2f7ffbb6d937 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -35,7 +35,10 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.apache.paimon.CoreOptions.BUCKET; +import 
static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED; import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; +import static org.apache.paimon.CoreOptions.VECTOR_FIELD; +import static org.apache.paimon.CoreOptions.VECTOR_FILE_FORMAT; import static org.apache.paimon.schema.SchemaValidation.validateTableSchema; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatNoException; @@ -200,4 +203,115 @@ public void testChainTableAllowsNonDeduplicateMergeEngine() { assertThatNoException().isThrownBy(() -> validateTableSchema(schema)); } + + @Test + public void testVectorStoreUnknownColumn() { + Map<String, String> options = new HashMap<>(); + options.put(BUCKET.key(), String.valueOf(-1)); + options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + options.put(DATA_EVOLUTION_ENABLED.key(), "true"); + options.put(CoreOptions.FILE_FORMAT.key(), "avro"); + options.put(VECTOR_FILE_FORMAT.key(), "json"); + options.put(VECTOR_FIELD.key(), "f99"); + + List<DataField> fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.STRING())); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + emptyList(), + emptyList(), + options, + ""))) + .hasMessage("Some of the columns specified as vector-field are unknown."); + } + + @Test + public void testVectorStoreContainsNonVectorColumn() { + Map<String, String> options = new HashMap<>(); + options.put(BUCKET.key(), String.valueOf(-1)); + options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + options.put(DATA_EVOLUTION_ENABLED.key(), "true"); + options.put(CoreOptions.FILE_FORMAT.key(), "avro"); + options.put(VECTOR_FILE_FORMAT.key(), "json"); + options.put(VECTOR_FIELD.key(), "f1"); + + List<DataField> fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.FLOAT())); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + 
emptyList(), + emptyList(), + options, + ""))) + .hasMessage( + "Field name[f1] is configured as vector-field so the type must be vector, but it is FLOAT"); + } + + @Test + public void testVectorStoreContainsPartitionColumn() { + Map<String, String> options = new HashMap<>(); + options.put(BUCKET.key(), String.valueOf(-1)); + options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + options.put(DATA_EVOLUTION_ENABLED.key(), "true"); + options.put(CoreOptions.FILE_FORMAT.key(), "avro"); + options.put(VECTOR_FILE_FORMAT.key(), "json"); + options.put(VECTOR_FIELD.key(), "f1"); + + List<DataField> fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.VECTOR(6, DataTypes.FLOAT()))); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + singletonList("f1"), + emptyList(), + options, + ""))) + .hasMessage("The vector-store columns can not be part of partition keys."); + } + + @Test + public void testVectorStoreRequiresDataEvolutionEnabled() { + Map<String, String> options = new HashMap<>(); + options.put(CoreOptions.FILE_FORMAT.key(), "avro"); + options.put(VECTOR_FILE_FORMAT.key(), "json"); + options.put(VECTOR_FIELD.key(), "f1"); + + List<DataField> fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.VECTOR(6, DataTypes.FLOAT()))); + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + emptyList(), + emptyList(), + options, + ""))) + .hasMessage( + "Data evolution config must enabled for table with vector-store file format."); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java index 5ca56eed31f9..118a4fdefede 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java @@ -895,7 +895,7 @@ public void 
testCompactCoordinator() throws Exception { FileStoreTable table = (FileStoreTable) catalog.getTable(identifier()); // Create coordinator and call plan multiple times DataEvolutionCompactCoordinator coordinator = - new DataEvolutionCompactCoordinator(table, false); + new DataEvolutionCompactCoordinator(table, false, false); // Each plan() call processes one manifest group List allTasks = new ArrayList<>(); @@ -926,7 +926,7 @@ public void testCompact() throws Exception { FileStoreTable table = (FileStoreTable) catalog.getTable(identifier()); // Create coordinator and call plan multiple times DataEvolutionCompactCoordinator coordinator = - new DataEvolutionCompactCoordinator(table, false); + new DataEvolutionCompactCoordinator(table, false, false); // Each plan() call processes one manifest group List commitMessages = new ArrayList<>(); diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkVectorStoreE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkVectorStoreE2eTest.java new file mode 100644 index 000000000000..12f31f33f8db --- /dev/null +++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkVectorStoreE2eTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tests; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +/** E2E test for vector-store with data evolution. */ +public class FlinkVectorStoreE2eTest extends E2eTestBase { + + @Test + public void testVectorStoreTable() throws Exception { + Random rnd = new Random(System.currentTimeMillis()); + int vectorDim = rnd.nextInt(10) + 1; + final int itemNum = rnd.nextInt(3) + 1; + + String catalogDdl = + String.format( + "CREATE CATALOG ts_catalog WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '%s'\n" + + ");", + TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store"); + + String useCatalogCmd = "USE CATALOG ts_catalog;"; + + String createTableDdl = + String.format( + "CREATE TABLE IF NOT EXISTS ts_table (\n" + + " id BIGINT,\n" + + " embed ARRAY<FLOAT>\n" + + ") WITH (\n" + + " 'file.format' = 'parquet',\n" + + " 'file.compression' = 'none',\n" + + " 'row-tracking.enabled' = 'true',\n" + + " 'data-evolution.enabled' = 'true',\n" + + " 'vector.file.format' = 'json',\n" + + " 'vector-field' = 'embed',\n" + + " 'field.embed.vector-dim' = '%d'\n" + + ");", + vectorDim); + + float[][] vectors = new float[itemNum][vectorDim]; + byte[] vectorDataBuf = new byte[vectorDim]; + for (int i = 0; i < itemNum; ++i) { + vectors[i] = new float[vectorDim]; + rnd.nextBytes(vectorDataBuf); + for (int j = 0; j < vectorDim; ++j) { + vectors[i][j] = vectorDataBuf[j]; + } + } + + List<String> values = new ArrayList<>(); + String[] expected = new String[itemNum]; + for (int id = 0; id < itemNum; ++id) { + values.add(String.format("(%d, %s)", id, arrayLiteral(vectors[id]))); + expected[id] = String.format("%d, %s", id, Arrays.toString(vectors[id])); + } + + runBatchSql( + "INSERT INTO ts_table VALUES " + String.join(", ", values) + ";", + 
catalogDdl, + useCatalogCmd, + createTableDdl); + + runBatchSql( + "INSERT INTO result1 SELECT * FROM ts_table;", + catalogDdl, + useCatalogCmd, + createTableDdl, + createResultSink("result1", "id BIGINT, embed ARRAY")); + checkResult(expected); + clearCurrentResults(); + + runBatchSql( + "INSERT INTO result2 SELECT " + + "COUNT(*) AS total, " + + "SUM(CASE WHEN file_path LIKE '%.vector.json' THEN 1 ELSE 0 END) " + + "AS vector_files " + + "FROM \\`ts_table\\$files\\`;", + catalogDdl, + useCatalogCmd, + createTableDdl, + createResultSink("result2", "total BIGINT, vector_files BIGINT")); + checkResult("2, 1"); + } + + private String arrayLiteral(float[] vector) { + return "ARRAY" + Arrays.toString(vector); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index c0eddff64682..e05106eef6ff 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -129,6 +129,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -1055,14 +1056,9 @@ private static org.apache.paimon.types.DataType resolveDataType( if (blobFields.contains(fieldName)) { return toBlobType(logicalType); } - if (logicalType instanceof org.apache.flink.table.types.logical.ArrayType) { - String vectorDim = options.get(String.format("field.%s.vector-dim", fieldName)); - if (vectorDim != null) { - org.apache.flink.table.types.logical.LogicalType elementType = - ((org.apache.flink.table.types.logical.ArrayType) logicalType) - .getElementType(); - return toVectorType(elementType, vectorDim); - } + Set vectorFields = CoreOptions.vectorField(options); + if (vectorFields.contains(fieldName)) { + return 
toVectorType(fieldName, logicalType, options); } return toDataType(logicalType); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java index c83e85d6bccc..556dbd95ff31 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink; +import org.apache.paimon.CoreOptions; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -28,6 +29,7 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.VarBinaryType; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -51,10 +53,28 @@ public static BlobType toBlobType(LogicalType logicalType) { return new BlobType(); } - public static VectorType toVectorType(LogicalType elementType, String vectorDim) { + public static VectorType toVectorType( + String fieldName, + org.apache.flink.table.types.logical.LogicalType logicalType, + Map<String, String> options) { + checkArgument( + logicalType instanceof org.apache.flink.table.types.logical.ArrayType, + "Only array type can be converted to Paimon vector type."); + org.apache.flink.table.types.logical.LogicalType elementType = + ((org.apache.flink.table.types.logical.ArrayType) logicalType).getElementType(); + + String dimKey = String.format("field.%s.vector-dim", fieldName); + checkArgument( + options.containsKey(dimKey), + "When setting '" + + CoreOptions.VECTOR_FIELD.key() + + "', you must also set 'field.%s.vector-dim'," + + " where %s is the name of the vector field."); + String vectorDim = options.get(dimKey); checkArgument( 
!vectorDim.trim().isEmpty(), "Expected an integer for vector-dim, but got empty value."); + try { int dim = Integer.parseInt(vectorDim); return DataTypes.VECTOR(dim, toDataType(elementType)); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java index b0f0ca7d152e..f712c471e438 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java @@ -79,7 +79,8 @@ public static class CompactSourceReader private final DataEvolutionCompactCoordinator compactionCoordinator; public CompactSourceReader(FileStoreTable table, PartitionPredicate partitions) { - compactionCoordinator = new DataEvolutionCompactCoordinator(table, partitions, false); + compactionCoordinator = + new DataEvolutionCompactCoordinator(table, partitions, false, false); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogicalTypeConversionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogicalTypeConversionTest.java index 142b53c7cb22..0cc91f7f1287 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogicalTypeConversionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogicalTypeConversionTest.java @@ -23,9 +23,14 @@ import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; import org.junit.jupiter.api.Test; +import java.util.HashMap; +import java.util.Map; + import static 
org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -34,17 +39,61 @@ public class LogicalTypeConversionTest { @Test public void testToVectorType() { - VectorType vectorType = LogicalTypeConversion.toVectorType(new FloatType(), "3"); + Map<String, String> options = new HashMap<>(); + options.put("vector-field", "v"); + options.put("field.v.vector-dim", "3"); + LogicalType flinkType = makeVectorLogicalType(new FloatType()); + VectorType vectorType = LogicalTypeConversion.toVectorType("v", flinkType, options); assertThat(vectorType).isEqualTo(DataTypes.VECTOR(3, DataTypes.FLOAT())); } + @Test + public void testToVectorTypeInvalidLogicalType() { + Map<String, String> options = new HashMap<>(); + options.put("vector-field", "v"); + options.put("field.v.vector-dim", "3"); + assertThatThrownBy(() -> LogicalTypeConversion.toVectorType("v", new FloatType(), options)) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> LogicalTypeConversion.toVectorType("v", new IntType(), options)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testToVectorTypeInvalidElementType() { + Map<String, String> options = new HashMap<>(); + options.put("vector-field", "v"); + options.put("field.v.vector-dim", "3"); + LogicalType type1 = makeVectorLogicalType(new ArrayType(new FloatType())); + assertThatThrownBy(() -> LogicalTypeConversion.toVectorType("v", type1, options)) + .isInstanceOf(IllegalArgumentException.class); + LogicalType type2 = makeVectorLogicalType(new MapType(new IntType(), new FloatType())); + assertThatThrownBy(() -> LogicalTypeConversion.toVectorType("v", type2, options)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testToVectorTypeNoDim() { + Map<String, String> options = new HashMap<>(); + options.put("vector-field", "v"); + // options.put("field.v.vector-dim", "3"); + LogicalType flinkType = makeVectorLogicalType(new FloatType()); + assertThatThrownBy(() -> 
LogicalTypeConversion.toVectorType("v", flinkType, options)) + .isInstanceOf(IllegalArgumentException.class); + } + @Test public void testToVectorTypeInvalidDim() { - assertThatThrownBy(() -> LogicalTypeConversion.toVectorType(new FloatType(), "")) + Map options = new HashMap<>(); + options.put("vector-field", "v"); + LogicalType flinkType = makeVectorLogicalType(new FloatType()); + options.put("field.v.vector-dim", ""); + assertThatThrownBy(() -> LogicalTypeConversion.toVectorType("v", flinkType, options)) .isInstanceOf(IllegalArgumentException.class); - assertThatThrownBy(() -> LogicalTypeConversion.toVectorType(new FloatType(), "abc")) + options.put("field.v.vector-dim", "abc"); + assertThatThrownBy(() -> LogicalTypeConversion.toVectorType("v", flinkType, options)) .isInstanceOf(IllegalArgumentException.class); - assertThatThrownBy(() -> LogicalTypeConversion.toVectorType(new FloatType(), "0")) + options.put("field.v.vector-dim", "0"); + assertThatThrownBy(() -> LogicalTypeConversion.toVectorType("v", flinkType, options)) .isInstanceOf(IllegalArgumentException.class); } @@ -56,4 +105,8 @@ public void testVectorTypeToLogicalType() { ArrayType arrayType = (ArrayType) logicalType; assertThat(arrayType.getElementType()).isInstanceOf(FloatType.class); } + + private LogicalType makeVectorLogicalType(LogicalType elementType) { + return new ArrayType(elementType); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/VectorTypeTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/VectorTypeTableITCase.java index 8c918430e6dd..1b1ae3db3deb 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/VectorTypeTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/VectorTypeTableITCase.java @@ -167,6 +167,7 @@ private String getCreateTableDdl() { + ") WITH (" + " 'file.format' = 'json'," + " 'file.compression' = 'none'," + + " 
'vector-field' = 'embed'," + " 'field.embed.vector-dim' = '%d'" + ")", testTblName, testVector.length); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 0fe8a76b86a8..7f8c89dcbacf 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -491,7 +491,7 @@ private void compactDataEvolutionTable( JavaSparkContext javaSparkContext) { List compactionTasks; DataEvolutionCompactCoordinator compactCoordinator = - new DataEvolutionCompactCoordinator(table, partitionPredicate, false); + new DataEvolutionCompactCoordinator(table, partitionPredicate, false, false); CommitMessageSerializer messageSerializerser = new CommitMessageSerializer(); String commitUser = createCommitUser(table.coreOptions().toConfiguration()); try {