Skip to content

Commit 7d1a81b

Browse files
authored
merge to stable-25-3 YQ-4312 fixes for streaming queries (#28776)
2 parents c5fc4d1 + 5b9755a commit 7d1a81b

File tree

14 files changed

+323
-67
lines changed

14 files changed

+323
-67
lines changed

ydb/core/kqp/common/events/events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ struct TEvKqp {
140140
bool DisableDefaultTimeout = false;
141141
i64 Generation = 1;
142142
TString CheckpointId;
143+
TString StreamingQueryPath;
143144
};
144145

145146
struct TEvScriptResponse : public TEventLocal<TEvScriptResponse, TKqpEvents::EvScriptResponse> {

ydb/core/kqp/common/kqp_user_request_context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace NKikimr::NKqp {
2121
std::optional<NResourcePool::TPoolSettings> PoolConfig;
2222
bool IsStreamingQuery = false;
2323
TString CheckpointId;
24+
TString StreamingQueryPath;
2425

2526
TUserRequestContext() = default;
2627

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2822,7 +2822,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28222822
NYql::NDq::MakeCheckpointStorageID(),
28232823
SelfId(),
28242824
{},
2825-
Counters->Counters->GetKqpCounters(),
2825+
Counters->Counters->GetKqpCounters()->GetSubgroup("path", context->StreamingQueryPath),
28262826
graphParams,
28272827
stateLoadMode,
28282828
streamingDisposition).Release());

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1559,7 +1559,6 @@ void TKqpTasksGraph::PersistTasksGraphInfo(NKikimrKqp::TQueryPhysicalGraph& resu
15591559

15601560
void TKqpTasksGraph::RestoreTasksGraphInfo(const TVector<IKqpGateway::TPhysicalTxData>& transactions, const TVector<NKikimrKqp::TKqpNodeResources>& resourcesSnapshot, const NKikimrKqp::TQueryPhysicalGraph& graphInfo) {
15611561
GetMeta().IsRestored = true;
1562-
GetMeta().AllowWithSpilling = false;
15631562

15641563
const auto restoreDqTransform = [](const auto& protoInfo) -> TMaybe<TTransform> {
15651564
if (!protoInfo.HasTransform()) {
@@ -1755,6 +1754,8 @@ void TKqpTasksGraph::RestoreTasksGraphInfo(const TVector<IKqpGateway::TPhysicalT
17551754
const auto it = scheduledTaskCount.find(stageIdx);
17561755
BuildReadTasksFromSource(stageInfo, resourcesSnapshot, it != scheduledTaskCount.end() ? it->second.TaskCount : 0);
17571756
}
1757+
1758+
GetMeta().AllowWithSpilling |= stage.GetAllowWithSpilling();
17581759
}
17591760
}
17601761
}

ydb/core/kqp/gateway/behaviour/streaming_query/queries.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1746,6 +1746,7 @@ class TStartStreamingQueryTableActor final : public TActionActorBase<TStartStrea
17461746
ev->ForgetAfter = TDuration::Max();
17471747
ev->Generation = PreviousGeneration + 1;
17481748
ev->CheckpointId = State.GetCheckpointId();
1749+
ev->StreamingQueryPath = QueryPath;
17491750

17501751
if (const auto statsPeriod = AppData()->QueryServiceConfig.GetProgressStatsPeriodMs()) {
17511752
ev->ProgressStatsPeriod = TDuration::MilliSeconds(statsPeriod);

ydb/core/kqp/provider/yql_kikimr_datasource.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,9 @@ class TKiSourceIntentDeterminationTransformer: public TKiSourceVisitorTransforme
116116
TExprBase currentNode(node);
117117
if (auto maybeReadTable = currentNode.Maybe<TKiReadTable>()) {
118118
auto readTable = maybeReadTable.Cast();
119-
for (auto setting : readTable.Settings()) {
120-
auto name = setting.Name().Value();
121-
if (name == "sysViewRewritten") {
119+
for (auto setting : readTable.Settings().Ref().ChildrenList()) {
120+
auto maybeTuple = TMaybeNode<TCoNameValueTuple>(setting);
121+
if (maybeTuple && maybeTuple.Cast().Name().Value() == "sysViewRewritten"sv) {
122122
sysViewRewritten = true;
123123
}
124124
}

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,7 @@ class TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExecu
562562
.PhysicalGraph = ev.QueryPhysicalGraph,
563563
.DisableDefaultTimeout = ev.DisableDefaultTimeout,
564564
.CheckpointId = ev.CheckpointId,
565+
.StreamingQueryPath = ev.StreamingQueryPath
565566
}, QueryServiceConfig));
566567

567568
const auto& creatorId = Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, ev.Record, meta, MaxRunTime, GetRetryState(), ev.QueryPhysicalGraph, QueryServiceConfig, ev.Generation));
@@ -615,6 +616,7 @@ class TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExecu
615616
meta.SetTraceId(eventProto.GetTraceId());
616617
meta.SetResourcePoolId(request.GetPoolId());
617618
meta.SetCheckpointId(ev.CheckpointId);
619+
meta.SetStreamingQueryPath(ev.StreamingQueryPath);
618620
meta.SetClientAddress(request.GetClientAddress());
619621
meta.SetCollectStats(request.GetCollectStats());
620622
meta.SetSaveQueryPhysicalGraph(ev.SaveQueryPhysicalGraph);
@@ -1092,6 +1094,7 @@ class TRestartScriptOperationQuery : public TQueryBase {
10921094
.PhysicalGraph = std::move(physicalGraph),
10931095
.DisableDefaultTimeout = meta.GetDisableDefaultTimeout(),
10941096
.CheckpointId = meta.GetCheckpointId(),
1097+
.StreamingQueryPath = meta.GetStreamingQueryPath()
10951098
}, QueryServiceConfig));
10961099

