PHOENIX-4940 Add tenantId parameter to index tool
authorGokcen Iskender <giskender@salesforce.com>
Thu, 31 Jan 2019 19:04:03 +0000 (11:04 -0800)
committerGeoffrey Jacoby <gjacoby@apache.org>
Thu, 7 Feb 2019 23:08:29 +0000 (15:08 -0800)
Signed-off-by: Geoffrey Jacoby <gjacoby@apache.org>
phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java

index dfe4634..c1a455a 100644 (file)
@@ -46,10 +46,12 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -72,13 +74,15 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     private final boolean directApi;
     private final String tableDDLOptions;
     private final boolean useSnapshot;
+    private final boolean useTenantId;
 
     public IndexToolIT(String transactionProvider, boolean mutable, boolean localIndex,
-            boolean directApi, boolean useSnapshot) {
+            boolean directApi, boolean useSnapshot, boolean useTenantId) {
         this.localIndex = localIndex;
         this.transactional = transactionProvider != null;
         this.directApi = directApi;
         this.useSnapshot = useSnapshot;
+        this.useTenantId = useTenantId;
         StringBuilder optionBuilder = new StringBuilder();
         if (!mutable) {
             optionBuilder.append(" IMMUTABLE_ROWS=true ");
@@ -124,13 +128,15 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
                                 .isUnsupported(Feature.ALLOW_LOCAL_INDEX)) {
                         for (boolean directApi : Booleans) {
                             for (boolean useSnapshot : Booleans) {
-                                list.add(new Object[] { transactionProvider, mutable, localIndex, directApi, useSnapshot });
+                                list.add(new Object[] { transactionProvider, mutable, localIndex, directApi, useSnapshot, false});
                             }
                         }
                     }
                 }
             }
         }
+        // Add the usetenantId
+        list.add(new Object[] { "", false, false, true, false, true});
         return TestUtil.filterTxParamData(list,0);
     }
 
@@ -229,6 +235,89 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
+    public void testIndexToolWithTenantId() throws Exception {
+        if (!useTenantId) { return;}
+        String tenantId = generateUniqueName();
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String viewTenantName = generateUniqueName();
+        String indexNameGlobal = generateUniqueName();
+        String indexNameTenant = generateUniqueName();
+        String viewIndexTableName = "_IDX_" + dataTableName;
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection connGlobal = DriverManager.getConnection(getUrl(), props);
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        Connection connTenant = DriverManager.getConnection(getUrl(), props);
+        String createTableStr = "CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL, ID INTEGER NOT NULL, NAME VARCHAR, "
+                + "CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true";
+        String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s";
+
+        String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME) VALUES('%s' , %d, '%s')";
+        String createIndexStr = "CREATE INDEX %s ON %s (NAME) ";
+
+        try {
+            String tableStmtGlobal = String.format(createTableStr, dataTableName);
+            connGlobal.createStatement().execute(tableStmtGlobal);
+
+            String viewStmtTenant = String.format(createViewStr, viewTenantName, dataTableName);
+            connTenant.createStatement().execute(viewStmtTenant);
+
+            String idxStmtTenant = String.format(createIndexStr, indexNameTenant, viewTenantName);
+            connTenant.createStatement().execute(idxStmtTenant);
+
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 1, "x"));
+            connTenant.commit();
+
+            runIndexTool(true, false, "", viewTenantName, indexNameTenant, tenantId, 0,
+                    new String[0]);
+
+            String selectSql = String.format("SELECT ID FROM %s WHERE NAME='x'", viewTenantName);
+            ResultSet rs = connTenant.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertExplainPlan(false, actualExplainPlan, "", viewIndexTableName);
+            rs = connTenant.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+
+            // Remove from tenant view index and build.
+            ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices();
+            Admin admin = queryServices.getAdmin();
+            TableName tableName = TableName.valueOf(viewIndexTableName);
+            admin.disableTable(tableName);
+            admin.truncateTable(tableName, false);
+
+            runIndexTool(true, false, "", viewTenantName, indexNameTenant, tenantId, 0,
+                    new String[0]);
+            Table htable= queryServices.getTable(Bytes.toBytes(viewIndexTableName));
+            int count = getUtility().countRows(htable);
+            // Confirm index has rows
+            assertTrue(count == 1);
+
+            selectSql = String.format("SELECT /*+ INDEX(%s) */ COUNT(*) FROM %s", indexNameTenant, viewTenantName);
+            rs = connTenant.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+
+            String idxStmtGlobal =
+                    String.format(createIndexStr, indexNameGlobal, dataTableName);
+            connGlobal.createStatement().execute(idxStmtGlobal);
+
+            // run the index MR job this time with tenant id.
+            // We expect it to return -1 because indexTable is not correct for this tenant.
+            runIndexTool(true, false, schemaName, dataTableName, indexNameGlobal,
+                    tenantId, -1, new String[0]);
+
+        } finally {
+            connGlobal.close();
+            connTenant.close();
+        }
+    }
+
+    @Test
     public void testSaltedVariableLengthPK() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
@@ -332,7 +421,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute(indexDDL);
 
             // run with 50% sampling rate, split if data table more than 3 regions
-            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, "-sp", "50", "-spa", "3");
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,"-sp", "50", "-spa", "3");
 
             assertEquals(targetNumRegions, admin.getTableRegions(indexTN).size());
             List<Cell> values = new ArrayList<>();
