YARN-8048. Support auto-spawning of admin configured services during bootstrap of...
[hadoop.git] / hadoop-yarn-project / hadoop-yarn / hadoop-yarn-server / hadoop-yarn-server-resourcemanager / src / main / java / org / apache / hadoop / yarn / server / resourcemanager / ResourceManager.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.yarn.server.resourcemanager;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import com.sun.jersey.spi.container.servlet.ServletContainer;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.curator.framework.AuthInfo;
27 import org.apache.curator.framework.CuratorFramework;
28 import org.apache.hadoop.classification.InterfaceAudience.Private;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.ha.HAServiceProtocol;
31 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
32 import org.apache.hadoop.http.HttpServer2;
33 import org.apache.hadoop.metrics2.MetricsSystem;
34 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
35 import org.apache.hadoop.metrics2.source.JvmMetrics;
36 import org.apache.hadoop.net.NetUtils;
37 import org.apache.hadoop.security.Groups;
38 import org.apache.hadoop.security.SecurityUtil;
39 import org.apache.hadoop.security.UserGroupInformation;
40 import org.apache.hadoop.security.authorize.ProxyUsers;
41 import org.apache.hadoop.service.CompositeService;
42 import org.apache.hadoop.service.Service;
43 import org.apache.hadoop.util.ExitUtil;
44 import org.apache.hadoop.util.GenericOptionsParser;
45 import org.apache.hadoop.util.JvmPauseMonitor;
46 import org.apache.hadoop.util.ReflectionUtils;
47 import org.apache.hadoop.util.ShutdownHookManager;
48 import org.apache.hadoop.util.StringUtils;
49 import org.apache.hadoop.util.VersionInfo;
50 import org.apache.hadoop.util.curator.ZKCuratorManager;
51 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
52 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
53 import org.apache.hadoop.yarn.api.records.ApplicationId;
54 import org.apache.hadoop.yarn.api.records.NodeId;
55 import org.apache.hadoop.yarn.conf.ConfigurationProvider;
56 import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
57 import org.apache.hadoop.yarn.conf.HAUtil;
58 import org.apache.hadoop.yarn.conf.YarnConfiguration;
59 import org.apache.hadoop.yarn.event.AsyncDispatcher;
60 import org.apache.hadoop.yarn.event.Dispatcher;
61 import org.apache.hadoop.yarn.event.EventDispatcher;
62 import org.apache.hadoop.yarn.event.EventHandler;
63 import org.apache.hadoop.yarn.exceptions.YarnException;
64 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
65 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
66 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
67 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
68 import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService;
69 import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
70 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
71 import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
72 import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
73 import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher;
74 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
75 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
76 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
77 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
78 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
79 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
80 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
81 import org.apache.hadoop.yarn.server.resourcemanager.reservation.AbstractReservationSystem;
82 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
83 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
84 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManagerImpl;
85 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
86 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
87 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
88 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
89 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
90 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
91 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
92 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
93 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
94 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
95 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
96 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
97 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
98 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
99 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
100 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager;
101 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService;
102 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
103 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
104 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
105 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
106 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
107 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
108 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
109 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
110 import org.apache.hadoop.yarn.server.service.SystemServiceManager;
111 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
112 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
113 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
114 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
115 import org.apache.hadoop.yarn.webapp.WebApp;
116 import org.apache.hadoop.yarn.webapp.WebApps;
117 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
118 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
119 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
120 import org.eclipse.jetty.webapp.WebAppContext;
121
122 import java.io.IOException;
123 import java.io.InputStream;
124 import java.io.PrintStream;
125 import java.net.InetSocketAddress;
126 import java.net.URI;
127 import java.net.URL;
128 import java.net.URLClassLoader;
129 import java.nio.charset.Charset;
130 import java.security.PrivilegedExceptionAction;
131 import java.security.SecureRandom;
132 import java.util.ArrayList;
133 import java.util.HashMap;
134 import java.util.List;
135 import java.util.Map;
136 import java.util.concurrent.atomic.AtomicBoolean;
137
138 /**
139 * The ResourceManager is the main class; it is composed of a set of components.
140 * "I am the ResourceManager. All your resources belong to us..."
141 *
142 */
143 @SuppressWarnings("unchecked")
144 public class ResourceManager extends CompositeService implements Recoverable {
145
146 /**
147 * Priority of the ResourceManager shutdown hook.
148 */
149 public static final int SHUTDOWN_HOOK_PRIORITY = 30;
150
151 /**
152 * Used for generation of various ids.
153 */
154 public static final int EPOCH_BIT_SHIFT = 40;
155
156 private static final Log LOG = LogFactory.getLog(ResourceManager.class);
157 private static long clusterTimeStamp = System.currentTimeMillis();
158
159 /*
160 * UI2 webapp name
161 */
162 public static final String UI2_WEBAPP_NAME = "/ui2";
163
164 /**
165 * "Always On" services. Services that need to run always irrespective of
166 * the HA state of the RM.
167 */
168 @VisibleForTesting
169 protected RMContextImpl rmContext;
170 private Dispatcher rmDispatcher;
171 @VisibleForTesting
172 protected AdminService adminService;
173
174 /**
175 * "Active" services. Services that need to run only on the Active RM.
176 * These services are managed (initialized, started, stopped) by the
177 * {@link CompositeService} RMActiveServices.
178 *
179 * RM is active when (1) HA is disabled, or (2) HA is enabled and the RM is
180 * in Active state.
181 */
182 protected RMActiveServices activeServices;
183 protected RMSecretManagerService rmSecretManagerService;
184
185 protected ResourceScheduler scheduler;
186 protected ReservationSystem reservationSystem;
187 private ClientRMService clientRM;
188 protected ApplicationMasterService masterService;
189 protected NMLivelinessMonitor nmLivelinessMonitor;
190 protected NodesListManager nodesListManager;
191 protected RMAppManager rmAppManager;
192 protected ApplicationACLsManager applicationACLsManager;
193 protected QueueACLsManager queueACLsManager;
194 private FederationStateStoreService federationStateStoreService;
195 private WebApp webApp;
196 private AppReportFetcher fetcher = null;
197 protected ResourceTrackerService resourceTracker;
198 private JvmMetrics jvmMetrics;
199 private boolean curatorEnabled = false;
200 private ZKCuratorManager zkManager;
201 private final String zkRootNodePassword =
202 Long.toString(new SecureRandom().nextLong());
203 private boolean recoveryEnabled;
204
205 @VisibleForTesting
206 protected String webAppAddress;
207 private ConfigurationProvider configurationProvider = null;
208 /** End of Active services */
209
210 private Configuration conf;
211
212 private UserGroupInformation rmLoginUGI;
213
/**
 * Creates a ResourceManager whose service name is "ResourceManager".
 */
public ResourceManager() {
  super("ResourceManager");
}
217
218 public RMContext getRMContext() {
219 return this.rmContext;
220 }
221
222 public static long getClusterTimeStamp() {
223 return clusterTimeStamp;
224 }
225
226 @VisibleForTesting
227 protected static void setClusterTimeStamp(long timestamp) {
228 clusterTimeStamp = timestamp;
229 }
230
231 @VisibleForTesting
232 Dispatcher getRmDispatcher() {
233 return rmDispatcher;
234 }
235
236 @VisibleForTesting
237 protected ResourceProfilesManager createResourceProfileManager() {
238 ResourceProfilesManager resourceProfilesManager =
239 new ResourceProfilesManagerImpl();
240 return resourceProfilesManager;
241 }
242
243 @Override
244 protected void serviceInit(Configuration conf) throws Exception {
245 this.conf = conf;
246 this.rmContext = new RMContextImpl();
247 rmContext.setResourceManager(this);
248
249 this.configurationProvider =
250 ConfigurationProviderFactory.getConfigurationProvider(conf);
251 this.configurationProvider.init(this.conf);
252 rmContext.setConfigurationProvider(configurationProvider);
253
254 // load core-site.xml
255 loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
256
257 // Refresh user to group mappings during init.
258 refreshUserToGroupMappingsWithConf();
259
260 // Do refreshSuperUserGroupsConfiguration with loaded core-site.xml
261 // Or use RM specific configurations to overwrite the common ones first
262 // if they exist
263 RMServerUtils.processRMProxyUsersConf(conf);
264 ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf);
265
266 // load yarn-site.xml
267 loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
268
269 validateConfigs(this.conf);
270
271 // Set HA configuration should be done before login
272 this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
273 if (this.rmContext.isHAEnabled()) {
274 HAUtil.verifyAndSetConfiguration(this.conf);
275 }
276
277 // Set UGI and do login
278 // If security is enabled, use login user
279 // If security is not enabled, use current user
280 this.rmLoginUGI = UserGroupInformation.getCurrentUser();
281 try {
282 doSecureLogin();
283 } catch(IOException ie) {
284 throw new YarnRuntimeException("Failed to login", ie);
285 }
286
287 // register the handlers for all AlwaysOn services using setupDispatcher().
288 rmDispatcher = setupDispatcher();
289 addIfService(rmDispatcher);
290 rmContext.setDispatcher(rmDispatcher);
291
292 // The order of services below should not be changed as services will be
293 // started in same order
294 // As elector service needs admin service to be initialized and started,
295 // first we add admin service then elector service
296
297 adminService = createAdminService();
298 addService(adminService);
299 rmContext.setRMAdminService(adminService);
300
301 // elector must be added post adminservice
302 if (this.rmContext.isHAEnabled()) {
303 // If the RM is configured to use an embedded leader elector,
304 // initialize the leader elector.
305 if (HAUtil.isAutomaticFailoverEnabled(conf)
306 && HAUtil.isAutomaticFailoverEmbedded(conf)) {
307 EmbeddedElector elector = createEmbeddedElector();
308 addIfService(elector);
309 rmContext.setLeaderElectorService(elector);
310 }
311 }
312
313 rmContext.setYarnConfiguration(conf);
314
315 createAndInitActiveServices(false);
316
317 webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
318 YarnConfiguration.RM_BIND_HOST,
319 WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
320
321 RMApplicationHistoryWriter rmApplicationHistoryWriter =
322 createRMApplicationHistoryWriter();
323 addService(rmApplicationHistoryWriter);
324 rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
325
326 // initialize the RM timeline collector first so that the system metrics
327 // publisher can bind to it
328 if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
329 RMTimelineCollectorManager timelineCollectorManager =
330 createRMTimelineCollectorManager();
331 addService(timelineCollectorManager);
332 rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
333 }
334
335 SystemMetricsPublisher systemMetricsPublisher =
336 createSystemMetricsPublisher();
337 addIfService(systemMetricsPublisher);
338 rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
339
340 super.serviceInit(this.conf);
341 }
342
343 private void refreshUserToGroupMappingsWithConf()
344 throws YarnException, IOException {
345 Configuration newConf = new Configuration(false);
346 InputStream confFileInputStream =
347 configurationProvider
348 .getConfigurationInputStream(newConf, YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
349 if (confFileInputStream != null) {
350 newConf.addResource(confFileInputStream);
351 }
352
353 // Do refreshUserToGroupsMappings with loaded core-site.xml
354 Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(newConf)
355 .refresh();
356 }
357
358 private void loadConfigurationXml(String configurationFile)
359 throws YarnException, IOException {
360 InputStream configurationInputStream =
361 this.configurationProvider.getConfigurationInputStream(this.conf,
362 configurationFile);
363 if (configurationInputStream != null) {
364 this.conf.addResource(configurationInputStream, configurationFile);
365 }
366 }
367
368 protected EmbeddedElector createEmbeddedElector() throws IOException {
369 EmbeddedElector elector;
370 curatorEnabled =
371 conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
372 YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
373 if (curatorEnabled) {
374 this.zkManager = createAndStartZKManager(conf);
375 elector = new CuratorBasedElectorService(this);
376 } else {
377 elector = new ActiveStandbyElectorBasedElectorService(this);
378 }
379 return elector;
380 }
381
382 /**
383 * Get ZooKeeper Curator manager, creating and starting if not exists.
384 * @param config Configuration for the ZooKeeper curator.
385 * @return ZooKeeper Curator manager.
386 * @throws IOException If it cannot create the manager.
387 */
388 public ZKCuratorManager createAndStartZKManager(Configuration
389 config) throws IOException {
390 ZKCuratorManager manager = new ZKCuratorManager(config);
391
392 // Get authentication
393 List<AuthInfo> authInfos = new ArrayList<>();
394 if (HAUtil.isHAEnabled(config) && HAUtil.getConfValueForRMInstance(
395 YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, config) == null) {
396 String zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
397 YarnConfiguration.RM_ADDRESS,
398 YarnConfiguration.DEFAULT_RM_ADDRESS, config);
399 String defaultFencingAuth =
400 zkRootNodeUsername + ":" + zkRootNodePassword;
401 byte[] defaultFencingAuthData =
402 defaultFencingAuth.getBytes(Charset.forName("UTF-8"));
403 String scheme = new DigestAuthenticationProvider().getScheme();
404 AuthInfo authInfo = new AuthInfo(scheme, defaultFencingAuthData);
405 authInfos.add(authInfo);
406 }
407
408 manager.start(authInfos);
409 return manager;
410 }
411
412 public ZKCuratorManager getZKManager() {
413 return zkManager;
414 }
415
416 public CuratorFramework getCurator() {
417 if (this.zkManager == null) {
418 return null;
419 }
420 return this.zkManager.getCurator();
421 }
422
423 public String getZkRootNodePassword() {
424 return this.zkRootNodePassword;
425 }
426
427
428 protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
429 Configuration conf) {
430 return new QueueACLsManager(scheduler, conf);
431 }
432
433 @VisibleForTesting
434 protected void setRMStateStore(RMStateStore rmStore) {
435 rmStore.setRMDispatcher(rmDispatcher);
436 rmStore.setResourceManager(this);
437 rmContext.setStateStore(rmStore);
438 }
439
440 protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
441 return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
442 }
443
444 protected Dispatcher createDispatcher() {
445 return new AsyncDispatcher("RM Event dispatcher");
446 }
447
448 protected ResourceScheduler createScheduler() {
449 String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
450 YarnConfiguration.DEFAULT_RM_SCHEDULER);
451 LOG.info("Using Scheduler: " + schedulerClassName);
452 try {
453 Class<?> schedulerClazz = Class.forName(schedulerClassName);
454 if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) {
455 return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
456 this.conf);
457 } else {
458 throw new YarnRuntimeException("Class: " + schedulerClassName
459 + " not instance of " + ResourceScheduler.class.getCanonicalName());
460 }
461 } catch (ClassNotFoundException e) {
462 throw new YarnRuntimeException("Could not instantiate Scheduler: "
463 + schedulerClassName, e);
464 }
465 }
466
467 protected ReservationSystem createReservationSystem() {
468 String reservationClassName =
469 conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_CLASS,
470 AbstractReservationSystem.getDefaultReservationSystem(scheduler));
471 if (reservationClassName == null) {
472 return null;
473 }
474 LOG.info("Using ReservationSystem: " + reservationClassName);
475 try {
476 Class<?> reservationClazz = Class.forName(reservationClassName);
477 if (ReservationSystem.class.isAssignableFrom(reservationClazz)) {
478 return (ReservationSystem) ReflectionUtils.newInstance(
479 reservationClazz, this.conf);
480 } else {
481 throw new YarnRuntimeException("Class: " + reservationClassName
482 + " not instance of " + ReservationSystem.class.getCanonicalName());
483 }
484 } catch (ClassNotFoundException e) {
485 throw new YarnRuntimeException(
486 "Could not instantiate ReservationSystem: " + reservationClassName, e);
487 }
488 }
489
490 protected SystemServiceManager createServiceManager() {
491 String schedulerClassName =
492 YarnConfiguration.DEFAULT_YARN_API_SYSTEM_SERVICES_CLASS;
493 LOG.info("Using SystemServiceManager: " + schedulerClassName);
494 try {
495 Class<?> schedulerClazz = Class.forName(schedulerClassName);
496 if (SystemServiceManager.class.isAssignableFrom(schedulerClazz)) {
497 return (SystemServiceManager) ReflectionUtils
498 .newInstance(schedulerClazz, this.conf);
499 } else {
500 throw new YarnRuntimeException(
501 "Class: " + schedulerClassName + " not instance of "
502 + SystemServiceManager.class.getCanonicalName());
503 }
504 } catch (ClassNotFoundException e) {
505 throw new YarnRuntimeException(
506 "Could not instantiate SystemServiceManager: " + schedulerClassName,
507 e);
508 }
509 }
510
511 protected ApplicationMasterLauncher createAMLauncher() {
512 return new ApplicationMasterLauncher(this.rmContext);
513 }
514
515 private NMLivelinessMonitor createNMLivelinessMonitor() {
516 return new NMLivelinessMonitor(this.rmContext
517 .getDispatcher());
518 }
519
520 protected AMLivelinessMonitor createAMLivelinessMonitor() {
521 return new AMLivelinessMonitor(this.rmDispatcher);
522 }
523
524 protected RMNodeLabelsManager createNodeLabelManager()
525 throws InstantiationException, IllegalAccessException {
526 return new RMNodeLabelsManager();
527 }
528
529 protected AllocationTagsManager createAllocationTagsManager() {
530 return new AllocationTagsManager(this.rmContext);
531 }
532
533 protected PlacementConstraintManagerService
534 createPlacementConstraintManager() {
535 // Use the in memory Placement Constraint Manager.
536 return new MemoryPlacementConstraintManager();
537 }
538
539 protected DelegationTokenRenewer createDelegationTokenRenewer() {
540 return new DelegationTokenRenewer();
541 }
542
543 protected RMAppManager createRMAppManager() {
544 return new RMAppManager(this.rmContext, this.scheduler, this.masterService,
545 this.applicationACLsManager, this.conf);
546 }
547
548 protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
549 return new RMApplicationHistoryWriter();
550 }
551
552 private RMTimelineCollectorManager createRMTimelineCollectorManager() {
553 return new RMTimelineCollectorManager(this);
554 }
555
556 private FederationStateStoreService createFederationStateStoreService() {
557 return new FederationStateStoreService(rmContext);
558 }
559
560 protected SystemMetricsPublisher createSystemMetricsPublisher() {
561 List<SystemMetricsPublisher> publishers =
562 new ArrayList<SystemMetricsPublisher>();
563 if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
564 SystemMetricsPublisher publisherV1 = new TimelineServiceV1Publisher();
565 publishers.add(publisherV1);
566 }
567 if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
568 // we're dealing with the v.2.x publisher
569 LOG.info("system metrics publisher with the timeline service V2 is "
570 + "configured");
571 SystemMetricsPublisher publisherV2 = new TimelineServiceV2Publisher(
572 rmContext.getRMTimelineCollectorManager());
573 publishers.add(publisherV2);
574 }
575 if (publishers.isEmpty()) {
576 LOG.info("TimelineServicePublisher is not configured");
577 SystemMetricsPublisher noopPublisher = new NoOpSystemMetricPublisher();
578 publishers.add(noopPublisher);
579 }
580
581 for (SystemMetricsPublisher publisher : publishers) {
582 addIfService(publisher);
583 }
584
585 SystemMetricsPublisher combinedPublisher =
586 new CombinedSystemMetricsPublisher(publishers);
587 return combinedPublisher;
588 }
589
590 // sanity check for configurations
591 protected static void validateConfigs(Configuration conf) {
592 // validate max-attempts
593 int globalMaxAppAttempts =
594 conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
595 YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
596 if (globalMaxAppAttempts <= 0) {
597 throw new YarnRuntimeException("Invalid global max attempts configuration"
598 + ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
599 + "=" + globalMaxAppAttempts + ", it should be a positive integer.");
600 }
601
602 // validate expireIntvl >= heartbeatIntvl
603 long expireIntvl = conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
604 YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
605 long heartbeatIntvl =
606 conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
607 YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
608 if (expireIntvl < heartbeatIntvl) {
609 throw new YarnRuntimeException("Nodemanager expiry interval should be no"
610 + " less than heartbeat interval, "
611 + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS + "=" + expireIntvl
612 + ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "="
613 + heartbeatIntvl);
614 }
615 }
616
617 /**
618 * RMActiveServices handles all the Active services in the RM.
619 */
620 @Private
621 public class RMActiveServices extends CompositeService {
622
623 private DelegationTokenRenewer delegationTokenRenewer;
624 private EventHandler<SchedulerEvent> schedulerDispatcher;
625 private ApplicationMasterLauncher applicationMasterLauncher;
626 private ContainerAllocationExpirer containerAllocationExpirer;
627 private ResourceManager rm;
628 private boolean fromActive = false;
629 private StandByTransitionRunnable standByTransitionRunnable;
630
/**
 * Creates the composite service named "RMActiveServices".
 *
 * @param rm the ResourceManager stored in this instance's {@code rm} field
 */
RMActiveServices(ResourceManager rm) {
  super("RMActiveServices");
  this.rm = rm;
}
635
636 @Override
637 protected void serviceInit(Configuration configuration) throws Exception {
638 standByTransitionRunnable = new StandByTransitionRunnable();
639
640 rmSecretManagerService = createRMSecretManagerService();
641 addService(rmSecretManagerService);
642
643 containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
644 addService(containerAllocationExpirer);
645 rmContext.setContainerAllocationExpirer(containerAllocationExpirer);
646
647 AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
648 addService(amLivelinessMonitor);
649 rmContext.setAMLivelinessMonitor(amLivelinessMonitor);
650
651 AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
652 addService(amFinishingMonitor);
653 rmContext.setAMFinishingMonitor(amFinishingMonitor);
654
655 RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
656 addService(rmAppLifetimeMonitor);
657 rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);
658
659 RMNodeLabelsManager nlm = createNodeLabelManager();
660 nlm.setRMContext(rmContext);
661 addService(nlm);
662 rmContext.setNodeLabelManager(nlm);
663
664 AllocationTagsManager allocationTagsManager =
665 createAllocationTagsManager();
666 rmContext.setAllocationTagsManager(allocationTagsManager);
667
668 PlacementConstraintManagerService placementConstraintManager =
669 createPlacementConstraintManager();
670 addService(placementConstraintManager);
671 rmContext.setPlacementConstraintManager(placementConstraintManager);
672
673 // add resource profiles here because it's used by AbstractYarnScheduler
674 ResourceProfilesManager resourceProfilesManager =
675 createResourceProfileManager();
676 resourceProfilesManager.init(conf);
677 rmContext.setResourceProfilesManager(resourceProfilesManager);
678
679 RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
680 createRMDelegatedNodeLabelsUpdater();
681 if (delegatedNodeLabelsUpdater != null) {
682 addService(delegatedNodeLabelsUpdater);
683 rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
684 }
685
686 recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
687 YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
688
689 RMStateStore rmStore = null;
690 if (recoveryEnabled) {
691 rmStore = RMStateStoreFactory.getStore(conf);
692 boolean isWorkPreservingRecoveryEnabled =
693 conf.getBoolean(
694 YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
695 YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
696 rmContext
697 .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
698 } else {
699 rmStore = new NullRMStateStore();
700 }
701
702 try {
703 rmStore.setResourceManager(rm);
704 rmStore.init(conf);
705 rmStore.setRMDispatcher(rmDispatcher);
706 } catch (Exception e) {
707 // the Exception from stateStore.init() needs to be handled for
708 // HA and we need to give up master status if we got fenced
709 LOG.error("Failed to init state store", e);
710 throw e;
711 }
712 rmContext.setStateStore(rmStore);
713
714 if (UserGroupInformation.isSecurityEnabled()) {
715 delegationTokenRenewer = createDelegationTokenRenewer();
716 rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
717 }
718
719 // Register event handler for NodesListManager
720 nodesListManager = new NodesListManager(rmContext);
721 rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
722 addService(nodesListManager);
723 rmContext.setNodesListManager(nodesListManager);
724
725 // Initialize the scheduler
726 scheduler = createScheduler();
727 scheduler.setRMContext(rmContext);
728 addIfService(scheduler);
729 rmContext.setScheduler(scheduler);
730
731 schedulerDispatcher = createSchedulerEventDispatcher();
732 addIfService(schedulerDispatcher);
733 rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
734
735 // Register event handler for RmAppEvents
736 rmDispatcher.register(RMAppEventType.class,
737 new ApplicationEventDispatcher(rmContext));
738
739 // Register event handler for RmAppAttemptEvents
740 rmDispatcher.register(RMAppAttemptEventType.class,
741 new ApplicationAttemptEventDispatcher(rmContext));
742
743 // Register event handler for RmNodes
744 rmDispatcher.register(
745 RMNodeEventType.class, new NodeEventDispatcher(rmContext));
746
747 nmLivelinessMonitor = createNMLivelinessMonitor();
748 addService(nmLivelinessMonitor);
749
750 resourceTracker = createResourceTrackerService();
751 addService(resourceTracker);
752 rmContext.setResourceTrackerService(resourceTracker);
753
754 MetricsSystem ms = DefaultMetricsSystem.initialize("ResourceManager");
755 if (fromActive) {
756 JvmMetrics.reattach(ms, jvmMetrics);
757 UserGroupInformation.reattachMetrics();
758 } else {
759 jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
760 }
761
762 JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
763 addService(pauseMonitor);
764 jvmMetrics.setPauseMonitor(pauseMonitor);
765
766 // Initialize the Reservation system
767 if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
768 YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
769 reservationSystem = createReservationSystem();
770 if (reservationSystem != null) {
771 reservationSystem.setRMContext(rmContext);
772 addIfService(reservationSystem);
773 rmContext.setReservationSystem(reservationSystem);
774 LOG.info("Initialized Reservation system");
775 }
776 }
777
778 masterService = createApplicationMasterService();
779 addService(masterService) ;
780 rmContext.setApplicationMasterService(masterService);
781
782 applicationACLsManager = new ApplicationACLsManager(conf);
783
784 queueACLsManager = createQueueACLsManager(scheduler, conf);
785
786 rmAppManager = createRMAppManager();
787 // Register event handler for RMAppManagerEvents
788 rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
789
790 clientRM = createClientRMService();
791 addService(clientRM);
792 rmContext.setClientRMService(clientRM);
793
794 applicationMasterLauncher = createAMLauncher();
795 rmDispatcher.register(AMLauncherEventType.class,
796 applicationMasterLauncher);
797
798 addService(applicationMasterLauncher);
799 if (UserGroupInformation.isSecurityEnabled()) {
800 addService(delegationTokenRenewer);
801 delegationTokenRenewer.setRMContext(rmContext);
802 }
803
804 if(HAUtil.isFederationEnabled(conf)) {
805 String cId = YarnConfiguration.getClusterId(conf);
806 if (cId.isEmpty()) {
807 String errMsg =
808 "Cannot initialize RM as Federation is enabled"
809 + " but cluster id is not configured.";
810 LOG.error(errMsg);
811 throw new YarnRuntimeException(errMsg);
812 }
813 federationStateStoreService = createFederationStateStoreService();
814 addIfService(federationStateStoreService);
815 LOG.info("Initialized Federation membership.");
816 }
817
818 new RMNMInfo(rmContext, scheduler);
819
820 if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
821 false)) {
822 SystemServiceManager systemServiceManager = createServiceManager();
823 addIfService(systemServiceManager);
824 }
825
826 super.serviceInit(conf);
827 }
828
829 @Override
830 protected void serviceStart() throws Exception {
831 RMStateStore rmStore = rmContext.getStateStore();
832 // The state store needs to start irrespective of recoveryEnabled as apps
833 // need events to move to further states.
834 rmStore.start();
835
836 if(recoveryEnabled) {
837 try {
838 LOG.info("Recovery started");
839 rmStore.checkVersion();
840 if (rmContext.isWorkPreservingRecoveryEnabled()) {
841 rmContext.setEpoch(rmStore.getAndIncrementEpoch());
842 }
843 RMState state = rmStore.loadState();
844 recover(state);
845 LOG.info("Recovery ended");
846 } catch (Exception e) {
847 // the Exception from loadState() needs to be handled for
848 // HA and we need to give up master status if we got fenced
849 LOG.error("Failed to load/recover state", e);
850 throw e;
851 }
852 } else {
853 if (HAUtil.isFederationEnabled(conf)) {
854 long epoch = conf.getLong(YarnConfiguration.RM_EPOCH,
855 YarnConfiguration.DEFAULT_RM_EPOCH);
856 rmContext.setEpoch(epoch);
857 LOG.info("Epoch set for Federation: " + epoch);
858 }
859 }
860
861 super.serviceStart();
862 }
863
864 @Override
865 protected void serviceStop() throws Exception {
866
867 super.serviceStop();
868
869 DefaultMetricsSystem.shutdown();
870 if (rmContext != null) {
871 RMStateStore store = rmContext.getStateStore();
872 try {
873 if (null != store) {
874 store.close();
875 }
876 } catch (Exception e) {
877 LOG.error("Error closing store.", e);
878 }
879 }
880
881 }
882 }
883
884 @Private
885 private class RMFatalEventDispatcher implements EventHandler<RMFatalEvent> {
886 @Override
887 public void handle(RMFatalEvent event) {
888 LOG.error("Received " + event);
889
890 if (HAUtil.isHAEnabled(getConfig())) {
891 // If we're in an HA config, the right answer is always to go into
892 // standby.
893 LOG.warn("Transitioning the resource manager to standby.");
894 handleTransitionToStandByInNewThread();
895 } else {
896 // If we're stand-alone, we probably want to shut down, but the if and
897 // how depends on the event.
898 switch(event.getType()) {
899 case STATE_STORE_FENCED:
900 LOG.fatal("State store fenced even though the resource manager " +
901 "is not configured for high availability. Shutting down this " +
902 "resource manager to protect the integrity of the state store.");
903 ExitUtil.terminate(1, event.getExplanation());
904 break;
905 case STATE_STORE_OP_FAILED:
906 if (YarnConfiguration.shouldRMFailFast(getConfig())) {
907 LOG.fatal("Shutting down the resource manager because a state " +
908 "store operation failed, and the resource manager is " +
909 "configured to fail fast. See the yarn.fail-fast and " +
910 "yarn.resourcemanager.fail-fast properties.");
911 ExitUtil.terminate(1, event.getExplanation());
912 } else {
913 LOG.warn("Ignoring state store operation failure because the " +
914 "resource manager is not configured to fail fast. See the " +
915 "yarn.fail-fast and yarn.resourcemanager.fail-fast " +
916 "properties.");
917 }
918 break;
919 default:
920 LOG.fatal("Shutting down the resource manager.");
921 ExitUtil.terminate(1, event.getExplanation());
922 }
923 }
924 }
925 }
926
927 /**
928 * Transition to standby state in a new thread. The transition operation is
929 * asynchronous to avoid deadlock caused by cyclic dependency.
930 */
931 private void handleTransitionToStandByInNewThread() {
932 Thread standByTransitionThread =
933 new Thread(activeServices.standByTransitionRunnable);
934 standByTransitionThread.setName("StandByTransitionThread");
935 standByTransitionThread.start();
936 }
937
938 /**
939 * The class to transition RM to standby state. The same
940 * {@link StandByTransitionRunnable} object could be used in multiple threads,
941 * but runs only once. That's because RM can go back to active state after
942 * transition to standby state, the same runnable in the old context can't
943 * transition RM to standby state again. A new runnable is created every time
944 * RM transitions to active state.
945 */
946 private class StandByTransitionRunnable implements Runnable {
947 // The atomic variable to make sure multiple threads with the same runnable
948 // run only once.
949 private final AtomicBoolean hasAlreadyRun = new AtomicBoolean(false);
950
951 @Override
952 public void run() {
953 // Run this only once, even if multiple threads end up triggering
954 // this simultaneously.
955 if (hasAlreadyRun.getAndSet(true)) {
956 return;
957 }
958
959 if (rmContext.isHAEnabled()) {
960 try {
961 // Transition to standby and reinit active services
962 LOG.info("Transitioning RM to Standby mode");
963 transitionToStandby(true);
964 EmbeddedElector elector = rmContext.getLeaderElectorService();
965 if (elector != null) {
966 elector.rejoinElection();
967 }
968 } catch (Exception e) {
969 LOG.fatal("Failed to transition RM to Standby mode.", e);
970 ExitUtil.terminate(1, e);
971 }
972 }
973 }
974 }
975
976 @Private
977 public static final class ApplicationEventDispatcher implements
978 EventHandler<RMAppEvent> {
979
980 private final RMContext rmContext;
981
982 public ApplicationEventDispatcher(RMContext rmContext) {
983 this.rmContext = rmContext;
984 }
985
986 @Override
987 public void handle(RMAppEvent event) {
988 ApplicationId appID = event.getApplicationId();
989 RMApp rmApp = this.rmContext.getRMApps().get(appID);
990 if (rmApp != null) {
991 try {
992 rmApp.handle(event);
993 } catch (Throwable t) {
994 LOG.error("Error in handling event type " + event.getType()
995 + " for application " + appID, t);
996 }
997 }
998 }
999 }
1000
1001 @Private
1002 public static final class ApplicationAttemptEventDispatcher implements
1003 EventHandler<RMAppAttemptEvent> {
1004
1005 private final RMContext rmContext;
1006
1007 public ApplicationAttemptEventDispatcher(RMContext rmContext) {
1008 this.rmContext = rmContext;
1009 }
1010
1011 @Override
1012 public void handle(RMAppAttemptEvent event) {
1013 ApplicationAttemptId appAttemptId = event.getApplicationAttemptId();
1014 ApplicationId appId = appAttemptId.getApplicationId();
1015 RMApp rmApp = this.rmContext.getRMApps().get(appId);
1016 if (rmApp != null) {
1017 RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
1018 if (rmAppAttempt != null) {
1019 try {
1020 rmAppAttempt.handle(event);
1021 } catch (Throwable t) {
1022 LOG.error("Error in handling event type " + event.getType()
1023 + " for applicationAttempt " + appAttemptId, t);
1024 }
1025 } else if (rmApp.getApplicationSubmissionContext() != null
1026 && rmApp.getApplicationSubmissionContext()
1027 .getKeepContainersAcrossApplicationAttempts()
1028 && event.getType() == RMAppAttemptEventType.CONTAINER_FINISHED) {
1029 // For work-preserving AM restart, failed attempts are still
1030 // capturing CONTAINER_FINISHED events and record the finished
1031 // containers which will be used by current attempt.
1032 // We just keep 'yarn.resourcemanager.am.max-attempts' in
1033 // RMStateStore. If the finished container's attempt is deleted, we
1034 // use the first attempt in app.attempts to deal with these events.
1035
1036 RMAppAttempt previousFailedAttempt =
1037 rmApp.getAppAttempts().values().iterator().next();
1038 if (previousFailedAttempt != null) {
1039 try {
1040 LOG.debug("Event " + event.getType() + " handled by "
1041 + previousFailedAttempt);
1042 previousFailedAttempt.handle(event);
1043 } catch (Throwable t) {
1044 LOG.error("Error in handling event type " + event.getType()
1045 + " for applicationAttempt " + appAttemptId
1046 + " with " + previousFailedAttempt, t);
1047 }
1048 } else {
1049 LOG.error("Event " + event.getType()
1050 + " not handled, because previousFailedAttempt is null");
1051 }
1052 }
1053 }
1054 }
1055 }
1056
1057 @Private
1058 public static final class NodeEventDispatcher implements
1059 EventHandler<RMNodeEvent> {
1060
1061 private final RMContext rmContext;
1062
1063 public NodeEventDispatcher(RMContext rmContext) {
1064 this.rmContext = rmContext;
1065 }
1066
1067 @Override
1068 public void handle(RMNodeEvent event) {
1069 NodeId nodeId = event.getNodeId();
1070 RMNode node = this.rmContext.getRMNodes().get(nodeId);
1071 if (node != null) {
1072 try {
1073 ((EventHandler<RMNodeEvent>) node).handle(event);
1074 } catch (Throwable t) {
1075 LOG.error("Error in handling event type " + event.getType()
1076 + " for node " + nodeId, t);
1077 }
1078 }
1079 }
1080 }
1081
1082 /**
1083 * Return a HttpServer.Builder that the journalnode / namenode / secondary
1084 * namenode can use to initialize their HTTP / HTTPS server.
1085 *
1086 * @param conf configuration object
1087 * @param httpAddr HTTP address
1088 * @param httpsAddr HTTPS address
1089 * @param name Name of the server
1090 * @throws IOException from Builder
1091 * @return builder object
1092 */
1093 public static HttpServer2.Builder httpServerTemplateForRM(Configuration conf,
1094 final InetSocketAddress httpAddr, final InetSocketAddress httpsAddr,
1095 String name) throws IOException {
1096 HttpServer2.Builder builder = new HttpServer2.Builder().setName(name)
1097 .setConf(conf).setSecurityEnabled(false);
1098
1099 if (httpAddr.getPort() == 0) {
1100 builder.setFindPort(true);
1101 }
1102
1103 URI uri = URI.create("http://" + NetUtils.getHostPortString(httpAddr));
1104 builder.addEndpoint(uri);
1105 LOG.info("Starting Web-server for " + name + " at: " + uri);
1106
1107 return builder;
1108 }
1109
1110 protected void startWepApp() {
1111 Map<String, String> serviceConfig = null;
1112 Configuration conf = getConfig();
1113
1114 RMWebAppUtil.setupSecurityAndFilters(conf,
1115 getClientRMService().rmDTSecretManager);
1116
1117 Map<String, String> params = new HashMap<String, String>();
1118 if (getConfig().getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
1119 false)) {
1120 String apiPackages = "org.apache.hadoop.yarn.service.webapp;" +
1121 "org.apache.hadoop.yarn.webapp";
1122 params.put("com.sun.jersey.config.property.resourceConfigClass",
1123 "com.sun.jersey.api.core.PackagesResourceConfig");
1124 params.put("com.sun.jersey.config.property.packages", apiPackages);
1125 }
1126
1127 Builder<ApplicationMasterService> builder =
1128 WebApps
1129 .$for("cluster", ApplicationMasterService.class, masterService,
1130 "ws")
1131 .with(conf)
1132 .withServlet("API-Service", "/app/*",
1133 ServletContainer.class, params)
1134 .withHttpSpnegoPrincipalKey(
1135 YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
1136 .withHttpSpnegoKeytabKey(
1137 YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
1138 .withCSRFProtection(YarnConfiguration.RM_CSRF_PREFIX)
1139 .withXFSProtection(YarnConfiguration.RM_XFS_PREFIX)
1140 .at(webAppAddress);
1141 String proxyHostAndPort = rmContext.getProxyHostAndPort(conf);
1142 if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
1143 equals(proxyHostAndPort)) {
1144 if (HAUtil.isHAEnabled(conf)) {
1145 fetcher = new AppReportFetcher(conf);
1146 } else {
1147 fetcher = new AppReportFetcher(conf, getClientRMService());
1148 }
1149 builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
1150 ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
1151 builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
1152 String[] proxyParts = proxyHostAndPort.split(":");
1153 builder.withAttribute(WebAppProxy.PROXY_HOST_ATTRIBUTE, proxyParts[0]);
1154 }
1155
1156 WebAppContext uiWebAppContext = null;
1157 if (getConfig().getBoolean(YarnConfiguration.YARN_WEBAPP_UI2_ENABLE,
1158 YarnConfiguration.DEFAULT_YARN_WEBAPP_UI2_ENABLE)) {
1159 String onDiskPath = getConfig()
1160 .get(YarnConfiguration.YARN_WEBAPP_UI2_WARFILE_PATH);
1161
1162 uiWebAppContext = new WebAppContext();
1163 uiWebAppContext.setContextPath(UI2_WEBAPP_NAME);
1164
1165 if (null == onDiskPath) {
1166 String war = "hadoop-yarn-ui-" + VersionInfo.getVersion() + ".war";
1167 URLClassLoader cl = (URLClassLoader) ClassLoader.getSystemClassLoader();
1168 URL url = cl.findResource(war);
1169
1170 if (null == url) {
1171 onDiskPath = getWebAppsPath("ui2");
1172 } else {
1173 onDiskPath = url.getFile();
1174 }
1175 }
1176 if (onDiskPath == null || onDiskPath.isEmpty()) {
1177 LOG.error("No war file or webapps found for ui2 !");
1178 } else {
1179 if (onDiskPath.endsWith(".war")) {
1180 uiWebAppContext.setWar(onDiskPath);
1181 LOG.info("Using war file at: " + onDiskPath);
1182 } else {
1183 uiWebAppContext.setResourceBase(onDiskPath);
1184 LOG.info("Using webapps at: " + onDiskPath);
1185 }
1186 }
1187 }
1188
1189 webApp = builder.start(new RMWebApp(this), uiWebAppContext);
1190 }
1191
1192 private String getWebAppsPath(String appName) {
1193 URL url = getClass().getClassLoader().getResource("webapps/" + appName);
1194 if (url == null) {
1195 return "";
1196 }
1197 return url.toString();
1198 }
1199
1200 /**
1201 * Helper method to create and init {@link #activeServices}. This creates an
1202 * instance of {@link RMActiveServices} and initializes it.
1203 *
1204 * @param fromActive Indicates if the call is from the active state transition
1205 * or the RM initialization.
1206 */
1207 protected void createAndInitActiveServices(boolean fromActive) {
1208 activeServices = new RMActiveServices(this);
1209 activeServices.fromActive = fromActive;
1210 activeServices.init(conf);
1211 }
1212
1213 /**
1214 * Helper method to start {@link #activeServices}.
1215 * @throws Exception
1216 */
1217 void startActiveServices() throws Exception {
1218 if (activeServices != null) {
1219 clusterTimeStamp = System.currentTimeMillis();
1220 activeServices.start();
1221 }
1222 }
1223
1224 /**
1225 * Helper method to stop {@link #activeServices}.
1226 * @throws Exception
1227 */
1228 void stopActiveServices() {
1229 if (activeServices != null) {
1230 activeServices.stop();
1231 activeServices = null;
1232 }
1233 }
1234
1235 void reinitialize(boolean initialize) {
1236 ClusterMetrics.destroy();
1237 QueueMetrics.clearQueueMetrics();
1238 if (initialize) {
1239 resetRMContext();
1240 createAndInitActiveServices(true);
1241 }
1242 }
1243
1244 @VisibleForTesting
1245 protected boolean areActiveServicesRunning() {
1246 return activeServices != null && activeServices.isInState(STATE.STARTED);
1247 }
1248
1249 synchronized void transitionToActive() throws Exception {
1250 if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) {
1251 LOG.info("Already in active state");
1252 return;
1253 }
1254 LOG.info("Transitioning to active state");
1255
1256 this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
1257 @Override
1258 public Void run() throws Exception {
1259 try {
1260 startActiveServices();
1261 return null;
1262 } catch (Exception e) {
1263 reinitialize(true);
1264 throw e;
1265 }
1266 }
1267 });
1268
1269 rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
1270 LOG.info("Transitioned to active state");
1271 }
1272
1273 synchronized void transitionToStandby(boolean initialize)
1274 throws Exception {
1275 if (rmContext.getHAServiceState() ==
1276 HAServiceProtocol.HAServiceState.STANDBY) {
1277 LOG.info("Already in standby state");
1278 return;
1279 }
1280
1281 LOG.info("Transitioning to standby state");
1282 HAServiceState state = rmContext.getHAServiceState();
1283 rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
1284 if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
1285 stopActiveServices();
1286 reinitialize(initialize);
1287 }
1288 LOG.info("Transitioned to standby state");
1289 }
1290
1291 @Override
1292 protected void serviceStart() throws Exception {
1293 if (this.rmContext.isHAEnabled()) {
1294 transitionToStandby(false);
1295 } else {
1296 transitionToActive();
1297 }
1298
1299 startWepApp();
1300 if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
1301 false)) {
1302 int port = webApp.port();
1303 WebAppUtils.setRMWebAppPort(conf, port);
1304 }
1305 super.serviceStart();
1306 }
1307
1308 protected void doSecureLogin() throws IOException {
1309 InetSocketAddress socAddr = getBindAddress(conf);
1310 SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,
1311 YarnConfiguration.RM_PRINCIPAL, socAddr.getHostName());
1312
1313 // if security is enable, set rmLoginUGI as UGI of loginUser
1314 if (UserGroupInformation.isSecurityEnabled()) {
1315 this.rmLoginUGI = UserGroupInformation.getLoginUser();
1316 }
1317 }
1318
1319 @Override
1320 protected void serviceStop() throws Exception {
1321 if (webApp != null) {
1322 webApp.stop();
1323 }
1324 if (fetcher != null) {
1325 fetcher.stop();
1326 }
1327 if (configurationProvider != null) {
1328 configurationProvider.close();
1329 }
1330 super.serviceStop();
1331 if (zkManager != null) {
1332 zkManager.close();
1333 }
1334 transitionToStandby(false);
1335 rmContext.setHAServiceState(HAServiceState.STOPPING);
1336 }
1337
1338 protected ResourceTrackerService createResourceTrackerService() {
1339 return new ResourceTrackerService(this.rmContext, this.nodesListManager,
1340 this.nmLivelinessMonitor,
1341 this.rmContext.getContainerTokenSecretManager(),
1342 this.rmContext.getNMTokenSecretManager());
1343 }
1344
1345 protected ClientRMService createClientRMService() {
1346 return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
1347 this.applicationACLsManager, this.queueACLsManager,
1348 this.rmContext.getRMDelegationTokenSecretManager());
1349 }
1350
1351 protected ApplicationMasterService createApplicationMasterService() {
1352 Configuration config = this.rmContext.getYarnConfiguration();
1353 if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
1354 || YarnConfiguration.isDistSchedulingEnabled(config)) {
1355 if (YarnConfiguration.isDistSchedulingEnabled(config) &&
1356 !YarnConfiguration
1357 .isOpportunisticContainerAllocationEnabled(config)) {
1358 throw new YarnRuntimeException(
1359 "Invalid parameters: opportunistic container allocation has to " +
1360 "be enabled when distributed scheduling is enabled.");
1361 }
1362 OpportunisticContainerAllocatorAMService
1363 oppContainerAllocatingAMService =
1364 new OpportunisticContainerAllocatorAMService(this.rmContext,
1365 scheduler);
1366 EventDispatcher oppContainerAllocEventDispatcher =
1367 new EventDispatcher(oppContainerAllocatingAMService,
1368 OpportunisticContainerAllocatorAMService.class.getName());
1369 // Add an event dispatcher for the
1370 // OpportunisticContainerAllocatorAMService to handle node
1371 // additions, updates and removals. Since the SchedulerEvent is currently
1372 // a super set of theses, we register interest for it.
1373 addService(oppContainerAllocEventDispatcher);
1374 rmDispatcher.register(SchedulerEventType.class,
1375 oppContainerAllocEventDispatcher);
1376 this.rmContext.setContainerQueueLimitCalculator(
1377 oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator());
1378 return oppContainerAllocatingAMService;
1379 }
1380 return new ApplicationMasterService(this.rmContext, scheduler);
1381 }
1382
1383 protected AdminService createAdminService() {
1384 return new AdminService(this);
1385 }
1386
1387 protected RMSecretManagerService createRMSecretManagerService() {
1388 return new RMSecretManagerService(conf, rmContext);
1389 }
1390
1391 /**
1392 * Create RMDelegatedNodeLabelsUpdater based on configuration.
1393 */
1394 protected RMDelegatedNodeLabelsUpdater createRMDelegatedNodeLabelsUpdater() {
1395 if (conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED,
1396 YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED)
1397 && YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(
1398 conf)) {
1399 return new RMDelegatedNodeLabelsUpdater(rmContext);
1400 } else {
1401 return null;
1402 }
1403 }
1404
1405 @Private
1406 public ClientRMService getClientRMService() {
1407 return this.clientRM;
1408 }
1409
1410 /**
1411 * return the scheduler.
1412 * @return the scheduler for the Resource Manager.
1413 */
1414 @Private
1415 public ResourceScheduler getResourceScheduler() {
1416 return this.scheduler;
1417 }
1418
1419 /**
1420 * return the resource tracking component.
1421 * @return the resource tracking component.
1422 */
1423 @Private
1424 public ResourceTrackerService getResourceTrackerService() {
1425 return this.resourceTracker;
1426 }
1427
1428 @Private
1429 public ApplicationMasterService getApplicationMasterService() {
1430 return this.masterService;
1431 }
1432
1433 @Private
1434 public ApplicationACLsManager getApplicationACLsManager() {
1435 return this.applicationACLsManager;
1436 }
1437
1438 @Private
1439 public QueueACLsManager getQueueACLsManager() {
1440 return this.queueACLsManager;
1441 }
1442
1443 @Private
1444 @VisibleForTesting
1445 public FederationStateStoreService getFederationStateStoreService() {
1446 return this.federationStateStoreService;
1447 }
1448
1449 @Private
1450 WebApp getWebapp() {
1451 return this.webApp;
1452 }
1453
1454 @Override
1455 public void recover(RMState state) throws Exception {
1456 // recover RMdelegationTokenSecretManager
1457 rmContext.getRMDelegationTokenSecretManager().recover(state);
1458
1459 // recover AMRMTokenSecretManager
1460 rmContext.getAMRMTokenSecretManager().recover(state);
1461
1462 // recover reservations
1463 if (reservationSystem != null) {
1464 reservationSystem.recover(state);
1465 }
1466 // recover applications
1467 rmAppManager.recover(state);
1468
1469 setSchedulerRecoveryStartAndWaitTime(state, conf);
1470 }
1471
1472 public static void main(String argv[]) {
1473 Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
1474 StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
1475 try {
1476 Configuration conf = new YarnConfiguration();
1477 GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
1478 argv = hParser.getRemainingArgs();
1479 // If -format-state-store, then delete RMStateStore; else startup normally
1480 if (argv.length >= 1) {
1481 if (argv[0].equals("-format-state-store")) {
1482 deleteRMStateStore(conf);
1483 } else if (argv[0].equals("-remove-application-from-state-store")
1484 && argv.length == 2) {
1485 removeApplication(conf, argv[1]);
1486 } else {
1487 printUsage(System.err);
1488 }
1489 } else {
1490 ResourceManager resourceManager = new ResourceManager();
1491 ShutdownHookManager.get().addShutdownHook(
1492 new CompositeServiceShutdownHook(resourceManager),
1493 SHUTDOWN_HOOK_PRIORITY);
1494 resourceManager.init(conf);
1495 resourceManager.start();
1496 }
1497 } catch (Throwable t) {
1498 LOG.fatal("Error starting ResourceManager", t);
1499 System.exit(-1);
1500 }
1501 }
1502
1503 /**
1504 * Register the handlers for alwaysOn services
1505 */
1506 private Dispatcher setupDispatcher() {
1507 Dispatcher dispatcher = createDispatcher();
1508 dispatcher.register(RMFatalEventType.class,
1509 new ResourceManager.RMFatalEventDispatcher());
1510 return dispatcher;
1511 }
1512
1513 private void resetRMContext() {
1514 RMContextImpl rmContextImpl = new RMContextImpl();
1515 // transfer service context to new RM service Context
1516 rmContextImpl.setServiceContext(rmContext.getServiceContext());
1517
1518 // reset dispatcher
1519 Dispatcher dispatcher = setupDispatcher();
1520 ((Service) dispatcher).init(this.conf);
1521 ((Service) dispatcher).start();
1522 removeService((Service) rmDispatcher);
1523 // Need to stop previous rmDispatcher before assigning new dispatcher
1524 // otherwise causes "AsyncDispatcher event handler" thread leak
1525 ((Service) rmDispatcher).stop();
1526 rmDispatcher = dispatcher;
1527 addIfService(rmDispatcher);
1528 rmContextImpl.setDispatcher(dispatcher);
1529
1530 rmContext = rmContextImpl;
1531 }
1532
1533 private void setSchedulerRecoveryStartAndWaitTime(RMState state,
1534 Configuration conf) {
1535 if (!state.getApplicationState().isEmpty()) {
1536 long waitTime =
1537 conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
1538 YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
1539 rmContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
1540 }
1541 }
1542
1543 /**
1544 * Retrieve RM bind address from configuration
1545 *
1546 * @param conf
1547 * @return InetSocketAddress
1548 */
1549 public static InetSocketAddress getBindAddress(Configuration conf) {
1550 return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
1551 YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
1552 }
1553
1554 /**
1555 * Deletes the RMStateStore
1556 *
1557 * @param conf
1558 * @throws Exception
1559 */
1560 @VisibleForTesting
1561 static void deleteRMStateStore(Configuration conf) throws Exception {
1562 RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
1563 rmStore.setResourceManager(new ResourceManager());
1564 rmStore.init(conf);
1565 rmStore.start();
1566 try {
1567 LOG.info("Deleting ResourceManager state store...");
1568 rmStore.deleteStore();
1569 LOG.info("State store deleted");
1570 } finally {
1571 rmStore.stop();
1572 }
1573 }
1574
1575 @VisibleForTesting
1576 static void removeApplication(Configuration conf, String applicationId)
1577 throws Exception {
1578 RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
1579 rmStore.setResourceManager(new ResourceManager());
1580 rmStore.init(conf);
1581 rmStore.start();
1582 try {
1583 ApplicationId removeAppId = ApplicationId.fromString(applicationId);
1584 LOG.info("Deleting application " + removeAppId + " from state store");
1585 rmStore.removeApplication(removeAppId);
1586 LOG.info("Application is deleted from state store");
1587 } finally {
1588 rmStore.stop();
1589 }
1590 }
1591
1592 private static void printUsage(PrintStream out) {
1593 out.println("Usage: yarn resourcemanager [-format-state-store]");
1594 out.println(" "
1595 + "[-remove-application-from-state-store <appId>]" + "\n");
1596 }
1597
1598 protected RMAppLifetimeMonitor createRMAppLifetimeMonitor() {
1599 return new RMAppLifetimeMonitor(this.rmContext);
1600 }
1601 }