GIRAPH-1188
authorYuksel Akinci <yuksela@fb.com>
Tue, 15 May 2018 05:45:14 +0000 (22:45 -0700)
committerDionysios Logothetis <dionysios@fb.com>
Tue, 15 May 2018 05:45:14 +0000 (22:45 -0700)
closes #70

giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/no_vtx/MessagesWithoutVerticesTest.java
giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
giraph-core/src/main/java/org/apache/giraph/writable/kryo/Boxed.java [new file with mode: 0644]
giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java [new file with mode: 0644]
giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java
giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
giraph-examples/src/main/java/org/apache/giraph/examples/PageRankWithKryoSimpleWritable.java [new file with mode: 0644]
giraph-examples/src/test/java/org/apache/giraph/examples/TestKryoPageRank.java [new file with mode: 0644]

index bf3e194..dcdb002 100644 (file)
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.giraph.block_app.framework.no_vtx;
 
 import java.util.Iterator;
index c3fd141..632a1e6 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.giraph.job.JobProgressTracker;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.utils.CheckpointingUtils;
 import org.apache.giraph.worker.WorkerInfo;
+import org.apache.giraph.writable.kryo.GiraphClassResolver;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
 import org.apache.giraph.zk.ZooKeeperExt;
@@ -81,7 +82,9 @@ public abstract class BspService<I extends WritableComparable,
   /** Input splits all done node*/
   public static final String INPUT_SPLITS_ALL_DONE_NODE =
       "/_inputSplitsAllDone";
-
+  /** Directory to store kryo className-ID assignment */
+  public static final String KRYO_REGISTERED_CLASS_DIR =
+          "/_kryo";
   /** Directory of attempts of this application */
   public static final String APPLICATION_ATTEMPTS_DIR =
       "/_applicationAttemptsDir";
@@ -155,6 +158,8 @@ public abstract class BspService<I extends WritableComparable,
   protected final String haltComputationPath;
   /** Path where memory observer stores data */
   protected final String memoryObserverPath;
+  /** Kryo className-ID mapping directory */
+  protected final String kryoRegisteredClassPath;
   /** Private ZooKeeper instance that implements the service */
   private final ZooKeeperExt zk;
   /** Has the Connection occurred? */
@@ -250,7 +255,7 @@ public abstract class BspService<I extends WritableComparable,
     inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
     applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
     cleanedUpPath = basePath + CLEANED_UP_DIR;
-
+    kryoRegisteredClassPath = basePath + KRYO_REGISTERED_CLASS_DIR;
 
 
     String restartJobId = RESTART_JOB_ID.get(conf);
@@ -289,6 +294,11 @@ public abstract class BspService<I extends WritableComparable,
       throw new RuntimeException(e);
     }
 
+    boolean disableGiraphResolver =
+            GiraphConstants.DISABLE_GIRAPH_CLASS_RESOLVER.get(conf);
+    if (!disableGiraphResolver) {
+      GiraphClassResolver.setZookeeperInfo(zk, kryoRegisteredClassPath);
+    }
     this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() +
             conf.getTaskPartition();
     this.hostnameTaskId = hostname + "_" + getTaskId();
