Add Apache Helix playground project to sandbox
author    Gourav Shenoy <shenoy.200@gmail.com>  Thu, 22 Jun 2017 19:26:06 +0000 (15:26 -0400)
committer Gourav Shenoy <shenoy.200@gmail.com>  Thu, 22 Jun 2017 19:26:06 +0000 (15:26 -0400)
16 files changed:
helix-playground/README.md [new file with mode: 0644]
helix-playground/images/helix-task-framework.png [new file with mode: 0644]
helix-playground/pom.xml [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/airavata/ControllerNode.java [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/airavata/HelixCluster.java [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskA.java [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskB.java [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskC.java [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskD.java [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/play/HelixDAGFramework.java [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/play/HelixTaskA.java [new file with mode: 0644]
helix-playground/src/main/java/edu/iu/helix/play/HelixTaskB.java [new file with mode: 0644]
helix-playground/src/main/resources/log4j.properties [new file with mode: 0644]

diff --git a/helix-playground/README.md b/helix-playground/README.md
new file mode 100644 (file)
index 0000000..3be47ce
--- /dev/null
@@ -0,0 +1,134 @@
+# Using Apache Helix To Perform Distributed Task Execution in Apache Airavata
+
+## Introduction
+
+This project is a playground to test Apache Helix's task execution framework. Apache Helix is a generic cluster management framework that allows one to build highly scalable and fault-tolerant distributed systems. It provides a range of functionality, including:
+* Automatic assignment of resources (task executors) and their partitions (parallelism of resources) to nodes in the cluster.
+* Detecting failure of nodes in the cluster, and taking appropriate actions to recover them.
+* Cluster management – adding nodes and resources to the cluster dynamically, and load balancing.
+* The ability to define an IDEAL STATE for a node, and STATE transitions for when a node's state deviates from the IDEAL one.
+
+Apart from these, Helix also provides out-of-the-box APIs to perform distributed task execution. Some of the concepts Helix uses map well to our Airavata task-execution use-case:
+* Tasks – the actual runnable logic (e.g. job submission, data staging, etc.). A task returns a TaskResult object containing the final state of the task: COMPLETED, FAILED, or FATAL_FAILED. The difference between FAILED and FATAL_FAILED is that FAILED tasks are re-run by Helix (a retry threshold can be set), whereas FATAL_FAILED tasks are not.
+* Jobs – a combination of tasks without dependencies; i.e. if a job has more than one task, the tasks run in parallel across workers.
+* Workflows – a combination of jobs arranged in a DAG. A ONE-TIME workflow ends once all of its jobs complete; a RECURRING workflow can be scheduled to run periodically.
+* Job queues – another type of workflow that never ends; it keeps accepting new incoming jobs and terminates only when the user explicitly ends it.
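
The FAILED vs. FATAL_FAILED retry behavior above can be sketched in plain Java. This is a stdlib-only illustration of the semantics, not the Helix API; the class and method names here are hypothetical:

```java
import java.util.function.Supplier;

// Sketch of the retry semantics described above: a FAILED task is re-run
// until a retry threshold is hit; a FATAL_FAILED task is never re-run.
class RetrySemantics {
    enum TaskResult { COMPLETED, FAILED, FATAL_FAILED }

    // Runs the task, re-running it on FAILED up to maxRetries additional times.
    static TaskResult runWithRetries(Supplier<TaskResult> task, int maxRetries) {
        TaskResult result = task.get();
        int attempts = 1;
        while (result == TaskResult.FAILED && attempts <= maxRetries) {
            result = task.get();   // FAILED -> another attempt is scheduled
            attempts++;
        }
        return result;             // COMPLETED or FATAL_FAILED ends the task immediately
    }

    public static void main(String[] args) {
        // A task that fails twice and then completes succeeds within a threshold of 3.
        int[] calls = {0};
        Supplier<TaskResult> flaky = () -> ++calls[0] < 3 ? TaskResult.FAILED : TaskResult.COMPLETED;
        System.out.println(runWithRetries(flaky, 3));
        System.out.println(runWithRetries(() -> TaskResult.FATAL_FAILED, 3));
    }
}
```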
+
+![helix-task-framework](images/helix-task-framework.png)
+
+* Helix also allows users to share data (key-value pairs) across tasks, jobs, and workflows. Content stored at the workflow layer can be shared by the jobs belonging to that workflow; similarly, content persisted at the job layer can be shared by the tasks nested in that job.
+* Helix provides APIs to poll a job or workflow until it reaches a particular state.
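
The layered key-value sharing described above can be sketched with nested maps. This is a stdlib-only sketch of the semantics, not the Helix user-content API; all names below are hypothetical:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of the layered user store: values written at the workflow layer are
// visible to every job in that workflow; values written at a job's layer are
// visible to every task nested in that job.
class LayeredUserStore {
    private final Map<String, String> workflowScope = new HashMap<>();
    private final Map<String, Map<String, String>> jobScopes = new HashMap<>();

    void putWorkflowContent(String key, String value) {
        workflowScope.put(key, value);
    }

    void putJobContent(String job, String key, String value) {
        jobScopes.computeIfAbsent(job, j -> new HashMap<>()).put(key, value);
    }

    // A task resolves a key against its job's layer first, then the workflow layer.
    String getContent(String job, String key) {
        String v = jobScopes.getOrDefault(job, Collections.emptyMap()).get(key);
        return (v != null) ? v : workflowScope.get(key);
    }
}
```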
+
+Some core Helix concepts that are important to know:
+* Participant – a node in a Helix cluster (a.k.a. an instance or worker) which hosts resources (a.k.a. tasks).
+* Controller – a node in a Helix cluster that monitors and controls the participant nodes. The controller is responsible for checking whether the state of a participant node matches the IDEAL state and, if not, performing STATE TRANSITIONS to bring that node back to the IDEAL state.
+* State model & state transitions – Helix allows developers to define what state a participant node needs to be in to be considered healthy. For example, in an ONLINE-OFFLINE state model a node is healthy if it is in the ONLINE state; if it goes OFFLINE (for any reason), we can define TRANSITION actions to bring it back ONLINE.
+* Cluster – contains the participant and controller nodes. One can define the state model for a cluster.
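
Conceptually, the controller's job can be sketched as comparing a participant's current state with the IDEAL state and firing the matching transition callback. This is a stdlib-only sketch of the idea, not Helix's state-machine classes; the names are hypothetical:

```java
// Sketch of an ONLINE-OFFLINE state model: the controller detects a deviation
// from the ideal state and triggers the transition defined by the developer.
class OnlineOfflineModel {
    enum State { OFFLINE, ONLINE }

    private State current = State.OFFLINE;

    State current() { return current; }

    // Transition actions a developer would define for the state model.
    void onBecomeOnlineFromOffline() { current = State.ONLINE; }
    void onBecomeOfflineFromOnline() { current = State.OFFLINE; }

    // What a controller does conceptually on each rebalance pass.
    void rebalance(State ideal) {
        if (current == State.OFFLINE && ideal == State.ONLINE) onBecomeOnlineFromOffline();
        else if (current == State.ONLINE && ideal == State.OFFLINE) onBecomeOfflineFromOnline();
    }
}
```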
+
+More details about Apache Helix can be found on their website [https://helix.apache.org](https://helix.apache.org).
+
+## How can Helix be used in Airavata?
+Assuming we use Helix just to perform distributed task execution, I have the following in mind:
+* Create Helix tasks (by implementing the Task interface) for each of our operations – job submission, data staging, etc. These tasks are the cluster's resources.
+* Create participant nodes (a.k.a. workers) to host these resources. Helix allows us to create resource partitions, so if we need a task to run in parallel across workers, we can set num_partitions > 1 for that resource.
+* Define a state model – either OnlineOffline or MasterSlave – and the necessary state transitions. With state transitions we can control the behavior of the participant nodes.
+* Create a WORKFLOW to execute a single experiment. This workflow will contain the DAG necessary to run that experiment.
+* Create a long-running QUEUE to keep accepting incoming experiment requests. Each new experiment request results in the creation of a new JOB added to this queue – this job contains one task, which creates and runs the workflow mentioned in the bullet above.
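
The long-running queue in the last bullet can be sketched with a plain `BlockingQueue`. This is a stdlib-only sketch of the proposed flow, not Helix's JobQueue API; the class and method names are hypothetical, and `runWorkflow` stands in for actually building and starting the experiment's DAG:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the proposed long-running queue: each incoming experiment request
// becomes one job whose single task creates and runs the experiment's workflow.
class ExperimentQueue {
    private final BlockingQueue<String> requests = new LinkedBlockingQueue<>();
    private final List<String> completed = new ArrayList<>();

    void submitExperiment(String experimentId) {
        requests.add(experimentId);                 // a new JOB is enqueued
    }

    // One drain pass standing in for the queue's consumer loop; in Helix the
    // queue never ends until the user terminates it.
    void drain() {
        String experimentId;
        while ((experimentId = requests.poll()) != null) {
            runWorkflow(experimentId);              // the job's single task
        }
    }

    private void runWorkflow(String experimentId) {
        completed.add(experimentId);                // stand-in for running the DAG
    }

    List<String> completed() { return completed; }
}
```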
+
+## Building this Project
+This project uses Apache Maven to build the artifacts. Run the following command to make sure the project builds successfully.
+```cmd
+$ mvn clean install
+```
+
+## Running the Prototype
+Open the project in an IDE of your choice and open the `HelixClusterManager.java` class. You can control the number of participant nodes (workers) in the cluster by updating the `numWorkers` field in the `main` method.
+
+The output after running the program should look as follows:
+```cmd
+Starting helix manager for cluster [ HelixDemoCluster ], on ZK server [ localhost:2199 ], with [ 3 ] workers, having [ 1 ] partitions.
+0    [main] WARN  org.apache.helix.manager.zk.ZKHelixAdmin  - Root directory exists.Cleaning the root directory:/HelixDemoCluster
+Successfully created helix cluster: HelixDemoCluster, with [ 1 ] partitions.
+Successfully started participant node: HelixDemoParticipant_0, on cluster: HelixDemoCluster
+Successfully started participant node: HelixDemoParticipant_1, on cluster: HelixDemoCluster
+Successfully started participant node: HelixDemoParticipant_2, on cluster: HelixDemoCluster
+548  [pool-1-thread-3] WARN  org.apache.helix.healthcheck.ParticipantHealthReportTask  - ParticipantHealthReportTimerTask already stopped
+548  [pool-1-thread-2] WARN  org.apache.helix.healthcheck.ParticipantHealthReportTask  - ParticipantHealthReportTimerTask already stopped
+548  [pool-1-thread-1] WARN  org.apache.helix.healthcheck.ParticipantHealthReportTask  - ParticipantHealthReportTimerTask already stopped
+631  [pool-1-thread-1] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.messaging.handling.HelixTaskExecutor@fc497fe, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_0/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+631  [pool-1-thread-3] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.messaging.handling.HelixTaskExecutor@7bd93e51, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_2/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+631  [pool-1-thread-2] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.messaging.handling.HelixTaskExecutor@3bcaa92a, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_1/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+Successfully added resources to cluster: HelixDemoCluster
+736  [Thread-17] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.messaging.handling.HelixTaskExecutor@3046372e, path: /HelixDemoCluster/CONTROLLER/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+737  [Thread-17] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/CONFIGS/PARTICIPANT, expected types: [CALLBACK, FINALIZE] but was INIT
+737  [Thread-17] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_2/CURRENTSTATES/15ca80eae2e02bd, expected types: [CALLBACK, FINALIZE] but was INIT
+737  [Thread-17] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_1/CURRENTSTATES/15ca80eae2e02be, expected types: [CALLBACK, FINALIZE] but was INIT
+737  [Thread-17] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_0/CURRENTSTATES/15ca80eae2e02bf, expected types: [CALLBACK, FINALIZE] but was INIT
+737  [Thread-17] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_0/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+737  [Thread-17] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_1/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+737  [Thread-17] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/INSTANCES/HelixDemoParticipant_2/MESSAGES, expected types: [CALLBACK, FINALIZE] but was INIT
+737  [Thread-17] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/LIVEINSTANCES, expected types: [CALLBACK, FINALIZE] but was INIT
+737  [Thread-17] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/IDEALSTATES, expected types: [CALLBACK, FINALIZE] but was INIT
+737  [Thread-17] WARN  org.apache.helix.manager.zk.CallbackHandler  - Skip processing callbacks for listener: org.apache.helix.controller.GenericHelixController@629ae1fd, path: /HelixDemoCluster/CONTROLLER, expected types: [CALLBACK, FINALIZE] but was INIT
+Successfully started the controller node: HelixDemoController, on cluster: HelixDemoCluster
+Successfully started the helix cluster manager (admin), on cluster: HelixDemoCluster
+Submitting Workflow for DagType: TYPE_A
+839  [ZkClient-EventThread-33-localhost:2199] WARN  org.apache.helix.ConfigAccessor  - No config found at /HelixDemoCluster/CONFIGS/RESOURCE/HelixTask_A
+844  [ZkClient-EventThread-33-localhost:2199] WARN  org.apache.helix.ConfigAccessor  - No config found at /HelixDemoCluster/CONFIGS/RESOURCE/HelixTask_C
+846  [ZkClient-EventThread-33-localhost:2199] WARN  org.apache.helix.ConfigAccessor  - No config found at /HelixDemoCluster/CONFIGS/RESOURCE/HelixTask_D
+849  [ZkClient-EventThread-33-localhost:2199] WARN  org.apache.helix.ConfigAccessor  - No config found at /HelixDemoCluster/CONFIGS/RESOURCE/HelixTask_B
+OnlineOfflineStateModelFactory.onBecomeOnlineFromOffline():HelixDemoParticipant_0 transitioning from OFFLINE to ONLINE for HelixTask_C HelixTask_C_0
+OnlineOfflineStateModelFactory.onBecomeOnlineFromOffline():HelixDemoParticipant_0 transitioning from OFFLINE to ONLINE for HelixTask_A HelixTask_A_0
+OnlineOfflineStateModelFactory.onBecomeOnlineFromOffline():HelixDemoParticipant_0 transitioning from OFFLINE to ONLINE for HelixTask_D HelixTask_D_0
+OnlineOfflineStateModelFactory.onBecomeOnlineFromOffline():HelixDemoParticipant_0 transitioning from OFFLINE to ONLINE for HelixTask_B HelixTask_B_0
+Started workflow for DagType: TYPE_A, in cluster: HelixDemoCluster
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322}{}{}
+HelixTaskA | callbackContext: org.apache.helix.task.TaskCallbackContext@2e4d5c35
+HelixTaskA | Inside run(), sleeping for 2 secs
+HelixTaskA | Inside addToUserStore()
+HelixTaskA | Returning status : COMPLETED.
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=IN_PROGRESS}}{}
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED}}{}
+HelixTaskB | callbackContext: org.apache.helix.task.TaskCallbackContext@351b6cb3
+HelixTaskB | Returning status FAILED, Helix will retry this task. Retry count: 1
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=IN_PROGRESS}}{}
+HelixTaskB | callbackContext: org.apache.helix.task.TaskCallbackContext@40decb7c
+HelixTaskB | Returning status FAILED, Helix will retry this task. Retry count: 2
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=IN_PROGRESS}}{}
+HelixTaskB | callbackContext: org.apache.helix.task.TaskCallbackContext@38a77770
+HelixTaskB | After 2 retries, Inside run(), sleeping for 2 secs
+HelixTaskB | Inside getFromUserStore()
+HelixTaskB | Retrieved from UserStore : Gourav Shenoy
+HelixTaskB | Returning status : COMPLETED.
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=IN_PROGRESS}}{}
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=COMPLETED}}{}
+HelixTaskC | callbackContext: org.apache.helix.task.TaskCallbackContext@36cf68e3
+HelixTaskC | Inside run(), sleeping for 2 secs
+HelixTaskC | Inside getFromUserStore()
+HelixTaskC | Retrieved from UserStore : Gourav Shenoy
+HelixTaskC | Returning status : COMPLETED.
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=COMPLETED, helix_workflow_helix_job_c=IN_PROGRESS}}{}
+BLAH WKFLW: WorkflowContext, {START_TIME=1498159501322, STATE=IN_PROGRESS}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=COMPLETED, helix_workflow_helix_job_c=COMPLETED, helix_workflow_helix_job_d=IN_PROGRESS}}{}
+HelixTaskD | callbackContext: org.apache.helix.task.TaskCallbackContext@79cba411
+HelixTaskD | Inside run(), sleeping for 2 secs
+HelixTaskD | Inside getFromUserStore()
+HelixTaskD | Retrieved from UserStore : Gourav Shenoy
+HelixTaskD | Returning status : COMPLETED.
+1890 [Thread-19] WARN  org.apache.helix.task.TaskRebalancer  - Idealstate for resource helix_workflow_helix_job_a does not exist.
+BLAH WKFLW: WorkflowContext, {FINISH_TIME=1498159502302, START_TIME=1498159501322, STATE=COMPLETED}{JOB_STATES={helix_workflow_helix_job_a=COMPLETED, helix_workflow_helix_job_b=COMPLETED, helix_workflow_helix_job_c=COMPLETED, helix_workflow_helix_job_d=COMPLETED}}{}
+Successfully completed workflow for Dag: TYPE_A
+*** Exiting System ***
+1900 [Thread-19] WARN  org.apache.helix.task.TaskRebalancer  - Idealstate for resource helix_workflow_helix_job_b does not exist.
+1910 [Thread-19] WARN  org.apache.helix.task.TaskRebalancer  - Idealstate for resource helix_workflow_helix_job_c does not exist.
+1931 [Thread-19] ERROR org.apache.helix.task.JobRebalancer  - Job configuration is NULL for helix_workflow_helix_job_d
+1939 [Thread-19] WARN  org.apache.helix.model.IdealState  - Resource key:helix_workflow_helix_job_d_0 does not have a pre-computed preference list.
+HelixManager Admin disconnecting from cluster: HelixDemoCluster
+6211 [Thread-6] WARN  org.apache.helix.participant.statemachine.StateModel  - Default reset method invoked. Either because the process longer own this resource or session timedout
+6212 [Thread-6] WARN  org.apache.helix.participant.statemachine.StateModel  - Default reset method invoked. Either because the process longer own this resource or session timedout
+6212 [Thread-6] WARN  org.apache.helix.participant.statemachine.StateModel  - Default reset method invoked. Either because the process longer own this resource or session timedout
+6212 [Thread-6] WARN  org.apache.helix.participant.statemachine.StateModel  - Default reset method invoked. Either because the process longer own this resource or session timedout
+6214 [Thread-6] WARN  org.apache.helix.participant.statemachine.StateModel  - Default reset method invoked. Either because the process longer own this resource or session timedout
+6214 [Thread-6] WARN  org.apache.helix.participant.statemachine.StateModel  - Default reset method invoked. Either because the process longer own this resource or session timedout
+6214 [Thread-6] WARN  org.apache.helix.participant.statemachine.StateModel  - Default reset method invoked. Either because the process longer own this resource or session timedout
+6214 [Thread-6] WARN  org.apache.helix.participant.statemachine.StateModel  - Default reset method invoked. Either because the process longer own this resource or session timedout
+```
diff --git a/helix-playground/images/helix-task-framework.png b/helix-playground/images/helix-task-framework.png
new file mode 100644 (file)
index 0000000..4ee24a8
Binary files /dev/null and b/helix-playground/images/helix-task-framework.png differ
diff --git a/helix-playground/pom.xml b/helix-playground/pom.xml
new file mode 100644 (file)
index 0000000..cae2614
--- /dev/null
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--<parent>-->
+        <!--<artifactId>helix</artifactId>-->
+        <!--<groupId>org.apache.helix</groupId>-->
+        <!--<version>0.6.7</version>-->
+    <!--</parent>-->
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.airavata</groupId>
+    <artifactId>helix-playground</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.helix</groupId>
+            <artifactId>helix-core</artifactId>
+            <version>0.6.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.5</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>6.0.1</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/ControllerNode.java b/helix-playground/src/main/java/edu/iu/helix/airavata/ControllerNode.java
new file mode 100644 (file)
index 0000000..ccb3fb2
--- /dev/null
@@ -0,0 +1,67 @@
+package edu.iu.helix.airavata;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class ControllerNode implements Runnable {
+
+    private static final Logger logger = LogManager.getLogger(ControllerNode.class);
+
+    private String clusterName;
+    private String controllerName;
+    private String zkAddress;
+    private HelixManager zkHelixManager;
+
+    private CountDownLatch startLatch = new CountDownLatch(1);
+    private CountDownLatch stopLatch = new CountDownLatch(1);
+
+    public ControllerNode(String zkAddress, String clusterName, String controllerName) {
+        this.clusterName = clusterName;
+        this.controllerName = controllerName;
+        this.zkAddress = zkAddress;
+    }
+
+    @Override
+    public void run() {
+        try {
+            zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName,
+                    controllerName, HelixControllerMain.STANDALONE);
+            startLatch.countDown();
+            stopLatch.await();
+        } catch (Exception ex) {
+            logger.error("Error in run() for Controller: " + controllerName + ", reason: " + ex, ex);
+        } finally {
+            disconnect();
+        }
+
+    }
+
+    public void start() {
+        new Thread(this).start();
+        try {
+            startLatch.await();
+            logger.info("Controller: " + controllerName + ", has connected to cluster: " + clusterName);
+        } catch (InterruptedException ex) {
+            logger.error("Controller: " + controllerName + ", is interrupted! reason: " + ex, ex);
+        }
+
+    }
+
+    public void stop() {
+        stopLatch.countDown();
+    }
+
+    private void disconnect() {
+        if (zkHelixManager != null) {
+            logger.info("Controller: " + controllerName + ", has disconnected from cluster: " + clusterName);
+            zkHelixManager.disconnect();
+        }
+    }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixCluster.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixCluster.java
new file mode 100644 (file)
index 0000000..d59737d
--- /dev/null
@@ -0,0 +1,66 @@
+package edu.iu.helix.airavata;
+
+import edu.iu.helix.airavata.tasks.HelixTaskA;
+import edu.iu.helix.airavata.tasks.HelixTaskB;
+import edu.iu.helix.airavata.tasks.HelixTaskC;
+import edu.iu.helix.airavata.tasks.HelixTaskD;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixCluster {
+
+    private static final Logger logger = LogManager.getLogger(HelixCluster.class);
+
+    private String zkAddress;
+    private String clusterName;
+    private int numPartitions;
+
+    private ZkClient zkClient;
+    private ZKHelixAdmin zkHelixAdmin;
+
+    public HelixCluster(String zkAddress, String clusterName, int numPartitions) {
+        this.zkAddress = zkAddress;
+        this.clusterName = clusterName;
+        this.numPartitions = numPartitions;
+
+        zkClient = new ZkClient(this.zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        zkHelixAdmin = new ZKHelixAdmin(zkClient);
+    }
+
+    public void setup() {
+        zkHelixAdmin.addCluster(clusterName, true);
+        zkHelixAdmin.addStateModelDef(clusterName, OnlineOfflineSMD.name, OnlineOfflineSMD.build());
+        logger.info("Cluster: " +  clusterName + ", has been added.");
+    }
+
+    public void addResourcesToCluster() {
+        String stateModel = BuiltInStateModelDefinitions.OnlineOffline.name();
+        zkHelixAdmin.addResource(clusterName, HelixTaskA.TASK_COMMAND, numPartitions, stateModel);
+        zkHelixAdmin.addResource(clusterName, HelixTaskB.TASK_COMMAND, numPartitions, stateModel);
+        zkHelixAdmin.addResource(clusterName, HelixTaskC.TASK_COMMAND, numPartitions, stateModel);
+        zkHelixAdmin.addResource(clusterName, HelixTaskD.TASK_COMMAND, numPartitions, stateModel);
+        logger.debug("Resources (A,B,C,D) with [ " + numPartitions + " ] partitions have been added to Cluster: " + clusterName);
+
+        zkHelixAdmin.rebalance(clusterName, HelixTaskA.TASK_COMMAND, 1);
+        zkHelixAdmin.rebalance(clusterName, HelixTaskB.TASK_COMMAND, 1);
+        zkHelixAdmin.rebalance(clusterName, HelixTaskC.TASK_COMMAND, 1);
+        zkHelixAdmin.rebalance(clusterName, HelixTaskD.TASK_COMMAND, 1);
+        logger.debug("Resources (A,B,C,D) have been rebalanced");
+    }
+
+    public void disconnect() {
+        if (zkClient != null) {
+            zkClient.close();
+        }
+    }
+
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java
new file mode 100644 (file)
index 0000000..313a674
--- /dev/null
@@ -0,0 +1,158 @@
+package edu.iu.helix.airavata;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixClusterManager {
+
+    private static final Logger logger = LogManager.getLogger(HelixClusterManager.class);
+
+    private int numWorkers;
+    private int numPartitions;
+    private String clusterName;
+    private String zkAddress;
+    private HelixManager helixManager;
+    private TaskDriver taskDriver;
+
+    private ControllerNode controllerNode;
+    private HelixCluster helixCluster;
+    private static final String CONTROLLER_NAME = "HelixDemoController";
+    private static final String PARTICIPANT_NAME = "HelixDemoParticipant_";
+
+    public HelixClusterManager(String clusterName, String zkAddress, int numWorkers, int numPartitions) {
+        this.numWorkers = numWorkers;
+        this.numPartitions = numPartitions;
+        this.clusterName = clusterName;
+        this.zkAddress = zkAddress;
+    }
+
+    public void startHelixCluster() {
+        // create the cluster
+        helixCluster = new HelixCluster(zkAddress, clusterName, numPartitions);
+        helixCluster.setup();
+        System.out.println("Successfully created helix cluster: " + clusterName + ", with [ " + numPartitions + " ] partitions.");
+
+        // start the participants
+        Executor executor = Executors.newFixedThreadPool(numWorkers);
+        for (int i = 0; i < numWorkers; i++) {
+            String participantName = PARTICIPANT_NAME + i;
+            ParticipantNode participant = new ParticipantNode(zkAddress, clusterName, participantName);
+            executor.execute(participant);
+            System.out.println("Successfully started participant node: " + participantName + ", on cluster: " + clusterName);
+        }
+
+        // add resources to cluster
+        helixCluster.addResourcesToCluster();
+        System.out.println("Successfully added resources to cluster: " + clusterName);
+
+        // start the controller
+        controllerNode = new ControllerNode(zkAddress, clusterName, CONTROLLER_NAME);
+        controllerNode.start();
+        System.out.println("Successfully started the controller node: " + CONTROLLER_NAME + ", on cluster: " + clusterName);
+
+        // start the cluster manager
+        connect();
+        System.out.println("Successfully started the helix cluster manager (admin), on cluster: " + clusterName);
+    }
+
+    private void connect() {
+
+        this.helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "Admin",
+                InstanceType.SPECTATOR, zkAddress);
+        try {
+            this.helixManager.connect();
+        } catch (Exception ex) {
+            logger.error("Error in connect() for Admin, reason: " + ex, ex);
+        }
+
+        this.taskDriver = new TaskDriver(helixManager);
+        logger.debug("HelixManager Admin connected.");
+
+        Runtime.getRuntime().addShutdownHook(
+                new Thread() {
+                    @Override
+                    public void run() {
+                        disconnect();
+                        controllerNode.stop();
+                        helixCluster.disconnect();
+                    }
+                }
+        );
+    }
+
+    private void disconnect() {
+        if (helixManager != null) {
+            System.out.println("HelixManager Admin disconnecting from cluster: " + clusterName);
+            helixManager.disconnect();
+        }
+    }
+
+    public boolean submitDag(HelixUtil.DAGType dagType) {
+        Workflow workflow = HelixUtil.getWorkflow(dagType);
+        taskDriver.start(workflow);
+        System.out.println("Started workflow for DagType: " + dagType + ", in cluster: " + clusterName);
+        try {
+            taskDriver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED, TaskState.FAILED);
+//            while (true) {
+//                Thread.sleep(100);
+//                WorkflowContext context = taskDriver.getWorkflowContext(workflow.getName());
+//                System.out.println(context);
+//                if (context != null && context.getWorkflowState() != null) {
+//                    if (context.getWorkflowState().equals(TaskState.COMPLETED)) {
+//                        System.out.println("Workflow completed!");
+//                        break;
+//                    } else if (context.getWorkflowState().equals(TaskState.FAILED)) {
+//                        System.err.println("Workflow failed!");
+//                        break;
+//                    }
+//                }
+//            }
+            return true;
+        } catch (Exception ex) {
+            logger.error("Error submitting Dag for type: " + dagType + ", reason: " + ex, ex);
+            return false;
+        }
+    }
+
+    public static void main(String[] args) {
+        String clusterName = "HelixDemoCluster";
+        String zkAddress = "localhost:2199";
+        int numWorkers = 3;
+        int numPartitions = 1;
+
+        try {
+            System.out.println("Starting helix manager for cluster [ " + clusterName + " ], " +
+                    "on ZK server [ " + zkAddress + " ], " +
+                    "with [ " + numWorkers + " ] workers, " +
+                    "having [ " + numPartitions + " ] partitions.");
+            HelixClusterManager manager = new HelixClusterManager(clusterName, zkAddress, numWorkers, numPartitions);
+            manager.startHelixCluster();
+
+            System.out.println("Submitting Workflow for DagType: " + HelixUtil.DAGType.TYPE_A);
+            if (manager.submitDag(HelixUtil.DAGType.TYPE_A)) {
+                System.out.println("Successfully completed workflow for Dag: " + HelixUtil.DAGType.TYPE_A);
+            } else {
+                throw new Exception("Failed to run workflow for Dag: " + HelixUtil.DAGType.TYPE_A);
+            }
+        } catch (Exception ex) {
+            logger.error("Something went wrong while running helix cluster manager. Reason: " + ex, ex);
+        }
+
+        System.out.println("*** Exiting System ***");
+//        System.exit(0);
+    }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java
new file mode 100644 (file)
index 0000000..2cea68f
--- /dev/null
@@ -0,0 +1,82 @@
+package edu.iu.helix.airavata;
+
+import edu.iu.helix.airavata.tasks.HelixTaskA;
+import edu.iu.helix.airavata.tasks.HelixTaskB;
+import edu.iu.helix.airavata.tasks.HelixTaskC;
+import edu.iu.helix.airavata.tasks.HelixTaskD;
+import org.apache.helix.task.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixUtil {
+
+    public static final String TASK_STATE_DEF = "Task";
+
+    public enum DAGType {
+        TYPE_A,
+        TYPE_B,
+        TYPE_C
+    }
+
+    private static Workflow.Builder getWorkflowBuilder(DAGType dagType) {
+        // create task configs
+        List<TaskConfig> taskConfig1 = new ArrayList<TaskConfig>();
+        List<TaskConfig> taskConfig2 = new ArrayList<TaskConfig>();
+        List<TaskConfig> taskConfig3 = new ArrayList<TaskConfig>();
+        List<TaskConfig> taskConfig4 = new ArrayList<TaskConfig>();
+        taskConfig1.add(new TaskConfig.Builder().setTaskId("helix_task_a").setCommand(HelixTaskA.TASK_COMMAND).build());
+        taskConfig2.add(new TaskConfig.Builder().setTaskId("helix_task_b").setCommand(HelixTaskB.TASK_COMMAND).build());
+        taskConfig3.add(new TaskConfig.Builder().setTaskId("helix_task_c").setCommand(HelixTaskC.TASK_COMMAND).build());
+        taskConfig4.add(new TaskConfig.Builder().setTaskId("helix_task_d").setCommand(HelixTaskD.TASK_COMMAND).build());
+
+        // create job configs
+        JobConfig.Builder jobConfig1 = new JobConfig.Builder().addTaskConfigs(taskConfig1).setMaxAttemptsPerTask(3);
+        JobConfig.Builder jobConfig2 = new JobConfig.Builder().addTaskConfigs(taskConfig2).setMaxAttemptsPerTask(3);
+        JobConfig.Builder jobConfig3 = new JobConfig.Builder().addTaskConfigs(taskConfig3).setMaxAttemptsPerTask(3);
+        JobConfig.Builder jobConfig4 = new JobConfig.Builder().addTaskConfigs(taskConfig4).setMaxAttemptsPerTask(3);
+
+        // create workflow
+        // use a unique workflow name so repeated submissions do not collide in ZooKeeper
+        Workflow.Builder workflowBuilder = new Workflow.Builder(generateWorkflowName()).setExpiry(0);
+        workflowBuilder.addJob("helix_job_a", jobConfig1);
+        workflowBuilder.addJob("helix_job_b", jobConfig2);
+        workflowBuilder.addJob("helix_job_c", jobConfig3);
+        workflowBuilder.addJob("helix_job_d", jobConfig4);
+
+        switch (dagType) {
+            case TYPE_A:
+                workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_b");
+                workflowBuilder.addParentChildDependency("helix_job_b", "helix_job_c");
+                workflowBuilder.addParentChildDependency("helix_job_c", "helix_job_d");
+                break;
+
+            case TYPE_B:
+                workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_c");
+                workflowBuilder.addParentChildDependency("helix_job_c", "helix_job_d");
+                workflowBuilder.addParentChildDependency("helix_job_d", "helix_job_b");
+                break;
+
+            case TYPE_C:
+                workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_d");
+                workflowBuilder.addParentChildDependency("helix_job_d", "helix_job_b");
+                workflowBuilder.addParentChildDependency("helix_job_b", "helix_job_c");
+                break;
+        }
+
+        return workflowBuilder;
+    }
+
+    public static Workflow getWorkflow(DAGType dagType) {
+        Workflow.Builder workflowBuilder = getWorkflowBuilder(dagType);
+        return workflowBuilder.build();
+    }
+
+    private static String generateWorkflowName() {
+        return "workflow_" + ThreadLocalRandom.current().nextInt(9999);
+    }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java b/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java
new file mode 100644 (file)
index 0000000..54466f2
--- /dev/null
@@ -0,0 +1,145 @@
+package edu.iu.helix.airavata;
+
+import edu.iu.helix.airavata.tasks.HelixTaskA;
+import edu.iu.helix.airavata.tasks.HelixTaskB;
+import edu.iu.helix.airavata.tasks.HelixTaskC;
+import edu.iu.helix.airavata.tasks.HelixTaskD;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class ParticipantNode implements Runnable {
+
+    private static final Logger logger = LogManager.getLogger(ParticipantNode.class);
+
+    private String zkAddress;
+    private String clusterName;
+    private String participantName;
+    private ZKHelixManager zkHelixManager;
+
+    public ParticipantNode(String zkAddress, String clusterName, String participantName) {
+        logger.debug("Initializing Participant Node");
+        this.zkAddress = zkAddress;
+        this.clusterName = clusterName;
+        this.participantName = participantName;
+    }
+
+    @Override
+    public void run() {
+        ZkClient zkClient = null;
+        try {
+            zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                    ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+            ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient);
+
+            List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName);
+            if (!nodesInCluster.contains(participantName)) {
+                InstanceConfig instanceConfig = new InstanceConfig(participantName);
+                instanceConfig.setHostName("localhost");
+                instanceConfig.setInstanceEnabled(true);
+                zkHelixAdmin.addInstance(clusterName, instanceConfig);
+                logger.debug("Instance: " + participantName + ", has been added to cluster: " + clusterName);
+            }
+
+            Runtime.getRuntime().addShutdownHook(
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            logger.debug("Participant: " + participantName + ", shutdown hook called.");
+                            disconnect();
+                        }
+                    }
+            );
+
+            // connect the participant manager
+            connect();
+        } catch (Exception ex) {
+            logger.error("Error in run() for Participant: " + participantName + ", reason: " + ex, ex);
+        } finally {
+            if (zkClient != null) {
+                zkClient.close();
+            }
+        }
+
+    }
+
+    private void connect() {
+        try {
+            zkHelixManager = new ZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, zkAddress);
+
+            // register online-offline model
+            StateMachineEngine machineEngine = zkHelixManager.getStateMachineEngine();
+            OnlineOfflineStateModelFactory factory = new OnlineOfflineStateModelFactory(participantName);
+            machineEngine.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), factory);
+
+            // register task model
+            Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+            taskRegistry.put(HelixTaskA.TASK_COMMAND, new TaskFactory() {
+                @Override
+                public Task createNewTask(TaskCallbackContext context) {
+                    return new HelixTaskA(context);
+                }
+            });
+            taskRegistry.put(HelixTaskB.TASK_COMMAND, new TaskFactory() {
+                @Override
+                public Task createNewTask(TaskCallbackContext context) {
+                    return new HelixTaskB(context);
+                }
+            });
+            taskRegistry.put(HelixTaskC.TASK_COMMAND, new TaskFactory() {
+                @Override
+                public Task createNewTask(TaskCallbackContext context) {
+                    return new HelixTaskC(context);
+                }
+            });
+            taskRegistry.put(HelixTaskD.TASK_COMMAND, new TaskFactory() {
+                @Override
+                public Task createNewTask(TaskCallbackContext context) {
+                    return new HelixTaskD(context);
+                }
+            });
+
+            machineEngine.registerStateModelFactory(HelixUtil.TASK_STATE_DEF,
+                    new TaskStateModelFactory(zkHelixManager, taskRegistry));
+            logger.debug("Participant: " + participantName + ", registered state model factories.");
+
+            zkHelixManager.connect();
+            logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName);
+
+            Thread.currentThread().join();
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+        } catch (Exception ex) {
+            logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex);
+        } finally {
+            disconnect();
+        }
+    }
+
+    private void disconnect() {
+        if (zkHelixManager != null) {
+            logger.info("Participant: " + participantName + ", disconnecting from cluster: " + clusterName);
+            zkHelixManager.disconnect();
+        }
+    }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskA.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskA.java
new file mode 100644 (file)
index 0000000..cdae18e
--- /dev/null
@@ -0,0 +1,49 @@
+package edu.iu.helix.airavata.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixTaskA extends UserContentStore implements Task {
+
+    private static Logger logger = LogManager.getLogger(HelixTaskA.class);
+    public static final String TASK_COMMAND = "HelixTask_A";
+    private static int count = 0;
+
+    public HelixTaskA(TaskCallbackContext callbackContext) {
+        System.out.println("HelixTaskA | callbackContext: " + callbackContext);
+    }
+
+    @Override
+    public TaskResult run() {
+        System.out.println("HelixTaskA | Inside run()");
+        addToUserStore();
+//        sleep(2000);
+        System.out.println("HelixTaskA | Returning status : COMPLETED.");
+        return new TaskResult(TaskResult.Status.COMPLETED, "HelixTask completed!");
+    }
+
+    @Override
+    public void cancel() {
+        System.out.println("HelixTaskA | Inside cancel()");
+    }
+
+    private void addToUserStore() {
+        System.out.println("HelixTaskA | Inside addToUserStore()");
+        putUserContent("fullName", "Gourav Shenoy", Scope.WORKFLOW);
+    }
+
+    private static void sleep(long d) {
+        try {
+            Thread.sleep(d);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag instead of swallowing it
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskB.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskB.java
new file mode 100644 (file)
index 0000000..9af9c99
--- /dev/null
@@ -0,0 +1,59 @@
+package edu.iu.helix.airavata.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixTaskB extends UserContentStore implements Task {
+
+    private static Logger logger = LogManager.getLogger(HelixTaskB.class);
+    public static final String TASK_COMMAND = "HelixTask_B";
+
+    private static int retryCount = 0;
+
+    public HelixTaskB(TaskCallbackContext callbackContext) {
+        System.out.println("HelixTaskB | callbackContext: " + callbackContext);
+    }
+
+    @Override
+    public TaskResult run() {
+        if (retryCount < 2) {
+            retryCount++;
+            System.out.println("HelixTaskB | Returning status FAILED, Helix will retry this task. Retry count: " + retryCount);
+            return new TaskResult(TaskResult.Status.FAILED, "HelixTaskB should be retried!");
+        }
+
+        System.out.println("HelixTaskB | After 2 retries, inside run()");
+//        sleep(2000);
+        System.out.println("HelixTaskB | Retrieved from UserStore : " + getFromUserStore("fullName"));
+
+//        System.out.println("HelixTaskB | Returning status : FATAL_FAILED.");
+//        return new TaskResult(TaskResult.Status.FATAL_FAILED, "edu.iu.helix.play.HelixTaskB completed!");
+
+        System.out.println("HelixTaskB | Returning status : COMPLETED.");
+        return new TaskResult(TaskResult.Status.COMPLETED, "HelixTaskB completed!");
+    }
+
+    @Override
+    public void cancel() {
+        System.out.println("HelixTaskB | Inside cancel()");
+    }
+
+    private String getFromUserStore(String key) {
+        System.out.println("HelixTaskB | Inside getFromUserStore()");
+        return getUserContent(key, Scope.WORKFLOW);
+    }
+
+    private static void sleep(long d) {
+        try {
+            Thread.sleep(d);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag instead of swallowing it
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskC.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskC.java
new file mode 100644 (file)
index 0000000..2132001
--- /dev/null
@@ -0,0 +1,48 @@
+package edu.iu.helix.airavata.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixTaskC extends UserContentStore implements Task {
+
+    private static Logger logger = LogManager.getLogger(HelixTaskC.class);
+    public static final String TASK_COMMAND = "HelixTask_C";
+
+    public HelixTaskC(TaskCallbackContext callbackContext) {
+        System.out.println("HelixTaskC | callbackContext: " + callbackContext);
+    }
+
+    @Override
+    public TaskResult run() {
+        System.out.println("HelixTaskC | Inside run()");
+//        sleep(2000);
+        System.out.println("HelixTaskC | Retrieved from UserStore : " + getFromUserStore("fullName"));
+        System.out.println("HelixTaskC | Returning status : COMPLETED.");
+        return new TaskResult(TaskResult.Status.COMPLETED, "HelixTaskC completed!");
+    }
+
+    @Override
+    public void cancel() {
+        System.out.println("HelixTaskC | Inside cancel()");
+    }
+
+    private String getFromUserStore(String key) {
+        System.out.println("HelixTaskC | Inside getFromUserStore()");
+        return getUserContent(key, Scope.WORKFLOW);
+    }
+
+    private static void sleep(long d) {
+        try {
+            Thread.sleep(d);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag instead of swallowing it
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskD.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/HelixTaskD.java
new file mode 100644 (file)
index 0000000..52488e8
--- /dev/null
@@ -0,0 +1,48 @@
+package edu.iu.helix.airavata.tasks;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/21/17.
+ */
+public class HelixTaskD extends UserContentStore implements Task {
+
+    private static Logger logger = LogManager.getLogger(HelixTaskD.class);
+    public static final String TASK_COMMAND = "HelixTask_D";
+
+    public HelixTaskD(TaskCallbackContext callbackContext) {
+        System.out.println("HelixTaskD | callbackContext: " + callbackContext);
+    }
+
+    @Override
+    public TaskResult run() {
+        System.out.println("HelixTaskD | Inside run()");
+//        sleep(2000);
+        System.out.println("HelixTaskD | Retrieved from UserStore : " + getFromUserStore("fullName"));
+        System.out.println("HelixTaskD | Returning status : COMPLETED.");
+        return new TaskResult(TaskResult.Status.COMPLETED, "HelixTaskD completed!");
+    }
+
+    @Override
+    public void cancel() {
+        System.out.println("HelixTaskD | Inside cancel()");
+    }
+
+    private String getFromUserStore(String key) {
+        System.out.println("HelixTaskD | Inside getFromUserStore()");
+        return getUserContent(key, Scope.WORKFLOW);
+    }
+
+    private static void sleep(long d) {
+        try {
+            Thread.sleep(d);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag instead of swallowing it
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/play/HelixDAGFramework.java b/helix-playground/src/main/java/edu/iu/helix/play/HelixDAGFramework.java
new file mode 100644 (file)
index 0000000..aee4e06
--- /dev/null
@@ -0,0 +1,221 @@
+package edu.iu.helix.play;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.OnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.*;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by goshenoy on 6/18/17.
+ */
+public class HelixDAGFramework {
+
+    private static Logger logger = LogManager.getLogger(HelixDAGFramework.class);
+
+    private static final String ZK_ADDRESS = "localhost:2199";
+    private static final String PARTICIPANT_ADDRESS = "localhost:12918";
+
+    private static final String CLUSTER_NAME = "HelixDagCluster";
+    private static final String RESOURCE_NAME = "HelixResource";
+    private static final String CONTROLLER_NAME = "HelixController";
+    private static final String ADMIN_NAME = "HelixAdmin";
+
+    private static final String WORKFLOW_NAME = "helix_workflow";
+    private static final String JOB_NAME = "helix_job";
+    private static final String TASK_A_NAME = "helix_task_id_a";
+    private static final String TASK_B_NAME = "helix_task_id_b";
+
+    private static final String ONLINE_OFFLINE = "OnlineOffline";
+
+    private static HelixManager adminManager;
+    private static ZKHelixManager participantManager;
+    private static ZKHelixManager controllerManager;
+
+    private static ClusterSetup _setupTool;
+
+
+    public static void main(String[] args) {
+        try {
+            // create cluster
+            createCluster();
+
+            // add instance, resource to cluster
+            addInstanceToCluster();
+            addResourceToCluster();
+
+            // create participant, controller & admin manager
+            createParticipantManager();
+            createControllerManager();
+            createAdminManager();
+
+            // verifying cluster state
+            verifyClusterState();
+
+            // create task-driver
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Creating TaskDriver.");
+            TaskDriver taskDriver = new TaskDriver(adminManager);
+
+            // create task-config list
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Creating TaskConfig list.");
+            List<TaskConfig> taskConfigList1 = new ArrayList<TaskConfig>();
+            List<TaskConfig> taskConfigList2 = new ArrayList<TaskConfig>();
+            taskConfigList1.add(
+                    new TaskConfig.Builder().setTaskId(TASK_A_NAME).setCommand(HelixTaskA.TASK_COMMAND).build()
+            );
+            taskConfigList2.add(
+                    new TaskConfig.Builder().setTaskId(TASK_B_NAME).setCommand(HelixTaskB.TASK_COMMAND).build()
+            );
+
+            // create job-config-builder
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Creating JobConfig.Builder.");
+            JobConfig.Builder jobConfigBuilder1 = new JobConfig.Builder().addTaskConfigs(taskConfigList1);
+            JobConfig.Builder jobConfigBuilder2 = new JobConfig.Builder().addTaskConfigs(taskConfigList2);
+
+            // create workflow-builder & add job
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Creating Workflow.Builder.");
+            Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW_NAME).setExpiry(0);
+
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Adding Jobs (a,b) to Workflow.Builder.");
+            workflowBuilder.addJob(JOB_NAME + "_a", jobConfigBuilder1);
+            workflowBuilder.addJob(JOB_NAME + "_b", jobConfigBuilder2);
+
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Setting Job A parent of Job B.");
+            workflowBuilder.addParentChildDependency(JOB_NAME + "_a", JOB_NAME + "_b");
+
+            // start the workflow
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Starting the Workflow.");
+            taskDriver.start(workflowBuilder.build());
+
+            // waiting for job to complete
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Waiting for Workflow to COMPLETE.");
+//            taskDriver.pollForJobState(WORKFLOW_NAME, WORKFLOW_NAME + "_" + JOB_NAME, TaskState.COMPLETED);
+            taskDriver.pollForWorkflowState(WORKFLOW_NAME, TaskState.COMPLETED);
+
+            // job completed, exit
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Job Completed, Exiting.");
+        } catch (Exception ex) {
+            logger.error("Exception caught | ex: " + ex.getMessage(), ex);
+        } finally {
+            // disconnect all managers
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting All Managers (Participant, Controller, Admin).");
+            disconnectManagers();
+
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Bye!");
+        }
+    }
+
+    private static void createCluster() {
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Creating a cluster.");
+        _setupTool = new ClusterSetup(ZK_ADDRESS);
+        _setupTool.addCluster(CLUSTER_NAME, true);
+    }
+
+    private static void addInstanceToCluster() {
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Adding instance to cluster.");
+        _setupTool.addInstanceToCluster(CLUSTER_NAME, PARTICIPANT_ADDRESS);
+    }
+
+    private static void addResourceToCluster() {
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Adding resource to cluster.");
+        _setupTool.addResourceToCluster(CLUSTER_NAME, RESOURCE_NAME, 1, ONLINE_OFFLINE);
+        _setupTool.rebalanceStorageCluster(CLUSTER_NAME, RESOURCE_NAME, 1);
+    }
+
+    private static void createParticipantManager() {
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Creating a Participant Manager.");
+        String instanceName = PARTICIPANT_ADDRESS.replaceAll(":", "_");
+        participantManager = new ZKHelixManager(CLUSTER_NAME, instanceName, InstanceType.PARTICIPANT, ZK_ADDRESS);
+
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Registering SMF for Participant.");
+        StateMachineEngine sme = participantManager.getStateMachineEngine();
+        sme.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), new OnlineOfflineStateModelFactory());
+
+        try {
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Registering Task for Participant.");
+            registerTaskAndStateModel();
+
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Starting Participant Manager.");
+            participantManager.connect();
+        } catch (Exception ex) {
+            logger.error("Error creating Participant Manager, ex: " + ex, ex);
+        }
+    }
+
+    private static void createAdminManager() {
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Creating an Admin Manager.");
+        adminManager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, ADMIN_NAME, InstanceType.ADMINISTRATOR, ZK_ADDRESS);
+        try {
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Starting Admin Manager.");
+            adminManager.connect();
+        } catch (Exception ex) {
+            logger.error("Error creating Admin Manager, ex: " + ex, ex);
+        }
+    }
+
+    private static void createControllerManager() {
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Creating a Controller Manager.");
+        controllerManager = new ZKHelixManager(CLUSTER_NAME, CONTROLLER_NAME, InstanceType.CONTROLLER, ZK_ADDRESS);
+        try {
+            logger.info("edu.iu.helix.play.HelixDAGFramework | Starting Controller Manager.");
+            controllerManager.connect();
+        } catch (Exception ex) {
+            logger.error("Error creating Controller Manager, ex: " + ex, ex);
+        }
+    }
+
+    private static void registerTaskAndStateModel() {
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Registering Task.");
+        Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+        taskRegistry.put(HelixTaskA.TASK_COMMAND, new TaskFactory() {
+            @Override
+            public Task createNewTask(TaskCallbackContext context) {
+                return new HelixTaskA(context);
+            }
+        });
+
+        taskRegistry.put(HelixTaskB.TASK_COMMAND, new TaskFactory() {
+            @Override
+            public Task createNewTask(TaskCallbackContext context) {
+                return new HelixTaskB(context);
+            }
+        });
+
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Registering Task StateModel Factory.");
+        StateMachineEngine sme = participantManager.getStateMachineEngine();
+        sme.registerStateModelFactory("Task", new TaskStateModelFactory(participantManager, taskRegistry));
+    }
+
+    private static void verifyClusterState() {
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Verifying Cluster State.");
+        boolean result = ClusterStateVerifier.verifyByZkCallback(
+                new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDRESS, CLUSTER_NAME));
+        if (!result) {
+            // fail fast instead of relying on a TestNG assertion in production code
+            throw new IllegalStateException("Cluster state verification failed for: " + CLUSTER_NAME);
+        }
+
+    private static void disconnectManagers() {
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting Admin Manager.");
+        adminManager.disconnect();
+
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting Participant Manager.");
+        participantManager.disconnect();
+
+        logger.info("edu.iu.helix.play.HelixDAGFramework | Disconnecting Controller Manager.");
+        controllerManager.disconnect();
+
+        System.exit(0);
+    }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/play/HelixTaskA.java b/helix-playground/src/main/java/edu/iu/helix/play/HelixTaskA.java
new file mode 100644 (file)
index 0000000..7859624
--- /dev/null
@@ -0,0 +1,48 @@
+package edu.iu.helix.play;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/19/17.
+ */
+public class HelixTaskA extends UserContentStore implements Task {
+
+    private static Logger logger = LogManager.getLogger(HelixTaskA.class);
+    public static final String TASK_COMMAND = "HelixTask-A";
+
+    HelixTaskA(TaskCallbackContext callbackContext) {
+        logger.info("edu.iu.helix.play.HelixTaskA | callbackContext: " + callbackContext);
+    }
+
+    @Override
+    public TaskResult run() {
+        logger.info("edu.iu.helix.play.HelixTaskA | Inside run(), sleeping for 5 secs");
+        addToUserStore();
+        sleep(5000);
+        logger.info("edu.iu.helix.play.HelixTaskA | Returning status : COMPLETED.");
+        return new TaskResult(TaskResult.Status.COMPLETED, "HelixTask completed!");
+    }
+
+    @Override
+    public void cancel() {
+        logger.info("edu.iu.helix.play.HelixTaskA | Inside cancel()");
+    }
+
+    private void addToUserStore() {
+        logger.info("edu.iu.helix.play.HelixTaskA | Inside addToUserStore()");
+        putUserContent("fullName", "Gourav Shenoy", Scope.WORKFLOW);
+    }
+
+    private static void sleep(long d) {
+        try {
+            Thread.sleep(d);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag instead of swallowing it
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/helix-playground/src/main/java/edu/iu/helix/play/HelixTaskB.java b/helix-playground/src/main/java/edu/iu/helix/play/HelixTaskB.java
new file mode 100644 (file)
index 0000000..b698286
--- /dev/null
@@ -0,0 +1,52 @@
+package edu.iu.helix.play;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by goshenoy on 6/20/17.
+ */
+public class HelixTaskB extends UserContentStore implements Task {
+
+    private static Logger logger = LogManager.getLogger(HelixTaskB.class);
+    public static final String TASK_COMMAND = "HelixTask-B";
+
+    HelixTaskB(TaskCallbackContext callbackContext) {
+        logger.info("edu.iu.helix.play.HelixTaskB | callbackContext: " + callbackContext);
+    }
+
+    @Override
+    public TaskResult run() {
+        logger.info("edu.iu.helix.play.HelixTaskB | Inside run(), busy-waiting for 3 secs");
+        long expiry = System.currentTimeMillis() + 3000L;
+        while (System.currentTimeMillis() < expiry) {
+            logger.info("edu.iu.helix.play.HelixTaskB | Inside run(), *** Waiting ***");
+            sleep(50);
+        }
+        logger.info("edu.iu.helix.play.HelixTaskB | Retrieved from UserStore : " + getFromUserStore("fullName"));
+        logger.info("edu.iu.helix.play.HelixTaskB | Returning status : COMPLETED.");
+        return new TaskResult(TaskResult.Status.COMPLETED, "edu.iu.helix.play.HelixTaskB completed!");
+    }
+
+    @Override
+    public void cancel() {
+        logger.info("edu.iu.helix.play.HelixTaskB | Inside cancel()");
+    }
+
+    private String getFromUserStore(String key) {
+        logger.info("edu.iu.helix.play.HelixTaskB | Inside getFromUserStore()");
+        return getUserContent(key, Scope.WORKFLOW);
+    }
+
+    private static void sleep(long d) {
+        try {
+            Thread.sleep(d);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag instead of swallowing it
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/helix-playground/src/main/resources/log4j.properties b/helix-playground/src/main/resources/log4j.properties
new file mode 100644 (file)
index 0000000..6193d62
--- /dev/null
@@ -0,0 +1,9 @@
+# Set root logger level to WARN and its only appender to A1.
+log4j.rootLogger=WARN, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file