diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java index 4a97e17cf7b24..5be756a0efadd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -54,13 +55,13 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.File; import java.util.Arrays; import java.util.BitSet; import java.util.Collections; import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks; import static org.apache.flink.util.Preconditions.checkState; +import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.Matchers.equalTo; /** Integration test for performing rescale of unaligned checkpoint. */ @@ -366,14 +367,7 @@ public String map(Long value) throws Exception { }) .name("long-to-string-map") .uid("long-to-string-map") - .map( - new FailingMapper<>( - state -> false, - state -> - state.completedCheckpoints >= minCheckpoints / 2 - && state.runNumber == 0, - state -> false, - state -> false)) + .map(getFailingMapper(minCheckpoints)) .name("failing-map") .uid("failing-map") .setParallelism(parallelism) @@ -394,14 +388,7 @@ void addFailingSink( DataStream combinedSource, long minCheckpoints, boolean slotSharing) { combinedSource .shuffle() - .map( - new FailingMapper<>( - state -> false, - state -> - state.completedCheckpoints >= minCheckpoints / 2 - && state.runNumber == 0, - state -> false, - state -> false)) + .map(getFailingMapper(minCheckpoints)) .name("failing-map") .uid("failing-map") .slotSharingGroup(slotSharing ? "default" : "failing-map") @@ -418,6 +405,25 @@ void addFailingSink( .slotSharingGroup(slotSharing ? "default" : "sink"); } + /** + * Creates a FailingMapper that only fails during snapshot operations. + * + *

Only fails during snapshotState() when completedCheckpoints >= minCheckpoints/2 AND + * runNumber == 0. After job failovers internally, runNumber becomes attemptNumber > 0, so + * failure condition is no longer satisfied. This ensures the mapper fails exactly once + * during initial run to trigger job failover, but never fails again after failing over and + * recovery from checkpoint. + */ + private static FailingMapper getFailingMapper(long minCheckpoints) { + return new FailingMapper<>( + state -> false, + state -> + state.completedCheckpoints >= minCheckpoints / 2 + && state.runNumber == 0, + state -> false, + state -> false); + } + DataStream createSourcePipeline( StreamExecutionEnvironment env, int minCheckpoints, @@ -611,21 +617,34 @@ public UnalignedCheckpointRescaleITCase( this.sourceSleepMs = sourceSleepMs; } + /** + * Tests unaligned checkpoint rescaling behavior. + * + *

Prescale phase: Job fails when completedCheckpoints >= minCheckpoints/2 via FailingMapper. + * Generates checkpoint for rescale test. + * + *

Postscale phase: Job restores from checkpoint with different parallelism, failovers once, + * and finishes after source generates all records. + */ @Test public void shouldRescaleUnalignedCheckpoint() throws Exception { final UnalignedSettings prescaleSettings = new UnalignedSettings(topology) .setParallelism(oldParallelism) .setExpectedFailures(1) - .setSourceSleepMs(sourceSleepMs); + .setSourceSleepMs(sourceSleepMs) + .setExpectedFinalJobStatus(JobStatus.FAILED); prescaleSettings.setGenerateCheckpoint(true); - final File checkpointDir = super.execute(prescaleSettings); - + final String checkpointDir = super.execute(prescaleSettings); + assertThat(checkpointDir) + .as("First job must generate a checkpoint for rescale test to be valid.") + .isNotNull(); // resume final UnalignedSettings postscaleSettings = new UnalignedSettings(topology) .setParallelism(newParallelism) - .setExpectedFailures(1); + .setExpectedFailures(1) + .setExpectedFinalJobStatus(JobStatus.FINISHED); postscaleSettings.setRestoreCheckpoint(checkpointDir); super.execute(postscaleSettings); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index d2de69d6ec3f3..cd3e1e71b4003 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; @@ -57,13 +58,12 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.runtime.throwable.ThrowableAnnotation; -import org.apache.flink.runtime.throwable.ThrowableType; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.util.RestartStrategyUtils; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler; import org.apache.flink.util.Collector; @@ -73,6 +73,7 @@ import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables; +import org.assertj.core.api.Fail; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -103,6 +104,7 @@ import static org.apache.flink.shaded.guava33.com.google.common.collect.Iterables.getOnlyElement; import static org.apache.flink.util.Preconditions.checkState; +import static org.assertj.core.api.Assertions.assertThat; /** * Base class for tests related to unaligned checkpoints. @@ -150,7 +152,7 @@ public static void afterAll() { } @Nullable - protected File execute(UnalignedSettings settings) throws Exception { + protected String execute(UnalignedSettings settings) throws Exception { final File checkpointDir = temp.newFolder(); Configuration conf = settings.getConfiguration(checkpointDir); @@ -179,6 +181,7 @@ protected File execute(UnalignedSettings settings) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); settings.configure(env); + JobID jobID = null; try { // print the test parameters to help debugging when the case is stuck System.out.println( @@ -186,25 +189,34 @@ protected File execute(UnalignedSettings settings) throws Exception { final CompletableFuture result = miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph()); - final JobID jobID = result.get().getJobID(); + jobID = result.get().getJobID(); checkCounters( miniCluster .getMiniCluster() .requestJobResult(jobID) .get() .toJobExecutionResult(getClass().getClassLoader())); + if (settings.expectedFinalJobStatus != null) { + assertThat(miniCluster.getMiniCluster().getJobStatus(jobID)) + .succeedsWithin(Duration.ofMinutes(1)) + .isEqualTo(settings.expectedFinalJobStatus); + } System.out.println( "Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + "."); if (settings.generateCheckpoint) { return CommonTestUtils.getLatestCompletedCheckpointPath( jobID, miniCluster.getMiniCluster()) - .map(File::new) - .orElseThrow(() -> new AssertionError("Could not generate checkpoint")); + .orElseGet(() -> Fail.fail("Could not generate checkpoint")); } } catch (Exception e) { if (ExceptionUtils.findThrowable(e, TestException.class).isEmpty()) { throw e; } + if (settings.generateCheckpoint) { + return CommonTestUtils.getLatestCompletedCheckpointPath( + jobID, miniCluster.getMiniCluster()) + .orElseGet(() -> Fail.fail("Could not generate checkpoint")); + } } finally { miniCluster.after(); } @@ -680,7 +692,7 @@ public String toString() { protected static class UnalignedSettings { private int parallelism; private final int minCheckpoints = 10; - @Nullable private File restoreCheckpoint; + @Nullable private String restoreCheckpoint; private boolean generateCheckpoint = false; int expectedFailures = 0; int tolerableCheckpointFailures = 0; @@ -691,6 +703,7 @@ protected static class UnalignedSettings { private int failuresAfterSourceFinishes = 0; private ChannelType channelType = ChannelType.MIXED; private long sourceSleepMs = 0; + @Nullable private JobStatus expectedFinalJobStatus = null; public UnalignedSettings(DagCreator dagCreator) { this.dagCreator = dagCreator; @@ -701,7 +714,7 @@ public UnalignedSettings setParallelism(int parallelism) { return this; } - public UnalignedSettings setRestoreCheckpoint(File restoreCheckpoint) { + public UnalignedSettings setRestoreCheckpoint(String restoreCheckpoint) { this.restoreCheckpoint = restoreCheckpoint; return this; } @@ -746,6 +759,11 @@ public UnalignedSettings setSourceSleepMs(long sourceSleepMs) { return this; } + public UnalignedSettings setExpectedFinalJobStatus(JobStatus expectedFinalJobStatus) { + this.expectedFinalJobStatus = expectedFinalJobStatus; + return this; + } + public void configure(StreamExecutionEnvironment env) { env.enableCheckpointing(Math.max(100L, parallelism * 50L)); env.getCheckpointConfig() @@ -754,6 +772,8 @@ public void configure(StreamExecutionEnvironment env) { env.getCheckpointConfig() .setTolerableCheckpointFailureNumber(tolerableCheckpointFailures); env.setParallelism(parallelism); + RestartStrategyUtils.configureFixedDelayRestartStrategy( + env, generateCheckpoint ? expectedFailures / 2 : expectedFailures, 100L); env.getCheckpointConfig().enableUnalignedCheckpoints(true); // for custom partitioner env.getCheckpointConfig().setForceUnalignedCheckpoints(true); @@ -772,7 +792,7 @@ public Configuration getConfiguration(File checkpointDir) { conf.set(StateBackendOptions.STATE_BACKEND, "hashmap"); conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); if (restoreCheckpoint != null) { - conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString()); + conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint); } conf.set( @@ -1132,7 +1152,6 @@ protected static long checkHeader(long value) { return value; } - @ThrowableAnnotation(ThrowableType.NonRecoverableError) static class TestException extends Exception { public TestException(String s) { super(s);