HADOOP-15481. Emit FairCallQueue stats as metrics. Contributed by Christopher Gregorian.
authorChen Liang <cliang@apache.org>
Fri, 11 Jan 2019 22:01:23 +0000 (14:01 -0800)
committerChen Liang <cliang@apache.org>
Fri, 11 Jan 2019 22:01:23 +0000 (14:01 -0800)
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java

index 3a8c83d..380426f 100644 (file)
@@ -35,6 +35,11 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +48,7 @@ import org.slf4j.LoggerFactory;
  * A queue with multiple levels for each priority.
  */
 public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
-  implements BlockingQueue<E> {
+    implements BlockingQueue<E> {
   @Deprecated
   public static final int    IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4;
   @Deprecated
@@ -335,7 +340,8 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
    * MetricsProxy is a singleton because we may init multiple
    * FairCallQueues, but the metrics system cannot unregister beans cleanly.
    */
-  private static final class MetricsProxy implements FairCallQueueMXBean {
+  private static final class MetricsProxy implements FairCallQueueMXBean,
+      MetricsSource {
     // One singleton per namespace
     private static final HashMap<String, MetricsProxy> INSTANCES =
       new HashMap<String, MetricsProxy>();
@@ -346,8 +352,13 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
     // Keep track of how many objects we registered
     private int revisionNumber = 0;
 
+    private String namespace;
+
     private MetricsProxy(String namespace) {
+      this.namespace = namespace;
       MBeans.register(namespace, "FairCallQueue", this);
+      final String name = namespace + ".FairCallQueue";
+      DefaultMetricsSystem.instance().register(name, name, this);
     }
 
     public static synchronized MetricsProxy getInstance(String namespace) {
@@ -389,6 +400,23 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
     @Override public int getRevision() {
       return revisionNumber;
     }
+
+    @Override
+    public void getMetrics(MetricsCollector collector, boolean all) {
+      MetricsRecordBuilder rb = collector.addRecord("FairCallQueue")
+          .setContext("rpc")
+          .tag(Interns.info("namespace", "Namespace"), namespace);
+
+      final int[] currentQueueSizes = getQueueSizes();
+      final long[] currentOverflowedCalls = getOverflowedCalls();
+
+      for (int i = 0; i < currentQueueSizes.length; i++) {
+        rb.addGauge(Interns.info("FairCallQueueSize_p" + i, "FCQ Queue Size"),
+            currentQueueSizes[i]);
+        rb.addCounter(Interns.info("FairCallQueueOverflowedCalls_p" + i,
+            "FCQ Overflowed Calls"), currentOverflowedCalls[i]);
+      }
+    }
   }
 
   // FairCallQueueMXBean
index 1e21940..1ef2b44 100644 (file)
@@ -104,6 +104,16 @@ RetryCache metrics is useful to monitor NameNode fail-over. Each metrics record
 | `CacheCleared` | Total number of RetryCache cleared |
 | `CacheUpdated` | Total number of RetryCache updated |
 
+FairCallQueue
+-------------
+
+FairCallQueue metrics exist only when FairCallQueue is enabled. Each metric is reported once per priority level; *Priority* in the names below is the number of that level.
+
+| Name | Description |
+|:---- |:---- |
+| `FairCallQueueSize_p`*Priority* | Current number of calls in priority queue |
+| `FairCallQueueOverflowedCalls_p`*Priority* | Total number of overflowed calls in priority queue |
+
 rpcdetailed context
 ===================
 
index d82a2f1..776db5e 100644 (file)
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyObject;
 import static org.mockito.Mockito.doThrow;
@@ -637,4 +640,37 @@ public class TestFairCallQueue {
     assertEquals(0, queueSizes[0]);
     assertEquals(0, queueSizes[1]);
   }
+
+  @Test
+  public void testFairCallQueueMetrics() throws Exception {
+    final String fcqMetrics = "ns.FairCallQueue";
+    Schedulable p0 = mockCall("a", 0);
+    Schedulable p1 = mockCall("b", 1);
+
+    assertGauge("FairCallQueueSize_p0", 0, getMetrics(fcqMetrics));
+    assertGauge("FairCallQueueSize_p1", 0, getMetrics(fcqMetrics));
+    assertCounter("FairCallQueueOverflowedCalls_p0", 0L,
+        getMetrics(fcqMetrics));
+    assertCounter("FairCallQueueOverflowedCalls_p1", 0L,
+        getMetrics(fcqMetrics));
+
+    for (int i = 0; i < 5; i++) {
+      fcq.add(p0);
+      fcq.add(p1);
+    }
+
+    try {
+      fcq.add(p1);
+      fail("didn't overflow");
+    } catch (IllegalStateException ise) {
+      // Expected exception
+    }
+
+    assertGauge("FairCallQueueSize_p0", 5, getMetrics(fcqMetrics));
+    assertGauge("FairCallQueueSize_p1", 5, getMetrics(fcqMetrics));
+    assertCounter("FairCallQueueOverflowedCalls_p0", 0L,
+        getMetrics(fcqMetrics));
+    assertCounter("FairCallQueueOverflowedCalls_p1", 1L,
+        getMetrics(fcqMetrics));
+  }
 }