10971100
KQP_PROXY_LOG_D("Restart with RunScriptActorId: " << RunScriptActorId << ", has PhysicalGraph: " << hasPhysicalGraph);

ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
127127
, DisableDefaultTimeout(settings.DisableDefaultTimeout)
128128
, CheckpointId(settings.CheckpointId)
129129
, PhysicalGraph(std::move(settings.PhysicalGraph))
130+
, StreamingQueryPath(settings.StreamingQueryPath)
130131
, Counters(settings.Counters)
131132
{}
132133

@@ -144,6 +145,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
144145
);
145146
UserRequestContext->IsStreamingQuery = SaveQueryPhysicalGraph;
146147
UserRequestContext->CheckpointId = CheckpointId;
148+
UserRequestContext->StreamingQueryPath = StreamingQueryPath;
147149

148150
LOG_I("Bootstrap");
149151

@@ -969,6 +971,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
969971
const bool DisableDefaultTimeout = false;
970972
const TString CheckpointId;
971973
std::optional<NKikimrKqp::TQueryPhysicalGraph> PhysicalGraph;
974+
const TString StreamingQueryPath;
972975
std::optional<TActorId> PhysicalGraphSender;
973976
TIntrusivePtr<TKqpCounters> Counters;
974977
TString SessionId;

ydb/core/kqp/run_script_actor/kqp_run_script_actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ struct TKqpRunScriptActorSettings {
2525
std::optional<NKikimrKqp::TQueryPhysicalGraph> PhysicalGraph;
2626
bool DisableDefaultTimeout = false;
2727
TString CheckpointId;
28+
TString StreamingQueryPath;
2829
};
2930

3031
NActors::IActor* CreateRunScriptActor(const NKikimrKqp::TEvQueryRequest& request, TKqpRunScriptActorSettings&& settings, NKikimrConfig::TQueryServiceConfig queryServiceConfig);

ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp

