@@ -901,7 +901,11 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
901901 Counters->CreatedIterators ->Inc ();
902902 ReadIdByTabletId[state->TabletId ].push_back (id);
903903
904- Send (PipeCacheId, new TEvPipeCache::TEvForward (ev.Release (), state->TabletId , true ),
904+ bool newPipe = HasEstablishedPipe.insert (state->TabletId ).second ;
905+ Send (PipeCacheId, new TEvPipeCache::TEvForward (
906+ ev.Release (), state->TabletId , TEvPipeCache::TEvForwardOptions{
907+ .AutoConnect = newPipe,
908+ .Subscribe = newPipe}),
905909 IEventHandle::FlagTrackDelivery, 0 , ReadActorSpan.GetTraceId ());
906910
907911 if (!FirstShardStarted) {
@@ -1088,6 +1092,7 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
10881092 void HandleError (TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
10891093 auto & msg = *ev->Get ();
10901094
1095+ HasEstablishedPipe.erase (msg.TabletId );
10911096 TVector<ui32> reads;
10921097 reads = ReadIdByTabletId[msg.TabletId ];
10931098 CA_LOG_W (" Got EvDeliveryProblem, TabletId: " << msg.TabletId << " , NotDelivered: " << msg.NotDelivered );
@@ -1400,7 +1405,10 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
14001405 }
14011406 Counters->SentIteratorAcks ->Inc ();
14021407 CA_LOG_D (" sending ack for read #" << id << " limit " << limit << " seqno = " << record.GetSeqNo ());
1403- Send (PipeCacheId, new TEvPipeCache::TEvForward (request.Release (), Reads[id].Shard ->TabletId , true ),
1408+ bool newPipe = HasEstablishedPipe.insert (Reads[id].Shard ->TabletId ).second ;
1409+ Send (PipeCacheId, new TEvPipeCache::TEvForward (request.Release (), Reads[id].Shard ->TabletId , TEvPipeCache::TEvForwardOptions{
1410+ .AutoConnect = newPipe,
1411+ .Subscribe = newPipe}),
14041412 IEventHandle::FlagTrackDelivery);
14051413
14061414 if (auto delay = ShardTimeout ()) {
@@ -1654,6 +1662,8 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
16541662 ui64 SeqNo;
16551663 ui64 RowIndex;
16561664 };
1665+
1666+ THashSet<ui64> HasEstablishedPipe;
16571667 THashMap<TString, TDuplicationStats> DuplicateCheckStats;
16581668 TVector<TResultColumn> DuplicateCheckExtraColumns;
16591669 TVector<ui32> DuplicateCheckColumnRemap;
0 commit comments