/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 *
 * <code>JobClient</code> provides facilities to submit jobs, track their
 * progress, access component-tasks' reports and logs, and get Map-Reduce
 * cluster status information.
 *
 * <p>The job submission process involves:
 * <ol>
 * <li>
 * Checking the input and output specifications of the job.
 * </li>
 * <li>
 * Computing the {@link InputSplit}s for the job.
 * </li>
 * <li>
 * Setting up the requisite accounting information for the
 * {@link DistributedCache} of the job, if necessary.
 * </li>
 * <li>
 * Copying the job's jar and configuration to the map-reduce system directory
 * on the distributed file-system.
 * </li>
 * <li>
 * Submitting the job to the cluster and optionally monitoring
 * its status.
 * </li>
 * </ol></p>
 *
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
 * the job and monitor its progress.
 *
 * <p>Here is an example of how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote></p>
 *
 * <h4 id="JobControl">Job Control</h4>
 *
 * <p>At times clients chain map-reduce jobs to accomplish complex tasks
 * which cannot be done via a single map-reduce job. This is fairly easy since
 * the output of a job typically goes to the distributed file-system, where it
 * can be used as the input for the next job.</p>
 *
 * <p>However, this also means that the onus of ensuring jobs are complete
 * (success/failure) lies squarely on the clients. In such situations the
 * various job-control options are:
 * <ol>
 * <li>
 * {@link #runJob(JobConf)} : submits the job and returns only after
 * the job has completed.
 * </li>
 * <li>
 * {@link #submitJob(JobConf)} : only submits the job; the client can then
 * poll the returned {@link RunningJob} handle to query status and make
 * scheduling decisions, as in the sketch below.
 * </li>
 * <li>
 * {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 * upon job-completion, thus avoiding polling.
 * </li>
 * </ol></p>
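 *
 * <p>For instance, a minimal polling sketch continuing the example above
 * (error handling elided; the five-second poll interval is illustrative, not
 * prescribed by the API):
 * <p><blockquote><pre>
 *     JobClient client = new JobClient(job);
 *     RunningJob running = client.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);
 *     }
 *     if (!running.isSuccessful()) {
 *       throw new IOException("Job failed: " + running.getFailureInfo());
 *     }
 * </pre></blockquote></p>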
 *
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI {
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
  /* Notes that getDelegationToken was called. Again, this is a hack for Oozie
   * to make sure we add history server delegation tokens to the credentials
   * for the job. Since the API only allows one delegation token to be
   * returned, we have to resort to this hack.
   */
  private boolean getDelegationTokenCalled = false;
  /* Do we need a HS delegation token for this client? */
  static final String HS_DELEGATION_TOKEN_REQUIRED
      = "mapreduce.history.server.delegationtoken.required";

  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob. It wraps the
   * new-API {@link Job} to provide job information and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;
    /**
     * We hold on to the underlying {@link Job}. If the job is null, then we
     * cannot perform any of the tasks. The job might be null if the cluster
     * has completely forgotten about the job (e.g., 24 hours after the
     * job completes).
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      job = Job.getInstance(cluster, status, new JobConf(status.getJobFile()));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }

    /** @deprecated This method is deprecated and will be removed. Applications
     * should rather use {@link #getID()}. */
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }

    /**
     * The user-specified job name
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      try {
        return job.mapProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      try {
        return job.reduceProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      try {
        return job.setupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Returns immediately whether the whole job is done yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      try {
        return job.isComplete();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * True iff job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      try {
        return job.isSuccessful();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Blocks until the job is finished
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Tells the service to get the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      try {
        job.killJob();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /** Set the priority of the job.
     * @param priority new priority of the job.
     */
    public synchronized void setJobPriority(String priority)
        throws IOException {
      try {
        job.setPriority(
            org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Kill indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to failed tasks
     * list, otherwise it is just killed, w/o affecting job failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      try {
        if (shouldFail) {
          job.failTask(taskId);
        } else {
          job.killTask(taskId);
        }
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /** @deprecated Applications should rather use
     * {@link #killTask(TaskAttemptID, boolean)}. */
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail)
        throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }

    /**
     * Fetch task completion events from the cluster for this job.
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events =
            job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Returns a string representation of the job.
     */
    @Override
    public String toString() {
      return job.toString();
    }

    /**
     * Returns the counters for this job
     */
    public Counters getCounters() throws IOException {
      try {
        Counters result = null;
        org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
        if (temp != null) {
          result = Counters.downgrade(temp);
        }
        return result;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try {
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }

    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * Ugi of the client. We store this ugi when the client is created and
   * then make sure that the same ugi is used to run the various protocols.
   */
  UserGroupInformation clientUgi;

  /**
   * Create a job client.
   */
  public JobClient() {
  }

  /**
   * Build a job client with the given {@link JobConf}, and connect to the
   * default cluster.
   *
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration},
   * and connect to the default cluster.
   *
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Build a job client and connect to the indicated job tracker.
   *
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr,
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  public synchronized void close() throws IOException {
    cluster.close();
  }

  /**
   * Get a filesystem handle. We need this to prepare jobs
   * for submission to the MapReduce system.
   *
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try {
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a handle to the Cluster
   */
  public Cluster getClusterHandle() {
    return cluster;
  }

  /**
   * Submit a job to the MR system.
   *
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException,
                                                     InvalidJobConfException,
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }

  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
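   * <p>A minimal usage sketch (error handling elided; <code>MyJob</code> is
   * an illustrative job class):
   * <p><blockquote><pre>
   *     JobConf conf = new JobConf(MyJob.class);
   *     JobClient client = new JobClient(conf);
   *     RunningJob running = client.submitJob(conf);
   *     System.out.println("Tracking URL: " + running.getTrackingURL());
   * </pre></blockquote>
   *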
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                         IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      if (getDelegationTokenCalled) {
        conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
        getDelegationTokenCalled = false;
      }
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException,
            InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });
      // update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge)
      cluster = job.getCluster();
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job. Returns
   * null if the id does not correspond to any known job.
   *
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster);
        }
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /** @deprecated Applications should rather use {@link #getJob(JobID)}.
   */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }

  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];

  /**
   * Get the information of the current state of the map tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the map tips.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }

  private TaskReport[] getTaskReports(final JobID jobId, TaskType type)
      throws IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /** @deprecated Applications should rather use
   * {@link #getMapTaskReports(JobID)}. */
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }

  /**
   * Get the information of the current state of the reduce tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the reduce tips.
   * @throws IOException
   */
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information of the current state of the cleanup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the cleanup tips.
   * @throws IOException
   */
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information of the current state of the setup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the setup tips.
   * @throws IOException
   */
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /** @deprecated Applications should rather use
   * {@link #getReduceTaskReports(JobID)}. */
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }

  /**
   * Display the information about a job's tasks, of a particular type and
   * in a particular state.
   *
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task
   * (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state)
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
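   * <p>For instance, a minimal sketch (<code>client</code> is an existing
   * <code>JobClient</code>):
   * <p><blockquote><pre>
   *     ClusterStatus status = client.getClusterStatus();
   *     System.out.println("trackers: " + status.getTaskTrackers()
   *         + ", max map slots: " + status.getMaxMapTasks());
   * </pre></blockquote>
   *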
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(),
              metrics.getBlackListedTaskTrackerCount(),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @param detailed if true then get a detailed status including the
   *        tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get the jobs that are not completed and not failed.
   *
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }

  /**
   * Get the jobs that are submitted.
   *
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs =
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run()
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   *
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf,
                                    RunningJob job
                                    ) throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }
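
  /**
   * Build a {@link Configuration} from a job tracker specification: a spec
   * containing a colon is treated as a <code>host:port</code> value for
   * <code>mapred.job.tracker</code>; any other non-null spec names a
   * per-cluster resource file <code>hadoop-&lt;spec&gt;.xml</code> that must
   * be present on the CLASSPATH.
   */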
  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks. Only those tasks whose output matches
   * the filter are printed.
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }

  /**
   * Get the task output filter out of the JobConf.
   *
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
        "FAILED"));
  }

  /**
   * Modify the JobConf to set the task output filter.
   *
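   * <p>For example, to have output from all tasks printed (a minimal sketch):
   * <p><blockquote><pre>
   *     JobConf job = new JobConf();
   *     JobClient.setTaskOutputFilter(job, TaskStatusFilter.ALL);
   * </pre></blockquote>
   *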
   * @param job the JobConf to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(JobConf job,
      TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }

  /**
   * Returns task output filter.
   * @return task filter.
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter;
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the max available Maps in the cluster.
   *
   * @return the max available Maps in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the max available Reduces in the cluster.
   *
   * @return the max available Reduces in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are
   * to be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(
          queue.getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about root level queues
   * configured.
   *
   * @return the array of root level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about immediate children
   * of queue queueName.
   *
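   * <p>For example, combined with {@link #getRootQueues()}, one might walk
   * the whole queue hierarchy with a sketch like:
   * <p><blockquote><pre>
   *     void walk(JobClient client, JobQueueInfo queue) throws IOException {
   *       System.out.println(queue.getQueueName());
   *       for (JobQueueInfo child :
   *            client.getChildQueues(queue.getQueueName())) {
   *         walk(client, child);
   *       }
   *     }
   * </pre></blockquote>
   *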
   * @param queueName name of the queue whose children are to be listed.
   * @return the array of immediate children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName)
      throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Return an array of queue information objects about all the Job Queues
   * configured.
   *
   * @return Array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets all the jobs which were added to a particular Job Queue.
   *
   * @param queueName name of the Job Queue
   * @return Array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName)
      throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
        @Override
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats =
          queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the queue information associated with a particular Job Queue.
   *
   * @param queueName name of the job queue.
   * @return Queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(new
          PrivilegedExceptionAction<QueueInfo>() {
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the Queue ACLs for the current user.
   * @return array of QueueAclsInfo objects for the current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
          clientUgi.doAs(new PrivilegedExceptionAction
              <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
            public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
                throws IOException, InterruptedException {
              return cluster.getQueueAclsForCurrentUser();
            }
          });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
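   * <p>A minimal sketch (the renewer principal and token alias below are
   * illustrative; <code>conf</code> is the job's {@link JobConf}):
   * <p><blockquote><pre>
   *     Token&lt;DelegationTokenIdentifier&gt; token =
   *         client.getDelegationToken(new Text("renewer/host@REALM"));
   *     conf.getCredentials().addToken(new Text("mr-token"), token);
   * </pre></blockquote>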
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(final Text renewer) throws IOException,
                                                    InterruptedException {
    getDelegationTokenCalled = true;
    return clientUgi.doAs(new
        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
      public Token<DelegationTokenIdentifier> run() throws IOException,
          InterruptedException {
        return cluster.getDelegationToken(renewer);
      }
    });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time of the token
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    token.cancel(getConf());
  }

  /**
   * Command-line entry point; runs this {@link JobClient} as a {@link Tool}
   * via {@link ToolRunner}.
   */
  public static void main(String argv[]) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}