Lines changed: 159 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -660,35 +660,39 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture {
660660
TTypeMappingSettings typeMappingSettings;
661661
typeMappingSettings.set_date_time_format(STRING_FORMAT);
662662

663-
auto describeTableBuilder = mockClient->ExpectDescribeTable();
664-
describeTableBuilder
665-
.Table(settings.TableName)
666-
.DataSourceInstance(GetMockConnectorSourceInstance())
667-
.TypeMappingSettings(typeMappingSettings);
668-
669-
auto listSplitsBuilder = mockClient->ExpectListSplits();
670-
auto fillListSplitExpectation = listSplitsBuilder
671-
.ValidateArgs(settings.ValidateListSplitsArgs ? TConnectorClientMock::EArgsValidation::Strict : TConnectorClientMock::EArgsValidation::DataSourceInstance)
672-
.Select()
673-
.DataSourceInstance(GetMockConnectorSourceInstance())
663+
if (settings.DescribeCount) {
664+
auto describeTableBuilder = mockClient->ExpectDescribeTable();
665+
describeTableBuilder
674666
.Table(settings.TableName)
675-
.What();
676-
677-
FillMockConnectorRequestColumns(fillListSplitExpectation, settings.Columns);
667+
.DataSourceInstance(GetMockConnectorSourceInstance())
668+
.TypeMappingSettings(typeMappingSettings);
678669

679-
for (ui64 i = 0; i < settings.DescribeCount; ++i) {
680-
auto responseBuilder = describeTableBuilder.Response();
681-
FillMockConnectorRequestColumns(responseBuilder, settings.Columns);
670+
for (ui64 i = 0; i < settings.DescribeCount; ++i) {
671+
auto responseBuilder = describeTableBuilder.Response();
672+
FillMockConnectorRequestColumns(responseBuilder, settings.Columns);
673+
}
682674
}
683675

684-
for (ui64 i = 0; i < settings.ListSplitsCount; ++i) {
685-
auto responseBuilder = listSplitsBuilder.Result()
686-
.AddResponse(NYql::NConnector::NewSuccess())
687-
.Description("some binary description")
688-
.Select()
689-
.DataSourceInstance(GetMockConnectorSourceInstance())
690-
.What();
691-
FillMockConnectorRequestColumns(responseBuilder, settings.Columns);
676+
if (settings.ListSplitsCount) {
677+
auto listSplitsBuilder = mockClient->ExpectListSplits();
678+
auto fillListSplitExpectation = listSplitsBuilder
679+
.ValidateArgs(settings.ValidateListSplitsArgs ? TConnectorClientMock::EArgsValidation::Strict : TConnectorClientMock::EArgsValidation::DataSourceInstance)
680+
.Select()
681+
.DataSourceInstance(GetMockConnectorSourceInstance())
682+
.Table(settings.TableName)
683+
.What();
684+
685+
FillMockConnectorRequestColumns(fillListSplitExpectation, settings.Columns);
686+
687+
for (ui64 i = 0; i < settings.ListSplitsCount; ++i) {
688+
auto responseBuilder = listSplitsBuilder.Result()
689+
.AddResponse(NYql::NConnector::NewSuccess())
690+
.Description("some binary description")
691+
.Select()
692+
.DataSourceInstance(GetMockConnectorSourceInstance())
693+
.What();
694+
FillMockConnectorRequestColumns(responseBuilder, settings.Columns);
695+
}
692696
}
693697
}
694698

@@ -1561,6 +1565,51 @@ Y_UNIT_TEST_SUITE(KqpFederatedQueryDatastreams) {
15611565
UNIT_ASSERT_STRING_CONTAINS(readyOp.Status().GetIssues().ToString(), "Runtime listing is not supported for streaming queries, pragma value was ignored");
15621566
UNIT_ASSERT_VALUES_EQUAL(GetAllObjects(sourceBucket), "{\"data\":\"x\"}\n{\"data\": \"x\"}");
15631567
}
1568+
1569+
Y_UNIT_TEST_F(CrossJoinWithNotExistingDataSource, TStreamingTestFixture) {
1570+
const auto connectorClient = SetupMockConnectorClient();
1571+
1572+
constexpr char ydbSourceName[] = "ydbSourceName";
1573+
CreateYdbSource(ydbSourceName);
1574+
1575+
constexpr char ydbTable[] = "unknownSourceLookup";
1576+
ExecExternalQuery(fmt::format(R"(
1577+
CREATE TABLE `{table}` (
1578+
fqdn String,
1579+
payload String,
1580+
PRIMARY KEY (fqdn)
1581+
))",
1582+
"table"_a = ydbTable
1583+
));
1584+
1585+
{ // Prepare connector mock
1586+
const std::vector<TColumn> columns = {
1587+
{"fqdn", Ydb::Type::STRING},
1588+
{"payload", Ydb::Type::STRING}
1589+
};
1590+
SetupMockConnectorTableDescription(connectorClient, {
1591+
.TableName = ydbTable,
1592+
.Columns = columns,
1593+
.DescribeCount = 1,
1594+
.ListSplitsCount = 0
1595+
});
1596+
}
1597+
1598+
ExecQuery(fmt::format(R"(
1599+
SELECT
1600+
*
1601+
FROM `unknown-datasource`.`unknown-topic` WITH (
1602+
FORMAT = raw,
1603+
SCHEMA (Data String NOT NULL)
1604+
) AS p
1605+
CROSS JOIN (
1606+
SELECT * FROM `{ydb_source}`.`{table}`
1607+
) AS l
1608+
)",
1609+
"ydb_source"_a = ydbSourceName,
1610+
"table"_a = ydbTable
1611+
), EStatus::SCHEME_ERROR, "Cannot find table '/Root/unknown-datasource.[unknown-topic]' because it does not exist or you do not have access permissions");
1612+
}
15641613
}
15651614

