Skip to content

Commit d5e1860

Browse files
authored
Better handling of active requests inside node state (#28825)
1 parent 802c5cd commit d5e1860

20 files changed

+331
-333
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,19 +152,17 @@ void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TK
152152
}
153153

154154
TConclusionStatus TCPULimits::DeserializeFromProto(const NKikimrKqp::TEvStartKqpTasksRequest& config) {
155+
const static auto maxThreadsCount = TActivationContext::ActorSystem()->GetPoolMaxThreadsCount(TActivationContext::AsActorContext().SelfID.PoolID());
155156
const auto share = config.GetPoolMaxCpuShare();
156157
if (share <= 0 || 1 < share) {
157158
return TConclusionStatus::Fail("cpu share have to be in (0, 1] interval");
158159
}
159-
NActors::TExecutorPoolStats poolStats;
160-
TVector<NActors::TExecutorThreadStats> threadsStats;
161-
TActivationContext::ActorSystem()->GetPoolStats(TActivationContext::AsActorContext().SelfID.PoolID(), poolStats, threadsStats);
162-
CPUGroupThreadsLimit = Max<ui64>(poolStats.MaxThreadCount, 1) * share;
160+
CPUGroupThreadsLimit = Max<ui64>(1, maxThreadsCount) * share;
163161
CPUGroupName = config.GetPoolId();
164162
return TConclusionStatus::Success();
165163
}
166164

167-
}
165+
} // namespace NKqp
168166
} // namespace NKikimr
169167

170168
namespace NKikimr::NKqp {

ydb/core/kqp/compute_actor/kqp_compute_actor.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,8 @@ class TCPULimits {
5656
};
5757

5858
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task,
59-
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
60-
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
61-
NWilson::TTraceId traceId,
59+
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NYql::NDq::TComputeRuntimeSettings& settings,
60+
const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
6261
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
6362
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
6463
NScheduler::TSchedulableActorOptions schedulableOptions,

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "kqp_compute_actor.h"
33

44
#include <ydb/core/kqp/common/kqp_resolve.h>
5+
#include <ydb/core/kqp/node_service/kqp_node_state.h>
56
#include <ydb/core/kqp/rm_service/kqp_resource_estimation.h>
67

78
namespace NKikimr::NKqp::NComputeActor {
@@ -11,24 +12,17 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
1112

1213
TMemoryQuotaManager(std::shared_ptr<NRm::IKqpResourceManager> resourceManager
1314
, NRm::EKqpMemoryPool memoryPool
14-
, std::shared_ptr<IKqpNodeState> state
1515
, TIntrusivePtr<NRm::TTxState> tx
1616
, TIntrusivePtr<NRm::TTaskState> task
1717
, ui64 limit)
1818
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
1919
, ResourceManager(std::move(resourceManager))
2020
, MemoryPool(memoryPool)
21-
, State(std::move(state))
2221
, Tx(std::move(tx))
2322
, Task(std::move(task))
24-
{
25-
}
23+
{}
2624

2725
~TMemoryQuotaManager() override {
28-
if (State) {
29-
State->OnTaskTerminate(Tx->TxId, Task->TaskId, Success);
30-
}
31-
3226
ResourceManager->FreeResources(Tx, Task);
3327
}
3428

@@ -71,7 +65,6 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
7165

7266
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager;
7367
NRm::EKqpMemoryPool MemoryPool;
74-
std::shared_ptr<IKqpNodeState> State;
7568
TIntrusivePtr<NRm::TTxState> Tx;
7669
TIntrusivePtr<NRm::TTaskState> Task;
7770
bool Success = true;
@@ -169,7 +162,6 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
169162
memoryLimits.MemoryQuotaManager = std::make_shared<TMemoryQuotaManager>(
170163
ResourceManager_,
171164
args.MemoryPool,
172-
std::move(args.State),
173165
std::move(args.TxInfo),
174166
std::move(task),
175167
limit);
@@ -193,12 +185,14 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
193185
}
194186

195187
NYql::NDq::IMemoryQuotaManager::TWeakPtr memoryQuotaManager = memoryLimits.MemoryQuotaManager;
196-
runtimeSettings.TerminateHandler = [memoryQuotaManager]
188+
runtimeSettings.TerminateHandler = [memoryQuotaManager, state=args.State, txId=args.TxId, taskId=args.Task->GetId()]
197189
(bool success, const NYql::TIssues& issues) {
198-
auto manager = memoryQuotaManager.lock();
199-
if (manager) {
190+
if (auto manager = memoryQuotaManager.lock()) {
200191
static_cast<TMemoryQuotaManager*>(manager.get())->TerminateHandler(success, issues);
201192
}
193+
if (state) {
194+
state->OnTaskFinished(txId, taskId, success);
195+
}
202196
};
203197

204198
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta meta;
@@ -224,8 +218,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
224218
YQL_ENSURE(args.ComputesByStages);
225219
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta);
226220
IActor* computeActor = CreateKqpScanComputeActor(
227-
args.ExecuterId, args.TxId,
228-
args.Task, AsyncIoFactory, runtimeSettings, memoryLimits,
221+
args.ExecuterId, args.TxId, args.Task, AsyncIoFactory, runtimeSettings, memoryLimits,
229222
std::move(args.TraceId), std::move(args.Arena),
230223
std::move(schedulableOptions), args.BlockTrackingMode);
231224
TActorId result = TlsActivationContext->Register(computeActor);
@@ -236,8 +229,10 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
236229
if (!args.SerializedGUCSettings.empty()) {
237230
GUCSettings = std::make_shared<TGUCSettings>(args.SerializedGUCSettings);
238231
}
239-
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory,
240-
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings,
232+
IActor* computeActor = NKqp::CreateKqpComputeActor(
233+
args.ExecuterId, args.TxId, args.Task, AsyncIoFactory, runtimeSettings, memoryLimits,
234+
std::move(args.TraceId), std::move(args.Arena),
235+
FederatedQuerySetup, GUCSettings,
241236
std::move(schedulableOptions), args.BlockTrackingMode, std::move(args.UserToken), args.Database);
242237
return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) :
243238
TlsActivationContext->AsActorContext().Register(computeActor);

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace NKikimr::NKqp {
1313
struct TKqpFederatedQuerySetup;
14+
class TNodeState;
1415
}
1516

