Skip to content

Commit 6ba949e

Browse files
author
Samrat
committed
[FLINK-33100] Implement YarnJobListFetcher
1 parent b820577 commit 6ba949e

File tree

8 files changed

+665
-14
lines changed

8 files changed

+665
-14
lines changed

docs/content/docs/custom-resource/autoscaler.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,14 @@ org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
275275
Updating the `autoscaler.standalone.fetcher.flink-cluster.host` and `autoscaler.standalone.fetcher.flink-cluster.port`
276276
based on your flink cluster. In general, the host and port are the same as Flink WebUI.
277277
278+
To select the job fetcher use:
279+
280+
```
281+
--autoscaler.standalone.fetcher.type FLINK_CLUSTER|YARN
282+
```
283+
284+
When running against Flink-on-YARN (`YARN`), set the host/port to the YARN web proxy endpoint that exposes the JobManager REST API.
285+
278286
All autoscaler related options can be set at autoscaler standalone level, and the configuration at job-level can
279287
override the default value provided in the autoscaler standalone, such as:
280288

flink-autoscaler-standalone/README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ Please click [here](../flink-autoscaler/README.md) to check out extensibility of
6969
`JobAutoScalerContext` of the job. It has a control loop that periodically calls
7070
`JobListFetcher#fetch` to fetch the job list and scale these jobs.
7171

72-
Currently `FlinkClusterJobListFetcher` is the only implementation of the `JobListFetcher`
73-
interface, that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
74-
We will implement `YarnJobListFetcher` in the future, `Flink Autoscaler Standalone` will call
75-
`YarnJobListFetcher#fetch` to fetch job list from yarn cluster periodically.
72+
Currently `FlinkClusterJobListFetcher` and `YarnJobListFetcher` are implementations of the
73+
`JobListFetcher` interface. that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
74+
`YarnJobListFetcher` enables fetching jobs and per-job configuration from
75+
Flink-on-YARN clusters using a provided `RestClusterClient`
76+
77+
Select which one to use via `autoscaler.standalone.fetcher.type` (`FLINK_CLUSTER` or `YARN`).

flink-autoscaler-standalone/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,12 @@ under the License.
101101
</exclusions>
102102
</dependency>
103103

104+
<dependency>
105+
<groupId>org.apache.flink</groupId>
106+
<artifactId>flink-yarn</artifactId>
107+
<version>${flink.version}</version>
108+
</dependency>
109+
104110
<!-- Logging -->
105111

106112
<dependency>

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@
2727
import org.apache.flink.autoscaler.ScalingExecutor;
2828
import org.apache.flink.autoscaler.ScalingMetricEvaluator;
2929
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
30+
import org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions;
3031
import org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher;
3132
import org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer;
33+
import org.apache.flink.autoscaler.standalone.yarn.YarnJobListFetcher;
3234
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
3335
import org.apache.flink.client.program.rest.RestClusterClient;
3436
import org.apache.flink.configuration.Configuration;
3537
import org.apache.flink.configuration.GlobalConfiguration;
3638
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
39+
import org.apache.flink.util.function.FunctionWithException;
3740

3841
import org.slf4j.Logger;
3942
import org.slf4j.LoggerFactory;
@@ -73,15 +76,24 @@ JobListFetcher<KEY, Context> createJobListFetcher(Configuration conf) {
7376
var port = conf.get(FETCHER_FLINK_CLUSTER_PORT);
7477
var restServerAddress = String.format("http://%s:%s", host, port);
7578

76-
return (JobListFetcher<KEY, Context>)
77-
new FlinkClusterJobListFetcher(
78-
configuration ->
79-
new RestClusterClient<>(
80-
configuration,
81-
"clusterId",
82-
(c, e) ->
83-
new StandaloneClientHAServices(restServerAddress)),
84-
conf.get(FLINK_CLIENT_TIMEOUT));
79+
var fetcherType = conf.get(AutoscalerStandaloneOptions.FETCHER_TYPE);
80+
FunctionWithException<Configuration, RestClusterClient<String>, Exception> clientSupplier =
81+
configuration ->
82+
new RestClusterClient<>(
83+
configuration,
84+
"clusterId",
85+
(c, e) -> new StandaloneClientHAServices(restServerAddress));
86+
87+
switch (fetcherType) {
88+
case YARN:
89+
return (JobListFetcher<KEY, Context>)
90+
new YarnJobListFetcher(clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT));
91+
case FLINK_CLUSTER:
92+
default:
93+
return (JobListFetcher<KEY, Context>)
94+
new FlinkClusterJobListFetcher(
95+
clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT));
96+
}
8597
}
8698

