From 97065fc40256ac18388f8ebdd157e7c744bc1a6a Mon Sep 17 00:00:00 2001 From: kjg Date: Sat, 8 Nov 2025 13:10:05 +0900 Subject: [PATCH] Fix silent data loss in ChunkOrientedStep when skip policy rejects When retry is exhausted and skipPolicy.shouldSkip() returns false, ChunkOrientedStep should fail the job instead of silently discarding the item. This change adds proper exception handling to prevent silent data loss. - Add NonSkippableReadException in doSkipInRead() else block - Add NonSkippableProcessException in doSkipInProcess() else block - Add NonSkippableWriteException in scan() else block - Add test case for skip policy rejection scenario Fixes gh-5079 Signed-off-by: kjg --- .../core/step/item/ChunkOrientedStep.java | 10 ++++ .../step/item/ChunkOrientedStepTests.java | 53 +++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java index 2c86d54e21..f4f79e75df 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java @@ -51,6 +51,9 @@ import org.springframework.batch.core.step.StepInterruptionPolicy; import org.springframework.batch.core.step.ThreadStepInterruptionPolicy; import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy; +import org.springframework.batch.core.step.skip.NonSkippableProcessException; +import org.springframework.batch.core.step.skip.NonSkippableReadException; +import org.springframework.batch.core.step.skip.NonSkippableWriteException; import org.springframework.batch.core.step.skip.SkipPolicy; import org.springframework.batch.infrastructure.item.Chunk; import org.springframework.batch.infrastructure.item.ExecutionContext; @@ -560,6 +563,9 @@ private void doSkipInRead(RetryException retryException, StepContribution contri this.compositeSkipListener.onSkipInRead(cause); contribution.incrementReadSkipCount(); } + else { + throw new NonSkippableReadException("Skip policy rejected skipping item", cause); + } } private Chunk processChunk(Chunk chunk, StepContribution contribution) throws Exception { @@ -653,6 +659,9 @@ private void doSkipInProcess(I item, RetryException retryException, StepContribu this.compositeSkipListener.onSkipInProcess(item, retryException.getCause()); contribution.incrementProcessSkipCount(); } + else { + throw new NonSkippableProcessException("Skip policy rejected skipping item", cause); + } } private void writeChunk(Chunk chunk, StepContribution contribution) throws Exception { @@ -737,6 +746,7 @@ private void scan(Chunk chunk, StepContribution contribution) { else { logger.error("Failed to write item: " + item, exception); this.compositeItemWriteListener.onWriteError(exception, singleItemChunk); + throw new NonSkippableWriteException("Skip policy rejected skipping item", exception); } } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java index 244719d754..681c993e21 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java @@ -17,21 +17,32 @@ import org.junit.jupiter.api.Test; +import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.job.JobExecution; import org.springframework.batch.core.job.JobInstance; import org.springframework.batch.core.job.parameters.JobParameters; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.ResourcelessJobRepository; +import org.springframework.batch.core.step.FatalStepExecutionException; import org.springframework.batch.core.step.StepExecution; +import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; +import org.springframework.batch.core.step.skip.NonSkippableProcessException; +import org.springframework.batch.infrastructure.item.ItemProcessor; import org.springframework.batch.infrastructure.item.ItemReader; import org.springframework.batch.infrastructure.item.ItemWriter; +import org.springframework.core.retry.policy.SimpleRetryPolicy; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** + * Tests for {@link ChunkOrientedStep}. + * * @author Mahmoud Ben Hassine */ public class ChunkOrientedStepTests { @@ -57,4 +68,46 @@ void testReadNoMoreThanAvailableItems() throws Exception { verify(reader, times(6)).read(); } + @Test + void testDoSkipInProcessThrowsNonSkippableProcessExceptionWhenSkipPolicyReturnsFalse() throws Exception { + // given - fault-tolerant step with NeverSkipItemSkipPolicy and retry limit + ItemReader reader = mock(); + when(reader.read()).thenReturn("item1", "item2", "item3", null); + + ItemProcessor processor = item -> { + if ("item2".equals(item)) { + throw new RuntimeException("Processing failed for item2"); + } + return item.toUpperCase(); + }; + + ItemWriter writer = chunk -> { + }; + + JobRepository jobRepository = new ResourcelessJobRepository(); + ChunkOrientedStep step = new ChunkOrientedStep<>("step", 3, reader, writer, jobRepository); + step.setItemProcessor(processor); + step.setFaultTolerant(true); + step.setRetryPolicy(new SimpleRetryPolicy(2)); // retry once (initial + 1 retry) + step.setSkipPolicy(new NeverSkipItemSkipPolicy()); // never skip + step.afterPropertiesSet(); + + JobInstance jobInstance = new JobInstance(1L, "job"); + JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters()); + StepExecution stepExecution = new StepExecution(1L, "step", jobExecution); + + // when - execute step + FatalStepExecutionException exception = assertThrows(FatalStepExecutionException.class, () -> { + step.execute(stepExecution); + }); + + // then - should throw NonSkippableProcessException + Throwable cause = exception.getCause(); + assertInstanceOf(NonSkippableProcessException.class, cause, + "Expected NonSkippableProcessException when skip policy rejects skipping"); + assertEquals("Skip policy rejected skipping item", cause.getMessage()); + assertEquals(ExitStatus.FAILED.getExitCode(), stepExecution.getExitStatus().getExitCode()); + assertEquals(0, stepExecution.getProcessSkipCount(), "Process skip count should be 0"); + } + }