AMATERASU-26 Pipeline tasks runs as "yarn" user instead of inheriting the user
[incubator-amaterasu.git] / leader / src / main / java / org / apache / amaterasu / leader / yarn / Client.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.amaterasu.leader.yarn;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.amaterasu.common.configuration.ClusterConfig;
import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory;
import org.apache.amaterasu.leader.utilities.ActiveReportListener;
import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;

import static java.lang.System.exit;
53
54 public class Client {
55
56 private final static Logger LOGGER = LoggerFactory.getLogger(Client.class);
57 private final Configuration conf = new YarnConfiguration();
58 private FileSystem fs;
59
60 private LocalResource setLocalResourceFromPath(Path path) throws IOException {
61
62 FileStatus stat = fs.getFileStatus(path);
63 LocalResource fileResource = Records.newRecord(LocalResource.class);
64 fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path));
65 fileResource.setSize(stat.getLen());
66 fileResource.setTimestamp(stat.getModificationTime());
67 fileResource.setType(LocalResourceType.FILE);
68 fileResource.setVisibility(LocalResourceVisibility.PUBLIC);
69 return fileResource;
70 }
71
72 private void run(JobOpts opts, String[] args) throws Exception {
73
74 LogManager.resetConfiguration();
75 ClusterConfig config = new ClusterConfig();
76 config.load(new FileInputStream(opts.home + "/amaterasu.properties"));
77
78 // Create yarnClient
79 YarnClient yarnClient = YarnClient.createYarnClient();
80 yarnClient.init(conf);
81 yarnClient.start();
82
83 // Create application via yarnClient
84 YarnClientApplication app = null;
85 try {
86 app = yarnClient.createApplication();
87 } catch (YarnException e) {
88 LOGGER.error("Error initializing yarn application with yarn client.", e);
89 exit(1);
90 } catch (IOException e) {
91 LOGGER.error("Error initializing yarn application with yarn client.", e);
92 exit(2);
93 }
94
95 // Setup jars on hdfs
96 try {
97 fs = FileSystem.get(conf);
98 } catch (IOException e) {
99 LOGGER.error("Eror creating HDFS client isntance.", e);
100 exit(3);
101 }
102 Path jarPath = new Path(config.YARN().hdfsJarsPath());
103 Path jarPathQualified = fs.makeQualified(jarPath);
104
105 ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
106
107 String newId = "";
108 if (opts.jobId == null) {
109 newId = "--new-job-id " + appContext.getApplicationId().toString() + "-" + UUID.randomUUID().toString();
110 }
111
112
113 List<String> commands = Collections.singletonList(
114 "env AMA_NODE=" + System.getenv("AMA_NODE") +
115 " env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().getUserName() +
116 " $JAVA_HOME/bin/java" +
117 " -Dscala.usejavacp=false" +
118 " -Xmx1G" +
119 " org.apache.amaterasu.leader.yarn.ApplicationMaster " +
120 joinStrings(args) +
121 newId +
122 " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
123 " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
124 );
125
126
127 // Set up the container launch context for the application master
128 ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
129 amContainer.setCommands(commands);
130
131 // Setup local ama folder on hdfs.
132 try {
133
134 if (!fs.exists(jarPathQualified)) {
135 File home = new File(opts.home);
136 fs.mkdirs(jarPathQualified);
137
138 for (File f : home.listFiles()) {
139 fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), jarPathQualified);
140 }
141
142 // setup frameworks
143 FrameworkProvidersFactory frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config);
144 for (String group : frameworkFactory.groups()) {
145 System.out.println("===> setting up " + group);
146 FrameworkSetupProvider framework = frameworkFactory.getFramework(group);
147
148 //creating a group folder
149 Path frameworkPath = Path.mergePaths(jarPathQualified, new Path("/" + framework.getGroupIdentifier()));
150 System.out.println("===> " + frameworkPath.toString());
151
152 fs.mkdirs(frameworkPath);
153 for (File file : framework.getGroupResources()) {
154 if (file.exists())
155 fs.copyFromLocalFile(false, true, new Path(file.getAbsolutePath()), frameworkPath);
156 }
157 }
158 }
159 } catch (IOException e) {
160 System.out.println("===>" + e.getMessage());
161 LOGGER.error("Error uploading ama folder to HDFS.", e);
162 exit(3);
163 } catch (NullPointerException ne) {
164 System.out.println("===>" + ne.getMessage());
165 LOGGER.error("No files in home dir.", ne);
166 exit(4);
167 }
168
169 // get version of build
170 String version = config.version();
171
172 // get local resources pointers that will be set on the master container env
173 String leaderJarPath = String.format("/bin/leader-%s-all.jar", version);
174 LOGGER.info("Leader Jar path is: {}", leaderJarPath);
175 Path mergedPath = Path.mergePaths(jarPath, new Path(leaderJarPath));
176
177 // System.out.println("===> path: " + jarPathQualified);
178 LOGGER.info("Leader merged jar path is: {}", mergedPath);
179 LocalResource leaderJar = null;
180 LocalResource propFile = null;
181 LocalResource log4jPropFile = null;
182
183 try {
184 leaderJar = setLocalResourceFromPath(mergedPath);
185 propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/amaterasu.properties")));
186 log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties")));
187 } catch (IOException e) {
188 LOGGER.error("Error initializing yarn local resources.", e);
189 exit(4);
190 }
191
192 // set local resource on master container
193 Map<String, LocalResource> localResources = new HashMap<>();
194 localResources.put("leader.jar", leaderJar);
195 localResources.put("amaterasu.properties", propFile);
196 localResources.put("log4j.properties", log4jPropFile);
197 amContainer.setLocalResources(localResources);
198
199 // Setup CLASSPATH for ApplicationMaster
200 Map<String, String> appMasterEnv = new HashMap<>();
201 setupAppMasterEnv(appMasterEnv);
202 appMasterEnv.put("AMA_CONF_PATH", String.format("%s/amaterasu.properties", config.YARN().hdfsJarsPath()));
203 amContainer.setEnvironment(appMasterEnv);
204
205 // Set up resource type requirements for ApplicationMaster
206 Resource capability = Records.newRecord(Resource.class);
207 capability.setMemory(config.YARN().master().memoryMB());
208 capability.setVirtualCores(config.YARN().master().cores());
209
210 // Finally, set-up ApplicationSubmissionContext for the application
211 appContext.setApplicationName("amaterasu-" + opts.name);
212 appContext.setAMContainerSpec(amContainer);
213 appContext.setResource(capability);
214 appContext.setQueue(config.YARN().queue());
215 appContext.setPriority(Priority.newInstance(1));
216
217 // Submit application
218 ApplicationId appId = appContext.getApplicationId();
219 LOGGER.info("Submitting application {}", appId);
220 try {
221 yarnClient.submitApplication(appContext);
222
223 } catch (YarnException e) {
224 LOGGER.error("Error submitting application.", e);
225 exit(6);
226 } catch (IOException e) {
227 LOGGER.error("Error submitting application.", e);
228 exit(7);
229 }
230
231 CuratorFramework client = CuratorFrameworkFactory.newClient(config.zk(),
232 new ExponentialBackoffRetry(1000, 3));
233 client.start();
234
235 String newJobId = newId.replace("--new-job-id ", "");
236 System.out.println("===> /" + newJobId + "-report-barrier");
237 DistributedBarrier reportBarrier = new DistributedBarrier(client, "/" + newJobId + "-report-barrier");
238 reportBarrier.setBarrier();
239 reportBarrier.waitOnBarrier();
240
241 String address = new String(client.getData().forPath("/" + newJobId + "/broker"));
242 System.out.println("===> " + address);
243 setupReportListener(address);
244
245 ApplicationReport appReport = null;
246 YarnApplicationState appState;
247
248 do {
249 try {
250 appReport = yarnClient.getApplicationReport(appId);
251 } catch (YarnException e) {
252 LOGGER.error("Error getting application report.", e);
253 exit(8);
254 } catch (IOException e) {
255 LOGGER.error("Error getting application report.", e);
256 exit(9);
257 }
258 appState = appReport.getYarnApplicationState();
259 if (isAppFinished(appState)) {
260 exit(0);
261 break;
262 }
263 //LOGGER.info("Application not finished ({})", appReport.getProgress());
264 try {
265 Thread.sleep(100);
266 } catch (InterruptedException e) {
267 LOGGER.error("Interrupted while waiting for job completion.", e);
268 exit(137);
269 }
270 } while (!isAppFinished(appState));
271
272 LOGGER.info("Application {} finished with state {}-{} at {}", appId, appState, appReport.getFinalApplicationStatus(), appReport.getFinishTime());
273 }
274
275 private boolean isAppFinished(YarnApplicationState appState) {
276 return appState == YarnApplicationState.FINISHED ||
277 appState == YarnApplicationState.KILLED ||
278 appState == YarnApplicationState.FAILED;
279 }
280
281 public static void main(String[] args) throws Exception {
282 Client c = new Client();
283
284 JobOpts opts = ArgsParser.getJobOpts(args);
285
286 c.run(opts, args);
287 }
288
289 private static String joinStrings(String[] str) {
290
291 StringBuilder builder = new StringBuilder();
292 for (String s : str) {
293 builder.append(s);
294 builder.append(" ");
295 }
296 return builder.toString();
297
298 }
299
300 private void setupReportListener(String address) throws JMSException {
301
302 ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(address);
303 Connection conn = cf.createConnection();
304 conn.start();
305
306 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
307
308 //TODO: move to a const in common
309 Topic destination = session.createTopic("JOB.REPORT");
310
311 MessageConsumer consumer = session.createConsumer(destination);
312 consumer.setMessageListener(new ActiveReportListener());
313
314 }
315
316 private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
317 Apps.addToEnvironment(appMasterEnv,
318 ApplicationConstants.Environment.CLASSPATH.name(),
319 ApplicationConstants.Environment.PWD.$() + File.separator + "*", File.pathSeparator);
320
321 for (String c : conf.getStrings(
322 YarnConfiguration.YARN_APPLICATION_CLASSPATH,
323 YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
324 Apps.addToEnvironment(appMasterEnv, ApplicationConstants.Environment.CLASSPATH.name(),
325 c.trim(), File.pathSeparator);
326 }
327 }
328 }