Skip to content

Commit ede8cde

Browse files
committed
Fix evwrite rollback wait (#29094)
1 parent 8d07770 commit ede8cde

File tree

2 files changed

+13
-7
lines changed

2 files changed

+13
-7
lines changed

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
3636
: CollectOnly(collectOnly) {}
3737

3838
void AddShard(ui64 shardId, bool isOlap, const TString& path) override {
39-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
39+
AFL_ENSURE(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
4040
ShardsIds.insert(shardId);
4141
auto& shardInfo = ShardsInfo[shardId];
4242
shardInfo.IsOlap = isOlap;
@@ -48,7 +48,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
4848
}
4949

5050
void AddAction(ui64 shardId, ui8 action) override {
51-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
51+
AFL_ENSURE(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
5252
ShardsInfo.at(shardId).Flags |= action;
5353
if (action & EAction::WRITE) {
5454
ReadOnly = false;
@@ -57,7 +57,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
5757
}
5858

5959
void AddTopic(ui64 topicId, const TString& path) override {
60-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
60+
AFL_ENSURE(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
6161
ShardsIds.insert(topicId);
6262
auto& shardInfo = ShardsInfo[topicId];
6363

@@ -83,7 +83,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
8383
}
8484

8585
bool AddLock(ui64 shardId, const NKikimrDataEvents::TLock& lockProto) override {
86-
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
86+
AFL_ENSURE(State == ETransactionState::COLLECTING || State == ETransactionState::ERROR);
8787
TKqpLock lock(lockProto);
8888
bool isError = (lock.Proto.GetCounter() >= NKikimr::TSysTables::TLocksTable::TLock::ErrorMin);
8989
bool isInvalidated = (lock.Proto.GetCounter() == NKikimr::TSysTables::TLocksTable::TLock::ErrorAlreadyBroken)

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2985,14 +2985,18 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
29852985
}
29862986
}
29872987

2988-
ForEachWriteActor([](TKqpTableWriteActor* actor, const TActorId) {
2989-
actor->Terminate();
2990-
});
2988+
Clear();
29912989

29922990
Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
29932991
TActorBootstrapped<TKqpBufferWriteActor>::PassAway();
29942992
}
29952993

2994+
void Clear() {
2995+
ForEachWriteActor([](TKqpTableWriteActor* actor, const TActorId) {
2996+
actor->Terminate();
2997+
});
2998+
}
2999+
29963000
void Handle(TEvTxProxy::TEvProposeTransactionStatus::TPtr &ev) {
29973001
TEvTxProxy::TEvProposeTransactionStatus* res = ev->Get();
29983002
CA_LOG_D("Got transaction status, status: " << res->GetStatus());
@@ -3849,6 +3853,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
38493853
std::move(issues),
38503854
BuildStats()
38513855
});
3856+
3857+
Clear();
38523858
}
38533859

38543860
TString GetPathes(ui64 shardId) const {

0 commit comments

Comments
 (0)