11#include " ThreadsafeFunction.hpp"
2+ #include < unordered_map>
23#include " Logger.hpp"
34
45// This file provides a React Native-friendly implementation of Node-API's
56// thread-safe function primitive. In RN we don't own/libuv, so we:
67// - Use CallInvoker to hop onto the JS thread instead of uv_async.
7- // - Track a registry mapping native handles to shared_ptrs for lookup/lifetime.
8+ // - Track a registry mapping unique IDs to shared_ptrs for lookup/lifetime.
89// - Emulate ref/unref semantics without affecting any event loop.
910
10- static std::unordered_map<napi_threadsafe_function ,
11+ static std::unordered_map<std:: uintptr_t ,
1112 std::shared_ptr<callstack::nodeapihost::ThreadSafeFunction>>
1213 registry;
1314static std::mutex registryMutex;
15+ static std::atomic<std::uintptr_t > nextId{1 };
16+
17+ // Constants for better readability
18+ static constexpr size_t INITIAL_REF_COUNT = 1 ;
1419
1520namespace callstack ::nodeapihost {
1621
@@ -26,7 +31,8 @@ ThreadSafeFunction::ThreadSafeFunction(
2631 napi_finalize threadFinalizeCb,
2732 void * context,
2833 napi_threadsafe_function_call_js callJsCb)
29- : callInvoker_{std::move (callInvoker)},
34+ : id_{nextId.fetch_add (1 , std::memory_order_relaxed)},
35+ callInvoker_{std::move (callInvoker)},
3036 env_{env},
3137 jsFunc_{jsFunc},
3238 asyncResource_{asyncResource},
@@ -40,8 +46,11 @@ ThreadSafeFunction::ThreadSafeFunction(
4046 if (jsFunc) {
4147 // Keep JS function alive across async hops; fatal here mirrors Node-API's
4248 // behavior when environment is irrecoverable.
43- const auto status = napi_create_reference (env, jsFunc, 1 , &jsFuncRef_);
49+ const auto status =
50+ napi_create_reference (env, jsFunc, INITIAL_REF_COUNT, &jsFuncRef_);
4451 if (status != napi_ok) {
52+ // Consider throwing an exception instead of fatal error in future
53+ // versions
4554 napi_fatal_error (nullptr ,
4655 0 ,
4756 " Failed to create JS function reference" ,
@@ -82,21 +91,26 @@ std::shared_ptr<ThreadSafeFunction> ThreadSafeFunction::create(
8291 callJsCb);
8392
8493 {
85- auto handle = reinterpret_cast <napi_threadsafe_function>(function.get ());
8694 std::lock_guard lock{registryMutex};
87- registry[handle ] = function;
95+ registry[function-> id_ ] = function;
8896 }
8997
90- return std::move ( function) ;
98+ return function;
9199}
92100
93101std::shared_ptr<ThreadSafeFunction> ThreadSafeFunction::get (
94102 napi_threadsafe_function func) {
95103 std::lock_guard lock{registryMutex};
96- return registry.contains (func) ? registry[func] : nullptr ;
104+ const auto id = reinterpret_cast <std::uintptr_t >(func);
105+ const auto it = registry.find (id);
106+ return it != registry.end () ? it->second : nullptr ;
107+ }
108+
109+ napi_threadsafe_function ThreadSafeFunction::getHandle () const noexcept {
110+ return reinterpret_cast <napi_threadsafe_function>(id_);
97111}
98112
99- napi_status ThreadSafeFunction::getContext (void ** result) {
113+ napi_status ThreadSafeFunction::getContext (void ** result) noexcept {
100114 if (!result) {
101115 return napi_invalid_arg;
102116 }
@@ -107,7 +121,7 @@ napi_status ThreadSafeFunction::getContext(void** result) {
107121
108122napi_status ThreadSafeFunction::call (
109123 void * data, napi_threadsafe_function_call_mode isBlocking) {
110- if (aborted_ || closing_ ) {
124+ if (isClosingOrAborted () ) {
111125 return napi_closing;
112126 }
113127
@@ -120,9 +134,9 @@ napi_status ThreadSafeFunction::call(
120134 return napi_queue_full;
121135 }
122136 queueCv_.wait (lock, [&] {
123- return queue_.size () < maxQueueSize_ || aborted_ || closing_ ;
137+ return queue_.size () < maxQueueSize_ || isClosingOrAborted () ;
124138 });
125- if (aborted_ || closing_ ) return napi_closing;
139+ if (isClosingOrAborted () ) return napi_closing;
126140 }
127141 queue_.push (data);
128142 }
@@ -134,101 +148,64 @@ napi_status ThreadSafeFunction::call(
134148 }
135149 // Hop to JS thread; we drain one item per hop to keep latency predictable
136150 // and avoid long monopolization of the JS queue.
137- invoker->invokeAsync ([self = shared_from_this ()] {
138- void * queuedData{nullptr };
139- auto empty{false };
140- {
141- std::lock_guard lock{self->queueMutex_ };
142- if (!self->queue_ .empty ()) {
143- queuedData = self->queue_ .front ();
144- const auto size = self->queue_ .size ();
145- self->queue_ .pop ();
146- empty = self->queue_ .empty ();
147- if (size == self->maxQueueSize_ && self->maxQueueSize_ ) {
148- self->queueCv_ .notify_one ();
149- }
150- }
151- }
152- if (queuedData && !self->aborted_ ) {
153- // Prefer the user-provided callJsCb_ (Node-API compatible). If absent
154- // but we have a JS function ref, call it directly with no args.
155- if (self->callJsCb_ ) {
156- napi_value fn{nullptr };
157- if (self->jsFuncRef_ ) {
158- napi_get_reference_value (self->env_ , self->jsFuncRef_ , &fn);
159- }
160- self->callJsCb_ (self->env_ , fn, self->context_ , queuedData);
161- } else if (self->jsFuncRef_ ) {
162- napi_value fn;
163- napi_get_reference_value (self->env_ , self->jsFuncRef_ , &fn);
164- napi_value recv;
165- napi_get_undefined (self->env_ , &recv);
166- napi_value result;
167- napi_call_function (self->env_ , recv, fn, 0 , nullptr , &result);
168- }
169- }
170-
171- // Auto-finalize when: no remaining threads (acquire/release balance),
172- // queue drained, and not already closing.
173- if (!self->threadCount_ && empty) {
174- self->finalize ();
175- }
176- });
151+ invoker->invokeAsync ([self = shared_from_this ()] { self->processQueue (); });
177152 return napi_ok;
178153}
179154
180155napi_status ThreadSafeFunction::acquire () {
181- if (closing_) {
156+ if (closing_. load (std::memory_order_acquire) ) {
182157 return napi_closing;
183158 }
184- threadCount_++ ;
159+ threadCount_. fetch_add ( 1 , std::memory_order_acq_rel) ;
185160 return napi_ok;
186161}
187162
188163napi_status ThreadSafeFunction::release (
189164 napi_threadsafe_function_release_mode mode) {
190165 // Node-API semantics: abort prevents further JS calls and wakes any waiters.
191166 if (mode == napi_tsfn_abort) {
192- aborted_ = true ;
193- closing_ = true ;
194- }
195- if (threadCount_) {
196- threadCount_--;
167+ aborted_.store (true , std::memory_order_relaxed);
168+ closing_.store (true , std::memory_order_release);
197169 }
170+
171+ const auto remaining = threadCount_.fetch_sub (1 , std::memory_order_acq_rel);
172+
198173 // When the last thread is gone (or we're closing), notify and finalize.
199- if (!threadCount_ || closing_) {
174+ if (remaining <= 1 || closing_. load (std::memory_order_acquire) ) {
200175 std::lock_guard lock{queueMutex_};
201- auto emptyQueue{ queue_.empty ()} ;
176+ const bool emptyQueue = queue_.empty ();
202177 if (maxQueueSize_) {
203178 queueCv_.notify_all ();
204179 }
205- if (aborted_ || emptyQueue) {
180+ if (aborted_. load (std::memory_order_acquire) || emptyQueue) {
206181 finalize ();
207182 }
208183 }
209184 return napi_ok;
210185}
211186
212- napi_status ThreadSafeFunction::ref () {
187+ napi_status ThreadSafeFunction::ref () noexcept {
213188 // In libuv, this would keep the loop alive. In RN we don't own or expose a
214189 // libuv loop. We just track the state for API parity.
215190 referenced_.store (true , std::memory_order_relaxed);
216191 return napi_ok;
217192}
218193
219- napi_status ThreadSafeFunction::unref () {
194+ napi_status ThreadSafeFunction::unref () noexcept {
220195 // In libuv, this allows the loop to exit if nothing else is keeping it
221196 // alive. In RN this is a no-op beyond state tracking.
222197 referenced_.store (false , std::memory_order_relaxed);
223198 return napi_ok;
224199}
225200
226201void ThreadSafeFunction::finalize () {
227- if (finalizeScheduled_) {
202+ bool expected = false ;
203+ if (!finalizeScheduled_.compare_exchange_strong (
204+ expected, true , std::memory_order_acq_rel)) {
228205 return ;
229206 }
230- finalizeScheduled_ = true ;
231- closing_ = true ;
207+
208+ closing_. store ( true , std::memory_order_release) ;
232209
233210 const auto onFinalize = [self = shared_from_this ()] {
234211 // Invoke user finalizer and unregister the handle from the global map.
@@ -237,7 +214,7 @@ void ThreadSafeFunction::finalize() {
237214 self->env_ , self->threadFinalizeData_ , self->context_ );
238215 }
239216 std::lock_guard lock{registryMutex};
240- registry.erase (reinterpret_cast <napi_threadsafe_function>( self. get ()) );
217+ registry.erase (self-> id_ );
241218 };
242219
243220 // Prefer running the finalizer on the JS thread to match expectations;
@@ -249,4 +226,59 @@ void ThreadSafeFunction::finalize() {
249226 }
250227}
251228
229+ void ThreadSafeFunction::processQueue () {
230+ void * queuedData{nullptr };
231+ bool empty{false };
232+
233+ // Extract data from queue
234+ {
235+ std::lock_guard lock{queueMutex_};
236+ if (!queue_.empty ()) {
237+ queuedData = queue_.front ();
238+ const bool wasAtMaxCapacity = (queue_.size () == maxQueueSize_);
239+ queue_.pop ();
240+ empty = queue_.empty ();
241+
242+ // Notify waiting threads if queue was at max capacity
243+ if (wasAtMaxCapacity && maxQueueSize_) {
244+ queueCv_.notify_one ();
245+ }
246+ }
247+ }
248+
249+ // Execute JS callback if we have data and aren't aborted
250+ if (queuedData && !aborted_.load (std::memory_order_relaxed)) {
251+ if (callJsCb_) {
252+ napi_value fn{nullptr };
253+ if (jsFuncRef_) {
254+ napi_get_reference_value (env_, jsFuncRef_, &fn);
255+ }
256+ callJsCb_ (env_, fn, context_, queuedData);
257+ } else if (jsFuncRef_) {
258+ napi_value fn{nullptr };
259+ if (napi_get_reference_value (env_, jsFuncRef_, &fn) == napi_ok) {
260+ napi_value recv{nullptr };
261+ napi_get_undefined (env_, &recv);
262+ napi_value result{nullptr };
263+ napi_call_function (env_, recv, fn, 0 , nullptr , &result);
264+ }
265+ }
266+ }
267+
268+ // Auto-finalize when: no remaining threads, queue drained, and not closing
269+ if (shouldFinalize () && empty) {
270+ finalize ();
271+ }
272+ }
273+
274+ [[nodiscard]] bool ThreadSafeFunction::isClosingOrAborted () const noexcept {
275+ return aborted_.load (std::memory_order_relaxed) ||
276+ closing_.load (std::memory_order_relaxed);
277+ }
278+
279+ [[nodiscard]] bool ThreadSafeFunction::shouldFinalize () const noexcept {
280+ return threadCount_.load (std::memory_order_acquire) == 0 &&
281+ !closing_.load (std::memory_order_acquire);
282+ }
283+
252284} // namespace callstack::nodeapihost
0 commit comments