index db13670..4c02fff 100644 (file)
@@ -1292,5 +1292,15 @@ public interface GiraphConstants {
   /** Number of supersteps job will run for */
   IntConfOption SUPERSTEP_COUNT = new IntConfOption("giraph.numSupersteps", -1,
       "Number of supersteps job will run for");
+
+  /** Whether to disable GiraphClassResolver which is an efficient
+   * implementation of kryo class resolver. By default this resolver is used by
+   * KryoSimpleWritable and KryoSimpleWrapper, and can be disabled with this
+   * option */
+  BooleanConfOption DISABLE_GIRAPH_CLASS_RESOLVER =
+          new BooleanConfOption("giraph.disableGiraphClassResolver", false,
+            "Disables GiraphClassResolver, which is a custom implementation " +
+            "of kryo class resolver that avoids writing class names to the " +
+            "underlying stream for faster serialization.");
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/Boxed.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/Boxed.java
new file mode 100644 (file)
index 0000000..087f0dd
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+/**
+ * Boxed interface
+ * @param <T>
+ */
+public interface Boxed<T> {
+  /**
+   * Gets the boxed value.
+   * @return Boxed object.
+   */
+  T get();
+
+  /**
+   * Sets the boxed value.
+   * @param value Value
+   */
+  void set(T value);
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java
new file mode 100644 (file)
index 0000000..80e7011
--- /dev/null
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Registration;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.util.DefaultClassResolver;
+import com.esotericsoftware.kryo.util.ObjectMap;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+
+import static com.esotericsoftware.kryo.util.Util.getWrapperClass;
+
+/**
+ * In order to avoid writing class names to the stream, this class resolver
+ * assigns unique integers to each class name and writes/reads those integers
+ * to/from the stream. Reads assume that there is already a class assigned
+ * to the given integer. This resolver only assigns unique integers for
+ * classes that are not explicitly registered since those classes are already
+ * assigned unique integers at the time of registration. This implementation
+ * uses zookeeper to provide consistent class name to ID mapping across all
+ + nodes.
+ *
+ *
+ * If resolver encounters a class name that has not been assigned to a unique
+ * integer yet, it creates a class node in zookeeper under a designated path
+ * with persistent_sequential mode - allowing the file name of the class node
+ * to be suffixed with an auto incremented integer. After the class node is
+ * created, the resolver reads back all the nodes under the designated path
+ * and uses the unique suffix as the class id. If there are duplicate entries
+ * for the same class name due to some race condition, the lowest suffix is
+ * used.
+ */
+public class GiraphClassResolver extends DefaultClassResolver {
+  /** Base ID to start for class name assignments.
+   * This number has to be high enough to not conflict with
+   * explicity registered class IDs.
+   * */
+  private static final int BASE_CLASS_ID = 1000;
+
+  /** Class logger */
+  private static final Logger LOG =
+          Logger.getLogger(GiraphClassResolver.class);
+
+  /** Class name to ID cache */
+  private static Map<String, Integer> CLASS_NAME_TO_ID = new HashMap();
+  /** ID to class name cache */
+  private static Map<Integer, String> ID_TO_CLASS_NAME = new HashMap();
+  /** Zookeeper */
+  private static ZooKeeperExt ZK;
+  /** Zookeeper path for automatic class registrations */
+  private static String KRYO_REGISTERED_CLASS_PATH;
+  /** Minimum class ID assigned by zookeeper sequencing */
+  private static int MIN_CLASS_ID = -1;
+  /** True if the zookeeper class registration path is already created */
+  private static boolean IS_CLASS_PATH_CREATED = false;
+
+  /** Memoized class id*/
+  private int memoizedClassId = -1;
+  /** Memoized class registration */
+  private Registration memoizedClassIdValue;
+
+  /**
+   * Sets zookeeper informaton.
+   * @param zookeeperExt ZookeeperExt
+   * @param kryoClassPath Zookeeper directory path where class Name-ID
+   *                      mapping is stored.
+   */
+  public static void setZookeeperInfo(ZooKeeperExt zookeeperExt,
+                                      String kryoClassPath) {
+    ZK = zookeeperExt;
+    KRYO_REGISTERED_CLASS_PATH = kryoClassPath;
+  }
+
+  /**
+   * Return true of the zookeeper is initialized.
+   * @return True if the zookeeper is initialized.
+   */
+  public static boolean isInitialized() {
+    return ZK != null;
+  }
+
+  /**
+   * Creates a new node for the given class name.
+   * Creation mode is persistent sequential, i.e.
+   * ZK will always create a new node . There could be
+   * multiple entries for the same class name but since
+   * the lowest index is used, this is not a problem.
+   * @param className Class name
+   */
+  public static void createClassName(String className) {
+    try {
+      String path = KRYO_REGISTERED_CLASS_PATH + "/" + className;
+      ZK.createExt(path,
+              null,
+              ZooDefs.Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT_SEQUENTIAL,
+              true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+              "Failed to create class " + className, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+              "Interrupted while creating " + className, e);
+    }
+  }
+
+  /**
+   * Refreshes class-ID mapping from zookeeper.
+   * Not thread safe.
+   */
+  public static void refreshCache() {
+    if (!IS_CLASS_PATH_CREATED) {
+      try {
+        ZK.createOnceExt(KRYO_REGISTERED_CLASS_PATH,
+                null,
+                ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT,
+                true);
+        IS_CLASS_PATH_CREATED = true;
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+                "Failed to refresh kryo cache " +
+                        KRYO_REGISTERED_CLASS_PATH, e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+                "Interrupted while refreshing kryo cache " +
+                        KRYO_REGISTERED_CLASS_PATH, e);
+      }
+    }
+
+    List<String> registeredList;
+    try {
+      registeredList =
+              ZK.getChildrenExt(KRYO_REGISTERED_CLASS_PATH,
+                      false,
+                      true,
+                      false);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+        "Failed to retrieve child nodes for " + KRYO_REGISTERED_CLASS_PATH, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+        "Interrupted while retrieving child nodes for " +
+                KRYO_REGISTERED_CLASS_PATH, e);
+    }
+
+    for (String name : registeredList) {
+      // Since these files are created with PERSISTENT_SEQUENTIAL mode,
+      // Kryo appends a sequential number to their file name.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registered class: " + name);
+      }
+      String className = name.substring(0,
+          name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH);
+      int classId = Integer.parseInt(
+          name.substring(name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH));
+
+      if (MIN_CLASS_ID == -1) {
+        MIN_CLASS_ID = classId;
+      }
+
+      int adjustedId = classId - MIN_CLASS_ID + BASE_CLASS_ID;
+      if (CLASS_NAME_TO_ID.putIfAbsent(className, adjustedId) == null) {
+        ID_TO_CLASS_NAME.put(adjustedId, className);
+      }
+    }
+  }
+
+  /**
+   * Gets ID for the given class name.
+   * @param className Class name
+   * @return class id Class ID
+   */
+  public static int getClassId(String className) {
+    if (CLASS_NAME_TO_ID.containsKey(className)) {
+      return CLASS_NAME_TO_ID.get(className);
+    }
+    synchronized (GiraphClassResolver.class) {
+      if (CLASS_NAME_TO_ID.containsKey(className)) {
+        return CLASS_NAME_TO_ID.get(className);
+      }
+      refreshCache();
+
+      if (!CLASS_NAME_TO_ID.containsKey(className)) {
+        createClassName(className);
+        refreshCache();
+      }
+    }
+
+    if (!CLASS_NAME_TO_ID.containsKey(className)) {
+      throw new IllegalStateException("Failed to assigned id to " + className);
+    }
+
+    return CLASS_NAME_TO_ID.get(className);
+  }
+
+  /**
+   * Get class name for given ID.
+   * @param id class ID
+   * @return class name
+   */
+  public static String getClassName(int id) {
+    if (ID_TO_CLASS_NAME.containsKey(id)) {
+      return ID_TO_CLASS_NAME.get(id);
+    }
+    synchronized (GiraphClassResolver.class) {
+      if (ID_TO_CLASS_NAME.containsKey(id)) {
+        return ID_TO_CLASS_NAME.get(id);
+      }
+      refreshCache();
+    }
+
+    if (!ID_TO_CLASS_NAME.containsKey(id)) {
+      throw new IllegalStateException("ID " + id + " doesn't exist");
+    }
+    return ID_TO_CLASS_NAME.get(id);
+  }
+
+  @Override
+  public Registration register(Registration registration) {
+    if (registration == null) {
+      throw new IllegalArgumentException("registration cannot be null");
+    }
+    if (registration.getId() == NAME) {
+      throw new IllegalArgumentException("Invalid registration ID");
+    }
+
+    idToRegistration.put(registration.getId(), registration);
+    classToRegistration.put(registration.getType(), registration);
+    if (registration.getType().isPrimitive()) {
+      classToRegistration.put(getWrapperClass(registration.getType()),
+              registration);
+    }
+    return registration;
+  }
+
+  @Override
+  public Registration registerImplicit(Class type) {
+    return register(new Registration(type, kryo.getDefaultSerializer(type),
+            getClassId(type.getName())));
+  }
+
+  @Override
+  public Registration writeClass(Output output, Class type) {
+    if (type == null) {
+      output.writeVarInt(Kryo.NULL, true);
+      return null;
+    }
+
+    Registration registration = kryo.getRegistration(type);
+    if (registration.getId() == NAME) {
+      throw new IllegalStateException("Invalid registration ID");
+    } else {
+      // Class ID's are incremented by 2 when writing, because 0 is used
+      // for null and 1 is used for non-explicitly registered classes.
+      output.writeVarInt(registration.getId() + 2, true);
+    }
+    return registration;
+  }
+
+  @Override
+  public Registration readClass(Input input) {
+    int classID = input.readVarInt(true);
+    if (classID == Kryo.NULL) {
+      return null;
+    } else if (classID == NAME + 2) {
+      throw new IllegalStateException("Invalid class ID");
+    }
+    if (classID == memoizedClassId) {
+      return memoizedClassIdValue;
+    }
+    Registration registration = idToRegistration.get(classID - 2);
+    if (registration == null) {
+      String className = getClassName(classID - 2);
+      Class type = getTypeByName(className);
+      if (type == null) {
+        try {
+          type = Class.forName(className, false, kryo.getClassLoader());
+        } catch (ClassNotFoundException ex) {
+          throw new KryoException("Unable to find class: " + className, ex);
+        }
+        if (nameToClass == null) {
+          nameToClass = new ObjectMap();
+        }
+        nameToClass.put(className, type);
+      }
+      registration = new Registration(type, kryo.getDefaultSerializer(type),
+              classID - 2);
+      register(registration);
+    }
+    memoizedClassId = classID;
+    memoizedClassIdValue = registration;
+    return registration;
+  }
+
+  /**
+   * Reset the internal state
+   * Reset clears two hash tables:
+   * 1 - Class name to ID: Every non-explicitly registered class takes the
+   *     ID agreed by all kryo instances, and it doesn't change across
+   *     serializations, so this reset is not required.
+   * 2- Reference tracking: Not required because it is disabled.
+   *
+   * Therefore, this method should not be invoked.
+   *
+   */
+  public void reset() {
+    throw new IllegalStateException("Not implemented");
+  }
+
+  /**
+   * This method writes the class name for the first encountered
+   * non-explicitly registered class. Since all non-explicitly registered
+   * classes take the ID agreed by all kryo instances, there is no need
+   * to write the class name, so this method should not be invoked.
+   * @param output Output stream
+   * @param type CLass type
+   * @param registration Registration
+   */
+  @Override
+  protected void writeName(Output output, Class type,
+                            Registration registration) {
+    throw new IllegalStateException("Not implemented");
+  }
+
+  /**
+   * This method reads the class name for the first encountered
+   * non-explicitly registered class. Since all non-explicitly registered
+   * classes take the ID agreed by all kryo instances, class name is
+   * never written, so this method should not be invoked.
+   * @param input Input stream
+   * @return Registration
+   */
+  @Override
+  protected Registration readName(Input input) {
+    throw new IllegalStateException("Not implemented");
+  }
+
+  /**
+   * Get type by class name.
+   * @param className Class name
+   * @return class type
+   */
+  protected Class<?> getTypeByName(final String className) {
+    return nameToClass != null ? nameToClass.get(className) : null;
+  }
+}
index fb1186b..2713316 100644 (file)
@@ -308,8 +308,12 @@ public class HadoopKryo extends Kryo {
     if (trackReferences) {
       kryo = new HadoopKryo();
     } else {
-      // TODO: if trackReferences is false use custom class resolver.
-      kryo = new HadoopKryo(new DefaultClassResolver(),
+      // Only use GiraphClassResolver if it is properly initialized.
+      // This is to enable test cases which use KryoSimpleWrapper
+      // but don't start ZK.
+      kryo = new HadoopKryo(
+              GiraphClassResolver.isInitialized() ? new GiraphClassResolver() :
+                                                    new DefaultClassResolver(),
               new MapReferenceResolver());
     }
 
@@ -406,8 +410,11 @@ public class HadoopKryo extends Kryo {
     if (!trackReferences) {
       kryo.setReferences(false);
 
-      // TODO: Enable the following when a custom class resolver is created.
-      // kryo.setAutoReset(false);
+      // Auto reset can only be disabled if the GiraphClassResolver is
+      // properly initialized.
+      if (GiraphClassResolver.isInitialized()) {
+        kryo.setAutoReset(false);
+      }
     }
     return kryo;
   }
index 9c5de74..3cb291d 100644 (file)
@@ -39,7 +39,7 @@ import org.apache.hadoop.io.Writable;
  *
  * @param <T> Object type
  */
-public class KryoSimpleWrapper<T> implements Writable {
+public class KryoSimpleWrapper<T> implements Writable, Boxed<T> {
 
   /** Wrapped object */
   private T object;
index 1eb4c8b..a20c494 100644 (file)
@@ -42,10 +42,10 @@ import org.apache.zookeeper.ZooKeeper;
  * should be thread-safe.
  */
 public class ZooKeeperExt {
+  /** Length of the ZK sequence number */
+  public static final int SEQUENCE_NUMBER_LENGTH = 10;
   /** Internal logger */
   private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
-  /** Length of the ZK sequence number */
-  private static final int SEQUENCE_NUMBER_LENGTH = 10;
   /** Internal ZooKeeper */
   private final ZooKeeper zooKeeper;
   /** Ensure we have progress */
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankWithKryoSimpleWritable.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankWithKryoSimpleWritable.java
new file mode 100644 (file)
index 0000000..4e9e1ed
--- /dev/null
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.aggregators.BasicAggregator;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.writable.kryo.KryoSimpleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.VertexValue;
+import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.MessageValue;
+import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.EdgeValue;
+
+/**
+ * Copy of SimplePageRank, modified to test vertex/edge and
+ * message values that derives from KryoSimpleWritable.
+ */
+@Algorithm(
+        name = "Page rank"
+)
+public class PageRankWithKryoSimpleWritable extends
+        BasicComputation<LongWritable, VertexValue,
+        EdgeValue, MessageValue> {
+  /** Number of supersteps for this test */
+  public static final int MAX_SUPERSTEPS = 30;
+  /** Number of supersteps for this static  3;
+  /** Logger */
+  private static final Logger LOG =
+          Logger.getLogger(PageRankWithKryoSimpleWritable.class);
+  /** Sum aggregator name */
+  private static String SUM_AGG = "sum";
+  /** Min aggregator name */
+  private static String MIN_AGG = "min";
+  /** Max aggregator name */
+  private static String MAX_AGG = "max";
+
+  @Override
+  public void compute(
+          Vertex<LongWritable, VertexValue,
+          EdgeValue> vertex,
+          Iterable<MessageValue> messages) throws IOException {
+    if (getSuperstep() >= 1) {
+      double sum = 0;
+      for (MessageValue message : messages) {
+        sum += message.get();
+      }
+      Double value = (0.15f / getTotalNumVertices()) + 0.85f * sum;
+      VertexValue vertexValue = new VertexValue(value);
+      vertex.setValue(vertexValue);
+      aggregate(MAX_AGG, vertexValue);
+      aggregate(MIN_AGG, vertexValue);
+      aggregate(SUM_AGG, new LongWritable(1));
+      LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
+              " max=" + getAggregatedValue(MAX_AGG) +
+              " min=" + getAggregatedValue(MIN_AGG));
+    }
+
+    if (getSuperstep() < MAX_SUPERSTEPS) {
+      long edges = vertex.getNumEdges();
+      sendMessageToAllEdges(vertex,
+          new MessageValue(vertex.getValue().get() / edges));
+    } else {
+      vertex.voteToHalt();
+    }
+  }
+
+  /**
+   * Worker context used with {@link PageRankWithKryoSimpleWritable}.
+   */
+  public static class PageRankWithKryoWorkerContext extends
+          WorkerContext {
+    /** Final max value for verification for local jobs */
+    private static double FINAL_MAX;
+    /** Final min value for verification for local jobs */
+    private static double FINAL_MIN;
+    /** Final sum value for verification for local jobs */
+    private static long FINAL_SUM;
+
+    public static double getFinalMax() {
+      return FINAL_MAX;
+    }
+
+    public static double getFinalMin() {
+      return FINAL_MIN;
+    }
+
+    public static long getFinalSum() {
+      return FINAL_SUM;
+    }
+
+    @Override
+    public void preApplication()
+            throws InstantiationException, IllegalAccessException {
+    }
+
+    @Override
+    public void postApplication() {
+      FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
+      FINAL_MAX = this.<VertexValue>getAggregatedValue(MAX_AGG).get();
+      FINAL_MIN = this.<VertexValue>getAggregatedValue(MIN_AGG).get();
+
+      LOG.info("aggregatedNumVertices=" + FINAL_SUM);
+      LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
+      LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
+    }
+
+    @Override
+    public void preSuperstep() {
+      if (getSuperstep() >= 3) {
+        LOG.info("aggregatedNumVertices=" +
+                getAggregatedValue(SUM_AGG) +
+                " NumVertices=" + getTotalNumVertices());
+        if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
+                getTotalNumVertices()) {
+          throw new RuntimeException("wrong value of SumAggreg: " +
+                  getAggregatedValue(SUM_AGG) + ", should be: " +
+                  getTotalNumVertices());
+        }
+        VertexValue maxPagerank = getAggregatedValue(MAX_AGG);
+        LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
+        VertexValue minPagerank = getAggregatedValue(MIN_AGG);
+        LOG.info("aggregatedMinPageRank=" + minPagerank.get());
+      }
+    }
+
+    @Override
+    public void postSuperstep() { }
+  }
+
+  /**
+   * Master compute associated with {@link PageRankWithKryoSimpleWritable}.
+   * It registers required aggregators.
+   */
+  public static class PageRankWithKryoMasterCompute extends
+          DefaultMasterCompute {
+    @Override
+    public void initialize() throws InstantiationException,
+            IllegalAccessException {
+      registerAggregator(SUM_AGG, LongSumAggregator.class);
+      registerPersistentAggregator(MIN_AGG, DoubleMinWrapperAggregator.class);
+      registerPersistentAggregator(MAX_AGG, DoubleMaxWrapperAggregator.class);
+    }
+  }
+
+  /**
+   * Simple VertexReader that supports {@link PageRankWithKryoSimpleWritable}
+   */
+  public static class PageRankWithKryoVertexReader extends
+          GeneratedVertexReader<LongWritable, VertexValue, EdgeValue> {
+    /** Class logger */
+    private static final Logger LOG =
+        Logger.getLogger(
+          PageRankWithKryoSimpleWritable.PageRankWithKryoVertexReader.class);
+
+    @Override
+    public boolean nextVertex() {
+      return totalRecords > recordsRead;
+    }
+
+    @Override
+    public Vertex<LongWritable, VertexValue, EdgeValue>
+    getCurrentVertex() throws IOException {
+      Vertex<LongWritable, VertexValue, EdgeValue> vertex =
+              getConf().createVertex();
+      LongWritable vertexId = new LongWritable(
+              (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+      VertexValue vertexValue = new VertexValue(vertexId.get() * 10d);
+      long targetVertexId =
+              (vertexId.get() + 1) %
+                      (inputSplit.getNumSplits() * totalRecords);
+      float edgeValue = vertexId.get() * 100f;
+      List<Edge<LongWritable, EdgeValue>> edges = Lists.newLinkedList();
+      edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
+              new EdgeValue(edgeValue)));
+      vertex.initialize(vertexId, vertexValue, edges);
+      ++recordsRead;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("next: Return vertexId=" + vertex.getId().get() +
+          ", vertexValue=" + vertex.getValue() +
+          ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
+      }
+      return vertex;
+    }
+  }
+
+  /**
+   *  VertexInputFormat that supports {@link PageRankWithKryoSimpleWritable}
+   */
+  public static class PageRankWithKryoVertexInputFormat extends
+          GeneratedVertexInputFormat<LongWritable, VertexValue, EdgeValue> {
+    @Override
+    public VertexReader<LongWritable, VertexValue,
+            EdgeValue> createVertexReader(InputSplit split,
+                                              TaskAttemptContext context)
+            throws IOException {
+      return new PageRankWithKryoVertexReader();
+    }
+  }
+
+  /**
+   * Creating a custom vertex value class to force kryo to
+   * register with a new ID. Please note that a custom
+   * class containing a double array is not
+   * necessary for the page rank application. It is only
+   * used for testing the scenario of kryo encountering an
+   * unregistered custom class.
+   */
+  public static class VertexValue extends KryoSimpleWritable {
+    /** Storing the value in an array.
+        Double array is an unregistered type
+        hence kryo will assign a unique class id */
+    private double[] ranks;
+
+    /** Constructor */
+    public VertexValue() {
+    }
+
+    /**
+     * Constructor
+     * @param val Vertex value
+     */
+    public VertexValue(Double val) {
+      ranks = new double[1];
+
+      ranks[0] = val;
+    }
+
+    /**
+     * Get vertex value
+     * @return Vertex value
+     */
+    public Double get() {
+      return ranks[0];
+    }
+
+    /**
+     * Set vertex value.
+     * @param val Vertex value
+     */
+    public void set(Double val) {
+      this.ranks[0] = val;
+    }
+  }
+
+  /**
+   * Creating a custom edge value class to force kryo to
+   * register with a new ID. Please note that a custom
+   * class containing a float is not
+   * necessary for the page rank application. It is only
+   * used for testing the scenario of kryo encountering an
+   * unregistered custom class.
+   */
+  public static class EdgeValue extends KryoSimpleWritable {
+    /** Edge value */
+    private Float realValue;
+
+    /** Constructor */
+    public EdgeValue() {
+    }
+    /**
+     * Constructor
+     * @param val Edge value
+     */
+    public EdgeValue(Float val) {
+      realValue = val;
+    }
+
+    /**
+     * Get edge value
+     * @return Edge value
+     */
+    public Float get() {
+      return realValue;
+    }
+
+    /**
+     * Set edge value
+     * @param val Edge value
+     */
+    public void set(Float val) {
+      this.realValue = val;
+    }
+  }
+
+  /**
+   * Creating a custom message value class to force kryo to
+   * register with a new ID. Please note that a custom
+   * class containing a double list is not
+   * necessary for the page rank application. It is only
+   * used for testing the scenario of kryo encountering an
+   * unregistered custom class.
+   */
+  public static class MessageValue extends KryoSimpleWritable {
+    /** Storing the message in a list to test the list type */
+    private List<Double> msgValue;
+
+    /** Constructor */
+    public MessageValue() {
+    }
+
+    /**
+     * Constructor
+     * @param val Message value
+     */
+    public MessageValue(Double val) {
+      msgValue = new ArrayList<>();
+      msgValue.add(val);
+    }
+
+    /**
+     * Get message value
+     * @return Message value
+     */
+    public Double get() {
+      return msgValue.get(0);
+    }
+
+    /**
+     * Set message value
+     * @param val Message value
+     */
+    public void set(Double val) {
+      this.msgValue.set(0, val);
+    }
+  }
+
+
+  /**
+   * Aggregator for getting max double value
+   */
+  public static class DoubleMaxWrapperAggregator extends
+          BasicAggregator<VertexValue> {
+    @Override
+    public void aggregate(VertexValue value) {
+      getAggregatedValue().set(
+                      Math.max(getAggregatedValue().get(), value.get()));
+    }
+
+    @Override
+    public VertexValue createInitialValue() {
+      return new VertexValue(Double.NEGATIVE_INFINITY);
+    }
+  }
+
+  /**
+   * Aggregator for getting min double value.
+   */
+  public static class DoubleMinWrapperAggregator
+          extends BasicAggregator<VertexValue> {
+    @Override
+    public void aggregate(VertexValue value) {
+      getAggregatedValue().set(
+              Math.min(getAggregatedValue().get(), value.get()));
+    }
+
+    @Override
+    public VertexValue createInitialValue() {
+      return  new VertexValue(Double.MAX_VALUE);
+    }
+  }
+
+}
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TestKryoPageRank.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TestKryoPageRank.java
new file mode 100644 (file)
index 0000000..69f5a83
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.examples;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.job.GiraphJob;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test page rank with kryo wrapper
+ */
+public class TestKryoPageRank extends BspCase {
+
+  /**
+   * Constructor
+   */
+  public TestKryoPageRank() {
+    super(TestPageRank.class.getName());
+  }
+
+  @Test
+  public void testKryoPageRank()
+          throws ClassNotFoundException, IOException, InterruptedException {
+    testPageRankWithKryoWrapper(1);
+  }
+
+  @Test
+  public void testKryoPageRankTenThreadsCompute()
+          throws ClassNotFoundException, IOException, InterruptedException {
+    testPageRankWithKryoWrapper(10);
+  }
+
+
+  /**
+   * Testing simple page rank by wrapping vertex value, edge
+   * and message values with kryo wrapper.
+   *
+   * @param numComputeThreads Number of compute threads to use
+   * @throws java.io.IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  private void testPageRankWithKryoWrapper(int numComputeThreads)
+          throws IOException, InterruptedException, ClassNotFoundException {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(PageRankWithKryoSimpleWritable.class);
+    conf.setVertexInputFormatClass(
+            PageRankWithKryoSimpleWritable.PageRankWithKryoVertexInputFormat.class);
+    conf.setWorkerContextClass(
+            PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.class);
+    conf.setMasterComputeClass(
+            PageRankWithKryoSimpleWritable.PageRankWithKryoMasterCompute.class);
+    conf.setNumComputeThreads(numComputeThreads);
+    // Set enough partitions to generate randomness on the compute side
+    if (numComputeThreads != 1) {
+      GiraphConstants.USER_PARTITION_COUNT.set(conf, numComputeThreads * 5);
+    }
+    GiraphJob job = prepareJob(getCallingMethodName(), conf);
+    assertTrue(job.run(true));
+    if (!runningInDistributedMode()) {
+      double maxPageRank =
+              PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.getFinalMax();
+      double minPageRank =
+              PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.getFinalMin();
+      long numVertices =
+              PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.getFinalSum();
+      System.out.println(getCallingMethodName() + ": maxPageRank=" +
+              maxPageRank + " minPageRank=" +
+              minPageRank + " numVertices=" + numVertices + ", " +
+              " numComputeThreads=" + numComputeThreads);
+      assertEquals(34.03, maxPageRank, 0.001);
+      assertEquals(0.03, minPageRank, 0.00001);
+      assertEquals(5L, numVertices);
+    }
+  }
+}