LENS-1538 : Lens changes for HA to persist and restore sessions and queries from DB master
authorAnkit Kailaswar <ankit.kailaswar@gmail.com>
Tue, 29 Jan 2019 09:20:25 +0000 (14:50 +0530)
committerAmareshwari Sriramadasu <amareshwari@apache.org>
Tue, 29 Jan 2019 09:20:25 +0000 (14:50 +0530)
lens-client/src/test/java/org/apache/lens/server/MockQueryExecutionServiceImpl.java
lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryExecutionService.java
lens-server/src/main/java/org/apache/lens/server/BaseLensService.java
lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
lens-server/src/main/java/org/apache/lens/server/query/QueryServiceResource.java
lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
lens-server/src/test/java/org/apache/lens/server/TestBaseLensService.java [new file with mode: 0644]
lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java

index 9b55fb6..976dead 100644 (file)
@@ -48,7 +48,7 @@ public class MockQueryExecutionServiceImpl extends QueryExecutionServiceImpl {
   }
 
   @Override
-  public LensQuery getQuery(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException {
+  public LensQuery getQueryInfo(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException {
 
     if (getSession(sessionHandle).getSessionConf().get(ENABLE_SLEEP_FOR_GET_QUERY_OP) != null) {
       //Introduce wait time for requests on this session. The wait time decreases with each new request to
index a803109..436ad76 100644 (file)
@@ -170,6 +170,8 @@ public interface QueryExecutionService extends LensService, SessionValidator {
    */
   LensQuery getQuery(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException;
 
+  LensQuery getQueryInfo(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException;
+
   /**
    * Get the result set metadata - list of columns(names and types) and result size.
    *
index 9364872..8d0f424 100644 (file)
@@ -50,6 +50,7 @@ import org.apache.lens.server.api.events.LensEvent;
 import org.apache.lens.server.api.events.LensEventService;
 import org.apache.lens.server.api.query.QueryExecutionService;
 import org.apache.lens.server.error.LensServerErrorCode;
+import org.apache.lens.server.query.LensServerDAO;
 import org.apache.lens.server.query.QueryExecutionServiceImpl;
 import org.apache.lens.server.session.LensSessionImpl;
 import org.apache.lens.server.user.UserConfigLoaderFactory;
@@ -91,6 +92,11 @@ public abstract class BaseLensService extends CompositeService implements Extern
   /** Utility to validate and get valid paths for input paths **/
   private PathValidator pathValidator;
 
+  /**
+   * The lens server dao.
+   */
+  protected static final LensServerDAO LENS_SERVER_DAO = new LensServerDAO();
+
   // Static session map which is used by query submission thread to get the
   // lens session before submitting a query to hive server
   /** The session map. */
@@ -568,8 +574,7 @@ public abstract class BaseLensService extends CompositeService implements Extern
     return pathValidator.removePrefixBeforeURI(path);
   }
 
-  @Override
-  public void validateSession(LensSessionHandle handle) throws LensException {
+  public void verifySessionInMemory(LensSessionHandle handle) throws LensException {
     if (handle == null) {
       throw new LensException(SESSION_ID_NOT_PROVIDED.getLensErrorInfo());
     }
@@ -584,6 +589,48 @@ public abstract class BaseLensService extends CompositeService implements Extern
     }
   }
 
+  private void restoreFromDb(final LensSessionHandle sessionHandle) throws LensException {
+    try {
+      LensSessionImpl.LensSessionPersistInfo persistInfo = LENS_SERVER_DAO.findActiveSessionDetails(sessionHandle);
+
+      if (persistInfo == null) {
+        throw new LensException("Unable to find session in mysql with session id : " + sessionHandle.getPublicId());
+      }
+
+      restoreSession(sessionHandle, persistInfo.getUsername(), persistInfo.getPassword(), persistInfo.getConfig());
+    } catch (Exception daoE) {
+      log.error("sql query failed with " + daoE.toString());
+      throw new LensException(SESSION_CLOSED.getLensErrorInfo(), sessionHandle);
+    }
+    throw new LensException("session not found");
+  }
+
+  @Override
+  public void validateSession(final LensSessionHandle sessionHandle) throws LensException {
+    try {
+      verifySessionInMemory(sessionHandle);
+    } catch (LensException e) {
+      if (e.getErrorCode() == LensServerErrorCode.SESSION_CLOSED.getLensErrorInfo().getErrorCode()) {
+        restoreFromDb(sessionHandle);
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  protected void resotreSessionIfRequired(final LensSessionHandle sessionHandle) {
+    try {
+      verifySessionInMemory(sessionHandle);
+    } catch (LensException e) {
+      try {
+        restoreFromDb(sessionHandle);
+      } catch (LensException le) {
+        // we need to restore lens session if not present in memory,
+        // if its not in mysql then just swallow exception here
+        log.warn("Session  " + sessionHandle.getPublicId() + " is invalid.");
+      }
+    }
+  }
 
   @Override
   public void validateAndAuthorizeSession(LensSessionHandle handle, String userPrincipalName) throws LensException {
index 74806af..4e01d84 100644 (file)
@@ -1224,7 +1224,7 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
 
     try {
       /** Try to issue command on hive **/
-      Hive.get(LensServerConf.getHiveConf()).getAllDatabases();
+      Hive.get(LensServerConf.getHiveConf()).getAllFunctions();
     } catch (HiveException e) {
       isHealthy = false;
       details.append("Could not connect to Hive.");
index cc6ca7d..983365d 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.lens.server.query;
 
 import java.nio.charset.Charset;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -28,16 +29,20 @@ import java.util.List;
 import javax.sql.DataSource;
 
 import org.apache.lens.api.LensConf;
+import org.apache.lens.api.LensSessionHandle;
 import org.apache.lens.api.query.FailedAttempt;
 import org.apache.lens.api.query.QueryHandle;
 import org.apache.lens.api.query.QueryStatus;
 import org.apache.lens.server.api.error.LensException;
 import org.apache.lens.server.api.query.FinishedLensQuery;
+import org.apache.lens.server.api.query.QueryContext;
+import org.apache.lens.server.session.LensSessionImpl;
 import org.apache.lens.server.util.UtilityMethods;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.dbutils.*;
 import org.apache.commons.dbutils.handlers.BeanHandler;
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
@@ -387,4 +392,433 @@ public class LensServerDAO {
       return "Error : " + e.getMessage();
     }
   }
+
+
+  /**
+   * Drop active session table.
+   */
+  public void dropActiveSessionsTable() {
+    QueryRunner runner = new QueryRunner(ds);
+    try {
+      runner.update("drop table active_sessions");
+    } catch (SQLException e) {
+      log.error("SQL exception while dropping active sessions table.", e);
+    }
+  }
+
+  /**
+   * Drop active queries table.
+   */
+  public void dropActiveQueries() {
+    QueryRunner runner = new QueryRunner(ds);
+    try {
+      runner.update("drop table active_queries");
+    } catch (SQLException e) {
+      log.error("SQL exception while dropping active queries table.", e);
+    }
+  }
+
+  /**
+   * Method to create active queries table, this is required for embedded lens server. For production server we will
+   * not be creating tables as it would be created upfront.
+   *
+   */
+  public void createActiveQueriesTable() {
+    String sql = "CREATE TABLE if not exists active_queries ("
+            + "queryid varchar(200) not null,"
+            + "querycontext BLOB,"
+            + "primary key (queryid)"
+            + ")";
+    try {
+      QueryRunner runner = new QueryRunner(ds);
+      runner.update(sql);
+      log.info("Created active queries table");
+    } catch (SQLException e) {
+      log.warn("Unable to create active queries table", e);
+    }
+  }
+
+  /**
+   * Method to create active session table, this is required for embedded lens server. For production server we will
+   * not be creating tables as it would be created upfront.
+   *
+   */
+  public void createActiveSessionsTable() throws Exception {
+    String sql = "CREATE TABLE if not exists active_sessions ("
+            + "sessionid varchar(200) not null,"
+            + "sessionobject BLOB,"
+            + "primary key (sessionid)"
+            + ")";
+    try {
+      QueryRunner runner = new QueryRunner(ds);
+      runner.update(sql);
+      log.info("Created active sessions table");
+    } catch (SQLException e) {
+      log.warn("Unable to create active sessions table", e);
+    }
+  }
+
+  /**
+   * Method to insert a new active query into Table.
+   *
+   * @param ctx query context
+   *
+   * @throws SQLException the exception
+   *
+   */
+  public void insertActiveQuery(QueryContext ctx) throws LensException {
+
+    String sql = "insert into active_queries (queryid, querycontext)"
+            + " values (?,?)";
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+    try {
+      conn = getConnection();
+      pstmt = conn.prepareStatement(sql);
+
+      // set input parameters
+      pstmt.setString(1, ctx.getQueryHandleString());
+      pstmt.setObject(2, SerializationUtils.serialize(ctx));
+      pstmt.execute();
+
+      log.info("Inserted query with query " + ctx.getQueryHandleString() + " in database.");
+    } catch (SQLException e) {
+      log.error("Failed to insert query " + ctx.getQueryHandleString() + " in database with error, " + e);
+      throw new LensException(e);
+    } finally {
+      DbUtils.closeQuietly(pstmt);
+      DbUtils.closeQuietly(conn);
+    }
+  }
+
+  /**
+   * Method to insert a new active session into Table.
+   *
+   * @param session LensSessionPersistInfo object that has to be serialized.
+   *
+   * @throws SQLException the exception
+   *
+   */
+  public void insertActiveSession(LensSessionImpl.LensSessionPersistInfo session) throws LensException {
+    String sql = "insert into active_sessions (sessionid, sessionobject)"
+            + " values (?,?)";
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+    try {
+      conn = getConnection();
+      pstmt = conn.prepareStatement(sql);
+
+      // set input parameters
+      pstmt.setString(1, session.getSessionHandle().getPublicId().toString());
+      pstmt.setObject(2, SerializationUtils.serialize(session));
+      pstmt.execute();
+
+      log.info("Inserted seesion " + session.getSessionHandle().getPublicId() + " in database.");
+    } catch (SQLException e) {
+      log.error("Failed to insert session " + session.getSessionHandle().getPublicId()
+              + " in database with error, " + e);
+      throw new LensException(e);
+    } finally {
+      DbUtils.closeQuietly(pstmt);
+      DbUtils.closeQuietly(conn);
+    }
+  }
+
+  /**
+   * Method to update a new active query into Table.
+   *
+   * @param ctx query context
+   *
+   * @throws LensException the exception
+   *
+   */
+  public void updateActiveQuery(QueryContext ctx) throws LensException {
+
+    String sql = "UPDATE active_queries SET querycontext=? where queryid=?";
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+    try {
+      conn = getConnection();
+      pstmt = conn.prepareStatement(sql);
+
+      pstmt.setObject(1, SerializationUtils.serialize(ctx));
+      pstmt.setString(2, ctx.getQueryHandleString());
+      pstmt.execute();
+
+      log.info("Updated query with query " + ctx.getQueryHandleString() + " with query status as "
+              + ctx.getStatus().getStatus() + " in database.");
+    } catch (SQLException e) {
+      log.error("Failed to update query " + ctx.getQueryHandleString()
+              + " in database with error, " + e);
+      throw new LensException(e);
+    } finally {
+      DbUtils.closeQuietly(pstmt);
+      DbUtils.closeQuietly(conn);
+    }
+  }
+
+  /**
+   * Method to update a active session into Table.
+   *
+   * @param session query context object
+   *
+   * @throws LensException the exception
+   *
+   */
+  public void updateActiveSession(LensSessionImpl.LensSessionPersistInfo session) throws LensException {
+    String sql = "UPDATE active_sessions SET sessionobject=? where sessionid=?";
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+    try {
+      conn = getConnection();
+      pstmt = conn.prepareStatement(sql);
+
+      // set input parameters
+      pstmt.setObject(1, SerializationUtils.serialize(session));
+      pstmt.setString(2, session.getSessionHandle().getPublicId().toString());
+      pstmt.execute();
+
+      log.info("Updated session " + session.getSessionHandle().getPublicId() + " in database.");
+    } catch (SQLException e) {
+      log.error("Failed to update session " + session.getSessionHandle().getPublicId()
+              + " in database with error, " + e);
+      throw new LensException(e);
+    } finally {
+      DbUtils.closeQuietly(pstmt);
+      DbUtils.closeQuietly(conn);
+    }
+  }
+
+  /**
+   * Finds Active query.
+   *
+   * @param queryHandle     the state
+   *
+   * @return the QueryContext object
+   *
+   * @throws LensException the lens exception
+   *
+   */
+  public QueryContext findActiveQueryDetails(QueryHandle queryHandle) throws LensException {
+    QueryContext ctx = null;
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+    ResultSet rs = null;
+
+    try {
+
+      String sql = "SELECT querycontext FROM active_queries WHERE queryid = ?";
+
+      conn = getConnection();
+      pstmt = conn.prepareStatement(sql);
+      pstmt.setString(1, queryHandle.toString());
+
+      rs = pstmt.executeQuery();
+
+      if (rs != null) {
+        rs.next();
+        ctx = (QueryContext) SerializationUtils.deserialize(rs.getBytes(1));
+      }
+    } catch (SQLException e) {
+      log.error("Failed to find active query " + queryHandle.getHandleIdString()
+              + " in database with error, " + e);
+      throw new LensException(e);
+    } finally {
+      DbUtils.closeQuietly(rs);
+      DbUtils.closeQuietly(pstmt);
+      DbUtils.closeQuietly(conn);
+    }
+    return ctx;
+  }
+
+  /**
+   * Gets all active query.
+   *
+   * @return the list of QueryContext objects
+   *
+   * @throws LensException the lens exception
+   *
+   */
+  public List<QueryContext> getAllActiveQueries() throws LensException {
+    List<QueryContext> ctxs = new ArrayList<>();
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+    ResultSet rs = null;
+
+    try {
+      String sql = "SELECT querycontext FROM active_queries";
+      conn = getConnection();
+      pstmt = conn.prepareStatement(sql);
+
+      rs = pstmt.executeQuery();
+
+      while (rs.next()) {
+        ctxs.add((QueryContext) SerializationUtils.deserialize(rs.getBytes(1)));
+      }
+    } catch (SQLException e) {
+      log.error("Unable to find all active queries in database, Failed with error, " + e);
+      throw new LensException(e);
+    } finally {
+      DbUtils.closeQuietly(rs);
+      DbUtils.closeQuietly(pstmt);
+      DbUtils.closeQuietly(conn);
+    }
+    return ctxs;
+  }
+
+  /**
+   * Finds active session.
+   *
+   * @param sessionId     the state
+   *
+   * @return session object
+   *
+   * @throws LensException the lens exception
+   *
+   */
+  public LensSessionImpl.LensSessionPersistInfo findActiveSessionDetails(LensSessionHandle sessionId)
+          throws LensException {
+    LensSessionImpl.LensSessionPersistInfo session = null;
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+    ResultSet rs = null;
+
+    try {
+      String sql = "SELECT sessionobject FROM active_sessions WHERE sessionid = '"
+              + sessionId.getPublicId() + "'";
+      conn = getConnection();
+      pstmt = conn.prepareStatement(sql);
+
+      rs = pstmt.executeQuery();
+
+      if (rs != null) {
+        rs.next();
+        session =
+                (LensSessionImpl.LensSessionPersistInfo) SerializationUtils.deserialize(rs.getBytes(1));
+      }
+    } catch (SQLException e) {
+      log.error("Failed to find active session " + sessionId.getPublicId()
+              + " in database with error, " + e);
+      throw new LensException(e);
+    } finally {
+      DbUtils.closeQuietly(rs);
+      DbUtils.closeQuietly(pstmt);
+      DbUtils.closeQuietly(conn);
+    }
+    return session;
+  }
+
+  /**
+   * Gets all active session.
+   *
+   * @return the list of LensSessionImpl.LensSessionPersistInfo objects
+   *
+   * @throws LensException the lens exception
+   *
+   */
+  public List<LensSessionImpl.LensSessionPersistInfo> getAllActiveSessions() throws LensException {
+    List<LensSessionImpl.LensSessionPersistInfo> ctxs = new ArrayList<>();
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+    ResultSet rs = null;
+
+    try {
+
+      String sql = "SELECT sessionobject FROM active_queries";
+
+      conn = getConnection();
+      pstmt = conn.prepareStatement(sql);
+      rs = pstmt.executeQuery();
+
+      while (rs.next()) {
+        ctxs.add((LensSessionImpl.LensSessionPersistInfo) SerializationUtils.deserialize(rs.getBytes(1)));
+      }
+    } catch (SQLException e) {
+      log.error("Unable to find all active queries in database, Failed with error, " + e);
+      throw new LensException(e);
+    } finally {
+      DbUtils.closeQuietly(rs);
+      DbUtils.closeQuietly(pstmt);
+      DbUtils.closeQuietly(conn);
+    }
+    return ctxs;
+  }
+
+  /**
+   * Delete active query.
+   *
+   * @param ctx QueryContext object for query
+   *
+   * @return true on success, false otherwise.
+   *
+   * @throws LensException the lens exception
+   *
+   */
+  public boolean deleteActiveQuery(QueryContext ctx) throws LensException {
+
+    String sql = "DELETE FROM active_queries where queryid=?";
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+    boolean result = false;
+    try {
+      conn = getConnection();
+
+      pstmt = conn.prepareStatement(sql);
+
+      // set input parameters
+      pstmt.setString(1, ctx.getQueryHandleString());
+      result = pstmt.execute();
+
+      log.info("deleted active query " + ctx.getQueryHandleString() + " with final status " + ctx.getStatus()
+              + " from database.");
+    } catch (SQLException e) {
+      log.error("Failed to delete active query " + ctx.getQueryHandleString()
+              + " in database with error, " + e);
+      throw new LensException(e);
+    } finally {
+      DbUtils.closeQuietly(pstmt);
+      DbUtils.closeQuietly(conn);
+    }
+
+    return result;
+  }
+
+  /**
+   * Delete active session.
+   *
+   * @param sessionId session id to be deleted
+   *
+   * @return true on success, false otherwise.
+   *
+   * @throws LensException the lens exception
+   *
+   */
+  public boolean deleteActiveSession(LensSessionHandle sessionId) throws LensException {
+
+    String sql = "DELETE FROM active_sessions where sessionid=?";
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+    boolean result;
+
+    try {
+      conn = getConnection();
+
+      pstmt = conn.prepareStatement(sql);
+
+      // set input parameters
+      pstmt.setString(1, sessionId.getPublicId().toString());
+      result = pstmt.execute();
+
+      log.info("deleted active session " + sessionId.getPublicId().toString() + " from database.");
+    } catch (SQLException e) {
+      log.error("Failed to delete active session " + sessionId.getPublicId().toString()
+              + " in database with error, " + e);
+      throw new LensException(e);
+    } finally {
+      DbUtils.closeQuietly(pstmt);
+      DbUtils.closeQuietly(conn);
+    }
+
+    return result;
+  }
 }
index 07a2107..9e5f2e6 100644 (file)
@@ -136,6 +136,15 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
   public static final String TOTAL_QUERIES_EXPIRED = "total-expired-queries";
 
   /**
+   * Constants for active query in database.
+   */
+  public static final String ACTIVE_QUERY_INSERT_ERROR_COUNTER = "db-query-insert-errors";
+
+  public static final String ACTIVE_QUERY_DELETE_ERROR_COUNTER = "db-query-delete-errors";
+
+  public static final String ACTIVE_QUERY_UPDATE_ERROR_COUNTER = "db-query-update-errors";
+
+  /**
    * The Constant PREPARED_QUERY_PURGER_COUNTER.
    */
   public static final String PREPARED_QUERY_PURGER_COUNTER = "prepared-query-purger-errors";
@@ -1161,6 +1170,13 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
     }
 
     StatusChange event = newStatusChangeEvent(ctx, prevState, currentStatus);
+    try {
+      ctx.setStatus(current);
+      lensServerDao.updateActiveQuery(ctx);
+    } catch (Exception e) {
+      log.warn("Failed to update status of query " + ctx.getQueryHandleString() + " in database.");
+      incrCounter(ACTIVE_QUERY_UPDATE_ERROR_COUNTER);
+    }
     if (event != null) {
       try {
         getEventService().notifyEvent(event);
@@ -1218,6 +1234,13 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
               fireStatusChangeEvent(finished.getCtx(),
                 new QueryStatus(1f, null, CLOSED, "Query purged", false, null, null, null), finished.getCtx()
                   .getStatus());
+
+              try {
+                lensServerDao.deleteActiveQuery(finished.getCtx());
+              } catch (LensException e) {
+                incrCounter(ACTIVE_QUERY_DELETE_ERROR_COUNTER);
+              }
+
               log.info("Query purged: {}", finished.getQueryHandle());
             }
           }
@@ -1390,6 +1413,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
     try {
       this.lensServerDao.createFinishedQueriesTable();
       this.lensServerDao.createFailedAttemptsTable();
+      this.lensServerDao.createActiveSessionsTable();
+      this.lensServerDao.createActiveQueriesTable();
     } catch (Exception e) {
       log.warn("Unable to create finished query tables, query purger will not purge queries", e);
     }
@@ -1537,6 +1562,13 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
           }
           log.info("Removed closed query from all Queries:" + ctx.getQueryHandle());
         }
+
+        try {
+          lensServerDao.updateActiveQuery(ctx);
+        } catch (Exception e) {
+          log.warn("Failed to update status of query " + ctx.getQueryHandleString() + " in database.");
+          incrCounter(ACTIVE_QUERY_UPDATE_ERROR_COUNTER);
+        }
       }
       queuedQueries.addAll(allRestoredQueuedQueries);
       log.info("Recovered {} queries", allQueries.size());
@@ -2177,6 +2209,13 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
       // Should be set only once
       ctx.setQueryConfHash(UtilityMethods.generateHashOfWritable(qconf));
       ctx.setQueryName(queryName);
+
+      try {
+        lensServerDao.insertActiveQuery(ctx);
+      } catch (Exception e) {
+        incrCounter(ACTIVE_QUERY_INSERT_ERROR_COUNTER);
+      }
+
       return executeAsyncInternal(sessionHandle, ctx);
     } finally {
       release(sessionHandle);
@@ -2387,6 +2426,21 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
     return getUpdatedQueryContext(sessionHandle, queryHandle).toLensQuery();
   }
 
+  @Override
+  public LensQuery getQueryInfo(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException {
+    LensQuery query = getUpdatedQueryContext(sessionHandle, queryHandle).toLensQuery();
+
+
+    if (query == null) {
+      try {
+        return lensServerDao.findActiveQueryDetails(queryHandle).toLensQuery();
+      } catch (LensException e) {
+        log.info("Query " + queryHandle.getHandleIdString() + " not found in active queries table in db.");
+      }
+    }
+    return query;
+  }
+
   /**
    * Gets the prepared query context.
    *
index 47b40a8..6ade219 100644 (file)
@@ -489,7 +489,7 @@ public class QueryServiceResource {
   public LensQuery getStatus(@QueryParam("sessionid") LensSessionHandle sessionid,
     @PathParam("queryHandle") String queryHandle) throws LensException {
     validateSessionId(sessionid);
-    return queryServer.getQuery(sessionid, getQueryHandle(queryHandle));
+    return queryServer.getQueryInfo(sessionid, getQueryHandle(queryHandle));
   }
 
   /**
index f6d43d7..2650711 100644 (file)
@@ -31,9 +31,11 @@ import javax.ws.rs.WebApplicationException;
 
 import org.apache.lens.api.LensSessionHandle;
 import org.apache.lens.server.BaseLensService;
+import org.apache.lens.server.LensServices;
 import org.apache.lens.server.api.LensConfConstants;
 import org.apache.lens.server.api.error.LensException;
 import org.apache.lens.server.api.health.HealthStatus;
+import org.apache.lens.server.api.metrics.MetricsService;
 import org.apache.lens.server.api.session.*;
 import org.apache.lens.server.session.LensSessionImpl.ResourceEntry;
 
@@ -73,11 +75,26 @@ public class HiveSessionService extends BaseLensService implements SessionServic
   private DatabaseResourceService databaseResourceService;
 
   /**
+   * The metrics service.
+   */
+  private MetricsService metricsService;
+
+  /**
    * The conf.
    */
   private Configuration conf;
 
   /**
+   * The Constant SESSION_CLOSE_COUNTER.
+   */
+  public static final String SESSION_CLOSE_COUNTER = "db-session-close-errors";
+
+  /**
+   * The Constant SESSION_OPEN_COUNTER.
+   */
+  public static final String SESSION_OPEN_COUNTER = "db-session-open-errors";
+
+  /**
    * Instantiates a new hive session service.
    *
    * @param cliService the cli service
@@ -91,6 +108,13 @@ public class HiveSessionService extends BaseLensService implements SessionServic
     if (!isValidResouceType(type)) {
       throw new BadRequestException("Bad resource type is passed. Please pass jar or file as source type");
     }
+
+    try {
+      validateSession(sessionHandle);
+    } catch (LensException e) {
+      log.error("Failed to list resources in session", e);
+      throw new WebApplicationException(e);
+    }
     List<ResourceEntry> resources = getSession(sessionHandle).getResources();
     List<String> allResources = new ArrayList<String>();
     for (ResourceEntry resource : resources) {
@@ -111,10 +135,16 @@ public class HiveSessionService extends BaseLensService implements SessionServic
   @Override
   public void addResource(LensSessionHandle sessionid, String type, String path) {
     try {
+      resotreSessionIfRequired(sessionid);
       acquire(sessionid);
       SessionState ss = getSession(sessionid).getSessionState();
       String finalLocation = ss.add_resource(SessionState.ResourceType.valueOf(type.toUpperCase()), path);
       getSession(sessionid).addResource(type, path, finalLocation);
+      try {
+        LENS_SERVER_DAO.updateActiveSession(getSession(sessionid).getLensSessionPersistInfo());
+      } catch (LensException e) {
+        log.warn("Failed to update active session table with error," + e.toString());
+      }
     } catch (RuntimeException e) {
       log.error("Failed to add resource type:" + type + " path:" + path + " in session", e);
       throw new WebApplicationException(e);
@@ -146,6 +176,11 @@ public class HiveSessionService extends BaseLensService implements SessionServic
       acquire(sessionid);
       closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null));
       getSession(sessionid).removeResource(type, path);
+      try {
+        LENS_SERVER_DAO.updateActiveSession(getSession(sessionid).getLensSessionPersistInfo());
+      } catch (LensException e) {
+        log.warn("Failed to update active session table with error," + e.toString());
+      }
     } catch (HiveSQLException e) {
       throw new WebApplicationException(e);
     } finally {
@@ -231,6 +266,13 @@ public class HiveSessionService extends BaseLensService implements SessionServic
         addResource(sessionid, "jar", jar);
       }
     }
+
+    try {
+      LENS_SERVER_DAO.insertActiveSession(getSession(sessionid).getLensSessionPersistInfo());
+    } catch (LensException e) {
+      getMetrics().incrCounter(HiveSessionService.class, SESSION_OPEN_COUNTER);
+    }
+
     return sessionid;
   }
 
@@ -245,6 +287,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
   @Override
   public List<String> getAllSessionParameters(LensSessionHandle sessionid, boolean verbose, String key)
     throws LensException {
+    resotreSessionIfRequired(sessionid);
     List<String> result = new ArrayList<String>();
     acquire(sessionid);
     try {
@@ -279,6 +322,12 @@ public class HiveSessionService extends BaseLensService implements SessionServic
     HashMap<String, String> config = Maps.newHashMap();
     config.put(key, value);
     setSessionParameters(sessionid, config);
+
+    try {
+      LENS_SERVER_DAO.updateActiveSession(getSession(sessionid).getLensSessionPersistInfo());
+    } catch (LensException e) {
+      log.warn("Failed to update active session table with error," + e.toString());
+    }
   }
 
   /**
@@ -337,6 +386,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
     this.conf = hiveConf;
 
     super.init(hiveConf);
+    this.LENS_SERVER_DAO.init(conf);
   }
 
   /*
@@ -362,39 +412,44 @@ public class HiveSessionService extends BaseLensService implements SessionServic
     }
 
     for (LensSessionImpl.LensSessionPersistInfo persistInfo : restorableSessions) {
-      try {
-        LensSessionHandle sessionHandle = persistInfo.getSessionHandle();
-        restoreSession(sessionHandle, persistInfo.getUsername(), persistInfo.getPassword(), persistInfo.getConfig());
-        LensSessionImpl session = getSession(sessionHandle);
-        session.getLensSessionPersistInfo().setLastAccessTime(persistInfo.getLastAccessTime());
-        session.getLensSessionPersistInfo().setConfig(persistInfo.getConfig());
-        session.getLensSessionPersistInfo().setResources(persistInfo.getResources());
-        session.setCurrentDatabase(persistInfo.getDatabase());
-        session.getLensSessionPersistInfo().setMarkedForClose(persistInfo.isMarkedForClose());
-
-        // Add resources for restored sessions
-        for (LensSessionImpl.ResourceEntry resourceEntry : session.getResources()) {
-          try {
-            addResourceUponRestart(sessionHandle, resourceEntry);
-          } catch (Exception e) {
-            log.error("Failed to restore resource for session: " + session + " resource: " + resourceEntry, e);
-          }
-        }
+      restoreSession(persistInfo);
+    }
+    log.info("Session service restored " + restorableSessions.size() + " sessions");
+  }
 
-        // Add config for restored sessions
-        try{
-          setSessionParametersOnRestore(sessionHandle, session.getConfig());
+  private void restoreSession(LensSessionImpl.LensSessionPersistInfo persistInfo) {
+    try {
+      LensSessionHandle sessionHandle = persistInfo.getSessionHandle();
+      restoreSession(sessionHandle, persistInfo.getUsername(), persistInfo.getPassword(), persistInfo.getConfig());
+      LensSessionImpl session = getSession(sessionHandle);
+      session.getLensSessionPersistInfo().setLastAccessTime(persistInfo.getLastAccessTime());
+      session.getLensSessionPersistInfo().setConfig(persistInfo.getConfig());
+      session.getLensSessionPersistInfo().setResources(persistInfo.getResources());
+      session.setCurrentDatabase(persistInfo.getDatabase());
+      session.getLensSessionPersistInfo().setMarkedForClose(persistInfo.isMarkedForClose());
+
+      // Add resources for restored sessions
+      for (LensSessionImpl.ResourceEntry resourceEntry : session.getResources()) {
+        try {
+          addResourceUponRestart(sessionHandle, resourceEntry);
         } catch (Exception e) {
-          log.error("Error setting parameters " + session.getConfig()
-            + " for session: " + session, e);
+          log.error("Failed to restore resource for session: " + session + " resource: " + resourceEntry, e);
         }
-        log.info("Restored session " + persistInfo.getSessionHandle().getPublicId());
-        notifyEvent(new SessionRestored(System.currentTimeMillis(), sessionHandle));
-      } catch (LensException e) {
-        throw new RuntimeException(e);
       }
+
+      // Add config for restored sessions
+      try{
+        setSessionParametersOnRestore(sessionHandle, session.getConfig());
+      } catch (Exception e) {
+        log.error("Error setting parameters " + session.getConfig()
+                + " for session: " + session, e);
+      }
+      log.info("Restored session " + persistInfo.getSessionHandle().getPublicId());
+      notifyEvent(new SessionRestored(System.currentTimeMillis(), sessionHandle));
+      log.info("Restored session " + persistInfo.getSessionHandle().getPublicId() + " from db.");
+    } catch (LensException e) {
+      throw new RuntimeException(e);
     }
-    log.info("Session service restored " + restorableSessions.size() + " sessions");
   }
 
   private int getSessionExpiryInterval() {
@@ -471,12 +526,28 @@ public class HiveSessionService extends BaseLensService implements SessionServic
     log.info("Session service recovered " + SESSION_MAP.size() + " sessions");
   }
 
+  private MetricsService getMetrics() {
+    if (metricsService == null) {
+      metricsService = LensServices.get().getService(MetricsService.NAME);
+      if (metricsService == null) {
+        throw new NullPointerException("Could not get metrics service");
+      }
+    }
+    return metricsService;
+  }
+
   /**
    * {@inheritDoc}
    */
   @Override
   public void closeSession(LensSessionHandle sessionHandle) throws LensException {
     closeInternal(sessionHandle);
+
+    try {
+      LENS_SERVER_DAO.deleteActiveSession(sessionHandle);
+    } catch (LensException e) {
+      getMetrics().incrCounter(HiveSessionService.class, SESSION_CLOSE_COUNTER);
+    }
     notifyEvent(new SessionClosed(System.currentTimeMillis(), sessionHandle));
   }
 
@@ -551,6 +622,13 @@ public class HiveSessionService extends BaseLensService implements SessionServic
           log.info("Closed inactive session " + sessionHandle.getPublicId() + " last accessed at "
             + new Date(lastAccessTime));
           notifyEvent(new SessionExpired(System.currentTimeMillis(), sessionHandle));
+
+          try {
+            LENS_SERVER_DAO.deleteActiveSession(sessionHandle);
+          } catch (LensException e) {
+            getMetrics().incrCounter(HiveSessionService.class, SESSION_CLOSE_COUNTER);
+          }
+
         } catch (ClientErrorException nfe) {
           log.error("Error getting session " + sessionHandle.getPublicId(), nfe);
           // Do nothing
diff --git a/lens-server/src/test/java/org/apache/lens/server/TestBaseLensService.java b/lens-server/src/test/java/org/apache/lens/server/TestBaseLensService.java
new file mode 100644 (file)
index 0000000..1da4fc8
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lens.server;
+
+import java.util.HashMap;
+
+import javax.ws.rs.core.Application;
+
+import org.apache.lens.api.LensSessionHandle;
+import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.error.LensServerErrorCode;
+import org.apache.lens.server.query.TestQueryService;
+import org.apache.lens.server.session.HiveSessionService;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * The Class TestBaseLensService.
+ */
+@Test(groups = "unit-test")
+@Slf4j
+public class TestBaseLensService extends LensJerseyTest {
+
+  @Override
+  protected Application configure() {
+    return new TestQueryService.QueryServiceTestApp();
+  }
+
+
+  /**
+   * Test lens server session validator.
+   *
+   * @throws Exception the exception
+   */
+  @Test
+  public void testLensServerValidateSession() throws Exception {
+
+    HiveSessionService sessionService = LensServices.get().getService(HiveSessionService.NAME);
+
+    LensSessionHandle validSession = sessionService.openSession("foo@localhost", "bar",
+            new HashMap<String, String>());
+
+    LensSessionHandle invalidSession = sessionService.openSession("foo@localhost", "bar",
+            new HashMap<String, String>());
+    sessionService.closeSession(invalidSession);
+
+    LensSessionHandle notInsertedSession = sessionService.openSession("foo@localhost", "bar",
+            new HashMap<String, String>());
+
+
+    sessionService.validateSession(validSession);
+    sessionService.validateSession(notInsertedSession);
+    try {
+      sessionService.validateSession(invalidSession);
+    } catch (LensException exp) {
+      Assert.assertEquals(exp.getErrorCode(), LensServerErrorCode.SESSION_CLOSED.getLensErrorInfo().getErrorCode());
+    }
+
+    sessionService.closeSession(validSession);
+    sessionService.closeSession(notInsertedSession);
+  }
+}
index 066525b..c741745 100644 (file)
@@ -39,6 +39,7 @@ import org.apache.lens.server.LensJerseyTest;
 import org.apache.lens.server.LensServices;
 import org.apache.lens.server.api.driver.LensDriver;
 import org.apache.lens.server.api.driver.MockDriver;
+import org.apache.lens.server.api.error.LensException;
 import org.apache.lens.server.api.query.*;
 
 import org.apache.hadoop.conf.Configuration;
@@ -180,6 +181,36 @@ public class TestLensDAO extends LensJerseyTest {
       queryContext.getSelectedDriver().getFullyQualifiedName(), "daotestquery1", -1L, Long.MAX_VALUE);
     Assert.assertEquals(daoTestQueryHandles.size(), 1);
     Assert.assertEquals(daoTestQueryHandles.get(0).getHandleId().toString(), finishedHandle);
+
+    service.lensServerDao.insertActiveQuery(queryContext);
+    QueryContext actualQueryContext = service.lensServerDao.findActiveQueryDetails(queryContext.getQueryHandle());
+    Assert.assertEquals(actualQueryContext.getQueryHandle().toString(), queryContext.getQueryHandle().toString());
+
+    QueryContext invalidQueryContext = service.createContext("SELECT ID FROM testTable1",
+            "foo@localhost", new LensConf(),
+            new Configuration(), 0);
+
+    try {
+      QueryContext actualInvalidQueryContext =
+              service.lensServerDao.findActiveQueryDetails(invalidQueryContext.getQueryHandle());
+    } catch (LensException le) {
+      // expected to throw LensException
+    }
+
+    boolean actualInvalidDeleteQuery = service.lensServerDao.deleteActiveQuery(invalidQueryContext);
+    Assert.assertNotEquals(actualInvalidDeleteQuery, true);
+
+    service.lensServerDao.insertActiveSession(service.getSession(session).getLensSessionPersistInfo());
+
+    LensSessionHandle invalidSession = service.openSession("foo@localhost", "bar",
+            new HashMap<String, String>());
+
+    boolean actualActiveSessionDeleted = service.lensServerDao.deleteActiveSession(session);
+    Assert.assertEquals(actualActiveSessionDeleted, false);
+
+    boolean invalidActualActiveSessionDeleted = service.lensServerDao.deleteActiveSession(invalidSession);
+    Assert.assertEquals(invalidActualActiveSessionDeleted, false);
+
     service.closeSession(session);
   }
 }