[AMQ-7126] Prevent OOM when recovering KahaDB and memory space is insufficient to...
authorjgoodyear <jgoodyear@apache.org>
Thu, 3 Jan 2019 19:16:39 +0000 (15:46 -0330)
committerjgoodyear <jgoodyear@apache.org>
Thu, 3 Jan 2019 19:16:39 +0000 (15:46 -0330)
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java

index e719686..d36bff2 100644 (file)
@@ -679,7 +679,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                             msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
                             listener.recoverMessage(msg);
                             counter++;
-                            if (counter >= maxReturned) {
+                            if (counter >= maxReturned || !listener.hasSpace()) {
                                 break;
                             }
                         }
index bc53710..f9ce9e7 100644 (file)
@@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) {
         lastmsgid = msg.getMessageId
         count += 1
       }
-      count < max
+      count < max && listener.hasSpace
     }
     if( lastmsgid==null ) {
       startPos
index e65ad91..8f6fbb2 100644 (file)
@@ -46,7 +46,7 @@ public class AMQ4930Test extends TestCase {
     protected void configureBroker() throws Exception {
         broker.setDeleteAllMessagesOnStartup(true);
         broker.setAdvisorySupport(false);
-        broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024);
+        broker.getSystemUsage().getMemoryUsage().setLimit(100*1024*1024);
 
         PolicyMap pMap = new PolicyMap();
         PolicyEntry policy = new PolicyEntry();
index 760876c..fa27175 100644 (file)
@@ -133,18 +133,10 @@ public class MemoryLimitTest extends TestSupport {
         Message msg = consumer.receive(5000);
         msg.acknowledge();
 
-        // this should free some space and allow us to get new batch of messages in the memory
-        // exceeding the limit
-        assertTrue("Limit is exceeded", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                LOG.info("Destination usage: " + dest.getMemoryUsage());
-                return dest.getMemoryUsage().getPercentUsage() >= 200;
-            }
-        }));
+        assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71);
 
         LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
-        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() >= 200);
+        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71);
 
         // let's make sure we can consume all messages
         for (int i = 1; i < 2000; i++) {
index 29b6e72..3e2d067 100644 (file)
@@ -182,7 +182,7 @@ public class QueueBrowsingTest {
 
     @Test
     public void testMemoryLimit() throws Exception {
-        broker.getSystemUsage().getMemoryUsage().setLimit(16 * 1024);
+        broker.getSystemUsage().getMemoryUsage().setLimit((maxPageSize + 10) * 4 * 1024);
 
         int messageToSend = 370;