Skip to content

Commit b61de59

Browse files
authored
[FLINK-38773][table] Fix batch vector search excnode context (#27311)
1 parent 089af1f commit b61de59

File tree

5 files changed

+13
-12
lines changed

5 files changed

+13
-12
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecVectorSearchTableFunction;
2929
import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchSpec;
3030
import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchTableSourceSpec;
31-
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecVectorSearchTableFunction;
3231
import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
3332
import org.apache.flink.table.types.logical.RowType;
3433

@@ -65,9 +64,9 @@ public BatchExecVectorSearchTableFunction(
6564
String description) {
6665
this(
6766
ExecNodeContext.newNodeId(),
68-
ExecNodeContext.newContext(StreamExecVectorSearchTableFunction.class),
67+
ExecNodeContext.newContext(BatchExecVectorSearchTableFunction.class),
6968
ExecNodeContext.newPersistedConfig(
70-
StreamExecVectorSearchTableFunction.class, tableConfig),
69+
BatchExecVectorSearchTableFunction.class, tableConfig),
7170
tableSourceSpec,
7271
vectorSearchSpec,
7372
asyncOptions,

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
4848
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecUnion;
4949
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues;
50+
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecVectorSearchTableFunction;
5051
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecWindowTableFunction;
5152
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
5253
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCorrelate;
@@ -201,6 +202,7 @@ private ExecNodeMetadataUtil() {
201202
add(BatchExecMatch.class);
202203
add(BatchExecOverAggregate.class);
203204
add(BatchExecRank.class);
205+
add(BatchExecVectorSearchTableFunction.class);
204206
}
205207
};
206208

flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"flinkVersion" : "2.2",
2+
"flinkVersion" : "2.3",
33
"nodes" : [ {
44
"id" : 5,
55
"type" : "batch-exec-table-source-scan_1",
@@ -24,10 +24,10 @@
2424
},
2525
"outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>",
2626
"description" : "TableSourceScan(table=[[default_catalog, default_database, src_t]], fields=[id, content, vector])",
27-
"dynamicFilteringDataListenerID" : "581f0987-928f-4c00-a1c1-500fdd1c98fb"
27+
"dynamicFilteringDataListenerID" : "7068b1b2-f209-4e4b-8f16-77adc1440cb8"
2828
}, {
2929
"id" : 6,
30-
"type" : "stream-exec-vector-search-table-function_1",
30+
"type" : "batch-exec-vector-search-table-function_1",
3131
"configuration" : {
3232
"table.exec.async-vector-search.max-concurrent-operations" : "10",
3333
"table.exec.async-vector-search.output-mode" : "ORDERED",

flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"flinkVersion" : "2.2",
2+
"flinkVersion" : "2.3",
33
"nodes" : [ {
44
"id" : 1,
55
"type" : "batch-exec-table-source-scan_1",
@@ -24,10 +24,10 @@
2424
},
2525
"outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>",
2626
"description" : "TableSourceScan(table=[[default_catalog, default_database, src_t]], fields=[id, content, vector])",
27-
"dynamicFilteringDataListenerID" : "5e850a71-a459-4906-9822-22c324f6f4c9"
27+
"dynamicFilteringDataListenerID" : "19f6b4e7-e167-4785-8b10-4ab9bb286201"
2828
}, {
2929
"id" : 2,
30-
"type" : "stream-exec-vector-search-table-function_1",
30+
"type" : "batch-exec-vector-search-table-function_1",
3131
"configuration" : {
3232
"table.exec.async-vector-search.max-concurrent-operations" : "10",
3333
"table.exec.async-vector-search.output-mode" : "ORDERED",

flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"flinkVersion" : "2.2",
2+
"flinkVersion" : "2.3",
33
"nodes" : [ {
44
"id" : 9,
55
"type" : "batch-exec-table-source-scan_1",
@@ -24,10 +24,10 @@
2424
},
2525
"outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>",
2626
"description" : "TableSourceScan(table=[[default_catalog, default_database, src_t]], fields=[id, content, vector])",
27-
"dynamicFilteringDataListenerID" : "21fdf54d-fce6-42da-bcd3-a522ebee5cd6"
27+
"dynamicFilteringDataListenerID" : "733d0e86-4dba-4d5e-986a-22f1d385c6ef"
2828
}, {
2929
"id" : 10,
30-
"type" : "stream-exec-vector-search-table-function_1",
30+
"type" : "batch-exec-vector-search-table-function_1",
3131
"configuration" : {
3232
"table.exec.async-vector-search.max-concurrent-operations" : "10",
3333
"table.exec.async-vector-search.output-mode" : "ORDERED",

0 commit comments

Comments
 (0)