HBASE-21201 Support to run VerifyReplication MR tool without peerid
authorToshihiro Suzuki <brfrn169@gmail.com>
Thu, 24 Jan 2019 15:24:39 +0000 (00:24 +0900)
committerToshihiro Suzuki <brfrn169@gmail.com>
Sun, 10 Feb 2019 07:15:45 +0000 (16:15 +0900)
Signed-off-by: Josh Elser <elserj@apache.org>
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java

index d5f8215..4fa0727 100644 (file)
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -96,6 +97,7 @@ public class VerifyReplication extends Configured implements Tool {
   String families = null;
   String delimiter = "";
   String peerId = null;
+  String peerQuorumAddress = null;
   String rowPrefixes = null;
   int sleepMsBeforeReCompare = 0;
   boolean verbose = false;
@@ -385,7 +387,6 @@ public class VerifyReplication extends Configured implements Tool {
     if (!doCommandLine(args)) {
       return null;
     }
-    conf.set(NAME+".peerId", peerId);
     conf.set(NAME+".tableName", tableName);
     conf.setLong(NAME+".startTime", startTime);
     conf.setLong(NAME+".endTime", endTime);
@@ -401,14 +402,23 @@ public class VerifyReplication extends Configured implements Tool {
       conf.set(NAME+".rowPrefixes", rowPrefixes);
     }
 
-    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
-    ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
-    String peerQuorumAddress = peerConfig.getClusterKey();
-    LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
+    String peerQuorumAddress;
+    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
+    if (peerId != null) {
+      peerConfigPair = getPeerQuorumConfig(conf, peerId);
+      ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
+      peerQuorumAddress = peerConfig.getClusterKey();
+      LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
         peerConfig.getConfiguration());
-    conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
-    HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
+      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
+      HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
         peerConfig.getConfiguration().entrySet());
+    } else {
+      assert this.peerQuorumAddress != null;
+      peerQuorumAddress = this.peerQuorumAddress;
+      LOG.info("Peer Quorum Address: " + peerQuorumAddress);
+      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
+    }
 
     conf.setInt(NAME + ".versions", versions);
     LOG.info("Number of version: " + versions);
@@ -463,9 +473,13 @@ public class VerifyReplication extends Configured implements Tool {
     } else {
       TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
     }
-    Configuration peerClusterConf = peerConfigPair.getSecond();
-    // Obtain the auth token from peer cluster
-    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
+
+    if (peerId != null) {
+      assert peerConfigPair != null;
+      Configuration peerClusterConf = peerConfigPair.getSecond();
+      // Obtain the auth token from peer cluster
+      TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
+    }
 
     job.setOutputFormatClass(NullOutputFormat.class);
     job.setNumReduceTasks(0);
