Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
package org.apache.beam.runners.flink;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,6 +42,7 @@ public class FlinkDetachedRunnerResult implements PipelineResult {

private JobClient jobClient;
private int jobCheckIntervalInSecs;
private volatile @Nullable CompletableFuture<String> drainSavepointFuture;

FlinkDetachedRunnerResult(JobClient jobClient, int jobCheckIntervalInSecs) {
this.jobClient = jobClient;
Expand All @@ -47,10 +51,22 @@ public class FlinkDetachedRunnerResult implements PipelineResult {

@Override
public State getState() {
CompletableFuture<String> drainFuture = drainSavepointFuture;
if (drainFuture != null) {
try {
return getDrainState(drainFuture);
} catch (IOException e) {
LOG.warn("Failed to drain Flink job. Querying Flink job state instead.", e);
}
}
Comment on lines +54 to +61
Comment on lines +54 to +61
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If the drainSavepointFuture completes exceptionally (e.g., if the stop-with-savepoint operation fails), calling getState() will currently throw a RuntimeException and crash the caller. Instead, we should be more robust: if the drain operation failed, the job is likely still running or in its previous state. We can check isCompletedExceptionally() and, if true, fall back to querying the actual Flink job status via jobClient.getJobStatus().

Suggested change
CompletableFuture<String> drainFuture = drainSavepointFuture;
if (drainFuture != null) {
return getDrainState(drainFuture);
}
CompletableFuture<String> drainFuture = drainSavepointFuture;
if (drainFuture != null && !drainFuture.isCompletedExceptionally()) {
return drainFuture.isDone() ? State.DRAINED : State.DRAINING;
}

return getFlinkJobState();
}

private State getFlinkJobState() {
try {
return toBeamJobState(jobClient.getJobStatus().get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Fail to get flink job state", e);
throw new RuntimeException("Failed to get Flink job state", e);
}
}

Expand Down Expand Up @@ -82,6 +98,31 @@ public State cancel() throws IOException {
return getState();
}

@Override
public synchronized State drain() throws IOException {
CompletableFuture<String> drainFuture = drainSavepointFuture;
if (drainFuture == null || drainFuture.isCompletedExceptionally()) {
drainFuture = this.jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT);
drainSavepointFuture = drainFuture;
}
return getDrainState(drainFuture);
}
Comment on lines +102 to +109

private State getDrainState(CompletableFuture<String> drainFuture) throws IOException {
if (!drainFuture.isDone()) {
return State.DRAINING;
}
try {
drainFuture.get();
return State.DRAINED;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Failed to drain Flink job", e);
} catch (ExecutionException e) {
throw new IOException("Failed to drain Flink job", e.getCause());
}
}
Comment on lines +102 to +124
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

We can improve the robustness of the drain() method in several ways:

  1. Support Retries: If a previous drain attempt failed (i.e., drainFuture.isCompletedExceptionally()), we should allow the user to retry the drain operation rather than permanently returning the cached failure.
  2. Throw Checked Exceptions: Since drain() is declared to throw IOException, we should wrap and throw IOException instead of RuntimeException when the future fails.
  3. Simplify Code: We can inline the getDrainState logic directly into drain() and remove the helper method entirely, making the code cleaner.
  @Override
  public synchronized State drain() throws IOException {
    CompletableFuture<String> drainFuture = drainSavepointFuture;
    if (drainFuture == null || drainFuture.isCompletedExceptionally()) {
      drainFuture = this.jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT);
      drainSavepointFuture = drainFuture;
    }
    if (!drainFuture.isDone()) {
      return State.DRAINING;
    }
    try {
      drainFuture.get();
      return State.DRAINED;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Failed to drain flink job", e);
    } catch (ExecutionException e) {
      throw new IOException("Failed to drain flink job", e.getCause());
    }
  }


@Override
public State waitUntilFinish() {
return waitUntilFinish(Duration.millis(Long.MAX_VALUE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public State cancel() {
return State.DONE;
}

@Override
public State drain() {
// We can only be called here when we are done.
return State.DONE;
}

@Override
public State waitUntilFinish() {
return State.DONE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,20 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.sdk.PipelineResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.joda.time.Duration;
import org.junit.Test;

Expand All @@ -47,4 +58,85 @@ public void testCancelDoesNotThrowAnException() {
result.cancel();
assertThat(result.getState(), is(PipelineResult.State.DONE));
}

@Test
public void testDrainDoneResultDoesNotThrowAnException() throws Exception {
FlinkRunnerResult result = new FlinkRunnerResult(Collections.emptyMap(), 100);
assertThat(result.drain(), is(PipelineResult.State.DONE));
}

@Test
public void testDetachedDrainReturnsDrainingThenDrained() throws Exception {
JobClient jobClient = mock(JobClient.class);
CompletableFuture<String> drainFuture = new CompletableFuture<>();
when(jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT))
.thenReturn(drainFuture);
FlinkDetachedRunnerResult result = new FlinkDetachedRunnerResult(jobClient, 1);

assertThat(result.drain(), is(PipelineResult.State.DRAINING));
assertThat(result.getState(), is(PipelineResult.State.DRAINING));

drainFuture.complete("savepoint");
assertThat(result.getState(), is(PipelineResult.State.DRAINED));
verify(jobClient).stopWithSavepoint(true, null, SavepointFormatType.DEFAULT);
}

@Test
public void testDetachedDrainFailureThrowsIOException() throws Exception {
JobClient jobClient = mock(JobClient.class);
CompletableFuture<String> drainFuture = new CompletableFuture<>();
RuntimeException failure = new RuntimeException("savepoint failed");
drainFuture.completeExceptionally(failure);
when(jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT))
.thenReturn(drainFuture);
FlinkDetachedRunnerResult result = new FlinkDetachedRunnerResult(jobClient, 1);

try {
result.drain();
fail("Expected IOException");
} catch (IOException e) {
assertThat(e.getMessage(), is("Failed to drain Flink job"));
assertSame(failure, e.getCause());
}
}

@Test
public void testDetachedGetStateFallsBackAfterDrainFailure() throws Exception {
JobClient jobClient = mock(JobClient.class);
CompletableFuture<String> drainFuture = new CompletableFuture<>();
drainFuture.completeExceptionally(new RuntimeException("savepoint failed"));
when(jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT))
.thenReturn(drainFuture);
when(jobClient.getJobStatus()).thenReturn(CompletableFuture.completedFuture(JobStatus.RUNNING));
FlinkDetachedRunnerResult result = new FlinkDetachedRunnerResult(jobClient, 1);

try {
result.drain();
fail("Expected IOException");
} catch (IOException expected) {
assertThat(result.getState(), is(PipelineResult.State.RUNNING));
}
}

@Test
public void testDetachedDrainRetriesAfterFailure() throws Exception {
JobClient jobClient = mock(JobClient.class);
CompletableFuture<String> failedDrainFuture = new CompletableFuture<>();
failedDrainFuture.completeExceptionally(new RuntimeException("savepoint failed"));
CompletableFuture<String> retryDrainFuture = new CompletableFuture<>();
when(jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT))
.thenReturn(failedDrainFuture, retryDrainFuture);
FlinkDetachedRunnerResult result = new FlinkDetachedRunnerResult(jobClient, 1);

try {
result.drain();
fail("Expected IOException");
} catch (IOException expected) {
assertThat(result.drain(), is(PipelineResult.State.DRAINING));
}

retryDrainFuture.complete("savepoint");
assertThat(result.getState(), is(PipelineResult.State.DRAINED));
verify(jobClient, times(2)).stopWithSavepoint(true, null, SavepointFormatType.DEFAULT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ State waitUntilFinish(
private void logTerminalState(State state) {
switch (state) {
case DONE:
case DRAINED:
case CANCELLED:
LOG.info("Job {} finished with status {}.", getJobId(), state);
break;
Expand Down Expand Up @@ -432,69 +433,83 @@ private Exception processJobMessages(
}

private AtomicReference<FutureTask<State>> cancelState = new AtomicReference<>();
private AtomicReference<FutureTask<State>> drainState = new AtomicReference<>();

@SuppressWarnings("Slf4jFormatShouldBeConst")
@Override
public State cancel() throws IOException {
// Enforce that a cancel() call on the job is done at most once - as
// a workaround for Dataflow service's current bugs with multiple
// cancellation, where it may sometimes return an error when cancelling
// a job that was already cancelled, but still report the job state as
// RUNNING.
// To partially work around these issues, we absorb duplicate cancel()
// calls. This, of course, doesn't address the case when the job terminates
// externally almost concurrently to calling cancel(), but at least it
// makes it possible to safely call cancel() multiple times and from
// multiple threads in one program.
FutureTask<State> tentativeCancelTask =
return requestJobState(cancelState, "JOB_STATE_CANCELLED", "cancel", "Cancel");
}

@Override
public State drain() throws IOException {
return requestJobState(drainState, "JOB_STATE_DRAINED", "drain", "Drain");
}

@SuppressWarnings("Slf4jFormatShouldBeConst")
private State requestJobState(
AtomicReference<FutureTask<State>> requestedState,
String dataflowRequestedState,
String action,
String capitalizedAction)
throws IOException {
// Enforce that a lifecycle request on the job is done at most once. This preserves the
// existing cancel() behavior and keeps duplicate drain() calls idempotent from one client.
FutureTask<State> tentativeTask =
new FutureTask<>(
() -> {
Job content = new Job();
content.setProjectId(getProjectId());
String currentJobId = getJobId();
content.setId(currentJobId);
content.setRequestedState("JOB_STATE_CANCELLED");
content.setRequestedState(dataflowRequestedState);
try {
Job job = dataflowClient.updateJob(currentJobId, content);
return MonitoringUtil.toState(job.getCurrentState());
} catch (IOException e) {
State state = getState();
String message = e.getMessage();
if (state.isTerminal()) {
LOG.warn("Cancel failed because job is already terminated. State is {}", state);
LOG.warn(
"{} failed because job is already terminated. State is {}",
capitalizedAction,
state);
return state;
} else if (e.getMessage().contains("has terminated")) {
} else if (message != null && message.contains("has terminated")) {
// This handles the case where the getState() call above returns RUNNING but the
// cancel was rejected because the job is in fact done. Hopefully, someday we can
// request was rejected because the job is in fact done. Hopefully, someday we can
// delete this code if there is better consistency between the State and whether
// Cancel succeeds.
// lifecycle requests succeed.
//
// Example message:
// Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform
// operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. Job has
// terminated in state SUCCESS: Workflow job:
// 2017-04-01_22_50_59-9269855660514862348 succeeded.
LOG.warn("Cancel failed because job is already terminated.", e);
LOG.warn("{} failed because job is already terminated.", capitalizedAction, e);
return state;
} else {
String errorMsg =
String.format(
"Failed to cancel job in state %s, "
+ "please go to the Developers Console to cancel it manually: %s",
"Failed to %s job in state %s, "
+ "please go to the Developers Console to %s it manually: %s",
action,
state,
action,
MonitoringUtil.getJobMonitoringPageURL(
getProjectId(), getRegion(), getJobId()));
LOG.warn(errorMsg);
throw new IOException(errorMsg, e);
}
}
});
if (cancelState.compareAndSet(null, tentativeCancelTask)) {
// This thread should perform cancellation, while others will
// only wait for the result.
cancelState.get().run();
if (requestedState.compareAndSet(null, tentativeTask)) {
// This thread should perform the lifecycle request, while others will only wait for the
// result.
requestedState.get().run();
}
try {
return cancelState.get().get();
return requestedState.get().get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,12 @@ private boolean waitForStreamingJobTermination(
}
return false;
} else {
return finalState == State.DONE && !messageHandler.hasSeenError();
return (finalState == State.DONE || finalState == State.DRAINED)
&& !messageHandler.hasSeenError();
}
}

/** Return {@code true} if job state is {@code State.DONE}. {@code false} otherwise. */
/** Return {@code true} if job state is {@code State.DONE} or {@code State.DRAINED}. */
private boolean waitForBatchJobTermination(
DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
{
Expand All @@ -210,7 +211,8 @@ private boolean waitForBatchJobTermination(
return false;
}

return job.getState() == State.DONE;
State state = job.getState();
return state == State.DONE || state == State.DRAINED;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,17 @@ public static State toState(@Nullable String stateName) {

case "JOB_STATE_RUNNING":
case "JOB_STATE_PENDING": // Job has not yet started; closest mapping is RUNNING
case "JOB_STATE_DRAINING": // Job is still active; the closest mapping is RUNNING
case "JOB_STATE_CANCELLING": // Job is still active; the closest mapping is RUNNING
case "JOB_STATE_PAUSING": // Job is still active; the closest mapping is RUNNING
case "JOB_STATE_RESOURCE_CLEANING_UP": // Job is still active; the closest mapping is RUNNING
return State.RUNNING;
case "JOB_STATE_DRAINING":
return State.DRAINING;

case "JOB_STATE_DONE":
case "JOB_STATE_DRAINED": // Job has successfully terminated; closest mapping is DONE
return State.DONE;
case "JOB_STATE_DRAINED":
return State.DRAINED;
default:
LOG.warn(
"Unrecognized state from Dataflow service: {}."
Expand Down
Loading
Loading