RATIS-470. De-couple the LogService "index" from the RaftLog "index"
authorJosh Elser <elserj@apache.org>
Wed, 9 Jan 2019 16:22:13 +0000 (11:22 -0500)
committerJosh Elser <elserj@apache.org>
Wed, 30 Jan 2019 18:37:10 +0000 (13:37 -0500)
We've been using the Raft log index as the "record offset" that we expose
to users via the LogService API. However, because one message that we push
to the Raft log may contain many LogService records, this generates
incorrect results.

We have to "hide" the Raft log index, and identify just the LogService records
so that we're exposing only their Log's information (e.g. none of the interal
Raft quorum configuration entries).

Closes #7

ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java
ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java

index 5a51a3c..52b487a 100644 (file)
@@ -42,12 +42,7 @@ public interface LogWriter extends AutoCloseable {
    * @param records Records to append
    * @return The largest recordId assigned to the records written
    */
-  default long write(List<ByteBuffer> records) throws IOException {
-    for (ByteBuffer record : records) {
-      write(record);
-    }
-    return records.size();
-  }
+  long write(List<ByteBuffer> records) throws IOException;
 
   /**
    * Guarantees that all previous data appended by {@link #write(ByteBuffer)} are persisted
index e7a7d4a..e408d1e 100644 (file)
@@ -62,27 +62,25 @@ public class LogWriterImpl implements LogWriter {
     List<ByteBuffer> list = new ArrayList<ByteBuffer>();
     list.add(data);
      return write(list);
-   }
-
-   @Override
-   public long write(List<ByteBuffer> list) throws IOException {
+  }
 
-     try {
-       RaftClientReply reply = raftClient.send(Message.valueOf(
-         LogServiceProtoUtil.toAppendBBEntryLogRequestProto(parent.getName(), list).toByteString()));
-       AppendLogEntryReplyProto proto =
-           AppendLogEntryReplyProto.parseFrom(reply.getMessage().getContent());
-       if (proto.hasException()) {
-         LogServiceException e = proto.getException();
-         throw new IOException(e.getErrorMsg());
-       }
-       List<Long> ids = proto.getRecordIdList();
-       // The above call Always returns one id (regardless of a batch size)
-       return ids.get(0);
-     } catch (Exception e) {
-       throw new IOException(e);
-   }
- }
+  @Override
+  public long write(List<ByteBuffer> list) throws IOException {
+    try {
+      RaftClientReply reply = raftClient.send(
+          Message.valueOf(LogServiceProtoUtil.toAppendBBEntryLogRequestProto(parent.getName(), list).toByteString()));
+      AppendLogEntryReplyProto proto = AppendLogEntryReplyProto.parseFrom(reply.getMessage().getContent());
+      if (proto.hasException()) {
+        LogServiceException e = proto.getException();
+        throw new IOException(e.getErrorMsg());
+      }
+      List<Long> ids = proto.getRecordIdList();
+      // The above call Always returns one id (regardless of a batch size)
+      return ids.get(0);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
 
  @Override
  public long sync() throws IOException {
index 900aac1..a8aecd8 100644 (file)
@@ -31,7 +31,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.ratis.logservice.api.LogName;
-import org.apache.ratis.logservice.proto.LogServiceProtos.*;
+import org.apache.ratis.logservice.proto.LogServiceProtos.AppendLogEntryRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogReplyProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLengthRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogSizeRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.GetStateRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto.RequestCase;
+import org.apache.ratis.logservice.proto.LogServiceProtos.ReadLogRequestProto;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -50,6 +58,8 @@ import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,8 +74,13 @@ public class LogStateMachine extends BaseStateMachine {
   /*
    *  State is a log's length, size, and state (closed/open);
    */
-  private long size;
   private long length;
+
+  /**
+   * The size (number of bytes) of the log records. Does not include Ratis storage overhead
+   */
+  private long dataRecordsSize;
+
   private State state = State.OPEN;
 
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
@@ -91,7 +106,7 @@ public class LogStateMachine extends BaseStateMachine {
    */
   void reset() {
     this.length = 0;
-    this.size = 0;
+    this.dataRecordsSize = 0;
     setLastAppliedTermIndex(null);
   }
 
@@ -132,7 +147,7 @@ public class LogStateMachine extends BaseStateMachine {
         final ObjectOutputStream out = new ObjectOutputStream(
         new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
       out.writeLong(length);
-      out.writeLong(size);
+      out.writeLong(dataRecordsSize);
       out.writeObject(state);
     } catch(IOException ioe) {
       LOG.warn("Failed to write snapshot file \"" + snapshotFile
@@ -166,7 +181,7 @@ public class LogStateMachine extends BaseStateMachine {
       }
       setLastAppliedTermIndex(last);
       this.length = in.readLong();
-      this.size = in.readLong();
+      this.dataRecordsSize = in.readLong();
       this.state = (State) in.readObject();
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
@@ -187,6 +202,9 @@ public class LogStateMachine extends BaseStateMachine {
       checkInitialization();
       LogServiceRequestProto logServiceRequestProto =
           LogServiceRequestProto.parseFrom(request.getContent());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Processing LogService query: {}", TextFormat.shortDebugString(logServiceRequestProto));
+      }
 
       switch (logServiceRequestProto.getRequestCase()) {
 
@@ -253,9 +271,9 @@ public class LogStateMachine extends BaseStateMachine {
   private CompletableFuture<Message> processGetSizeRequest(LogServiceRequestProto proto) {
     GetLogSizeRequestProto msgProto = proto.getSizeRequest();
     Throwable t = verifyState(State.OPEN);
-    LOG.debug("QUERY: {}, RESULT: {}", msgProto, this.size);
+    LOG.debug("QUERY: {}, RESULT: {}", msgProto, this.dataRecordsSize);
     return CompletableFuture.completedFuture(Message
-      .valueOf(LogServiceProtoUtil.toGetLogSizeReplyProto(this.size, t).toByteString()));
+      .valueOf(LogServiceProtoUtil.toGetLogSizeReplyProto(this.dataRecordsSize, t).toByteString()));
   }
 
   private CompletableFuture<Message> processGetLengthRequest(LogServiceRequestProto proto) {
@@ -274,26 +292,88 @@ public class LogStateMachine extends BaseStateMachine {
 
     ReadLogRequestProto msgProto = proto.getReadNextQuery();
     long startRecordId = msgProto.getStartRecordId();
-    int num = msgProto.getNumRecords();
+    int numRecordsToRead = msgProto.getNumRecords();
     Throwable t = verifyState(State.OPEN);
     List<byte[]> list = new ArrayList<byte[]>();
     LOG.info("Start Index: {}", startRecordId);
-    LOG.info("Total to read: {}", num);
+    LOG.info("Total to read: {}", numRecordsToRead);
+    long raftLogIndex = log.getStartIndex();
+    if (t == null) {
+      // Seek to first entry
+      long logServiceIndex = 0;
+      while (logServiceIndex < startRecordId) {
+        try {
+          LogEntryProto entry = log.get(raftLogIndex);
+          // Skip "meta" entries
+          if (entry == null || entry.hasConfigurationEntry()) {
+            raftLogIndex++;
+            continue;
+          }
+
+          LogServiceRequestProto logServiceProto =
+              LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
+          // TODO is it possible to get LogService messages that aren't appends?
+          if (RequestCase.APPENDREQUEST != logServiceProto.getRequestCase()) {
+            raftLogIndex++;
+            continue;
+          }
+
+          AppendLogEntryRequestProto append = logServiceProto.getAppendRequest();
+          int numRecordsInAppend = append.getDataCount();
+          if (logServiceIndex + numRecordsInAppend > startRecordId) {
+            // The starting record is within this raft log entry.
+            break;
+          }
+          // We didn't find the log record, increment the logService record counter
+          logServiceIndex += numRecordsInAppend;
+          // And increment the raft log index
+          raftLogIndex++;
+        } catch (RaftLogIOException e) {
+          t = e;
+          list = null;
+          break;
+        } catch (InvalidProtocolBufferException e) {
+          LOG.error("Failed to read LogService protobuf from Raft log", e);
+          t = e;
+          list = null;
+          break;
+        }
+      }
+    }
+    LOG.debug("Starting to read {} logservice records starting at raft log index {}", numRecordsToRead, raftLogIndex);
     if (t == null) {
-      for (long index = startRecordId; index < startRecordId + num; index++) {
+      // Make sure we don't read off the end of the Raft log
+      for (long index = raftLogIndex; index < log.getLastCommittedIndex(); index++) {
         try {
           LogEntryProto entry = log.get(index);
-          LOG.info("Index: {} Entry: {}", index, entry);
+          LOG.trace("Index: {} Entry: {}", index, entry);
           if (entry == null || entry.hasConfigurationEntry()) {
             continue;
           }
-          //TODO: how to distinguish log records from
-          // DML commands logged by the service?
-          list.add(entry.getStateMachineLogEntry().getLogData().toByteArray());
+
+          LogServiceRequestProto logServiceProto =
+              LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
+          // TODO is it possible to get LogService messages that aren't appends?
+          if (RequestCase.APPENDREQUEST != logServiceProto.getRequestCase()) {
+            continue;
+          }
+
+          AppendLogEntryRequestProto append = logServiceProto.getAppendRequest();
+          for (int i = 0; i < append.getDataCount() && list.size() < numRecordsToRead; i++) {
+            list.add(append.getData(i).toByteArray());
+          }
+          if (list.size() == numRecordsToRead) {
+            break;
+          }
         } catch (RaftLogIOException e) {
           t = e;
           list = null;
           break;
+        } catch (InvalidProtocolBufferException e) {
+          LOG.error("Failed to read LogService protobuf from Raft log", e);
+          t = e;
+          list = null;
+          break;
         }
       }
     }
@@ -330,7 +410,7 @@ public class LogStateMachine extends BaseStateMachine {
           for (byte[] bb : entries) {
             newSize += bb.length;
           }
-          this.size += newSize;
+          this.dataRecordsSize += newSize;
           this.length += entries.size();
           // TODO do we need this for other write request (close, sync)
           updateLastAppliedTermIndex(entry.getTerm(), index);
@@ -342,9 +422,9 @@ public class LogStateMachine extends BaseStateMachine {
         CompletableFuture.completedFuture(
           Message.valueOf(LogServiceProtoUtil.toAppendLogReplyProto(ids, t).toByteString()));
     final RaftProtos.RaftPeerRole role = trx.getServerRole();
-    LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, proto, length);
+    LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, proto, dataRecordsSize);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{}-{}: variables={}", getId(), index, length);
+      LOG.trace("{}-{}: variables={}", getId(), index, dataRecordsSize);
     }
     return f;
   }
index 00b6e2a..2cc4c58 100644 (file)
@@ -145,7 +145,12 @@ public class LogServiceProtoUtil {
     AppendLogEntryRequestProto.Builder builder = AppendLogEntryRequestProto.newBuilder();
     builder.setLogName(logNameProto);
     for (int i=0; i < entries.size(); i++) {
-      builder.addData(ByteString.copyFrom(entries.get(i)));
+      ByteBuffer currentBuf = entries.get(i);
+      // Save the current position
+      int pos = currentBuf.position();
+      builder.addData(ByteString.copyFrom(currentBuf));
+      // Reset it after we're done reading the bytes
+      currentBuf.position(pos);
     }
     return LogServiceRequestProto.newBuilder().setAppendRequest(builder.build()).build();
   }
index 8c08f44..047a9ee 100644 (file)
 package org.apache.ratis.logservice;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.ratis.BaseTest;
@@ -101,10 +104,58 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
       long startId = logStream.getStartRecordId();
       LOG.info("start id {}", startId);
 
-      reader.seek(lastId + 1);
+      reader.seek(startId);
       // Read records back
-      List<ByteBuffer> data = reader.readBulk(1);
-      assertEquals(1, data.size());
+      List<ByteBuffer> data = reader.readBulk(records.size());
+      assertEquals(records.size(), data.size());
+
+      // Make sure we got the same 10 records that we wrote.
+      Iterator<ByteBuffer> expectedIter = records.iterator();
+      Iterator<ByteBuffer> actualIter = data.iterator();
+      while (expectedIter.hasNext() && actualIter.hasNext()) {
+        ByteBuffer expected = expectedIter.next();
+        ByteBuffer actual = actualIter.next();
+        assertEquals(expected, actual);
+      }
+    }
+  }
+
+  @Test
+  public void testSeeking() throws Exception {
+    final RaftClient raftClient =
+        RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
+            .build();
+    final LogName logName = LogName.of("log1");
+    final int numRecords = 100;
+    // TODO need API to circumvent metadata service for testing
+    try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+      try (LogWriter writer = logStream.createWriter()) {
+        LOG.info("Writing {} records", numRecords);
+        // Write records 0 through 99 (inclusive)
+        for (int i = 0; i < numRecords; i++) {
+          writer.write(ByteBuffer.wrap(Integer.toString(i).getBytes(StandardCharsets.UTF_8)));
+        }
+      }
+
+      LOG.debug("Seek and read'ing records");
+      try (LogReader reader = logStream.createReader()) {
+        for (int i = 9; i < numRecords; i += 10) {
+          LOG.info("Seeking to {}", i);
+          reader.seek(i);
+          LOG.info("Reading one record");
+          ByteBuffer bb = reader.readNext();
+          assertEquals(i, fromBytes(bb));
+        }
+
+        assertTrue("We're expecting at least two records were written", numRecords > 1);
+        for (int i = numRecords - 2; i >= 0; i -= 6) {
+          LOG.info("Seeking to {}", i);
+          reader.seek(i);
+          LOG.info("Reading one record");
+          ByteBuffer bb = reader.readNext();
+          assertEquals(i, fromBytes(bb));
+        }
+      }
     }
   }
 
@@ -112,4 +163,10 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
   public void tearDown() {
     cluster.shutdown();
   }
+
+  private int fromBytes(ByteBuffer bb) {
+    byte[] bytes = new byte[bb.remaining()];
+    System.arraycopy(bb.array(), bb.arrayOffset(), bytes, 0, bb.remaining());
+    return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
+  }
 }
index cbdd2b5..47aa60a 100644 (file)
@@ -36,7 +36,9 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.stream.IntStream;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 
@@ -95,8 +97,7 @@ public class TestMetaServer {
         }
 //        assert(stream.getSize() > 0); //TODO: Doesn't work
         LogReader reader = stream.createReader();
-        ByteBuffer res = reader.readNext(); //TODO: first is conf log entry
-        res = reader.readNext();
+        ByteBuffer res = reader.readNext();
         assert(res.array().length > 0);
     }