001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.mapreduce.tools;
019
020 import java.io.IOException;
021 import java.io.OutputStreamWriter;
022 import java.io.PrintWriter;
023 import java.util.ArrayList;
024 import java.util.List;
025
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.apache.hadoop.classification.InterfaceAudience;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.classification.InterfaceAudience.Private;
031 import org.apache.hadoop.conf.Configuration;
032 import org.apache.hadoop.conf.Configured;
033 import org.apache.hadoop.ipc.RemoteException;
034 import org.apache.hadoop.mapred.JobConf;
035 import org.apache.hadoop.mapred.TIPStatus;
036 import org.apache.hadoop.mapreduce.Cluster;
037 import org.apache.hadoop.mapreduce.Counters;
038 import org.apache.hadoop.mapreduce.Job;
039 import org.apache.hadoop.mapreduce.JobID;
040 import org.apache.hadoop.mapreduce.JobPriority;
041 import org.apache.hadoop.mapreduce.JobStatus;
042 import org.apache.hadoop.mapreduce.TaskAttemptID;
043 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
044 import org.apache.hadoop.mapreduce.TaskReport;
045 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
046 import org.apache.hadoop.mapreduce.TaskType;
047 import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
048 import org.apache.hadoop.mapreduce.v2.LogParams;
049 import org.apache.hadoop.security.AccessControlException;
050 import org.apache.hadoop.util.Tool;
051 import org.apache.hadoop.util.ToolRunner;
052 import org.apache.hadoop.yarn.logaggregation.LogDumper;
053
054 import com.google.common.base.Charsets;
055
056 /**
057 * Interprets the map reduce cli options
058 */
059 @InterfaceAudience.Public
060 @InterfaceStability.Stable
061 public class CLI extends Configured implements Tool {
062 private static final Log LOG = LogFactory.getLog(CLI.class);
063 protected Cluster cluster;
064
065 public CLI() {
066 }
067
068 public CLI(Configuration conf) {
069 setConf(conf);
070 }
071
072 public int run(String[] argv) throws Exception {
073 int exitCode = -1;
074 if (argv.length < 1) {
075 displayUsage("");
076 return exitCode;
077 }
078 // process arguments
079 String cmd = argv[0];
080 String submitJobFile = null;
081 String jobid = null;
082 String taskid = null;
083 String historyFile = null;
084 String counterGroupName = null;
085 String counterName = null;
086 JobPriority jp = null;
087 String taskType = null;
088 String taskState = null;
089 int fromEvent = 0;
090 int nEvents = 0;
091 boolean getStatus = false;
092 boolean getCounter = false;
093 boolean killJob = false;
094 boolean listEvents = false;
095 boolean viewHistory = false;
096 boolean viewAllHistory = false;
097 boolean listJobs = false;
098 boolean listAllJobs = false;
099 boolean listActiveTrackers = false;
100 boolean listBlacklistedTrackers = false;
101 boolean displayTasks = false;
102 boolean killTask = false;
103 boolean failTask = false;
104 boolean setJobPriority = false;
105 boolean logs = false;
106
107 if ("-submit".equals(cmd)) {
108 if (argv.length != 2) {
109 displayUsage(cmd);
110 return exitCode;
111 }
112 submitJobFile = argv[1];
113 } else if ("-status".equals(cmd)) {
114 if (argv.length != 2) {
115 displayUsage(cmd);
116 return exitCode;
117 }
118 jobid = argv[1];
119 getStatus = true;
120 } else if("-counter".equals(cmd)) {
121 if (argv.length != 4) {
122 displayUsage(cmd);
123 return exitCode;
124 }
125 getCounter = true;
126 jobid = argv[1];
127 counterGroupName = argv[2];
128 counterName = argv[3];
129 } else if ("-kill".equals(cmd)) {
130 if (argv.length != 2) {
131 displayUsage(cmd);
132 return exitCode;
133 }
134 jobid = argv[1];
135 killJob = true;
136 } else if ("-set-priority".equals(cmd)) {
137 if (argv.length != 3) {
138 displayUsage(cmd);
139 return exitCode;
140 }
141 jobid = argv[1];
142 try {
143 jp = JobPriority.valueOf(argv[2]);
144 } catch (IllegalArgumentException iae) {
145 LOG.info(iae);
146 displayUsage(cmd);
147 return exitCode;
148 }
149 setJobPriority = true;
150 } else if ("-events".equals(cmd)) {
151 if (argv.length != 4) {
152 displayUsage(cmd);
153 return exitCode;
154 }
155 jobid = argv[1];
156 fromEvent = Integer.parseInt(argv[2]);
157 nEvents = Integer.parseInt(argv[3]);
158 listEvents = true;
159 } else if ("-history".equals(cmd)) {
160 if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
161 displayUsage(cmd);
162 return exitCode;
163 }
164 viewHistory = true;
165 if (argv.length == 3 && "all".equals(argv[1])) {
166 viewAllHistory = true;
167 historyFile = argv[2];
168 } else {
169 historyFile = argv[1];
170 }
171 } else if ("-list".equals(cmd)) {
172 if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
173 displayUsage(cmd);
174 return exitCode;
175 }
176 if (argv.length == 2 && "all".equals(argv[1])) {
177 listAllJobs = true;
178 } else {
179 listJobs = true;
180 }
181 } else if("-kill-task".equals(cmd)) {
182 if (argv.length != 2) {
183 displayUsage(cmd);
184 return exitCode;
185 }
186 killTask = true;
187 taskid = argv[1];
188 } else if("-fail-task".equals(cmd)) {
189 if (argv.length != 2) {
190 displayUsage(cmd);
191 return exitCode;
192 }
193 failTask = true;
194 taskid = argv[1];
195 } else if ("-list-active-trackers".equals(cmd)) {
196 if (argv.length != 1) {
197 displayUsage(cmd);
198 return exitCode;
199 }
200 listActiveTrackers = true;
201 } else if ("-list-blacklisted-trackers".equals(cmd)) {
202 if (argv.length != 1) {
203 displayUsage(cmd);
204 return exitCode;
205 }
206 listBlacklistedTrackers = true;
207 } else if ("-list-attempt-ids".equals(cmd)) {
208 if (argv.length != 4) {
209 displayUsage(cmd);
210 return exitCode;
211 }
212 jobid = argv[1];
213 taskType = argv[2];
214 taskState = argv[3];
215 displayTasks = true;
216 } else if ("-logs".equals(cmd)) {
217 if (argv.length == 2 || argv.length ==3) {
218 logs = true;
219 jobid = argv[1];
220 if (argv.length == 3) {
221 taskid = argv[2];
222 } else {
223 taskid = null;
224 }
225 } else {
226 displayUsage(cmd);
227 return exitCode;
228 }
229 } else {
230 displayUsage(cmd);
231 return exitCode;
232 }
233
234 // initialize cluster
235 cluster = new Cluster(getConf());
236
237 // Submit the request
238 try {
239 if (submitJobFile != null) {
240 Job job = Job.getInstance(new JobConf(submitJobFile));
241 job.submit();
242 System.out.println("Created job " + job.getJobID());
243 exitCode = 0;
244 } else if (getStatus) {
245 Job job = cluster.getJob(JobID.forName(jobid));
246 if (job == null) {
247 System.out.println("Could not find job " + jobid);
248 } else {
249 Counters counters = job.getCounters();
250 System.out.println();
251 System.out.println(job);
252 if (counters != null) {
253 System.out.println(counters);
254 } else {
255 System.out.println("Counters not available. Job is retired.");
256 }
257 exitCode = 0;
258 }
259 } else if (getCounter) {
260 Job job = cluster.getJob(JobID.forName(jobid));
261 if (job == null) {
262 System.out.println("Could not find job " + jobid);
263 } else {
264 Counters counters = job.getCounters();
265 if (counters == null) {
266 System.out.println("Counters not available for retired job " +
267 jobid);
268 exitCode = -1;
269 } else {
270 System.out.println(getCounter(counters,
271 counterGroupName, counterName));
272 exitCode = 0;
273 }
274 }
275 } else if (killJob) {
276 Job job = cluster.getJob(JobID.forName(jobid));
277 if (job == null) {
278 System.out.println("Could not find job " + jobid);
279 } else {
280 job.killJob();
281 System.out.println("Killed job " + jobid);
282 exitCode = 0;
283 }
284 } else if (setJobPriority) {
285 Job job = cluster.getJob(JobID.forName(jobid));
286 if (job == null) {
287 System.out.println("Could not find job " + jobid);
288 } else {
289 job.setPriority(jp);
290 System.out.println("Changed job priority.");
291 exitCode = 0;
292 }
293 } else if (viewHistory) {
294 viewHistory(historyFile, viewAllHistory);
295 exitCode = 0;
296 } else if (listEvents) {
297 listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
298 exitCode = 0;
299 } else if (listJobs) {
300 listJobs(cluster);
301 exitCode = 0;
302 } else if (listAllJobs) {
303 listAllJobs(cluster);
304 exitCode = 0;
305 } else if (listActiveTrackers) {
306 listActiveTrackers(cluster);
307 exitCode = 0;
308 } else if (listBlacklistedTrackers) {
309 listBlacklistedTrackers(cluster);
310 exitCode = 0;
311 } else if (displayTasks) {
312 displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
313 } else if(killTask) {
314 TaskAttemptID taskID = TaskAttemptID.forName(taskid);
315 Job job = cluster.getJob(taskID.getJobID());
316 if (job == null) {
317 System.out.println("Could not find job " + jobid);
318 } else if (job.killTask(taskID)) {
319 System.out.println("Killed task " + taskid);
320 exitCode = 0;
321 } else {
322 System.out.println("Could not kill task " + taskid);
323 exitCode = -1;
324 }
325 } else if(failTask) {
326 TaskAttemptID taskID = TaskAttemptID.forName(taskid);
327 Job job = cluster.getJob(taskID.getJobID());
328 if (job == null) {
329 System.out.println("Could not find job " + jobid);
330 } else if(job.failTask(taskID)) {
331 System.out.println("Killed task " + taskID + " by failing it");
332 exitCode = 0;
333 } else {
334 System.out.println("Could not fail task " + taskid);
335 exitCode = -1;
336 }
337 } else if (logs) {
338 try {
339 JobID jobID = JobID.forName(jobid);
340 TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
341 LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
342 LogDumper logDumper = new LogDumper();
343 logDumper.setConf(getConf());
344 exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
345 logParams.getContainerId(), logParams.getNodeId(),
346 logParams.getOwner());
347 } catch (IOException e) {
348 if (e instanceof RemoteException) {
349 throw e;
350 }
351 System.out.println(e.getMessage());
352 }
353 }
354 } catch (RemoteException re) {
355 IOException unwrappedException = re.unwrapRemoteException();
356 if (unwrappedException instanceof AccessControlException) {
357 System.out.println(unwrappedException.getMessage());
358 } else {
359 throw re;
360 }
361 } finally {
362 cluster.close();
363 }
364 return exitCode;
365 }
366
367 private String getJobPriorityNames() {
368 StringBuffer sb = new StringBuffer();
369 for (JobPriority p : JobPriority.values()) {
370 sb.append(p.name()).append(" ");
371 }
372 return sb.substring(0, sb.length()-1);
373 }
374
375 private String getTaskTypess() {
376 StringBuffer sb = new StringBuffer();
377 for (TaskType t : TaskType.values()) {
378 sb.append(t.name()).append(" ");
379 }
380 return sb.substring(0, sb.length()-1);
381 }
382
383 /**
384 * Display usage of the command-line tool and terminate execution.
385 */
386 private void displayUsage(String cmd) {
387 String prefix = "Usage: CLI ";
388 String jobPriorityValues = getJobPriorityNames();
389 String taskTypes = getTaskTypess();
390 String taskStates = "running, completed";
391 if ("-submit".equals(cmd)) {
392 System.err.println(prefix + "[" + cmd + " <job-file>]");
393 } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
394 System.err.println(prefix + "[" + cmd + " <job-id>]");
395 } else if ("-counter".equals(cmd)) {
396 System.err.println(prefix + "[" + cmd +
397 " <job-id> <group-name> <counter-name>]");
398 } else if ("-events".equals(cmd)) {
399 System.err.println(prefix + "[" + cmd +
400 " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
401 } else if ("-history".equals(cmd)) {
402 System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]");
403 } else if ("-list".equals(cmd)) {
404 System.err.println(prefix + "[" + cmd + " [all]]");
405 } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
406 System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
407 } else if ("-set-priority".equals(cmd)) {
408 System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
409 "Valid values for priorities are: "
410 + jobPriorityValues);
411 } else if ("-list-active-trackers".equals(cmd)) {
412 System.err.println(prefix + "[" + cmd + "]");
413 } else if ("-list-blacklisted-trackers".equals(cmd)) {
414 System.err.println(prefix + "[" + cmd + "]");
415 } else if ("-list-attempt-ids".equals(cmd)) {
416 System.err.println(prefix + "[" + cmd +
417 " <job-id> <task-type> <task-state>]. " +
418 "Valid values for <task-type> are " + taskTypes + ". " +
419 "Valid values for <task-state> are " + taskStates);
420 } else if ("-logs".equals(cmd)) {
421 System.err.println(prefix + "[" + cmd +
422 " <job-id> <task-attempt-id>]. " +
423 " <task-attempt-id> is optional to get task attempt logs.");
424 } else {
425 System.err.printf(prefix + "<command> <args>%n");
426 System.err.printf("\t[-submit <job-file>]%n");
427 System.err.printf("\t[-status <job-id>]%n");
428 System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
429 System.err.printf("\t[-kill <job-id>]%n");
430 System.err.printf("\t[-set-priority <job-id> <priority>]. " +
431 "Valid values for priorities are: " + jobPriorityValues + "%n");
432 System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
433 System.err.printf("\t[-history <jobHistoryFile>]%n");
434 System.err.printf("\t[-list [all]]%n");
435 System.err.printf("\t[-list-active-trackers]%n");
436 System.err.printf("\t[-list-blacklisted-trackers]%n");
437 System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
438 "<task-state>]. " +
439 "Valid values for <task-type> are " + taskTypes + ". " +
440 "Valid values for <task-state> are " + taskStates);
441 System.err.printf("\t[-kill-task <task-attempt-id>]%n");
442 System.err.printf("\t[-fail-task <task-attempt-id>]%n");
443 System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n%n");
444 ToolRunner.printGenericCommandUsage(System.out);
445 }
446 }
447
448 private void viewHistory(String historyFile, boolean all)
449 throws IOException {
450 HistoryViewer historyViewer = new HistoryViewer(historyFile,
451 getConf(), all);
452 historyViewer.print();
453 }
454
455 protected long getCounter(Counters counters, String counterGroupName,
456 String counterName) throws IOException {
457 return counters.findCounter(counterGroupName, counterName).getValue();
458 }
459
460 /**
461 * List the events for the given job
462 * @param jobId the job id for the job's events to list
463 * @throws IOException
464 */
465 private void listEvents(Job job, int fromEventId, int numEvents)
466 throws IOException, InterruptedException {
467 TaskCompletionEvent[] events = job.
468 getTaskCompletionEvents(fromEventId, numEvents);
469 System.out.println("Task completion events for " + job.getJobID());
470 System.out.println("Number of events (from " + fromEventId + ") are: "
471 + events.length);
472 for(TaskCompletionEvent event: events) {
473 System.out.println(event.getStatus() + " " +
474 event.getTaskAttemptId() + " " +
475 getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
476 }
477 }
478
479 protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
480 return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
481 }
482
483
484 /**
485 * Dump a list of currently running jobs
486 * @throws IOException
487 */
488 private void listJobs(Cluster cluster)
489 throws IOException, InterruptedException {
490 List<JobStatus> runningJobs = new ArrayList<JobStatus>();
491 for (JobStatus job : cluster.getAllJobStatuses()) {
492 if (!job.isJobComplete()) {
493 runningJobs.add(job);
494 }
495 }
496 displayJobList(runningJobs.toArray(new JobStatus[0]));
497 }
498
499 /**
500 * Dump a list of all jobs submitted.
501 * @throws IOException
502 */
503 private void listAllJobs(Cluster cluster)
504 throws IOException, InterruptedException {
505 displayJobList(cluster.getAllJobStatuses());
506 }
507
508 /**
509 * Display the list of active trackers
510 */
511 private void listActiveTrackers(Cluster cluster)
512 throws IOException, InterruptedException {
513 TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
514 for (TaskTrackerInfo tracker : trackers) {
515 System.out.println(tracker.getTaskTrackerName());
516 }
517 }
518
519 /**
520 * Display the list of blacklisted trackers
521 */
522 private void listBlacklistedTrackers(Cluster cluster)
523 throws IOException, InterruptedException {
524 TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
525 if (trackers.length > 0) {
526 System.out.println("BlackListedNode \t Reason");
527 }
528 for (TaskTrackerInfo tracker : trackers) {
529 System.out.println(tracker.getTaskTrackerName() + "\t" +
530 tracker.getReasonForBlacklist());
531 }
532 }
533
534 private void printTaskAttempts(TaskReport report) {
535 if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
536 System.out.println(report.getSuccessfulTaskAttemptId());
537 } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
538 for (TaskAttemptID t :
539 report.getRunningTaskAttemptIds()) {
540 System.out.println(t);
541 }
542 }
543 }
544
545 /**
546 * Display the information about a job's tasks, of a particular type and
547 * in a particular state
548 *
549 * @param job the job
550 * @param type the type of the task (map/reduce/setup/cleanup)
551 * @param state the state of the task
552 * (pending/running/completed/failed/killed)
553 */
554 protected void displayTasks(Job job, String type, String state)
555 throws IOException, InterruptedException {
556 TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type));
557 for (TaskReport report : reports) {
558 TIPStatus status = report.getCurrentStatus();
559 if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
560 (state.equals("running") && status ==TIPStatus.RUNNING) ||
561 (state.equals("completed") && status == TIPStatus.COMPLETE) ||
562 (state.equals("failed") && status == TIPStatus.FAILED) ||
563 (state.equals("killed") && status == TIPStatus.KILLED)) {
564 printTaskAttempts(report);
565 }
566 }
567 }
568
569 public void displayJobList(JobStatus[] jobs)
570 throws IOException, InterruptedException {
571 displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out,
572 Charsets.UTF_8)));
573 }
574
575 @Private
576 public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
577 @Private
578 public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
579 private static String memPattern = "%dM";
580 private static String UNAVAILABLE = "N/A";
581
582 @Private
583 public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
584 writer.println("Total jobs:" + jobs.length);
585 writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
586 "Queue", "Priority", "UsedContainers",
587 "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
588 for (JobStatus job : jobs) {
589 int numUsedSlots = job.getNumUsedSlots();
590 int numReservedSlots = job.getNumReservedSlots();
591 int usedMem = job.getUsedMem();
592 int rsvdMem = job.getReservedMem();
593 int neededMem = job.getNeededMem();
594 writer.printf(dataPattern,
595 job.getJobID().toString(), job.getState(), job.getStartTime(),
596 job.getUsername(), job.getQueue(),
597 job.getPriority().name(),
598 numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
599 numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
600 usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
601 rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
602 neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
603 job.getSchedulingInfo());
604 }
605 writer.flush();
606 }
607
608 public static void main(String[] argv) throws Exception {
609 int res = ToolRunner.run(new CLI(), argv);
610 System.exit(res);
611 }
612 }