Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ dependencies {
}

testImplementation projects.groovyAnt
testImplementation projects.groovyHttpBuilder
testImplementation projects.groovyNio
testImplementation projects.groovyXml
testImplementation projects.groovyJson
Expand Down
18 changes: 18 additions & 0 deletions gradle/verification-metadata.xml
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,18 @@
<sha512 value="f220e44fe6b61f8dbb61226f832dfb16a09584384540fd48a4dff5c4de9fee060623f85cbead720dfe776aa25105949e70758a9bb1d9db43f63068d8d22164c9" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="io.projectreactor" name="reactor-core" version="3.7.3">
<artifact name="reactor-core-3.7.3.jar">
<pgp value="48B086A7D843CFA258E83286928FBF39003C0425"/>
<sha512 value="d49b2d4ac07066baa52e5d95e333e51be235f639fca05842d833150f9185882a5cd0983ffa9e12207a1842a37c874e1b8d6836716c9662cc0b8fc4e30b56e11a" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="io.reactivex.rxjava3" name="rxjava" version="3.1.10">
<artifact name="rxjava-3.1.10.jar">
<pgp value="E9CC3CD1AE59E851E4DB3FA350FFD7487D34B5B9"/>
<sha512 value="c495dfd8e4dc51d34d221c80245fd61b56e9a6a6f104d15edb5a39a14a6e947a85f5f9d156aa658ebd1451018cdba10ab0fdfb9214a18423ccb29458f4fed512" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="jakarta.activation" name="jakarta.activation-api" version="1.2.1">
<artifact name="jakarta.activation-api-1.2.1.jar">
<pgp value="6DD3B8C64EF75253BEB2C53AD908A43FB7EC07AC"/>
Expand Down Expand Up @@ -2620,6 +2632,12 @@
<sha512 value="adcc480f68828ffd68d03846be852988b595c2e1bb69224d273578dd6c2ad2773edfe96625a7c00bc40ae0f2d1cac8412eaa54b88cc8e681b0b4c0ee3b082333" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="org.reactivestreams" name="reactive-streams" version="1.0.4">
<artifact name="reactive-streams-1.0.4.jar">
<pgp value="50A628FFAF58480736B1079FD1031D14464180E0"/>
<sha512 value="cdab6bd156f39106cd6bbfd47df1f4b0a89dc4aa28c68c31ef12a463193c688897e415f01b8d7f0d487b0e6b5bd2f19044bf8605704b024f26d6aa1f4f9a2471" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="org.reflections" name="reflections" version="0.10.2">
<artifact name="reflections-0.10.2.jar">
<pgp value="3F2A008A91D11A7FAC4A0786F13D3E721D56BD54"/>
Expand Down
2 changes: 2 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def subprojects = [
'groovy-test-junit5',
'groovy-test-junit6',
'groovy-testng',
'groovy-reactor',
'groovy-rxjava',
'groovy-toml',
'groovy-typecheckers',
'groovy-xml',
Expand Down
3 changes: 3 additions & 0 deletions src/antlr/GroovyLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ DEF : 'def';
IN : 'in';
TRAIT : 'trait';
THREADSAFE : 'threadsafe'; // reserved keyword
ASYNC : 'async';
AWAIT : 'await';
DEFER : 'defer';

// §3.9 Keywords
BuiltInPrimitiveType
Expand Down
18 changes: 17 additions & 1 deletion src/antlr/GroovyParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ switchStatement
;

loopStatement
: annotationsOpt FOR LPAREN forControl RPAREN nls statement #forStmtAlt
: annotationsOpt FOR AWAIT? LPAREN forControl RPAREN nls statement #forStmtAlt
| annotationsOpt WHILE expressionInPar nls statement #whileStmtAlt
| annotationsOpt DO nls statement nls WHILE expressionInPar #doWhileStmtAlt
;
Expand Down Expand Up @@ -643,6 +643,8 @@ statement
| continueStatement #continueStmtAlt
| { inSwitchExpressionLevel > 0 }?
yieldStatement #yieldStmtAlt
| YIELD RETURN nls expression #yieldReturnStmtAlt
| DEFER nls statementExpression #deferStmtAlt
| identifier COLON nls statement #labeledStmtAlt
| assertStatement #assertStmtAlt
| localVariableDeclaration #localVariableDeclarationStmtAlt
Expand Down Expand Up @@ -779,6 +781,14 @@ expression
// must come before postfixExpression to resolve the ambiguities between casting and call on parentheses expression, e.g. (int)(1 / 2)
: castParExpression castOperandExpression #castExprAlt

// async closure/lambda must come before postfixExpression to resolve ambiguity with method call, e.g. async { ... }
| ASYNC nls closureOrLambdaExpression #asyncClosureExprAlt

// await expression: single-arg or multi-arg (parenthesized or unparenthesized)
| AWAIT nls ( LPAREN expression (COMMA nls expression)* RPAREN
| expression (COMMA nls expression)*
) #awaitExprAlt

// qualified names, array expressions, method invocation, post inc/dec
| postfixExpression #postfixExprAlt

Expand Down Expand Up @@ -1229,6 +1239,9 @@ identifier
: Identifier
| CapitalizedIdentifier
| AS
| ASYNC
| AWAIT
| DEFER
| IN
| PERMITS
| RECORD
Expand All @@ -1247,6 +1260,8 @@ keywords
: ABSTRACT
| AS
| ASSERT
| ASYNC
| AWAIT
| BREAK
| CASE
| CATCH
Expand All @@ -1255,6 +1270,7 @@ keywords
| CONTINUE
| DEF
| DEFAULT
| DEFER
| DO
| ELSE
| ENUM
Expand Down
114 changes: 114 additions & 0 deletions src/main/java/groovy/concurrent/AsyncChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package groovy.concurrent;

import org.apache.groovy.runtime.async.DefaultAsyncChannel;

/**
* An asynchronous channel for inter-task communication with optional buffering.
* <p>
* A channel coordinates producers and consumers without exposing explicit
* locks or shared mutable state, following the CSP (Communicating Sequential
* Processes) paradigm popularized by Go's channels.
* <p>
* Channels support both unbuffered (rendezvous) and buffered modes:
* <ul>
* <li><b>Unbuffered</b> — {@code create()} or {@code create(0)}. Each
* {@code send} suspends until a matching {@code receive} arrives.</li>
* <li><b>Buffered</b> — {@code create(n)}. Values are enqueued until the
* buffer fills, then senders suspend.</li>
* </ul>
* <p>
* Channels implement {@link Iterable}, so they work with {@code for await}
* and regular {@code for} loops — iteration yields received values until the
* channel is closed and drained:
* <pre>{@code
* def ch = AsyncChannel.create(2)
* async { ch.send('a'); ch.send('b'); ch.close() }
* for await (item in ch) {
* println item // prints 'a', then 'b'
* }
* }</pre>
*
* @param <T> the payload type
* @see Awaitable
* @since 6.0.0
*/
public interface AsyncChannel<T> extends Iterable<T> {

/**
* Creates an unbuffered (rendezvous) channel.
*/
static <T> AsyncChannel<T> create() {
return new DefaultAsyncChannel<>();
}

/**
* Creates a channel with the specified buffer capacity.
*
* @param capacity the maximum buffer size; 0 for unbuffered
*/
static <T> AsyncChannel<T> create(int capacity) {
return new DefaultAsyncChannel<>(capacity);
}

/** Returns this channel's buffer capacity. */
int getCapacity();

/** Returns the number of values currently buffered. */
int getBufferedSize();

/** Returns {@code true} if this channel has been closed. */
boolean isClosed();

/**
* Sends a value through this channel.
* <p>
* The returned {@link Awaitable} completes when the value has been
* delivered to a receiver or buffered. Sending to a closed channel
* fails immediately with {@link ChannelClosedException}.
*
* @param value the value to send; must not be {@code null}
* @return an Awaitable that completes when the send succeeds
* @throws NullPointerException if value is null
*/
Awaitable<Void> send(T value);

/**
* Receives the next value from this channel.
* <p>
* The returned {@link Awaitable} completes when a value is available.
* Receiving from a closed, empty channel fails with
* {@link ChannelClosedException}.
*
* @return an Awaitable that yields the next value
*/
Awaitable<T> receive();

/**
* Closes this channel. Idempotent.
* <p>
* Buffered values remain receivable. Pending senders fail with
* {@link ChannelClosedException}. After all buffered values are
* drained, subsequent receives also fail.
*
* @return {@code true} if this call actually closed the channel
*/
boolean close();
}
163 changes: 163 additions & 0 deletions src/main/java/groovy/concurrent/AsyncScope.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package groovy.concurrent;

import groovy.lang.Closure;
import groovy.transform.stc.ClosureParams;
import groovy.transform.stc.SimpleType;
import org.apache.groovy.runtime.async.AsyncSupport;
import org.apache.groovy.runtime.async.DefaultAsyncScope;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
* A structured concurrency scope that ensures all child tasks complete
* (or are cancelled) before the scope exits.
* <p>
* {@code AsyncScope} provides a bounded lifetime for async tasks,
* following the <em>structured concurrency</em> model. Unlike
* fire-and-forget {@code async { ... }}, tasks launched within a scope
* are guaranteed to complete before the scope closes. This prevents:
* <ul>
* <li>Orphaned tasks that outlive their logical parent</li>
* <li>Resource leaks from uncollected async work</li>
* <li>Silent failures from unobserved exceptions</li>
* </ul>
* <p>
* By default, the scope uses a <b>fail-fast</b> policy: when any child
* task completes exceptionally, all sibling tasks are cancelled
* immediately. The first failure becomes the primary exception;
* subsequent failures are added as suppressed exceptions.
*
* <pre>{@code
* def results = AsyncScope.withScope { scope ->
* def userTask = scope.async { fetchUser(id) }
* def orderTask = scope.async { fetchOrders(id) }
* return [user: await(userTask), orders: await(orderTask)]
* }
* // Both tasks guaranteed complete here
* }</pre>
*
* @see Awaitable
* @since 6.0.0
*/
public interface AsyncScope extends AutoCloseable {

/**
* Launches a child task within this scope.
* The task's lifetime is bound to the scope: when the scope is closed,
* all incomplete child tasks are cancelled.
*
* @param body the async body to execute
* @param <T> the result type
* @return an {@link Awaitable} representing the child task
* @throws IllegalStateException if the scope has already been closed
*/
<T> Awaitable<T> async(Closure<T> body);

/**
* Launches a child task using a {@link Supplier} for Java interop.
*/
<T> Awaitable<T> async(Supplier<T> supplier);

/**
* Returns the number of tracked child tasks (including completed ones
* that have not yet been pruned).
*/
int getChildCount();

/**
* Cancels all child tasks.
*/
void cancelAll();

/**
* Closes the scope, waiting for all child tasks to complete.
* If any child failed and fail-fast is enabled, remaining children
* are cancelled and the first failure is rethrown.
*/
@Override
void close();

// ---- Static methods -------------------------------------------------

/**
* Returns the scope currently bound to this thread, or {@code null}.
*/
static AsyncScope current() {
return DefaultAsyncScope.current();
}

/**
* Executes the supplier with the given scope installed as current,
* restoring the previous binding afterwards.
*/
static <T> T withCurrent(AsyncScope scope, Supplier<T> supplier) {
return DefaultAsyncScope.withCurrent(scope, supplier);
}

/**
* Creates a scope, executes the closure within it, and ensures the
* scope is closed on exit. The closure receives the scope as its
* argument for launching child tasks.
*
* <pre>{@code
* def result = AsyncScope.withScope { scope ->
* def a = scope.async { computeA() }
* def b = scope.async { computeB() }
* return [await(a), await(b)]
* }
* }</pre>
*/
@SuppressWarnings("unchecked")
static <T> T withScope(
@ClosureParams(value = SimpleType.class, options = "groovy.concurrent.AsyncScope") Closure<T> body) {
return withScope(AsyncSupport.getExecutor(), body);
}

/**
* Creates a scope with the given executor, executes the closure,
* and ensures the scope is closed on exit.
*/
@SuppressWarnings("unchecked")
static <T> T withScope(Executor executor,
@ClosureParams(value = SimpleType.class, options = "groovy.concurrent.AsyncScope") Closure<T> body) {
Objects.requireNonNull(body, "body must not be null");
try (AsyncScope scope = create(executor)) {
return withCurrent(scope, () -> body.call(scope));
}
}

/** Creates a new scope with the default executor and fail-fast enabled. */
static AsyncScope create() {
return new DefaultAsyncScope();
}

/** Creates a new scope with the given executor and fail-fast enabled. */
static AsyncScope create(Executor executor) {
return new DefaultAsyncScope(executor);
}

/** Creates a new scope with the given executor and failure policy. */
static AsyncScope create(Executor executor, boolean failFast) {
return new DefaultAsyncScope(executor, failFast);
}
}
Loading
Loading