@@ -361,7 +450,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
-            String dataTable, String indxTable) {
+            String dataTable, String indxTable, String tenantId) {
         final List<String> args = Lists.newArrayList();
         if (schemaName != null) {
             args.add("-s");
@@ -381,6 +470,11 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             args.add("-snap");
         }
 
+        if (tenantId != null) {
+            args.add("-tenant");
+            args.add(tenantId);
+        }
+
         args.add("-op");
         args.add("/tmp/" + UUID.randomUUID().toString());
         return args.toArray(new String[0]);
@@ -401,15 +495,21 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
 
     public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
             String dataTableName, String indexTableName, String... additionalArgs) throws Exception {
+        runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs);
+    }
+
+    public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
+            String dataTableName, String indexTableName, String tenantId, int expectedStatus,
+            String... additionalArgs) throws Exception {
         IndexTool indexingTool = new IndexTool();
         Configuration conf = new Configuration(getUtility().getConfiguration());
         conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
         indexingTool.setConf(conf);
         final String[] cmdArgs =
-                getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName);
+                getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
         List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
         cmdArgList.addAll(Arrays.asList(additionalArgs));
         int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()]));
-        assertEquals(0, status);
+        assertEquals(expectedStatus, status);
     }
 }
index a69fa47..1e62838 100644 (file)
@@ -149,6 +149,8 @@ public class IndexTool extends Configured implements Tool {
             "Output path where the files are written");
     private static final Option SNAPSHOT_OPTION = new Option("snap", "snapshot", false,
         "If specified, uses Snapshots for async index building (optional)");
+    private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true,
+        "If specified, uses Tenant connection for tenant view index building (optional)");
     private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
     public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";
 
@@ -162,6 +164,7 @@ public class IndexTool extends Configured implements Tool {
         options.addOption(RUN_FOREGROUND_OPTION);
         options.addOption(OUTPUT_PATH_OPTION);
         options.addOption(SNAPSHOT_OPTION);
+        options.addOption(TENANT_ID_OPTION);
         options.addOption(HELP_OPTION);
         AUTO_SPLIT_INDEX_OPTION.setOptionalArg(true);
         options.addOption(AUTO_SPLIT_INDEX_OPTION);
@@ -247,15 +250,15 @@ public class IndexTool extends Configured implements Tool {
         }
 
         public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild,
-            boolean useSnapshot) throws Exception {
+            boolean useSnapshot, String tenantId) throws Exception {
             if (isPartialBuild) {
-                return configureJobForPartialBuild(schemaName, dataTable);
+                return configureJobForPartialBuild(schemaName, dataTable, tenantId);
             } else {
-                return configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot);
+                return configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot, tenantId);
             }
         }
         
-        private Job configureJobForPartialBuild(String schemaName, String dataTable) throws Exception {
+        private Job configureJobForPartialBuild(String schemaName, String dataTable, String tenantId) throws Exception {
             final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
             final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
             connection = ConnectionUtil.getInputConnection(configuration);
@@ -303,7 +306,10 @@ public class IndexTool extends Configured implements Tool {
             ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
             IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
             PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr);
-            
+            if (tenantId != null) {
+                PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+            }
+
             //Prepare raw scan 
             Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
             scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
@@ -364,7 +370,7 @@ public class IndexTool extends Configured implements Tool {
             
         }
 
-        private Job configureJobForAsyncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot)
+        private Job configureJobForAsyncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot, String tenantId)
                 throws Exception {
             final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
             final String qIndexTable;
@@ -408,6 +414,9 @@ public class IndexTool extends Configured implements Tool {
             PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable);
             PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
                 indexColumns.toArray(new String[indexColumns.size()]));
+            if (tenantId != null) {
+                PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+            }
             final List<ColumnInfo> columnMetadataList =
                     PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
             ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
@@ -536,14 +545,20 @@ public class IndexTool extends Configured implements Tool {
             String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
             boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
             boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
-            connection = ConnectionUtil.getInputConnection(configuration);
+            boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
             byte[][] splitKeysBeforeJob = null;
             boolean isLocalIndexBuild = false;
             PTable pindexTable = null;
+            String tenantId = null;
+            if (useTenantId) {
+                tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+                configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            }
+            connection = ConnectionUtil.getInputConnection(configuration);
             if (indexTable != null) {
-                if (!isValidIndexTable(connection, qDataTable,indexTable)) {
+                if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
                     throw new IllegalArgumentException(String.format(
-                        " %s is not an index table for %s ", indexTable, qDataTable));
+                        " %s is not an index table for %s for this connection", indexTable, qDataTable));
                 }
                 pindexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
                         ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable);
@@ -581,7 +596,7 @@ public class IndexTool extends Configured implements Tool {
                        }
             
             Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName, indexTable, dataTable,
-                    useDirectApi, isPartialBuild, useSnapshot);
+                    useDirectApi, isPartialBuild, useSnapshot, tenantId);
             if (!isForeground && useDirectApi) {
                 LOG.info("Running Index Build in Background - Submit async and exit");
                 job.submit();
@@ -742,18 +757,23 @@ public class IndexTool extends Configured implements Tool {
      * @param connection
      * @param masterTable
      * @param indexTable
+     * @param tenantId
      * @return
      * @throws SQLException
      */
     private boolean isValidIndexTable(final Connection connection, final String masterTable,
-            final String indexTable) throws SQLException {
+            final String indexTable, final String tenantId) throws SQLException {
         final DatabaseMetaData dbMetaData = connection.getMetaData();
         final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
         final String tableName = SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable));
 
         ResultSet rs = null;
         try {
-            rs = dbMetaData.getIndexInfo("", schemaName, tableName, false, false);
+            String catalog = "";
+            if (tenantId != null) {
+                catalog = tenantId;
+            }
+            rs = dbMetaData.getIndexInfo(catalog, schemaName, tableName, false, false);
             while (rs.next()) {
                 final String indexName = rs.getString(6);
                 if (indexTable.equalsIgnoreCase(indexName)) {