From 56a4b4ba319d6a3fecbdd269599153a1187f9480 Mon Sep 17 00:00:00 2001
From: Kabir Khan Transient Errors (return true):
+ *
+ *
+ *
+ * Non-Transient Errors (return false):
+ *
+ *
+ *
+ * @param e the persistence exception to analyze
+ * @return true if retry may succeed, false if manual intervention required
+ */
+ private static boolean isTransientDatabaseError(PersistenceException e) {
+ // Check exception class hierarchy for transient indicators
+ Throwable cause = e;
+ while (cause != null) {
+ String className = cause.getClass().getName();
+ String message = cause.getMessage() != null ? cause.getMessage().toLowerCase() : "";
+
+ // Transient exception types
+ if (className.contains("Timeout") ||
+ className.contains("LockTimeout") ||
+ className.contains("QueryTimeout")) {
+ return true;
+ }
+
+ // Transient message patterns
+ if (message.contains("timeout") ||
+ message.contains("deadlock") ||
+ message.contains("connection") && (message.contains("refused") || message.contains("closed")) ||
+ message.contains("pool") && message.contains("exhausted")) {
+ return true;
+ }
+
+ // Non-transient indicators
+ if (className.contains("ConstraintViolation") ||
+ className.contains("IntegrityConstraint") ||
+ message.contains("unique constraint") ||
+ message.contains("foreign key") ||
+ message.contains("disk full") ||
+ message.contains("permission denied")) {
+ return false;
+ }
+
+ cause = cause.getCause();
+ }
+
+ // Default to non-transient for safety (don't retry unless confident)
+ return false;
+ }
+
@Transactional
@Override
public void save(Task task, boolean isReplicated) {
@@ -85,7 +149,13 @@ public void save(Task task, boolean isReplicated) {
}
} catch (JsonProcessingException e) {
LOGGER.error("Failed to serialize task with ID: {}", task.id(), e);
- throw new RuntimeException("Failed to serialize task with ID: " + task.id(), e);
+ throw new TaskSerializationException(task.id(),
+ "Failed to serialize task for persistence", e);
+ } catch (PersistenceException e) {
+ boolean isTransient = isTransientDatabaseError(e);
+ LOGGER.error("Database save failed for task with ID: {} (transient: {})", task.id(), isTransient, e);
+ throw new TaskPersistenceException(task.id(),
+ "Database save failed for task", e, isTransient);
}
}
@@ -93,19 +163,28 @@ public void save(Task task, boolean isReplicated) {
@Override
public Task get(String taskId) {
LOGGER.debug("Retrieving task with ID: {}", taskId);
- JpaTask jpaTask = em.find(JpaTask.class, taskId);
- if (jpaTask == null) {
- LOGGER.debug("Task not found with ID: {}", taskId);
- return null;
- }
-
try {
- Task task = jpaTask.getTask();
- LOGGER.debug("Successfully retrieved task with ID: {}", taskId);
- return task;
- } catch (JsonProcessingException e) {
- LOGGER.error("Failed to deserialize task with ID: {}", taskId, e);
- throw new RuntimeException("Failed to deserialize task with ID: " + taskId, e);
+ JpaTask jpaTask = em.find(JpaTask.class, taskId);
+ if (jpaTask == null) {
+ LOGGER.debug("Task not found with ID: {}", taskId);
+ return null;
+ }
+
+ try {
+ Task task = jpaTask.getTask();
+ LOGGER.debug("Successfully retrieved task with ID: {}", taskId);
+ return task;
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Failed to deserialize task with ID: {}", taskId, e);
+ throw new TaskSerializationException(taskId,
+ "Failed to deserialize task from database", e);
+ }
+
+ } catch (PersistenceException e) {
+ boolean isTransient = isTransientDatabaseError(e);
+ LOGGER.error("Database retrieval failed for task with ID: {} (transient: {})", taskId, isTransient, e);
+ throw new TaskPersistenceException(taskId,
+ "Database retrieval failed for task", e, isTransient);
}
}
@@ -113,12 +192,19 @@ public Task get(String taskId) {
@Override
public void delete(String taskId) {
LOGGER.debug("Deleting task with ID: {}", taskId);
- JpaTask jpaTask = em.find(JpaTask.class, taskId);
- if (jpaTask != null) {
- em.remove(jpaTask);
- LOGGER.debug("Successfully deleted task with ID: {}", taskId);
- } else {
- LOGGER.debug("Task not found for deletion with ID: {}", taskId);
+ try {
+ JpaTask jpaTask = em.find(JpaTask.class, taskId);
+ if (jpaTask != null) {
+ em.remove(jpaTask);
+ LOGGER.debug("Successfully deleted task with ID: {}", taskId);
+ } else {
+ LOGGER.debug("Task not found for deletion with ID: {}", taskId);
+ }
+ } catch (PersistenceException e) {
+ boolean isTransient = isTransientDatabaseError(e);
+ LOGGER.error("Database deletion failed for task with ID: {} (transient: {})", taskId, isTransient, e);
+ throw new TaskPersistenceException(taskId,
+ "Database deletion failed for task", e, isTransient);
}
}
@@ -231,14 +317,15 @@ public ListTasksResult list(ListTasksParams params) {
LOGGER.debug("Listing tasks with params: contextId={}, status={}, pageSize={}, pageToken={}",
params.contextId(), params.status(), params.pageSize(), params.pageToken());
- // Parse pageToken once at the beginning
- PageToken pageToken = PageToken.fromString(params.pageToken());
- Instant tokenTimestamp = pageToken != null ? pageToken.timestamp() : null;
- String tokenId = pageToken != null ? pageToken.id() : null;
+ try {
+ // Parse pageToken once at the beginning
+ PageToken pageToken = PageToken.fromString(params.pageToken());
+ Instant tokenTimestamp = pageToken != null ? pageToken.timestamp() : null;
+ String tokenId = pageToken != null ? pageToken.id() : null;
- // Build dynamic JPQL query with WHERE clauses for filtering
- StringBuilder queryBuilder = new StringBuilder("SELECT t FROM JpaTask t WHERE 1=1");
- StringBuilder countQueryBuilder = new StringBuilder("SELECT COUNT(t) FROM JpaTask t WHERE 1=1");
+ // Build dynamic JPQL query with WHERE clauses for filtering
+ StringBuilder queryBuilder = new StringBuilder("SELECT t FROM JpaTask t WHERE 1=1");
+ StringBuilder countQueryBuilder = new StringBuilder("SELECT COUNT(t) FROM JpaTask t WHERE 1=1");
// Apply contextId filter using denormalized column
if (params.contextId() != null) {
@@ -311,16 +398,17 @@ public ListTasksResult list(ListTasksParams params) {
}
int totalSize = countQuery.getSingleResult().intValue();
- // Deserialize tasks from JSON
- List
Processing continues after errors - the failed event is distributed as InternalError + * to all ChildQueues, and the MainEventBusProcessor continues consuming subsequent events.
*/ @ApplicationScoped public class MainEventBusProcessor implements Runnable { @@ -293,11 +308,35 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})", taskId, event.getClass().getSimpleName(), isFinal, isReplicated); return isFinal; + + } catch (TaskSerializationException e) { + // Data corruption or schema mismatch - ALWAYS permanent + LOGGER.error("Task {} event serialization failed - data corruption detected: {}", + taskId, e.getMessage(), e); + throw new InternalError("Failed to serialize task " + taskId + ": " + e.getMessage()); + + } catch (TaskPersistenceException e) { + // Database failure - check if transient for retry guidance + if (e.isTransient()) { + LOGGER.warn("Task {} event persistence failed (TRANSIENT) - retry may succeed: {}", + taskId, e.getMessage()); + throw new InternalError("Temporary storage failure for task " + taskId + + " (retry may succeed): " + e.getMessage()); + } else { + LOGGER.error("Task {} event persistence failed (PERMANENT) - manual intervention required: {}", + taskId, e.getMessage(), e); + throw new InternalError("Permanent storage failure for task " + taskId + + " (requires intervention): " + e.getMessage()); + } + } catch (InternalError e) { + // Already an InternalError from TaskManager validation - pass through LOGGER.error("Error updating TaskStore via TaskManager for task {}", taskId, e); // Rethrow to prevent distributing unpersisted event to clients throw e; + } catch (Exception e) { + // Unexpected exception type - treat as permanent failure LOGGER.error("Unexpected error updating TaskStore for task {}", taskId, e); // Rethrow to prevent distributing unpersisted event to clients throw new InternalError("TaskStore persistence failed: " + e.getMessage()); diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java index 15f94d7e8..de8774eb8 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java @@ -25,6 +25,63 @@ ** This is the default TaskStore used when no other implementation is provided. *
+ * + *{@code
+ * // Example: Delete finalized tasks older than 48 hours
+ * ListTasksParams params = new ListTasksParams.Builder()
+ * .statusTimestampBefore(Instant.now().minus(Duration.ofHours(48)))
+ * .build();
+ *
+ * List oldTasks = taskStore.list(params).tasks();
+ * oldTasks.stream()
+ * .filter(task -> task.status().state().isFinal())
+ * .forEach(task -> taskStore.delete(task.id()));
+ * }
+ *
+ * + * Indicates failures in the underlying storage system (database, filesystem, etc.) rather + * than data format issues. Includes a {@link #isTransient()} flag to distinguish between + * temporary failures (retry recommended) and permanent failures (manual intervention required). + * + *
{@code
+ * try {
+ * em.merge(jpaTask);
+ * } catch (PersistenceException e) {
+ * boolean transient = isTransientDatabaseError(e);
+ * throw new TaskPersistenceException(taskId, "Database save failed", e, transient);
+ * }
+ * }
+ *
+ * @see TaskStoreException
+ * @see TaskSerializationException for data format errors
+ */
+public class TaskPersistenceException extends TaskStoreException {
+
+ private final boolean isTransientFailure;
+
+ /**
+ * Creates a new TaskPersistenceException with no message or cause.
+ * The failure is assumed to be non-transient.
+ */
+ public TaskPersistenceException() {
+ super();
+ this.isTransientFailure = false;
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified message.
+ * The failure is assumed to be non-transient.
+ *
+ * @param msg the exception message
+ */
+ public TaskPersistenceException(final String msg) {
+ super(msg);
+ this.isTransientFailure = false;
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified cause.
+ * The failure is assumed to be non-transient.
+ *
+ * @param cause the underlying cause
+ */
+ public TaskPersistenceException(final Throwable cause) {
+ super(cause);
+ this.isTransientFailure = false;
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified message and cause.
+ * The failure is assumed to be non-transient.
+ *
+ * @param msg the exception message
+ * @param cause the underlying cause
+ */
+ public TaskPersistenceException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ this.isTransientFailure = false;
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified task ID and message.
+ * The failure is assumed to be non-transient.
+ *
+ * @param taskId the task identifier (may be null for operations not tied to a specific task)
+ * @param msg the exception message
+ */
+ public TaskPersistenceException(@Nullable final String taskId, final String msg) {
+ super(taskId, msg);
+ this.isTransientFailure = false;
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified task ID, message, and cause.
+ * The failure is assumed to be non-transient.
+ *
+ * @param taskId the task identifier (may be null for operations not tied to a specific task)
+ * @param msg the exception message
+ * @param cause the underlying cause
+ */
+ public TaskPersistenceException(@Nullable final String taskId, final String msg, final Throwable cause) {
+ super(taskId, msg, cause);
+ this.isTransientFailure = false;
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified message and transient flag.
+ *
+ * @param msg the exception message
+ * @param isTransient true if the failure is transient and may resolve with retry, false otherwise
+ */
+ public TaskPersistenceException(final String msg, final boolean isTransient) {
+ super(msg);
+ this.isTransientFailure = isTransient;
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified message, cause, and transient flag.
+ *
+ * @param msg the exception message
+ * @param cause the underlying cause
+ * @param isTransient true if the failure is transient and may resolve with retry, false otherwise
+ */
+ public TaskPersistenceException(final String msg, final Throwable cause, final boolean isTransient) {
+ super(msg, cause);
+ this.isTransientFailure = isTransient;
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified task ID, message, and transient flag.
+ *
+ * @param taskId the task identifier (may be null for operations not tied to a specific task)
+ * @param msg the exception message
+ * @param isTransient true if the failure is transient and may resolve with retry, false otherwise
+ */
+ public TaskPersistenceException(@Nullable final String taskId, final String msg, final boolean isTransient) {
+ super(taskId, msg);
+ this.isTransientFailure = isTransient;
+ }
+
+ /**
+ * Creates a new TaskPersistenceException with the specified task ID, message, cause, and transient flag.
+ *
+ * @param taskId the task identifier (may be null for operations not tied to a specific task)
+ * @param msg the exception message
+ * @param cause the underlying cause
+ * @param isTransient true if the failure is transient and may resolve with retry, false otherwise
+ */
+ public TaskPersistenceException(@Nullable final String taskId, final String msg, final Throwable cause,
+ final boolean isTransient) {
+ super(taskId, msg, cause);
+ this.isTransientFailure = isTransient;
+ }
+
+ /**
+ * Indicates whether this failure is transient (retry may help).
+ *
+ * @return true if transient (network, timeout, deadlock), false if permanent (disk full, constraint)
+ */
+ public boolean isTransient() {
+ return isTransientFailure;
+ }
+}
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java b/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java
new file mode 100644
index 000000000..ac960f91f
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java
@@ -0,0 +1,97 @@
+package io.a2a.server.tasks;
+
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Exception for task serialization/deserialization failures.
+ * + * Indicates failures converting between Task domain objects and persistent storage format (JSON). + * These failures are typically non-transient - they indicate data corruption, schema + * mismatches, or invalid field values that require manual intervention. + * + *
{@code
+ * try {
+ * Task task = jsonMapper.readValue(json, Task.class);
+ * } catch (JsonProcessingException e) {
+ * throw new TaskSerializationException(taskId, "Failed to deserialize task", e);
+ * }
+ * }
+ *
+ * @see TaskStoreException
+ * @see TaskPersistenceException for database failures
+ */
+public class TaskSerializationException extends TaskStoreException {
+
+ /**
+ * Creates a new TaskSerializationException with no message or cause.
+ */
+ public TaskSerializationException() {
+ super();
+ }
+
+ /**
+ * Creates a new TaskSerializationException with the specified message.
+ *
+ * @param msg the exception message
+ */
+ public TaskSerializationException(final String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates a new TaskSerializationException with the specified cause.
+ *
+ * @param cause the underlying cause
+ */
+ public TaskSerializationException(final Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a new TaskSerializationException with the specified message and cause.
+ *
+ * @param msg the exception message
+ * @param cause the underlying cause
+ */
+ public TaskSerializationException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+
+ /**
+ * Creates a new TaskSerializationException with the specified task ID and message.
+ *
+ * @param taskId the task identifier (may be null for operations not tied to a specific task)
+ * @param msg the exception message
+ */
+ public TaskSerializationException(@Nullable final String taskId, final String msg) {
+ super(taskId, msg);
+ }
+
+ /**
+ * Creates a new TaskSerializationException with the specified task ID, message, and cause.
+ *
+ * @param taskId the task identifier (may be null for operations not tied to a specific task)
+ * @param msg the exception message
+ * @param cause the underlying cause
+ */
+ public TaskSerializationException(@Nullable final String taskId, final String msg, final Throwable cause) {
+ super(taskId, msg, cause);
+ }
+}
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
index 3df903f77..c0b7eb9ea 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
@@ -90,10 +90,119 @@
* {@code
+ * @Override
+ * public void save(Task task, boolean isReplicated) {
+ * try {
+ * String json = objectMapper.writeValueAsString(task);
+ * } catch (JsonProcessingException e) {
+ * throw new TaskSerializationException(task.id(), "Failed to serialize task", e);
+ * }
+ *
+ * try {
+ * entityManager.merge(toEntity(json));
+ * } catch (PersistenceException e) {
+ * boolean transient = isTransientDatabaseError(e);
+ * throw new TaskPersistenceException(task.id(), "Database save failed", e, transient);
+ * }
+ * }
+ *
+ * @Override
+ * public Task get(String taskId) {
+ * String json = database.retrieve(taskId);
+ * try {
+ * return objectMapper.readValue(json, Task.class);
+ * } catch (JsonProcessingException e) {
+ * throw new TaskSerializationException(taskId, "Failed to deserialize task", e);
+ * }
+ * }
+ * }
+ *
+ * {@code
+ * try {
+ * taskStore.save(task, false);
+ * } catch (TaskSerializationException e) {
+ * // Non-transient: Log error, notify operations team
+ * logger.error("Task data corruption for {}: {}", e.getTaskId(), e.getMessage(), e);
+ * alerting.sendAlert("Task serialization failure", e);
+ * // DO NOT RETRY - requires manual data repair
+ *
+ * } catch (TaskPersistenceException e) {
+ * if (e.isTransient()) {
+ * // Transient: Retry with exponential backoff
+ * logger.warn("Transient persistence failure for {}: {}", e.getTaskId(), e.getMessage());
+ * retryWithBackoff(() -> taskStore.save(task, false));
+ * } else {
+ * // Non-transient: Log error, alert operations
+ * logger.error("Permanent persistence failure for {}: {}", e.getTaskId(), e.getMessage(), e);
+ * alerting.sendAlert("Database capacity/constraint issue", e);
+ * // DO NOT RETRY - requires manual intervention
+ * }
+ * } catch (TaskStoreException e) {
+ * // Generic fallback - treat as non-transient
+ * logger.error("TaskStore failure for {}: {}", e.getTaskId(), e.getMessage(), e);
+ * // DO NOT RETRY by default
+ * }
+ * }
+ *
+ * + * Root exception for all task storage and retrieval errors. Specialized subclasses + * provide specific failure contexts: + *
+ * Tests verify that TaskStore persistence failures are converted to InternalError events + * and distributed to clients with appropriate logging based on failure type: + *
+ * Provides reusable test patterns for exception construction, field verification, + * and message formatting. Subclasses must implement {@link #createException} methods + * to test specific exception types. + *
+ * This class is designed to be extended by implementation tests (e.g., InMemoryTaskStore tests)
+ * to ensure consistent exception behavior across all TaskStore implementations.
+ *
+ * @param
+ * Tests the exception class for database/storage system failures,
+ * with special focus on the {@code isTransient} flag for retry logic.
+ */
+class TaskPersistenceExceptionTest extends AbstractTaskStoreExceptionTest
+ * Tests the exception class for task serialization/deserialization failures,
+ * verifying all constructor variants and non-transient failure semantics.
+ */
+class TaskSerializationExceptionTest extends AbstractTaskStoreExceptionTest
+ * Tests the base exception class for TaskStore persistence layer failures,
+ * verifying all constructor variants and field behavior.
+ */
+class TaskStoreExceptionTest extends AbstractTaskStoreExceptionTest
* Indicates failures in the underlying storage system (database, filesystem, etc.) rather
- * than data format issues. Includes a {@link #isTransient()} flag to distinguish between
- * temporary failures (retry recommended) and permanent failures (manual intervention required).
- *
- *
+ * Examples of persistence failures:
*
- * Tests the exception class for database/storage system failures,
- * with special focus on the {@code isTransient} flag for retry logic.
+ * Tests the exception class for database/storage system failures.
*/
class TaskPersistenceExceptionTest extends AbstractTaskStoreExceptionTest Processing continues after errors - the failed event is distributed as InternalError
diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java
index de8774eb8..7c443b1cc 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java
@@ -46,8 +46,7 @@
*
* Indicates failures in the underlying storage system (database, filesystem, etc.) rather
* than data format issues.
- *
- * Examples of persistence failures:
+ *
+ *
* Indicates failures converting between Task domain objects and persistent storage format (JSON).
- * These failures are typically non-transient - they indicate data corruption, schema
- * mismatches, or invalid field values that require manual intervention.
*
*
* Tests the exception class for task serialization/deserialization failures,
- * verifying all constructor variants and non-transient failure semantics.
+ * verifying all constructor variants.
*/
class TaskSerializationExceptionTest extends AbstractTaskStoreExceptionTestTransient Errors (return true):
- *
- *
- *
- * Non-Transient Errors (return false):
- *
- *
- *
- * @param e the persistence exception to analyze
- * @return true if retry may succeed, false if manual intervention required
- */
- private static boolean isTransientDatabaseError(PersistenceException e) {
- // Check exception class hierarchy for transient indicators
- Throwable cause = e;
- while (cause != null) {
- String className = cause.getClass().getName();
- String message = cause.getMessage() != null ? cause.getMessage().toLowerCase() : "";
-
- // Transient exception types
- if (className.contains("Timeout") ||
- className.contains("LockTimeout") ||
- className.contains("QueryTimeout")) {
- return true;
- }
-
- // Transient message patterns
- if (message.contains("timeout") ||
- message.contains("deadlock") ||
- message.contains("connection") && (message.contains("refused") || message.contains("closed")) ||
- message.contains("pool") && message.contains("exhausted")) {
- return true;
- }
-
- // Non-transient indicators
- if (className.contains("ConstraintViolation") ||
- className.contains("IntegrityConstraint") ||
- message.contains("unique constraint") ||
- message.contains("foreign key") ||
- message.contains("disk full") ||
- message.contains("permission denied")) {
- return false;
- }
-
- cause = cause.getCause();
- }
-
- // Default to non-transient for safety (don't retry unless confident)
- return false;
- }
@Transactional
@Override
@@ -152,10 +92,9 @@ public void save(Task task, boolean isReplicated) {
throw new TaskSerializationException(task.id(),
"Failed to serialize task for persistence", e);
} catch (PersistenceException e) {
- boolean isTransient = isTransientDatabaseError(e);
- LOGGER.error("Database save failed for task with ID: {} (transient: {})", task.id(), isTransient, e);
+ LOGGER.error("Database save failed for task with ID: {}", task.id(), e);
throw new TaskPersistenceException(task.id(),
- "Database save failed for task", e, isTransient);
+ "Database save failed for task", e);
}
}
@@ -181,10 +120,9 @@ public Task get(String taskId) {
}
} catch (PersistenceException e) {
- boolean isTransient = isTransientDatabaseError(e);
- LOGGER.error("Database retrieval failed for task with ID: {} (transient: {})", taskId, isTransient, e);
+ LOGGER.error("Database retrieval failed for task with ID: {}", taskId, e);
throw new TaskPersistenceException(taskId,
- "Database retrieval failed for task", e, isTransient);
+ "Database retrieval failed for task", e);
}
}
@@ -201,10 +139,9 @@ public void delete(String taskId) {
LOGGER.debug("Task not found for deletion with ID: {}", taskId);
}
} catch (PersistenceException e) {
- boolean isTransient = isTransientDatabaseError(e);
- LOGGER.error("Database deletion failed for task with ID: {} (transient: {})", taskId, isTransient, e);
+ LOGGER.error("Database deletion failed for task with ID: {}", taskId, e);
throw new TaskPersistenceException(taskId,
- "Database deletion failed for task", e, isTransient);
+ "Database deletion failed for task", e);
}
}
@@ -436,10 +373,9 @@ public ListTasksResult list(ListTasksParams params) {
throw e;
} catch (PersistenceException e) {
// Database errors from query creation, execution, or count
- boolean isTransient = isTransientDatabaseError(e);
- LOGGER.error("Database query failed during list operation (transient: {})", isTransient, e);
+ LOGGER.error("Database query failed during list operation", e);
throw new TaskPersistenceException(null, // No single taskId for list operation
- "Database query failed during list operation", e, isTransient);
+ "Database query failed during list operation", e);
}
}
diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
index c8f7db106..9005765eb 100644
--- a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
+++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
@@ -316,18 +316,9 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated
throw new InternalError("Failed to serialize task " + taskId + ": " + e.getMessage());
} catch (TaskPersistenceException e) {
- // Database failure - check if transient for retry guidance
- if (e.isTransient()) {
- LOGGER.warn("Task {} event persistence failed (TRANSIENT) - retry may succeed: {}",
- taskId, e.getMessage());
- throw new InternalError("Temporary storage failure for task " + taskId +
- " (retry may succeed): " + e.getMessage());
- } else {
- LOGGER.error("Task {} event persistence failed (PERMANENT) - manual intervention required: {}",
- taskId, e.getMessage(), e);
- throw new InternalError("Permanent storage failure for task " + taskId +
- " (requires intervention): " + e.getMessage());
- }
+ // Database/storage failure
+ LOGGER.error("Task {} event persistence failed: {}", taskId, e.getMessage(), e);
+ throw new InternalError("Storage failure for task " + taskId + ": " + e.getMessage());
} catch (InternalError e) {
// Already an InternalError from TaskManager validation - pass through
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskPersistenceException.java b/server-common/src/main/java/io/a2a/server/tasks/TaskPersistenceException.java
index 0b40b4310..d5a218d77 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/TaskPersistenceException.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/TaskPersistenceException.java
@@ -6,36 +6,27 @@
* Exception for database/storage system failures during task persistence operations.
* Transient Failures (isTransient = true)
- * Temporary issues that may resolve with retry:
+ * than data format issues.
+ *
*
- * Recovery: Exponential backoff retry, circuit breaker patterns
- *
- * Non-Transient Failures (isTransient = false)
- * Persistent issues requiring intervention:
- *
*
- * Recovery: Manual intervention, capacity planning, configuration fixes
+ * Usage Example
* {@code
* try {
* em.merge(jpaTask);
* } catch (PersistenceException e) {
- * boolean transient = isTransientDatabaseError(e);
- * throw new TaskPersistenceException(taskId, "Database save failed", e, transient);
+ * throw new TaskPersistenceException(taskId, "Database save failed", e);
* }
* }
*
@@ -44,66 +35,53 @@
*/
public class TaskPersistenceException extends TaskStoreException {
- private final boolean isTransientFailure;
-
/**
* Creates a new TaskPersistenceException with no message or cause.
- * The failure is assumed to be non-transient.
*/
public TaskPersistenceException() {
super();
- this.isTransientFailure = false;
}
/**
* Creates a new TaskPersistenceException with the specified message.
- * The failure is assumed to be non-transient.
*
* @param msg the exception message
*/
public TaskPersistenceException(final String msg) {
super(msg);
- this.isTransientFailure = false;
}
/**
* Creates a new TaskPersistenceException with the specified cause.
- * The failure is assumed to be non-transient.
*
* @param cause the underlying cause
*/
public TaskPersistenceException(final Throwable cause) {
super(cause);
- this.isTransientFailure = false;
}
/**
* Creates a new TaskPersistenceException with the specified message and cause.
- * The failure is assumed to be non-transient.
*
* @param msg the exception message
* @param cause the underlying cause
*/
public TaskPersistenceException(final String msg, final Throwable cause) {
super(msg, cause);
- this.isTransientFailure = false;
}
/**
* Creates a new TaskPersistenceException with the specified task ID and message.
- * The failure is assumed to be non-transient.
*
* @param taskId the task identifier (may be null for operations not tied to a specific task)
* @param msg the exception message
*/
public TaskPersistenceException(@Nullable final String taskId, final String msg) {
super(taskId, msg);
- this.isTransientFailure = false;
}
/**
* Creates a new TaskPersistenceException with the specified task ID, message, and cause.
- * The failure is assumed to be non-transient.
*
* @param taskId the task identifier (may be null for operations not tied to a specific task)
* @param msg the exception message
@@ -111,64 +89,5 @@ public TaskPersistenceException(@Nullable final String taskId, final String msg)
*/
public TaskPersistenceException(@Nullable final String taskId, final String msg, final Throwable cause) {
super(taskId, msg, cause);
- this.isTransientFailure = false;
- }
-
- /**
- * Creates a new TaskPersistenceException with the specified message and transient flag.
- *
- * @param msg the exception message
- * @param isTransient true if the failure is transient and may resolve with retry, false otherwise
- */
- public TaskPersistenceException(final String msg, final boolean isTransient) {
- super(msg);
- this.isTransientFailure = isTransient;
- }
-
- /**
- * Creates a new TaskPersistenceException with the specified message, cause, and transient flag.
- *
- * @param msg the exception message
- * @param cause the underlying cause
- * @param isTransient true if the failure is transient and may resolve with retry, false otherwise
- */
- public TaskPersistenceException(final String msg, final Throwable cause, final boolean isTransient) {
- super(msg, cause);
- this.isTransientFailure = isTransient;
- }
-
- /**
- * Creates a new TaskPersistenceException with the specified task ID, message, and transient flag.
- *
- * @param taskId the task identifier (may be null for operations not tied to a specific task)
- * @param msg the exception message
- * @param isTransient true if the failure is transient and may resolve with retry, false otherwise
- */
- public TaskPersistenceException(@Nullable final String taskId, final String msg, final boolean isTransient) {
- super(taskId, msg);
- this.isTransientFailure = isTransient;
- }
-
- /**
- * Creates a new TaskPersistenceException with the specified task ID, message, cause, and transient flag.
- *
- * @param taskId the task identifier (may be null for operations not tied to a specific task)
- * @param msg the exception message
- * @param cause the underlying cause
- * @param isTransient true if the failure is transient and may resolve with retry, false otherwise
- */
- public TaskPersistenceException(@Nullable final String taskId, final String msg, final Throwable cause,
- final boolean isTransient) {
- super(taskId, msg, cause);
- this.isTransientFailure = isTransient;
- }
-
- /**
- * Indicates whether this failure is transient (retry may help).
- *
- * @return true if transient (network, timeout, deadlock), false if permanent (disk full, constraint)
- */
- public boolean isTransient() {
- return isTransientFailure;
}
}
diff --git a/server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorExceptionTest.java b/server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorExceptionTest.java
index eb6c53d46..8bb212601 100644
--- a/server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorExceptionTest.java
+++ b/server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorExceptionTest.java
@@ -40,8 +40,7 @@
* and distributed to clients with appropriate logging based on failure type:
*
*
*/
public class MainEventBusProcessorExceptionTest {
@@ -133,15 +132,15 @@ public void testTaskSerializationException_ConvertsToInternalError() throws Inte
}
/**
- * Test that TaskPersistenceException (transient) is converted to InternalError with WARN log.
- * AC#2: Mock TaskStore throws TaskPersistenceException(transient) → WARN log + InternalError
+ * Test that TaskPersistenceException is converted to InternalError with ERROR log.
+ * AC#2: Mock TaskStore throws TaskPersistenceException → ERROR log + InternalError
*/
@Test
- public void testTaskPersistenceException_Transient_ConvertsToInternalError() throws InterruptedException {
- // Arrange: Mock TaskStore to throw transient TaskPersistenceException
- String exceptionMessage = "Connection timeout to database";
+ public void testTaskPersistenceException_ConvertsToInternalError() throws InterruptedException {
+ // Arrange: Mock TaskStore to throw TaskPersistenceException
+ String exceptionMessage = "Database operation failed";
TaskPersistenceException exception = new TaskPersistenceException(
- TASK_ID, exceptionMessage, true // transient = true
+ TASK_ID, exceptionMessage
);
when(mockTaskStore.get(any())).thenThrow(exception);
doThrow(exception).when(mockTaskStore).save(any(Task.class), anyBoolean());
@@ -160,54 +159,13 @@ public void testTaskPersistenceException_Transient_ConvertsToInternalError() thr
InternalError error = (InternalError) distributedEvent;
assertTrue(error.getMessage().contains(TASK_ID),
"Error message should contain task ID: " + error.getMessage());
- assertTrue(error.getMessage().contains("retry may succeed"),
- "Transient error message should mention retry: " + error.getMessage());
- // Assert: Verify WARN level logging for transient failures
- boolean foundWarnLog = logAppender.list.stream()
- .anyMatch(event -> event.getLevel() == Level.WARN
- && event.getFormattedMessage().contains(TASK_ID)
- && event.getFormattedMessage().contains("TRANSIENT"));
- assertTrue(foundWarnLog, "Should log transient TaskPersistenceException at WARN level");
- }
-
- /**
- * Test that TaskPersistenceException (permanent) is converted to InternalError with ERROR log.
- * AC#3: Mock TaskStore throws TaskPersistenceException(permanent) → ERROR log + InternalError
- */
- @Test
- public void testTaskPersistenceException_Permanent_ConvertsToInternalError() throws InterruptedException {
- // Arrange: Mock TaskStore to throw permanent TaskPersistenceException
- String exceptionMessage = "Disk full - cannot write";
- TaskPersistenceException exception = new TaskPersistenceException(
- TASK_ID, exceptionMessage, false // transient = false (permanent)
- );
- when(mockTaskStore.get(any())).thenThrow(exception);
- doThrow(exception).when(mockTaskStore).save(any(Task.class), anyBoolean());
-
- Task testTask = createTestTask();
-
- // Act: Enqueue event and wait for processing
- ListException Handling
* TaskStore persistence failures are caught and handled gracefully:
*
- *
*
* Comparison to Database Implementations
diff --git a/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java b/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java
index f8b7b018d..093241182 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java
@@ -63,7 +63,6 @@
* Implementations should handle errors gracefully:
*
*
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskPersistenceException.java b/server-common/src/main/java/io/a2a/server/tasks/TaskPersistenceException.java
index d5a218d77..97007cab9 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/TaskPersistenceException.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/TaskPersistenceException.java
@@ -7,9 +7,8 @@
* Common Scenarios
*
*
- * Usage Example
* {@code
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java b/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java
index ac960f91f..5916ce530 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/TaskSerializationException.java
@@ -6,8 +6,6 @@
* Exception for task serialization/deserialization failures.
* Common Scenarios
*
@@ -17,14 +15,6 @@
*
*
- * Recovery Strategy
- * Non-Transient: Retry will not help. Requires:
- *
- *
- *
* Usage Example
* {@code
* try {
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
index c0b7eb9ea..7ae7a46bf 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
@@ -92,11 +92,10 @@
*
* Exception Contract
* All TaskStore methods may throw {@link TaskStoreException} or its subclasses to indicate
- * persistence failures. Implementers must choose the appropriate exception type based on
- * the failure cause:
+ * persistence failures:
*
- *
*
* When to Throw TaskSerializationException
@@ -107,23 +106,16 @@
* When to Throw TaskPersistenceException
* Use when the storage system fails:
*
- *
- * Set the {@code isTransient} flag appropriately:
- *
- *
*
* Implementer Example
@@ -139,61 +131,14 @@
* try {
* entityManager.merge(toEntity(json));
* } catch (PersistenceException e) {
- * boolean transient = isTransientDatabaseError(e);
- * throw new TaskPersistenceException(task.id(), "Database save failed", e, transient);
- * }
- * }
- *
- * @Override
- * public Task get(String taskId) {
- * String json = database.retrieve(taskId);
- * try {
- * return objectMapper.readValue(json, Task.class);
- * } catch (JsonProcessingException e) {
- * throw new TaskSerializationException(taskId, "Failed to deserialize task", e);
+ * throw new TaskPersistenceException(task.id(), "Database save failed", e);
* }
* }
* }
*
- * Caller Exception Handling
- * Callers should distinguish between transient and permanent failures:
- * {@code
- * try {
- * taskStore.save(task, false);
- * } catch (TaskSerializationException e) {
- * // Non-transient: Log error, notify operations team
- * logger.error("Task data corruption for {}: {}", e.getTaskId(), e.getMessage(), e);
- * alerting.sendAlert("Task serialization failure", e);
- * // DO NOT RETRY - requires manual data repair
- *
- * } catch (TaskPersistenceException e) {
- * if (e.isTransient()) {
- * // Transient: Retry with exponential backoff
- * logger.warn("Transient persistence failure for {}: {}", e.getTaskId(), e.getMessage());
- * retryWithBackoff(() -> taskStore.save(task, false));
- * } else {
- * // Non-transient: Log error, alert operations
- * logger.error("Permanent persistence failure for {}: {}", e.getTaskId(), e.getMessage(), e);
- * alerting.sendAlert("Database capacity/constraint issue", e);
- * // DO NOT RETRY - requires manual intervention
- * }
- * } catch (TaskStoreException e) {
- * // Generic fallback - treat as non-transient
- * logger.error("TaskStore failure for {}: {}", e.getTaskId(), e.getMessage(), e);
- * // DO NOT RETRY by default
- * }
- * }
- *
- * Current Exception Handling
- * {@link io.a2a.server.events.MainEventBusProcessor} currently catches all TaskStore
- * exceptions and wraps them in {@link io.a2a.spec.InternalError} events for client
- * distribution. Future enhancements may distinguish transient failures for retry logic.
- *
- * Method-Specific Notes
- *
- *
+ * Exception Handling
+ * {@link io.a2a.server.events.MainEventBusProcessor} catches TaskStore exceptions and
+ * wraps them in {@link io.a2a.spec.InternalError} events for client distribution.
*
* @see TaskManager
* @see TaskStateProvider
@@ -213,9 +158,8 @@ public interface TaskStore {
* false if it originated locally. Used to prevent feedback loops
* in replicated scenarios (e.g., don't fire TaskFinalizedEvent for replicated updates)
* @throws TaskSerializationException if the task cannot be serialized to storage format (JSON parsing error,
- * invalid field values, schema mismatch). Non-transient - retry will not help.
- * @throws TaskPersistenceException if the storage system fails (database timeout, connection error, disk full).
- * Check {@link TaskPersistenceException#isTransient()} to determine if retry is appropriate.
+ * invalid field values, schema mismatch)
+ * @throws TaskPersistenceException if the storage system fails (database timeout, connection error, disk full)
* @throws TaskStoreException for other persistence failures not covered by specific subclasses
*/
void save(Task task, boolean isReplicated);
@@ -226,9 +170,9 @@ public interface TaskStore {
* @param taskId the task identifier
* @return the task if found, null otherwise
* @throws TaskSerializationException if the persisted task data cannot be deserialized (corrupted JSON,
- * schema incompatibility). Non-transient - indicates data corruption.
+ * schema incompatibility)
* @throws TaskPersistenceException if the storage system fails during retrieval (database connection error,
- * query timeout). Check {@link TaskPersistenceException#isTransient()} for retry guidance.
+ * query timeout)
* @throws TaskStoreException for other retrieval failures not covered by specific subclasses
*/
@Nullable Task get(String taskId);
@@ -238,8 +182,7 @@ public interface TaskStore {
*
* @param taskId the task identifier
* @throws TaskPersistenceException if the storage system fails during deletion (database connection error,
- * transaction timeout, constraint violation). Check {@link TaskPersistenceException#isTransient()}
- * to determine if retry is appropriate.
+ * transaction timeout, constraint violation)
* @throws TaskStoreException for other deletion failures not covered by specific subclasses
*/
void delete(String taskId);
@@ -250,10 +193,9 @@ public interface TaskStore {
* @param params the filtering and pagination parameters
* @return the list of tasks matching the criteria with pagination info
* @throws TaskSerializationException if any persisted task data cannot be deserialized during listing
- * (corrupted JSON in database). Non-transient - indicates data corruption affecting
- * one or more tasks.
+ * (corrupted JSON in database)
* @throws TaskPersistenceException if the storage system fails during the list operation (database query timeout,
- * connection error). Check {@link TaskPersistenceException#isTransient()} for retry guidance.
+ * connection error)
* @throws TaskStoreException for other listing failures not covered by specific subclasses
*/
ListTasksResult list(ListTasksParams params);
diff --git a/server-common/src/test/java/io/a2a/server/tasks/TaskSerializationExceptionTest.java b/server-common/src/test/java/io/a2a/server/tasks/TaskSerializationExceptionTest.java
index e843adb68..df356ef14 100644
--- a/server-common/src/test/java/io/a2a/server/tasks/TaskSerializationExceptionTest.java
+++ b/server-common/src/test/java/io/a2a/server/tasks/TaskSerializationExceptionTest.java
@@ -11,7 +11,7 @@
* Unit tests for {@link TaskSerializationException}.
*