Skip to content

Commit 7f593bd

Browse files
fix: use condition variables instead of busy waits in worker threads
Resolves: NEO-16085, GSD-11678, HSD-14025819208
Signed-off-by: Igor Venevtsev <igor.venevtsev@intel.com>
1 parent dc8dc50 commit 7f593bd

23 files changed

+313 / -172 lines changed

shared/source/command_stream/command_stream_receiver.cpp

Lines changed: 0 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -673,13 +673,6 @@ void CommandStreamReceiver::downloadAllocation(GraphicsAllocation &gfxAllocation
673673
}
674674
}
675675

676-
void CommandStreamReceiver::startControllingDirectSubmissions() {
677-
auto controller = this->executionEnvironment.directSubmissionController.get();
678-
if (controller) {
679-
controller->startControlling();
680-
}
681-
}
682-
683676
bool CommandStreamReceiver::enqueueWaitForPagingFence(uint64_t pagingFenceValue) {
684677
auto controller = this->executionEnvironment.directSubmissionController.get();
685678
if (this->isAnyDirectSubmissionEnabled() && controller) {

shared/source/command_stream/command_stream_receiver.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,8 +352,6 @@ class CommandStreamReceiver : NEO::NonCopyableAndNonMovableClass {
352352

353353
uint32_t getRootDeviceIndex() const { return rootDeviceIndex; }
354354

355-
MOCKABLE_VIRTUAL void startControllingDirectSubmissions();
356-
357355
MOCKABLE_VIRTUAL bool isAnyDirectSubmissionEnabled() const {
358356
return this->isDirectSubmissionEnabled() || isBlitterDirectSubmissionEnabled();
359357
}

shared/source/command_stream/command_stream_receiver_hw_base.inl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,7 +1304,7 @@ SubmissionStatus CommandStreamReceiverHw<GfxFamily>::flushSmallTask(LinearStream
13041304
this->latestSentTaskCount = taskCount + 1;
13051305
auto submissionStatus = flushHandler(batchBuffer, getResidencyAllocations());
13061306
if (submissionStatus == SubmissionStatus::success) {
1307-
taskCount++;
1307+
++taskCount;
13081308
}
13091309
return submissionStatus;
13101310
}
@@ -1473,7 +1473,6 @@ inline bool CommandStreamReceiverHw<GfxFamily>::initDirectSubmission() {
14731473
if (directSubmissionController) {
14741474
directSubmissionController->registerDirectSubmission(this);
14751475
}
1476-
this->startControllingDirectSubmissions();
14771476
if (this->isUpdateTagFromWaitEnabled()) {
14781477
this->overrideDispatchPolicy(DispatchMode::immediateDispatch);
14791478
}
@@ -1486,6 +1485,7 @@ inline bool CommandStreamReceiverHw<GfxFamily>::initDirectSubmission() {
14861485
}
14871486
}
14881487
}
1488+
14891489
return ret;
14901490
}
14911491

shared/source/direct_submission/direct_submission_controller.cpp

Lines changed: 29 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -66,50 +66,40 @@ void DirectSubmissionController::startThread() {
6666
}
6767

6868
void DirectSubmissionController::stopThread() {
69-
runControlling.store(false);
70-
keepControlling.store(false);
69+
{
70+
std::lock_guard<std::mutex> lock(condVarMutex);
71+
keepControlling.store(false);
72+
condVar.notify_one();
73+
}
7174
if (directSubmissionControllingThread) {
7275
directSubmissionControllingThread->join();
7376
directSubmissionControllingThread.reset();
7477
}
7578
}
7679

77-
void DirectSubmissionController::startControlling() {
78-
this->runControlling.store(true);
79-
}
80-
8180
void *DirectSubmissionController::controlDirectSubmissionsState(void *self) {
8281
auto controller = reinterpret_cast<DirectSubmissionController *>(self);
8382

84-
while (!controller->runControlling.load()) {
85-
if (!controller->keepControlling.load()) {
86-
return nullptr;
87-
}
88-
std::unique_lock<std::mutex> lock(controller->condVarMutex);
89-
controller->handlePagingFenceRequests(lock, false);
90-
91-
auto isControllerNotified = controller->sleep(lock);
92-
if (isControllerNotified) {
93-
controller->handlePagingFenceRequests(lock, false);
94-
}
95-
}
96-
9783
controller->timeSinceLastCheck = controller->getCpuTimestamp();
9884
controller->lastHangCheckTime = std::chrono::high_resolution_clock::now();
99-
while (true) {
100-
if (!controller->keepControlling.load()) {
101-
return nullptr;
102-
}
103-
std::unique_lock<std::mutex> lock(controller->condVarMutex);
104-
controller->handlePagingFenceRequests(lock, true);
10585

106-
auto isControllerNotified = controller->sleep(lock);
107-
if (isControllerNotified) {
108-
controller->handlePagingFenceRequests(lock, true);
109-
}
86+
while (controller->keepControlling.load()) {
87+
std::unique_lock<std::mutex> lock(controller->condVarMutex);
88+
controller->wait(lock);
89+
controller->handlePagingFenceRequests(lock);
90+
controller->sleep(lock);
91+
controller->handlePagingFenceRequests(lock);
11092
lock.unlock();
11193
controller->checkNewSubmissions();
11294
}
95+
96+
return nullptr;
97+
}
98+
99+
void DirectSubmissionController::notifyNewSubmission(const CommandStreamReceiver *csr) {
100+
++activeSubmissionsCount;
101+
directSubmissions[const_cast<CommandStreamReceiver *>(csr)].isActive = true;
102+
condVar.notify_one();
113103
}
114104

115105
void DirectSubmissionController::checkNewSubmissions() {
@@ -121,9 +111,11 @@ void DirectSubmissionController::checkNewSubmissions() {
121111
std::lock_guard<std::mutex> lock(this->directSubmissionsMutex);
122112
bool shouldRecalculateTimeout = false;
123113
std::optional<TaskCountType> bcsTaskCount{};
124-
for (auto &directSubmission : this->directSubmissions) {
125-
auto csr = directSubmission.first;
126-
auto &state = directSubmission.second;
114+
for (auto &[csr, state] : directSubmissions) {
115+
if (!state.isActive) {
116+
continue;
117+
}
118+
127119
auto isBcs = EngineHelpers::isBcs(csr->getOsContext().getEngineType());
128120
if (timeoutMode == TimeoutElapsedMode::bcsOnly && !isBcs) {
129121
continue;
@@ -143,8 +135,10 @@ void DirectSubmissionController::checkNewSubmissions() {
143135
auto lock = csr->obtainUniqueOwnership();
144136
if (!isCsrIdleDetectionEnabled || (isDirectSubmissionIdle(csr, lock) && isCopyEngineIdle)) {
145137
csr->stopDirectSubmission(false, false);
138+
state.isActive = false;
146139
state.isStopped = true;
147140
shouldRecalculateTimeout = true;
141+
--activeSubmissionsCount;
148142
}
149143
state.taskCount = csr->peekTaskCount();
150144
} else {
@@ -282,13 +276,13 @@ void DirectSubmissionController::recalculateTimeout() {
282276
}
283277

284278
void DirectSubmissionController::enqueueWaitForPagingFence(CommandStreamReceiver *csr, uint64_t pagingFenceValue) {
285-
std::lock_guard lock(this->condVarMutex);
279+
std::lock_guard lock(condVarMutex);
286280
pagingFenceRequests.push({csr, pagingFenceValue});
287281
condVar.notify_one();
288282
}
289283

290284
void DirectSubmissionController::drainPagingFenceQueue() {
291-
std::lock_guard lock(this->condVarMutex);
285+
std::lock_guard lock(condVarMutex);
292286

293287
while (!pagingFenceRequests.empty()) {
294288
auto request = pagingFenceRequests.front();
@@ -297,18 +291,13 @@ void DirectSubmissionController::drainPagingFenceQueue() {
297291
}
298292
}
299293

300-
void DirectSubmissionController::handlePagingFenceRequests(std::unique_lock<std::mutex> &lock, bool checkForNewSubmissions) {
294+
void DirectSubmissionController::handlePagingFenceRequests(std::unique_lock<std::mutex> &lock) {
301295
UNRECOVERABLE_IF(!lock.owns_lock())
302296
while (!pagingFenceRequests.empty()) {
303297
auto request = pagingFenceRequests.front();
304298
pagingFenceRequests.pop();
305299
lock.unlock();
306-
307300
request.csr->unblockPagingFenceSemaphore(request.pagingFenceValue);
308-
if (checkForNewSubmissions) {
309-
checkNewSubmissions();
310-
}
311-
312301
lock.lock();
313302
}
314303
}

shared/source/direct_submission/direct_submission_controller.h

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,24 +61,26 @@ class DirectSubmissionController {
6161
void unregisterDirectSubmission(CommandStreamReceiver *csr);
6262

6363
void startThread();
64-
void startControlling();
6564
void stopThread();
6665

6766
static bool isSupported();
6867

6968
void enqueueWaitForPagingFence(CommandStreamReceiver *csr, uint64_t pagingFenceValue);
7069
void drainPagingFenceQueue();
70+
void notifyNewSubmission(const CommandStreamReceiver *csr);
7171

7272
protected:
7373
struct DirectSubmissionState {
7474
DirectSubmissionState(DirectSubmissionState &&other) noexcept {
75+
isActive = other.isActive.load();
7576
isStopped = other.isStopped.load();
7677
taskCount = other.taskCount.load();
7778
}
7879
DirectSubmissionState &operator=(const DirectSubmissionState &other) {
7980
if (this == &other) {
8081
return *this;
8182
}
83+
this->isActive = other.isActive.load();
8284
this->isStopped = other.isStopped.load();
8385
this->taskCount = other.taskCount.load();
8486
return *this;
@@ -90,15 +92,20 @@ class DirectSubmissionController {
9092
DirectSubmissionState(const DirectSubmissionState &other) = delete;
9193
DirectSubmissionState &operator=(DirectSubmissionState &&other) = delete;
9294

95+
std::atomic_bool isActive{false};
9396
std::atomic_bool isStopped{true};
9497
std::atomic<TaskCountType> taskCount{0};
9598
};
9699

97100
static void *controlDirectSubmissionsState(void *self);
98-
void checkNewSubmissions();
101+
MOCKABLE_VIRTUAL void checkNewSubmissions();
99102
bool isDirectSubmissionIdle(CommandStreamReceiver *csr, std::unique_lock<std::recursive_mutex> &csrLock);
100103
bool isCopyEngineOnDeviceIdle(uint32_t rootDeviceIndex, std::optional<TaskCountType> &bcsTaskCount);
101104
MOCKABLE_VIRTUAL bool sleep(std::unique_lock<std::mutex> &lock);
105+
bool waitPredicate() { return !keepControlling || !pagingFenceRequests.empty() || activeSubmissionsCount; }
106+
MOCKABLE_VIRTUAL void wait(std::unique_lock<std::mutex> &lock) {
107+
condVar.wait(lock, [&]() { return waitPredicate(); });
108+
}
102109
MOCKABLE_VIRTUAL SteadyClock::time_point getCpuTimestamp();
103110
MOCKABLE_VIRTUAL void overrideDirectSubmissionTimeouts(const ProductHelper &productHelper);
104111

@@ -107,7 +114,7 @@ class DirectSubmissionController {
107114
void updateLastSubmittedThrottle(QueueThrottle throttle);
108115
size_t getTimeoutParamsMapKey(QueueThrottle throttle, bool acLineStatus);
109116

110-
void handlePagingFenceRequests(std::unique_lock<std::mutex> &lock, bool checkForNewSubmissions);
117+
MOCKABLE_VIRTUAL void handlePagingFenceRequests(std::unique_lock<std::mutex> &lock);
111118
MOCKABLE_VIRTUAL TimeoutElapsedMode timeoutElapsed();
112119
std::chrono::microseconds getSleepValue() const { return std::chrono::microseconds(this->timeout / this->bcsTimeoutDivisor); }
113120

@@ -118,7 +125,7 @@ class DirectSubmissionController {
118125

119126
std::unique_ptr<Thread> directSubmissionControllingThread;
120127
std::atomic_bool keepControlling = true;
121-
std::atomic_bool runControlling = false;
128+
std::atomic_uint activeSubmissionsCount = 0;
122129

123130
SteadyClock::time_point timeSinceLastCheck{};
124131
SteadyClock::time_point lastTerminateCpuTimestamp{};
@@ -136,4 +143,4 @@ class DirectSubmissionController {
136143

137144
std::queue<WaitForPagingFenceRequest> pagingFenceRequests;
138145
};
139-
} // namespace NEO
146+
} // namespace NEO

shared/source/direct_submission/direct_submission_hw.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
#include "shared/source/command_stream/command_stream_receiver.h"
1111

1212
namespace NEO {
13-
DirectSubmissionInputParams::DirectSubmissionInputParams(const CommandStreamReceiver &commandStreamReceiver) : osContext(commandStreamReceiver.getOsContext()), rootDeviceEnvironment(commandStreamReceiver.peekRootDeviceEnvironment()), rootDeviceIndex(commandStreamReceiver.getRootDeviceIndex()) {
13+
DirectSubmissionInputParams::DirectSubmissionInputParams(const CommandStreamReceiver &commandStreamReceiver) : csr(commandStreamReceiver), osContext(commandStreamReceiver.getOsContext()), rootDeviceEnvironment(commandStreamReceiver.peekRootDeviceEnvironment()), rootDeviceIndex(commandStreamReceiver.getRootDeviceIndex()) {
1414
memoryManager = commandStreamReceiver.getMemoryManager();
1515
globalFenceAllocation = commandStreamReceiver.getGlobalFenceAllocation();
1616
workPartitionAllocation = commandStreamReceiver.getWorkPartitionAllocation();

shared/source/direct_submission/direct_submission_hw.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class MemoryOperationsHandler;
5050

5151
struct DirectSubmissionInputParams : NonCopyableClass {
5252
DirectSubmissionInputParams(const CommandStreamReceiver &commandStreamReceiver);
53+
const CommandStreamReceiver &csr;
5354
OsContext &osContext;
5455
const RootDeviceEnvironment &rootDeviceEnvironment;
5556
MemoryManager *memoryManager = nullptr;
@@ -222,6 +223,7 @@ class DirectSubmissionHw {
222223
uint64_t gpuVaForPagingFenceSemaphore = 0u;
223224
uint64_t relaxedOrderingQueueSizeLimitValueVa = 0;
224225

226+
const CommandStreamReceiver &csr;
225227
OsContext &osContext;
226228
const uint32_t rootDeviceIndex;
227229
MemoryManager *memoryManager = nullptr;

shared/source/direct_submission/direct_submission_hw.inl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "shared/source/command_container/command_encoder.h"
99
#include "shared/source/command_stream/submissions_aggregator.h"
1010
#include "shared/source/debug_settings/debug_settings_manager.h"
11+
#include "shared/source/direct_submission/direct_submission_controller.h"
1112
#include "shared/source/direct_submission/direct_submission_hw.h"
1213
#include "shared/source/direct_submission/relaxed_ordering_helper.h"
1314
#include "shared/source/execution_environment/execution_environment.h"
@@ -33,7 +34,7 @@ namespace NEO {
3334

3435
template <typename GfxFamily, typename Dispatcher>
3536
DirectSubmissionHw<GfxFamily, Dispatcher>::DirectSubmissionHw(const DirectSubmissionInputParams &inputParams)
36-
: ringBuffers(RingBufferUse::initialRingBufferCount), osContext(inputParams.osContext), rootDeviceIndex(inputParams.rootDeviceIndex), rootDeviceEnvironment(inputParams.rootDeviceEnvironment) {
37+
: ringBuffers(RingBufferUse::initialRingBufferCount), csr(inputParams.csr), osContext(inputParams.osContext), rootDeviceIndex(inputParams.rootDeviceIndex), rootDeviceEnvironment(inputParams.rootDeviceEnvironment) {
3738
memoryManager = inputParams.memoryManager;
3839
globalFenceAllocation = inputParams.globalFenceAllocation;
3940
hwInfo = inputParams.rootDeviceEnvironment.getHardwareInfo();
@@ -581,6 +582,9 @@ template <typename GfxFamily, typename Dispatcher>
581582
bool DirectSubmissionHw<GfxFamily, Dispatcher>::submitCommandBufferToGpu(bool needStart, uint64_t gpuAddress, size_t size, bool needWait, const ResidencyContainer *allocationsForResidency) {
582583
if (needStart) {
583584
this->ringStart = this->submit(gpuAddress, size, allocationsForResidency);
585+
if (auto controller = rootDeviceEnvironment.executionEnvironment.directSubmissionController.get()) {
586+
controller->notifyNewSubmission(&csr);
587+
}
584588
return this->ringStart;
585589
} else {
586590
if (needWait) {

shared/source/memory_manager/unified_memory_manager.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ bool SVMAllocsManager::SvmAllocationCache::insert(size_t size, void *ptr, SvmAll
6969
return false;
7070
}
7171

72-
std::lock_guard<std::mutex> lock(this->mtx);
72+
std::unique_lock<std::mutex> lock(this->mtx);
7373
if (svmData->device ? svmData->device->shouldLimitAllocationsReuse() : memoryManager->shouldLimitAllocationsReuse()) {
7474
return false;
7575
}
@@ -101,8 +101,11 @@ bool SVMAllocsManager::SvmAllocationCache::insert(size_t size, void *ptr, SvmAll
101101
}
102102
svmData->isSavedForReuse = true;
103103
allocations.emplace(std::lower_bound(allocations.begin(), allocations.end(), size), size, ptr, svmData, waitForCompletion);
104-
if (memoryManager->peekExecutionEnvironment().unifiedMemoryReuseCleaner) {
105-
memoryManager->peekExecutionEnvironment().unifiedMemoryReuseCleaner->startThread();
104+
empty = false;
105+
if (auto usmReuseCleaner = this->memoryManager->peekExecutionEnvironment().unifiedMemoryReuseCleaner.get()) {
106+
lock.unlock();
107+
usmReuseCleaner->startThread();
108+
usmReuseCleaner->notifySvmAllocationsCacheUpdate();
106109
}
107110
}
108111
if (enablePerformanceLogging) {
@@ -112,6 +115,7 @@ bool SVMAllocsManager::SvmAllocationCache::insert(size_t size, void *ptr, SvmAll
112115
.operationType = CacheOperationType::insert,
113116
.isSuccess = isSuccess});
114117
}
118+
115119
return isSuccess;
116120
}
117121

@@ -183,6 +187,7 @@ void *SVMAllocsManager::SvmAllocationCache::get(size_t size, const UnifiedMemory
183187
svmAllocsManager->reinsertToAllocsForIndirectAccess(*allocationIter->svmData);
184188
}
185189
allocations.erase(allocationIter);
190+
empty = allocations.empty();
186191
return allocationPtr;
187192
}
188193
}
@@ -217,6 +222,7 @@ void SVMAllocsManager::SvmAllocationCache::trim() {
217222
svmAllocsManager->freeSVMAllocImpl(cachedAllocationInfo.allocation, FreePolicyType::blocking, cachedAllocationInfo.svmData);
218223
}
219224
this->allocations.clear();
225+
empty = true;
220226
}
221227

222228
void SVMAllocsManager::SvmAllocationCache::cleanup() {
@@ -301,6 +307,7 @@ void SVMAllocsManager::SvmAllocationCache::trimOldAllocs(std::chrono::high_resol
301307
if (trimAll) {
302308
std::erase_if(allocations, SvmCacheAllocationInfo::isMarkedForDelete);
303309
}
310+
empty = allocations.empty();
304311
}
305312

306313
SvmAllocationData *SVMAllocsManager::MapBasedAllocationTracker::get(const void *ptr) {

shared/source/memory_manager/unified_memory_manager.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ class SVMAllocsManager {
203203
static bool allocUtilizationAllows(size_t requestedSize, size_t reuseCandidateSize);
204204
static bool alignmentAllows(void *ptr, size_t alignment);
205205
bool isInUse(SvmCacheAllocationInfo &cacheAllocInfo);
206+
bool isEmpty() { return empty; };
206207
void *get(size_t size, const UnifiedMemoryProperties &unifiedMemoryProperties);
207208
void trim();
208209
void trimOldAllocs(std::chrono::high_resolution_clock::time_point trimTimePoint, bool trimAll);
@@ -216,6 +217,7 @@ class SVMAllocsManager {
216217
MemoryManager *memoryManager = nullptr;
217218
bool enablePerformanceLogging = false;
218219
bool requireUpdatingAllocsForIndirectAccess = false;
220+
std::atomic_bool empty = true;
219221
};
220222

221223
enum class FreePolicyType : uint32_t {

0 commit comments

Comments
 (0)