Skip to content

Commit c4d6344

Browse files
authored
[FLINK-38403][tests] Fix the unexpected test that the second job does not restore from checkpoint (#27254)
1 parent b61de59 commit c4d6344

File tree

2 files changed

+69
-31
lines changed

2 files changed

+69
-31
lines changed

flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.flink.test.checkpointing;
2121

2222
import org.apache.flink.api.common.JobExecutionResult;
23+
import org.apache.flink.api.common.JobStatus;
2324
import org.apache.flink.api.common.accumulators.LongCounter;
2425
import org.apache.flink.api.common.functions.FilterFunction;
2526
import org.apache.flink.api.common.functions.MapFunction;
@@ -54,13 +55,13 @@
5455
import org.junit.runner.RunWith;
5556
import org.junit.runners.Parameterized;
5657

57-
import java.io.File;
5858
import java.util.Arrays;
5959
import java.util.BitSet;
6060
import java.util.Collections;
6161

6262
import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
6363
import static org.apache.flink.util.Preconditions.checkState;
64+
import static org.assertj.core.api.Assertions.assertThat;
6465
import static org.hamcrest.Matchers.equalTo;
6566

6667
/** Integration test for performing rescale of unaligned checkpoint. */
@@ -366,14 +367,7 @@ public String map(Long value) throws Exception {
366367
})
367368
.name("long-to-string-map")
368369
.uid("long-to-string-map")
369-
.map(
370-
new FailingMapper<>(
371-
state -> false,
372-
state ->
373-
state.completedCheckpoints >= minCheckpoints / 2
374-
&& state.runNumber == 0,
375-
state -> false,
376-
state -> false))
370+
.map(getFailingMapper(minCheckpoints))
377371
.name("failing-map")
378372
.uid("failing-map")
379373
.setParallelism(parallelism)
@@ -394,14 +388,7 @@ void addFailingSink(
394388
DataStream<Long> combinedSource, long minCheckpoints, boolean slotSharing) {
395389
combinedSource
396390
.shuffle()
397-
.map(
398-
new FailingMapper<>(
399-
state -> false,
400-
state ->
401-
state.completedCheckpoints >= minCheckpoints / 2
402-
&& state.runNumber == 0,
403-
state -> false,
404-
state -> false))
391+
.map(getFailingMapper(minCheckpoints))
405392
.name("failing-map")
406393
.uid("failing-map")
407394
.slotSharingGroup(slotSharing ? "default" : "failing-map")
@@ -418,6 +405,25 @@ void addFailingSink(
418405
.slotSharingGroup(slotSharing ? "default" : "sink");
419406
}
420407

408+
/**
409+
* Creates a FailingMapper that only fails during snapshot operations.
410+
*
411+
* <p>Only fails during snapshotState() when completedCheckpoints >= minCheckpoints/2 AND
412+
* runNumber == 0. After job failovers internally, runNumber becomes attemptNumber > 0, so
413+
* failure condition is no longer satisfied. This ensures the mapper fails exactly once
414+
* during initial run to trigger job failover, but never fails again after failing over and
415+
* recovery from checkpoint.
416+
*/
417+
private static <T> FailingMapper<T> getFailingMapper(long minCheckpoints) {
418+
return new FailingMapper<>(
419+
state -> false,
420+
state ->
421+
state.completedCheckpoints >= minCheckpoints / 2
422+
&& state.runNumber == 0,
423+
state -> false,
424+
state -> false);
425+
}
426+
421427
DataStream<Long> createSourcePipeline(
422428
StreamExecutionEnvironment env,
423429
int minCheckpoints,
@@ -611,21 +617,34 @@ public UnalignedCheckpointRescaleITCase(
611617
this.sourceSleepMs = sourceSleepMs;
612618
}
613619

620+
/**
621+
* Tests unaligned checkpoint rescaling behavior.
622+
*
623+
* <p>Prescale phase: Job fails when completedCheckpoints >= minCheckpoints/2 via FailingMapper.
624+
* Generates checkpoint for rescale test.
625+
*
626+
* <p>Postscale phase: Job restores from checkpoint with different parallelism, failovers once,
627+
* and finishes after source generates all records.
628+
*/
614629
@Test
615630
public void shouldRescaleUnalignedCheckpoint() throws Exception {
616631
final UnalignedSettings prescaleSettings =
617632
new UnalignedSettings(topology)
618633
.setParallelism(oldParallelism)
619634
.setExpectedFailures(1)
620-
.setSourceSleepMs(sourceSleepMs);
635+
.setSourceSleepMs(sourceSleepMs)
636+
.setExpectedFinalJobStatus(JobStatus.FAILED);
621637
prescaleSettings.setGenerateCheckpoint(true);
622-
final File checkpointDir = super.execute(prescaleSettings);
623-
638+
final String checkpointDir = super.execute(prescaleSettings);
639+
assertThat(checkpointDir)
640+
.as("First job must generate a checkpoint for rescale test to be valid.")
641+
.isNotNull();
624642
// resume
625643
final UnalignedSettings postscaleSettings =
626644
new UnalignedSettings(topology)
627645
.setParallelism(newParallelism)
628-
.setExpectedFailures(1);
646+
.setExpectedFailures(1)
647+
.setExpectedFinalJobStatus(JobStatus.FINISHED);
629648
postscaleSettings.setRestoreCheckpoint(checkpointDir);
630649
super.execute(postscaleSettings);
631650
}

flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.api.common.JobExecutionResult;
2121
import org.apache.flink.api.common.JobID;
22+
import org.apache.flink.api.common.JobStatus;
2223
import org.apache.flink.api.common.JobSubmissionResult;
2324
import org.apache.flink.api.common.accumulators.IntCounter;
2425
import org.apache.flink.api.common.accumulators.LongCounter;
@@ -57,13 +58,12 @@
5758
import org.apache.flink.runtime.state.FunctionSnapshotContext;
5859
import org.apache.flink.runtime.testutils.CommonTestUtils;
5960
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
60-
import org.apache.flink.runtime.throwable.ThrowableAnnotation;
61-
import org.apache.flink.runtime.throwable.ThrowableType;
6261
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
6362
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
6463
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
6564
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
6665
import org.apache.flink.streaming.api.graph.StreamGraph;
66+
import org.apache.flink.streaming.util.RestartStrategyUtils;
6767
import org.apache.flink.test.util.MiniClusterWithClientResource;
6868
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
6969
import org.apache.flink.util.Collector;
@@ -73,6 +73,7 @@
7373

7474
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
7575

76+
import org.assertj.core.api.Fail;
7677
import org.junit.AfterClass;
7778
import org.junit.BeforeClass;
7879
import org.junit.Rule;
@@ -103,6 +104,7 @@
103104

104105
import static org.apache.flink.shaded.guava33.com.google.common.collect.Iterables.getOnlyElement;
105106
import static org.apache.flink.util.Preconditions.checkState;
107+
import static org.assertj.core.api.Assertions.assertThat;
106108

107109
/**
108110
* Base class for tests related to unaligned checkpoints.
@@ -150,7 +152,7 @@ public static void afterAll() {
150152
}
151153

152154
@Nullable
153-
protected File execute(UnalignedSettings settings) throws Exception {
155+
protected String execute(UnalignedSettings settings) throws Exception {
154156
final File checkpointDir = temp.newFolder();
155157
Configuration conf = settings.getConfiguration(checkpointDir);
156158

@@ -179,32 +181,42 @@ protected File execute(UnalignedSettings settings) throws Exception {
179181
final StreamExecutionEnvironment env =
180182
StreamExecutionEnvironment.getExecutionEnvironment(conf);
181183
settings.configure(env);
184+
JobID jobID = null;
182185
try {
183186
// print the test parameters to help debugging when the case is stuck
184187
System.out.println(
185188
"Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
186189
final CompletableFuture<JobSubmissionResult> result =
187190
miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph());
188191

189-
final JobID jobID = result.get().getJobID();
192+
jobID = result.get().getJobID();
190193
checkCounters(
191194
miniCluster
192195
.getMiniCluster()
193196
.requestJobResult(jobID)
194197
.get()
195198
.toJobExecutionResult(getClass().getClassLoader()));
199+
if (settings.expectedFinalJobStatus != null) {
200+
assertThat(miniCluster.getMiniCluster().getJobStatus(jobID))
201+
.succeedsWithin(Duration.ofMinutes(1))
202+
.isEqualTo(settings.expectedFinalJobStatus);
203+
}
196204
System.out.println(
197205
"Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
198206
if (settings.generateCheckpoint) {
199207
return CommonTestUtils.getLatestCompletedCheckpointPath(
200208
jobID, miniCluster.getMiniCluster())
201-
.map(File::new)
202-
.orElseThrow(() -> new AssertionError("Could not generate checkpoint"));
209+
.orElseGet(() -> Fail.fail("Could not generate checkpoint"));
203210
}
204211
} catch (Exception e) {
205212
if (ExceptionUtils.findThrowable(e, TestException.class).isEmpty()) {
206213
throw e;
207214
}
215+
if (settings.generateCheckpoint) {
216+
return CommonTestUtils.getLatestCompletedCheckpointPath(
217+
jobID, miniCluster.getMiniCluster())
218+
.orElseGet(() -> Fail.fail("Could not generate checkpoint"));
219+
}
208220
} finally {
209221
miniCluster.after();
210222
}
@@ -680,7 +692,7 @@ public String toString() {
680692
protected static class UnalignedSettings {
681693
private int parallelism;
682694
private final int minCheckpoints = 10;
683-
@Nullable private File restoreCheckpoint;
695+
@Nullable private String restoreCheckpoint;
684696
private boolean generateCheckpoint = false;
685697
int expectedFailures = 0;
686698
int tolerableCheckpointFailures = 0;
@@ -691,6 +703,7 @@ protected static class UnalignedSettings {
691703
private int failuresAfterSourceFinishes = 0;
692704
private ChannelType channelType = ChannelType.MIXED;
693705
private long sourceSleepMs = 0;
706+
@Nullable private JobStatus expectedFinalJobStatus = null;
694707

695708
public UnalignedSettings(DagCreator dagCreator) {
696709
this.dagCreator = dagCreator;
@@ -701,7 +714,7 @@ public UnalignedSettings setParallelism(int parallelism) {
701714
return this;
702715
}
703716

704-
public UnalignedSettings setRestoreCheckpoint(File restoreCheckpoint) {
717+
public UnalignedSettings setRestoreCheckpoint(String restoreCheckpoint) {
705718
this.restoreCheckpoint = restoreCheckpoint;
706719
return this;
707720
}
@@ -746,6 +759,11 @@ public UnalignedSettings setSourceSleepMs(long sourceSleepMs) {
746759
return this;
747760
}
748761

762+
public UnalignedSettings setExpectedFinalJobStatus(JobStatus expectedFinalJobStatus) {
763+
this.expectedFinalJobStatus = expectedFinalJobStatus;
764+
return this;
765+
}
766+
749767
public void configure(StreamExecutionEnvironment env) {
750768
env.enableCheckpointing(Math.max(100L, parallelism * 50L));
751769
env.getCheckpointConfig()
@@ -754,6 +772,8 @@ public void configure(StreamExecutionEnvironment env) {
754772
env.getCheckpointConfig()
755773
.setTolerableCheckpointFailureNumber(tolerableCheckpointFailures);
756774
env.setParallelism(parallelism);
775+
RestartStrategyUtils.configureFixedDelayRestartStrategy(
776+
env, generateCheckpoint ? expectedFailures / 2 : expectedFailures, 100L);
757777
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
758778
// for custom partitioner
759779
env.getCheckpointConfig().setForceUnalignedCheckpoints(true);
@@ -772,7 +792,7 @@ public Configuration getConfiguration(File checkpointDir) {
772792
conf.set(StateBackendOptions.STATE_BACKEND, "hashmap");
773793
conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
774794
if (restoreCheckpoint != null) {
775-
conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString());
795+
conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint);
776796
}
777797

778798
conf.set(
@@ -1132,7 +1152,6 @@ protected static long checkHeader(long value) {
11321152
return value;
11331153
}
11341154

1135-
@ThrowableAnnotation(ThrowableType.NonRecoverableError)
11361155
static class TestException extends Exception {
11371156
public TestException(String s) {
11381157
super(s);

0 commit comments

Comments
 (0)