Skip to content

Commit c5fc4d1

Browse files
authored
[Stable-25-3] EvWrite: Rollback & Stats (#28975)
2 parents 5032ba7 + 6f31f47 commit c5fc4d1

File tree

15 files changed

+866
-237
lines changed

15 files changed

+866
-237
lines changed

ydb/core/kqp/common/buffer/events.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ namespace NKqp {
55

66
TEvKqpBuffer::TEvError::TEvError(
77
NYql::NDqProto::StatusIds::StatusCode statusCode,
8-
NYql::TIssues&& issues)
8+
NYql::TIssues&& issues,
9+
std::optional<NYql::NDqProto::TDqTaskStats>&& stats)
910
: StatusCode(statusCode)
10-
, Issues(std::move(issues)) {
11+
, Issues(std::move(issues))
12+
, Stats(std::move(stats)) {
1113
}
1214

1315
}

ydb/core/kqp/common/buffer/events.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,12 @@ struct TEvResult : public TEventLocal<TEvResult, TKqpBufferWriterEvents::EvResul
4141
struct TEvError : public TEventLocal<TEvError, TKqpBufferWriterEvents::EvError> {
4242
NYql::NDqProto::StatusIds::StatusCode StatusCode;
4343
NYql::TIssues Issues;
44+
std::optional<NYql::NDqProto::TDqTaskStats> Stats;
4445

45-
TEvError(NYql::NDqProto::StatusIds::StatusCode statusCode, NYql::TIssues&& issues);
46+
TEvError(
47+
NYql::NDqProto::StatusIds::StatusCode statusCode,
48+
NYql::TIssues&& issues,
49+
std::optional<NYql::NDqProto::TDqTaskStats>&& stats);
4650
};
4751

4852
};

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,16 @@ class TKqpTransactionManager : public IKqpTransactionManager {
2727
enum ETransactionState {
2828
COLLECTING,
2929
PREPARING,
30-
EXECUTING,
30+
EXECUTING,
31+
ERROR,
32+
ROLLINGBACK,
3133
};
3234
public:
3335
TKqpTransactionManager(bool collectOnly)
3436
: CollectOnly(collectOnly) {}
3537

3638
void AddShard(ui64 shardId, bool isOlap, const TString& path) override {
37-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING);
39+
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
3840
ShardsIds.insert(shardId);
3941
auto& shardInfo = ShardsInfo[shardId];
4042
shardInfo.IsOlap = isOlap;
@@ -46,7 +48,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
4648
}
4749

4850
void AddAction(ui64 shardId, ui8 action) override {
49-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING);
51+
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
5052
ShardsInfo.at(shardId).Flags |= action;
5153
if (action & EAction::WRITE) {
5254
ReadOnly = false;
@@ -55,7 +57,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
5557
}
5658

5759
void AddTopic(ui64 topicId, const TString& path) override {
58-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING);
60+
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
5961
ShardsIds.insert(topicId);
6062
auto& shardInfo = ShardsInfo[topicId];
6163

@@ -81,7 +83,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
8183
}
8284

8385
bool AddLock(ui64 shardId, const NKikimrDataEvents::TLock& lockProto) override {
84-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING);
86+
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
8587
TKqpLock lock(lockProto);
8688
bool isError = (lock.Proto.GetCounter() >= NKikimr::TSysTables::TLocksTable::TLock::ErrorMin);
8789
bool isInvalidated = (lock.Proto.GetCounter() == NKikimr::TSysTables::TLocksTable::TLock::ErrorAlreadyBroken)
@@ -112,7 +114,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
112114
broken = isInvalidated || isLocksAcquireFailure;
113115
}
114116

