HBASE-21849 master serving regions version is not handled correctly
authorSergey Shelukhin <sershe@apache.org>
Thu, 14 Feb 2019 00:06:48 +0000 (16:06 -0800)
committerSergey Shelukhin <sershe@apache.org>
Thu, 14 Feb 2019 00:06:48 +0000 (16:06 -0800)
Signed-off-by: Michael Stack <stack@apache.org>
hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServicesVersionWrapper.java [new file with mode: 0644]
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

index b610d84..1436945 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 
 
@@ -29,6 +30,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
  */
 @InterfaceAudience.Private
 public final class VersionInfoUtil {
+  private static final ThreadLocal<HBaseProtos.VersionInfo> NonCallVersion = new ThreadLocal<>();
 
   private VersionInfoUtil() {
     /* UTIL CLASS ONLY */
@@ -68,10 +70,30 @@ public final class VersionInfoUtil {
   }
 
   /**
+   *  We intend to use the local version for service call shortcut(s), so we use an interface
+   *  compatible with a typical service call, with 2 args, return type, and an exception type.
+   */
+  public interface ServiceCallFunction<T1, T2, R, E extends Throwable> {
+    R apply(T1 t1, T2 t2) throws E;
+  }
+
+  public static <T1, T2, R, E extends  Throwable> R callWithVersion(
+      ServiceCallFunction<T1, T2, R, E> f, T1 t1, T2 t2) throws E {
+    // Note: just as RpcServer.CurCall, this will only apply on the current thread.
+    NonCallVersion.set(ProtobufUtil.getVersionInfo());
+    try {
+      return f.apply(t1, t2);
+    } finally {
+      NonCallVersion.remove();
+    }
+  }
+
+  /**
    * @return the versionInfo extracted from the current RpcCallContext
    */
   public static HBaseProtos.VersionInfo getCurrentClientVersionInfo() {
-    return RpcServer.getCurrentCall().map(RpcCallContext::getClientVersionInfo).orElse(null);
+    return RpcServer.getCurrentCall().map(
+        RpcCallContext::getClientVersionInfo).orElse(NonCallVersion.get());
   }
 
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServicesVersionWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServicesVersionWrapper.java
new file mode 100644 (file)
index 0000000..a98f5ae
--- /dev/null
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master;
+
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
+
+/**
+ * A wrapper class for MasterRpcServices shortcut that ensures a client version is available
+ * to the callee without a current RPC call.
+ */
+@InterfaceAudience.Private
+public class MasterRpcServicesVersionWrapper
+    implements RegionServerStatusProtos.RegionServerStatusService.BlockingInterface {
+
+  @FunctionalInterface
+  public interface ServiceCallFunction<Req, Resp>
+      extends VersionInfoUtil.ServiceCallFunction<RpcController, Req, Resp, ServiceException> {
+  }
+
+  private final MasterRpcServices masterRpcServices;
+  private final ServiceCallFunction<RegionServerStatusProtos.RegionServerStartupRequest,
+    RegionServerStatusProtos.RegionServerStartupResponse> startupCall;
+  private final ServiceCallFunction<RegionServerStatusProtos.RegionServerReportRequest,
+    RegionServerStatusProtos.RegionServerReportResponse> reportCall;
+
+
+  public MasterRpcServicesVersionWrapper(MasterRpcServices masterRpcServices) {
+    this.masterRpcServices = masterRpcServices;
+    this.startupCall = (c, req) -> masterRpcServices.regionServerStartup(c, req);
+    this.reportCall = (c, req) -> masterRpcServices.regionServerReport(c, req);
+  }
+
+  @Override
+  public RegionServerStatusProtos.RegionServerStartupResponse regionServerStartup(
+      RpcController controller, RegionServerStatusProtos.RegionServerStartupRequest request)
+      throws ServiceException {
+    return VersionInfoUtil.callWithVersion(startupCall, controller, request);
+  }
+
+  @Override
+  public RegionServerStatusProtos.RegionServerReportResponse regionServerReport(
+      RpcController controller, RegionServerStatusProtos.RegionServerReportRequest request)
+      throws ServiceException {
+    return VersionInfoUtil.callWithVersion(reportCall, controller, request);
+  }
+
+  @Override
+  public RegionServerStatusProtos.ReportRSFatalErrorResponse reportRSFatalError(
+      RpcController controller, RegionServerStatusProtos.ReportRSFatalErrorRequest request)
+      throws ServiceException {
+    return masterRpcServices.reportRSFatalError(controller, request);
+  }
+
+  @Override
+  public RegionServerStatusProtos.GetLastFlushedSequenceIdResponse getLastFlushedSequenceId(
+      RpcController controller, RegionServerStatusProtos.GetLastFlushedSequenceIdRequest request)
+      throws ServiceException {
+    return masterRpcServices.getLastFlushedSequenceId(controller, request);
+  }
+
+  @Override
+  public RegionServerStatusProtos.ReportRegionStateTransitionResponse reportRegionStateTransition(
+      RpcController controller,
+      RegionServerStatusProtos.ReportRegionStateTransitionRequest request)
+      throws ServiceException {
+    return masterRpcServices.reportRegionStateTransition(controller, request);
+  }
+
+  @Override
+  public RegionServerStatusProtos.RegionSpaceUseReportResponse reportRegionSpaceUse(
+      RpcController controller, RegionServerStatusProtos.RegionSpaceUseReportRequest request)
+      throws ServiceException {
+    return masterRpcServices.reportRegionSpaceUse(controller, request);
+  }
+
+  @Override
+  public RegionServerStatusProtos.ReportProcedureDoneResponse reportProcedureDone(
+      RpcController controller, RegionServerStatusProtos.ReportProcedureDoneRequest request)
+      throws ServiceException {
+    return masterRpcServices.reportProcedureDone(controller, request);
+  }
+
+  @Override
+  public RegionServerStatusProtos.FileArchiveNotificationResponse reportFileArchival(
+      RpcController controller, RegionServerStatusProtos.FileArchiveNotificationRequest request)
+      throws ServiceException {
+    return masterRpcServices.reportFileArchival(controller, request);
+  }
+}
index f771210..231e751 100644 (file)
@@ -72,6 +72,8 @@ public abstract class ProcedurePrepareLatch {
   }
 
   private static boolean hasProcedureSupport(int major, int minor) {
+    // Note: this won't work if the shortcut similar to the one in HRegionServer is used
+    //       without the corresponding version handling.
     return VersionInfoUtil.currentClientHasMinimumVersion(major, minor);
   }
 
index 34a6c13..c897502 100644 (file)
@@ -118,6 +118,7 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.MasterRpcServicesVersionWrapper;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
@@ -2578,7 +2579,9 @@ public class HRegionServer extends HasThread implements
 
         // If we are on the active master, use the shortcut
         if (this instanceof HMaster && sn.equals(getServerName())) {
-          intRssStub = ((HMaster)this).getMasterRpcServices();
+          // Wrap the shortcut in a class providing our version to the calls where it's relevant.
+          // Normally, RpcServer-based threadlocals do that.
+          intRssStub = new MasterRpcServicesVersionWrapper(((HMaster)this).getMasterRpcServices());
           intLockStub = ((HMaster)this).getMasterRpcServices();
           break;
         }