ARTEMIS-2186 Large message incomplete when server is crashed
authoryang wei <wy96fyw@gmail.com>
Wed, 28 Nov 2018 12:37:09 +0000 (20:37 +0800)
committerMichael Andre Pearce <michael.andre.pearce@me.com>
Sat, 19 Jan 2019 08:16:09 +0000 (08:16 +0000)
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java

index 431c19d..42a76be 100644 (file)
@@ -327,6 +327,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
    public synchronized void releaseResources() {
       if (file != null && file.isOpen()) {
          try {
+            file.sync();
             file.close();
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e);
index e325273..6f45d16 100644 (file)
  */
 package org.apache.activemq.artemis.tests.integration.largemessage;
 
+import java.io.File;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -30,6 +36,10 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.security.Role;
@@ -38,6 +48,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -177,7 +188,154 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
       }
    }
 
-   // Package protected ---------------------------------------------
+   @Test
+   public void testLargeServerMessageSync() throws Exception {
+      final AtomicBoolean open = new AtomicBoolean(false);
+      final AtomicBoolean sync = new AtomicBoolean(false);
+
+      JournalStorageManager storageManager = new JournalStorageManager(createDefaultInVMConfig(), EmptyCriticalAnalyzer.getInstance(), getOrderedExecutor(), getOrderedExecutor()) {
+         @Override
+         public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) {
+            return new SequentialFile() {
+               @Override
+               public boolean isOpen() {
+                  return open.get();
+               }
+
+               @Override
+               public boolean exists() {
+                  return true;
+               }
+
+               @Override
+               public void open() throws Exception {
+                  open.set(true);
+               }
+
+               @Override
+               public void open(int maxIO, boolean useExecutor) throws Exception {
+                  open.set(true);
+               }
+
+               @Override
+               public boolean fits(int size) {
+                  return false;
+               }
+
+               @Override
+               public int calculateBlockStart(int position) throws Exception {
+                  return 0;
+               }
+
+               @Override
+               public String getFileName() {
+                  return null;
+               }
+
+               @Override
+               public void fill(int size) throws Exception {
+               }
+
+               @Override
+               public void delete() throws IOException, InterruptedException, ActiveMQException {
+               }
+
+               @Override
+               public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
+               }
+
+               @Override
+               public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
+               }
+
+               @Override
+               public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
+               }
+
+               @Override
+               public void write(EncodingSupport bytes, boolean sync) throws Exception {
+               }
+
+               @Override
+               public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
+               }
+
+               @Override
+               public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
+               }
+
+               @Override
+               public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
+               }
+
+               @Override
+               public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
+                  return 0;
+               }
+
+               @Override
+               public int read(ByteBuffer bytes) throws Exception {
+                  return 0;
+               }
+
+               @Override
+               public void position(long pos) throws IOException {
+               }
+
+               @Override
+               public long position() {
+                  return 0;
+               }
+
+               @Override
+               public void close() throws Exception {
+                  open.set(false);
+               }
+
+               @Override
+               public void sync() throws IOException {
+                  sync.set(true);
+               }
+
+               @Override
+               public long size() throws Exception {
+                  return 0;
+               }
+
+               @Override
+               public void renameTo(String newFileName) throws Exception {
+               }
+
+               @Override
+               public SequentialFile cloneFile() {
+                  return null;
+               }
+
+               @Override
+               public void copyTo(SequentialFile newFileName) throws Exception {
+               }
+
+               @Override
+               public void setTimedBuffer(TimedBuffer buffer) {
+               }
+
+               @Override
+               public File getJavaFile() {
+                  return null;
+               }
+            };
+         }
+      };
+
+      LargeServerMessageImpl largeServerMessage = new LargeServerMessageImpl(storageManager);
+      largeServerMessage.setMessageID(1234);
+      largeServerMessage.addBytes(new byte[0]);
+      assertTrue(open.get());
+      largeServerMessage.releaseResources();
+      assertTrue(sync.get());
+   }
+
+      // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------