Skip to content

Commit 2ccc4d4

Browse files
committed
implements request_promise()
1 parent 7e3284c commit 2ccc4d4

File tree

9 files changed

+117
-92
lines changed

9 files changed

+117
-92
lines changed

DESCRIPTION

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ LinkingTo:
3636
later
3737
Suggests:
3838
knitr,
39-
markdown,
40-
promises
39+
markdown
4140
VignetteBuilder: knitr
4241
RoxygenNote: 7.3.1

NAMESPACE

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ S3method(print,recvAio)
3333
S3method(print,sendAio)
3434
S3method(print,tlsConfig)
3535
S3method(print,unresolvedValue)
36-
S3method(promises::as.promise,recvAio)
37-
S3method(promises::is.promising,recvAio)
3836
S3method(start,nanoDialer)
3937
S3method(start,nanoListener)
4038
export("%~>%")
@@ -77,6 +75,7 @@ export(recv_aio)
7775
export(recv_aio_signal)
7876
export(reply)
7977
export(request)
78+
export(request_promise)
8079
export(request_signal)
8180
export(send)
8281
export(send_aio)

R/aio.R

Lines changed: 2 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,7 @@ recv_aio <- function(con,
118118
"integer", "logical", "numeric", "raw", "string"),
119119
timeout = NULL,
120120
n = 65536L)
121-
data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment(),
122-
function() {
123-
cb <- .subset2(data, "callback")
124-
if (is.function(cb)) {
125-
cb(data)
126-
}
127-
}
128-
)
121+
data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment())
129122

130123
#' Receive Async and Signal a Condition
131124
#'
@@ -164,60 +157,7 @@ recv_aio_signal <- function(con,
164157
"integer", "logical", "numeric", "raw", "string"),
165158
timeout = NULL,
166159
n = 65536L)
167-
data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment(),
168-
function() {
169-
cb <- .subset2(data, "callback")
170-
if (is.function(cb)) {
171-
cb(data)
172-
}
173-
}
174-
)
175-
176-
#' @exportS3Method promises::is.promising
177-
is.promising.recvAio <- function(x) {
178-
TRUE
179-
}
180-
181-
#' @exportS3Method promises::as.promise
182-
as.promise.recvAio <- function(x) {
183-
184-
promise <- .subset2(x, "promise")
185-
186-
if (is.null(promise)) {
187-
promise <- promises::promise(function(resolve, reject) {
188-
assign("callback", function(...) {
189-
190-
# WARNING: x$data is heavily side-effecty!
191-
value <- .subset2(x, "data")
192-
193-
if (is_error_value(value)) {
194-
reject(simpleError(nng_error(value)))
195-
} else {
196-
resolve(value)
197-
}
198-
}, x)
199-
})
200-
201-
# WARNING: x$data is heavily side-effecty!
202-
value <- x$data
203-
204-
if (!inherits(value, "unresolvedValue")) {
205-
if (is_error_value(value)) {
206-
promise <- promises::promise_reject(simpleError(nng_error(value)))
207-
} else {
208-
promise <- promises::promise_resolve(value)
209-
}
210-
}
211-
212-
# Save for next time. This is not just an optimization but essential for
213-
# correct behavior if as.promise is called multiple times, because only one
214-
# `callback` can exist on the recvAio object at a time.
215-
assign("promise", promise, x)
216-
}
217-
218-
promise
219-
220-
}
160+
data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment())
221161

222162
# Core aio functions -----------------------------------------------------------
223163

R/context.R

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,3 +269,19 @@ request_signal <- function(context,
269269
"integer", "logical", "numeric", "raw", "string"),
270270
timeout = NULL)
271271
data <- .Call(rnng_request_signal, context, data, cv, send_mode, recv_mode, timeout, environment())
272+
273+
#' @rdname request
274+
#' @export
275+
#'
276+
request_promise <- function(context,
277+
data,
278+
cv,
279+
send_mode = c("serial", "raw", "next"),
280+
recv_mode = c("serial", "character", "complex", "double",
281+
"integer", "logical", "numeric", "raw", "string"),
282+
timeout = NULL)
283+
data <- .Call(rnng_request_promise, context, data, cv, send_mode, recv_mode, timeout, environment(),
284+
function() {
285+
cb <- .subset2(data, "callback")
286+
if (is.function(cb)) cb(data)
287+
})

