001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce;
020
021 import java.io.IOException;
022 import java.net.InetSocketAddress;
023 import java.security.PrivilegedExceptionAction;
024 import java.util.ArrayList;
025 import java.util.List;
026 import java.util.ServiceLoader;
027
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030 import org.apache.hadoop.classification.InterfaceAudience;
031 import org.apache.hadoop.classification.InterfaceStability;
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.hadoop.fs.FileSystem;
034 import org.apache.hadoop.fs.Path;
035 import org.apache.hadoop.io.Text;
036 import org.apache.hadoop.mapred.JobConf;
037 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
038 import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
039 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
040 import org.apache.hadoop.mapreduce.util.ConfigUtil;
041 import org.apache.hadoop.mapreduce.v2.LogParams;
042 import org.apache.hadoop.security.UserGroupInformation;
043 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
044 import org.apache.hadoop.security.token.Token;
045
046 /**
047 * Provides a way to access information about the map/reduce cluster.
048 */
049 @InterfaceAudience.Public
050 @InterfaceStability.Evolving
051 public class Cluster {
052
053 @InterfaceStability.Evolving
054 public static enum JobTrackerStatus {INITIALIZING, RUNNING};
055
056 private ClientProtocolProvider clientProtocolProvider;
057 private ClientProtocol client;
058 private UserGroupInformation ugi;
059 private Configuration conf;
060 private FileSystem fs = null;
061 private Path sysDir = null;
062 private Path stagingAreaDir = null;
063 private Path jobHistoryDir = null;
064 private static final Log LOG = LogFactory.getLog(Cluster.class);
065
066 private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
067 ServiceLoader.load(ClientProtocolProvider.class);
068
069 static {
070 ConfigUtil.loadResources();
071 }
072
073 public Cluster(Configuration conf) throws IOException {
074 this(null, conf);
075 }
076
077 public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
078 throws IOException {
079 this.conf = conf;
080 this.ugi = UserGroupInformation.getCurrentUser();
081 initialize(jobTrackAddr, conf);
082 }
083
084 private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
085 throws IOException {
086
087 synchronized (frameworkLoader) {
088 for (ClientProtocolProvider provider : frameworkLoader) {
089 LOG.debug("Trying ClientProtocolProvider : "
090 + provider.getClass().getName());
091 ClientProtocol clientProtocol = null;
092 try {
093 if (jobTrackAddr == null) {
094 clientProtocol = provider.create(conf);
095 } else {
096 clientProtocol = provider.create(jobTrackAddr, conf);
097 }
098
099 if (clientProtocol != null) {
100 clientProtocolProvider = provider;
101 client = clientProtocol;
102 LOG.debug("Picked " + provider.getClass().getName()
103 + " as the ClientProtocolProvider");
104 break;
105 }
106 else {
107 LOG.debug("Cannot pick " + provider.getClass().getName()
108 + " as the ClientProtocolProvider - returned null protocol");
109 }
110 }
111 catch (Exception e) {
112 LOG.info("Failed to use " + provider.getClass().getName()
113 + " due to error: " + e.getMessage());
114 }
115 }
116 }
117
118 if (null == clientProtocolProvider || null == client) {
119 throw new IOException(
120 "Cannot initialize Cluster. Please check your configuration for "
121 + MRConfig.FRAMEWORK_NAME
122 + " and the correspond server addresses.");
123 }
124 }
125
126 ClientProtocol getClient() {
127 return client;
128 }
129
130 Configuration getConf() {
131 return conf;
132 }
133
134 /**
135 * Close the <code>Cluster</code>.
136 */
137 public synchronized void close() throws IOException {
138 clientProtocolProvider.close(client);
139 }
140
141 private Job[] getJobs(JobStatus[] stats) throws IOException {
142 List<Job> jobs = new ArrayList<Job>();
143 for (JobStatus stat : stats) {
144 jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
145 }
146 return jobs.toArray(new Job[0]);
147 }
148
149 /**
150 * Get the file system where job-specific files are stored
151 *
152 * @return object of FileSystem
153 * @throws IOException
154 * @throws InterruptedException
155 */
156 public synchronized FileSystem getFileSystem()
157 throws IOException, InterruptedException {
158 if (this.fs == null) {
159 try {
160 this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
161 public FileSystem run() throws IOException, InterruptedException {
162 final Path sysDir = new Path(client.getSystemDir());
163 return sysDir.getFileSystem(getConf());
164 }
165 });
166 } catch (InterruptedException e) {
167 throw new RuntimeException(e);
168 }
169 }
170 return fs;
171 }
172
173 /**
174 * Get job corresponding to jobid.
175 *
176 * @param jobId
177 * @return object of {@link Job}
178 * @throws IOException
179 * @throws InterruptedException
180 */
181 public Job getJob(JobID jobId) throws IOException, InterruptedException {
182 JobStatus status = client.getJobStatus(jobId);
183 if (status != null) {
184 return Job.getInstance(this, status, new JobConf(status.getJobFile()));
185 }
186 return null;
187 }
188
189 /**
190 * Get all the queues in cluster.
191 *
192 * @return array of {@link QueueInfo}
193 * @throws IOException
194 * @throws InterruptedException
195 */
196 public QueueInfo[] getQueues() throws IOException, InterruptedException {
197 return client.getQueues();
198 }
199
200 /**
201 * Get queue information for the specified name.
202 *
203 * @param name queuename
204 * @return object of {@link QueueInfo}
205 * @throws IOException
206 * @throws InterruptedException
207 */
208 public QueueInfo getQueue(String name)
209 throws IOException, InterruptedException {
210 return client.getQueue(name);
211 }
212
213 /**
214 * Get log parameters for the specified jobID or taskAttemptID
215 * @param jobID the job id.
216 * @param taskAttemptID the task attempt id. Optional.
217 * @return the LogParams
218 * @throws IOException
219 * @throws InterruptedException
220 */
221 public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
222 throws IOException, InterruptedException {
223 return client.getLogFileParams(jobID, taskAttemptID);
224 }
225
226 /**
227 * Get current cluster status.
228 *
229 * @return object of {@link ClusterMetrics}
230 * @throws IOException
231 * @throws InterruptedException
232 */
233 public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
234 return client.getClusterMetrics();
235 }
236
237 /**
238 * Get all active trackers in the cluster.
239 *
240 * @return array of {@link TaskTrackerInfo}
241 * @throws IOException
242 * @throws InterruptedException
243 */
244 public TaskTrackerInfo[] getActiveTaskTrackers()
245 throws IOException, InterruptedException {
246 return client.getActiveTrackers();
247 }
248
249 /**
250 * Get blacklisted trackers.
251 *
252 * @return array of {@link TaskTrackerInfo}
253 * @throws IOException
254 * @throws InterruptedException
255 */
256 public TaskTrackerInfo[] getBlackListedTaskTrackers()
257 throws IOException, InterruptedException {
258 return client.getBlacklistedTrackers();
259 }
260
261 /**
262 * Get all the jobs in cluster.
263 *
264 * @return array of {@link Job}
265 * @throws IOException
266 * @throws InterruptedException
267 * @deprecated Use {@link #getAllJobStatuses()} instead.
268 */
269 @Deprecated
270 public Job[] getAllJobs() throws IOException, InterruptedException {
271 return getJobs(client.getAllJobs());
272 }
273
274 /**
275 * Get job status for all jobs in the cluster.
276 * @return job status for all jobs in cluster
277 * @throws IOException
278 * @throws InterruptedException
279 */
280 public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
281 return client.getAllJobs();
282 }
283
284 /**
285 * Grab the jobtracker system directory path where
286 * job-specific files will be placed.
287 *
288 * @return the system directory where job-specific files are to be placed.
289 */
290 public Path getSystemDir() throws IOException, InterruptedException {
291 if (sysDir == null) {
292 sysDir = new Path(client.getSystemDir());
293 }
294 return sysDir;
295 }
296
297 /**
298 * Grab the jobtracker's view of the staging directory path where
299 * job-specific files will be placed.
300 *
301 * @return the staging directory where job-specific files are to be placed.
302 */
303 public Path getStagingAreaDir() throws IOException, InterruptedException {
304 if (stagingAreaDir == null) {
305 stagingAreaDir = new Path(client.getStagingAreaDir());
306 }
307 return stagingAreaDir;
308 }
309
310 /**
311 * Get the job history file path for a given job id. The job history file at
312 * this path may or may not be existing depending on the job completion state.
313 * The file is present only for the completed jobs.
314 * @param jobId the JobID of the job submitted by the current user.
315 * @return the file path of the job history file
316 * @throws IOException
317 * @throws InterruptedException
318 */
319 public String getJobHistoryUrl(JobID jobId) throws IOException,
320 InterruptedException {
321 if (jobHistoryDir == null) {
322 jobHistoryDir = new Path(client.getJobHistoryDir());
323 }
324 return new Path(jobHistoryDir, jobId.toString() + "_"
325 + ugi.getShortUserName()).toString();
326 }
327
328 /**
329 * Gets the Queue ACLs for current user
330 * @return array of QueueAclsInfo object for current user.
331 * @throws IOException
332 */
333 public QueueAclsInfo[] getQueueAclsForCurrentUser()
334 throws IOException, InterruptedException {
335 return client.getQueueAclsForCurrentUser();
336 }
337
338 /**
339 * Gets the root level queues.
340 * @return array of JobQueueInfo object.
341 * @throws IOException
342 */
343 public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
344 return client.getRootQueues();
345 }
346
347 /**
348 * Returns immediate children of queueName.
349 * @param queueName
350 * @return array of JobQueueInfo which are children of queueName
351 * @throws IOException
352 */
353 public QueueInfo[] getChildQueues(String queueName)
354 throws IOException, InterruptedException {
355 return client.getChildQueues(queueName);
356 }
357
358 /**
359 * Get the JobTracker's status.
360 *
361 * @return {@link JobTrackerStatus} of the JobTracker
362 * @throws IOException
363 * @throws InterruptedException
364 */
365 public JobTrackerStatus getJobTrackerStatus() throws IOException,
366 InterruptedException {
367 return client.getJobTrackerStatus();
368 }
369
370 /**
371 * Get the tasktracker expiry interval for the cluster
372 * @return the expiry interval in msec
373 */
374 public long getTaskTrackerExpiryInterval() throws IOException,
375 InterruptedException {
376 return client.getTaskTrackerExpiryInterval();
377 }
378
379 /**
380 * Get a delegation token for the user from the JobTracker.
381 * @param renewer the user who can renew the token
382 * @return the new token
383 * @throws IOException
384 */
385 public Token<DelegationTokenIdentifier>
386 getDelegationToken(Text renewer) throws IOException, InterruptedException{
387 // client has already set the service
388 return client.getDelegationToken(renewer);
389 }
390
391 /**
392 * Renew a delegation token
393 * @param token the token to renew
394 * @return the new expiration time
395 * @throws InvalidToken
396 * @throws IOException
397 * @deprecated Use {@link Token#renew} instead
398 */
399 public long renewDelegationToken(Token<DelegationTokenIdentifier> token
400 ) throws InvalidToken, IOException,
401 InterruptedException {
402 return token.renew(getConf());
403 }
404
405 /**
406 * Cancel a delegation token from the JobTracker
407 * @param token the token to cancel
408 * @throws IOException
409 * @deprecated Use {@link Token#cancel} instead
410 */
411 public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
412 ) throws IOException,
413 InterruptedException {
414 token.cancel(getConf());
415 }
416
417 }