@@ -191,6 +191,9 @@ SEXP rnng_messenger_thread_create(SEXP args) {
191191
192192// threaded functions ----------------------------------------------------------
193193
194+ // # nocov start
195+ // tested interactively
196+
194197static void thread_aio_finalizer (SEXP xptr ) {
195198
196199 if (NANO_PTR (xptr ) == NULL ) return ;
@@ -207,6 +210,80 @@ static void thread_aio_finalizer(SEXP xptr) {
207210
208211}
209212
213+
214+ static void rnng_wait_thread_single (void * args ) {
215+
216+ nano_thread_aio * taio = (nano_thread_aio * ) args ;
217+ nano_cv * ncv = taio -> cv ;
218+ nng_mtx * mtx = ncv -> mtx ;
219+ nng_cv * cv = ncv -> cv ;
220+
221+ nng_aio_wait (taio -> aio );
222+
223+ nng_mtx_lock (mtx );
224+ ncv -> condition = 1 ;
225+ nng_cv_wake (cv );
226+ nng_mtx_unlock (mtx );
227+
228+ }
229+
230+ void single_wait_thread_create (SEXP x ) {
231+
232+ nano_aio * aiop = (nano_aio * ) NANO_PTR (x );
233+ nano_thread_aio * taio = R_Calloc (1 , nano_thread_aio );
234+ nano_cv * ncv = R_Calloc (1 , nano_cv );
235+ taio -> aio = aiop -> aio ;
236+ taio -> cv = ncv ;
237+ nng_mtx * mtx = NULL ;
238+ nng_cv * cv = NULL ;
239+ int xc , signalled ;
240+
241+ if ((xc = nng_mtx_alloc (& mtx )))
242+ goto fail ;
243+
244+ if ((xc = nng_cv_alloc (& cv , mtx )))
245+ goto fail ;
246+
247+ ncv -> mtx = mtx ;
248+ ncv -> cv = cv ;
249+
250+ if ((xc = nng_thread_create (& taio -> thr , rnng_wait_thread_single , taio )))
251+ goto fail ;
252+
253+ SEXP xptr ;
254+ PROTECT (xptr = R_MakeExternalPtr (taio , R_NilValue , R_NilValue ));
255+ R_RegisterCFinalizerEx (xptr , thread_aio_finalizer , TRUE);
256+ R_MakeWeakRef (x , xptr , R_NilValue , TRUE);
257+ UNPROTECT (1 );
258+
259+ nng_time time = nng_clock ();
260+
261+ while (1 ) {
262+ time = time + 400 ;
263+ signalled = 1 ;
264+ nng_mtx_lock (mtx );
265+ while (ncv -> condition == 0 ) {
266+ if (nng_cv_until (cv , time ) == NNG_ETIMEDOUT ) {
267+ signalled = 0 ;
268+ break ;
269+ }
270+ }
271+ nng_mtx_unlock (mtx );
272+ if (signalled ) break ;
273+ R_CheckUserInterrupt ();
274+ }
275+
276+ return ;
277+
278+ fail :
279+ if (cv ) nng_cv_free (cv );
280+ if (mtx ) nng_mtx_free (mtx );
281+ ERROR_OUT (xc );
282+
283+ }
284+
285+ // # nocov end
286+
210287static void thread_duo_finalizer (SEXP xptr ) {
211288
212289 if (NANO_PTR (xptr ) == NULL ) return ;
@@ -284,77 +361,6 @@ static void rnng_wait_thread(void *args) {
284361
285362}
286363
287- static void rnng_wait_thread_single (void * args ) {
288-
289- nano_thread_aio * taio = (nano_thread_aio * ) args ;
290- nano_cv * ncv = taio -> cv ;
291- nng_mtx * mtx = ncv -> mtx ;
292- nng_cv * cv = ncv -> cv ;
293-
294- nng_aio_wait (taio -> aio );
295-
296- nng_mtx_lock (mtx );
297- ncv -> condition = 1 ;
298- nng_cv_wake (cv );
299- nng_mtx_unlock (mtx );
300-
301- }
302-
303- void single_wait_thread_create (SEXP x ) {
304-
305- nano_aio * aiop = (nano_aio * ) NANO_PTR (x );
306- nano_thread_aio * taio = R_Calloc (1 , nano_thread_aio );
307- nano_cv * ncv = R_Calloc (1 , nano_cv );
308- taio -> aio = aiop -> aio ;
309- taio -> cv = ncv ;
310- nng_mtx * mtx = NULL ;
311- nng_cv * cv = NULL ;
312- int xc , signalled ;
313-
314- if ((xc = nng_mtx_alloc (& mtx )))
315- goto fail ;
316-
317- if ((xc = nng_cv_alloc (& cv , mtx )))
318- goto fail ;
319-
320- ncv -> mtx = mtx ;
321- ncv -> cv = cv ;
322-
323- if ((xc = nng_thread_create (& taio -> thr , rnng_wait_thread_single , taio )))
324- goto fail ;
325-
326- SEXP xptr ;
327- PROTECT (xptr = R_MakeExternalPtr (taio , R_NilValue , R_NilValue ));
328- R_RegisterCFinalizerEx (xptr , thread_aio_finalizer , TRUE);
329- R_MakeWeakRef (x , xptr , R_NilValue , TRUE);
330- UNPROTECT (1 );
331-
332- nng_time time = nng_clock ();
333-
334- while (1 ) {
335- time = time + 400 ;
336- signalled = 1 ;
337- nng_mtx_lock (mtx );
338- while (ncv -> condition == 0 ) {
339- if (nng_cv_until (cv , time ) == NNG_ETIMEDOUT ) {
340- signalled = 0 ;
341- break ;
342- }
343- }
344- nng_mtx_unlock (mtx );
345- if (signalled ) break ;
346- R_CheckUserInterrupt ();
347- }
348-
349- return ;
350-
351- fail :
352- if (cv ) nng_cv_free (cv );
353- if (mtx ) nng_mtx_free (mtx );
354- ERROR_OUT (xc );
355-
356- }
357-
358364SEXP rnng_wait_thread_create (SEXP x ) {
359365
360366 const SEXPTYPE typ = TYPEOF (x );
0 commit comments