Skip to content

Commit 547c859

Browse files
committed
TEZ-4554: Counter for used nodes within a DAG
1 parent 0ac505b commit 547c859

File tree

6 files changed

+115
-19
lines changed

6 files changed

+115
-19
lines changed

tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,5 +87,24 @@ public enum DAGCounter {
8787
* Number of container reuses during a DAG. This is incremented every time
8888
* the containerReused callback is called in the TaskSchedulerContext.
8989
*/
90-
TOTAL_CONTAINER_REUSE_COUNT
90+
TOTAL_CONTAINER_REUSE_COUNT,
91+
92+
/*
93+
* Number of nodes to which task attempts were assigned in this DAG.
94+
* Nodes are distinguished by the YARN NodeId.
95+
*/
96+
NODE_USED_COUNT,
97+
98+
/*
99+
* Number of node hosts to which task attempts were assigned in this DAG.
100+
* Hosts are distinguished by YARN NodeId.getHost(), so nodes sharing a host are counted once.
101+
*/
102+
NODE_HOSTS_USED_COUNT,
103+
104+
/*
105+
* Total number of nodes visible to the task scheduler (regardless of
106+
* task assignments). This is typically exposed by a resource manager
107+
* client.
108+
*/
109+
NODE_TOTAL_COUNT
91110
}

tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.Set;
2424

