RATIS-473. Clarify size and length methods on LogStream
authorJosh Elser <elserj@apache.org>
Wed, 9 Jan 2019 20:28:20 +0000 (15:28 -0500)
committerJosh Elser <elserj@apache.org>
Mon, 28 Jan 2019 21:07:21 +0000 (16:07 -0500)
getLength() was meant to be the number of records in a log
but was implemented as the size in bytes of the records in
the log. This commit adds a getSize() which takes the old
implementation of getLength(), and then reimplements
getLength() as the number of records in the log.

Signed-off-by: Vladimir Rodionov <vrodionov@apache.org>
Signed-off-by: Rajeshbabu Chintaguntla <rajeshbabu@apache.org>
ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
ratis-logservice/src/main/proto/LogService.proto
ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java

index fbb977f..f1123af 100644 (file)
@@ -52,6 +52,12 @@ public interface LogStream extends AutoCloseable{
   long getSize() throws IOException;
 
   /**
+   * Returns the number of records in this log.
+   * @throws IOException
+   */
+  long getLength() throws IOException;
+
+  /**
    * Creates a reader to read this LogStream.
    *
    * @return A synchronous reader
index 2ab0103..3f41c80 100644 (file)
@@ -32,6 +32,7 @@ import org.apache.ratis.logservice.api.LogWriter;
 import org.apache.ratis.logservice.api.RecordListener;
 import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLastCommittedIndexReplyProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLengthReplyProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogSizeReplyProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogStartIndexReplyProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceException;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
@@ -102,16 +103,30 @@ public class LogStreamImpl implements LogStream {
 
   @Override
   public long getSize() throws IOException{
-      RaftClientReply reply = raftClient
-          .sendReadOnly(Message.valueOf(LogServiceProtoUtil
-              .toGetLengthRequestProto(name).toByteString()));
-      GetLogLengthReplyProto proto =
-          GetLogLengthReplyProto.parseFrom(reply.getMessage().getContent());
-      if (proto.hasException()) {
-        LogServiceException e = proto.getException();
-        throw new IOException(e.getErrorMsg());
-      }
-      return proto.getLength();
+    RaftClientReply reply = raftClient
+        .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+            .toGetSizeRequestProto(name).toByteString()));
+    GetLogSizeReplyProto proto =
+        GetLogSizeReplyProto.parseFrom(reply.getMessage().getContent());
+    if (proto.hasException()) {
+      LogServiceException e = proto.getException();
+      throw new IOException(e.getErrorMsg());
+    }
+    return proto.getSize();
+  }
+
+  @Override
+  public long getLength() throws IOException {
+    RaftClientReply reply = raftClient
+        .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+            .toGetLengthRequestProto(name).toByteString()));
+    GetLogLengthReplyProto proto =
+        GetLogLengthReplyProto.parseFrom(reply.getMessage().getContent());
+    if (proto.hasException()) {
+      LogServiceException e = proto.getException();
+      throw new IOException(e.getErrorMsg());
+    }
+    return proto.getLength();
   }
 
   @Override
index 2ee5a53..900aac1 100644 (file)
@@ -62,11 +62,10 @@ public class LogStateMachine extends BaseStateMachine {
   }
 
   /*
-   *  State is a pair log's length and state (closed/open);
+   *  State is a log's length, size, and state (closed/open);
    */
-
+  private long size;
   private long length;
-
   private State state = State.OPEN;
 
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
@@ -92,6 +91,7 @@ public class LogStateMachine extends BaseStateMachine {
    */
   void reset() {
     this.length = 0;
+    this.size = 0;
     setLastAppliedTermIndex(null);
   }
 
@@ -132,6 +132,7 @@ public class LogStateMachine extends BaseStateMachine {
         final ObjectOutputStream out = new ObjectOutputStream(
         new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
       out.writeLong(length);
+      out.writeLong(size);
       out.writeObject(state);
     } catch(IOException ioe) {
       LOG.warn("Failed to write snapshot file \"" + snapshotFile
@@ -165,6 +166,7 @@ public class LogStateMachine extends BaseStateMachine {
       }
       setLastAppliedTermIndex(last);
       this.length = in.readLong();
+      this.size = in.readLong();
       this.state = (State) in.readObject();
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
@@ -190,14 +192,16 @@ public class LogStateMachine extends BaseStateMachine {
 
         case READNEXTQUERY:
           return processReadRequest(logServiceRequestProto);
-        case LENGTHQUERY:
-          return processGetLengthRequest(logServiceRequestProto);
+        case SIZEREQUEST:
+          return processGetSizeRequest(logServiceRequestProto);
         case STARTINDEXQUERY:
           return processGetStartIndexRequest(logServiceRequestProto);
         case GETSTATE:
           return processGetStateRequest(logServiceRequestProto);
         case LASTINDEXQUERY:
           return processGetLastCommittedIndexRequest(logServiceRequestProto);
+        case LENGTHQUERY:
+          return processGetLengthRequest(logServiceRequestProto);
         default:
           // TODO
           throw new RuntimeException(
@@ -246,6 +250,14 @@ public class LogStateMachine extends BaseStateMachine {
    * @param msg message
    * @return reply message
    */
+  private CompletableFuture<Message> processGetSizeRequest(LogServiceRequestProto proto) {
+    GetLogSizeRequestProto msgProto = proto.getSizeRequest();
+    Throwable t = verifyState(State.OPEN);
+    LOG.debug("QUERY: {}, RESULT: {}", msgProto, this.size);
+    return CompletableFuture.completedFuture(Message
+      .valueOf(LogServiceProtoUtil.toGetLogSizeReplyProto(this.size, t).toByteString()));
+  }
+
   private CompletableFuture<Message> processGetLengthRequest(LogServiceRequestProto proto) {
     GetLogLengthRequestProto msgProto = proto.getLengthQuery();
     Throwable t = verifyState(State.OPEN);
@@ -253,7 +265,6 @@ public class LogStateMachine extends BaseStateMachine {
     return CompletableFuture.completedFuture(Message
       .valueOf(LogServiceProtoUtil.toGetLogLengthReplyProto(this.length, t).toByteString()));
   }
-
   /**
    * Process read log entries request
    * @param msg message
@@ -311,15 +322,16 @@ public class LogStateMachine extends BaseStateMachine {
     final LogEntryProto entry = trx.getLogEntry();
     AppendLogEntryRequestProto proto = logProto.getAppendRequest();
     final long index = entry.getIndex();
-    long total = 0;
+    long newSize = 0;
     Throwable t = verifyState(State.OPEN);
     if (t == null) {
       try (final AutoCloseableLock writeLock = writeLock()) {
           List<byte[]> entries = LogServiceProtoUtil.toListByteArray(proto.getDataList());
           for (byte[] bb : entries) {
-            total += bb.length;
+            newSize += bb.length;
           }
-          this.length += total;
+          this.size += newSize;
+          this.length += entries.size();
           // TODO do we need this for other write request (close, sync)
           updateLastAppliedTermIndex(entry.getTerm(), index);
       }
index 4a044c6..00b6e2a 100644 (file)
@@ -75,6 +75,14 @@ public class LogServiceProtoUtil {
     return builder.build();
   }
 
+  public static LogServiceRequestProto toGetSizeRequestProto(LogName name) {
+    LogNameProto logNameProto =
+        LogNameProto.newBuilder().setName(name.getName()).build();
+    GetLogSizeRequestProto getLogSize = GetLogSizeRequestProto.newBuilder()
+        .setLogName(logNameProto).build();
+    return LogServiceRequestProto.newBuilder().setSizeRequest(getLogSize).build();
+  }
+
   public static LogServiceRequestProto toGetLengthRequestProto(LogName name) {
     LogNameProto logNameProto =
         LogNameProto.newBuilder().setName(name.getName()).build();
@@ -165,6 +173,16 @@ public class LogServiceProtoUtil {
     return builder.build();
   }
 
+  public static GetLogSizeReplyProto toGetLogSizeReplyProto(long size, Throwable t) {
+    GetLogSizeReplyProto.Builder builder = GetLogSizeReplyProto.newBuilder();
+    if (t != null) {
+      builder.setException(toLogException(t));
+    } else {
+      builder.setSize(size);
+    }
+    return builder.build();
+  }
+
   public static GetLogStartIndexReplyProto toGetLogStartIndexReplyProto(long length, Throwable t) {
     GetLogStartIndexReplyProto.Builder builder = GetLogStartIndexReplyProto.newBuilder();
     if (t != null) {
index da15aad..2dc4a32 100644 (file)
@@ -112,6 +112,17 @@ message GetLogLengthReplyProto {
        LogServiceException exception = 2;
 }
 
+// Get the size of a log (in bytes) request
+message GetLogSizeRequestProto {
+    LogNameProto logName = 1;
+}
+
+// Get the size of a log (in bytes) reply
+message GetLogSizeReplyProto {
+    uint64 size = 1;
+    LogServiceException exception = 2;
+}
+
 message GetLogStartIndexRequestProto {
        LogNameProto logName  = 1;
 }
@@ -142,6 +153,7 @@ message LogServiceRequestProto {
        AppendLogEntryRequestProto appendRequest = 6;
        SyncLogRequestProto          syncRequest = 7;
        GetLogLastCommittedIndexRequestProto lastIndexQuery = 8;
+       GetLogSizeRequestProto sizeRequest = 9;
   }
 }
 
index fd71311..8c08f44 100644 (file)
@@ -76,6 +76,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
       assertEquals("log1", logStream.getName().getName());
       assertEquals(State.OPEN, logStream.getState());
       assertEquals(0, logStream.getSize());
+      assertEquals(0, logStream.getLength());
 
       LogReader reader = logStream.createReader();
       LogWriter writer = logStream.createWriter();
@@ -88,10 +89,9 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
       List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
       long id = writer.write(records);
       LOG.info("id {}", id);
-      // Check log size
-      long size = logStream.getSize();
-      assertEquals(10 * 100, size);
-      LOG.info("size {}", size);
+      // Check log size and length
+      assertEquals(10 * 100, logStream.getSize());
+      assertEquals(10, logStream.getLength());
 
       // Check last record id
       long lastId2 = logStream.getLastRecordId();
@@ -100,7 +100,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
       // Check first record id
       long startId = logStream.getStartRecordId();
       LOG.info("start id {}", startId);
-      //
+
       reader.seek(lastId + 1);
       // Read records back
       List<ByteBuffer> data = reader.readBulk(1);