Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.client.ClientUtils;
Expand All @@ -44,7 +42,6 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -80,8 +77,6 @@ public class EmbeddedExecutor implements PipelineExecutor {

private final EmbeddedJobClientCreator jobClientCreator;

private final List<JobStatusChangedListener> jobStatusChangedListeners;

/**
* Creates a {@link EmbeddedExecutor}.
*
Expand Down Expand Up @@ -135,11 +130,6 @@ public EmbeddedExecutor(
this.terminalJobIds = checkNotNull(terminalJobIds);
this.dispatcherGateway = checkNotNull(dispatcherGateway);
this.jobClientCreator = checkNotNull(jobClientCreator);
this.jobStatusChangedListeners =
JobStatusChangedListenerUtils.createJobStatusChangedListeners(
Thread.currentThread().getContextClassLoader(),
configuration,
executorService);
}

@Override
Expand Down Expand Up @@ -237,10 +227,7 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture(
jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader))
.whenCompleteAsync(
(jobClient, throwable) -> {
if (throwable == null) {
PipelineExecutorUtils.notifyJobStatusListeners(
pipeline, streamGraph, jobStatusChangedListeners);
} else {
if (throwable != null) {
LOG.error(
"Failed to submit job graph to application cluster",
throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,19 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CacheSupportedPipelineExecutor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.FunctionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand All @@ -65,23 +59,14 @@ public class AbstractSessionClusterExecutor<
ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
implements CacheSupportedPipelineExecutor {
private static final Logger LOG = LoggerFactory.getLogger(AbstractSessionClusterExecutor.class);
private final ExecutorService executorService =
Executors.newFixedThreadPool(
1, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));

private final ClientFactory clusterClientFactory;
private final Configuration configuration;
private final List<JobStatusChangedListener> jobStatusChangedListeners;

public AbstractSessionClusterExecutor(
@Nonnull final ClientFactory clusterClientFactory, Configuration configuration) {
this.clusterClientFactory = checkNotNull(clusterClientFactory);
this.configuration = configuration;
this.jobStatusChangedListeners =
JobStatusChangedListenerUtils.createJobStatusChangedListeners(
Thread.currentThread().getContextClassLoader(),
configuration,
executorService);
}

@Override
Expand Down Expand Up @@ -122,10 +107,7 @@ public CompletableFuture<JobClient> execute(
userCodeClassloader))
.whenCompleteAsync(
(jobClient, throwable) -> {
if (throwable == null) {
PipelineExecutorUtils.notifyJobStatusListeners(
pipeline, streamGraph, jobStatusChangedListeners);
} else {
if (throwable != null) {
LOG.error(
"Failed to submit job graph to remote session cluster.",
throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,15 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -48,14 +42,11 @@
@Internal
public class LocalExecutor implements PipelineExecutor {
private static final Logger LOG = LoggerFactory.getLogger(LocalExecutor.class);
private final ExecutorService executorService =
Executors.newFixedThreadPool(1, new ExecutorThreadFactory("Flink-LocalExecutor-IO"));

public static final String NAME = "local";

private final Configuration configuration;
private final Function<MiniClusterConfiguration, MiniCluster> miniClusterFactory;
private final List<JobStatusChangedListener> jobStatusChangedListeners;

public static LocalExecutor create(Configuration configuration) {
return new LocalExecutor(configuration, MiniCluster::new);
Expand All @@ -72,11 +63,6 @@ private LocalExecutor(
Function<MiniClusterConfiguration, MiniCluster> miniClusterFactory) {
this.configuration = configuration;
this.miniClusterFactory = miniClusterFactory;
this.jobStatusChangedListeners =
JobStatusChangedListenerUtils.createJobStatusChangedListeners(
Thread.currentThread().getContextClassLoader(),
configuration,
executorService);
}

@Override
Expand All @@ -101,10 +87,7 @@ public CompletableFuture<JobClient> execute(
.submitJob(streamGraph, userCodeClassloader)
.whenComplete(
(ignored, throwable) -> {
if (throwable == null) {
PipelineExecutorUtils.notifyJobStatusListeners(
pipeline, streamGraph, jobStatusChangedListeners);
} else {
if (throwable != null) {
LOG.error(
"Failed to submit job graph to local mini cluster.",
throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.flink.core.execution;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

Expand All @@ -33,8 +35,13 @@
/** Util class for {@link JobStatusChangedListener}. */
@Internal
public final class JobStatusChangedListenerUtils {

private static final ConcurrentHashMap<String, JobStatusChangedListener> listenerCache =
new ConcurrentHashMap<>();

/**
* Create job status changed listeners from configuration for job.
* Create job status changed listeners from configuration for job. The same listener instance is
* returned for each factory class name across calls.
*
* @param configuration The job configuration.
* @return the job status changed listeners.
Expand All @@ -47,33 +54,46 @@ public static List<JobStatusChangedListener> createJobStatusChangedListeners(
}
return jobStatusChangedListeners.stream()
.map(
fac -> {
try {
return InstantiationUtil.instantiate(
fac,
JobStatusChangedListenerFactory.class,
userClassLoader)
.createListener(
new JobStatusChangedListenerFactory.Context() {
@Override
public Configuration getConfiguration() {
return configuration;
}
fac ->
listenerCache.computeIfAbsent(
fac,
factoryClassName -> {
try {
return InstantiationUtil.instantiate(
factoryClassName,
JobStatusChangedListenerFactory
.class,
userClassLoader)
.createListener(
new JobStatusChangedListenerFactory
.Context() {
@Override
public Configuration
getConfiguration() {
return configuration;
}

@Override
public ClassLoader getUserClassLoader() {
return userClassLoader;
}
@Override
public ClassLoader
getUserClassLoader() {
return userClassLoader;
}

@Override
public Executor getIOExecutor() {
return ioExecutor;
}
});
} catch (FlinkException e) {
throw new RuntimeException(e);
}
})
@Override
public Executor
getIOExecutor() {
return ioExecutor;
}
});
} catch (FlinkException e) {
throw new RuntimeException(e);
}
}))
.collect(Collectors.toList());
}

@VisibleForTesting
static void clearListenerCache() {
listenerCache.clear();
}
}
Loading