Improved documentation. Added code to handle job cacellation if job is not found...
authoramilaj <amilaj@unknown>
Thu, 20 Jun 2013 03:56:32 +0000 (03:56 +0000)
committeramilaj <amilaj@unknown>
Thu, 20 Jun 2013 03:56:32 +0000 (03:56 +0000)
git-svn-id: https://svn.apache.org/repos/asf/airavata/sandbox@1494848 13f79535-47bb-0310-9956-ffa450edef68

grid-tools/gram-client/src/main/java/org/apache/airavata/jobsubmission/gram/GramJobSubmissionManager.java
grid-tools/gram-client/src/main/java/org/apache/airavata/jobsubmission/gram/notifier/GramJobNotifier.java
grid-tools/gram-client/src/main/java/org/apache/airavata/jobsubmission/gram/persistence/JobPersistenceManager.java
grid-tools/gram-client/src/test/java/org/apache/airavata/jobsubmission/gram/GramJobSubmissionManagerTest.java

index 2f75d3d..42c4f45 100644 (file)
@@ -42,7 +42,7 @@ public class GramJobSubmissionManager {
 
     private static final Logger log = Logger.getLogger(GramJobSubmissionManager.class);
 
-    private static final Map<String, GramJob> currentlyExecutingJobs = new ConcurrentHashMap<String, GramJob>();
+    private static final Map<String, GramJob> currentlyExecutingJobCache = new ConcurrentHashMap<String, GramJob>();
 
     private RSLGenerator rslGenerator;
 
@@ -86,7 +86,7 @@ public class GramJobSubmissionManager {
                 ListenerQueue listenerQueue = ListenerQueue.getInstance();
                 listenerQueue.addJob(job);
 
-                currentlyExecutingJobs.put(job.getIDAsString(), job);
+                currentlyExecutingJobCache.put(job.getIDAsString(), job);
 
                 log.debug("Two phase commit: sending COMMIT_REQUEST signal");
                 job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
@@ -110,14 +110,23 @@ public class GramJobSubmissionManager {
 
 
 
-    public void cancelJob(String jobId) throws GramException, GSSException {
+    public void cancelJob(String jobId, GSSCredential gssCred) throws GramException, GSSException,
+            MalformedURLException {
 
-        if (currentlyExecutingJobs.containsKey(jobId)) {
-            GramJob gramJob = currentlyExecutingJobs.get(jobId);
+        if (currentlyExecutingJobCache.containsKey(jobId)) {
+            GramJob gramJob = currentlyExecutingJobCache.get(jobId);
             if (gramJob != null) {
                 gramJob.cancel();
                 gramJob.signal(GramJob.SIGNAL_COMMIT_END);
             }
+        } else {
+
+            GramJob gramJob = new GramJob(null);
+            gramJob.setID(jobId);
+            gramJob.setCredentials(gssCred);
+
+            gramJob.cancel();
+            gramJob.signal(GramJob.SIGNAL_COMMIT_END);
         }
 
     }
index a28b1cb..1445465 100644 (file)
@@ -29,28 +29,80 @@ import org.globus.gram.GramJob;
  * Time: 10:45 AM
  */
 
+/**
+ * This interface abstracts out state changes of a job submitted to GFac.
+ * For each state change of a job an appropriate method will get called.
+ * Further each method will get details about the executing job as a
+ * GramJob object.
+ */
 public interface GramJobNotifier {
 
+    /**
+     * This method will get called when job is in pending state.
+     * @param job Currently executing job.
+     */
     void OnPending(GramJob job);
 
+    /**
+     * This method will get called when job is in active state.
+     * @param job Currently executing job.
+     */
     void OnActive(GramJob job);
 
+    /**
+     * This method will get called when job is in Error state.
+     * @param job Currently executing job.
+     */
     void OnError (GramJob job);
 
+    /**
+     * This method will get called when job is completed.
+     * @param job Currently executing job.
+     */
     void OnCompletion(GramJob job);
 
+    /**
+     * This method will get called when some process cancels the currently executing job.
+     * @param job Currently executing job.
+     */
     void OnCancel (GramJob job);
 
+    /**
+     * This method will get called when job is in suspended state.
+     * @param job Currently executing job.
+     */
     void OnSuspend (GramJob job);
 
+    /**
+     * This method will get called when job is in un-submitted state.
+     * @param job Currently executing job.
+     */
     void OnUnSubmit (GramJob job);
 
+    /**
+     * When job stage in input files, this method will get called.
+     * @param job Currently executing job.
+     */
     void OnFilesStagedIn (GramJob job);
 
+    /**
+     * When job stage out input files, this method will get called.
+     * @param job Currently executing job.
+     */
     void OnFilesStagedOut (GramJob job);
 
+    /**
+     * If an unexpected error occurs in the listener code this method will get
+     * called.
+     * @param job Currently executing job.
+     * @param e The unexpected exception.
+     */
     void OnListenerError(GramJob job, Exception e);
 
+    /**
+     * If authorisation failed.
+     * @param job Currently executing job.
+     */
     void OnAuthorisationDenied(GramJob job);
 
 }
index 460164d..1fecaee 100644 (file)
@@ -31,16 +31,45 @@ import java.util.List;
  * Time: 2:23 PM
  */
 
+/**
+ * Responsible persisting job data. This data is useful during a restart.
+ * When restarting Airavata can resume monitoring currently executing jobs.
+ */
 public interface JobPersistenceManager {
 
+    /**
+     * Updates the job state in the persisting storage.
+     * @param jobData Job data to update.
+     * @throws GFacException If an error occurred while updating job data.
+     */
     void updateJobStatus (JobData jobData) throws GFacException;
 
+    /**
+     * Get all running jobs.
+     * @return Job ids which are not failed nor completed.
+     * @throws GFacException If an error occurred while querying job data.
+     */
     List<JobData> getRunningJobs() throws GFacException;
 
+    /**
+     * Get all failed job ids.
+     * @return Failed job ids.
+     * @throws GFacException If an error occurred while querying job data.
+     */
     List<JobData> getFailedJobs() throws GFacException;
 
+    /**
+     * Get all un-submitted job ids.
+     * @return Un-submitted job ids.
+     * @throws GFacException If an error occurred while querying job data.
+     */
     List<JobData> getUnSubmittedJobs() throws GFacException;
 
+    /**
+     * Get all successfully completed job ids.
+     * @return Successfully completed job ids.
+     * @throws GFacException If an error occurred while querying job data.
+     */
     List<JobData> getSuccessfullyCompletedJobs() throws GFacException;
 
 }
index 1122e77..e1e57bb 100644 (file)
@@ -214,7 +214,7 @@ public class GramJobSubmissionManagerTest extends TestCase {
 
         Assert.assertNotNull(jobId);
 
-        gramJobSubmissionManager.cancelJob(jobId);
+        gramJobSubmissionManager.cancelJob(jobId, context.getRawCredential());
 
 
         logger.info("========== End of test case ==============");