8799
private static <KEY, Context extends JobAutoScalerContext<KEY>>

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,18 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key
4949
.defaultValue(100)
5050
.withDescription("The parallelism of autoscaler standalone control loop.");
5151

52+
public enum FetcherType {
53+
FLINK_CLUSTER,
54+
YARN
55+
}
56+
57+
public static final ConfigOption<FetcherType> FETCHER_TYPE =
58+
autoscalerStandaloneConfig("fetcher.type")
59+
.enumType(FetcherType.class)
60+
.defaultValue(FetcherType.FLINK_CLUSTER)
61+
.withDescription(
62+
"The job list fetcher type to use. Supported values: FLINK_CLUSTER, YARN.");
63+
5264
public static final ConfigOption<String> FETCHER_FLINK_CLUSTER_HOST =
5365
autoscalerStandaloneConfig("fetcher.flink-cluster.host")
5466
.stringType()
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler.standalone.yarn;
19+
20+
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.autoscaler.JobAutoScalerContext;
22+
import org.apache.flink.autoscaler.standalone.JobListFetcher;
23+
import org.apache.flink.autoscaler.utils.JobStatusUtils;
24+
import org.apache.flink.client.program.rest.RestClusterClient;
25+
import org.apache.flink.configuration.Configuration;
26+
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
27+
import org.apache.flink.runtime.client.JobStatusMessage;
28+
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
29+
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
30+
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
31+
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
32+
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
33+
import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
34+
import org.apache.flink.util.function.FunctionWithException;
35+
36+
import org.apache.hadoop.yarn.api.records.ApplicationReport;
37+
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
38+
import org.apache.hadoop.yarn.client.api.YarnClient;
39+
40+
import java.time.Duration;
41+
import java.util.ArrayList;
42+
import java.util.Collection;
43+
import java.util.HashSet;
44+
import java.util.List;
45+
import java.util.Set;
46+
import java.util.concurrent.TimeUnit;
47+
import java.util.stream.Collectors;
48+
49+
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_HOST;
50+
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_PORT;
51+
52+
/** Fetch JobAutoScalerContext based on Flink on YARN cluster. */
53+
public class YarnJobListFetcher implements JobListFetcher<JobID, JobAutoScalerContext<JobID>> {
54+
55+
private final FunctionWithException<Configuration, RestClusterClient<String>, Exception>
56+
restClientGetter;
57+
private final Duration restClientTimeout;
58+
59+
public YarnJobListFetcher(
60+
FunctionWithException<Configuration, RestClusterClient<String>, Exception>
61+
restClientGetter,
62+
Duration restClientTimeout) {
63+
this.restClientGetter = restClientGetter;
64+
this.restClientTimeout = restClientTimeout;
65+
}
66+
67+
@Override
68+
public Collection<JobAutoScalerContext<JobID>> fetch(Configuration baseConf) throws Exception {
69+
70+
List<JobAutoScalerContext<JobID>> discovered = tryFetchFromFirstRunningYarnApp(baseConf);
71+
if (!discovered.isEmpty()) {
72+
return discovered;
73+
}
74+
75+
// use supplied client factory (may point to direct JM or a reverse proxy)
76+
try (var restClusterClient = restClientGetter.apply(new Configuration())) {
77+
return restClusterClient
78+
.sendRequest(
79+
JobsOverviewHeaders.getInstance(),
80+
EmptyMessageParameters.getInstance(),
81+
EmptyRequestBody.getInstance())
82+
.thenApply(JobStatusUtils::toJobStatusMessage)
83+
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
84+
.stream()
85+
.map(
86+
jobStatusMessage -> {
87+
try {
88+
return generateJobContext(
89+
baseConf, restClusterClient, jobStatusMessage);
90+
} catch (Throwable e) {
91+
throw new RuntimeException(
92+
"generateJobContext throw exception", e);
93+
}
94+
})
95+
.collect(Collectors.toList());
96+
}
97+
}
98+
99+
private List<JobAutoScalerContext<JobID>> tryFetchFromFirstRunningYarnApp(
100+
Configuration baseConf) {
101+
List<JobAutoScalerContext<JobID>> contexts = new ArrayList<>();
102+
YarnClient yarnClient = null;
103+
try {
104+
yarnClient = YarnClient.createYarnClient();
105+
org.apache.hadoop.conf.Configuration yarnConf =
106+
new org.apache.hadoop.conf.Configuration();
107+
yarnClient.init(yarnConf);
108+
yarnClient.start();
109+
110+
Set<String> appTypes = new HashSet<>();
111+
appTypes.add("Apache Flink");
112+
List<ApplicationReport> apps = yarnClient.getApplications(appTypes);
113+
114+
String rmBase =
115+
String.format(
116+
"http://%s:%s",
117+
baseConf.get(FETCHER_FLINK_CLUSTER_HOST),
118+
baseConf.get(FETCHER_FLINK_CLUSTER_PORT));
119+
120+
for (ApplicationReport app : apps) {
121+
if (app.getYarnApplicationState() != YarnApplicationState.RUNNING) {
122+
continue;
123+
}
124+
String appId = app.getApplicationId().toString();
125+
String proxyBase = rmBase + "/proxy/" + appId;
126+
127+
try (var client =
128+
new RestClusterClient<>(
129+
new Configuration(),
130+
"clusterId",
131+
(c, e) -> new StandaloneClientHAServices(proxyBase))) {
132+
var fetched =
133+
client
134+
.sendRequest(
135+
JobsOverviewHeaders.getInstance(),
136+
EmptyMessageParameters.getInstance(),
137+
EmptyRequestBody.getInstance())
138+
.thenApply(JobStatusUtils::toJobStatusMessage)
139+
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
140+
.stream()
141+
.map(
142+
jobStatusMessage -> {
143+
try {
144+
return generateJobContextForEndpoint(
145+
baseConf, proxyBase, jobStatusMessage);
146+
} catch (Throwable e) {
147+
throw new RuntimeException(
148+
"generateJobContext throw exception",
149+
e);
150+
}
151+
})
152+
.collect(Collectors.toList());
153+
contexts.addAll(fetched);
154+
}
155+
break;
156+
}
157+
} catch (Throwable ignore) {
158+
// Ignore
159+
} finally {
160+
if (yarnClient != null) {
161+
try {
162+
yarnClient.stop();
163+
} catch (Throwable ignored) {
164+
}
165+
}
166+
}
167+
return contexts;
168+
}
169+
170+
private JobAutoScalerContext<JobID> generateJobContext(
171+
Configuration baseConf,
172+
RestClusterClient<String> restClusterClient,
173+
JobStatusMessage jobStatusMessage)
174+
throws Exception {
175+
var jobId = jobStatusMessage.getJobId();
176+
var conf = getConfiguration(baseConf, restClusterClient, jobId);
177+
178+
return new JobAutoScalerContext<>(
179+
jobId,
180+
jobId,
181+
jobStatusMessage.getJobState(),
182+
conf,
183+
new UnregisteredMetricsGroup(),
184+
() -> restClientGetter.apply(conf));
185+
}
186+
187+
private Configuration getConfiguration(
188+
Configuration baseConf, RestClusterClient<String> restClusterClient, JobID jobId)
189+
throws Exception {
190+
var jobParameters = new JobMessageParameters();
191+
jobParameters.jobPathParameter.resolve(jobId);
192+
193+
var configurationInfo =
194+
restClusterClient
195+
.sendRequest(
196+
JobManagerJobConfigurationHeaders.getInstance(),
197+
jobParameters,
198+
EmptyRequestBody.getInstance())
199+
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);
200+
201+
var conf = new Configuration(baseConf);
202+
configurationInfo.forEach(entry -> conf.setString(entry.getKey(), entry.getValue()));
203+
return conf;
204+
}
205+
206+
private JobAutoScalerContext<JobID> generateJobContextForEndpoint(
207+
Configuration baseConf, String endpointBase, JobStatusMessage jobStatusMessage)
208+
throws Exception {
209+
var jobId = jobStatusMessage.getJobId();
210+
try (var client =
211+
new RestClusterClient<>(
212+
new Configuration(),
213+
"clusterId",
214+
(c, e) -> new StandaloneClientHAServices(endpointBase))) {
215+
var conf = getConfiguration(baseConf, client, jobId);
216+
return new JobAutoScalerContext<>(
217+
jobId,
218+
jobId,
219+
jobStatusMessage.getJobState(),
220+
conf,
221+
new UnregisteredMetricsGroup(),
222+
() ->
223+
new RestClusterClient<>(
224+
conf,
225+
"clusterId",
226+
(c, e) -> new StandaloneClientHAServices(endpointBase)));
227+
}
228+
}
229+
}

0 commit comments

Comments
 (0)