Skip to content

Commit 5b9755a

Browse files
kardymondsGrigoriyPA
authored andcommitted
YQ-4867 Add streaming query name to checkpoint metrics (#28519)
1 parent 42aec1f commit 5b9755a

File tree

9 files changed

+52
-38
lines changed

9 files changed

+52
-38
lines changed

ydb/core/kqp/common/events/events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ struct TEvKqp {
140140
bool DisableDefaultTimeout = false;
141141
i64 Generation = 1;
142142
TString CheckpointId;
143+
TString StreamingQueryPath;
143144
};
144145

145146
struct TEvScriptResponse : public TEventLocal<TEvScriptResponse, TKqpEvents::EvScriptResponse> {

ydb/core/kqp/common/kqp_user_request_context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace NKikimr::NKqp {
2121
std::optional<NResourcePool::TPoolSettings> PoolConfig;
2222
bool IsStreamingQuery = false;
2323
TString CheckpointId;
24+
TString StreamingQueryPath;
2425

2526
TUserRequestContext() = default;
2627

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2887,7 +2887,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28872887
NYql::NDq::MakeCheckpointStorageID(),
28882888
SelfId(),
28892889
{},
2890-
Counters->Counters->GetKqpCounters(),
2890+
Counters->Counters->GetKqpCounters()->GetSubgroup("path", context->StreamingQueryPath),
28912891
graphParams,
28922892
stateLoadMode,
28932893
streamingDisposition).Release());

ydb/core/kqp/gateway/behaviour/streaming_query/queries.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1746,6 +1746,7 @@ class TStartStreamingQueryTableActor final : public TActionActorBase<TStartStrea
17461746
ev->ForgetAfter = TDuration::Max();
17471747
ev->Generation = PreviousGeneration + 1;
17481748
ev->CheckpointId = State.GetCheckpointId();
1749+
ev->StreamingQueryPath = QueryPath;
17491750

17501751
if (const auto statsPeriod = AppData()->QueryServiceConfig.GetProgressStatsPeriodMs()) {
17511752
ev->ProgressStatsPeriod = TDuration::MilliSeconds(statsPeriod);

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,7 @@ class TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExecu
562562
.PhysicalGraph = ev.QueryPhysicalGraph,
563563
.DisableDefaultTimeout = ev.DisableDefaultTimeout,
564564
.CheckpointId = ev.CheckpointId,
565+
.StreamingQueryPath = ev.StreamingQueryPath
565566
}, QueryServiceConfig));
566567

567568
const auto& creatorId = Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, ev.Record, meta, MaxRunTime, GetRetryState(), ev.QueryPhysicalGraph, QueryServiceConfig, ev.Generation));
@@ -615,6 +616,7 @@ class TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExecu
615616
meta.SetTraceId(eventProto.GetTraceId());
616617
meta.SetResourcePoolId(request.GetPoolId());
617618
meta.SetCheckpointId(ev.CheckpointId);
619+
meta.SetStreamingQueryPath(ev.StreamingQueryPath);
618620
meta.SetClientAddress(request.GetClientAddress());
619621
meta.SetCollectStats(request.GetCollectStats());
620622
meta.SetSaveQueryPhysicalGraph(ev.SaveQueryPhysicalGraph);
@@ -1092,6 +1094,7 @@ class TRestartScriptOperationQuery : public TQueryBase {
10921094
.PhysicalGraph = std::move(physicalGraph),
10931095
.DisableDefaultTimeout = meta.GetDisableDefaultTimeout(),
10941096
.CheckpointId = meta.GetCheckpointId(),
1097+
.StreamingQueryPath = meta.GetStreamingQueryPath()
10951098
}, QueryServiceConfig));
10961099

10971100
KQP_PROXY_LOG_D("Restart with RunScriptActorId: " << RunScriptActorId << ", has PhysicalGraph: " << hasPhysicalGraph);

ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
127127
, DisableDefaultTimeout(settings.DisableDefaultTimeout)
128128
, CheckpointId(settings.CheckpointId)
129129
, PhysicalGraph(std::move(settings.PhysicalGraph))
130+
, StreamingQueryPath(settings.StreamingQueryPath)
130131
, Counters(settings.Counters)
131132
{}
132133