R/nanonext-package.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@
9292
#' @author Charlie Gao \email{charlie.gao@@shikokuchuo.net}
9393
#' (\href{https://orcid.org/0000-0002-0750-061X}{ORCID})
9494
#'
95+
#' @importFrom later later
9596
#' @importFrom stats start
9697
#' @importFrom tools md5sum
9798
#' @importFrom utils .DollarNames
98-
#' @importFrom later later
9999
#' @useDynLib nanonext, .registration = TRUE
100100
#'
101101
"_PACKAGE"

man/request.Rd

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/aio.c

Lines changed: 79 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -196,14 +196,6 @@ static void isaio_complete(void *arg) {
196196

197197
}
198198

199-
static void raio_invoke_cb(void *arg) {
200-
nano_aio *raio = (nano_aio *) arg;
201-
SEXP callExpr;
202-
PROTECT(callExpr = Rf_lcons((SEXP) raio->cb, R_NilValue));
203-
(void) Rf_eval(callExpr, R_GlobalEnv);
204-
UNPROTECT(1);
205-
}
206-
207199
static void raio_complete(void *arg) {
208200

209201
nano_aio *raio = (nano_aio *) arg;
@@ -219,7 +211,6 @@ static void raio_complete(void *arg) {
219211
raio->result = res - !res;
220212
#endif
221213

222-
later2(raio_invoke_cb, arg, 0);
223214
}
224215

225216
static void raio_complete_signal(void *arg) {
@@ -239,7 +230,6 @@ static void raio_complete_signal(void *arg) {
239230
nng_cv_wake(cv);
240231
nng_mtx_unlock(mtx);
241232

242-
later2(raio_invoke_cb, arg, 0);
243233
}
244234

245235
static void request_complete_signal(void *arg) {
@@ -262,6 +252,55 @@ static void request_complete_signal(void *arg) {
262252

263253
}
264254

255+
static void raio_invoke_cb(void *arg) {
256+
SEXP callExpr;
257+
PROTECT(callExpr = Rf_lcons((SEXP) arg, R_NilValue));
258+
(void) Rf_eval(callExpr, R_GlobalEnv);
259+
UNPROTECT(1);
260+
R_ReleaseObject(arg);
261+
}
262+
263+
static void raio_complete_cb(void *arg) {
264+
265+
nano_aio *raio = (nano_aio *) arg;
266+
const int res = nng_aio_result(raio->aio);
267+
if (res == 0)
268+
raio->data = nng_aio_get_msg(raio->aio);
269+
270+
#ifdef NANONEXT_LEGACY_NNG
271+
nng_mtx_lock(shr_mtx);
272+
raio->result = res - !res;
273+
nng_mtx_unlock(shr_mtx);
274+
#else
275+
raio->result = res - !res;
276+
#endif
277+
278+
later2(raio_invoke_cb, raio->cb, 0);
279+
280+
}
281+
282+
static void request_complete_cb(void *arg) {
283+
284+
nano_aio *raio = (nano_aio *) arg;
285+
nano_aio *saio = (nano_aio *) raio->next;
286+
nano_cv *ncv = (nano_cv *) saio->next;
287+
nng_cv *cv = ncv->cv;
288+
nng_mtx *mtx = ncv->mtx;
289+
290+
const int res = nng_aio_result(raio->aio);
291+
if (res == 0)
292+
raio->data = nng_aio_get_msg(raio->aio);
293+
294+
nng_mtx_lock(mtx);
295+
raio->result = res - !res;
296+
ncv->condition++;
297+
nng_cv_wake(cv);
298+
nng_mtx_unlock(mtx);
299+
300+
later2(raio_invoke_cb, raio->cb, 0);
301+
302+
}
303+
265304
static void iraio_complete(void *arg) {
266305

267306
nano_aio *iaio = (nano_aio *) arg;
@@ -720,7 +759,7 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) {
720759
}
721760

722761
SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout,
723-
const SEXP bytes, const SEXP clo, const SEXP cb, nano_cv *ncv) {
762+
const SEXP bytes, const SEXP clo, nano_cv *ncv) {
724763

725764
const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout);
726765
const int signal = ncv != NULL;
@@ -736,7 +775,6 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout,
736775
raio->next = ncv;
737776
raio->type = RECVAIO;
738777
raio->mode = mod;
739-
raio->cb = cb;
740778

741779
if ((xc = nng_aio_alloc(&raio->aio, signal ? raio_complete_signal : raio_complete, raio)))
742780
goto exitlevel1;
@@ -803,19 +841,19 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout,
803841

804842
}
805843

806-
SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) {
844+
SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) {
807845

808-
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, NULL);
846+
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, NULL);
809847

810848
}
811849

812-
SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) {
850+
SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) {
813851

814852
if (R_ExternalPtrTag(cvar) != nano_CvSymbol)
815853
Rf_error("'cv' is not a valid Condition Variable");
816854
nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar);
817855

