Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ nbproject/
.serena/
.bob/
claudedocs
backlog/

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.TaskManager;
import io.a2a.server.tasks.TaskPersistenceException;
import io.a2a.server.tasks.TaskSerializationException;
import io.a2a.server.tasks.TaskStore;
import io.a2a.spec.A2AError;
import io.a2a.spec.A2AServerException;
Expand Down Expand Up @@ -41,6 +43,18 @@
* <b>Note:</b> This bean is eagerly initialized by {@link MainEventBusProcessorInitializer}
* to ensure the background thread starts automatically when the application starts.
* </p>
*
* <h2>Exception Handling</h2>
* TaskStore persistence failures are caught and handled gracefully:
* <ul>
* <li>{@link TaskSerializationException} - Data corruption or schema mismatch.
* Logged at ERROR level, distributed as {@link InternalError} to clients.</li>
* <li>{@link TaskPersistenceException} - Database/storage system failure.
* Logged at ERROR level, distributed as {@link InternalError} to clients.</li>
* </ul>
*
* <p>Processing continues after errors - the failed event is distributed as InternalError
* to all ChildQueues, and the MainEventBusProcessor continues consuming subsequent events.</p>
*/
@ApplicationScoped
public class MainEventBusProcessor implements Runnable {
Expand Down Expand Up @@ -293,11 +307,26 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated
LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})",
taskId, event.getClass().getSimpleName(), isFinal, isReplicated);
return isFinal;

} catch (TaskSerializationException e) {
// Data corruption or schema mismatch - ALWAYS permanent
LOGGER.error("Task {} event serialization failed - data corruption detected: {}",
taskId, e.getMessage(), e);
throw new InternalError("Failed to serialize task " + taskId + ": " + e.getMessage());

} catch (TaskPersistenceException e) {
// Database/storage failure
LOGGER.error("Task {} event persistence failed: {}", taskId, e.getMessage(), e);
throw new InternalError("Storage failure for task " + taskId + ": " + e.getMessage());

} catch (InternalError e) {
// Already an InternalError from TaskManager validation - pass through
LOGGER.error("Error updating TaskStore via TaskManager for task {}", taskId, e);
// Rethrow to prevent distributing unpersisted event to clients
throw e;

} catch (Exception e) {
// Unexpected exception type - treat as permanent failure
LOGGER.error("Unexpected error updating TaskStore for task {}", taskId, e);
// Rethrow to prevent distributing unpersisted event to clients
throw new InternalError("TaskStore persistence failed: " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,62 @@
* <p>
* This is the default TaskStore used when no other implementation is provided.
* </p>
*
* <h2>Exception Behavior</h2>
* InMemoryTaskStore has minimal exception scenarios compared to database-backed implementations:
* <ul>
* <li><b>No TaskSerializationException:</b> Task objects are stored directly in memory without
* serialization. No JSON parsing or schema compatibility issues can occur.</li>
* <li><b>No TaskPersistenceException:</b> ConcurrentHashMap operations do not involve I/O,
* network, or transactional concerns. Standard put/get/remove operations are guaranteed
* to succeed under normal JVM operation.</li>
* <li><b>OutOfMemoryError (potential):</b> The only failure scenario is JVM heap exhaustion if
* too many tasks are stored. This is an {@link Error} (not Exception) and indicates a fatal
* system condition requiring JVM restart and capacity planning.</li>
* </ul>
*
* <h3>Design Rationale</h3>
* This implementation intentionally does NOT throw {@link TaskStoreException} or its subclasses
* because:
* <ul>
* <li>No serialization step exists - tasks stored as Java objects</li>
* <li>No I/O or network operations that can fail</li>
* <li>ConcurrentHashMap guarantees thread-safe operations without checked exceptions</li>
* <li>Memory exhaustion (OutOfMemoryError) is an unrecoverable system failure</li>
* </ul>
*
* <h3>Comparison to Database Implementations</h3>
* Database-backed implementations (e.g., JpaDatabaseTaskStore) throw exceptions for:
* <ul>
* <li>Serialization errors (JSON parsing, schema mismatches)</li>
* <li>Connection failures (network, timeouts)</li>
* <li>Transaction failures (deadlocks, constraint violations)</li>
* <li>Capacity issues (disk full, quota exceeded)</li>
* </ul>
* InMemoryTaskStore avoids all of these by operating entirely in-process.
*
* <h3>Memory Management Considerations</h3>
* Callers should monitor memory usage and implement task cleanup policies:
* <pre>{@code
* // Example: Delete finalized tasks older than 48 hours
* ListTasksParams params = new ListTasksParams.Builder()
* .statusTimestampBefore(Instant.now().minus(Duration.ofHours(48)))
* .build();
*
* List<Task> oldTasks = taskStore.list(params).tasks();
* oldTasks.stream()
* .filter(task -> task.status().state().isFinal())
* .forEach(task -> taskStore.delete(task.id()));
* }</pre>
*
* <h3>Thread Safety</h3>
* All operations are thread-safe via {@link ConcurrentHashMap}. Multiple threads can
* concurrently save, get, list, and delete tasks without synchronization. Last-write-wins
* semantics apply for concurrent {@code save()} calls to the same task ID.
*
* @see TaskStore for interface contract and exception documentation
* @see TaskStoreException for exception hierarchy (not thrown by this implementation)
* @see TaskStateProvider for queue lifecycle integration
*/
@ApplicationScoped
public class InMemoryTaskStore implements TaskStore, TaskStateProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
* Implementations should handle errors gracefully:
* <ul>
* <li>Log failures but don't throw exceptions (notifications are best-effort)</li>
* <li>Consider retry logic for transient failures</li>
* <li>Don't block on network I/O - execute asynchronously if needed</li>
* <li>Circuit breaker patterns for repeatedly failing endpoints</li>
* </ul>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package io.a2a.server.tasks;

import org.jspecify.annotations.Nullable;

/**
* Exception for database/storage system failures during task persistence operations.
* <p>
* Indicates failures in the underlying storage system (database, filesystem, etc.) rather
* than data format issues.
*
* <h2>Common Scenarios</h2>
* <ul>
* <li>Database connection timeout or network partition</li>
* <li>Transaction deadlock or lock wait timeout</li>
* <li>Connection pool exhausted</li>
* <li>Disk full / storage quota exceeded</li>
* <li>Database constraint violations (unique key, foreign key)</li>
* <li>Insufficient permissions or authentication failures</li>
* <li>Database schema incompatibilities</li>
* </ul>
*
* <h2>Usage Example</h2>
* <pre>{@code
* try {
* em.merge(jpaTask);
* } catch (PersistenceException e) {
* throw new TaskPersistenceException(taskId, "Database save failed", e);
* }
* }</pre>
*
* @see TaskStoreException
* @see TaskSerializationException for data format errors
*/
public class TaskPersistenceException extends TaskStoreException {

/**
* Creates a new TaskPersistenceException with no message or cause.
*/
public TaskPersistenceException() {
super();
}

/**
* Creates a new TaskPersistenceException with the specified message.
*
* @param msg the exception message
*/
public TaskPersistenceException(final String msg) {
super(msg);
}

/**
* Creates a new TaskPersistenceException with the specified cause.
*
* @param cause the underlying cause
*/
public TaskPersistenceException(final Throwable cause) {
super(cause);
}

/**
* Creates a new TaskPersistenceException with the specified message and cause.
*
* @param msg the exception message
* @param cause the underlying cause
*/
public TaskPersistenceException(final String msg, final Throwable cause) {
super(msg, cause);
}

/**
* Creates a new TaskPersistenceException with the specified task ID and message.
*
* @param taskId the task identifier (may be null for operations not tied to a specific task)
* @param msg the exception message
*/
public TaskPersistenceException(@Nullable final String taskId, final String msg) {
super(taskId, msg);
}

/**
* Creates a new TaskPersistenceException with the specified task ID, message, and cause.
*
* @param taskId the task identifier (may be null for operations not tied to a specific task)
* @param msg the exception message
* @param cause the underlying cause
*/
public TaskPersistenceException(@Nullable final String taskId, final String msg, final Throwable cause) {
super(taskId, msg, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.a2a.server.tasks;

import org.jspecify.annotations.Nullable;

/**
* Exception for task serialization/deserialization failures.
* <p>
* Indicates failures converting between Task domain objects and persistent storage format (JSON).
*
* <h2>Common Scenarios</h2>
* <ul>
* <li>JSON parsing errors during {@code get()} operations</li>
* <li>JSON serialization errors during {@code save()} operations</li>
* <li>Invalid enum values or missing required fields</li>
* <li>Data format version mismatches after upgrades</li>
* </ul>
*
* <h2>Usage Example</h2>
* <pre>{@code
* try {
* Task task = jsonMapper.readValue(json, Task.class);
* } catch (JsonProcessingException e) {
* throw new TaskSerializationException(taskId, "Failed to deserialize task", e);
* }
* }</pre>
*
* @see TaskStoreException
* @see TaskPersistenceException for database failures
*/
public class TaskSerializationException extends TaskStoreException {

/**
* Creates a new TaskSerializationException with no message or cause.
*/
public TaskSerializationException() {
super();
}

/**
* Creates a new TaskSerializationException with the specified message.
*
* @param msg the exception message
*/
public TaskSerializationException(final String msg) {
super(msg);
}

/**
* Creates a new TaskSerializationException with the specified cause.
*
* @param cause the underlying cause
*/
public TaskSerializationException(final Throwable cause) {
super(cause);
}

/**
* Creates a new TaskSerializationException with the specified message and cause.
*
* @param msg the exception message
* @param cause the underlying cause
*/
public TaskSerializationException(final String msg, final Throwable cause) {
super(msg, cause);
}

/**
* Creates a new TaskSerializationException with the specified task ID and message.
*
* @param taskId the task identifier (may be null for operations not tied to a specific task)
* @param msg the exception message
*/
public TaskSerializationException(@Nullable final String taskId, final String msg) {
super(taskId, msg);
}

/**
* Creates a new TaskSerializationException with the specified task ID, message, and cause.
*
* @param taskId the task identifier (may be null for operations not tied to a specific task)
* @param msg the exception message
* @param cause the underlying cause
*/
public TaskSerializationException(@Nullable final String taskId, final String msg, final Throwable cause) {
super(taskId, msg, cause);
}
}
Loading