fixed the conversion between scala and kotlin for ActionData in the tests
[incubator-amaterasu.git] / leader / src / test / scala / org / apache / amaterasu / common / execution / JobExecutionTests.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package org.apache.amaterasu.common.execution
18
19 import java.util.concurrent.LinkedBlockingQueue
20
21 import org.apache.amaterasu.common.dataobjects.ActionData
22 import org.apache.amaterasu.leader.dsl.JobParser
23 import org.apache.curator.framework.CuratorFrameworkFactory
24 import org.apache.curator.retry.ExponentialBackoffRetry
25 import org.apache.curator.test.TestingServer
26 import org.apache.zookeeper.CreateMode
27 import org.scalatest.{FlatSpec, Matchers}
28
29 import scala.io.Source
30
31 class JobExecutionTests extends FlatSpec with Matchers {
32
33   val retryPolicy = new ExponentialBackoffRetry(1000, 3)
34   val server = new TestingServer(2183, true)
35   val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
36   client.start()
37
38   val jobId = s"job_${System.currentTimeMillis}"
39   val yaml = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
40   val queue = new LinkedBlockingQueue[ActionData]()
41
42   // this will be performed by the job bootstraper
43   client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
44   //  client.setData().forPath(s"/$jobId/src",src.getBytes)
45   //  client.setData().forPath(s"/$jobId/branch", branch.getBytes)
46
47   val job = JobParser.parse(jobId, yaml, queue, client, 1)
48
49   "a job" should "queue the first action when the JobManager.start method is called " in {
50
51     job.start
52     queue.peek.getName should be ("start")
53
54     // making sure that the status is reflected in zk
55     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
56     new String(actionStatus) should be("queued")
57
58   }
59
60   it should "return the start action when calling getNextAction and dequeue it" in {
61
62     job.getNextActionData.getName should be ("start")
63     queue.size should be (0)
64
65     // making sure that the status is reflected in zk
66     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
67     new String(actionStatus) should be("started")
68
69   }
70
71   it should "be marked as complete when the actionComplete method is called" in {
72
73     job.actionComplete("0000000000")
74
75     // making sure that the status is reflected in zk
76     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
77     new String(actionStatus) should be("complete")
78
79   }
80
81   "the next step2 job" should "be queued as a result of the completion" in {
82
83     queue.peek.getName should be ("step2")
84
85     // making sure that the status is reflected in zk
86     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
87     new String(actionStatus) should be("queued")
88
89   }
90
91   it should "be marked as started when JobManager.getNextActionData is called" in {
92
93     val data = job.getNextActionData
94
95     data.getName should be ("step2")
96
97     // making sure that the status is reflected in zk
98     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
99     new String(actionStatus) should be("started")
100   }
101
102   it should "be marked as failed when JobManager. is called" in {
103
104     job.actionFailed("0000000001", "test failure")
105     queue.peek.getName should be ("error-action")
106
107     // making sure that the status is reflected in zk
108     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
109     new String(actionStatus) should be("queued")
110
111     // and returned by getNextActionData
112     val data = job.getNextActionData
113
114   }
115 }