115-
if (broken && !LocksIssue) {
117+
if (broken && !LocksIssue && State != ETransactionState::ERROR) {
116118
if (isLocksAcquireFailure) {
117119
LocksIssue = YqlIssue(NYql::TPosition(), NYql::TIssuesIds::KIKIMR_LOCKS_ACQUIRE_FAILURE);
118120
return false;
@@ -151,6 +153,10 @@ class TKqpTransactionManager : public IKqpTransactionManager {
151153
shardInfo.State = EShardState::ERROR;
152154
}
153155

156+
void SetError() override {
157+
State = ETransactionState::ERROR;
158+
}
159+
154160
void SetPartitioning(const TTableId tableId, const std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>>& partitioning) override {
155161
TablePartitioning[tableId] = partitioning;
156162
}
@@ -474,6 +480,32 @@ class TKqpTransactionManager : public IKqpTransactionManager {
474480
return ShardsToWait.empty();
475481
}
476482

483+
const THashSet<ui64>& StartRollback() override {
484+
AFL_ENSURE(State != ETransactionState::ROLLINGBACK);
485+
State = ETransactionState::ROLLINGBACK;
486+
ShardsToWait.clear();
487+
for (auto& [shardId, shardInfo] : ShardsInfo) {
488+
if (shardInfo.State != EShardState::ERROR) {
489+
shardInfo.State = EShardState::FINISHED;
490+
}
491+
if (!shardInfo.Locks.empty()) {
492+
ShardsToWait.insert(shardId);
493+
}
494+
}
495+
496+
return ShardsToWait;
497+
}
498+
499+
bool ConsumeRollbackResult(ui64 shardId) override {
500+
AFL_ENSURE(State == ETransactionState::ROLLINGBACK);
501+
ShardsToWait.erase(shardId);
502+
return ShardsToWait.empty();
503+
}
504+
505+
bool IsRollBack() const override {
506+
return State == ETransactionState::ROLLINGBACK;
507+
}
508+
477509
private:
478510
bool CollectOnly = false;
479511
ETransactionState State = ETransactionState::COLLECTING;

ydb/core/kqp/common/kqp_tx_manager.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class IKqpTransactionManager {
6262

6363
virtual EShardState GetState(ui64 shardId) const = 0;
6464
virtual void SetError(ui64 shardId) = 0;
65+
virtual void SetError() = 0;
6566

6667
virtual void SetPartitioning(const TTableId tableId, const std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>>& partitioning) = 0;
6768
virtual std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> GetPartitioning(const TTableId tableId) const = 0;
@@ -138,6 +139,10 @@ class IKqpTransactionManager {
138139
virtual TCommitInfo GetCommitInfo() = 0;
139140

140141
virtual bool ConsumeCommitResult(ui64 shardId) = 0;
142+
143+
virtual const THashSet<ui64>& StartRollback() = 0;
144+
virtual bool ConsumeRollbackResult(ui64 shardId) = 0;
145+
virtual bool IsRollBack() const = 0;
141146
};
142147

143148
using IKqpTransactionManagerPtr = std::shared_ptr<IKqpTransactionManager>;

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,8 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
847847
KqpGroup->GetHistogram("SinkWrites/BufferActorCommitLatencyUs", NMonitoring::ExponentialHistogram(28, 2, 1));
848848
BufferActorFlushLatencyHistogram =
849849
KqpGroup->GetHistogram("SinkWrites/BufferActorFlushLatencyUs", NMonitoring::ExponentialHistogram(28, 2, 1));
850+
BufferActorRollbackLatencyHistogram =
851+
KqpGroup->GetHistogram("SinkWrites/BufferActorRollbackLatencyUs", NMonitoring::ExponentialHistogram(28, 2, 1));
850852

851853
ForwardActorWritesSizeHistogram =
852854
KqpGroup->GetHistogram("SinkWrites/ForwardActorWritesSize", NMonitoring::ExponentialHistogram(28, 2, 1));

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,7 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
436436
NMonitoring::THistogramPtr BufferActorPrepareLatencyHistogram;
437437
NMonitoring::THistogramPtr BufferActorCommitLatencyHistogram;
438438
NMonitoring::THistogramPtr BufferActorFlushLatencyHistogram;
439+
NMonitoring::THistogramPtr BufferActorRollbackLatencyHistogram;
439440

440441
NMonitoring::THistogramPtr ForwardActorWritesSizeHistogram;
441442
NMonitoring::THistogramPtr ForwardActorWritesLatencyHistogram;

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 84 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -200,77 +200,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
200200
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
201201
}
202202

203-
auto addLocks = [this](const ui64 taskId, const auto& data) {
204-
if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
205-
NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
206-
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
207-
NDataIntegrity::LogIntegrityTrails("InputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
208-
for (auto& lock : info.GetLocks()) {
209-
if (!TxManager) {
210-
Locks.push_back(lock);
211-
}
212-
213-
const auto& task = TasksGraph.GetTask(taskId);
214-
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
215-
ShardIdToTableInfo->Add(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
216-
217-
if (TxManager) {
218-
TxManager->AddShard(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
219-
TxManager->AddAction(lock.GetDataShard(), IKqpTransactionManager::EAction::READ);
220-
TxManager->AddLock(lock.GetDataShard(), lock);
221-
}
222-
}
223-
224-
if (!BatchOperationSettings.Empty() && info.HasBatchOperationMaxKey()) {
225-
if (ResponseEv->BatchOperationMaxKeys.empty()) {
226-
for (auto keyId : info.GetBatchOperationKeyIds()) {
227-
ResponseEv->BatchOperationKeyIds.push_back(keyId);
228-
}
229-
}
230-
231-
ResponseEv->BatchOperationMaxKeys.emplace_back(info.GetBatchOperationMaxKey());
232-
}
233-
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
234-
NKikimrKqp::TEvKqpOutputActorResultInfo info;
235-
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
236-
NDataIntegrity::LogIntegrityTrails("OutputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
237-
for (auto& lock : info.GetLocks()) {
238-
if (!TxManager) {
239-
Locks.push_back(lock);
240-
}
241-
242-
const auto& task = TasksGraph.GetTask(taskId);
243-
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
244-
ShardIdToTableInfo->Add(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
245-
if (TxManager) {
246-
YQL_ENSURE(stageInfo.Meta.TableKind == ETableKind::Olap);
247-
IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
248-
if (info.GetHasRead()) {
249-
flags |= IKqpTransactionManager::EAction::READ;
250-
}
251-
252-
TxManager->AddShard(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
253-
TxManager->AddAction(lock.GetDataShard(), flags);
254-
TxManager->AddLock(lock.GetDataShard(), lock);
255-
}
256-
}
257-
}
258-
};
259-
260-
for (auto& [_, extraData] : ExtraData) {
261-
for (const auto& source : extraData.Data.GetSourcesExtraData()) {
262-
addLocks(extraData.TaskId, source);
263-
}
264-
for (const auto& transform : extraData.Data.GetInputTransformsData()) {
265-
addLocks(extraData.TaskId, transform);
266-
}
267-
for (const auto& sink : extraData.Data.GetSinksExtraData()) {
268-
addLocks(extraData.TaskId, sink);
269-
}
270-
if (extraData.Data.HasComputeExtraData()) {
271-
addLocks(extraData.TaskId, extraData.Data.GetComputeExtraData());
272-
}
273-
}
203+
FillLocksFromExtraData();
274204

275205
if (TxManager) {
276206
TxManager->SetHasSnapshot(GetSnapshot().IsValid());
@@ -306,7 +236,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
306236
IEventHandle::FlagTrackDelivery,
307237
0,
308238
ExecuterSpan.GetTraceId());
309-
MakeResponseAndPassAway();
310239
return;
311240
} else if (Request.UseImmediateEffects) {
312241
Become(&TKqpDataExecuter::FinalizeState);
@@ -378,9 +307,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
378307
MakeResponseAndPassAway();
379308
}
380309

381-
void HandleFinalize(TEvents::TEvUndelivered::TPtr&) {
382-
auto issue = YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, "Buffer actor isn't available.");
383-
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
310+
void HandleFinalize(TEvents::TEvUndelivered::TPtr& ev) {
311+
AFL_ENSURE(ev->Sender == BufferActorId);
312+
LOG_W("Got Undelivered from BufferActor: " << ev->Sender);
384313
}
385314

386315
void MakeResponseAndPassAway() {
@@ -1273,6 +1202,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
12731202

12741203
void Handle(TEvKqpBuffer::TEvError::TPtr& ev) {
12751204
auto& msg = *ev->Get();
1205+
if (msg.Stats && Stats) {
1206+
Stats->AddBufferStats(std::move(*msg.Stats));
1207+
}
12761208
TBase::HandleAbortExecution(msg.StatusCode, msg.Issues, false);
12771209
}
12781210

@@ -2820,6 +2752,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28202752
}
28212753

28222754
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2755+
FillLocksFromExtraData();
28232756
PassAway();
28242757
}
28252758
}
@@ -2829,6 +2762,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28292762
LOG_I("Timed out on waiting for Compute Actors to finish - forcing shutdown. Sender: " << ev->Sender);
28302763

28312764
if (ev->Sender == SelfId()) {
2765+
FillLocksFromExtraData();
28322766
PassAway();
28332767
}
28342768
}
@@ -2840,6 +2774,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28402774

28412775
// In case of external timeout the response is already sent to the client - no need to wait for stats.
28422776
if (statusCode == Ydb::StatusIds::TIMEOUT) {
2777+
FillLocksFromExtraData();
28432778
LOG_I("External timeout while waiting for Compute Actors to finish - forcing shutdown. Sender: " << ev->Sender);
28442779
PassAway();
28452780
}
@@ -2944,6 +2879,80 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
29442879
}
29452880
}
29462881

2882+
void FillLocksFromExtraData() {
2883+
auto addLocks = [this](const ui64 taskId, const auto& data) {
2884+
if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
2885+
NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
2886+
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
2887+
NDataIntegrity::LogIntegrityTrails("InputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
2888+
for (auto& lock : info.GetLocks()) {
2889+
if (!TxManager) {
2890+
Locks.push_back(lock);
2891+
}
2892+
2893+
const auto& task = TasksGraph.GetTask(taskId);
2894+
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
2895+
ShardIdToTableInfo->Add(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
2896+
2897+
if (TxManager) {
2898+
TxManager->AddShard(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
2899+
TxManager->AddAction(lock.GetDataShard(), IKqpTransactionManager::EAction::READ);
2900+
TxManager->AddLock(lock.GetDataShard(), lock);
2901+
}
2902+
}
2903+
2904+
if (!BatchOperationSettings.Empty() && info.HasBatchOperationMaxKey()) {
2905+
if (ResponseEv->BatchOperationMaxKeys.empty()) {
2906+
for (auto keyId : info.GetBatchOperationKeyIds()) {
2907+
ResponseEv->BatchOperationKeyIds.push_back(keyId);
2908+
}
2909+
}
2910+
2911+
ResponseEv->BatchOperationMaxKeys.emplace_back(info.GetBatchOperationMaxKey());
2912+
}
2913+
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
2914+
NKikimrKqp::TEvKqpOutputActorResultInfo info;
2915+
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
2916+
NDataIntegrity::LogIntegrityTrails("OutputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
2917+
for (auto& lock : info.GetLocks()) {
2918+
if (!TxManager) {
2919+
Locks.push_back(lock);
2920+
}
2921+
2922+
const auto& task = TasksGraph.GetTask(taskId);
2923+
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
2924+
ShardIdToTableInfo->Add(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
2925+
if (TxManager) {
2926+
YQL_ENSURE(stageInfo.Meta.TableKind == ETableKind::Olap);
2927+
IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
2928+
if (info.GetHasRead()) {
2929+
flags |= IKqpTransactionManager::EAction::READ;
2930+
}
2931+
2932+
TxManager->AddShard(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
2933+
TxManager->AddAction(lock.GetDataShard(), flags);
2934+
TxManager->AddLock(lock.GetDataShard(), lock);
2935+
}
2936+
}
2937+
}
2938+
};
2939+
2940+
for (auto& [_, extraData] : ExtraData) {
2941+
for (const auto& source : extraData.Data.GetSourcesExtraData()) {
2942+
addLocks(extraData.TaskId, source);
2943+
}
2944+
for (const auto& transform : extraData.Data.GetInputTransformsData()) {
2945+
addLocks(extraData.TaskId, transform);
2946+
}
2947+
for (const auto& sink : extraData.Data.GetSinksExtraData()) {
2948+
addLocks(extraData.TaskId, sink);
2949+
}
2950+
if (extraData.Data.HasComputeExtraData()) {
2951+
addLocks(extraData.TaskId, extraData.Data.GetComputeExtraData());
2952+
}
2953+
}
2954+
}
2955+
29472956
private:
29482957
TShardIdToTableInfoPtr ShardIdToTableInfo;
29492958

0 commit comments

Comments
 (0)