diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java index f7d82065b658..b93d142d4ac2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.flink; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; @@ -25,6 +26,8 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +42,7 @@ public class FlinkDetachedRunnerResult implements PipelineResult { private JobClient jobClient; private int jobCheckIntervalInSecs; + private volatile @Nullable CompletableFuture drainSavepointFuture; FlinkDetachedRunnerResult(JobClient jobClient, int jobCheckIntervalInSecs) { this.jobClient = jobClient; @@ -47,10 +51,22 @@ public class FlinkDetachedRunnerResult implements PipelineResult { @Override public State getState() { + CompletableFuture drainFuture = drainSavepointFuture; + if (drainFuture != null) { + try { + return getDrainState(drainFuture); + } catch (IOException e) { + LOG.warn("Failed to drain Flink job. Querying Flink job state instead.", e); + } + } + return getFlinkJobState(); + } + + private State getFlinkJobState() { try { return toBeamJobState(jobClient.getJobStatus().get()); } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Fail to get flink job state", e); + throw new RuntimeException("Failed to get Flink job state", e); } } @@ -82,6 +98,31 @@ public State cancel() throws IOException { return getState(); } + @Override + public synchronized State drain() throws IOException { + CompletableFuture drainFuture = drainSavepointFuture; + if (drainFuture == null || drainFuture.isCompletedExceptionally()) { + drainFuture = this.jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT); + drainSavepointFuture = drainFuture; + } + return getDrainState(drainFuture); + } + + private State getDrainState(CompletableFuture drainFuture) throws IOException { + if (!drainFuture.isDone()) { + return State.DRAINING; + } + try { + drainFuture.get(); + return State.DRAINED; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to drain Flink job", e); + } catch (ExecutionException e) { + throw new IOException("Failed to drain Flink job", e.getCause()); + } + } + @Override public State waitUntilFinish() { return waitUntilFinish(Duration.millis(Long.MAX_VALUE)); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index d892049bce4b..c0cce5349f20 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -64,6 +64,12 @@ public State cancel() { return State.DONE; } + @Override + public State drain() { + // We can only be called here when we are done. + return State.DONE; + } + @Override public State waitUntilFinish() { return State.DONE; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerResultTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerResultTest.java index ba0981617fe3..09ceede9d586 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerResultTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerResultTest.java @@ -19,9 +19,20 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.Collections; +import java.util.concurrent.CompletableFuture; import org.apache.beam.sdk.PipelineResult; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; import org.joda.time.Duration; import org.junit.Test; @@ -47,4 +58,85 @@ public void testCancelDoesNotThrowAnException() { result.cancel(); assertThat(result.getState(), is(PipelineResult.State.DONE)); } + + @Test + public void testDrainDoneResultDoesNotThrowAnException() throws Exception { + FlinkRunnerResult result = new FlinkRunnerResult(Collections.emptyMap(), 100); + assertThat(result.drain(), is(PipelineResult.State.DONE)); + } + + @Test + public void testDetachedDrainReturnsDrainingThenDrained() throws Exception { + JobClient jobClient = mock(JobClient.class); + CompletableFuture drainFuture = new CompletableFuture<>(); + when(jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT)) + .thenReturn(drainFuture); + FlinkDetachedRunnerResult result = new FlinkDetachedRunnerResult(jobClient, 1); + + assertThat(result.drain(), is(PipelineResult.State.DRAINING)); + assertThat(result.getState(), is(PipelineResult.State.DRAINING)); + + drainFuture.complete("savepoint"); + assertThat(result.getState(), is(PipelineResult.State.DRAINED)); + verify(jobClient).stopWithSavepoint(true, null, SavepointFormatType.DEFAULT); + } + + @Test + public void testDetachedDrainFailureThrowsIOException() throws Exception { + JobClient jobClient = mock(JobClient.class); + CompletableFuture drainFuture = new CompletableFuture<>(); + RuntimeException failure = new RuntimeException("savepoint failed"); + drainFuture.completeExceptionally(failure); + when(jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT)) + .thenReturn(drainFuture); + FlinkDetachedRunnerResult result = new FlinkDetachedRunnerResult(jobClient, 1); + + try { + result.drain(); + fail("Expected IOException"); + } catch (IOException e) { + assertThat(e.getMessage(), is("Failed to drain Flink job")); + assertSame(failure, e.getCause()); + } + } + + @Test + public void testDetachedGetStateFallsBackAfterDrainFailure() throws Exception { + JobClient jobClient = mock(JobClient.class); + CompletableFuture drainFuture = new CompletableFuture<>(); + drainFuture.completeExceptionally(new RuntimeException("savepoint failed")); + when(jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT)) + .thenReturn(drainFuture); + when(jobClient.getJobStatus()).thenReturn(CompletableFuture.completedFuture(JobStatus.RUNNING)); + FlinkDetachedRunnerResult result = new FlinkDetachedRunnerResult(jobClient, 1); + + try { + result.drain(); + fail("Expected IOException"); + } catch (IOException expected) { + assertThat(result.getState(), is(PipelineResult.State.RUNNING)); + } + } + + @Test + public void testDetachedDrainRetriesAfterFailure() throws Exception { + JobClient jobClient = mock(JobClient.class); + CompletableFuture failedDrainFuture = new CompletableFuture<>(); + failedDrainFuture.completeExceptionally(new RuntimeException("savepoint failed")); + CompletableFuture retryDrainFuture = new CompletableFuture<>(); + when(jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT)) + .thenReturn(failedDrainFuture, retryDrainFuture); + FlinkDetachedRunnerResult result = new FlinkDetachedRunnerResult(jobClient, 1); + + try { + result.drain(); + fail("Expected IOException"); + } catch (IOException expected) { + assertThat(result.drain(), is(PipelineResult.State.DRAINING)); + } + + retryDrainFuture.complete("savepoint"); + assertThat(result.getState(), is(PipelineResult.State.DRAINED)); + verify(jobClient, times(2)).stopWithSavepoint(true, null, SavepointFormatType.DEFAULT); + } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 400f161dee2f..afabe14f1d23 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -364,6 +364,7 @@ State waitUntilFinish( private void logTerminalState(State state) { switch (state) { case DONE: + case DRAINED: case CANCELLED: LOG.info("Job {} finished with status {}.", getJobId(), state); break; @@ -432,55 +433,69 @@ private Exception processJobMessages( } private AtomicReference> cancelState = new AtomicReference<>(); + private AtomicReference> drainState = new AtomicReference<>(); @SuppressWarnings("Slf4jFormatShouldBeConst") @Override public State cancel() throws IOException { - // Enforce that a cancel() call on the job is done at most once - as - // a workaround for Dataflow service's current bugs with multiple - // cancellation, where it may sometimes return an error when cancelling - // a job that was already cancelled, but still report the job state as - // RUNNING. - // To partially work around these issues, we absorb duplicate cancel() - // calls. This, of course, doesn't address the case when the job terminates - // externally almost concurrently to calling cancel(), but at least it - // makes it possible to safely call cancel() multiple times and from - // multiple threads in one program. - FutureTask tentativeCancelTask = + return requestJobState(cancelState, "JOB_STATE_CANCELLED", "cancel", "Cancel"); + } + + @Override + public State drain() throws IOException { + return requestJobState(drainState, "JOB_STATE_DRAINED", "drain", "Drain"); + } + + @SuppressWarnings("Slf4jFormatShouldBeConst") + private State requestJobState( + AtomicReference> requestedState, + String dataflowRequestedState, + String action, + String capitalizedAction) + throws IOException { + // Enforce that a lifecycle request on the job is done at most once. This preserves the + // existing cancel() behavior and keeps duplicate drain() calls idempotent from one client. + FutureTask tentativeTask = new FutureTask<>( () -> { Job content = new Job(); content.setProjectId(getProjectId()); String currentJobId = getJobId(); content.setId(currentJobId); - content.setRequestedState("JOB_STATE_CANCELLED"); + content.setRequestedState(dataflowRequestedState); try { Job job = dataflowClient.updateJob(currentJobId, content); return MonitoringUtil.toState(job.getCurrentState()); } catch (IOException e) { State state = getState(); + String message = e.getMessage(); if (state.isTerminal()) { - LOG.warn("Cancel failed because job is already terminated. State is {}", state); + LOG.warn( + "{} failed because job is already terminated. State is {}", + capitalizedAction, + state); return state; - } else if (e.getMessage().contains("has terminated")) { + } else if (message != null && message.contains("has terminated")) { // This handles the case where the getState() call above returns RUNNING but the - // cancel was rejected because the job is in fact done. Hopefully, someday we can + // request was rejected because the job is in fact done. Hopefully, someday we can // delete this code if there is better consistency between the State and whether - // Cancel succeeds. + // lifecycle requests succeed. // // Example message: // Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform // operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. Job has // terminated in state SUCCESS: Workflow job: // 2017-04-01_22_50_59-9269855660514862348 succeeded. - LOG.warn("Cancel failed because job is already terminated.", e); + LOG.warn("{} failed because job is already terminated.", capitalizedAction, e); return state; } else { String errorMsg = String.format( - "Failed to cancel job in state %s, " - + "please go to the Developers Console to cancel it manually: %s", + "Failed to %s job in state %s, " + + "please go to the Developers Console to %s it manually: %s", + action, state, + action, MonitoringUtil.getJobMonitoringPageURL( getProjectId(), getRegion(), getJobId())); LOG.warn(errorMsg); @@ -488,13 +503,13 @@ public State cancel() throws IOException { } } }); - if (cancelState.compareAndSet(null, tentativeCancelTask)) { - // This thread should perform cancellation, while others will - // only wait for the result. - cancelState.get().run(); + if (requestedState.compareAndSet(null, tentativeTask)) { + // This thread should perform the lifecycle request, while others will only wait for the + // result. + requestedState.get().run(); } try { - return cancelState.get().get(); + return requestedState.get().get(); } catch (InterruptedException | ExecutionException e) { throw new IOException(e); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java index b00194dacb08..bb365656dab9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java @@ -193,11 +193,12 @@ private boolean waitForStreamingJobTermination( } return false; } else { - return finalState == State.DONE && !messageHandler.hasSeenError(); + return (finalState == State.DONE || finalState == State.DRAINED) + && !messageHandler.hasSeenError(); } } - /** Return {@code true} if job state is {@code State.DONE}. {@code false} otherwise. */ + /** Return {@code true} if job state is {@code State.DONE} or {@code State.DRAINED}. */ private boolean waitForBatchJobTermination( DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) { { @@ -210,7 +211,8 @@ private boolean waitForBatchJobTermination( return false; } - return job.getState() == State.DONE; + State state = job.getState(); + return state == State.DONE || state == State.DRAINED; } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index d117cf786129..acda5532e48d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -224,15 +224,17 @@ public static State toState(@Nullable String stateName) { case "JOB_STATE_RUNNING": case "JOB_STATE_PENDING": // Job has not yet started; closest mapping is RUNNING - case "JOB_STATE_DRAINING": // Job is still active; the closest mapping is RUNNING case "JOB_STATE_CANCELLING": // Job is still active; the closest mapping is RUNNING case "JOB_STATE_PAUSING": // Job is still active; the closest mapping is RUNNING case "JOB_STATE_RESOURCE_CLEANING_UP": // Job is still active; the closest mapping is RUNNING return State.RUNNING; + case "JOB_STATE_DRAINING": + return State.DRAINING; case "JOB_STATE_DONE": - case "JOB_STATE_DRAINED": // Job has successfully terminated; closest mapping is DONE return State.DONE; + case "JOB_STATE_DRAINED": + return State.DRAINED; default: LOG.warn( "Unrecognized state from Dataflow service: {}." diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 54ba10df9d1c..be7ad2e6e110 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -406,6 +406,26 @@ public void testCancelUnterminatedJobThatSucceeds() throws IOException { verifyNoMoreInteractions(mockJobs); } + @Test + public void testDrainUnterminatedJobThatSucceeds() throws IOException { + Dataflow.Projects.Locations.Jobs.Update update = + mock(Dataflow.Projects.Locations.Jobs.Update.class); + when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class))) + .thenReturn(update); + when(update.execute()).thenReturn(new Job().setCurrentState("JOB_STATE_DRAINING")); + + DataflowPipelineJob job = + new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null); + + assertEquals(State.DRAINING, job.drain()); + Job content = new Job(); + content.setProjectId(PROJECT_ID); + content.setId(JOB_ID); + content.setRequestedState("JOB_STATE_DRAINED"); + verify(mockJobs).update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), eq(content)); + verifyNoMoreInteractions(mockJobs); + } + @Test public void testCancelUnterminatedJobThatFails() throws IOException { Dataflow.Projects.Locations.Jobs.Get statusRequest = @@ -432,6 +452,32 @@ public void testCancelUnterminatedJobThatFails() throws IOException { job.cancel(); } + @Test + public void testCancelUnterminatedJobWithNullFailureMessage() throws IOException { + Dataflow.Projects.Locations.Jobs.Get statusRequest = + mock(Dataflow.Projects.Locations.Jobs.Get.class); + + Job statusResponse = new Job(); + statusResponse.setCurrentState("JOB_STATE_RUNNING"); + when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(statusRequest); + when(statusRequest.execute()).thenReturn(statusResponse); + + Dataflow.Projects.Locations.Jobs.Update update = + mock(Dataflow.Projects.Locations.Jobs.Update.class); + when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class))) + .thenReturn(update); + when(update.execute()).thenThrow(new IOException()); + + DataflowPipelineJob job = + new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null); + + thrown.expect(IOException.class); + thrown.expectMessage( + "Failed to cancel job in state RUNNING, " + + "please go to the Developers Console to cancel it manually:"); + job.cancel(); + } + /** * Test that {@link DataflowPipelineJob#cancel} doesn't throw if the Dataflow service returns * non-terminal state even though the cancel API call failed, which can happen in practice. diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java index 5f76b6750ffa..35af1be4cdfc 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java @@ -100,9 +100,9 @@ public void testToStateNormal() { // Non-trivially mapped cases assertEquals(State.STOPPED, MonitoringUtil.toState("JOB_STATE_PAUSED")); - assertEquals(State.RUNNING, MonitoringUtil.toState("JOB_STATE_DRAINING")); + assertEquals(State.DRAINING, MonitoringUtil.toState("JOB_STATE_DRAINING")); assertEquals(State.RUNNING, MonitoringUtil.toState("JOB_STATE_PAUSING")); - assertEquals(State.DONE, MonitoringUtil.toState("JOB_STATE_DRAINED")); + assertEquals(State.DRAINED, MonitoringUtil.toState("JOB_STATE_DRAINED")); } @Test diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java index 91313f3924aa..6c5e0db98595 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java @@ -43,6 +43,19 @@ public interface PipelineResult { */ State cancel() throws IOException; + /** + * Drains the pipeline execution. + * + *

Draining requests that the runner stop accepting new input and finish processing data that + * has already entered the pipeline. + * + * @throws IOException if there is a problem executing the drain request. + * @throws UnsupportedOperationException if the runner does not support draining. + */ + default State drain() throws IOException { + throw new UnsupportedOperationException("Runner does not support draining."); + } + /** * Waits until the pipeline finishes and returns the final status. It times out after the given * duration. @@ -94,6 +107,12 @@ enum State { /** The job has been updated. */ UPDATED(true, true), + /** The job is draining its data. */ + DRAINING(false, false), + + /** The job has completed draining its data. */ + DRAINED(true, false), + /** The job state reported by a runner cannot be interpreted by the SDK. */ UNRECOGNIZED(false, false); diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index 7e4e5da0d853..aaf63705ad2a 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -515,9 +515,11 @@ private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder case UNRECOGNIZED: case STOPPED: case RUNNING: + case DRAINING: // Keep going. break; case DONE: + case DRAINED: // All done. running = false; break;