@@ -20,6 +20,7 @@
package org.apache.flink.test.checkpointing;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -54,13 +55,13 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;

import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
import static org.apache.flink.util.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.equalTo;

/** Integration test for performing rescale of unaligned checkpoint. */
@@ -366,14 +367,7 @@ public String map(Long value) throws Exception {
})
.name("long-to-string-map")
.uid("long-to-string-map")
.map(
new FailingMapper<>(
state -> false,
state ->
state.completedCheckpoints >= minCheckpoints / 2
&& state.runNumber == 0,
state -> false,
state -> false))
.map(getFailingMapper(minCheckpoints))
.name("failing-map")
.uid("failing-map")
.setParallelism(parallelism)
@@ -394,14 +388,7 @@ void addFailingSink(
DataStream<Long> combinedSource, long minCheckpoints, boolean slotSharing) {
combinedSource
.shuffle()
.map(
new FailingMapper<>(
state -> false,
state ->
state.completedCheckpoints >= minCheckpoints / 2
&& state.runNumber == 0,
state -> false,
state -> false))
.map(getFailingMapper(minCheckpoints))
.name("failing-map")
.uid("failing-map")
.slotSharingGroup(slotSharing ? "default" : "failing-map")
@@ -418,6 +405,25 @@ void addFailingSink(
.slotSharingGroup(slotSharing ? "default" : "sink");
}

/**
* Creates a FailingMapper that only fails during snapshot operations.
*
* <p>Only fails during snapshotState() when completedCheckpoints >= minCheckpoints/2 AND
* runNumber == 0. After the job fails over internally, runNumber becomes the attemptNumber
* (> 0), so the failure condition is no longer satisfied. This ensures the mapper fails
* exactly once during the initial run to trigger a job failover, but never fails again
* after the failover and recovery from the checkpoint.
*/
private static <T> FailingMapper<T> getFailingMapper(long minCheckpoints) {
return new FailingMapper<>(
state -> false,
state ->
state.completedCheckpoints >= minCheckpoints / 2
&& state.runNumber == 0,
state -> false,
state -> false);
}

DataStream<Long> createSourcePipeline(
StreamExecutionEnvironment env,
int minCheckpoints,
@@ -611,21 +617,34 @@ public UnalignedCheckpointRescaleITCase(
this.sourceSleepMs = sourceSleepMs;
}

/**
* Tests unaligned checkpoint rescaling behavior.
*
* <p>Prescale phase: the job fails via FailingMapper once completedCheckpoints >= minCheckpoints/2,
* generating the checkpoint used for the rescale test.
*
* <p>Postscale phase: the job restores from that checkpoint with a different parallelism, fails
* over once, and finishes after the source has generated all records.
Comment on lines +626 to +627 (Member Author):

The second job is expected to fail over once and finish after the source generates all records, so @ThrowableAnnotation(ThrowableType.NonRecoverableError) is removed from TestException.

Also, I introduced ExpectedFinalJobStatus in UnalignedSettings to check the final JobStatus.

*/
@Test
public void shouldRescaleUnalignedCheckpoint() throws Exception {
final UnalignedSettings prescaleSettings =
new UnalignedSettings(topology)
.setParallelism(oldParallelism)
.setExpectedFailures(1)
.setSourceSleepMs(sourceSleepMs);
.setSourceSleepMs(sourceSleepMs)
.setExpectedFinalJobStatus(JobStatus.FAILED);
prescaleSettings.setGenerateCheckpoint(true);
final File checkpointDir = super.execute(prescaleSettings);

final String checkpointDir = super.execute(prescaleSettings);
assertThat(checkpointDir)
.as("First job must generate a checkpoint for rescale test to be valid.")
.isNotNull();
// resume
final UnalignedSettings postscaleSettings =
new UnalignedSettings(topology)
.setParallelism(newParallelism)
.setExpectedFailures(1);
.setExpectedFailures(1)
.setExpectedFinalJobStatus(JobStatus.FINISHED);
postscaleSettings.setRestoreCheckpoint(checkpointDir);
super.execute(postscaleSettings);
}
@@ -19,6 +19,7 @@

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
@@ -57,13 +58,12 @@
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.throwable.ThrowableAnnotation;
import org.apache.flink.runtime.throwable.ThrowableType;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;
@@ -73,6 +73,7 @@

import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;

import org.assertj.core.api.Fail;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -103,6 +104,7 @@

import static org.apache.flink.shaded.guava33.com.google.common.collect.Iterables.getOnlyElement;
import static org.apache.flink.util.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Base class for tests related to unaligned checkpoints.
@@ -150,7 +152,7 @@ public static void afterAll() {
}

@Nullable
protected File execute(UnalignedSettings settings) throws Exception {
protected String execute(UnalignedSettings settings) throws Exception {
final File checkpointDir = temp.newFolder();
Configuration conf = settings.getConfiguration(checkpointDir);

@@ -179,32 +181,42 @@ protected File execute(UnalignedSettings settings) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(conf);
settings.configure(env);
JobID jobID = null;
try {
// print the test parameters to help debugging when the case is stuck
System.out.println(
"Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
final CompletableFuture<JobSubmissionResult> result =
miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph());

final JobID jobID = result.get().getJobID();
jobID = result.get().getJobID();
checkCounters(
miniCluster
.getMiniCluster()
.requestJobResult(jobID)
.get()
.toJobExecutionResult(getClass().getClassLoader()));
if (settings.expectedFinalJobStatus != null) {
assertThat(miniCluster.getMiniCluster().getJobStatus(jobID))
.succeedsWithin(Duration.ofMinutes(1))
.isEqualTo(settings.expectedFinalJobStatus);
}
System.out.println(
"Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
if (settings.generateCheckpoint) {
return CommonTestUtils.getLatestCompletedCheckpointPath(
jobID, miniCluster.getMiniCluster())
.map(File::new)
.orElseThrow(() -> new AssertionError("Could not generate checkpoint"));
.orElseGet(() -> Fail.fail("Could not generate checkpoint"));
}
} catch (Exception e) {
if (ExceptionUtils.findThrowable(e, TestException.class).isEmpty()) {
throw e;
}
if (settings.generateCheckpoint) {
return CommonTestUtils.getLatestCompletedCheckpointPath(
jobID, miniCluster.getMiniCluster())
.orElseGet(() -> Fail.fail("Could not generate checkpoint"));
}
} finally {
miniCluster.after();
}
@@ -680,7 +692,7 @@ public String toString() {
protected static class UnalignedSettings {
private int parallelism;
private final int minCheckpoints = 10;
@Nullable private File restoreCheckpoint;
@Nullable private String restoreCheckpoint;
private boolean generateCheckpoint = false;
int expectedFailures = 0;
int tolerableCheckpointFailures = 0;
@@ -691,6 +703,7 @@ protected static class UnalignedSettings {
private int failuresAfterSourceFinishes = 0;
private ChannelType channelType = ChannelType.MIXED;
private long sourceSleepMs = 0;
@Nullable private JobStatus expectedFinalJobStatus = null;

public UnalignedSettings(DagCreator dagCreator) {
this.dagCreator = dagCreator;
@@ -701,7 +714,7 @@ public UnalignedSettings setParallelism(int parallelism) {
return this;
}

public UnalignedSettings setRestoreCheckpoint(File restoreCheckpoint) {
public UnalignedSettings setRestoreCheckpoint(String restoreCheckpoint) {
this.restoreCheckpoint = restoreCheckpoint;
return this;
}
@@ -746,6 +759,11 @@ public UnalignedSettings setSourceSleepMs(long sourceSleepMs) {
return this;
}

public UnalignedSettings setExpectedFinalJobStatus(JobStatus expectedFinalJobStatus) {
this.expectedFinalJobStatus = expectedFinalJobStatus;
return this;
}

public void configure(StreamExecutionEnvironment env) {
env.enableCheckpointing(Math.max(100L, parallelism * 50L));
env.getCheckpointConfig()
@@ -754,6 +772,8 @@ public void configure(StreamExecutionEnvironment env) {
env.getCheckpointConfig()
.setTolerableCheckpointFailureNumber(tolerableCheckpointFailures);
env.setParallelism(parallelism);
RestartStrategyUtils.configureFixedDelayRestartStrategy(
env, generateCheckpoint ? expectedFailures / 2 : expectedFailures, 100L);
Comment on lines +775 to +776 (Contributor):

What's the reasoning behind the calculated number of expected failures?


Reply (Member Author):

It reverts the change in https://github.com/apache/flink/pull/27119/files#diff-ace775e80e66d4f4001bdaea6bcbaae1975bd5e9a5497532d8d7152e4090069aL752

The original intention is:

  • generateCheckpoint controls the operational phase: true is for the job before rescaling, and false is for the new job after rescaling
  • The value expectedFailures / 2 acts as the failure threshold for the first job. This setup ensures that the first job fails after half of the expected exceptions are met, allowing the second job to automatically recover from the generated checkpoint and continue consumption.
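
A small sketch of the resulting restart budgets, assuming both phases use setExpectedFailures(1) as shouldRescaleUnalignedCheckpoint does (the class and variable names below are illustrative only, not part of the PR):

```java
// Illustrative only: how the fixed-delay restart budget differs between the two phases.
public class RestartBudgetSketch {
    public static void main(String[] args) {
        int expectedFailures = 1; // value used for both phases in shouldRescaleUnalignedCheckpoint

        // Prescale job (generateCheckpoint == true): expectedFailures / 2 == 0 restart
        // attempts, so the single TestException thrown by FailingMapper terminates the job
        // as FAILED after at least minCheckpoints / 2 checkpoints have already completed.
        int prescaleRestartAttempts = expectedFailures / 2;

        // Postscale job (generateCheckpoint == false): expectedFailures == 1 restart
        // attempt, so the job can fail over once, recover from the checkpoint, and FINISH.
        int postscaleRestartAttempts = expectedFailures;

        System.out.println("prescale restart attempts:  " + prescaleRestartAttempts);
        System.out.println("postscale restart attempts: " + postscaleRestartAttempts);
    }
}
```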

env.getCheckpointConfig().enableUnalignedCheckpoints(true);
// for custom partitioner
env.getCheckpointConfig().setForceUnalignedCheckpoints(true);
@@ -772,7 +792,7 @@ public Configuration getConfiguration(File checkpointDir) {
conf.set(StateBackendOptions.STATE_BACKEND, "hashmap");
conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
if (restoreCheckpoint != null) {
conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString());
conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint);
Comment on lines -775 to +795 (Member Author):

It adds the wrong prefix, so updated restoreCheckpoint to String.
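
A hedged illustration of the mix-up, assuming the latest-checkpoint path returned by getLatestCompletedCheckpointPath is already a URI string of the shape shown below (the concrete path is made up):

```java
import java.io.File;

public class CheckpointPathPrefixDemo {
    public static void main(String[] args) {
        // Hypothetical checkpoint path, already a URI string.
        String checkpointPath = "file:/tmp/junit1234/checkpoints/chk-7";

        // The previous code wrapped that string in a File and converted it back to a URI,
        // which treats "file:/..." as a relative file name and resolves it against the
        // current working directory.
        String mangled = new File(checkpointPath).toURI().toString();

        // Prints something like "file:/current/working/dir/file:/tmp/junit1234/checkpoints/chk-7",
        // i.e. an extra, wrong prefix - hence restoreCheckpoint is now kept as a String.
        System.out.println(mangled);
    }
}
```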


}

conf.set(
@@ -1132,7 +1152,6 @@ protected static long checkHeader(long value) {
return value;
}

@ThrowableAnnotation(ThrowableType.NonRecoverableError)
static class TestException extends Exception {
public TestException(String s) {
super(s);