diff --git a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java index 971a0cc77..0eb021664 100644 --- a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java +++ b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java @@ -79,6 +79,93 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Quarkus routing configuration for JSON-RPC A2A protocol requests. + * + *

This class defines Vert.x Web routes for handling JSON-RPC 2.0 requests over HTTP, + * processing them through the {@link JSONRPCHandler}, and returning responses in either + * standard JSON or Server-Sent Events (SSE) format for streaming operations. + * + *

Request Flow

+ *
+ * HTTP POST / → invokeJSONRPCHandler()
+ *     ↓
+ * Parse JSON-RPC request body
+ *     ↓
+ * Route to handler method (blocking or streaming)
+ *     ↓
+ * JSONRPCHandler → RequestHandler → AgentExecutor
+ *     ↓
+ * Response (JSON or SSE stream)
+ * 
+ * + *

Supported Operations

+ *

Non-Streaming (JSON responses): + *

+ * + *

Streaming (SSE responses): + *

+ * + *

JSON-RPC Request Format

+ *
{@code
+ * POST /
+ * Content-Type: application/json
+ *
+ * {
+ *   "jsonrpc": "2.0",
+ *   "id": "req-123",
+ *   "method": "sendMessage",
+ *   "params": {
+ *     "message": {
+ *       "parts": [{"text": "Hello"}]
+ *     }
+ *   }
+ * }
+ * }
+ * + *

Error Handling

+ *

Errors are mapped to JSON-RPC 2.0 error responses: + *

+ * + *

CDI Integration

+ *

This class is a CDI {@code @Singleton} that automatically wires: + *

+ * + *

Multi-Tenancy Support

+ *

Tenant identification is extracted from the request path: + *

