YARN-8048. Support auto-spawning of admin configured services during bootstrap of...
[hadoop.git] / hadoop-yarn-project / hadoop-yarn / hadoop-yarn-applications / hadoop-yarn-services-api / src / main / java / org / apache / hadoop / yarn / service / client / SystemServiceManagerImpl.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.hadoop.yarn.service.client;
18
19 import com.google.common.annotations.VisibleForTesting;
20 import org.apache.hadoop.conf.Configuration;
21 import org.apache.hadoop.fs.FileStatus;
22 import org.apache.hadoop.fs.FileSystem;
23 import org.apache.hadoop.fs.Path;
24 import org.apache.hadoop.fs.RemoteIterator;
25 import org.apache.hadoop.security.UserGroupInformation;
26 import org.apache.hadoop.service.AbstractService;
27 import org.apache.hadoop.yarn.api.records.ApplicationId;
28 import org.apache.hadoop.yarn.exceptions.YarnException;
29 import org.apache.hadoop.yarn.server.service.SystemServiceManager;
30 import org.apache.hadoop.yarn.service.api.records.Service;
31 import org.apache.hadoop.yarn.service.api.records.ServiceState;
32 import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import java.io.FileNotFoundException;
37 import java.io.IOException;
38 import java.lang.reflect.UndeclaredThrowableException;
39 import java.security.PrivilegedExceptionAction;
40 import java.util.HashMap;
41 import java.util.HashSet;
42 import java.util.Map;
43 import java.util.Set;
44 import java.util.concurrent.atomic.AtomicBoolean;
45
46 import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
47
48 /**
49 * SystemServiceManager implementation.
50 * Scan for configure system service path.
51 *
52 * The service path structure is as follows:
53 * SYSTEM_SERVICE_DIR_PATH
54 * |---- sync
55 * | |--- user1
56 * | | |---- service1.yarnfile
57 * | | |---- service2.yarnfile
58 * | |--- user2
59 * | | |---- service1.yarnfile
60 * | | ....
61 * | |
62 * |---- async
63 * | |--- user3
64 * | | |---- service1.yarnfile
65 * | | |---- service2.yarnfile
66 * | |--- user4
67 * | | |---- service1.yarnfile
68 * | | ....
69 * | |
70 *
71 * sync: These services are launched at the time of service start synchronously.
72 * It is a blocking service start.
73 * async: These services are launched in separate thread without any delay after
74 * service start. Non-blocking service start.
75 */
76 public class SystemServiceManagerImpl extends AbstractService
77 implements SystemServiceManager {
78
79 private static final Logger LOG =
80 LoggerFactory.getLogger(SystemServiceManagerImpl.class);
81
82 private static final String YARN_FILE_SUFFIX = ".yarnfile";
83 private static final String SYNC = "sync";
84 private static final String ASYNC = "async";
85
86 private FileSystem fs;
87 private Path systemServiceDir;
88 private AtomicBoolean stopExecutors = new AtomicBoolean(false);
89 private Map<String, Set<Service>> syncUserServices = new HashMap<>();
90 private Map<String, Set<Service>> asyncUserServices = new HashMap<>();
91 private UserGroupInformation loginUGI;
92 private Thread serviceLaucher;
93
94 @VisibleForTesting
95 private int skipCounter;
96 @VisibleForTesting
97 private Map<String, Integer> ignoredUserServices =
98 new HashMap<>();
99
100 public SystemServiceManagerImpl() {
101 super(SystemServiceManagerImpl.class.getName());
102 }
103
104 @Override
105 protected void serviceInit(Configuration conf) throws Exception {
106 String dirPath =
107 conf.get(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY);
108 if (dirPath != null) {
109 systemServiceDir = new Path(dirPath);
110 LOG.info("System Service Directory is configured to {}",
111 systemServiceDir);
112 fs = systemServiceDir.getFileSystem(conf);
113 this.loginUGI = UserGroupInformation.isSecurityEnabled() ?
114 UserGroupInformation.getLoginUser() :
115 UserGroupInformation.getCurrentUser();
116 LOG.info("UserGroupInformation initialized to {}", loginUGI);
117 }
118 }
119
120 @Override
121 protected void serviceStart() throws Exception {
122 scanForUserServices();
123 launchUserService(syncUserServices);
124 // Create a thread and submit services in background otherwise it
125 // block RM switch time.
126 serviceLaucher = new Thread(createRunnable());
127 serviceLaucher.setName("System service launcher");
128 serviceLaucher.start();
129 }
130
131 @Override
132 protected void serviceStop() throws Exception {
133 LOG.info("Stopping {}", getName());
134 stopExecutors.set(true);
135
136 if (serviceLaucher != null) {
137 serviceLaucher.interrupt();
138 try {
139 serviceLaucher.join();
140 } catch (InterruptedException ie) {
141 LOG.warn("Interrupted Exception while stopping", ie);
142 }
143 }
144 }
145
146 private Runnable createRunnable() {
147 return new Runnable() {
148 @Override
149 public void run() {
150 launchUserService(asyncUserServices);
151 }
152 };
153 }
154
155 void launchUserService(Map<String, Set<Service>> userServices) {
156 for (Map.Entry<String, Set<Service>> entry : userServices.entrySet()) {
157 String user = entry.getKey();
158 Set<Service> services = entry.getValue();
159 if (services.isEmpty()) {
160 continue;
161 }
162 ServiceClient serviceClient = null;
163 try {
164 UserGroupInformation userUgi = getProxyUser(user);
165 serviceClient = createServiceClient(userUgi);
166 for (Service service : services) {
167 LOG.info("POST: createService = {} user = {}", service, userUgi);
168 try {
169 launchServices(userUgi, serviceClient, service);
170 } catch (IOException | UndeclaredThrowableException e) {
171 if (e.getCause() != null) {
172 LOG.warn(e.getCause().getMessage());
173 } else {
174 String message =
175 "Failed to create service " + service.getName() + " : ";
176 LOG.error(message, e);
177 }
178 }
179 }
180 } catch (InterruptedException e) {
181 LOG.warn("System service launcher thread interrupted", e);
182 break;
183 } catch (Exception e) {
184 LOG.error("Error while submitting services for user " + user, e);
185 } finally {
186 if (serviceClient != null) {
187 try {
188 serviceClient.close();
189 } catch (IOException e) {
190 LOG.warn("Error while closing serviceClient for user {}", user);
191 }
192 }
193 }
194 }
195 }
196
197 private ServiceClient createServiceClient(UserGroupInformation userUgi)
198 throws IOException, InterruptedException {
199 ServiceClient serviceClient =
200 userUgi.doAs(new PrivilegedExceptionAction<ServiceClient>() {
201 @Override public ServiceClient run()
202 throws IOException, YarnException {
203 ServiceClient sc = getServiceClient();
204 sc.init(getConfig());
205 sc.start();
206 return sc;
207 }
208 });
209 return serviceClient;
210 }
211
212 private void launchServices(UserGroupInformation userUgi,
213 ServiceClient serviceClient, Service service)
214 throws IOException, InterruptedException {
215 if (service.getState() == ServiceState.STOPPED) {
216 userUgi.doAs(new PrivilegedExceptionAction<Void>() {
217 @Override public Void run() throws IOException, YarnException {
218 serviceClient.actionBuild(service);
219 return null;
220 }
221 });
222 LOG.info("Service {} version {} saved.", service.getName(),
223 service.getVersion());
224 } else {
225 ApplicationId applicationId =
226 userUgi.doAs(new PrivilegedExceptionAction<ApplicationId>() {
227 @Override public ApplicationId run()
228 throws IOException, YarnException {
229 ApplicationId applicationId = serviceClient.actionCreate(service);
230 return applicationId;
231 }
232 });
233 LOG.info("Service {} submitted with Application ID: {}",
234 service.getName(), applicationId);
235 }
236 }
237
238 ServiceClient getServiceClient() {
239 return new ServiceClient();
240 }
241
242 private UserGroupInformation getProxyUser(String user) {
243 UserGroupInformation ugi;
244 if (UserGroupInformation.isSecurityEnabled()) {
245 ugi = UserGroupInformation.createProxyUser(user, loginUGI);
246 } else {
247 ugi = UserGroupInformation.createRemoteUser(user);
248 }
249 return ugi;
250 }
251
252 // scan for both launch service types i.e sync and async
253 void scanForUserServices() throws IOException {
254 if (systemServiceDir == null) {
255 return;
256 }
257 try {
258 LOG.info("Scan for launch type on {}", systemServiceDir);
259 RemoteIterator<FileStatus> iterLaunchType = list(systemServiceDir);
260 while (iterLaunchType.hasNext()) {
261 FileStatus launchType = iterLaunchType.next();
262 if (!launchType.isDirectory()) {
263 LOG.debug("Scanner skips for unknown file {}", launchType.getPath());
264 continue;
265 }
266 if (launchType.getPath().getName().equals(SYNC)) {
267 scanForUserServiceDefinition(launchType.getPath(), syncUserServices);
268 } else if (launchType.getPath().getName().equals(ASYNC)) {
269 scanForUserServiceDefinition(launchType.getPath(), asyncUserServices);
270 } else {
271 LOG.debug("Scanner skips for unknown dir {}.", launchType.getPath());
272 }
273 }
274 } catch (FileNotFoundException e) {
275 LOG.warn("System service directory {} doesn't not exist.",
276 systemServiceDir);
277 }
278 }
279
280 // Files are under systemServiceDir/<users>. Scan for 2 levels
281 // 1st level for users
282 // 2nd level for service definitions under user
283 private void scanForUserServiceDefinition(Path userDirPath,
284 Map<String, Set<Service>> userServices) throws IOException {
285 LOG.info("Scan for users on {}", userDirPath);
286 RemoteIterator<FileStatus> iterUsers = list(userDirPath);
287 while (iterUsers.hasNext()) {
288 FileStatus userDir = iterUsers.next();
289 // if 1st level is not user directory then skip it.
290 if (!userDir.isDirectory()) {
291 LOG.info(
292 "Service definition {} doesn't belong to any user. Ignoring.. ",
293 userDir.getPath().getName());
294 continue;
295 }
296 String userName = userDir.getPath().getName();
297 LOG.info("Scanning service definitions for user {}.", userName);
298
299 //2nd level scan
300 RemoteIterator<FileStatus> iterServices = list(userDir.getPath());
301 while (iterServices.hasNext()) {
302 FileStatus serviceCache = iterServices.next();
303 String filename = serviceCache.getPath().getName();
304 if (!serviceCache.isFile()) {
305 LOG.info("Scanner skips for unknown dir {}", filename);
306 continue;
307 }
308 if (!filename.endsWith(YARN_FILE_SUFFIX)) {
309 LOG.info("Scanner skips for unknown file extension, filename = {}",
310 filename);
311 skipCounter++;
312 continue;
313 }
314 Service service = getServiceDefinition(serviceCache.getPath());
315 if (service != null) {
316 Set<Service> services = userServices.get(userName);
317 if (services == null) {
318 services = new HashSet<>();
319 userServices.put(userName, services);
320 }
321 if (!services.add(service)) {
322 int count = ignoredUserServices.containsKey(userName) ?
323 ignoredUserServices.get(userName) : 0;
324 ignoredUserServices.put(userName, count + 1);
325 LOG.warn(
326 "Ignoring service {} for the user {} as it is already present,"
327 + " filename = {}", service.getName(), userName, filename);
328 }
329 LOG.info("Added service {} for the user {}, filename = {}",
330 service.getName(), userName, filename);
331 }
332 }
333 }
334 }
335
336 private Service getServiceDefinition(Path filePath) {
337 Service service = null;
338 try {
339 if (LOG.isDebugEnabled()) {
340 LOG.debug("Loading service definition from FS: " + filePath);
341 }
342 service = jsonSerDeser.load(fs, filePath);
343 } catch (IOException e) {
344 LOG.info("Error while loading service definition from FS: {}", e);
345 }
346 return service;
347 }
348
349 private RemoteIterator<FileStatus> list(Path path) throws IOException {
350 return new StoppableRemoteIterator(fs.listStatusIterator(path));
351 }
352
353 @VisibleForTesting Map<String, Integer> getIgnoredUserServices() {
354 return ignoredUserServices;
355 }
356
357 private class StoppableRemoteIterator implements RemoteIterator<FileStatus> {
358 private final RemoteIterator<FileStatus> remote;
359
360 StoppableRemoteIterator(RemoteIterator<FileStatus> remote) {
361 this.remote = remote;
362 }
363
364 @Override public boolean hasNext() throws IOException {
365 return !stopExecutors.get() && remote.hasNext();
366 }
367
368 @Override public FileStatus next() throws IOException {
369 return remote.next();
370 }
371 }
372
373 @VisibleForTesting
374 Map<String, Set<Service>> getSyncUserServices() {
375 return syncUserServices;
376 }
377
378 @VisibleForTesting int getSkipCounter() {
379 return skipCounter;
380 }
381 }