Skip to content

Commit 670b77d

Browse files
authored
[Stable-25-3] Fix evwrite rollback wait (#29153)
2 parents 08a58b4 + cb6ea51 commit 670b77d

File tree

2 files changed

+18
-6
lines changed

2 files changed

+18
-6
lines changed

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
3636
: CollectOnly(collectOnly) {}
3737

3838
void AddShard(ui64 shardId, bool isOlap, const TString& path) override {
39-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
39+
AFL_ENSURE(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
4040
ShardsIds.insert(shardId);
4141
auto& shardInfo = ShardsInfo[shardId];
4242
shardInfo.IsOlap = isOlap;
@@ -48,7 +48,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
4848
}
4949

5050
void AddAction(ui64 shardId, ui8 action) override {
51-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
51+
AFL_ENSURE(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
5252
ShardsInfo.at(shardId).Flags |= action;
5353
if (action & EAction::WRITE) {
5454
ReadOnly = false;
@@ -57,7 +57,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
5757
}
5858

5959
void AddTopic(ui64 topicId, const TString& path) override {
60-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
60+
AFL_ENSURE(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
6161
ShardsIds.insert(topicId);
6262
auto& shardInfo = ShardsInfo[topicId];
6363

@@ -83,7 +83,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
8383
}
8484

8585
bool AddLock(ui64 shardId, const NKikimrDataEvents::TLock& lockProto) override {
86-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
86+
AFL_ENSURE(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
8787
TKqpLock lock(lockProto);
8888
bool isError = (lock.Proto.GetCounter() >= NKikimr::TSysTables::TLocksTable::TLock::ErrorMin);
8989
bool isInvalidated = (lock.Proto.GetCounter() == NKikimr::TSysTables::TLocksTable::TLock::ErrorAlreadyBroken)

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2985,12 +2985,22 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
29852985
}
29862986
}
29872987

2988+
Clear();
2989+
2990+
Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
2991+
TActorBootstrapped<TKqpBufferWriteActor>::PassAway();
2992+
}
2993+
2994+
void Clear() {
29882995
ForEachWriteActor([](TKqpTableWriteActor* actor, const TActorId) {
29892996
actor->Terminate();
29902997
});
29912998

2992-
Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
2993-
TActorBootstrapped<TKqpBufferWriteActor>::PassAway();
2999+
{
3000+
Y_ABORT_UNLESS(Alloc);
3001+
TGuard<NMiniKQL::TScopedAlloc> allocGuard(*Alloc);
3002+
WriteInfos.clear();
3003+
}
29943004
}
29953005

29963006
void Handle(TEvTxProxy::TEvProposeTransactionStatus::TPtr &ev) {
@@ -3849,6 +3859,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
38493859
std::move(issues),
38503860
BuildStats()
38513861
});
3862+
3863+
Clear();
38523864
}
38533865

38543866
TString GetPathes(ui64 shardId) const {

0 commit comments

Comments
 (0)