fixed the conversion between scala and kotlin for ActionData in the tests
[incubator-amaterasu.git] / leader / src / test / scala / org / apache / amaterasu / common / execution / JobRestoreTests.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package org.apache.amaterasu.common.execution
18
19 import java.util.concurrent.LinkedBlockingQueue
20
21 import org.apache.amaterasu.common.configuration.enums.ActionStatus
22 import org.apache.amaterasu.common.dataobjects.ActionData
23 import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
24 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
25 import org.apache.curator.retry.ExponentialBackoffRetry
26 import org.apache.curator.test.TestingServer
27 import org.apache.zookeeper.CreateMode
28 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
29
30 import scala.io.Source
31
32 class JobRestoreTests extends FlatSpec with Matchers with BeforeAndAfterEach {
33
34   val retryPolicy = new ExponentialBackoffRetry(1000, 3)
35   val server = new TestingServer(2184, true)
36   var client: CuratorFramework = null
37
38   val jobId = s"job_${System.currentTimeMillis}"
39   val maki = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
40   val queue = new LinkedBlockingQueue[ActionData]()
41
42   var manager: JobManager = null
43
44   client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
45   client.start()
46
47   override def beforeEach(): Unit = {
48
49     // creating the jobs znode and storing the source repo and branch
50     client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
51     client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId/src", "".getBytes)
52     client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId/branch", "".getBytes)
53
54     manager = JobLoader.createJobManager(maki, jobId, client, 3, queue)
55
56   }
57
58   override def afterEach(): Unit = {
59
60     client.delete().deletingChildrenIfNeeded().forPath(s"/$jobId")
61
62   }
63
64   "a restored job" should "have all queued actions in the executionQueue" in {
65
66     // setting task-0000000002 as queued
67     client.setData().forPath(s"/${jobId}/task-0000000002", ActionStatus.queued.toString.getBytes)
68
69     JobLoader.restoreJobState(manager, jobId, client)
70
71     queue.peek.getName should be("start")
72   }
73
74   "a restored job" should "have all started actions in the executionQueue" in {
75
76     // setting task-0000000002 as queued
77     client.setData().forPath(s"/${jobId}/task-0000000002", ActionStatus.started.toString.getBytes)
78
79     JobLoader.restoreJobState(manager, jobId, client)
80
81     queue.peek.getName should be("start")
82   }
83 }