RATIS-462. Add readStateMachineData api for FileStoreStateMachine. Contributed by...
authorMukul Kumar Singh <msingh@apache.org>
Thu, 20 Dec 2018 16:30:38 +0000 (22:00 +0530)
committerMukul Kumar Singh <msingh@apache.org>
Thu, 20 Dec 2018 16:30:38 +0000 (22:00 +0530)
ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
ratis-proto/src/main/proto/Examples.proto

index c671dee..a7c9fee 100644 (file)
@@ -155,6 +155,7 @@ public class FileStoreClient implements Closeable {
     final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder()
         .setPath(ProtoUtils.toByteString(path))
         .setOffset(offset)
+        .setLength(data.position())
         .setClose(close);
 
     final WriteRequestProto.Builder write = WriteRequestProto.newBuilder()
index 73f6d93..921b1bf 100644 (file)
@@ -19,6 +19,7 @@ package org.apache.ratis.examples.filestore;
 
 import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.ExamplesProtos;
 import org.apache.ratis.proto.ExamplesProtos.DeleteReplyProto;
 import org.apache.ratis.proto.ExamplesProtos.DeleteRequestProto;
 import org.apache.ratis.proto.ExamplesProtos.FileStoreRequestProto;
@@ -131,6 +132,28 @@ public class FileStoreStateMachine extends BaseStateMachine {
   }
 
   @Override
+  public CompletableFuture<ByteString> readStateMachineData(LogEntryProto entry) {
+    final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
+    final ByteString data = smLog.getLogData();
+    final FileStoreRequestProto proto;
+    try {
+      proto = FileStoreRequestProto.parseFrom(data);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          entry.getIndex(), "Failed to parse data, entry=" + entry, e);
+    }
+    if (proto.getRequestCase() != FileStoreRequestProto.RequestCase.WRITEHEADER) {
+      return null;
+    }
+
+    final WriteRequestHeaderProto h = proto.getWriteHeader();
+    CompletableFuture<ExamplesProtos.ReadReplyProto> reply =
+        files.read(h.getPath().toStringUtf8(), h.getOffset(), h.getLength());
+
+    return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
+  }
+
+  @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     final LogEntryProto entry = trx.getLogEntry();
 
index c2e2500..407fa27 100644 (file)
@@ -39,6 +39,7 @@ message WriteRequestHeaderProto {
   bytes path = 1;
   bool close = 2; // close the file after write?
   uint64 offset = 3;
+  uint64 length = 4;
 }
 
 message WriteRequestProto {