Skip to content

Commit e955f5a

Browse files
authored
fix potential hang in the read actor (#28982) (#29005)
2 parents aace197 + 428fd2d commit e955f5a

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

ydb/core/kqp/runtime/kqp_read_actor.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -901,7 +901,11 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
901901
Counters->CreatedIterators->Inc();
902902
ReadIdByTabletId[state->TabletId].push_back(id);
903903

904-
Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
904+
bool newPipe = HasEstablishedPipe.insert(state->TabletId).second;
905+
Send(PipeCacheId, new TEvPipeCache::TEvForward(
906+
ev.Release(), state->TabletId, TEvPipeCache::TEvForwardOptions{
907+
.AutoConnect = newPipe,
908+
.Subscribe = newPipe}),
905909
IEventHandle::FlagTrackDelivery, 0, ReadActorSpan.GetTraceId());
906910

907911
if (!FirstShardStarted) {
@@ -1088,6 +1092,7 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
10881092
void HandleError(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
10891093
auto& msg = *ev->Get();
10901094

1095+
HasEstablishedPipe.erase(msg.TabletId);
10911096
TVector<ui32> reads;
10921097
reads = ReadIdByTabletId[msg.TabletId];
10931098
CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered);
@@ -1400,7 +1405,10 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
14001405
}
14011406
Counters->SentIteratorAcks->Inc();
14021407
CA_LOG_D("sending ack for read #" << id << " limit " << limit << " seqno = " << record.GetSeqNo());
1403-
Send(PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), Reads[id].Shard->TabletId, true),
1408+
bool newPipe = HasEstablishedPipe.insert(Reads[id].Shard->TabletId).second;
1409+
Send(PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), Reads[id].Shard->TabletId, TEvPipeCache::TEvForwardOptions{
1410+
.AutoConnect = newPipe,
1411+
.Subscribe = newPipe}),
14041412
IEventHandle::FlagTrackDelivery);
14051413

14061414
if (auto delay = ShardTimeout()) {
@@ -1654,6 +1662,8 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
16541662
ui64 SeqNo;
16551663
ui64 RowIndex;
16561664
};
1665+
1666+
THashSet<ui64> HasEstablishedPipe;
16571667
THashMap<TString, TDuplicationStats> DuplicateCheckStats;
16581668
TVector<TResultColumn> DuplicateCheckExtraColumns;
16591669
TVector<ui32> DuplicateCheckColumnRemap;

0 commit comments

Comments
 (0)