NIFI-5984: Enabled Kerberos Authentication for PutKudu
authorMark Payne <markap14@hotmail.com>
Wed, 30 Jan 2019 15:06:04 +0000 (10:06 -0500)
committerJeff Storck <jtswork@gmail.com>
Mon, 11 Feb 2019 23:15:16 +0000 (18:15 -0500)
This closes #3279

nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java

index 5871cc0..8bb100d 100644 (file)
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
-            <version>1.9.0-SNAPSHOT</version>
-            <scope>provided</scope>
+            <artifactId>nifi-api</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-api</artifactId>
+            <artifactId>nifi-processor-utils</artifactId>
+            <version>1.9.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-processor-utils</artifactId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
             <version>1.9.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <artifactId>nifi-record</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>18.0</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-hadoop-record-utils</artifactId>
+            <artifactId>nifi-security-utils</artifactId>
             <version>1.9.0-SNAPSHOT</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock-record-utils</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-schema-registry-service-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <version>1.9.0-SNAPSHOT</version>
             <scope>test</scope>
             <version>2.5.4</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>18.0</version>
-        </dependency>
     </dependencies>
 </project>
index b0eb3f9..9c0c503 100644 (file)
@@ -34,6 +34,7 @@ import org.apache.kudu.client.SessionConfiguration;
 import org.apache.kudu.client.Upsert;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -41,21 +42,29 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSet;
 
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigDecimal;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -65,8 +74,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+
 @EventDriven
 @SupportsBatching
+@RequiresInstanceClassLoading // Because of calls to UserGroupInformation.setConfiguration
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"put", "database", "NoSQL", "kudu", "HDFS", "record"})
 @CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " +
