Skip to content

Commit 14d9641

Browse files
committed
chore: improved release/abort behaviour
1 parent 2c7b5a9 commit 14d9641

File tree

2 files changed

+39
-45
lines changed

2 files changed

+39
-45
lines changed

packages/host/cpp/ThreadsafeFunction.cpp

Lines changed: 38 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ ThreadSafeFunction::ThreadSafeFunction(
3636
threadFinalizeData_{threadFinalizeData},
3737
threadFinalizeCb_{threadFinalizeCb},
3838
context_{context},
39-
callJsCb_{callJsCb},
40-
refCount_{initialThreadCount} {
39+
callJsCb_{callJsCb} {
4140
if (jsFunc) {
4241
// Keep JS function alive across async hops; fatal here mirrors Node-API's
4342
// behavior when environment is irrecoverable.
@@ -135,48 +134,48 @@ napi_status ThreadSafeFunction::call(
135134
}
136135
// Hop to JS thread; we drain one item per hop to keep latency predictable
137136
// and avoid long monopolization of the JS queue.
138-
invoker->invokeAsync([this] {
137+
invoker->invokeAsync([self = shared_from_this()] {
139138
void* queuedData{nullptr};
140139
auto empty{false};
141140
{
142-
std::lock_guard lock{queueMutex_};
143-
if (!queue_.empty()) {
144-
queuedData = queue_.front();
145-
const auto size = queue_.size();
146-
queue_.pop();
147-
empty = queue_.empty();
148-
if (size == maxQueueSize_ && maxQueueSize_) {
149-
queueCv_.notify_one();
141+
std::lock_guard lock{self->queueMutex_};
142+
if (!self->queue_.empty()) {
143+
queuedData = self->queue_.front();
144+
const auto size = self->queue_.size();
145+
self->queue_.pop();
146+
empty = self->queue_.empty();
147+
if (size == self->maxQueueSize_ && self->maxQueueSize_) {
148+
self->queueCv_.notify_one();
150149
}
151150
}
152151
}
153-
if (queuedData && !aborted_) {
152+
if (queuedData && !self->aborted_) {
154153
// Prefer the user-provided callJsCb_ (Node-API compatible). If absent
155154
// but we have a JS function ref, call it directly with no args.
156-
if (callJsCb_) {
155+
if (self->callJsCb_) {
157156
napi_value fn{nullptr};
158-
if (jsFuncRef_) {
159-
napi_get_reference_value(env_, jsFuncRef_, &fn);
157+
if (self->jsFuncRef_) {
158+
napi_get_reference_value(self->env_, self->jsFuncRef_, &fn);
160159
}
161-
callJsCb_(env_, fn, context_, queuedData);
162-
} else if (jsFuncRef_) {
160+
self->callJsCb_(self->env_, fn, self->context_, queuedData);
161+
} else if (self->jsFuncRef_) {
163162
napi_value fn;
164-
napi_get_reference_value(env_, jsFuncRef_, &fn);
163+
napi_get_reference_value(self->env_, self->jsFuncRef_, &fn);
165164
napi_value recv;
166-
napi_get_undefined(env_, &recv);
165+
napi_get_undefined(self->env_, &recv);
167166
napi_value result;
168-
napi_call_function(env_, recv, fn, 0, nullptr, &result);
167+
napi_call_function(self->env_, recv, fn, 0, nullptr, &result);
169168
}
170169
}
171170

172171
// Auto-finalize when: no remaining threads (acquire/release balance),
173172
// queue drained, and not already closing.
174-
if (!threadCount_ && empty && !closing_) {
175-
if (maxQueueSize_) {
176-
std::lock_guard lock{queueMutex_};
177-
queueCv_.notify_all();
178-
}
179-
finalize();
173+
if (!self->threadCount_ && empty) {
174+
// if (self->maxQueueSize_) {
175+
// std::lock_guard lock{self->queueMutex_};
176+
// self->queueCv_.notify_all();
177+
// }
178+
self->finalize();
180179
}
181180
});
182181
return napi_ok;
@@ -186,7 +185,6 @@ napi_status ThreadSafeFunction::acquire() {
186185
if (closing_) {
187186
return napi_closing;
188187
}
189-
refCount_++;
190188
threadCount_++;
191189
return napi_ok;
192190
}
@@ -198,21 +196,19 @@ napi_status ThreadSafeFunction::release(
198196
aborted_ = true;
199197
closing_ = true;
200198
}
201-
if (refCount_) {
202-
refCount_--;
203-
}
204199
if (threadCount_) {
205200
threadCount_--;
206201
}
207-
// When the last ref is gone (or we're closing), queue is drained, notify and
208-
// finalize.
209-
std::lock_guard lock{queueMutex_};
210-
if (!refCount_ && !threadCount_ && queue_.empty() || closing_) {
211-
closing_ = true;
202+
// When the last thread is gone (or we're closing), notify and finalize.
203+
if (!threadCount_ || closing_) {
204+
std::lock_guard lock{queueMutex_};
205+
auto emptyQueue{queue_.empty()};
212206
if (maxQueueSize_) {
213207
queueCv_.notify_all();
214208
}
215-
finalize();
209+
if (aborted_ || emptyQueue) {
210+
finalize();
211+
}
216212
}
217213
return napi_ok;
218214
}
@@ -232,26 +228,26 @@ napi_status ThreadSafeFunction::unref() {
232228
}
233229

234230
void ThreadSafeFunction::finalize() {
235-
std::lock_guard lock{finalizeMutex_};
236231
if (handlesClosing_) {
237232
return;
238233
}
239234
handlesClosing_ = true;
240235
closing_ = true;
241236

242-
const auto onFinalize = [this] {
237+
const auto onFinalize = [self = shared_from_this()] {
243238
// Invoke user finalizer and unregister the handle from the global map.
244-
if (threadFinalizeCb_) {
245-
threadFinalizeCb_(env_, threadFinalizeData_, context_);
239+
if (self->threadFinalizeCb_) {
240+
self->threadFinalizeCb_(
241+
self->env_, self->threadFinalizeData_, self->context_);
246242
}
247243
std::lock_guard lock{registryMutex};
248-
registry.erase(reinterpret_cast<napi_threadsafe_function>(this));
244+
registry.erase(reinterpret_cast<napi_threadsafe_function>(self.get()));
249245
};
250246

251247
// Prefer running the finalizer on the JS thread to match expectations;
252248
// if CallInvoker is gone, run synchronously.
253249
if (const auto invoker = callInvoker_.lock()) {
254-
invoker->invokeAsync([=]() { onFinalize(); });
250+
invoker->invokeAsync(onFinalize);
255251
} else {
256252
onFinalize();
257253
}

packages/host/cpp/ThreadsafeFunction.hpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,7 @@ class ThreadSafeFunction
7070
std::queue<void*> queue_;
7171
std::atomic<bool> closing_{false};
7272
std::atomic<bool> referenced_{true};
73-
std::atomic<size_t> refCount_;
74-
std::mutex finalizeMutex_;
75-
bool handlesClosing_{false};
73+
std::atomic<bool> handlesClosing_{false};
7674
};
7775

7876
} // namespace callstack::nodeapihost

0 commit comments

Comments
 (0)