001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.client;
020
021 import java.net.InetSocketAddress;
022 import java.util.ArrayList;
023 import java.util.List;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceStability;
029 import org.apache.hadoop.conf.Configuration;
030 import org.apache.hadoop.io.Text;
031 import org.apache.hadoop.ipc.RPC;
032 import org.apache.hadoop.yarn.api.ClientRMProtocol;
033 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
034 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
035 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
036 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
037 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
038 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
039 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
040 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
041 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
042 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
043 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
044 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
045 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
046 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
047 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
048 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
049 import org.apache.hadoop.yarn.api.records.ApplicationId;
050 import org.apache.hadoop.yarn.api.records.ApplicationReport;
051 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
052 import org.apache.hadoop.yarn.api.records.DelegationToken;
053 import org.apache.hadoop.yarn.api.records.NodeReport;
054 import org.apache.hadoop.yarn.api.records.QueueInfo;
055 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
056 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
057 import org.apache.hadoop.yarn.conf.YarnConfiguration;
058 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
059 import org.apache.hadoop.yarn.ipc.YarnRPC;
060 import org.apache.hadoop.yarn.service.AbstractService;
061 import org.apache.hadoop.yarn.util.Records;
062
063 @InterfaceAudience.Public
064 @InterfaceStability.Evolving
065 public class YarnClientImpl extends AbstractService implements YarnClient {
066
067 private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
068
069 protected ClientRMProtocol rmClient;
070 protected InetSocketAddress rmAddress;
071
072 private static final String ROOT = "root";
073
074 public YarnClientImpl() {
075 this(null);
076 }
077
078 public YarnClientImpl(InetSocketAddress rmAddress) {
079 super(YarnClientImpl.class.getName());
080 this.rmAddress = rmAddress;
081 }
082
083 private static InetSocketAddress getRmAddress(Configuration conf) {
084 return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
085 YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
086 }
087
088 @Override
089 public synchronized void init(Configuration conf) {
090 if (this.rmAddress == null) {
091 this.rmAddress = getRmAddress(conf);
092 }
093 super.init(conf);
094 }
095
096 @Override
097 public synchronized void start() {
098 YarnRPC rpc = YarnRPC.create(getConfig());
099
100 this.rmClient = (ClientRMProtocol) rpc.getProxy(
101 ClientRMProtocol.class, rmAddress, getConfig());
102 if (LOG.isDebugEnabled()) {
103 LOG.debug("Connecting to ResourceManager at " + rmAddress);
104 }
105 super.start();
106 }
107
108 @Override
109 public synchronized void stop() {
110 if (this.rmClient != null) {
111 RPC.stopProxy(this.rmClient);
112 }
113 super.stop();
114 }
115
116 @Override
117 public GetNewApplicationResponse getNewApplication()
118 throws YarnRemoteException {
119 GetNewApplicationRequest request =
120 Records.newRecord(GetNewApplicationRequest.class);
121 return rmClient.getNewApplication(request);
122 }
123
124 @Override
125 public ApplicationId
126 submitApplication(ApplicationSubmissionContext appContext)
127 throws YarnRemoteException {
128 ApplicationId applicationId = appContext.getApplicationId();
129 appContext.setApplicationId(applicationId);
130 SubmitApplicationRequest request =
131 Records.newRecord(SubmitApplicationRequest.class);
132 request.setApplicationSubmissionContext(appContext);
133 rmClient.submitApplication(request);
134 LOG.info("Submitted application " + applicationId + " to ResourceManager"
135 + " at " + rmAddress);
136 return applicationId;
137 }
138
139 @Override
140 public void killApplication(ApplicationId applicationId)
141 throws YarnRemoteException {
142 LOG.info("Killing application " + applicationId);
143 KillApplicationRequest request =
144 Records.newRecord(KillApplicationRequest.class);
145 request.setApplicationId(applicationId);
146 rmClient.forceKillApplication(request);
147 }
148
149 @Override
150 public ApplicationReport getApplicationReport(ApplicationId appId)
151 throws YarnRemoteException {
152 GetApplicationReportRequest request =
153 Records.newRecord(GetApplicationReportRequest.class);
154 request.setApplicationId(appId);
155 GetApplicationReportResponse response =
156 rmClient.getApplicationReport(request);
157 return response.getApplicationReport();
158 }
159
160 @Override
161 public List<ApplicationReport> getApplicationList()
162 throws YarnRemoteException {
163 GetAllApplicationsRequest request =
164 Records.newRecord(GetAllApplicationsRequest.class);
165 GetAllApplicationsResponse response = rmClient.getAllApplications(request);
166 return response.getApplicationList();
167 }
168
169 @Override
170 public YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException {
171 GetClusterMetricsRequest request =
172 Records.newRecord(GetClusterMetricsRequest.class);
173 GetClusterMetricsResponse response = rmClient.getClusterMetrics(request);
174 return response.getClusterMetrics();
175 }
176
177 @Override
178 public List<NodeReport> getNodeReports() throws YarnRemoteException {
179 GetClusterNodesRequest request =
180 Records.newRecord(GetClusterNodesRequest.class);
181 GetClusterNodesResponse response = rmClient.getClusterNodes(request);
182 return response.getNodeReports();
183 }
184
185 @Override
186 public DelegationToken getRMDelegationToken(Text renewer)
187 throws YarnRemoteException {
188 /* get the token from RM */
189 GetDelegationTokenRequest rmDTRequest =
190 Records.newRecord(GetDelegationTokenRequest.class);
191 rmDTRequest.setRenewer(renewer.toString());
192 GetDelegationTokenResponse response =
193 rmClient.getDelegationToken(rmDTRequest);
194 return response.getRMDelegationToken();
195 }
196
197
198 private GetQueueInfoRequest
199 getQueueInfoRequest(String queueName, boolean includeApplications,
200 boolean includeChildQueues, boolean recursive) {
201 GetQueueInfoRequest request = Records.newRecord(GetQueueInfoRequest.class);
202 request.setQueueName(queueName);
203 request.setIncludeApplications(includeApplications);
204 request.setIncludeChildQueues(includeChildQueues);
205 request.setRecursive(recursive);
206 return request;
207 }
208
209 @Override
210 public QueueInfo getQueueInfo(String queueName) throws YarnRemoteException {
211 GetQueueInfoRequest request =
212 getQueueInfoRequest(queueName, true, false, false);
213 Records.newRecord(GetQueueInfoRequest.class);
214 return rmClient.getQueueInfo(request).getQueueInfo();
215 }
216
217 @Override
218 public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnRemoteException {
219 GetQueueUserAclsInfoRequest request =
220 Records.newRecord(GetQueueUserAclsInfoRequest.class);
221 return rmClient.getQueueUserAcls(request).getUserAclsInfoList();
222 }
223
224 @Override
225 public List<QueueInfo> getAllQueues() throws YarnRemoteException {
226 List<QueueInfo> queues = new ArrayList<QueueInfo>();
227
228 QueueInfo rootQueue =
229 rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
230 .getQueueInfo();
231 getChildQueues(rootQueue, queues, true);
232 return queues;
233 }
234
235 @Override
236 public List<QueueInfo> getRootQueueInfos() throws YarnRemoteException {
237 List<QueueInfo> queues = new ArrayList<QueueInfo>();
238
239 QueueInfo rootQueue =
240 rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
241 .getQueueInfo();
242 getChildQueues(rootQueue, queues, false);
243 return queues;
244 }
245
246 @Override
247 public List<QueueInfo> getChildQueueInfos(String parent)
248 throws YarnRemoteException {
249 List<QueueInfo> queues = new ArrayList<QueueInfo>();
250
251 QueueInfo parentQueue =
252 rmClient.getQueueInfo(getQueueInfoRequest(parent, false, true, false))
253 .getQueueInfo();
254 getChildQueues(parentQueue, queues, true);
255 return queues;
256 }
257
258 private void getChildQueues(QueueInfo parent, List<QueueInfo> queues,
259 boolean recursive) {
260 List<QueueInfo> childQueues = parent.getChildQueues();
261
262 for (QueueInfo child : childQueues) {
263 queues.add(child);
264 if (recursive) {
265 getChildQueues(child, queues, recursive);
266 }
267 }
268 }
269 }