818-
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, ncv);
856+
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, ncv);
819857

820858
}
821859

@@ -1224,11 +1262,13 @@ SEXP rnng_ncurl_session_close(SEXP session) {
12241262
// request ---------------------------------------------------------------------
12251263

12261264
SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode,
1227-
const SEXP recvmode, const SEXP timeout, const SEXP clo, nano_cv *ncv) {
1265+
const SEXP recvmode, const SEXP timeout, const SEXP clo,
1266+
nano_cv *ncv, const SEXP cb) {
12281267

12291268
const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout);
12301269
const int mod = nano_matcharg(recvmode);
12311270
const int signal = ncv != NULL;
1271+
const int promises = cb != NULL;
12321272
nng_ctx *ctx = (nng_ctx *) R_ExternalPtrAddr(con);
12331273
SEXP aio, env, fun;
12341274
nano_buf buf;
@@ -1267,8 +1307,15 @@ SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode,
12671307
raio->type = RECVAIO;
12681308
raio->mode = mod;
12691309
raio->next = saio;
1270-
1271-
if ((xc = nng_aio_alloc(&raio->aio, signal ? request_complete_signal : raio_complete, raio)))
1310+
if (promises)
1311+
R_PreserveObject(cb);
1312+
raio->cb = cb;
1313+
1314+
if ((xc = nng_aio_alloc(&raio->aio,
1315+
promises ?
1316+
(signal ? request_complete_cb : raio_complete_cb) :
1317+
(signal ? request_complete_signal : raio_complete),
1318+
raio)))
12721319
goto exitlevel2;
12731320

12741321
nng_aio_set_timeout(raio->aio, dur);
@@ -1306,7 +1353,7 @@ SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeou
13061353
if (R_ExternalPtrTag(con) != nano_ContextSymbol)
13071354
Rf_error("'con' is not a valid Context");
13081355

1309-
return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, NULL);
1356+
return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, NULL, NULL);
13101357

13111358
}
13121359

@@ -1318,7 +1365,18 @@ SEXP rnng_request_signal(SEXP con, SEXP data, SEXP cvar, SEXP sendmode, SEXP rec
13181365
Rf_error("'cv' is not a valid Condition Variable");
13191366
nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar);
13201367

1321-
return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, ncv);
1368+
return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, ncv, NULL);
1369+
1370+
}
1371+
1372+
SEXP rnng_request_promise(SEXP con, SEXP data, SEXP cvar, SEXP sendmode, SEXP recvmode, SEXP timeout, SEXP clo, SEXP cb) {
1373+
1374+
if (R_ExternalPtrTag(con) != nano_ContextSymbol)
1375+
Rf_error("'con' is not a valid Context");
1376+
1377+
nano_cv *ncv = R_ExternalPtrTag(cvar) == nano_CvSymbol ? (nano_cv *) R_ExternalPtrAddr(cvar) : NULL;
1378+
1379+
return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, ncv, cb);
13221380

13231381
}
13241382

src/init.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,10 @@ static const R_CallMethodDef callMethods[] = {
155155
{"rnng_random", (DL_FUNC) &rnng_random, 2},
156156
{"rnng_reap", (DL_FUNC) &rnng_reap, 1},
157157
{"rnng_recv", (DL_FUNC) &rnng_recv, 4},
158-
{"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 6},
159-
{"rnng_recv_aio_signal", (DL_FUNC) &rnng_recv_aio_signal, 7},
158+
{"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 5},
159+
{"rnng_recv_aio_signal", (DL_FUNC) &rnng_recv_aio_signal, 6},
160160
{"rnng_request", (DL_FUNC) &rnng_request, 6},
161+
{"rnng_request_promise", (DL_FUNC) &rnng_request_promise, 8},
161162
{"rnng_request_signal", (DL_FUNC) &rnng_request_signal, 7},
162163
{"rnng_send", (DL_FUNC) &rnng_send, 4},
163164
{"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 5},

src/nanonext.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,9 +251,10 @@ SEXP rnng_protocol_open(SEXP, SEXP);
251251
SEXP rnng_random(SEXP, SEXP);
252252
SEXP rnng_reap(SEXP);
253253
SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP);
254-
SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
255-
SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
254+
SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP);
255+
SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
256256
SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
257+
SEXP rnng_request_promise(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
257258
SEXP rnng_request_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
258259
SEXP rnng_send(SEXP, SEXP, SEXP, SEXP);
259260
SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP);

0 commit comments

Comments
 (0)