@@ -144,6 +145,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
144145
);
145146
UserRequestContext->IsStreamingQuery = SaveQueryPhysicalGraph;
146147
UserRequestContext->CheckpointId = CheckpointId;
148+
UserRequestContext->StreamingQueryPath = StreamingQueryPath;
147149

148150
LOG_I("Bootstrap");
149151

@@ -969,6 +971,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
969971
const bool DisableDefaultTimeout = false;
970972
const TString CheckpointId;
971973
std::optional<NKikimrKqp::TQueryPhysicalGraph> PhysicalGraph;
974+
const TString StreamingQueryPath;
972975
std::optional<TActorId> PhysicalGraphSender;
973976
TIntrusivePtr<TKqpCounters> Counters;
974977
TString SessionId;

ydb/core/kqp/run_script_actor/kqp_run_script_actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ struct TKqpRunScriptActorSettings {
2525
std::optional<NKikimrKqp::TQueryPhysicalGraph> PhysicalGraph;
2626
bool DisableDefaultTimeout = false;
2727
TString CheckpointId;
28+
TString StreamingQueryPath;
2829
};
2930

3031
NActors::IActor* CreateRunScriptActor(const NKikimrKqp::TEvQueryRequest& request, TKqpRunScriptActorSettings&& settings, NKikimrConfig::TQueryServiceConfig queryServiceConfig);

ydb/core/protos/kqp.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,7 @@ message TScriptExecutionOperationMeta {
925925
optional bool SaveQueryPhysicalGraph = 12;
926926
optional bool DisableDefaultTimeout = 13;
927927
optional string CheckpointId = 14;
928+
optional string StreamingQueryPath = 15;
928929
}
929930

930931
// stored in column "retry_state" of .metadata/script_executions table

ydb/tests/fq/streaming/test_streaming.py

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ def get_sensors(self, kikimr, node_id, counters):
3131
url = self.monitoring_endpoint(kikimr, node_id) + "/counters/counters={}/json".format(counters)
3232
return load_metrics(url)
3333