1617
namespace NKikimr::NKqp::NComputeActor {
@@ -92,13 +93,6 @@ class TComputeStagesWithScan {
9293
}
9394
};
9495

95-
struct IKqpNodeState {
96-
virtual ~IKqpNodeState() = default;
97-
98-
virtual void OnTaskTerminate(ui64 txId, ui64 taskId, bool success) = 0;
99-
};
100-
101-
10296
struct IKqpNodeComputeActorFactory {
10397
virtual ~IKqpNodeComputeActorFactory() = default;
10498

@@ -127,7 +121,7 @@ struct IKqpNodeComputeActorFactory {
127121
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
128122

129123
TComputeStagesWithScan* ComputesByStages = nullptr;
130-
std::shared_ptr<IKqpNodeState> State = nullptr;
124+
std::shared_ptr<TNodeState> State = nullptr;
131125
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
132126
TString Database;
133127

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvTerminateFromFetcher::TPtr
174174
ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "TEvTerminateFromFetcher: " << ev->Sender << "/" << SelfId();
175175
TBase::InternalError(ev->Get()->GetStatusCode(), ev->Get()->GetIssues());
176176
State = ev->Get()->GetState();
177-
DoTerminateImpl();
178177
}
179178

180179
void TKqpScanComputeActor::Handle(TEvScanExchange::TEvSendData::TPtr& ev) {

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,6 @@ class TKqpScanComputeActor: public NScheduler::TSchedulableComputeActorBase<TKqp
6565
const EBlockTrackingMode BlockTrackingMode;
6666

6767
public:
68-
~TKqpScanComputeActor() override {
69-
DoTerminateImpl();
70-
}
71-
7268
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
7369
return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
7470
}

0 commit comments

Comments
 (0)