fixed the conversion between scala and kotlin for ActionData in the tests
[incubator-amaterasu.git] / leader / src / test / scala / org / apache / amaterasu / common / execution / ActionStatusTests.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package org.apache.amaterasu.common.execution
18
19 import java.util
20 import java.util.concurrent.LinkedBlockingQueue
21
22 import org.apache.amaterasu.common.configuration.enums.ActionStatus
23 import org.apache.amaterasu.common.dataobjects.ActionData
24 import org.apache.amaterasu.leader.common.actions.SequentialAction
25 import org.apache.curator.framework.CuratorFrameworkFactory
26 import org.apache.curator.retry.ExponentialBackoffRetry
27 import org.apache.curator.test.TestingServer
28 import org.apache.zookeeper.CreateMode
29 import org.scalatest.{FlatSpec, Matchers}
30
31 import scala.collection.JavaConverters._
32
33 class ActionStatusTests extends FlatSpec with Matchers {
34
35   // setting up a testing zookeeper server (curator TestServer)
36   val retryPolicy = new ExponentialBackoffRetry(1000, 3)
37   val server = new TestingServer(2181, true)
38   val jobId = s"job_${System.currentTimeMillis}"
39   val data = new ActionData(ActionStatus.pending, "test_action", "start.scala", "spark","scala", "0000001", new util.HashMap() , List[String]().asJava)
40
41   "an Action" should "queue it's ActionData int the job queue when executed" in {
42
43     val queue = new LinkedBlockingQueue[ActionData]()
44     // val config = ClusterConfig()
45
46     val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
47     client.start()
48
49     client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
50     val action = SequentialAction(data.getName, data.getSrc, data.getGroupId, data.getTypeId, Map.empty, jobId, queue, client, 1)
51
52     action.execute()
53     queue.peek().getName should be(data.getName)
54     queue.peek().getSrc should be(data.getSrc)
55
56   }
57
58   it should "also create a sequential znode for the task with the value of queued" in {
59
60     val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
61     client.start()
62
63     val taskStatus = client.getData.forPath(s"/$jobId/task-0000000000")
64
65     taskStatus should not be null
66     new String(taskStatus) should be("queued")
67
68   }
69
70 }