+ * + * @see JSONRPCHandler + * @see CallContextFactory + * @see MultiSseSupport + */ @Singleton public class A2AServerRoutes { @@ -96,6 +183,78 @@ public class A2AServerRoutes { @Inject Instance callContextFactory; + /** + * Main entry point for all JSON-RPC requests. + * + *

This route handler processes JSON-RPC 2.0 requests, dispatches them to the appropriate + * handler method based on the request type (streaming vs non-streaming), and returns either + * a JSON response or an SSE stream. + * + *

Request Format: + *

{@code
+     * POST /[tenant]
+     * Content-Type: application/json
+     *
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "req-123",
+     *   "method": "sendMessage",
+     *   "params": { ... }
+     * }
+     * }
+ * + *

Non-Streaming Response: + *

{@code
+     * HTTP/1.1 200 OK
+     * Content-Type: application/json
+     *
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "req-123",
+     *   "result": { ... }
+     * }
+     * }
+ * + *

Streaming Response (SSE): + *

{@code
+     * HTTP/1.1 200 OK
+     * Content-Type: text/event-stream
+     *
+     * id: 0
+     * data: {"jsonrpc":"2.0","id":"req-123","result":{...}}
+     *
+     * id: 1
+     * data: {"jsonrpc":"2.0","id":"req-123","result":{...}}
+     * }
+ * + *

Error Response: + *

{@code
+     * HTTP/1.1 200 OK
+     * Content-Type: application/json
+     *
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "req-123",
+     *   "error": {
+     *     "code": -32602,
+     *     "message": "Invalid params"
+     *   }
+     * }
+     * }
+ * + *

Processing Flow: + *

    + *
  1. Parse JSON-RPC request body using {@link JSONRPCUtils#parseRequestBody}
  2. + *
  3. Create {@link ServerCallContext} from routing context
  4. + *
  5. Route to streaming or non-streaming handler
  6. + *
  7. Handle errors with appropriate JSON-RPC error codes
  8. + *
  9. Return JSON response or start SSE stream
  10. + *
+ * + * @param body the raw JSON-RPC request body + * @param rc the Vert.x routing context containing HTTP request/response + * @throws A2AError if request processing fails + */ @Route(path = "/", methods = {Route.HttpMethod.POST}, consumes = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING) @Authenticated public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) { @@ -157,17 +316,64 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) { } /** - * /** - * Handles incoming GET requests to the agent card endpoint. - * Returns the agent card in JSON format. + * Handles GET requests to the agent card endpoint. + * + *

Returns the agent's capabilities and metadata in JSON format according to the + * A2A protocol specification. This endpoint is publicly accessible (no authentication). + * + *

Request: + *

{@code
+     * GET /.well-known/agent-card.json
+     * }
+ * + *

Response: + *

{@code
+     * HTTP/1.1 200 OK
+     * Content-Type: application/json
      *
-     * @return the agent card
+     * {
+     *   "name": "My Agent",
+     *   "description": "Agent description",
+     *   "capabilities": {
+     *     "streaming": true,
+     *     "pushNotifications": false
+     *   },
+     *   ...
+     * }
+     * }
+ * + * @return the agent card as a JSON string + * @throws JsonProcessingException if serialization fails + * @see JSONRPCHandler#getAgentCard() */ @Route(path = "/.well-known/agent-card.json", methods = Route.HttpMethod.GET, produces = APPLICATION_JSON) public String getAgentCard() throws JsonProcessingException { return JsonUtil.toJson(jsonRpcHandler.getAgentCard()); } + /** + * Routes non-streaming JSON-RPC requests to the appropriate handler method. + * + *

This method uses pattern matching to dispatch requests based on their type, + * invoking the corresponding handler method in {@link JSONRPCHandler}. + * + *

Supported Request Types: + *

+ * + * @param request the non-streaming JSON-RPC request + * @param context the server call context + * @return the JSON-RPC response + */ private A2AResponse processNonStreamingRequest(NonStreamingJSONRPCRequest request, ServerCallContext context) { if (request instanceof GetTaskRequest req) { return jsonRpcHandler.onGetTask(req, context); @@ -199,6 +405,22 @@ private A2AResponse processNonStreamingRequest(NonStreamingJSONRPCRequest return generateErrorResponse(request, new UnsupportedOperationError()); } + /** + * Routes streaming JSON-RPC requests to the appropriate handler method. + * + *

This method dispatches streaming requests to handlers that return + * {@link Flow.Publisher} of responses, which are then converted to SSE streams. + * + *

Supported Request Types: + *

+ * + * @param request the streaming JSON-RPC request + * @param context the server call context + * @return a Multi stream of JSON-RPC responses + */ private Multi> processStreamingRequest( A2ARequest request, ServerCallContext context) { Flow.Publisher> publisher; @@ -212,14 +434,55 @@ private Multi> processStreamingRequest( return Multi.createFrom().publisher(publisher); } + /** + * Generates a JSON-RPC error response for the given request and error. + * + * @param request the original request + * @param error the A2A error to include in the response + * @return a JSON-RPC error response + */ private A2AResponse generateErrorResponse(A2ARequest request, A2AError error) { return new A2AErrorResponse(request.getId(), error); } + /** + * Sets a callback to be invoked when SSE streaming subscription starts. + * + *

This is a testing hook used to synchronize test execution with streaming setup. + * In production, this remains null. + * + * @param runnable the callback to invoke on subscription + */ static void setStreamingMultiSseSupportSubscribedRunnable(Runnable runnable) { streamingMultiSseSupportSubscribedRunnable = runnable; } + /** + * Creates a {@link ServerCallContext} from the Vert.x routing context. + * + *

This method extracts authentication, headers, tenant, and protocol information + * from the HTTP request and packages them into a context object for use by the + * request handler and agent executor. + * + *

Default Context Creation: + *

If no {@link CallContextFactory} CDI bean is provided, creates a context with: + *

+ * + *

Custom Context Creation: + *

If a {@link CallContextFactory} bean is present, delegates to + * {@link CallContextFactory#build(RoutingContext)} for custom context creation. + * + * @param rc the Vert.x routing context + * @return the server call context + * @see CallContextFactory + */ private ServerCallContext createCallContext(RoutingContext rc) { if (callContextFactory.isUnsatisfied()) { User user; @@ -264,6 +527,21 @@ public String getUsername() { } } + /** + * Extracts the tenant identifier from the request path. + * + *

The tenant is determined by the normalized path, with leading and trailing + * slashes stripped: + *

+ * + * @param rc the routing context + * @return the tenant identifier, or empty string if no tenant in path + */ private String extractTenant(RoutingContext rc) { String tenantPath = rc.normalizedPath(); if (tenantPath == null || tenantPath.isBlank()) { @@ -278,6 +556,39 @@ private String extractTenant(RoutingContext rc) { return tenantPath; } + /** + * Serializes a JSON-RPC response to a JSON string. + * + *

This method handles both success and error responses, converting domain objects + * to protobuf messages before JSON serialization for consistency with the gRPC transport. + * + *

Success Response Format: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "req-123",
+     *   "result": { ... }
+     * }
+     * }
+ * + *

Error Response Format: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "req-123",
+     *   "error": {
+     *     "code": -32602,
+     *     "message": "Invalid params",
+     *     "data": { ... }
+     *   }
+     * }
+     * }
+ * + * @param response the response to serialize + * @return the JSON string + * @see JSONRPCUtils#toJsonRPCResultResponse + * @see JSONRPCUtils#toJsonRPCErrorResponse + */ private static String serializeResponse(A2AResponse response) { // For error responses, use Jackson serialization (errors are standardized) if (response instanceof A2AErrorResponse error) { @@ -291,6 +602,32 @@ private static String serializeResponse(A2AResponse response) { return JSONRPCUtils.toJsonRPCResultResponse(response.getId(), protoMessage); } + /** + * Converts a domain response object to its protobuf representation. + * + *

This method maps response types to their corresponding protobuf messages + * using {@link io.a2a.grpc.utils.ProtoUtils}, ensuring consistent serialization + * across all transports (JSON-RPC, gRPC, REST). + * + *

Supported Response Types: + *

+ * + * @param response the domain response object + * @return the protobuf message representation + * @throws IllegalArgumentException if the response type is unknown + * @see io.a2a.grpc.utils.ProtoUtils + */ private static com.google.protobuf.MessageOrBuilder convertToProto(A2AResponse response) { if (response instanceof GetTaskResponse r) { return io.a2a.grpc.utils.ProtoUtils.ToProto.task(r.getResult()); @@ -319,10 +656,45 @@ private static com.google.protobuf.MessageOrBuilder convertToProto(A2AResponse - * This class only handles HTTP-specific concerns (writing to response, backpressure, disconnect). - * SSE formatting and JSON serialization are handled by {@link SseFormatter}. + * Server-Sent Events (SSE) support for streaming JSON-RPC responses. + * + *

This class handles the HTTP-specific aspects of SSE streaming, including: + *

+ * + *

SSE Format

+ *

Events are written in Server-Sent Events format: + *

+     * id: 0
+     * data: {"jsonrpc":"2.0","id":"req-123","result":{...}}
+     *
+     * id: 1
+     * data: {"jsonrpc":"2.0","id":"req-123","result":{...}}
+     * 
+ * + *

Backpressure Handling

+ *

Uses reactive streams subscription to handle backpressure: + *

    + *
  1. Request 1 event from upstream
  2. + *
  3. Write event to HTTP response
  4. + *
  5. Wait for write completion
  6. + *
  7. Request next event (backpressure)
  8. + *
+ * + *

Disconnect Handling

+ *

When the client disconnects: + *

    + *
  1. Vert.x closeHandler fires
  2. + *
  3. Invokes {@link ServerCallContext#invokeEventConsumerCancelCallback()}
  4. + *
  5. Cancels upstream subscription
  6. + *
  7. Stops event polling
  8. + *
+ * + * @see SseFormatter */ private static class MultiSseSupport { private static final Logger logger = LoggerFactory.getLogger(MultiSseSupport.class); @@ -332,11 +704,35 @@ private MultiSseSupport() { } /** - * Write SSE-formatted strings to HTTP response. + * Writes SSE-formatted strings to the HTTP response with backpressure handling. + * + *

This method subscribes to the SSE event stream and writes each event to the + * HTTP response, managing backpressure through the reactive streams subscription. + * + *

Subscription Flow: + *

    + *
  1. Subscribe to SSE stream
  2. + *
  3. Register disconnect handler
  4. + *
  5. Request first event
  6. + *
  7. Write event to response
  8. + *
  9. Wait for write completion
  10. + *
  11. Request next event (backpressure)
  12. + *
+ * + *

HTTP Headers: + *

Sets {@code Content-Type: text/event-stream} on first write + * + *

Error Handling: + *

* - * @param sseStrings Multi stream of SSE-formatted strings (from SseFormatter) - * @param rc Vert.x routing context - * @param context A2A server call context (for EventConsumer cancellation) + * @param sseStrings Multi stream of SSE-formatted strings from {@link SseFormatter} + * @param rc the Vert.x routing context + * @param context the A2A server call context (for EventConsumer cancellation) + * @see SseFormatter#formatResponseAsSSE */ public static void writeSseStrings(Multi sseStrings, RoutingContext rc, ServerCallContext context) { HttpServerResponse response = rc.response(); diff --git a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/CallContextFactory.java b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/CallContextFactory.java index d40bc65f0..64b98c962 100644 --- a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/CallContextFactory.java +++ b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/CallContextFactory.java @@ -3,6 +3,70 @@ import io.a2a.server.ServerCallContext; import io.vertx.ext.web.RoutingContext; +/** + * Factory interface for creating {@link ServerCallContext} from Vert.x routing context. + * + *

This interface provides an extension point for customizing how {@link ServerCallContext} + * instances are created in Quarkus JSON-RPC applications. The default implementation in + * {@link A2AServerRoutes} extracts standard information (user, headers, tenant, protocol version), + * but applications can provide their own implementation to add custom context data. + * + *

Default Behavior

+ *

When no CDI bean implementing this interface is provided, {@link A2AServerRoutes} + * creates contexts with: + *

+ * + *

Custom Implementation Example

+ *
{@code
+ * @ApplicationScoped
+ * public class CustomCallContextFactory implements CallContextFactory {
+ *     @Override
+ *     public ServerCallContext build(RoutingContext rc) {
+ *         // Extract user from Quarkus security context
+ *         User user = (rc.user() == null) ? UnauthenticatedUser.INSTANCE :
+ *             new User() {
+ *                 public boolean isAuthenticated() { return rc.userContext().authenticated(); }
+ *                 public String getUsername() { return rc.user().subject(); }
+ *             };
+ *
+ *         // Extract custom data from routing context
+ *         String orgId = rc.request().getHeader("X-Organization-ID");
+ *
+ *         Map state = new HashMap<>();
+ *         state.put("organization", orgId);
+ *         state.put("requestId", UUID.randomUUID().toString());
+ *
+ *         // Extract A2A protocol version from header
+ *         String version = rc.request().getHeader(A2AHeaders.X_A2A_VERSION);
+ *
+ *         // Extract requested extensions from header
+ *         List extensionHeaders = rc.request().headers().getAll(A2AHeaders.X_A2A_EXTENSIONS);
+ *         Set extensions = A2AExtensions.getRequestedExtensions(extensionHeaders);
+ *
+ *         return new ServerCallContext(user, state, extensions, version);
+ *     }
+ * }
+ * }
+ * + * @see ServerCallContext + * @see A2AServerRoutes#createCallContext(RoutingContext) + */ public interface CallContextFactory { + /** + * Builds a {@link ServerCallContext} from a Vert.x routing context. + * + *

This method is called for each incoming HTTP request to create the context + * that will be passed to the {@link io.a2a.server.requesthandlers.RequestHandler} + * and eventually to the {@link io.a2a.server.agentexecution.AgentExecutor}. + * + * @param rc the Vert.x routing context containing request data + * @return a new ServerCallContext with extracted authentication, headers, and metadata + */ ServerCallContext build(RoutingContext rc); } diff --git a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/QuarkusJSONRPCTransportMetadata.java b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/QuarkusJSONRPCTransportMetadata.java index 5cff0d7f7..02f2bd3f5 100644 --- a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/QuarkusJSONRPCTransportMetadata.java +++ b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/QuarkusJSONRPCTransportMetadata.java @@ -3,8 +3,45 @@ import io.a2a.server.TransportMetadata; import io.a2a.spec.TransportProtocol; +/** + * Transport metadata provider for the Quarkus JSON-RPC reference implementation. + * + *

This class identifies the transport protocol used by the JSON-RPC server implementation. + * It is automatically discovered by the A2A server framework through CDI to provide + * protocol-specific metadata to components that need to distinguish between different + * transport implementations. + * + *

CDI Integration

+ *

This bean is automatically registered and can be injected where transport + * protocol information is needed: + *

{@code
+ * @Inject
+ * TransportMetadata transportMetadata;
+ *
+ * public void logProtocol() {
+ *     String protocol = transportMetadata.getTransportProtocol();
+ *     // Returns "jsonrpc" for this implementation
+ * }
+ * }
+ * + *

Use Cases

+ * + * + * @see io.a2a.server.TransportMetadata + * @see io.a2a.spec.TransportProtocol + */ public class QuarkusJSONRPCTransportMetadata implements TransportMetadata { + /** + * Returns the transport protocol identifier for JSON-RPC. + * + * @return the string "jsonrpc" identifying this transport implementation + */ @Override public String getTransportProtocol() { return TransportProtocol.JSONRPC.asString(); diff --git a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/package-info.java b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/package-info.java new file mode 100644 index 000000000..bf47f0340 --- /dev/null +++ b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/package-info.java @@ -0,0 +1,170 @@ +/** + * Quarkus JSON-RPC reference implementation for the A2A protocol. + * + *

This package provides a production-ready JSON-RPC 2.0 server implementation built on + * Quarkus and Vert.x Web, demonstrating best practices for A2A protocol integration. + * + *

Architecture

+ *
+ * HTTP Request (JSON-RPC 2.0)
+ *     ↓
+ * A2AServerRoutes (@Singleton)
+ *     ├─ Parse JSON-RPC request
+ *     ├─ Create ServerCallContext (via CallContextFactory)
+ *     ├─ Route to JSONRPCHandler
+ *     └─ Return JSON or SSE stream
+ *         ↓
+ * JSONRPCHandler (transport layer)
+ *     ↓
+ * DefaultRequestHandler (server-common)
+ *     ↓
+ * AgentExecutor (your implementation)
+ * 
+ * + *

Core Components

+ * + * + *

JSON-RPC Endpoint

+ *

Main Endpoint: {@code POST /[tenant]} + *

Agent Card: {@code GET /.well-known/agent-card.json} + * + *

Request Format

+ *
{@code
+ * POST /
+ * Content-Type: application/json
+ *
+ * {
+ *   "jsonrpc": "2.0",
+ *   "id": "req-123",
+ *   "method": "sendMessage",
+ *   "params": {
+ *     "message": {
+ *       "parts": [{"text": "Hello"}]
+ *     }
+ *   }
+ * }
+ * }
+ * + *

Response Formats

+ * + *

Non-Streaming (JSON): + *

{@code
+ * HTTP/1.1 200 OK
+ * Content-Type: application/json
+ *
+ * {
+ *   "jsonrpc": "2.0",
+ *   "id": "req-123",
+ *   "result": {
+ *     "task": { ... }
+ *   }
+ * }
+ * }
+ * + *

Streaming (SSE): + *

{@code
+ * HTTP/1.1 200 OK
+ * Content-Type: text/event-stream
+ *
+ * id: 0
+ * data: {"jsonrpc":"2.0","id":"req-123","result":{...}}
+ *
+ * id: 1
+ * data: {"jsonrpc":"2.0","id":"req-123","result":{...}}
+ * }
+ * + *

Error Response: + *

{@code
+ * HTTP/1.1 200 OK
+ * Content-Type: application/json
+ *
+ * {
+ *   "jsonrpc": "2.0",
+ *   "id": "req-123",
+ *   "error": {
+ *     "code": -32602,
+ *     "message": "Invalid params"
+ *   }
+ * }
+ * }
+ * + *

Supported Methods

+ * + * + *

Multi-Tenancy

+ *

Tenant identification from request path: + *

+ * + *

Customization

+ * + *

Custom Context Creation: + *

Provide a CDI bean implementing {@link io.a2a.server.apps.quarkus.CallContextFactory CallContextFactory}: + *

{@code
+ * @ApplicationScoped
+ * public class CustomCallContextFactory implements CallContextFactory {
+ *     @Override
+ *     public ServerCallContext build(RoutingContext rc) {
+ *         // Extract user from Quarkus security context
+ *         User user = (rc.user() == null) ? UnauthenticatedUser.INSTANCE :
+ *             new User() {
+ *                 public boolean isAuthenticated() { return rc.userContext().authenticated(); }
+ *                 public String getUsername() { return rc.user().subject(); }
+ *             };
+ *
+ *         // Extract custom data from routing context
+ *         Map state = new HashMap<>();
+ *         state.put("organization", rc.request().getHeader("X-Org-ID"));
+ *
+ *         // Extract A2A protocol version from header
+ *         String version = rc.request().getHeader(A2AHeaders.X_A2A_VERSION);
+ *
+ *         // Extract requested extensions from header
+ *         List extensionHeaders = rc.request().headers().getAll(A2AHeaders.X_A2A_EXTENSIONS);
+ *         Set extensions = A2AExtensions.getRequestedExtensions(extensionHeaders);
+ *
+ *         return new ServerCallContext(user, state, extensions, version);
+ *     }
+ * }
+ * }
+ * + *

Configuration

+ *

No JSON-RPC-specific configuration required. Standard Quarkus HTTP configuration applies: + *

+ * quarkus.http.port=9999
+ * quarkus.http.host=0.0.0.0
+ * 
+ * + *

Authentication

+ *

Uses Quarkus Security with {@code @Authenticated} annotation on routes. + * Configure authentication in {@code application.properties}: + *

+ * quarkus.security.users.embedded.enabled=true
+ * quarkus.security.users.embedded.plain-text=true
+ * quarkus.security.users.embedded.users.alice=password
+ * 
+ * + * @see io.a2a.server.apps.quarkus.A2AServerRoutes + * @see io.a2a.server.apps.quarkus.CallContextFactory + * @see io.a2a.transport.jsonrpc.handler.JSONRPCHandler + */ +package io.a2a.server.apps.quarkus; diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/context/JSONRPCContextKeys.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/context/JSONRPCContextKeys.java index fbed22192..963a1493e 100644 --- a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/context/JSONRPCContextKeys.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/context/JSONRPCContextKeys.java @@ -3,8 +3,20 @@ /** * Shared JSON-RPC context keys for A2A protocol data. * - * These keys provide access to JSON-RPC context information, - * enabling rich context access in service method implementations. + *

These keys provide access to JSON-RPC context information stored in + * {@link io.a2a.server.ServerCallContext}, enabling rich context access + * in service method implementations and middleware. + * + *

Usage Example

+ *
{@code
+ * public void processRequest(ServerCallContext context) {
+ *     String tenant = context.get(JSONRPCContextKeys.TENANT_KEY);
+ *     String method = context.get(JSONRPCContextKeys.METHOD_NAME_KEY);
+ *     Map headers = context.get(JSONRPCContextKeys.HEADERS_KEY);
+ * }
+ * }
+ * + * @see io.a2a.server.ServerCallContext */ public final class JSONRPCContextKeys { diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java index aa5ad4493..8bdfc9203 100644 --- a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java @@ -56,6 +56,77 @@ import mutiny.zero.ZeroPublisher; import org.jspecify.annotations.Nullable; +/** + * JSON-RPC 2.0 transport handler for processing A2A protocol requests over HTTP. + * + *

This handler converts JSON-RPC 2.0 requests into A2A protocol operations and + * manages the lifecycle of agent interactions. It supports both blocking request/response + * and streaming responses via Server-Sent Events. + * + *

Request Flow

+ *

JSON-RPC requests flow through this handler to the underlying {@link RequestHandler}, + * which coordinates with the agent executor and event queue system: + *

+ * JSON-RPC Request → JSONRPCHandler → RequestHandler → AgentExecutor
+ *                         ↓               ↓
+ *                   Validation      EventQueue → Response
+ * 
+ * + *

JSON-RPC 2.0 Format

+ *

Requests follow the JSON-RPC 2.0 specification: + *

{@code
+ * {
+ *   "jsonrpc": "2.0",
+ *   "id": "request-123",
+ *   "method": "sendMessage",
+ *   "params": {
+ *     "message": {...}
+ *   }
+ * }
+ * }
+ * + *

Responses include the request ID for correlation: + *

{@code
+ * {
+ *   "jsonrpc": "2.0",
+ *   "id": "request-123",
+ *   "result": {
+ *     "task": {...}
+ *   }
+ * }
+ * }
+ * + *

Supported Operations

+ * + * + *

Error Handling

+ *

All A2A protocol errors are caught and converted to JSON-RPC error responses + * with the error object embedded in the response. Protocol version and required + * extensions are validated before processing requests. + * + *

Streaming Support

+ *

Streaming methods ({@code sendStreamingMessage}, {@code subscribeToTask}) return + * a {@link Flow.Publisher} of JSON-RPC responses, allowing Server-Sent Events delivery + * when used with Quarkus/Vert.x transport layers. + * + *

CDI Integration

+ *

This handler is an {@code @ApplicationScoped} CDI bean that requires: + *

+ * + * @see RequestHandler + * @see io.a2a.server.requesthandlers.DefaultRequestHandler + * @see io.a2a.spec.AgentCard + * @see ServerCallContext + */ @ApplicationScoped public class JSONRPCHandler { @@ -81,6 +152,14 @@ protected JSONRPCHandler() { this.executor = null; } + /** + * Creates a JSON-RPC handler with full CDI injection support. + * + * @param agentCard the public agent card containing agent capabilities + * @param extendedAgentCard optional extended agent card instance + * @param requestHandler the handler for processing A2A requests + * @param executor the executor for asynchronous operations + */ @Inject public JSONRPCHandler(@PublicAgentCard AgentCard agentCard, @Nullable @ExtendedAgentCard Instance extendedAgentCard, RequestHandler requestHandler, @Internal Executor executor) { @@ -93,10 +172,59 @@ public JSONRPCHandler(@PublicAgentCard AgentCard agentCard, @Nullable @ExtendedA AgentCardValidator.validateTransportConfiguration(agentCard); } + /** + * Creates a JSON-RPC handler with basic dependencies. + * + * @param agentCard the agent card containing agent capabilities + * @param requestHandler the handler for processing A2A requests + * @param executor the executor for asynchronous operations + */ public JSONRPCHandler(@PublicAgentCard AgentCard agentCard, RequestHandler requestHandler, Executor executor) { this(agentCard, null, requestHandler, executor); } + /** + * Handles a blocking message send request. + * + *

This method processes a JSON-RPC {@code sendMessage} request containing a message + * to be sent to the agent. The method blocks until the agent produces a terminal event + * or requires authentication/input. + * + *

Example Request: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "msg-123",
+     *   "method": "sendMessage",
+     *   "params": {
+     *     "message": {
+     *       "parts": [{"text": "What is the weather?"}]
+     *     }
+     *   }
+     * }
+     * }
+ * + *

Example Response: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "msg-123",
+     *   "result": {
+     *     "task": {
+     *       "id": "task-456",
+     *       "status": {"state": "COMPLETED"},
+     *       "artifacts": [...]
+     *     }
+     *   }
+     * }
+     * }
+ * + * @param request the JSON-RPC request containing message params + * @param context the server call context containing authentication and metadata + * @return JSON-RPC response with task or message result + * @see #onMessageSendStream(SendStreamingMessageRequest, ServerCallContext) + * @see RequestHandler#onMessageSend(io.a2a.spec.MessageSendParams, ServerCallContext) + */ public SendMessageResponse onMessageSend(SendMessageRequest request, ServerCallContext context) { try { A2AVersionValidator.validateProtocolVersion(agentCard, context); @@ -110,6 +238,42 @@ public SendMessageResponse onMessageSend(SendMessageRequest request, ServerCallC } } + /** + * Handles a streaming message send request. + * + *

This method processes a JSON-RPC {@code sendStreamingMessage} request for streaming + * responses from the agent. The response is returned as a {@link Flow.Publisher} of + * JSON-RPC response objects, allowing Server-Sent Events delivery. + * + *

This method requires the agent card to have {@code capabilities.streaming = true}. + * + *

Example Request: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "stream-123",
+     *   "method": "sendStreamingMessage",
+     *   "params": {
+     *     "message": {
+     *       "parts": [{"text": "Generate a story"}]
+     *     }
+     *   }
+     * }
+     * }
+ * + *

Example Streaming Responses: + *

{@code
+     * {"jsonrpc":"2.0","id":"stream-123","result":{"taskStatusUpdate":{...}}}
+     * {"jsonrpc":"2.0","id":"stream-123","result":{"taskArtifactUpdate":{...}}}
+     * {"jsonrpc":"2.0","id":"stream-123","result":{"taskStatusUpdate":{...}}}
+     * }
+ * + * @param request the JSON-RPC request containing message params + * @param context the server call context containing authentication and metadata + * @return publisher of JSON-RPC response objects containing streaming events + * @see #onMessageSend(SendMessageRequest, ServerCallContext) + * @see RequestHandler#onMessageSendStream(io.a2a.spec.MessageSendParams, ServerCallContext) + */ public Flow.Publisher onMessageSendStream( SendStreamingMessageRequest request, ServerCallContext context) { if (!agentCard.capabilities().streaming()) { @@ -134,6 +298,31 @@ public Flow.Publisher onMessageSendStream( } } + /** + * Handles a task cancellation request. + * + *

Attempts to cancel a running task identified by the task ID in the request params. + * The cancellation request is forwarded to the {@link RequestHandler}, which signals the + * agent executor to stop processing. The agent should transition the task to {@code CANCELED} state. + * + *

Example Request: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "cancel-123",
+     *   "method": "cancelTask",
+     *   "params": {
+     *     "taskId": "task-456"
+     *   }
+     * }
+     * }
+ * + * @param request the JSON-RPC request containing task ID params + * @param context the server call context containing authentication and metadata + * @return JSON-RPC response with the cancelled task + * @see RequestHandler#onCancelTask(io.a2a.spec.TaskIdParams, ServerCallContext) + * @see io.a2a.server.agentexecution.AgentExecutor#cancel + */ public CancelTaskResponse onCancelTask(CancelTaskRequest request, ServerCallContext context) { try { Task task = requestHandler.onCancelTask(request.getParams(), context); @@ -148,6 +337,40 @@ public CancelTaskResponse onCancelTask(CancelTaskRequest request, ServerCallCont } } + /** + * Subscribes to task updates via a streaming connection. + * + *

Creates a stream that delivers real-time updates for an existing task. This allows + * clients to reconnect to ongoing or completed tasks and receive their event history + * and future updates. + * + *

This method requires the agent card to have {@code capabilities.streaming = true}. + * + *

Example Request: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "subscribe-123",
+     *   "method": "subscribeToTask",
+     *   "params": {
+     *     "taskId": "task-456"
+     *   }
+     * }
+     * }
+ * + *

Use Cases: + *

    + *
  • Reconnecting to a task after network interruption
  • + *
  • Monitoring long-running tasks from multiple clients
  • + *
  • Viewing historical events for completed tasks
  • + *
+ * + * @param request the JSON-RPC request containing task ID params + * @param context the server call context containing authentication and metadata + * @return publisher of JSON-RPC response objects containing task updates + * @see RequestHandler#onSubscribeToTask(io.a2a.spec.TaskIdParams, ServerCallContext) + * @see #onMessageSendStream(SendStreamingMessageRequest, ServerCallContext) + */ public Flow.Publisher onSubscribeToTask( SubscribeToTaskRequest request, ServerCallContext context) { if (!agentCard.capabilities().streaming()) { @@ -170,6 +393,30 @@ public Flow.Publisher onSubscribeToTask( } } + /** + * Retrieves a specific push notification configuration. + * + *

Returns the push notification configuration for a task by config ID. + * This method requires the agent card to have {@code capabilities.pushNotifications = true}. + * + *

Example Request: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "get-config-123",
+     *   "method": "getTaskPushNotificationConfig",
+     *   "params": {
+     *     "taskId": "task-456",
+     *     "configId": "config-789"
+     *   }
+     * }
+     * }
+ * + * @param request the JSON-RPC request containing task and config ID params + * @param context the server call context containing authentication and metadata + * @return JSON-RPC response with the configuration + * @see RequestHandler#onGetTaskPushNotificationConfig + */ public GetTaskPushNotificationConfigResponse getPushNotificationConfig( GetTaskPushNotificationConfigRequest request, ServerCallContext context) { if (!agentCard.capabilities().pushNotifications()) { @@ -187,6 +434,31 @@ public GetTaskPushNotificationConfigResponse getPushNotificationConfig( } } + /** + * Creates a push notification configuration for a task. + * + *

Creates a new push notification configuration specifying webhook URL and event filters. + * This method requires the agent card to have {@code capabilities.pushNotifications = true}. + * + *

Example Request: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "create-config-123",
+     *   "method": "setTaskPushNotificationConfig",
+     *   "params": {
+     *     "taskId": "task-456",
+     *     "url": "https://webhook.example.com/notify",
+     *     "events": ["taskStatusUpdate", "taskArtifactUpdate"]
+     *   }
+     * }
+     * }
+ * + * @param request the JSON-RPC request containing push notification config params + * @param context the server call context containing authentication and metadata + * @return JSON-RPC response with the created configuration + * @see RequestHandler#onCreateTaskPushNotificationConfig + */ public CreateTaskPushNotificationConfigResponse setPushNotificationConfig( CreateTaskPushNotificationConfigRequest request, ServerCallContext context) { if (!agentCard.capabilities().pushNotifications()) { @@ -204,6 +476,30 @@ public CreateTaskPushNotificationConfigResponse setPushNotificationConfig( } } + /** + * Retrieves a specific task by ID. + * + *

Returns the complete task object including status, artifacts, and optionally + * task history based on the {@code historyLength} parameter. + * + *

Example Request: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "get-123",
+     *   "method": "getTask",
+     *   "params": {
+     *     "taskId": "task-456",
+     *     "historyLength": 10
+     *   }
+     * }
+     * }
+ * + * @param request the JSON-RPC request containing task query params + * @param context the server call context containing authentication and metadata + * @return JSON-RPC response with the task object + * @see RequestHandler#onGetTask(io.a2a.spec.TaskQueryParams, ServerCallContext) + */ public GetTaskResponse onGetTask(GetTaskRequest request, ServerCallContext context) { try { Task task = requestHandler.onGetTask(request.getParams(), context); @@ -215,6 +511,42 @@ public GetTaskResponse onGetTask(GetTaskRequest request, ServerCallContext conte } } + /** + * Lists tasks with optional filtering and pagination. + * + *

Retrieves a list of tasks with support for filtering by context, status, and timestamp, + * along with pagination controls. + * + *

Example Request: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "list-123",
+     *   "method": "listTasks",
+     *   "params": {
+     *     "status": "COMPLETED",
+     *     "pageSize": 10,
+     *     "includeArtifacts": true
+     *   }
+     * }
+     * }
+ * + *

Query Parameters: + *

    + *
  • {@code contextId} - Filter tasks by conversation context
  • + *
  • {@code status} - Filter by task state
  • + *
  • {@code pageSize} - Maximum tasks to return
  • + *
  • {@code pageToken} - Token for pagination
  • + *
  • {@code historyLength} - Max history entries per task
  • + *
  • {@code statusTimestampAfter} - ISO-8601 timestamp filter
  • + *
  • {@code includeArtifacts} - Include task artifacts
  • + *
+ * + * @param request the JSON-RPC request containing list tasks params + * @param context the server call context containing authentication and metadata + * @return JSON-RPC response with list of tasks + * @see RequestHandler#onListTasks(io.a2a.spec.ListTasksParams, ServerCallContext) + */ public ListTasksResponse onListTasks(ListTasksRequest request, ServerCallContext context) { try { ListTasksResult result = requestHandler.onListTasks(request.getParams(), context); @@ -226,6 +558,30 @@ public ListTasksResponse onListTasks(ListTasksRequest request, ServerCallContext } } + /** + * Lists push notification configurations for a task. + * + *

Returns a paginated list of push notification configurations associated with a task. + * This method requires the agent card to have {@code capabilities.pushNotifications = true}. + * + *

Example Request: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "list-config-123",
+     *   "method": "listTaskPushNotificationConfig",
+     *   "params": {
+     *     "taskId": "task-456",
+     *     "pageSize": 10
+     *   }
+     * }
+     * }
+ * + * @param request the JSON-RPC request containing task ID and pagination params + * @param context the server call context containing authentication and metadata + * @return JSON-RPC response with list of configurations + * @see RequestHandler#onListTaskPushNotificationConfig + */ public ListTaskPushNotificationConfigResponse listPushNotificationConfig( ListTaskPushNotificationConfigRequest request, ServerCallContext context) { if ( !agentCard.capabilities().pushNotifications()) { @@ -243,6 +599,31 @@ public ListTaskPushNotificationConfigResponse listPushNotificationConfig( } } + /** + * Deletes a push notification configuration. + * + *

Removes a push notification configuration by config ID, stopping notifications + * for the associated task. + * This method requires the agent card to have {@code capabilities.pushNotifications = true}. + * + *

Example Request: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "delete-config-123",
+     *   "method": "deleteTaskPushNotificationConfig",
+     *   "params": {
+     *     "taskId": "task-456",
+     *     "configId": "config-789"
+     *   }
+     * }
+     * }
+ * + * @param request the JSON-RPC request containing task and config ID params + * @param context the server call context containing authentication and metadata + * @return JSON-RPC response confirming deletion + * @see RequestHandler#onDeleteTaskPushNotificationConfig + */ public DeleteTaskPushNotificationConfigResponse deletePushNotificationConfig( DeleteTaskPushNotificationConfigRequest request, ServerCallContext context) { if ( !agentCard.capabilities().pushNotifications()) { @@ -259,6 +640,29 @@ public DeleteTaskPushNotificationConfigResponse deletePushNotificationConfig( } } + /** + * Retrieves the extended agent card if configured. + * + *

The extended agent card provides additional metadata beyond the public agent card, + * such as tenant-specific configurations or private capabilities. This endpoint requires + * the agent card to have {@code capabilities.extendedAgentCard = true} and a CDI-produced + * {@code @ExtendedAgentCard} instance. + * + *

Example Request: + *

{@code
+     * {
+     *   "jsonrpc": "2.0",
+     *   "id": "ext-card-123",
+     *   "method": "getExtendedAgentCard",
+     *   "params": {}
+     * }
+     * }
+ * + * @param request the JSON-RPC request for extended agent card + * @param context the server call context containing authentication and metadata + * @return JSON-RPC response with the extended agent card + * @see #getAgentCard() + */ // TODO: Add authentication (https://github.com/a2aproject/a2a-java/issues/77) public GetExtendedAgentCardResponse onGetExtendedCardRequest( GetExtendedAgentCardRequest request, ServerCallContext context) { @@ -275,6 +679,16 @@ public GetExtendedAgentCardResponse onGetExtendedCardRequest( } } + /** + * Returns the public agent card. + * + *

The agent card is a self-describing manifest that provides essential metadata about + * the agent, including its capabilities, supported skills, communication methods, and + * security requirements. + * + * @return the public agent card + * @see AgentCard + */ public AgentCard getAgentCard() { return agentCard; } diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/package-info.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/package-info.java index 20b7a9dde..306e94d86 100644 --- a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/package-info.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/package-info.java @@ -1,3 +1,34 @@ +/** + * JSON-RPC transport handler implementations for the A2A protocol. + * + *

This package contains the core JSON-RPC handler that processes JSON-RPC 2.0 requests + * over HTTP and translates them to A2A protocol operations. It supports both blocking and + * streaming responses with proper JSON-RPC error handling. + * + *

JSON-RPC 2.0 Protocol

+ *

This implementation follows the JSON-RPC 2.0 specification, supporting: + *

    + *
  • Request/response pairs with unique request IDs
  • + *
  • Error responses with code, message, and optional data
  • + *
  • Streaming via Server-Sent Events for applicable methods
  • + *
+ * + *

Supported Methods

+ *
    + *
  • {@code sendMessage} - Send message (blocking)
  • + *
  • {@code sendStreamingMessage} - Send message (streaming)
  • + *
  • {@code subscribeToTask} - Subscribe to task updates (streaming)
  • + *
  • {@code getTask} - Get task by ID
  • + *
  • {@code listTasks} - List tasks with filtering
  • + *
  • {@code cancelTask} - Cancel task execution
  • + *
  • {@code getTaskPushNotificationConfig} - Get push notification config
  • + *
  • {@code setTaskPushNotificationConfig} - Create push notification config
  • + *
  • {@code listTaskPushNotificationConfig} - List push notification configs
  • + *
  • {@code deleteTaskPushNotificationConfig} - Delete push notification config
  • + *
+ * + * @see io.a2a.transport.jsonrpc.handler.JSONRPCHandler + */ @NullMarked package io.a2a.transport.jsonrpc.handler;