diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java
index caf7f31a72..974fe596c0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java
@@ -23,6 +23,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.tez.dag.api.client.DAGClient;
 
 public class ClientCache {
 
@@ -39,9 +40,22 @@ public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
 
   //TODO: evict from the cache on some threshold
   public synchronized ClientServiceDelegate getClient(JobID jobId) {
+    return getClient(jobId, null);
+  }
+
+  /**
+   * Get or create a delegate for the given job, optionally with a DAG client
+   * for fetching counters and other AM-backed data.
+   */
+  public synchronized ClientServiceDelegate getClient(JobID jobId,
+      DAGClient dagClient) {
     ClientServiceDelegate client = cache.get(jobId);
     if (client == null) {
-      client = new ClientServiceDelegate(conf, rm, jobId);
+      client = new ClientServiceDelegate(conf, rm, jobId, dagClient);
+      cache.put(jobId, client);
+    } else if (dagClient != null && client.getDAGClient() == null) {
+      // Replace with a delegate that has the DAG client for this job
+      client = new ClientServiceDelegate(conf, rm, jobId, dagClient);
       cache.put(jobId, client);
     }
     return client;
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java
index 936ef2a1b6..f2385af065 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java
@@ -19,6 +19,7 @@
 package org.apache.tez.mapreduce.client;
 
 import java.io.IOException;
+import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -32,18 +33,30 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.hadoop.TezTypeConverters;
 
 public class ClientServiceDelegate {
 
   private final TezConfiguration conf;
+  private final DAGClient dagClient;
 
   // FIXME
   // how to handle completed jobs that the RM does not know about?
 
   public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
       JobID jobId) {
+    this(conf, rm, jobId, null);
+  }
+
+  public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
+      JobID jobId, DAGClient dagClient) {
     this.conf = new TezConfiguration(conf); // Cloning for modifying.
+    this.dagClient = dagClient;
     // For faster redirects from AM to HS.
     this.conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
@@ -51,12 +64,32 @@ public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
             MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
   }
 
+  /**
+   * Returns the DAG client for this delegate, if any (used for counters).
+   */
+  DAGClient getDAGClient() {
+    return dagClient;
+  }
+
+  /**
+   * Returns job counters from the Tez DAG, translated to MapReduce Counters.
+   * Returns empty counters if no DAG client is set, if the AM is unreachable
+   * (e.g. job already finished and AM exited), or if counters are not available.
+   */
   public Counters getJobCounters(JobID jobId) throws IOException,
       InterruptedException {
-    // FIXME needs counters support from DAG
-    // with a translation layer on client side
-    Counters empty = new Counters();
-    return empty;
+    if (dagClient == null) {
+      return new Counters();
+    }
+    try {
+      TezCounters tezCounters = dagClient.getDAGStatus(
+          EnumSet.of(StatusGetOpts.GET_COUNTERS)).getDAGCounters();
+      Counters mrCounters = TezTypeConverters.fromTez(tezCounters);
+      return mrCounters != null ? mrCounters : new Counters();
+    } catch (TezException e) {
+      // AM may be gone (e.g. completed job); return empty counters
+      return new Counters();
+    }
   }
 
   public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobId,
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index aba66f3817..cb66f7211f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -141,6 +142,8 @@ public class YARNRunner implements ClientProtocol {
   private final TezConfiguration tezConf;
   private MRTezClient tezClient;
   private MRDAGClient dagClient;
+  /** Map from JobID to DAGClient for jobs submitted by this runner (used for counters and status). */
+  private final Map<JobID, MRDAGClient> dagClientMap = new ConcurrentHashMap<>();
 
   /**
    * Yarn runner incapsulates the client interface of
@@ -640,6 +643,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
       tezClient = new MRTezClient("MapReduce", dagAMConf, false, jobLocalResources, ts);
       tezClient.start();
       dagClient = new MRDAGClient(tezClient.submitDAGApplication(appId, dag));
+      dagClientMap.put(jobId, dagClient);
       tezClient.stop();
     } catch (TezException e) {
       throw new IOException(e);
@@ -687,7 +691,8 @@ public long renewDelegationToken(Token arg0)
   @Override
   public Counters getJobCounters(JobID arg0) throws IOException,
       InterruptedException {
-    return clientCache.getClient(arg0).getJobCounters(arg0);
+    MRDAGClient dagClientForJob = dagClientMap.get(arg0);
+    return clientCache.getClient(arg0, dagClientForJob).getJobCounters(arg0);
   }
 
   @Override
@@ -700,10 +705,14 @@ public JobStatus getJobStatus(JobID jobID) throws IOException,
       InterruptedException {
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     String jobFile = MRApps.getJobFile(conf, user, jobID);
+    MRDAGClient clientForJob = dagClientMap.get(jobID);
+    if (clientForJob == null) {
+      clientForJob = dagClient;
+    }
     DAGStatus dagStatus;
     try {
-      dagStatus = dagClient.getDAGStatus(null);
-      return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile);
+      dagStatus = clientForJob.getDAGStatus(null);
+      return new DAGJobStatus(clientForJob.getApplicationReport(), dagStatus, jobFile);
     } catch (TezException e) {
       throw new IOException(e);
     }
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/client/TestClientServiceDelegate.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/client/TestClientServiceDelegate.java
new file mode 100644
index 0000000000..53fa143c1c
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/client/TestClientServiceDelegate.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+
+import org.junit.Test;
+
+public class TestClientServiceDelegate {
+
+  @Test
+  public void testGetJobCountersWithoutDAGClientReturnsEmpty() throws Exception {
+    Configuration conf = new Configuration(false);
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    JobID jobId = new JobID("jt", 1);
+    ClientServiceDelegate delegate = new ClientServiceDelegate(conf, rm, jobId);
+
+    Counters counters = delegate.getJobCounters(jobId);
+
+    assertNotNull(counters);
+    assertEquals(0, counters.countCounters());
+  }
+
+  @Test
+  public void testGetJobCountersWithDAGClientReturnsTranslatedCounters() throws Exception {
+    Configuration conf = new Configuration(false);
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    JobID jobId = new JobID("jt", 2);
+
+    TezCounters tezCounters = new TezCounters();
+    tezCounters.addGroup("TestGroup", "Test Group Display");
+    tezCounters.findCounter("TestGroup", "TestCounter").setValue(42L);
+
+    DAGStatus dagStatus = mock(DAGStatus.class);
+    when(dagStatus.getDAGCounters()).thenReturn(tezCounters);
+
+    DAGClient dagClient = mock(DAGClient.class);
+    when(dagClient.getDAGStatus(any())).thenReturn(dagStatus);
+
+    ClientServiceDelegate delegate = new ClientServiceDelegate(conf, rm, jobId, dagClient);
+
+    Counters counters = delegate.getJobCounters(jobId);
+
+    assertNotNull(counters);
+    assertTrue("Expected at least one counter", counters.countCounters() >= 1);
+    assertNotNull(counters.getGroup("TestGroup"));
+    assertEquals(42L, counters.findCounter("TestGroup", "TestCounter").getValue());
+  }
+
+  @Test
+  public void testGetJobCountersWhenDAGStatusReturnsNullCounters() throws Exception {
+    Configuration conf = new Configuration(false);
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    JobID jobId = new JobID("jt", 3);
+
+    DAGStatus dagStatus = mock(DAGStatus.class);
+    when(dagStatus.getDAGCounters()).thenReturn(null);
+
+    DAGClient dagClient = mock(DAGClient.class);
+    when(dagClient.getDAGStatus(any())).thenReturn(dagStatus);
+
+    ClientServiceDelegate delegate = new ClientServiceDelegate(conf, rm, jobId, dagClient);
+
+    Counters counters = delegate.getJobCounters(jobId);
+
+    assertNotNull(counters);
+    assertEquals(0, counters.countCounters());
+  }
+}