From aac6d5756aab8d065af6d01bf342e67e50cac2e2 Mon Sep 17 00:00:00 2001 From: uchenily Date: Wed, 1 Jul 2026 14:06:15 +0800 Subject: [PATCH] [feat](streamload): add enable_text_validate_utf8 http header --- be/src/service/http/action/stream_load.cpp | 7 +++ be/src/service/http/http_common.h | 1 + .../nereids/load/NereidsDataDescription.java | 2 + .../nereids/load/NereidsLoadTaskInfo.java | 4 ++ .../nereids/load/NereidsStreamLoadTask.java | 9 +++ gensrc/thrift/FrontendService.thrift | 1 + ...stream_load_disable_text_validate_utf8.out | 10 ++++ ...eam_load_disable_text_validate_utf8.groovy | 60 +++++++++++++++++++ 8 files changed, 94 insertions(+) create mode 100644 regression-test/data/load_p0/stream_load/test_stream_load_disable_text_validate_utf8.out create mode 100644 regression-test/suites/load_p0/stream_load/test_stream_load_disable_text_validate_utf8.groovy diff --git a/be/src/service/http/action/stream_load.cpp b/be/src/service/http/action/stream_load.cpp index 5d9cf40aea7483..e5118150c23998 100644 --- a/be/src/service/http/action/stream_load.cpp +++ b/be/src/service/http/action/stream_load.cpp @@ -678,6 +678,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, } request.__set_skip_lines(skip_lines); } + if (!http_req->header(HTTP_ENABLE_TEXT_VALIDATE_UTF8).empty()) { + if (iequal(http_req->header(HTTP_ENABLE_TEXT_VALIDATE_UTF8), "true")) { + request.__set_enable_text_validate_utf8(true); + } else { + request.__set_enable_text_validate_utf8(false); + } + } if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) { if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) { request.__set_enable_profile(true); diff --git a/be/src/service/http/http_common.h b/be/src/service/http/http_common.h index f250e22a278e55..2f05cf0f375836 100644 --- a/be/src/service/http/http_common.h +++ b/be/src/service/http/http_common.h @@ -56,6 +56,7 @@ static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns"; static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; static const std::string HTTP_SKIP_LINES = "skip_lines"; +static const std::string HTTP_ENABLE_TEXT_VALIDATE_UTF8 = "enable_text_validate_utf8"; static const std::string HTTP_COMMENT = "comment"; static const std::string HTTP_ENABLE_PROFILE = "enable_profile"; static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java index 8e7b45f48c29aa..34b5ae744fb6e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java @@ -425,6 +425,8 @@ public NereidsDataDescription(String tableName, NereidsLoadTaskInfo taskInfo) { String.valueOf(taskInfo.getTrimDoubleQuotes())); putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_SKIP_LINES, String.valueOf(taskInfo.getSkipLines())); + putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_ENABLE_TEXT_VALIDATE_UTF8, + String.valueOf(taskInfo.getEnableTextValidateUtf8())); putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL, String.valueOf(taskInfo.getEmptyFieldAsNull())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java index 2ece54c823ecfb..9101bf43ead019 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java @@ -139,6 +139,10 @@ default int getSkipLines() { return 0; } + default Boolean getEnableTextValidateUtf8() { + return true; + } + default boolean getEnableProfile() { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java index f5ddca41f19a29..c38a7049dce25c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java @@ -87,6 +87,7 @@ public class NereidsStreamLoadTask implements NereidsLoadTaskInfo { private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; private int skipLines = 0; + private Boolean enableTextValidateUtf8 = true; private boolean enableProfile = false; private boolean memtableOnSinkNode = false; @@ -297,6 +298,11 @@ public int getSkipLines() { return skipLines; } + @Override + public Boolean getEnableTextValidateUtf8() { + return enableTextValidateUtf8; + } + @Override public boolean getEnableProfile() { return enableProfile; @@ -491,6 +497,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws if (request.isSetSkipLines()) { skipLines = request.getSkipLines(); } + if (request.isSetEnableTextValidateUtf8()) { + enableTextValidateUtf8 = request.isEnableTextValidateUtf8(); + } if (request.isSetEnableProfile()) { enableProfile = request.isEnableProfile(); } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index b7111df7066908..3b0df236204d7e 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -601,6 +601,7 @@ struct TStreamLoadPutRequest { 58: optional Descriptors.TPartialUpdateNewRowPolicy partial_update_new_key_policy 59: optional bool empty_field_as_null 60: optional TCertBasedAuth cert_based_auth + 61: optional bool enable_text_validate_utf8 = true // For cloud 1000: optional string cloud_cluster diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_disable_text_validate_utf8.out b/regression-test/data/load_p0/stream_load/test_stream_load_disable_text_validate_utf8.out new file mode 100644 index 00000000000000..41241923473030 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_stream_load_disable_text_validate_utf8.out @@ -0,0 +1,10 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +4 + +-- !sql -- +1 2 3 +2 3 3 +3 4 3 +4 5 3 + diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_disable_text_validate_utf8.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_disable_text_validate_utf8.groovy new file mode 100644 index 00000000000000..d66bae90675790 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_disable_text_validate_utf8.groovy @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_stream_load_disable_text_validate_utf8", "p0") { + sql """ DROP TABLE IF EXISTS test_stream_load_disable_text_validate_utf8 """ + sql """ + CREATE TABLE IF NOT EXISTS test_stream_load_disable_text_validate_utf8 ( + `k1` int(20) NULL, + `k2` bigint(20) NULL, + `v1` tinyint(4) NULL, + `v2` text NULL, + `v3` date NULL, + `v4` datetime NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + streamLoad { + table "test_stream_load_disable_text_validate_utf8" + set 'column_separator', '\\x01' + set 'enable_text_validate_utf8', 'false' + + file 'csv_with_none_utf8_data.csv' + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(4, json.NumberTotalRows) + assertEquals(4, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + + sql "sync" + qt_sql """select count(*) from test_stream_load_disable_text_validate_utf8; """ + qt_sql """select k1, k2, v1 from test_stream_load_disable_text_validate_utf8 order by k1; """ +}