2525
import org.apache.hadoop.security.UserGroupInformation;
26-
import org.apache.hadoop.yarn.api.records.ContainerId;
26+
import org.apache.hadoop.yarn.api.records.Container;
2727
import org.apache.hadoop.yarn.api.records.LocalResource;
2828
import org.apache.hadoop.yarn.event.EventHandler;
2929
import org.apache.tez.common.counters.DAGCounter;
@@ -106,7 +106,7 @@ VertexStatusBuilder getVertexStatus(String vertexName,
106106

107107
void incrementDagCounter(DAGCounter counter, int incrValue);
108108
void setDagCounter(DAGCounter counter, int setValue);
109-
void addUsedContainer(ContainerId containerId);
109+
void addUsedContainer(Container container);
110110

111111
/**
112112
* Called by the DAGAppMaster when the DAG is started normally or in the event of recovery.

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,10 @@
6161
import org.apache.hadoop.security.Credentials;
6262
import org.apache.hadoop.security.UserGroupInformation;
6363
import org.apache.hadoop.yarn.api.ApplicationConstants;
64+
import org.apache.hadoop.yarn.api.records.Container;
6465
import org.apache.hadoop.yarn.api.records.ContainerId;
6566
import org.apache.hadoop.yarn.api.records.LocalResource;
67+
import org.apache.hadoop.yarn.api.records.NodeId;
6668
import org.apache.hadoop.yarn.event.EventHandler;
6769
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
6870
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -250,6 +252,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
250252

251253
private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
252254
private final Set<ContainerId> containersUsedByCurrentDAG = new HashSet<>();
255+
@VisibleForTesting
256+
final Set<NodeId> nodesUsedByCurrentDAG = new HashSet<>();
257+
@VisibleForTesting
258+
final Set<String> nodeHostsUsedByCurrentDAG = new HashSet<>();
259+
253260

254261
protected static final
255262
StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
@@ -2563,7 +2570,7 @@ public void onStart() {
25632570
@Override
25642571
public void onFinish() {
25652572
stopVertexServices();
2566-
handleUsedContainersOnDagFinish();
2573+
updateCounters();
25672574
}
25682575

25692576
private void startVertexServices() {
@@ -2579,11 +2586,16 @@ void stopVertexServices() {
25792586
}
25802587

25812588
@Override
2582-
public void addUsedContainer(ContainerId containerId) {
2583-
containersUsedByCurrentDAG.add(containerId);
2589+
public void addUsedContainer(Container container) {
2590+
containersUsedByCurrentDAG.add(container.getId());
2591+
nodesUsedByCurrentDAG.add(container.getNodeId());
2592+
nodeHostsUsedByCurrentDAG.add(container.getNodeId().getHost());
25842593
}
25852594

2586-
private void handleUsedContainersOnDagFinish() {
2595+
private void updateCounters() {
25872596
setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size());
2597+
setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size());
2598+
setDagCounter(DAGCounter.NODE_HOSTS_USED_COUNT, nodeHostsUsedByCurrentDAG.size());
2599+
setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes());
25882600
}
25892601
}

tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ public synchronized void taskAllocated(int schedulerId, Object task,
746746
sendEvent(new AMNodeEventContainerAllocated(container
747747
.getNodeId(), schedulerId, container.getId()));
748748
}
749-
appContext.getCurrentDAG().addUsedContainer(containerId);
749+
appContext.getCurrentDAG().addUsedContainer(container);
750750

751751
TaskAttempt taskAttempt = event.getTaskAttempt();
752752
// TODO - perhaps check if the task still needs this container

tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.tez.common.counters.TezCounters;
5050
import org.apache.tez.dag.api.TezConstants;
5151
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
52+
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
5253
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
5354
import org.apache.tez.hadoop.shim.HadoopShim;
5455
import org.junit.Rule;
@@ -60,6 +61,8 @@
6061
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
6162
import org.apache.hadoop.yarn.api.records.ApplicationId;
6263
import org.apache.hadoop.yarn.api.records.ContainerId;
64+
import org.apache.hadoop.yarn.api.records.Container;
65+
import org.apache.hadoop.yarn.api.records.NodeId;
6366
import org.apache.hadoop.yarn.api.records.Resource;
6467
import org.apache.hadoop.yarn.event.EventHandler;
6568
import org.apache.hadoop.yarn.util.Clock;
@@ -181,6 +184,7 @@ public class TestDAGImpl {
181184
private ACLManager aclManager;
182185
private ApplicationAttemptId appAttemptId;
183186
private DAGImpl dag;
187+
private TaskSchedulerManager taskSchedulerManager;
184188
private TaskEventDispatcher taskEventDispatcher;
185189
private VertexEventDispatcher vertexEventDispatcher;
186190
private DagEventDispatcher dagEventDispatcher;
@@ -861,11 +865,12 @@ public void setup() {
861865
dispatcher = new DrainDispatcher();
862866
fsTokens = new Credentials();
863867
appContext = mock(AppContext.class);
868+
taskSchedulerManager = mock(TaskSchedulerManager.class);
864869
execService = mock(ListeningExecutorService.class);
865870
final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
866871
when(appContext.getHadoopShim()).thenReturn(defaultShim);
867872
when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());
868-
873+
869874
doAnswer(new Answer() {
870875
public ListenableFuture<Void> answer(InvocationOnMock invocation) {
871876
Object[] args = invocation.getArguments();
@@ -2358,22 +2363,82 @@ public void testCounterLimits() {
23582363

23592364
}
23602365

2361-
@SuppressWarnings("unchecked")
23622366
@Test(timeout = 5000)
23632367
public void testTotalContainersUsedCounter() {
2368+
DAGImpl spy = getDagSpy();
2369+
2370+
spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000005"),
2371+
mock(NodeId.class), null, null, null, null));
2372+
spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000006"),
2373+
mock(NodeId.class), null, null, null, null));
2374+
2375+
spy.onFinish();
2376+
// 2 calls to addUsedContainer
2377+
verify(spy, times(2)).addUsedContainer(any(Container.class));
2378+
// 2 containers were used
2379+
Assert.assertEquals(2,
2380+
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.TOTAL_CONTAINERS_USED.name())
2381+
.getValue());
2382+
}
2383+
2384+
@Test(timeout = 5000)
2385+
public void testNodesUsedCounter() {
2386+
DAGImpl spy = getDagSpy();
2387+
2388+
Container containerOnHost = mock(Container.class);
2389+
when(containerOnHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
2390+
Container containerOnSameHost = mock(Container.class);
2391+
when(containerOnSameHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
2392+
Container containerOnDifferentHost = mock(Container.class);
2393+
when(containerOnDifferentHost.getNodeId()).thenReturn(NodeId.fromString("otherhost:0"));
2394+
Container containerOnSameHostWithDifferentPort = mock(Container.class);
2395+
when(containerOnSameHostWithDifferentPort.getNodeId()).thenReturn(NodeId.fromString("localhost:1"));
2396+
2397+
spy.addUsedContainer(containerOnHost);
2398+
spy.addUsedContainer(containerOnSameHost);
2399+
spy.addUsedContainer(containerOnDifferentHost);
2400+
spy.addUsedContainer(containerOnSameHostWithDifferentPort);
2401+
2402+
when(taskSchedulerManager.getNumClusterNodes()).thenReturn(10);
2403+
2404+
spy.onFinish();
2405+
// 4 calls to addUsedContainer
2406+
verify(spy, times(4)).addUsedContainer(any(Container.class));
2407+
// 3 nodes were used: localhost:0, otherhost:0, localhost:1
2408+
// localhost:0 and localhost:1 might be on the same physical host, but as long as
2409+
// YARN considers them different nodes, we count them as different nodes too
2410+
Assert.assertEquals(3,
2411+
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name())
2412+
.getValue());
2413+
2414+
Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:0")));
2415+
Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("otherhost:0")));
2416+
Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:1")));
2417+
2418+
// 2 distinct node hosts were seen: localhost, otherhost
2419+
Assert.assertEquals(2,
2420+
spy.getAllCounters().getGroup(DAGCounter.class.getName())
2421+
.findCounter(DAGCounter.NODE_HOSTS_USED_COUNT.name())
2422+
.getValue());
2423+
2424+
Assert.assertEquals(10,
2425+
spy.getAllCounters().getGroup(DAGCounter.class.getName())
2426+
.findCounter(DAGCounter.NODE_TOTAL_COUNT.name())
2427+
.getValue());
2428+
2429+
Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("localhost"));
2430+
Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("otherhost"));
2431+
}
2432+
2433+
private DAGImpl getDagSpy() {
23642434
initDAG(mrrDag);
23652435
dispatcher.await();
23662436
startDAG(mrrDag);
23672437
dispatcher.await();
23682438

2369-
DAGImpl spy = spy(mrrDag);
2370-
spy.addUsedContainer(mock(ContainerId.class));
2371-
spy.addUsedContainer(mock(ContainerId.class));
2439+
// needed when onFinish() method is called on a DAGImpl
2440+
when(mrrAppContext.getTaskScheduler()).thenReturn(taskSchedulerManager);
23722441

2373-
spy.onFinish();
2374-
// 2 calls to addUsedContainer, obviously, we did it here
2375-
verify(spy, times(2)).addUsedContainer(any(ContainerId.class));
2376-
// 1 call to setDagCounter, which happened at dag.onFinish
2377-
verify(spy).setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, 2);
2442+
return spy(mrrDag);
23782443
}
23792444
}

tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ public void testSimpleAllocate() throws Exception {
234234
assertEquals(priority, assignEvent.getPriority());
235235
assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());
236236

237-
verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(ContainerId.class)); // called on taskAllocated
237+
verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(Container.class)); // called on taskAllocated
238238
}
239239

240240
@Test(timeout = 5000)

0 commit comments

Comments
 (0)