Skip to content

Commit a165007

Browse files
authored
[Fix #1087] A2A implementation (#1469)
Signed-off-by: fjtirado <ftirados@ibm.com>
1 parent 00ef882 commit a165007

23 files changed

Lines changed: 1220 additions & 6 deletions

impl/a2a/pom.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<parent>
4+
<groupId>io.serverlessworkflow</groupId>
5+
<artifactId>serverlessworkflow-impl</artifactId>
6+
<version>8.0.0-SNAPSHOT</version>
7+
</parent>
8+
<artifactId>serverlessworkflow-impl-a2a</artifactId>
9+
<name>Serverless Workflow :: Impl :: A2A</name>
10+
<dependencies>
11+
<dependency>
12+
<groupId>io.serverlessworkflow</groupId>
13+
<artifactId>serverlessworkflow-impl-core</artifactId>
14+
</dependency>
15+
<dependency>
16+
<groupId>org.a2aproject.sdk</groupId>
17+
<artifactId>a2a-java-sdk-client</artifactId>
18+
</dependency>
19+
<dependency>
20+
<groupId>org.junit.jupiter</groupId>
21+
<artifactId>junit-jupiter-engine</artifactId>
22+
<scope>test</scope>
23+
</dependency>
24+
<dependency>
25+
<groupId>org.junit.jupiter</groupId>
26+
<artifactId>junit-jupiter-params</artifactId>
27+
<scope>test</scope>
28+
</dependency>
29+
</dependencies>
30+
</project>
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.impl.WorkflowModel;
19+
import io.serverlessworkflow.impl.WorkflowPosition;
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.function.Consumer;
22+
23+
class A2AExceptionHandler implements Consumer<Throwable> {
24+
25+
private final CompletableFuture<WorkflowModel> future;
26+
private final WorkflowPosition position;
27+
28+
A2AExceptionHandler(CompletableFuture<WorkflowModel> future, WorkflowPosition position) {
29+
this.future = future;
30+
this.position = position;
31+
}
32+
33+
@Override
34+
public void accept(Throwable ex) {
35+
future.completeExceptionally(A2AUtils.workflowException(position, ex));
36+
}
37+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.impl.TaskContext;
19+
import io.serverlessworkflow.impl.WorkflowContext;
20+
import io.serverlessworkflow.impl.WorkflowModel;
21+
import io.serverlessworkflow.impl.WorkflowValueResolver;
22+
import io.serverlessworkflow.impl.executors.CallableTask;
23+
import java.net.URI;
24+
import java.util.Map;
25+
import java.util.Optional;
26+
import java.util.concurrent.CompletableFuture;
27+
import org.a2aproject.sdk.client.Client;
28+
import org.a2aproject.sdk.client.config.ClientConfig;
29+
import org.a2aproject.sdk.client.http.A2ACardResolver;
30+
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport;
31+
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfig;
32+
import org.a2aproject.sdk.spec.A2AClientException;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
class A2AExecutor implements CallableTask {
37+
38+
private final WorkflowValueResolver<URI> uriSupplier;
39+
private final A2ARequestDispatcher dispatcher;
40+
private final Optional<WorkflowValueResolver<Map<String, Object>>> mapResolver;
41+
42+
private static final Logger logger = LoggerFactory.getLogger(A2AExecutor.class);
43+
44+
public A2AExecutor(
45+
WorkflowValueResolver<URI> uriSupplier,
46+
A2ARequestDispatcher dispatcher,
47+
Optional<WorkflowValueResolver<Map<String, Object>>> mapResolver) {
48+
this.uriSupplier = uriSupplier;
49+
this.dispatcher = dispatcher;
50+
this.mapResolver = mapResolver;
51+
}
52+
53+
@Override
54+
public CompletableFuture<WorkflowModel> apply(
55+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
56+
URI uri = uriSupplier.apply(workflowContext, taskContext, input);
57+
58+
return CompletableFuture.supplyAsync(
59+
() -> {
60+
try {
61+
return A2ACardResolver.builder()
62+
.baseUrl(uri.resolve("/").toString())
63+
.agentCardPath(uri.getPath())
64+
.build()
65+
.getAgentCard();
66+
} catch (A2AClientException ex) {
67+
throw A2AUtils.workflowException(taskContext.position(), ex);
68+
}
69+
},
70+
workflowContext.definition().application().executorService())
71+
.thenCompose(
72+
agentCard -> {
73+
logger.debug("Agent card is {}", agentCard);
74+
try {
75+
return dispatcher.apply(
76+
agentCard,
77+
Client.builder(agentCard)
78+
.clientConfig(new ClientConfig.Builder().build())
79+
.withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig())
80+
.build(),
81+
mapResolver
82+
.map(m -> m.apply(workflowContext, taskContext, input))
83+
.orElse(Map.of()),
84+
workflowContext,
85+
taskContext);
86+
} catch (A2AClientException ex) {
87+
throw A2AUtils.workflowException(taskContext.position(), ex);
88+
}
89+
});
90+
}
91+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.api.types.A2AArguments;
19+
import io.serverlessworkflow.api.types.CallA2A;
20+
import io.serverlessworkflow.api.types.Parameters;
21+
import io.serverlessworkflow.api.types.TaskBase;
22+
import io.serverlessworkflow.api.types.WithA2AParameters;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
24+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
25+
import io.serverlessworkflow.impl.WorkflowUtils;
26+
import io.serverlessworkflow.impl.WorkflowValueResolver;
27+
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
28+
import io.serverlessworkflow.impl.executors.CallableTaskFactory;
29+
import java.net.URI;
30+
import java.util.Map;
31+
import java.util.Optional;
32+
33+
public class A2AExecutorBuilder implements CallableTaskBuilder<CallA2A> {
34+
35+
@Override
36+
public boolean accept(Class<? extends TaskBase> clazz) {
37+
return CallA2A.class.equals(clazz);
38+
}
39+
40+
@Override
41+
public CallableTaskFactory init(
42+
CallA2A task, WorkflowDefinition definition, WorkflowMutablePosition position) {
43+
A2AArguments args = task.getWith();
44+
45+
WorkflowValueResolver<URI> uriSupplier;
46+
if (args.getServer() != null) {
47+
uriSupplier = definition.resourceLoader().uriSupplier(args.getServer());
48+
} else if (args.getAgentCard() != null) {
49+
uriSupplier = definition.resourceLoader().uriSupplier(args.getAgentCard().getEndpoint());
50+
} else {
51+
throw new IllegalArgumentException("Neither server nor agent card is set for task: " + task);
52+
}
53+
54+
A2ARequestDispatcher dispatcher =
55+
switch (args.getMethod()) {
56+
case MESSAGE_SEND ->
57+
new MessageDispatcher(
58+
(workflowContext, taskContext, completableFuture) ->
59+
new MessageSendConsumer(workflowContext.definition(), completableFuture));
60+
case MESSAGE_STREAM ->
61+
new MessageDispatcher(
62+
(workflowContext, taskContext, completableFuture) ->
63+
new MessageStreamConsumer(
64+
workflowContext.definition(), completableFuture, taskContext.position()));
65+
case TASKS_LIST -> new ListTaskDispatcher();
66+
case TASKS_GET -> new GetTaskDispatcher();
67+
case TASKS_CANCEL -> new CancelTaskDispatcher();
68+
// TODO handle missing cases
69+
case AGENT_GET_AUTHENTICATED_EXTENDED_CARD,
70+
TASKS_PUSH_NOTIFICATION_CONFIG_DELETE,
71+
TASKS_PUSH_NOTIFICATION_CONFIG_GET,
72+
TASKS_PUSH_NOTIFICATION_CONFIG_LIST,
73+
TASKS_PUSH_NOTIFICATION_CONFIG_SET,
74+
TASKS_RESUBSCRIBE ->
75+
throw new UnsupportedOperationException("Unimplemented case: " + args.getMethod());
76+
};
77+
78+
Parameters parameters = args.getParameters();
79+
Optional<WorkflowValueResolver<Map<String, Object>>> mapResolver;
80+
if (parameters == null) {
81+
mapResolver = Optional.empty();
82+
} else {
83+
WithA2AParameters a2aParameters = parameters.getWithA2AParameters();
84+
mapResolver =
85+
Optional.of(
86+
WorkflowUtils.buildMapResolver(
87+
definition.application(),
88+
parameters.getString(),
89+
a2aParameters != null ? a2aParameters.getAdditionalProperties() : null));
90+
}
91+
return () -> new A2AExecutor(uriSupplier, dispatcher, mapResolver);
92+
}
93+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.impl.TaskContext;
19+
import io.serverlessworkflow.impl.WorkflowContext;
20+
import io.serverlessworkflow.impl.WorkflowModel;
21+
import java.util.Map;
22+
import java.util.concurrent.CompletableFuture;
23+
import org.a2aproject.sdk.client.Client;
24+
import org.a2aproject.sdk.spec.AgentCard;
25+
26+
@FunctionalInterface
27+
interface A2ARequestDispatcher {
28+
CompletableFuture<WorkflowModel> apply(
29+
AgentCard agentCard,
30+
Client client,
31+
Map<String, Object> parameters,
32+
WorkflowContext workflowContext,
33+
TaskContext taskContext);
34+
}

0 commit comments

Comments
 (0)