[AMQ-7126] Improvement to perf of 5266Test
authorjgoodyear <jgoodyear@apache.org>
Tue, 8 Jan 2019 00:52:34 +0000 (21:22 -0330)
committerjgoodyear <jgoodyear@apache.org>
Tue, 8 Jan 2019 00:52:34 +0000 (21:22 -0330)
activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java

index 83465a1..e0250f2 100644 (file)
@@ -177,6 +177,16 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
         return systemUsage != null ? (!isParentFull() && systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
     }
 
+    boolean parentHasSpace(int waterMark) {
+        boolean result = true;
+        if (systemUsage != null) {
+            if (systemUsage.getMemoryUsage().getParent() != null) {
+                return systemUsage.getMemoryUsage().getParent().getPercentUsage() <= waterMark;
+            }
+        }
+        return result;
+    }
+
     private boolean isParentFull() {
         boolean result = false;
         if (systemUsage != null) {
index 2dd934f..71a83ac 100644 (file)
@@ -287,6 +287,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         return useCache && size==0 && hasSpace() && isStarted();
     }
 
+    @Override
+    public boolean canRecoveryNextMessage() {
+        // Should be safe to recovery messages if the overall memory usage if < 90%
+        return parentHasSpace(90);
+    }
+
     private void syncWithStore(Message currentAdd) throws Exception {
         pruneLastCached();
         for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
index 5cbeac9..8d9bf62 100644 (file)
@@ -26,6 +26,9 @@ public interface MessageRecoveryListener {
     boolean recoverMessage(Message message) throws Exception;
     boolean recoverMessageReference(MessageId ref) throws Exception;
     boolean hasSpace();
+    default boolean canRecoveryNextMessage() {
+        return true;
+    }
     /**
      * check if ref is a duplicate but do not record the reference
      * @param ref
index d36bff2..0351d06 100644 (file)
@@ -679,7 +679,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                             msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
                             listener.recoverMessage(msg);
                             counter++;
-                            if (counter >= maxReturned || !listener.hasSpace()) {
+                            if (counter >= maxReturned || !listener.canRecoveryNextMessage()) {
                                 break;
                             }
                         }
index f9ce9e7..8119a9b 100644 (file)
@@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) {
         lastmsgid = msg.getMessageId
         count += 1
       }
-      count < max && listener.hasSpace
+      count < max && listener.canRecoveryNextMessage
     }
     if( lastmsgid==null ) {
       startPos
index fa27175..4c0747f 100644 (file)
@@ -136,7 +136,7 @@ public class MemoryLimitTest extends TestSupport {
         assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71);
 
         LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
-        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71);
+        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 91);
 
         // let's make sure we can consume all messages
         for (int i = 1; i < 2000; i++) {