15661615
Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
@@ -2298,6 +2347,91 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
22982347
pqGateway->WaitWriteSession(outputTopicName)->ExpectMessages(sampleResult);
22992348
}
23002349

2350+
Y_UNIT_TEST_F(StreamingQueryWithDoubleYdbJoin, TStreamingTestFixture) {
2351+
const auto connectorClient = SetupMockConnectorClient();
2352+
const auto pqGateway = SetupMockPqGateway();
2353+
2354+
constexpr char inputTopicName[] = "doubleYdbJoinInputTopicName";
2355+
constexpr char outputTopicName[] = "doubleYdbJoinOutputTopicName";
2356+
CreateTopic(inputTopicName);
2357+
CreateTopic(outputTopicName);
2358+
2359+
constexpr char pqSourceName[] = "pqSourceName";
2360+
constexpr char ydbSourceName[] = "ydbSourceName";
2361+
CreatePqSource(pqSourceName);
2362+
CreateYdbSource(ydbSourceName);
2363+
2364+
constexpr char ydbTable[] = "doubleYdbJoinLookup";
2365+
ExecExternalQuery(fmt::format(R"(
2366+
CREATE TABLE `{table}` (
2367+
fqdn String,
2368+
PRIMARY KEY (fqdn)
2369+
))",
2370+
"table"_a = ydbTable
2371+
));
2372+
2373+
{ // Prepare connector mock
2374+
const std::vector<TColumn> columns = {{"fqdn", Ydb::Type::STRING}};
2375+
SetupMockConnectorTableDescription(connectorClient, {
2376+
.TableName = ydbTable,
2377+
.Columns = columns,
2378+
.DescribeCount = 2,
2379+
.ListSplitsCount = 1
2380+
});
2381+
2382+
const std::vector<std::string> fqdnColumn = {"host1", "host2"};
2383+
SetupMockConnectorTableData(connectorClient, {
2384+
.TableName = ydbTable,
2385+
.Columns = columns,
2386+
.NumberReadSplits = 2,
2387+
.ResultFactory = [&]() {
2388+
return MakeRecordBatch(MakeArray<arrow::BinaryBuilder>("fqdn", fqdnColumn, arrow::binary()));
2389+
}
2390+
});
2391+
}
2392+
2393+
constexpr char queryName[] = "streamingQuery";
2394+
ExecQuery(fmt::format(R"(
2395+
CREATE STREAMING QUERY `{query_name}` AS
2396+
DO BEGIN
2397+
INSERT INTO `{pq_source}`.`{output_topic}`
2398+
SELECT
2399+
p.Data || "-" || la.fqdn || "-" || lb.fqdn
2400+
FROM `{pq_source}`.`{input_topic}` AS p
2401+
CROSS JOIN `{ydb_source}`.`{ydb_table}` AS la
2402+
CROSS JOIN `{ydb_source}`.`{ydb_table}` AS lb
2403+
END DO;)",
2404+
"query_name"_a = queryName,
2405+
"pq_source"_a = pqSourceName,
2406+
"ydb_source"_a = ydbSourceName,
2407+
"ydb_table"_a = ydbTable,
2408+
"input_topic"_a = inputTopicName,
2409+
"output_topic"_a = outputTopicName
2410+
));
2411+
2412+
CheckScriptExecutionsCount(1, 1);
2413+
2414+
auto readSession = pqGateway->WaitReadSession(inputTopicName);
2415+
readSession->AddDataReceivedEvent(0, "data1");
2416+
2417+
pqGateway->WaitWriteSession(outputTopicName)->ExpectMessages({
2418+
"data1-host1-host2",
2419+
"data1-host2-host1",
2420+
"data1-host1-host1",
2421+
"data1-host2-host2"
2422+
}, /* sort */ true);
2423+
2424+
readSession->AddCloseSessionEvent(EStatus::UNAVAILABLE, {NIssue::TIssue("Test pq session failure")});
2425+
2426+
pqGateway->WaitReadSession(inputTopicName)->AddDataReceivedEvent(1, "data2");
2427+
pqGateway->WaitWriteSession(outputTopicName)->ExpectMessages({
2428+
"data2-host1-host2",
2429+
"data2-host2-host1",
2430+
"data2-host1-host1",
2431+
"data2-host2-host2"
2432+
}, /* sort */ true);
2433+
}
2434+
23012435
Y_UNIT_TEST_F(StreamingQueryWithStreamLookupJoin, TStreamingTestFixture) {
23022436
SetupAppConfig().MutableQueryServiceConfig()->SetProgressStatsPeriodMs(0);
23032437

0 commit comments

Comments
 (0)