SLING-7223: Spikes in RRD after restart
author: Marcel Reutegger <mreutegg@adobe.com>
Tue, 31 Oct 2017 15:00:51 +0000 (16:00 +0100)
committer: Chetan Mehrotra <chetanm@apache.org>
Wed, 1 Nov 2017 04:46:11 +0000 (10:16 +0530)
src/main/java/org/apache/sling/commons/metrics/rrd4j/impl/RRD4JReporter.java
src/test/java/org/apache/sling/commons/metrics/rrd4j/impl/RestartTest.java [new file with mode: 0644]

index 40dbd7c..5cb9418 100644 (file)
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.commons.metrics.rrd4j.impl;
 
+import com.codahale.metrics.Clock;
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
@@ -62,6 +63,7 @@ class RRD4JReporter extends ScheduledReporter {
 
     private final Map<String, Integer> dictionary = new HashMap<>();
     private final RrdDb rrdDB;
+    private final Clock clock;
     private long lastSampleTime;
 
     static Builder forRegistry(MetricRegistry metricRegistry) {
@@ -70,6 +72,7 @@ class RRD4JReporter extends ScheduledReporter {
 
     static class Builder {
         private MetricRegistry metricRegistry;
+        private Clock clock = Clock.defaultClock();
         private TimeUnit ratesUnit = TimeUnit.SECONDS;
         private TimeUnit durationUnit = TimeUnit.MICROSECONDS;
         private File path = new File(".");
@@ -144,6 +147,11 @@ class RRD4JReporter extends ScheduledReporter {
             return this;
         }
 
+        /**
+         * Sets the clock the reporter reads the sample time from. When this
+         * method is not called, {@link Clock#defaultClock()} is used.
+         *
+         * @param clock the clock to use; must not be {@code null}
+         * @return this builder
+         * @throws IllegalArgumentException if {@code clock} is {@code null}
+         */
+        Builder withClock(Clock clock) {
+            // fail here rather than with a NullPointerException on the first report
+            if (clock == null) {
+                throw new IllegalArgumentException("clock must not be null");
+            }
+            this.clock = clock;
+            return this;
+        }
+
         Builder convertRatesTo(TimeUnit ratesUnit) {
             this.ratesUnit = ratesUnit;
             return this;
@@ -159,7 +167,7 @@ class RRD4JReporter extends ScheduledReporter {
                 return null;
             }
             return new RRD4JReporter(metricRegistry, "RRD4JReporter", MetricFilter.ALL, ratesUnit, durationUnit,
-                    dictionary, createDef());
+                    dictionary, createDef(), clock);
         }
 
         private String checkDataSource(String ds) throws IllegalArgumentException {
@@ -190,16 +198,23 @@ class RRD4JReporter extends ScheduledReporter {
                   TimeUnit rateUnit,
                   TimeUnit durationUnit,
                   Map<String, Integer> dictionary,
-                  RrdDef rrdDef) throws IOException {
+                  RrdDef rrdDef,
+                  Clock clock) throws IOException {
         super(registry, name, filter, rateUnit, durationUnit);
         this.dictionary.putAll(dictionary);
         this.rrdDB = createDB(rrdDef);
+        this.clock = clock;
         storeDictionary(rrdDef.getPath() + PROPERTIES_SUFFIX);
+        writeUnknownSample();
     }
 
     @Override
     public void close() {
         try {
+            // write an unknown sample before closing the DB
+            if (!rrdDB.isClosed()) {
+                writeUnknownSample();
+            }
             rrdDB.close();
         } catch (IOException e) {
             LOGGER.warn("Closing RRD failed", e);
@@ -213,7 +228,7 @@ class RRD4JReporter extends ScheduledReporter {
                        SortedMap<String, Histogram> histograms,
                        SortedMap<String, Meter> meters,
                        SortedMap<String, Timer> timers) {
-        long sampleTime = System.currentTimeMillis() / 1000;
+        long sampleTime = clock.getTime() / 1000;
         if (sampleTime <= lastSampleTime) {
             // sample at most once a second
             return;
@@ -341,6 +356,12 @@ class RRD4JReporter extends ScheduledReporter {
         }
     }
 
+    private void writeUnknownSample() throws IOException {
+        long updateTime = rrdDB.getLastUpdateTime() + 1;
+        rrdDB.createSample(updateTime).update();
+        lastSampleTime = updateTime;
+    }
+
     private static RrdDb createDB(RrdDef definition) throws IOException {
         File dbFile = new File(definition.getPath());
         if (!dbFile.getParentFile().exists()) {
diff --git a/src/test/java/org/apache/sling/commons/metrics/rrd4j/impl/RestartTest.java b/src/test/java/org/apache/sling/commons/metrics/rrd4j/impl/RestartTest.java
new file mode 100644 (file)
index 0000000..7dd4c88
--- /dev/null
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.commons.metrics.rrd4j.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.rrd4j.core.RrdDb;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertTrue;
+
+public class RestartTest {
+
+    private static final File RRD = new File(new File("target", "metrics"), "metrics.rrd");
+
+    private MetricRegistry registry = new MetricRegistry();
+    private RRD4JReporter reporter;
+    private Counter counter = new Counter();
+    private TestClock clock = new TestClock();
+
+    @Before
+    public void before() throws IOException {
+        RRD.delete();
+        clock.waitUntil(System.currentTimeMillis());
+        registry.register("myCounter", counter);
+        reporter = createReporter();
+    }
+
+    @After
+    public void after() {
+        reporter.close();
+        RRD.delete();
+    }
+
+    @Test
+    public void restart() throws Exception {
+        restart(10);
+    }
+
+    @Test
+    public void restartNoDelay() throws Exception {
+        restart(0);
+    }
+
+    private void restart(int delaySecs) throws Exception {
+        long start = clock.getTime() / 1000;
+        // report some samples
+        for (int i = 0; i < 3; i++) {
+            counter.inc();
+            wait(1, SECONDS);
+            reporter.report();
+        }
+        // shut down
+        reporter.close();
+        // set count to zero
+        counter.dec(counter.getCount());
+        // restart after some delay
+        wait(delaySecs, SECONDS);
+        reporter = createReporter();
+        // report some samples
+        for (int i = 0; i < 3; i++) {
+            counter.inc();
+            wait(1, SECONDS);
+            reporter.report();
+        }
+        // shut down
+        reporter.close();
+        long end = clock.getTime() / 1000;
+        // check DB
+        RrdDb db = new RrdDb(RRD.getPath());
+        try {
+            double[] values = db.createFetchRequest(
+                    db.getArchive(0).getConsolFun(), start, end)
+                        .fetchData().getValues("0");
+            for (double v : values) {
+                if (Double.isNaN(v)) {
+                    continue;
+                }
+                long longValue = (long) v;
+                assertTrue(longValue + " > 1", longValue <= 1L);
+            }
+        } finally {
+            db.close();
+        }
+    }
+
+    private void wait(long duration, TimeUnit unit) {
+        clock.waitUntil(clock.getTime() + unit.toMillis(duration));
+    }
+
+    private RRD4JReporter createReporter() throws IOException {
+        return RRD4JReporter.forRegistry(registry)
+                .withPath(RRD)
+                .withArchives(new String[]{"RRA:AVERAGE:0.5:1:60"})
+                .withDatasources(new String[]{"DS:myCounter:COUNTER:300:0:U"})
+                .withStep(1)
+                .withClock(clock)
+                .build();
+    }
+
+    private static class TestClock extends Clock.UserTimeClock {
+
+        private AtomicLong time = new AtomicLong();
+
+        @Override
+        public long getTime() {
+            return time.get();
+        }
+
+        void waitUntil(long time) {
+            while (this.time.get() < time) {
+                this.time.compareAndSet(this.time.get(), time);
+            }
+        }
+    }
+}