@@ -610,7 +624,11 @@ public class VerifyReplication extends Configured implements Tool {
         }
 
         if (i == args.length-2) {
-          peerId = cmd;
+          if (isPeerQuorumAddress(cmd)) {
+            peerQuorumAddress = cmd;
+          } else {
+            peerId = cmd;
+          }
         }
 
         if (i == args.length-1) {
@@ -651,6 +669,16 @@ public class VerifyReplication extends Configured implements Tool {
     return true;
   }
 
+  private boolean isPeerQuorumAddress(String cmd) {
+    try {
+      ZKConfig.validateClusterKey(cmd);
+    } catch (IOException e) {
+      // not a quorum address
+      return false;
+    }
+    return true;
+  }
+
   /*
    * @param errorMsg Error message.  Can be null.
    */
@@ -660,8 +688,9 @@ public class VerifyReplication extends Configured implements Tool {
     }
     System.err.println("Usage: verifyrep [--starttime=X]" +
         " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
-        "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] "
-            + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U]  <peerid> <tablename>");
+        "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] "
+      + "[--peerSnapshotName=R] [--peerSnapshotTmpDir=S] [--peerFSAddress=T] "
+      + "[--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" starttime    beginning of the time range");
@@ -686,6 +715,8 @@ public class VerifyReplication extends Configured implements Tool {
     System.err.println();
     System.err.println("Args:");
     System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
+    System.err.println(" peerQuorumAddress   quorumAdress of the peer used for verification. The "
+      + "format is zk_quorum:zk_port:zk_hbase_path");
     System.err.println(" tablename    Name of the table to verify");
     System.err.println();
     System.err.println("Examples:");
index e1fda4e..8b52390 100644 (file)
@@ -422,36 +422,24 @@ public class TestVerifyReplication extends TestReplicationBase {
     FileSystem fs = rootDir.getFileSystem(conf1);
     String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
     SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
-      new String(famName), sourceSnapshotName, rootDir, fs, true);
+      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
 
     // Take target snapshot
     Path peerRootDir = FSUtils.getRootDir(conf2);
     FileSystem peerFs = peerRootDir.getFileSystem(conf2);
     String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
     SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
-      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
 
     String peerFSAddress = peerFs.getUri().toString();
     String temPath1 = utility1.getRandomDir().toString();
-    String temPath2 = "/tmp2";
+    String temPath2 = "/tmp" + System.currentTimeMillis();
 
     String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
         "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
         "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
         "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
-
-    Job job = new VerifyReplication().createSubmittableJob(conf1, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(NB_ROWS_IN_BATCH,
-      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(0,
-      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
-
+    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
     checkRestoreTmpDir(conf1, temPath1, 1);
     checkRestoreTmpDir(conf2, temPath2, 1);
 
@@ -470,30 +458,107 @@ public class TestVerifyReplication extends TestReplicationBase {
 
     sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
     SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
-      new String(famName), sourceSnapshotName, rootDir, fs, true);
+      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
 
     peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
     SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
-      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
 
     args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
         "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
         "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
         "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+    checkRestoreTmpDir(conf1, temPath1, 2);
+    checkRestoreTmpDir(conf2, temPath2, 2);
+  }
 
-    job = new VerifyReplication().createSubmittableJob(conf1, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
+  @Test
+  public void testVerifyRepJobWithQuorumAddress() throws Exception {
+    // Populate the tables, at the same time it guarantees that the tables are
+    // identical since it does the check
+    runSmallBatchTest();
+
+    // with a quorum address (a cluster key)
+    String[] args = new String[] { utility2.getClusterKey(), tableName.getNameAsString() };
+    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
+
+    Scan scan = new Scan();
+    ResultScanner rs = htable2.getScanner(scan);
+    Put put = null;
+    for (Result result : rs) {
+      put = new Put(result.getRow());
+      Cell firstVal = result.rawCells()[0];
+      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+        Bytes.toBytes("diff data"));
+      htable2.put(put);
     }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
+    Delete delete = new Delete(put.getRow());
+    htable2.delete(delete);
+    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+  }
+
+  @Test
+  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
+    // Populate the tables, at the same time it guarantees that the tables are
+    // identical since it does the check
+    runSmallBatchTest();
+
+    // Take source and target tables snapshot
+    Path rootDir = FSUtils.getRootDir(conf1);
+    FileSystem fs = rootDir.getFileSystem(conf1);
+    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
+
+    // Take target snapshot
+    Path peerRootDir = FSUtils.getRootDir(conf2);
+    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
+      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    String peerFSAddress = peerFs.getUri().toString();
+    String tmpPath1 = utility1.getRandomDir().toString();
+    String tmpPath2 = "/tmp" + System.currentTimeMillis();
+
+    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
+      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
+      "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
+      tableName.getNameAsString() };
+    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
+    checkRestoreTmpDir(conf1, tmpPath1, 1);
+    checkRestoreTmpDir(conf2, tmpPath2, 1);
+
+    Scan scan = new Scan();
+    ResultScanner rs = htable2.getScanner(scan);
+    Put put = null;
+    for (Result result : rs) {
+      put = new Put(result.getRow());
+      Cell firstVal = result.rawCells()[0];
+      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+        Bytes.toBytes("diff data"));
+      htable2.put(put);
     }
-    assertEquals(0,
-      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(NB_ROWS_IN_BATCH,
-      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    Delete delete = new Delete(put.getRow());
+    htable2.delete(delete);
 
-    checkRestoreTmpDir(conf1, temPath1, 2);
-    checkRestoreTmpDir(conf2, temPath2, 2);
+    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
+
+    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
+      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
+      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
+      "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
+      tableName.getNameAsString() };
+    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+    checkRestoreTmpDir(conf1, tmpPath1, 2);
+    checkRestoreTmpDir(conf2, tmpPath2, 2);
   }
 }