From 09a030ee75cebe31a8f1d15c3f6b6d908f6fab9b Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Thu, 13 Aug 2020 17:26:54 -0700 Subject: [PATCH 1/4] Per container failure tracking metric for Samza --- .../documentation/versioned/operations/monitoring.md | 1 + .../clustermanager/ContainerProcessManager.java | 12 +++++++++++- .../samza/clustermanager/SamzaApplicationState.java | 7 +++++++ .../metrics/ContainerProcessManagerMetrics.scala | 7 +++++++ 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/docs/learn/documentation/versioned/operations/monitoring.md b/docs/learn/documentation/versioned/operations/monitoring.md index c11fbe0786..0b45b68f30 100644 --- a/docs/learn/documentation/versioned/operations/monitoring.md +++ b/docs/learn/documentation/versioned/operations/monitoring.md @@ -369,6 +369,7 @@ All \, \, \, \, \, are popula | | expired-preferred-host-requests | Number of expired resource-requests-for -preferred-host received by the cluster manager. | | | expired-any-host-requests | Number of expired resource-requests-for -any-host received by the cluster manager. | | | host-affinity-match-pct | Percentage of non-expired preferred host requests. This measures the % of resource-requests for which host-affinity provided the preferred host. | +| | \-failure-count | Number of times a container identified by containerId has failed | | **Group** | **Metric name** | **Meaning** | | --- | --- | --- | diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index f6e3b1f899..d4556b3fed 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.samza.SamzaException; @@ -182,7 +183,7 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri this.jobConfig = new JobConfig(clusterManagerConfig); this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled(); - + this.containerProcessManagerMetrics = new ContainerProcessManagerMetrics(jobConfig, state, registry); this.clusterResourceManager = resourceManager; this.containerManager = containerManager; this.diagnosticsManager = Option.empty(); @@ -236,6 +237,12 @@ public void start() { Map processorToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality(); containerAllocator.requestResources(processorToHostMapping); + // Initialize the per processor failure count to be 0 + processorToHostMapping.keySet().forEach(processorId -> { + state.perProcessorFailureCount.put(processorId, new AtomicInteger(0)); + containerProcessManagerMetrics.registerProcessorFailureCountMetric(processorId); + }); + // Start container allocator thread LOG.info("Starting the container allocator thread"); allocatorThread.start(); @@ -472,6 +479,9 @@ void onResourceCompletedWithUnknownStatus(SamzaResourceStatus resourceStatus, St LOG.info("Container ID: {} for Processor ID: {} failed with exit code: {}.", containerId, processorId, exitStatus); Instant now = Instant.now(); state.failedContainers.incrementAndGet(); + if (state.perProcessorFailureCount.get(processorId) != null) { + state.perProcessorFailureCount.get(processorId).incrementAndGet(); + } state.jobHealthy.set(false); state.neededProcessors.incrementAndGet(); diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java index 784f0b45da..5fa98b41c8 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java @@ -115,6 +115,13 @@ public enum SamzaAppStatus { UNDEFINED, SUCCEEDED, FAILED } */ public final ConcurrentHashMap failedProcessors = new ConcurrentHashMap<>(0); + + /** + * Map of the Samza processor ID to the count of failed attempts + * Modified by AMRMCallbackThread + */ + public final ConcurrentMap perProcessorFailureCount = new ConcurrentHashMap<>(0); + /** * Final status of the application. Made to be volatile s.t. changes will be visible in multiple threads. */ diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala index 91fec2886c..543fb648bf 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala @@ -64,4 +64,11 @@ class ContainerProcessManagerMetrics(val config: Config, val mContainerMemoryMb = newGauge("container-memory-mb", () => clusterManagerConfig.getContainerMemoryMb) val mContainerCpuCores = newGauge("container-cpu-cores", () => clusterManagerConfig.getNumCores) + + + val mPerContainerFailureCount = collection.mutable.Map[String, Gauge[Int]]() + def registerProcessorFailureCountMetric(containerId: String) { + mPerContainerFailureCount.put(containerId, newGauge("container_" + containerId + "-failure-count", () => state.perProcessorFailureCount.get(containerId).get())) + } + } From 4189ae8c648cfad69a4a418ce005022abdacac4a Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Wed, 19 Aug 2020 10:36:40 -0700 Subject: [PATCH 2/4] Add teardown to the tests to prevent memory leaks --- .../TestContainerProcessManager.java | 52 ++++++++----------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java index a5dbe77eb5..ece9cbe292 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java @@ -86,6 +86,8 @@ public class TestContainerProcessManager { }; private Config config = new MapConfig(configVals); private ContainerPlacementMetadataStore containerPlacementMetadataStore; + private CoordinatorStreamStore coordinatorStreamStore; + private ContainerProcessManager cpm; private Config getConfig() { Map map = new HashMap<>(); @@ -129,7 +131,7 @@ private JobModelManager getJobModelManagerWithoutHostAffinity(int containerCount public void setup() throws Exception { server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config); - CoordinatorStreamStore coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore(); + coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore(); coordinatorStreamStore.init(); containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore); containerPlacementMetadataStore.start(); @@ -154,8 +156,7 @@ public void testContainerProcessManager() throws Exception { MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ContainerManager containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false); - ContainerProcessManager cpm = - buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty()); + cpm = buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty()); ContainerAllocator allocator = (ContainerAllocator) getPrivateFieldFromCpm("containerAllocator", cpm).get(cpm); @@ -200,8 +201,7 @@ public void testOnInit() throws Exception { new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, clusterManagerConfig.getHostAffinityEnabled(), false); - ContainerProcessManager cpm = - buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty()); + cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty()); MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity( clusterResourceManager, @@ -231,7 +231,7 @@ public void run() { assertEquals(1, state.neededProcessors.get()); assertEquals(1, allocator.requestedContainers); - cpm.stop(); + } @Test @@ -242,8 +242,7 @@ public void testOnShutdown() throws Exception { MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf)); - ContainerProcessManager cpm = - buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty()); + cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty()); cpm.start(); Thread allocatorThread = (Thread) getPrivateFieldFromCpm("allocatorThread", cpm).get(cpm); @@ -274,8 +273,7 @@ public void testCpmShouldStopWhenContainersFinish() throws Exception { state, containerManager); - ContainerProcessManager cpm = - spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); + cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); // start triggers a request cpm.start(); @@ -322,8 +320,7 @@ public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception state, containerManager); - ContainerProcessManager cpm = spy( - buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); + cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); // start triggers a request cpm.start(); @@ -381,7 +378,7 @@ public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception assertTrue(cpm.shouldShutdown()); assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, state.status); - cpm.stop(); + } /** @@ -421,8 +418,7 @@ private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCod state, containerManager); - ContainerProcessManager cpm = - buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)); + cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)); // start triggers a request cpm.start(); @@ -475,7 +471,7 @@ private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCod assertEquals(false, cpm.getJobFailureCriteriaMet()); assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount()); - cpm.stop(); + } @Test @@ -507,8 +503,7 @@ private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknown state, containerManager); - ContainerProcessManager cpm = - buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)); + cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)); // start triggers a request cpm.start(); @@ -596,7 +591,7 @@ private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknown assertEquals(0, allocator.getContainerRequestState().numPendingRequests()); assertEquals(0, allocator.getContainerRequestState().numDelayedRequests()); - cpm.stop(); + } @Test @@ -617,8 +612,7 @@ public void testInvalidNotificationsAreIgnored() throws Exception { state, containerManager); - ContainerProcessManager cpm = - spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); + cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); // Start the task clusterResourceManager cpm.start(); @@ -693,8 +687,7 @@ public void testAllBufferedResourcesAreUtilized() throws Exception { state, containerManager); - ContainerProcessManager cpm = - spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator))); + cpm = spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator))); cpm.start(); assertFalse(cpm.shouldShutdown()); @@ -758,8 +751,7 @@ public void testDuplicateNotificationsDoNotAffectJobHealth() throws Exception { state, containerManager); - ContainerProcessManager cpm = - spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); + cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); // Start the task manager cpm.start(); @@ -833,8 +825,7 @@ public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception { state, containerManager); - ContainerProcessManager cpm = - spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); + cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); // Start the task clusterResourceManager cpm.start(); @@ -906,13 +897,16 @@ public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception { assertFalse(cpm.shouldShutdown()); assertFalse(state.jobHealthy.get()); assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost()); - - cpm.stop(); } @After public void teardown() { + if (cpm != null) { + cpm.stop(); + } server.stop(); + containerPlacementMetadataStore.stop(); + coordinatorStreamStore.close(); } private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state, From f6d62144e6ec7434442f0849a6d812ebe392d153 Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Tue, 8 Sep 2020 17:17:21 -0700 Subject: [PATCH 3/4] Address Rays comments --- .../clustermanager/ContainerProcessManager.java | 6 +----- .../samza/clustermanager/SamzaApplicationState.java | 7 ------- .../metrics/ContainerProcessManagerMetrics.scala | 12 ++++++++++-- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index d4556b3fed..e71dd8d0ed 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.samza.SamzaException; @@ -239,7 +238,6 @@ public void start() { // Initialize the per processor failure count to be 0 processorToHostMapping.keySet().forEach(processorId -> { - state.perProcessorFailureCount.put(processorId, new AtomicInteger(0)); containerProcessManagerMetrics.registerProcessorFailureCountMetric(processorId); }); @@ -479,9 +477,7 @@ void onResourceCompletedWithUnknownStatus(SamzaResourceStatus resourceStatus, St LOG.info("Container ID: {} for Processor ID: {} failed with exit code: {}.", containerId, processorId, exitStatus); Instant now = Instant.now(); state.failedContainers.incrementAndGet(); - if (state.perProcessorFailureCount.get(processorId) != null) { - state.perProcessorFailureCount.get(processorId).incrementAndGet(); - } + containerProcessManagerMetrics.incrementProcessorFailureCountMetric(processorId); state.jobHealthy.set(false); state.neededProcessors.incrementAndGet(); diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java index 5fa98b41c8..784f0b45da 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java @@ -115,13 +115,6 @@ public enum SamzaAppStatus { UNDEFINED, SUCCEEDED, FAILED } */ public final ConcurrentHashMap failedProcessors = new ConcurrentHashMap<>(0); - - /** - * Map of the Samza processor ID to the count of failed attempts - * Modified by AMRMCallbackThread - */ - public final ConcurrentMap perProcessorFailureCount = new ConcurrentHashMap<>(0); - /** * Final status of the application. Made to be volatile s.t. changes will be visible in multiple threads. */ diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala index 543fb648bf..ea3b924b44 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala @@ -66,9 +66,17 @@ class ContainerProcessManagerMetrics(val config: Config, val mContainerCpuCores = newGauge("container-cpu-cores", () => clusterManagerConfig.getNumCores) + /** + * Maitains the map of processorId 0,1,2 to failure count for one container + */ val mPerContainerFailureCount = collection.mutable.Map[String, Gauge[Int]]() - def registerProcessorFailureCountMetric(containerId: String) { - mPerContainerFailureCount.put(containerId, newGauge("container_" + containerId + "-failure-count", () => state.perProcessorFailureCount.get(containerId).get())) + + def registerProcessorFailureCountMetric(processorId: String) { + mPerContainerFailureCount.put(processorId, newGauge("container_" + processorId + "-failure-count", 0)) + } + + def incrementProcessorFailureCountMetric(processorId: String) { + mPerContainerFailureCount.get(processorId).get.set(mPerContainerFailureCount.get(processorId).get.getValue + 1) } } From 5ebf38561a8d07fe7e1eaec91edc08372bd36773 Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Mon, 21 Sep 2020 18:24:58 -0700 Subject: [PATCH 4/4] Fix the shutdown sequence for container process manager test --- .../clustermanager/TestContainerProcessManager.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java index ae3ded0013..8789d207bb 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java @@ -147,8 +147,7 @@ public void testContainerProcessManager() throws Exception { .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1")))); ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager); - ContainerProcessManager cpm = - buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty()); + cpm = buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty()); ContainerAllocator allocator = (ContainerAllocator) getPrivateFieldFromCpm("containerAllocator", cpm).get(cpm); @@ -506,8 +505,7 @@ private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknown state, containerManager); - ContainerProcessManager cpm = - buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator), mockLocalityManager); + cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator), mockLocalityManager); // start triggers a request cpm.start(); @@ -696,8 +694,7 @@ public void testAllBufferedResourcesAreUtilized() throws Exception { state, containerManager); - ContainerProcessManager cpm = - spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator), mockLocalityManager)); + cpm = spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator), mockLocalityManager)); cpm.start(); assertFalse(cpm.shouldShutdown());