JIRA-1212
authorMaja Kabiljo <majakabiljo@fb.com>
Tue, 27 Nov 2018 20:04:30 +0000 (12:04 -0800)
committerMaja Kabiljo <majakabiljo@fb.com>
Tue, 27 Nov 2018 20:04:30 +0000 (12:04 -0800)
closes #94

giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java

index 17c48a5..832e8b6 100644 (file)
@@ -671,6 +671,16 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * How many mappers is job asking for, taking into account whether master
+   * is running on the same mapper as worker or not
+   *
+   * @return How many mappers is job asking for
+   */
+  public final int getMaxMappers() {
+    return getMaxWorkers() + (SPLIT_MASTER_WORKER.get(this) ? 1 : 0);
+  }
+
+  /**
    * Utilize an existing ZooKeeper service.  If this is not set, ZooKeeper
    * will be dynamically started by Giraph for this job.
    *
index 243ab81..a1e7f12 100644 (file)
@@ -93,7 +93,7 @@ public class DefaultJobProgressTrackerService
             MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS.get(conf);
         CombinedWorkerProgress lastProgress = null;
         while (!finished) {
-          if (mappersStarted == conf.getMaxWorkers() + 1 &&
+          if (mappersStarted == conf.getMaxMappers() &&
               !workerProgresses.isEmpty()) {
             // Combine and log
             CombinedWorkerProgress combinedWorkerProgress =
@@ -202,7 +202,7 @@ public class DefaultJobProgressTrackerService
   public synchronized void mapperStarted() {
     mappersStarted++;
     if (LOG.isInfoEnabled()) {
-      if (mappersStarted == conf.getMaxWorkers() + 1) {
+      if (mappersStarted == conf.getMaxMappers()) {
         LOG.info("Got all " + mappersStarted + " mappers");
         jobGotAllMappers();
       } else {
@@ -210,7 +210,7 @@ public class DefaultJobProgressTrackerService
             UPDATE_MILLISECONDS) {
           lastTimeMappersStartedLogged = System.currentTimeMillis();
           LOG.info("Got " + mappersStarted + " but needs " +
-              (conf.getMaxWorkers() + 1) + " mappers");
+              conf.getMaxMappers() + " mappers");
         }
       }
     }
index 8e4e0b8..0f6a3a0 100644 (file)
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
+import static org.apache.giraph.conf.GiraphConstants.SPLIT_MASTER_WORKER;
 import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS;
 
 /**
@@ -58,6 +59,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
   private final Context context;
   /** Use superstep counters? */
   private final boolean superstepCounterOn;
+  /** Are master and worker split or not? */
+  private final boolean splitMasterWorker;
   /** Setup seconds */
   private double setupSecs = 0d;
   /** Superstep timer (in seconds) map */
@@ -78,6 +81,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
     this.context = context;
     GiraphTimers.init(context);
     superstepCounterOn = USE_SUPERSTEP_COUNTERS.get(context.getConfiguration());
+    splitMasterWorker = SPLIT_MASTER_WORKER.get(context.getConfiguration());
   }
 
   /**
@@ -118,7 +122,10 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
           while (!superstepState.isExecutionComplete()) {
             long startSuperstepMillis = System.currentTimeMillis();
             long cachedSuperstep = bspServiceMaster.getSuperstep();
-            GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
+            // If master and worker are running together, worker will call reset
+            if (splitMasterWorker) {
+              GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
+            }
             Class<? extends Computation> computationClass =
                 bspServiceMaster.getMasterCompute().getComputation();
             superstepState = bspServiceMaster.coordinateSuperstep();