import org.apache.commons.lang3.Validate;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import javax.security.auth.login.LoginException;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
/**
* Helper class for processors to perform an action as a KerberosUser.
*/
-public class KerberosAction {
+public class KerberosAction<T> {
private final KerberosUser kerberosUser;
- private final PrivilegedAction action;
- private final ProcessContext context;
+ private final PrivilegedExceptionAction<T> action;
private final ComponentLog logger;
public KerberosAction(final KerberosUser kerberosUser,
- final PrivilegedAction action,
- final ProcessContext context,
+ final PrivilegedExceptionAction<T> action,
final ComponentLog logger) {
this.kerberosUser = kerberosUser;
this.action = action;
- this.context = context;
this.logger = logger;
Validate.notNull(this.kerberosUser);
Validate.notNull(this.action);
- Validate.notNull(this.context);
Validate.notNull(this.logger);
}
- public void execute() {
+ public T execute() {
+ T result;
// lazily login the first time the processor executes
if (!kerberosUser.isLoggedIn()) {
try {
kerberosUser.login();
logger.info("Successful login for {}", new Object[]{kerberosUser.getPrincipal()});
} catch (LoginException e) {
- // make sure to yield so the processor doesn't keep retrying the rolled back flow files immediately
- context.yield();
throw new ProcessException("Login failed due to: " + e.getMessage(), e);
}
}
try {
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
- // make sure to yield so the processor doesn't keep retrying the rolled back flow files immediately
- context.yield();
throw new ProcessException("Relogin check failed due to: " + e.getMessage(), e);
}
// attempt to execute the action, if an exception is caught attempt to logout/login and retry
try {
- kerberosUser.doAs(action);
+ result = kerberosUser.doAs(action);
} catch (SecurityException se) {
logger.info("Privileged action failed, attempting relogin and retrying...");
logger.debug("", se);
try {
kerberosUser.logout();
kerberosUser.login();
- kerberosUser.doAs(action);
+ result = kerberosUser.doAs(action);
} catch (Exception e) {
- // make sure to yield so the processor doesn't keep retrying the rolled back flow files immediately
- context.yield();
throw new ProcessException("Retrying privileged action failed due to: " + e.getMessage(), e);
}
+ } catch (PrivilegedActionException e) {
+ throw new ProcessException("Privileged action failed due to: " + e.getMessage(), e.getException());
}
+
+ return result;
}
}
import javax.security.auth.login.LoginException;
import java.io.File;
import java.security.Principal;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
final KerberosUser user1 = new KerberosKeytabUser(principal1.getName(), principal1KeytabFile.getAbsolutePath());
final AtomicReference<String> resultHolder = new AtomicReference<>(null);
- final PrivilegedAction privilegedAction = () -> {
+ final PrivilegedExceptionAction<Void> privilegedAction = () -> {
resultHolder.set("SUCCESS");
return null;
};
final ComponentLog logger = Mockito.mock(ComponentLog.class);
// create the action to test and execute it
- final KerberosAction kerberosAction = new KerberosAction(user1, privilegedAction, context, logger);
+ final KerberosAction kerberosAction = new KerberosAction<>(user1, privilegedAction, logger);
kerberosAction.execute();
// if the result holder has the string success then we know the action executed
import javax.security.auth.login.LoginException;
import java.io.IOException;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
doOnTrigger(context, session);
} else {
// wrap doOnTrigger in a privileged action
- final PrivilegedAction action = () -> {
+ final PrivilegedExceptionAction<Void> action = () -> {
doOnTrigger(context, session);
return null;
};
// execute the privileged action as the given keytab user
- final KerberosAction kerberosAction = new KerberosAction(kerberosUser, action, context, getLogger());
- kerberosAction.execute();
+ final KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, action, getLogger());
+ try {
+ kerberosAction.execute();
+ } catch (ProcessException e) {
+ context.yield();
+ throw e;
+ }
}
}
import javax.security.auth.login.LoginException;
import java.io.FileInputStream;
import java.io.IOException;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
}
@Test
- public void testUpdateWithKerberosAuth() throws IOException, InitializationException, LoginException {
+ public void testUpdateWithKerberosAuth() throws IOException, InitializationException, LoginException, PrivilegedActionException {
final String principal = "nifi@FOO.COM";
final String keytab = "src/test/resources/foo.keytab";
final KerberosKeytabUser kerberosUser = Mockito.mock(KerberosKeytabUser.class);
when(kerberosUser.getPrincipal()).thenReturn(principal);
when(kerberosUser.getKeytabFile()).thenReturn(keytab);
- when(kerberosUser.doAs(any(PrivilegedAction.class))).thenAnswer((invocation -> {
- final PrivilegedAction action = (PrivilegedAction) invocation.getArguments()[0];
+ when(kerberosUser.doAs(any(PrivilegedExceptionAction.class))).thenAnswer((invocation -> {
+ final PrivilegedExceptionAction action = (PrivilegedExceptionAction) invocation.getArguments()[0];
action.run();
return null;
})
// Verify that during the update the user was logged in, TGT was checked, and the action was executed
verify(kerberosUser, times(1)).login();
verify(kerberosUser, times(1)).checkTGTAndRelogin();
- verify(kerberosUser, times(1)).doAs(any(PrivilegedAction.class));
+ verify(kerberosUser, times(1)).doAs(any(PrivilegedExceptionAction.class));
}
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+ <version>1.9.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>test</scope>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.11.1.1</version>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbynet</artifactId>
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+import javax.security.auth.login.LoginException;
import java.net.MalformedURLException;
import java.sql.Connection;
import java.sql.Driver;
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+ .name("kerberos-credentials-service")
+ .displayName("Kerberos Credentials Service")
+ .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+ .identifiesControllerService(KerberosCredentialsService.class)
+ .required(false)
+ .build();
+
private static final List<PropertyDescriptor> properties;
static {
props.add(DATABASE_URL);
props.add(DB_DRIVERNAME);
props.add(DB_DRIVER_LOCATION);
+ props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(DB_USER);
props.add(DB_PASSWORD);
props.add(MAX_WAIT_TIME);
}
private volatile BasicDataSource dataSource;
+ private volatile KerberosKeytabUser kerberosUser;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD));
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME));
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME));
+ final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+ if (kerberosCredentialsService != null) {
+ kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
+ try {
+ kerberosUser.login();
+ } catch (LoginException e) {
+ throw new InitializationException("Unable to authenticate Kerberos principal", e);
+ }
+ }
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
/**
* Shutdown pool, close all open connections.
+ * If a principal is authenticated with a KDC, that principal is logged out.
+ *
+ * If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser},
+ * an attempt will still be made to shut down the pool and close open connections.
+ *
+ * @throws SQLException if there is an error while closing open connections
+ * @throws LoginException if there is an error during the principal log out, and will only be thrown if there was
+ * no exception while closing open connections
*/
@OnDisabled
- public void shutdown() {
+ public void shutdown() throws SQLException, LoginException {
try {
- dataSource.close();
- } catch (final SQLException e) {
- throw new ProcessException(e);
+ if (kerberosUser != null) {
+ kerberosUser.logout();
+ }
+ } finally {
+ kerberosUser = null;
+ try {
+ dataSource.close();
+ } finally {
+ dataSource = null;
+ }
}
}
@Override
public Connection getConnection() throws ProcessException {
try {
- final Connection con = dataSource.getConnection();
+ final Connection con;
+ if (kerberosUser != null) {
+ KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
+ con = kerberosAction.execute();
+ } else {
+ con = dataSource.getConnection();
+ }
return con;
} catch (final SQLException e) {
throw new ProcessException(e);
*/
package org.apache.nifi.dbcp
+import org.apache.commons.dbcp2.BasicDataSource
import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.security.krb.KerberosKeytabUser
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.Assert
import org.junit.BeforeClass
import org.junit.Test
+import javax.security.auth.login.LoginException
import java.sql.Connection
import java.sql.SQLException
connection.close() // will return connection to pool
}
}
+
+ @Test(expected = LoginException)
+ void testDatasourceCloseSuccessWithKerberosUserLogoutException() {
+ final DBCPConnectionPool dbcpConnectionPoolService = new DBCPConnectionPool()
+
+ def basicDataSource = [close: { -> }] as BasicDataSource
+ dbcpConnectionPoolService.dataSource = basicDataSource
+ def kerberosKeytabUser = new KerberosKeytabUser("bad@PRINCIPAL.COM", "fake.keytab") {
+ @Override
+ void logout() throws LoginException {
+ throw new LoginException("fake logout exception")
+ }
+ }
+ dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
+
+ dbcpConnectionPoolService.shutdown()
+
+ }
+
+ @Test(expected = SQLException)
+ void testDatasourceCloseExceptionWithKerberosUserLogoutSuccess() {
+ final DBCPConnectionPool dbcpConnectionPoolService = new DBCPConnectionPool()
+
+ def basicDataSource = [
+ close: { -> throw new SQLException("fake sql exception")
+ }] as BasicDataSource
+ dbcpConnectionPoolService.dataSource = basicDataSource
+ def kerberosKeytabUser = new KerberosKeytabUser("bad@PRINCIPAL.COM", "fake.keytab") {
+ @Override
+ void logout() throws LoginException {
+ }
+ }
+ dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
+
+ dbcpConnectionPoolService.shutdown()
+ }
+
+ @Test(expected = SQLException)
+ void testDatasourceCloseExceptionWithKerberosUserLogoutException() {
+ final DBCPConnectionPool dbcpConnectionPoolService = new DBCPConnectionPool()
+
+ def basicDataSource = [
+ close: { -> throw new SQLException("fake sql exception")
+ }] as BasicDataSource
+ dbcpConnectionPoolService.dataSource = basicDataSource
+ def kerberosKeytabUser = new KerberosKeytabUser("bad@PRINCIPAL.COM", "fake.keytab") {
+ @Override
+ void logout() throws LoginException {
+ throw new LoginException("fake logout exception")
+ }
+ }
+ dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
+
+ dbcpConnectionPoolService.shutdown()
+ }
}