34-
def get_checkpoint_coordinator_metric(self, kikimr, query_id, metric_name, expect_counters_exist=False):
34+
def get_checkpoint_coordinator_metric(self, kikimr, path, metric_name, expect_counters_exist=False):
3535
sum = 0
3636
found = False
3737
for node_id in kikimr.Cluster.nodes:
3838
sensor = self.get_sensors(kikimr, node_id, "kqp").find_sensor(
3939
{
40-
# "query_id": query_id, # TODO
40+
"path": path,
4141
"subsystem": "checkpoint_coordinator",
4242
"sensor": metric_name
4343
}
@@ -48,16 +48,16 @@ def get_checkpoint_coordinator_metric(self, kikimr, query_id, metric_name, expec
4848
assert found or not expect_counters_exist
4949
return sum
5050

51-
def get_completed_checkpoints(self, kikimr, query_id):
52-
return self.get_checkpoint_coordinator_metric(kikimr, query_id, "CompletedCheckpoints")
51+
def get_completed_checkpoints(self, kikimr, path):
52+
return self.get_checkpoint_coordinator_metric(kikimr, path, "CompletedCheckpoints")
5353

54-
def wait_completed_checkpoints(self, kikimr, query_id,
54+
def wait_completed_checkpoints(self, kikimr, path,
5555
timeout=plain_or_under_sanitizer_wrapper(120, 150)):
56-
current = self.get_checkpoint_coordinator_metric(kikimr, query_id, "CompletedCheckpoints")
56+
current = self.get_checkpoint_coordinator_metric(kikimr, path, "CompletedCheckpoints")
5757
checkpoints_count = current + 2
5858
deadline = time.time() + timeout
5959
while True:
60-
completed = self.get_completed_checkpoints(kikimr, query_id)
60+
completed = self.get_completed_checkpoints(kikimr, path)
6161
if completed >= checkpoints_count:
6262
break
6363
assert time.time() < deadline, "Wait checkpoint failed, actual completed: " + str(completed)
@@ -125,16 +125,16 @@ def test_restart_query(self, kikimr):
125125
INSERT INTO {source_name}.`{output_topic}` SELECT time FROM $in;
126126
END DO;'''
127127

128-
query_id = "query_id" # TODO
128+
path = f"/Root/{name}"
129129
kikimr.YdbClient.query(sql.format(query_name=name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
130-
self.wait_completed_checkpoints(kikimr, query_id)
130+
self.wait_completed_checkpoints(kikimr, path)
131131

132132
data = ['{"time": "lunch time"}']
133133
expected_data = ['lunch time']
134134
self.write_stream(data)
135135

136136
assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data
137-
self.wait_completed_checkpoints(kikimr, query_id)
137+
self.wait_completed_checkpoints(kikimr, path)
138138

139139
kikimr.YdbClient.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);")
140140
time.sleep(0.5)
@@ -164,21 +164,22 @@ def test_read_topic_shared_reading_insert_to_topic(self, kikimr):
164164
INSERT INTO {source_name}.`{output_topic}` SELECT time FROM $in;
165165
END DO;'''
166166

167-
kikimr.YdbClient.query(sql.format(query_name="query1", source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
168-
kikimr.YdbClient.query(sql.format(query_name="query2", source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
169-
170-
query_id = "query_id" # TODO
171-
self.wait_completed_checkpoints(kikimr, query_id)
167+
query_name1 = "test_read_topic_shared_reading_insert_to_topic1"
168+
query_name2 = "test_read_topic_shared_reading_insert_to_topic2"
169+
kikimr.YdbClient.query(sql.format(query_name=query_name1, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
170+
kikimr.YdbClient.query(sql.format(query_name=query_name2, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
171+
path1 = f"/Root/{query_name1}"
172+
self.wait_completed_checkpoints(kikimr, path1)
172173

173174
data = ['{"time": "lunch time"}']
174175
expected_data = ['lunch time', 'lunch time']
175176
self.write_stream(data)
176177
assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data
177-
self.wait_completed_checkpoints(kikimr, query_id)
178+
self.wait_completed_checkpoints(kikimr, path1)
178179

179180
sql = R'''ALTER STREAMING QUERY `{query_name}` SET (RUN = FALSE);'''
180-
kikimr.YdbClient.query(sql.format(query_name="query1"))
181-
kikimr.YdbClient.query(sql.format(query_name="query2"))
181+
kikimr.YdbClient.query(sql.format(query_name=query_name1))
182+
kikimr.YdbClient.query(sql.format(query_name=query_name2))
182183

183184
time.sleep(1)
184185

@@ -187,13 +188,13 @@ def test_read_topic_shared_reading_insert_to_topic(self, kikimr):
187188
self.write_stream(data)
188189

189190
sql = R'''ALTER STREAMING QUERY `{query_name}` SET (RUN = TRUE);'''
190-
kikimr.YdbClient.query(sql.format(query_name="query1"))
191-
kikimr.YdbClient.query(sql.format(query_name="query2"))
191+
kikimr.YdbClient.query(sql.format(query_name=query_name1))
192+
kikimr.YdbClient.query(sql.format(query_name=query_name2))
192193
assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data
193194

194195
sql = R'''DROP STREAMING QUERY `{query_name}`;'''
195-
kikimr.YdbClient.query(sql.format(query_name="query1"))
196-
kikimr.YdbClient.query(sql.format(query_name="query2"))
196+
kikimr.YdbClient.query(sql.format(query_name=query_name1))
197+
kikimr.YdbClient.query(sql.format(query_name=query_name2))
197198

198199
def test_read_topic_shared_reading_restart_nodes(self, kikimr):
199200
sourceName = "source_" + ''.join(random.choices(string.ascii_letters + string.digits, k=8))
@@ -211,14 +212,15 @@ def test_read_topic_shared_reading_restart_nodes(self, kikimr):
211212
INSERT INTO {source_name}.`{output_topic}` SELECT value FROM $in;
212213
END DO;'''
213214

214-
kikimr.YdbClient.query(sql.format(query_name="query1", source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
215-
query_id = "query_id" # TODO
216-
self.wait_completed_checkpoints(kikimr, query_id)
215+
query_name = "test_read_topic_shared_reading_restart_nodes"
216+
kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
217+
path = f"/Root/{query_name}"
218+
self.wait_completed_checkpoints(kikimr, path)
217219

218220
self.write_stream(['{"value": "value1"}'])
219221
expected_data = ['value1']
220222
assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data
221-
self.wait_completed_checkpoints(kikimr, query_id)
223+
self.wait_completed_checkpoints(kikimr, path)
222224

223225
restart_node_id = None
224226
for node_id in kikimr.Cluster.nodes:
@@ -234,7 +236,7 @@ def test_read_topic_shared_reading_restart_nodes(self, kikimr):
234236
self.write_stream(['{"value": "value2"}'])
235237
expected_data = ['value2']
236238
assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data
237-
self.wait_completed_checkpoints(kikimr, query_id)
239+
self.wait_completed_checkpoints(kikimr, path)
238240

239241
def test_read_topic_restore_state(self, kikimr):
240242
sourceName = "source4_" + ''.join(random.choices(string.ascii_letters + string.digits, k=8))
@@ -267,9 +269,10 @@ def test_read_topic_restore_state(self, kikimr):
267269
SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $mr;
268270
END DO;'''
269271

270-
kikimr.YdbClient.query(sql.format(query_name="query1", source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
271-
query_id = "query_id" # TODO
272-
self.wait_completed_checkpoints(kikimr, query_id)
272+
query_name = "test_read_topic_restore_state"
273+
kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
274+
path = f"/Root/{query_name}"
275+
self.wait_completed_checkpoints(kikimr, path)
273276

274277
data = [
275278
'{"dt": 1696849942000001, "str": "A" }',
@@ -278,7 +281,7 @@ def test_read_topic_restore_state(self, kikimr):
278281
self.write_stream(data)
279282
expected_data = ['{"a_time":1696849942000001,"b_time":1696849942500001,"c_time":null}']
280283
assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data
281-
self.wait_completed_checkpoints(kikimr, query_id)
284+
self.wait_completed_checkpoints(kikimr, path)
282285

283286
restart_node_id = None
284287
for node_id in kikimr.Cluster.nodes:
@@ -313,9 +316,9 @@ def test_json_errors(self, kikimr):
313316
INSERT INTO {source_name}.`{output_topic}` SELECT data FROM $in;
314317
END DO;'''
315318

316-
query_id = "query_id" # TODO
319+
path = f"/Root/{name}"
317320
kikimr.YdbClient.query(sql.format(query_name=name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
318-
self.wait_completed_checkpoints(kikimr, query_id)
321+
self.wait_completed_checkpoints(kikimr, path)
319322

320323
data = [
321324
'{"time": 101, "data": "hello1"}',
@@ -328,7 +331,7 @@ def test_json_errors(self, kikimr):
328331
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
329332

330333
def test_restart_query_by_rescaling(self, kikimr):
331-
sourceName = ''.join(random.choices(string.ascii_letters + string.digits, k=8))
334+
sourceName = 'source' + ''.join(random.choices(string.ascii_letters + string.digits, k=8))
332335
self.init_topics(sourceName, partitions_count=10)
333336
self.create_source(kikimr, sourceName, True)
334337

@@ -347,15 +350,15 @@ def test_restart_query_by_rescaling(self, kikimr):
347350
INSERT INTO `{source_name}`.`{output_topic}` SELECT time FROM $in;
348351
END DO;'''
349352

350-
query_id = "query_id" # TODO
353+
path = f"/Root/{name}"
351354
kikimr.YdbClient.query(sql.format(query_name=name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
352-
self.wait_completed_checkpoints(kikimr, query_id)
355+
self.wait_completed_checkpoints(kikimr, path)
353356

354357
message_count = 20
355358
for i in range(message_count):
356359
self.write_stream(['{"time": "time to do it"}'], topic_path=None, partition_key=(''.join(random.choices(string.digits, k=8))))
357360
assert self.read_stream(message_count, topic_path=self.output_topic) == ["time to do it" for i in range(message_count)]
358-
self.wait_completed_checkpoints(kikimr, query_id)
361+
self.wait_completed_checkpoints(kikimr, path)
359362

360363
logging.debug(f"stopping query {name}")
361364
kikimr.YdbClient.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);")

0 commit comments

Comments
 (0)