Skip to content

Commit ce12e73

Browse files
jcheng5shikokuchuo
authored andcommitted
Implement promises for recv_aio[_signal]
1 parent 4bb4622 commit ce12e73

File tree

4 files changed

+94
-9
lines changed

4 files changed

+94
-9
lines changed

NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ S3method(print,recvAio)
3333
S3method(print,sendAio)
3434
S3method(print,tlsConfig)
3535
S3method(print,unresolvedValue)
36+
S3method(promises::as.promise,recvAio)
37+
S3method(promises::is.promising,recvAio)
3638
S3method(start,nanoDialer)
3739
S3method(start,nanoListener)
3840
export("%~>%")

R/aio.R

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,14 @@ recv_aio <- function(con,
118118
"integer", "logical", "numeric", "raw", "string"),
119119
timeout = NULL,
120120
n = 65536L)
121-
data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment())
121+
data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment(),
122+
function() {
123+
cb <- data$callback
124+
if (!is.null(cb)) {
125+
cb(data)
126+
}
127+
}
128+
)
122129

123130
#' Receive Async and Signal a Condition
124131
#'
@@ -157,7 +164,58 @@ recv_aio_signal <- function(con,
157164
"integer", "logical", "numeric", "raw", "string"),
158165
timeout = NULL,
159166
n = 65536L)
160-
data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment())
167+
data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment(),
168+
function() {
169+
cb <- data$callback
170+
if (!is.null(cb)) {
171+
cb(data)
172+
}
173+
}
174+
)
175+
176+
#' @exportS3Method promises::is.promising
177+
is.promising.recvAio <- function(x) {
178+
TRUE
179+
}
180+
181+
#' @exportS3Method promises::as.promise
182+
as.promise.recvAio <- function(x) {
183+
prom <- x$promise
184+
185+
if (is.null(prom)) {
186+
prom <- promises::promise(function(resolve, reject) {
187+
assign("callback", function(...) {
188+
189+
# WARNING: x$data is heavily side-effecty!
190+
value <- x$data
191+
192+
if (is_error_value(value)) {
193+
reject(simpleError(nng_error(value)))
194+
} else {
195+
resolve(value)
196+
}
197+
}, x)
198+
})
199+
200+
# WARNING: x$data is heavily side-effecty!
201+
value <- x$data
202+
203+
if (!inherits(value, "unresolvedValue")) {
204+
if (is_error_value(value)) {
205+
prom <- promises::promise_reject(simpleError(nng_error(value)))
206+
} else {
207+
prom <- promises::promise_resolve(value)
208+
}
209+
}
210+
211+
# Save for next time. This is not just an optimization but essential for
212+
# correct behavior if as.promise is called multiple times, because only one
213+
# `callback` can exist on the recvAio object at a time.
214+
assign("promise", prom, x)
215+
}
216+
217+
prom
218+
}
161219

162220
# Core aio functions -----------------------------------------------------------
163221

src/aio.c

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#define NANONEXT_SUPPLEMENTALS
2121
#define NANONEXT_SIGNALS
2222
#include "nanonext.h"
23+
#include "later_shim.h"
2324

2425
// internals -------------------------------------------------------------------
2526

@@ -195,6 +196,21 @@ static void isaio_complete(void *arg) {
195196

196197
}
197198

199+
200+
static void raio_invoke_cb(void* arg) {
201+
nano_aio *raio = (nano_aio *) arg;
202+
if (raio->cb == NULL || Rf_isNull(raio->cb)) return;
203+
SEXP func = (SEXP)raio->cb;
204+
SEXP callExpr, result;
205+
if (!Rf_isNull(func)) {
206+
PROTECT(callExpr = Rf_lcons(func, R_NilValue)); // Prepare call
207+
PROTECT(result = Rf_eval(callExpr, R_GlobalEnv)); // Execute call
208+
209+
UNPROTECT(2);
210+
R_ReleaseObject(func);
211+
}
212+
}
213+
198214
static void raio_complete(void *arg) {
199215

200216
nano_aio *raio = (nano_aio *) arg;
@@ -210,6 +226,7 @@ static void raio_complete(void *arg) {
210226
raio->result = res - !res;
211227
#endif
212228

229+
later2(raio_invoke_cb, arg, 0);
213230
}
214231

215232
static void raio_complete_signal(void *arg) {
@@ -229,6 +246,7 @@ static void raio_complete_signal(void *arg) {
229246
nng_cv_wake(cv);
230247
nng_mtx_unlock(mtx);
231248

249+
later2(raio_invoke_cb, arg, 0);
232250
}
233251

234252
static void request_complete_signal(void *arg) {
@@ -709,7 +727,7 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) {
709727
}
710728

711729
SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout,
712-
const SEXP bytes, const SEXP clo, nano_cv *ncv) {
730+
const SEXP bytes, const SEXP clo, const SEXP cb, nano_cv *ncv) {
713731

714732
const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout);
715733
const int signal = ncv != NULL;
@@ -725,6 +743,12 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout,
725743
raio->next = ncv;
726744
raio->type = RECVAIO;
727745
raio->mode = mod;
746+
if (Rf_isNull(cb)) {
747+
raio->cb = NULL;
748+
} else {
749+
R_PreserveObject(cb);
750+
raio->cb = (void*)cb;
751+
}
728752

729753
if ((xc = nng_aio_alloc(&raio->aio, signal ? raio_complete_signal : raio_complete, raio)))
730754
goto exitlevel1;
@@ -791,19 +815,19 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout,
791815

792816
}
793817

794-
SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) {
818+
SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) {
795819

796-
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, NULL);
820+
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, NULL);
797821

798822
}
799823

800-
SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) {
824+
SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) {
801825

802826
if (R_ExternalPtrTag(cvar) != nano_CvSymbol)
803827
Rf_error("'cv' is not a valid Condition Variable");
804828
nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar);
805829

806-
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, ncv);
830+
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, ncv);
807831

808832
}
809833

src/nanonext.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ typedef struct nano_aio_s {
8888
int result;
8989
void *data;
9090
void *next;
91+
void *cb;
9192
} nano_aio;
9293

9394
typedef struct nano_cv_s {
@@ -250,8 +251,8 @@ SEXP rnng_protocol_open(SEXP, SEXP);
250251
SEXP rnng_random(SEXP, SEXP);
251252
SEXP rnng_reap(SEXP);
252253
SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP);
253-
SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP);
254-
SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
254+
SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
255+
SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
255256
SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
256257
SEXP rnng_request_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
257258
SEXP rnng_send(SEXP, SEXP, SEXP, SEXP);

0 commit comments

Comments
 (0)