@@ -74,23 +86,31 @@ import java.util.stream.Collectors;
         " If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
 @WritesAttribute(attribute = "record.count", description = "Number of records written to Kudu")
 public class PutKudu extends AbstractProcessor {
-    protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
+    protected static final PropertyDescriptor KUDU_MASTERS = new Builder()
         .name("Kudu Masters")
         .description("List all kudu masters's ip with port (e.g. 7051), comma separated")
         .required(true)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
         .build();
 
-    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+    protected static final PropertyDescriptor TABLE_NAME = new Builder()
         .name("Table Name")
         .description("The name of the Kudu Table to put data into")
         .required(true)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
         .build();
 
-    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder()
+        .name("kerberos-credentials-service")
+        .displayName("Kerberos Credentials Service")
+        .description("Specifies the Kerberos Credentials to use for authentication")
+        .required(false)
+        .identifiesControllerService(KerberosCredentialsService.class)
+        .build();
+
+    public static final PropertyDescriptor RECORD_READER = new Builder()
         .name("record-reader")
         .displayName("Record Reader")
         .description("The service for reading records from incoming flow files.")
@@ -98,7 +118,7 @@ public class PutKudu extends AbstractProcessor {
         .required(true)
         .build();
 
-    protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
+    protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
         .name("Skip head line")
         .description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " +
             "(e.g. \"Treat First Line as Header\" property of CSVReader)")
@@ -108,7 +128,7 @@ public class PutKudu extends AbstractProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
-    protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
+    protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
         .name("Insert Operation")
         .description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows")
         .allowableValues(OperationType.values())
@@ -116,7 +136,7 @@ public class PutKudu extends AbstractProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
-    protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
+    protected static final PropertyDescriptor FLUSH_MODE = new Builder()
         .name("Flush Mode")
         .description("Set the new flush mode for a kudu session.\n" +
             "AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
@@ -149,7 +169,7 @@ public class PutKudu extends AbstractProcessor {
         .defaultValue("100")
         .required(true)
         .addValidator(StandardValidators.createLongValidator(1, 100000, true))
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
         .build();
 
 
@@ -171,12 +191,14 @@ public class PutKudu extends AbstractProcessor {
 
     protected KuduClient kuduClient;
     protected KuduTable kuduTable;
+    private volatile KerberosUser kerberosUser;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(KUDU_MASTERS);
         properties.add(TABLE_NAME);
+        properties.add(KERBEROS_CREDENTIALS_SERVICE);
         properties.add(SKIP_HEAD_LINE);
         properties.add(RECORD_READER);
         properties.add(INSERT_OPERATION);
@@ -197,7 +219,7 @@ public class PutKudu extends AbstractProcessor {
 
 
     @OnScheduled
-    public void OnScheduled(final ProcessContext context) throws KuduException {
+    public void onScheduled(final ProcessContext context) throws IOException, LoginException {
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
         final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
         operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
@@ -206,21 +228,48 @@ public class PutKudu extends AbstractProcessor {
         flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
 
         getLogger().debug("Setting up Kudu connection...");
-        kuduClient = createClient(kuduMasters);
+        final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+        kuduClient = createClient(kuduMasters, credentialsService);
         kuduTable = kuduClient.openTable(tableName);
         getLogger().debug("Kudu connection successfully initialized");
     }
 
-    protected KuduClient createClient(final String masters) {
+    protected KuduClient createClient(final String masters, final KerberosCredentialsService credentialsService) throws LoginException {
+        if (credentialsService == null) {
+            return buildClient(masters);
+        }
+
+        final String keytab = credentialsService.getKeytab();
+        final String principal = credentialsService.getPrincipal();
+        kerberosUser = loginKerberosUser(principal, keytab);
+
+        final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(masters), getLogger());
+        return kerberosAction.execute();
+    }
+
+    protected KuduClient buildClient(final String masters) {
         return new KuduClient.KuduClientBuilder(masters).build();
     }
 
+    protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
+        final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
+        kerberosUser.login();
+        return kerberosUser;
+    }
+
     @OnStopped
-    public final void closeClient() throws KuduException {
-        if (kuduClient != null) {
-            getLogger().debug("Closing KuduClient");
-            kuduClient.close();
-            kuduClient = null;
+    public final void closeClient() throws KuduException, LoginException {
+        try {
+            if (kuduClient != null) {
+                getLogger().debug("Closing KuduClient");
+                kuduClient.close();
+                kuduClient = null;
+            }
+        } finally {
+            if (kerberosUser != null) {
+                kerberosUser.logout();
+                kerberosUser = null;
+            }
         }
     }
 
@@ -231,6 +280,22 @@ public class PutKudu extends AbstractProcessor {
             return;
         }
 
+        final KerberosUser user = kerberosUser;
+        if (user == null) {
+            trigger(context, session, flowFiles);
+            return;
+        }
+
+        final PrivilegedExceptionAction<Void> privelegedAction = () -> {
+            trigger(context, session, flowFiles);
+            return null;
+        };
+
+        final KerberosAction<Void> action = new KerberosAction<>(user, privelegedAction, getLogger());
+        action.execute();
+    }
+
+    private void trigger(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles) throws ProcessException {
         final KuduSession kuduSession = getKuduSession(kuduClient);
         final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
 
@@ -353,13 +418,13 @@ public class PutKudu extends AbstractProcessor {
 
 
 
-    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
+    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
         Upsert upsert = kuduTable.newUpsert();
         this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames);
         return upsert;
     }
 
-    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
+    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
         Insert insert = kuduTable.newInsert();
         this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames);
         return insert;
index f805be7..091f5c3 100644 (file)
@@ -22,8 +22,13 @@ import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Upsert;
+import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.record.Record;
 
+import javax.security.auth.login.LoginException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
@@ -36,6 +41,9 @@ public class MockPutKudu extends PutKudu {
     private KuduSession session;
     private LinkedList<Insert> insertQueue;
 
+    private boolean loggedIn = false;
+    private boolean loggedOut = false;
+
     public MockPutKudu() {
         this(mock(KuduSession.class));
     }
@@ -61,18 +69,71 @@ public class MockPutKudu extends PutKudu {
     }
 
     @Override
-    protected KuduClient createClient(final String masters) {
+    protected KuduClient buildClient(final String masters) {
         final KuduClient client = mock(KuduClient.class);
 
         try {
             when(client.openTable(anyString())).thenReturn(mock(KuduTable.class));
         } catch (final Exception e) {
-
+            throw new AssertionError(e);
         }
 
         return client;
     }
 
+    public boolean loggedIn() {
+        return loggedIn;
+    }
+
+    public boolean loggedOut() {
+        return loggedOut;
+    }
+
+    @Override
+    protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
+        return new KerberosUser() {
+
+            @Override
+            public void login() {
+                loggedIn = true;
+            }
+
+            @Override
+            public void logout() {
+                loggedOut = true;
+            }
+
+            @Override
+            public <T> T doAs(final PrivilegedAction<T> action) throws IllegalStateException {
+                return action.run();
+            }
+
+            @Override
+            public <T> T doAs(final PrivilegedExceptionAction<T> action) throws IllegalStateException, PrivilegedActionException {
+                try {
+                    return action.run();
+                } catch (Exception e) {
+                    throw new PrivilegedActionException(e);
+                }
+            }
+
+            @Override
+            public boolean checkTGTAndRelogin() {
+                return true;
+            }
+
+            @Override
+            public boolean isLoggedIn() {
+                return loggedIn && !loggedOut;
+            }
+
+            @Override
+            public String getPrincipal() {
+                return principal;
+            }
+        };
+    }
+
     @Override
     protected KuduSession getKuduSession(KuduClient client) {
         return session;
index 51908f2..6fc430c 100644 (file)
@@ -28,8 +28,10 @@ import org.apache.kudu.client.OperationResponse;
 import org.apache.kudu.client.RowError;
 import org.apache.kudu.client.RowErrorsAndOverflowStatus;
 import org.apache.kudu.client.SessionConfiguration.FlushMode;
+import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
@@ -70,6 +72,8 @@ import java.util.stream.IntStream;
 import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
 import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
 import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -150,6 +154,42 @@ public class TestPutKudu {
     }
 
     @Test
+    public void testKerberosEnabled() throws InitializationException {
+        createRecordReader(1);
+
+        final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab");
+        testRunner.addControllerService("kerb", kerberosCredentialsService);
+        testRunner.enableControllerService(kerberosCredentialsService);
+
+        testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb");
+
+        testRunner.run(1, false);
+
+        final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
+        assertTrue(proc.loggedIn());
+        assertFalse(proc.loggedOut());
+
+        testRunner.run(1, true, false);
+        assertTrue(proc.loggedOut());
+    }
+
+
+    @Test
+    public void testInsecureClient() throws InitializationException {
+        createRecordReader(1);
+
+        testRunner.run(1, false);
+
+        final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
+        assertFalse(proc.loggedIn());
+        assertFalse(proc.loggedOut());
+
+        testRunner.run(1, true, false);
+        assertFalse(proc.loggedOut());
+    }
+
+
+    @Test
     public void testInvalidReaderShouldRouteToFailure() throws InitializationException, SchemaNotFoundException, MalformedRecordException, IOException {
         createRecordReader(0);
 
@@ -516,4 +556,25 @@ public class TestPutKudu {
     public void testKuduPartialFailuresOnManualFlush() throws Exception {
         testKuduPartialFailure(FlushMode.MANUAL_FLUSH);
     }
+
+
+    public static class MockKerberosCredentialsService extends AbstractControllerService implements KerberosCredentialsService {
+        private final String keytab;
+        private final String principal;
+
+        public MockKerberosCredentialsService(final String keytab, final String principal) {
+            this.keytab = keytab;
+            this.principal = principal;
+        }
+
+        @Override
+        public String getKeytab() {
+            return keytab;
+        }
+
+        @Override
+        public String getPrincipal() {
+            return principal;
+        }
+    }
 }