upgrading tck which implied moving to EMA for meter and updating a bit histogram...
authorRomain Manni-Bucau <rmannibucau@apache.org>
Sat, 27 Oct 2018 18:05:31 +0000 (20:05 +0200)
committerRomain Manni-Bucau <rmannibucau@apache.org>
Sat, 27 Oct 2018 18:05:31 +0000 (20:05 +0200)
geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/HistogramImpl.java
geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/MeterImpl.java
geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/expdecay/ExponentialMovingAverage.java [new file with mode: 0644]
pom.xml

index b0f93d7..58fcff2 100644 (file)
  */
 package org.apache.geronimo.microprofile.metrics.common;
 
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toMap;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
 
@@ -33,15 +38,22 @@ import javax.json.bind.annotation.JsonbTransient;
 import org.eclipse.microprofile.metrics.Histogram;
 import org.eclipse.microprofile.metrics.Snapshot;
 
-// todo? rework it to use the classical exponential decay impl?
+// impl adapted from apache sirona
 public class HistogramImpl implements Histogram {
-    private static final long INTERVAL_NS = TimeUnit.MINUTES.toNanos(1);
-    private static final Value[] EMPTY_VALUES_ARRAY = new Value[0];
+    // potential config
+    private static final double ALPHA = Double.parseDouble(System.getProperty("geronimo.metrics.storage.alpha", "0.015"));
+    private static final int BUCKET_SIZE = Integer.getInteger("geronimo.metrics.storage.size", 1024);
+    private static final long REFRESH_INTERVAL = TimeUnit.HOURS.toNanos(1);
+
+    private static final Value[] EMPTY_ARRAY = new Value[0];
 
     private final String unit;
-    private final LongAdder count = new LongAdder();
-    private final Collection<Value> values = new CopyOnWriteArrayList<>();
-    private final AtomicLong lastCleanUp = new AtomicLong(System.nanoTime());
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final AtomicLong count = new AtomicLong();
+    private final ConcurrentSkipListMap<Double, Value> bucket = new ConcurrentSkipListMap<>();
+    private final AtomicLong nextRefreshTime = new AtomicLong(System.nanoTime() + REFRESH_INTERVAL);
+    private volatile long startTime = nowSec();
 
     public HistogramImpl(final String unit) {
         this.unit = unit;
@@ -54,21 +66,18 @@ public class HistogramImpl implements Histogram {
 
     @Override
     public synchronized void update(final long value) {
-        refresh();
-        count.increment();
-        values.add(new Value(System.nanoTime(), value));
+        add(value);
     }
 
     @Override
     public long getCount() {
-        return count.sum();
+        return count.get();
     }
 
     @Override
     @JsonbTransient
     public Snapshot getSnapshot() {
-        refresh();
-        return new SnapshotImpl(values.toArray(EMPTY_VALUES_ARRAY));
+        return snapshot();
     }
 
     public String getUnit() {
@@ -115,67 +124,134 @@ public class HistogramImpl implements Histogram {
         return getSnapshot().getStdDev();
     }
 
-    // cheap way to avoid to explode the mem for nothing
-    private void refresh() {
+    private long nowSec() {
+        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+    }
+
+    public void add(final long value) {
+        ensureUpToDate();
+
+        final Lock lock = this.lock.readLock();
+        lock.lock();
+        try {
+            final Value sample = new Value(value, Math.exp(ALPHA * (nowSec() - startTime)));
+            final double priority = sample.weight / Math.random();
+
+            final long size = count.incrementAndGet();
+            if (size <= BUCKET_SIZE) {
+                bucket.put(priority, sample);
+            } else { // iterate through the bucket until we need removing low priority entries to get a new space
+                double first = bucket.firstKey();
+                if (first < priority && bucket.putIfAbsent(priority, sample) == null) {
+                    while (bucket.remove(first) == null) {
+                        first = bucket.firstKey();
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void ensureUpToDate() {
+        final long next = nextRefreshTime.get();
         final long now = System.nanoTime();
-        final long lastUpdateNs = lastCleanUp.get();
-        final long elaspsedTime = now - lastUpdateNs;
-        if (elaspsedTime > INTERVAL_NS && lastCleanUp.compareAndSet(lastUpdateNs, now)) {
-            final long cleanFrom = now - INTERVAL_NS;
-            values.removeIf(it -> it.timestamp > cleanFrom);
+        if (now < next) {
+            return;
+        }
+
+        final Lock lock = this.lock.writeLock();
+        lock.lock();
+        try {
+            if (nextRefreshTime.compareAndSet(next, now + REFRESH_INTERVAL)) {
+                final long oldStartTime = startTime;
+                startTime = nowSec();
+                final double updateFactor = Math.exp(-ALPHA * (startTime - oldStartTime));
+                if (updateFactor != 0.) {
+                    bucket.putAll(new ArrayList<>(bucket.keySet()).stream()
+                            .collect(toMap(k -> k * updateFactor, k -> {
+                                final Value previous = bucket.remove(k);
+                                return new Value(previous.value, previous.weight * updateFactor);
+                            })));
+                    count.set(bucket.size()); // N keys can lead to the same key so we must update it
+                } else {
+                    bucket.clear();
+                    count.set(0);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public Snapshot snapshot() {
+        ensureUpToDate();
+        final Lock lock = this.lock.readLock();
+        lock.lock();
+        try {
+            return new SnapshotImpl(bucket.values().toArray(EMPTY_ARRAY));
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private static final class Value {
+        private final long value;
+        private final double weight;
+
+        private Value(final long value, final double weight) {
+            this.value = value;
+            this.weight= weight;
         }
     }
 
     private static class SnapshotImpl extends Snapshot {
         private final Value[] values;
-        private volatile long[] sorted;
+        private Value[] sorted;
 
         private SnapshotImpl(final Value[] values) {
             this.values = values;
+            // no high computation here, we are under lock + all methods are not called in general
         }
 
         @Override
-        public double getValue(final double quantile) {
-            if (values.length == 0) {
-                return 0;
-            }
-            if (values.length == 1) {
-                return values[0].value;
-            }
-            if (sorted == null) {
-                synchronized (this) {
-                    if (sorted == null) {
-                        sorted = getValues();
-                        Arrays.sort(sorted);
-                    }
-                }
-            }
-            return sorted[(int) Math.floor((sorted.length - 1) * quantile)];
+        public int size() {
+            return values.length;
         }
 
         @Override
         public long[] getValues() {
-            return longs().toArray();
-        }
-
-        @Override
-        public int size() {
-            return values.length;
+            return values(sorted()).toArray();
         }
 
         @Override
         public long getMax() {
-            return longs().max().orElse(0);
+            if (values.length == 0) {
+                return 0;
+            }
+            if (sorted != null) {
+                return sorted[sorted.length - 1].value;
+            }
+            return values(values).max().orElse(0);
         }
 
         @Override
-        public double getMean() {
-            return longs().sum() / (double) values.length;
+        public long getMin() {
+            if (values.length == 0) {
+                return 0;
+            }
+            if (sorted != null) {
+                return sorted[0].value;
+            }
+            return values(values).min().orElse(0);
         }
 
         @Override
-        public long getMin() {
-            return longs().min().orElse(0);
+        public double getMean() {
+            if (values.length == 0) {
+                return 0;
+            }
+            return values(values).sum() * 1. / values.length;
         }
 
         @Override
@@ -184,12 +260,15 @@ public class HistogramImpl implements Histogram {
                 return 0;
             }
             final double mean = getMean();
-            return Math.sqrt(longs().map(v -> (long) Math.pow(v - mean, 2)).sum() / values.length - 1);
+            final double sumWeight = Stream.of(values).mapToDouble(i -> i.weight).sum();
+            return Math.sqrt(Stream.of(values)
+                    .mapToDouble(v -> Math.pow(v.value - mean, 2) * (v.weight / sumWeight))
+                    .sum());
         }
 
         @Override
         public void dump(final OutputStream output) {
-            longs().forEach(v -> {
+            values(sorted()).forEach(v -> {
                 try {
                     output.write((v + "\n").getBytes(StandardCharsets.UTF_8));
                 } catch (final IOException e) {
@@ -198,18 +277,36 @@ public class HistogramImpl implements Histogram {
             });
         }
 
-        private LongStream longs() {
-            return Stream.of(values).mapToLong(i -> i.value);
+        @Override
+        public double getValue(final double quantile) {
+            if (!(quantile >= 0 || quantile <= 1)) {
+                throw new IllegalArgumentException("Quantile " + quantile + " is invalid");
+            }
+            if (values.length == 0) {
+                return 0;
+            }
+            if (values.length == 1) {
+                return values[0].value;
+            }
+            final int idx = (int) Math.floor((values.length - 1) * quantile);
+            return sorted()[idx].value;
         }
-    }
 
-    private static final class Value {
-        private final long timestamp;
-        private final long value;
+        private Value[] sorted() {
+            if (sorted == null) {
+                synchronized (this) {
+                    if (sorted == null) {
+                        sorted = new Value[values.length];
+                        System.arraycopy(values, 0, sorted, 0, values.length);
+                        Arrays.sort(sorted, comparing(i -> i.value));
+                    }
+                }
+            }
+            return sorted;
+        }
 
-        private Value(final long timestamp, final long value) {
-            this.timestamp = timestamp;
-            this.value = value;
+        private LongStream values(final Value[] values) {
+            return Stream.of(values).mapToLong(i -> i.value);
         }
     }
 }
index 82faf25..6895dbb 100644 (file)
  */
 package org.apache.geronimo.microprofile.metrics.common;
 
+import static org.apache.geronimo.microprofile.metrics.common.expdecay.ExponentialMovingAverage.forMinutes;
+
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Stream;
 
 import javax.json.bind.annotation.JsonbProperty;
 
+import org.apache.geronimo.microprofile.metrics.common.expdecay.ExponentialMovingAverage;
 import org.eclipse.microprofile.metrics.Meter;
 
 public class MeterImpl implements Meter {
-    private static final long INTERVAL_NS;
-    private static final double ALPHA_MN;
-    private static final double ALPHA_5MN;
-    private static final double ALPHA_15MN;
-    static {
-        final double interval = 5; // this is not needed at runtime so no need of a constant
-        INTERVAL_NS = TimeUnit.SECONDS.toNanos((int) interval * 60);
-        ALPHA_MN = 1 - Math.exp(-interval / 60.);
-        ALPHA_5MN = 1 - Math.exp(-interval / 12.);
-        ALPHA_15MN = 1 - Math.exp(-interval / 4.);
-    }
+    private static final long REFRESH_INTERVAL = TimeUnit.SECONDS.toNanos(5);
 
+    private final AtomicLong lastRefresh = new AtomicLong(System.nanoTime());
     private final LongAdder count = new LongAdder();
-    private final Rate rate15 = new Rate(ALPHA_15MN, INTERVAL_NS);
-    private final Rate rate5 = new Rate(ALPHA_5MN, INTERVAL_NS);
-    private final Rate rate1 = new Rate(ALPHA_MN, INTERVAL_NS);
+    private final ExponentialMovingAverage rate15 = forMinutes(15);
+    private final ExponentialMovingAverage rate5 = forMinutes(5);
+    private final ExponentialMovingAverage rate1 = forMinutes(1);
     private final long initNs = System.nanoTime();
-    private final AtomicLong lastUpdate = new AtomicLong(System.nanoTime());
     private final String unit;
 
     public MeterImpl(final String unit) {
@@ -58,13 +52,13 @@ public class MeterImpl implements Meter {
         mark(1);
     }
 
-    @Override // this is not the most beautiful piece but locking here would be a perf killer
+    @Override
     public void mark(final long n) {
-        doRefresh();
+        updateIfNeeded();
         count.add(n);
-        rate1.update(n);
-        rate5.update(n);
-        rate15.update(n);
+        rate1.add(n);
+        rate5.add(n);
+        rate15.add(n);
     }
 
     @Override
@@ -75,22 +69,22 @@ public class MeterImpl implements Meter {
     @Override
     @JsonbProperty("fifteenMinRate")
     public double getFifteenMinuteRate() {
-        doRefresh();
-        return rate15.value;
+        updateIfNeeded();
+        return rate15.rate();
     }
 
     @Override
     @JsonbProperty("fiveMinRate")
     public double getFiveMinuteRate() {
-        doRefresh();
-        return rate5.value;
+        updateIfNeeded();
+        return rate5.rate();
     }
 
     @Override
     @JsonbProperty("oneMinRate")
     public double getOneMinuteRate() {
-        doRefresh();
-        return rate1.value;
+        updateIfNeeded();
+        return rate1.rate();
     }
 
     @Override
@@ -107,53 +101,15 @@ public class MeterImpl implements Meter {
         if (seconds == 0) {
             return 0;
         }
-        return count / seconds;
+        return count * 1. / seconds;
     }
 
-    private void doRefresh() {
+    private void updateIfNeeded() {
         final long now = System.nanoTime();
-        final long lastUpdateNs = lastUpdate.get();
-        final long elaspsedTime = now - lastUpdateNs;
-        if (elaspsedTime > INTERVAL_NS && lastUpdate.compareAndSet(lastUpdateNs, now)) {
-            final long diff = elaspsedTime / INTERVAL_NS;
-            for (long it = 0; it < diff; it++) { // simulate time, avoids a background thread
-                rate1.refresh();
-                rate5.refresh();
-                rate15.refresh();
-            }
-        }
-    }
-
-    private static class Rate {
-        private volatile double value = 0;
-
-        private final double alpha;
-        private final double interval;
-        private final LongAdder updates = new LongAdder();
-        private volatile boolean initialized = false;
-
-        private Rate(final double alpha, final long interval) {
-            this.interval = interval;
-            this.alpha = alpha;
-        }
-
-        private void update(final long n) {
-            updates.add(n);
-        }
-
-        private void refresh() {
-            final long count = updates.sumThenReset();
-            final double val = count / interval;
-            if (!initialized) {
-                synchronized (this) {
-                    value = val;
-                }
-                initialized = true;
-                return;
-            }
-            synchronized (this) {
-                value += (val - value) * alpha;
-            }
+        final long previousRefresh = lastRefresh.get();
+        if (now - previousRefresh >= REFRESH_INTERVAL && lastRefresh.compareAndSet(previousRefresh, now)) {
+            lastRefresh.set(now);
+            Stream.of(rate1, rate5, rate15).forEach(ExponentialMovingAverage::refresh);
         }
     }
 }
diff --git a/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/expdecay/ExponentialMovingAverage.java b/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/expdecay/ExponentialMovingAverage.java
new file mode 100644 (file)
index 0000000..b9bfe37
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.microprofile.metrics.common.expdecay;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+public class ExponentialMovingAverage {
+    private static final double INTERVAL = SECONDS.toNanos(5);
+    private static final double RATE_RATIO = TimeUnit.SECONDS.toNanos(1);
+
+    private final LongAdder accumulator = new LongAdder();
+    private final double alpha;
+
+    private volatile double rate = 0.0;
+
+    private ExponentialMovingAverage(final double alpha) {
+        this.alpha = alpha;
+    }
+
+    public double rate() {
+        return rate * RATE_RATIO;
+    }
+
+    public void add(final long n) {
+        accumulator.add(n);
+    }
+
+    public void refresh() {
+        final long count = accumulator.sumThenReset();
+        final double instantRate = count / INTERVAL;
+        final double newRate = rate == 0. ? instantRate : nextRate(instantRate);
+        this.rate = newRate;
+    }
+
+    private double nextRate(final double instantRate) {
+        return rate + (alpha * (instantRate - rate));
+    }
+
+    public static ExponentialMovingAverage forMinutes(final int minutes) {
+        return new ExponentialMovingAverage(Math.exp(-5/*INTERVAL in sec*/ / 60. / minutes));
+    }
+}
diff --git a/pom.xml b/pom.xml
index d735d9b..832fc69 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -42,7 +42,7 @@
   </scm>
 
   <properties>
-    <spec.version>1.1</spec.version>
+    <spec.version>1.1.1</spec.version>
     <arquillian.version>1.1.8.Final</arquillian.version>
     <meecrowave.version>1.2.3</meecrowave.version>
   </properties>