From 05af3214421b1f5e40cb0e0e58afbca27c0c99f6 Mon Sep 17 00:00:00 2001 From: Peter Huang Date: Wed, 22 Apr 2026 16:00:03 -0700 Subject: [PATCH] [FLINK-39491][runtime] move lineage Listener creation to Dispatcher --- .../executors/EmbeddedExecutor.java | 15 +- .../AbstractSessionClusterExecutor.java | 20 +-- .../deployment/executors/LocalExecutor.java | 19 +-- .../JobStatusChangedListenerUtils.java | 72 ++++++---- .../JobStatusChangedListenerUtilsTest.java | 128 ++++++++++++++++++ .../flink/runtime/dispatcher/Dispatcher.java | 36 +++++ .../execution/DefaultJobCreatedEvent.java | 0 .../runtime/execution/JobCreatedEvent.java | 0 ...iClusterPipelineExecutorServiceLoader.java | 20 +-- 9 files changed, 214 insertions(+), 96 deletions(-) create mode 100644 flink-core/src/test/java/org/apache/flink/core/execution/JobStatusChangedListenerUtilsTest.java rename {flink-streaming-java => flink-runtime}/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java (100%) rename {flink-streaming-java => flink-runtime}/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java (100%) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java index 9adebbb4d72be..5c0c5ec357e7d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java @@ -26,8 +26,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.execution.JobStatusChangedListener; -import org.apache.flink.core.execution.JobStatusChangedListenerUtils; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.client.ClientUtils; @@ -44,7 +42,6 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -80,8 +77,6 @@ public class EmbeddedExecutor implements PipelineExecutor { private final EmbeddedJobClientCreator jobClientCreator; - private final List jobStatusChangedListeners; - /** * Creates a {@link EmbeddedExecutor}. * @@ -135,11 +130,6 @@ public EmbeddedExecutor( this.terminalJobIds = checkNotNull(terminalJobIds); this.dispatcherGateway = checkNotNull(dispatcherGateway); this.jobClientCreator = checkNotNull(jobClientCreator); - this.jobStatusChangedListeners = - JobStatusChangedListenerUtils.createJobStatusChangedListeners( - Thread.currentThread().getContextClassLoader(), - configuration, - executorService); } @Override @@ -237,10 +227,7 @@ private CompletableFuture submitAndGetJobClientFuture( jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)) .whenCompleteAsync( (jobClient, throwable) -> { - if (throwable == null) { - PipelineExecutorUtils.notifyJobStatusListeners( - pipeline, streamGraph, jobStatusChangedListeners); - } else { + if (throwable != null) { LOG.error( "Failed to submit job graph to application cluster", throwable); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java index e1a667d159271..95752f3fb48ec 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java @@ -29,13 +29,10 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.CacheSupportedPipelineExecutor; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.execution.JobStatusChangedListener; -import org.apache.flink.core.execution.JobStatusChangedListenerUtils; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.util.AbstractID; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.function.FunctionUtils; import org.slf4j.Logger; @@ -43,11 +40,8 @@ import javax.annotation.Nonnull; -import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -65,23 +59,14 @@ public class AbstractSessionClusterExecutor< ClusterID, ClientFactory extends ClusterClientFactory> implements CacheSupportedPipelineExecutor { private static final Logger LOG = LoggerFactory.getLogger(AbstractSessionClusterExecutor.class); - private final ExecutorService executorService = - Executors.newFixedThreadPool( - 1, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO")); private final ClientFactory clusterClientFactory; private final Configuration configuration; - private final List jobStatusChangedListeners; public AbstractSessionClusterExecutor( @Nonnull final ClientFactory clusterClientFactory, Configuration configuration) { this.clusterClientFactory = checkNotNull(clusterClientFactory); this.configuration = configuration; - this.jobStatusChangedListeners = - JobStatusChangedListenerUtils.createJobStatusChangedListeners( - Thread.currentThread().getContextClassLoader(), - configuration, - executorService); } @Override @@ -122,10 +107,7 @@ public CompletableFuture execute( userCodeClassloader)) .whenCompleteAsync( (jobClient, throwable) -> { - if (throwable == null) { - PipelineExecutorUtils.notifyJobStatusListeners( - pipeline, streamGraph, jobStatusChangedListeners); - } else { + if (throwable != null) { LOG.error( "Failed to submit job graph to remote session cluster.", throwable); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java index c5295520ee052..5a983d35fe517 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java @@ -24,21 +24,15 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.execution.JobStatusChangedListener; -import org.apache.flink.core.execution.JobStatusChangedListenerUtils; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -48,14 +42,11 @@ @Internal public class LocalExecutor implements PipelineExecutor { private static final Logger LOG = LoggerFactory.getLogger(LocalExecutor.class); - private final ExecutorService executorService = - Executors.newFixedThreadPool(1, new ExecutorThreadFactory("Flink-LocalExecutor-IO")); public static final String NAME = "local"; private final Configuration configuration; private final Function miniClusterFactory; - private final List jobStatusChangedListeners; public static LocalExecutor create(Configuration configuration) { return new LocalExecutor(configuration, MiniCluster::new); @@ -72,11 +63,6 @@ private LocalExecutor( Function miniClusterFactory) { this.configuration = configuration; this.miniClusterFactory = miniClusterFactory; - this.jobStatusChangedListeners = - JobStatusChangedListenerUtils.createJobStatusChangedListeners( - Thread.currentThread().getContextClassLoader(), - configuration, - executorService); } @Override @@ -101,10 +87,7 @@ public CompletableFuture execute( .submitJob(streamGraph, userCodeClassloader) .whenComplete( (ignored, throwable) -> { - if (throwable == null) { - PipelineExecutorUtils.notifyJobStatusListeners( - pipeline, streamGraph, jobStatusChangedListeners); - } else { + if (throwable != null) { LOG.error( "Failed to submit job graph to local mini cluster.", throwable); diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerUtils.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerUtils.java index a73641d7ffd6d..9194a1b0be1d2 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerUtils.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerUtils.java @@ -19,12 +19,14 @@ package org.apache.flink.core.execution; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.FlinkException; import org.apache.flink.util.InstantiationUtil; import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.stream.Collectors; @@ -33,8 +35,13 @@ /** Util class for {@link JobStatusChangedListener}. */ @Internal public final class JobStatusChangedListenerUtils { + + private static final ConcurrentHashMap listenerCache = + new ConcurrentHashMap<>(); + /** - * Create job status changed listeners from configuration for job. + * Create job status changed listeners from configuration for job. The same listener instance is + * returned for each factory class name across calls. * * @param configuration The job configuration. * @return the job status changed listeners. @@ -47,33 +54,46 @@ public static List createJobStatusChangedListeners( } return jobStatusChangedListeners.stream() .map( - fac -> { - try { - return InstantiationUtil.instantiate( - fac, - JobStatusChangedListenerFactory.class, - userClassLoader) - .createListener( - new JobStatusChangedListenerFactory.Context() { - @Override - public Configuration getConfiguration() { - return configuration; - } + fac -> + listenerCache.computeIfAbsent( + fac, + factoryClassName -> { + try { + return InstantiationUtil.instantiate( + factoryClassName, + JobStatusChangedListenerFactory + .class, + userClassLoader) + .createListener( + new JobStatusChangedListenerFactory + .Context() { + @Override + public Configuration + getConfiguration() { + return configuration; + } - @Override - public ClassLoader getUserClassLoader() { - return userClassLoader; - } + @Override + public ClassLoader + getUserClassLoader() { + return userClassLoader; + } - @Override - public Executor getIOExecutor() { - return ioExecutor; - } - }); - } catch (FlinkException e) { - throw new RuntimeException(e); - } - }) + @Override + public Executor + getIOExecutor() { + return ioExecutor; + } + }); + } catch (FlinkException e) { + throw new RuntimeException(e); + } + })) .collect(Collectors.toList()); } + + @VisibleForTesting + static void clearListenerCache() { + listenerCache.clear(); + } } diff --git a/flink-core/src/test/java/org/apache/flink/core/execution/JobStatusChangedListenerUtilsTest.java b/flink-core/src/test/java/org/apache/flink/core/execution/JobStatusChangedListenerUtilsTest.java new file mode 100644 index 0000000000000..f8bacefdf997b --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/execution/JobStatusChangedListenerUtilsTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.execution; + +import org.apache.flink.configuration.Configuration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link JobStatusChangedListenerUtils}. */ +class JobStatusChangedListenerUtilsTest { + + @AfterEach + void clearCache() { + JobStatusChangedListenerUtils.clearListenerCache(); + } + + @Test + void testEmptyListReturnedWhenNotConfigured() { + List listeners = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + getClass().getClassLoader(), new Configuration(), Runnable::run); + + assertThat(listeners).isEmpty(); + } + + @Test + void testListenerCreatedFromConfiguredFactory() { + Configuration configuration = configWithFactories(TestListenerFactory.class); + + List listeners = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + getClass().getClassLoader(), configuration, Runnable::run); + + assertThat(listeners).hasSize(1).allMatch(l -> l instanceof TestListener); + } + + @Test + void testSameListenerInstanceReturnedAcrossCalls() { + Configuration configuration = configWithFactories(TestListenerFactory.class); + ClassLoader classLoader = getClass().getClassLoader(); + + List first = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + classLoader, configuration, Runnable::run); + List second = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + classLoader, configuration, Runnable::run); + + assertThat(first.get(0)).isSameAs(second.get(0)); + } + + @Test + void testDifferentFactoryClassesReturnDifferentInstances() { + Configuration configuration = + configWithFactories(TestListenerFactory.class, AnotherTestListenerFactory.class); + + List listeners = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + getClass().getClassLoader(), configuration, Runnable::run); + + assertThat(listeners).hasSize(2); + assertThat(listeners.get(0)).isNotSameAs(listeners.get(1)); + assertThat(listeners.get(0)).isInstanceOf(TestListener.class); + assertThat(listeners.get(1)).isInstanceOf(AnotherTestListener.class); + } + + private static Configuration configWithFactories(Class... factoryClasses) { + Configuration configuration = new Configuration(); + String[] names = Arrays.stream(factoryClasses).map(Class::getName).toArray(String[]::new); + configuration.set(JOB_STATUS_CHANGED_LISTENERS, Arrays.asList(names)); + return configuration; + } + + // ------------------------------------------------------------------------- + // Test factory / listener implementations + // ------------------------------------------------------------------------- + + /** First test listener. */ + public static class TestListener implements JobStatusChangedListener { + @Override + public void onEvent(JobStatusChangedEvent event) {} + } + + /** Factory that creates {@link TestListener}. */ + public static class TestListenerFactory implements JobStatusChangedListenerFactory { + @Override + public JobStatusChangedListener createListener(Context context) { + return new TestListener(); + } + } + + /** Second test listener, distinct from {@link TestListener}. */ + public static class AnotherTestListener implements JobStatusChangedListener { + @Override + public void onEvent(JobStatusChangedEvent event) {} + } + + /** Factory that creates {@link AnotherTestListener}. */ + public static class AnotherTestListenerFactory implements JobStatusChangedListenerFactory { + @Override + public JobStatusChangedListener createListener(Context context) { + return new AnotherTestListener(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 6d53fa0e2216b..9f8619d6a8861 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -32,10 +32,13 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.execution.CheckpointType; +import org.apache.flink.core.execution.JobStatusChangedListener; +import org.apache.flink.core.execution.JobStatusChangedListenerUtils; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.metrics.MetricGroup; @@ -115,7 +118,9 @@ import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshotUtil; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.streaming.api.graph.ExecutionPlan; +import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; +import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -1337,6 +1342,37 @@ private void persistAndRunJob(ExecutionPlan executionPlan) throws Exception { createJobMasterRunner(executionPlan), ExecutionType.SUBMISSION, executionPlan.getApplicationId().orElse(null)); + notifyJobCreated(executionPlan); + } + + private void notifyJobCreated(ExecutionPlan executionPlan) { + if (!(executionPlan instanceof StreamGraph)) { + return; + } + StreamGraph streamGraph = (StreamGraph) executionPlan; + List listeners = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + Thread.currentThread().getContextClassLoader(), + configuration, + getIoExecutor(executionPlan.getJobID())); + listeners.forEach( + listener -> { + try { + listener.onEvent( + new DefaultJobCreatedEvent( + executionPlan.getJobID(), + executionPlan.getName(), + streamGraph.getLineageGraph(), + executionPlan + .getJobConfiguration() + .get(ExecutionOptions.RUNTIME_MODE))); + } catch (Throwable e) { + log.error( + "Failed to notify job status changed listener {}", + listener.getClass().getName(), + e); + } + }); } private JobManagerRunner createJobMasterRunner(ExecutionPlan executionPlan) throws Exception { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java similarity index 100% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java rename to flink-runtime/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java similarity index 100% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java rename to flink-runtime/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java index 6382b554ebdaa..7beb104f04ade 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java @@ -27,8 +27,6 @@ import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.CacheSupportedPipelineExecutor; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.execution.JobStatusChangedListener; -import org.apache.flink.core.execution.JobStatusChangedListenerUtils; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.core.execution.PipelineExecutorFactory; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; @@ -39,7 +37,6 @@ import org.apache.flink.runtime.minicluster.MiniClusterJobClient; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.util.AbstractID; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,11 +45,8 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.Collection; -import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Stream; /** @@ -150,19 +144,10 @@ public PipelineExecutor getExecutor(Configuration configuration) { } private static class MiniClusterExecutor implements CacheSupportedPipelineExecutor { - private final ExecutorService executorService = - Executors.newFixedThreadPool( - 1, new ExecutorThreadFactory("Flink-MiniClusterExecutor-IO")); private final MiniCluster miniCluster; - private final List jobStatusChangedListeners; public MiniClusterExecutor(MiniCluster miniCluster) { this.miniCluster = miniCluster; - this.jobStatusChangedListeners = - JobStatusChangedListenerUtils.createJobStatusChangedListeners( - Thread.currentThread().getContextClassLoader(), - miniCluster.getConfiguration(), - executorService); } @Override @@ -181,10 +166,7 @@ public CompletableFuture execute( .submitJob(streamGraph) .whenComplete( (ignored, throwable) -> { - if (throwable == null) { - PipelineExecutorUtils.notifyJobStatusListeners( - pipeline, streamGraph, jobStatusChangedListeners); - } else { + if (throwable != null) { LOG.error( "Failed to submit job graph to mini cluster.", throwable);