001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce;
020
021 import java.io.IOException;
022 import java.net.URI;
023 import java.security.PrivilegedExceptionAction;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceStability;
029 import org.apache.hadoop.classification.InterfaceAudience.Private;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.conf.Configuration.IntegerRanges;
032 import org.apache.hadoop.fs.FileSystem;
033 import org.apache.hadoop.fs.Path;
034 import org.apache.hadoop.io.RawComparator;
035 import org.apache.hadoop.mapred.JobConf;
036 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
037 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
038 import org.apache.hadoop.mapreduce.task.JobContextImpl;
039 import org.apache.hadoop.mapreduce.util.ConfigUtil;
040 import org.apache.hadoop.util.StringUtils;
041
042 /**
043 * The job submitter's view of the Job.
044 *
045 * <p>It allows the user to configure the
046 * job, submit it, control its execution, and query the state. The set methods
047 * only work until the job is submitted, afterwards they will throw an
048 * IllegalStateException. </p>
049 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, then submits the job and monitors its progress.</p>
053 *
054 * <p>Here is an example on how to submit a job:</p>
055 * <p><blockquote><pre>
056 * // Create a new Job
057 * Job job = new Job(new Configuration());
058 * job.setJarByClass(MyJob.class);
059 *
060 * // Specify various job-specific parameters
061 * job.setJobName("myjob");
062 *
063 * job.setInputPath(new Path("in"));
064 * job.setOutputPath(new Path("out"));
065 *
066 * job.setMapperClass(MyJob.MyMapper.class);
067 * job.setReducerClass(MyJob.MyReducer.class);
068 *
069 * // Submit the job, then poll for progress until the job is complete
070 * job.waitForCompletion(true);
071 * </pre></blockquote></p>
072 *
073 *
074 */
075 @InterfaceAudience.Public
076 @InterfaceStability.Evolving
077 public class Job extends JobContextImpl implements JobContext {
/** Logger for this class. */
private static final Log LOG = LogFactory.getLog(Job.class);

/**
 * Client-side lifecycle of a {@code Job} object. A job starts in
 * {@code DEFINE}; the constructor that takes a {@link JobStatus} puts it
 * straight into {@code RUNNING}.
 */
@InterfaceStability.Evolving
public static enum JobState {DEFINE, RUNNING};
/**
 * Maximum age, in milliseconds, of the cached status before
 * {@code ensureFreshStatus()} fetches a new one.
 */
private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
/**
 * Configuration key; presumably selects which task output the client
 * reports (see {@link TaskStatusFilter}) -- the reader is outside this
 * section, confirm there.
 */
public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
/** Key in mapred-*.xml that sets completionPollIntervalMillis */
public static final String COMPLETION_POLL_INTERVAL_KEY =
  "mapreduce.client.completion.pollinterval";

/** Default completionPollIntervalMillis is 5000 ms. */
static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
/** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
  "mapreduce.client.progressmonitor.pollinterval";
/** Default progMonitorPollIntervalMillis is 1000 ms. */
static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

/**
 * Configuration key; judging by its name it records whether the generic
 * options parser was used -- TODO confirm against the code that reads it.
 */
public static final String USED_GENERIC_PARSER =
  "mapreduce.client.genericoptionsparser.used";
/**
 * Configuration key; presumably the replication factor for files uploaded
 * at submit time -- TODO confirm against the submitter.
 */
public static final String SUBMIT_REPLICATION =
  "mapreduce.client.submit.file.replication";
/** Configuration key for the task-log pull timeout (not read in this section). */
private static final String TASKLOG_PULL_TIMEOUT_KEY =
  "mapreduce.client.tasklog.timeout";
/** Default for {@link #TASKLOG_PULL_TIMEOUT_KEY}; presumably milliseconds -- confirm at the use site. */
private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

/** Task outcome categories; how they are used for filtering is not shown in this section. */
@InterfaceStability.Evolving
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

// Runs once when the class is loaded, before any Job can be created.
static {
  ConfigUtil.loadResources();
}

/** Current lifecycle state; checked by {@code ensureState}. */
private JobState state = JobState.DEFINE;
/** Most recently fetched status; refreshed by {@code updateStatus()}. */
private JobStatus status;
/** {@code System.currentTimeMillis()} value recorded at the last successful status refresh. */
private long statustime;
/** Cluster this job talks to; {@code null} until one is attached. */
private Cluster cluster;
115
/**
 * Creates a job backed by a freshly constructed {@link Configuration}.
 *
 * @throws IOException propagated from the delegated constructor
 * @deprecated Use {@link #getInstance()}
 */
@Deprecated
public Job() throws IOException {
  this(new Configuration());
}
120
/**
 * Creates a job from the given configuration, which is first wrapped in a
 * new {@link JobConf}.
 *
 * @param conf the configuration to build the job from
 * @throws IOException propagated from the delegated constructor
 * @deprecated Use {@link #getInstance(Configuration)}
 */
@Deprecated
public Job(Configuration conf) throws IOException {
  this(new JobConf(conf));
}
125
/**
 * Creates a job from the given configuration and assigns it a name.
 *
 * @param conf the configuration to build the job from
 * @param jobName the name passed to {@link #setJobName(String)}
 * @throws IOException propagated from the delegated constructor
 * @deprecated Use {@link #getInstance(Configuration, String)}
 */
@Deprecated
public Job(Configuration conf, String jobName) throws IOException {
  this(conf);
  setJobName(jobName);
}
131
/**
 * Builds a job around the given {@link JobConf}. The job is left without a
 * {@link Cluster} and receives the credentials currently held by
 * {@code ugi}.
 *
 * @param conf the job configuration handed to the superclass
 * @throws IOException declared for callers; the source is not visible here
 */
Job(JobConf conf) throws IOException {
  super(conf, null);
  // Not attached to any cluster yet.
  this.cluster = null;
  // Carry the user's existing credentials over to the job.
  this.credentials.mergeAll(this.ugi.getCredentials());
}
138
/**
 * Builds a job that represents an already known job: the id is taken from
 * {@code status}, the status is cached and the state becomes
 * {@code RUNNING}.
 *
 * @param status the status describing the existing job
 * @param conf the job configuration
 * @throws IOException propagated from {@link #Job(JobConf)}
 */
Job(JobStatus status, JobConf conf) throws IOException {
  this(conf);
  setJobID(status.getJobID());
  this.state = JobState.RUNNING;
  this.status = status;
}
145
146
147 /**
148 * Creates a new {@link Job} with no particular {@link Cluster} .
149 * A Cluster will be created with a generic {@link Configuration}.
150 *
151 * @return the {@link Job} , with no connection to a cluster yet.
152 * @throws IOException
153 */
154 public static Job getInstance() throws IOException {
155 // create with a null Cluster
156 return getInstance(new Configuration());
157 }
158
159 /**
160 * Creates a new {@link Job} with no particular {@link Cluster} and a
161 * given {@link Configuration}.
162 *
163 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
164 * that any necessary internal modifications do not reflect on the incoming
165 * parameter.
166 *
167 * A Cluster will be created from the conf parameter only when it's needed.
168 *
169 * @param conf the configuration
170 * @return the {@link Job} , with no connection to a cluster yet.
171 * @throws IOException
172 */
173 public static Job getInstance(Configuration conf) throws IOException {
174 // create with a null Cluster
175 JobConf jobConf = new JobConf(conf);
176 return new Job(jobConf);
177 }
178
179
180 /**
181 * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
182 * A Cluster will be created from the conf parameter only when it's needed.
183 *
184 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
185 * that any necessary internal modifications do not reflect on the incoming
186 * parameter.
187 *
188 * @param conf the configuration
189 * @return the {@link Job} , with no connection to a cluster yet.
190 * @throws IOException
191 */
192 public static Job getInstance(Configuration conf, String jobName)
193 throws IOException {
194 // create with a null Cluster
195 Job result = getInstance(conf);
196 result.setJobName(jobName);
197 return result;
198 }
199
200 /**
201 * Creates a new {@link Job} with no particular {@link Cluster} and given
202 * {@link Configuration} and {@link JobStatus}.
203 * A Cluster will be created from the conf parameter only when it's needed.
204 *
205 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
206 * that any necessary internal modifications do not reflect on the incoming
207 * parameter.
208 *
209 * @param status job status
210 * @param conf job configuration
211 * @return the {@link Job} , with no connection to a cluster yet.
212 * @throws IOException
213 */
214 public static Job getInstance(JobStatus status, Configuration conf)
215 throws IOException {
216 return new Job(status, new JobConf(conf));
217 }
218
219 /**
220 * Creates a new {@link Job} with no particular {@link Cluster}.
221 * A Cluster will be created from the conf parameter only when it's needed.
222 *
223 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
224 * that any necessary internal modifications do not reflect on the incoming
225 * parameter.
226 *
227 * @param ignored
228 * @return the {@link Job} , with no connection to a cluster yet.
229 * @throws IOException
230 * @deprecated Use {@link #getInstance()}
231 */
232 @Deprecated
233 public static Job getInstance(Cluster ignored) throws IOException {
234 return getInstance();
235 }
236
237 /**
238 * Creates a new {@link Job} with no particular {@link Cluster} and given
239 * {@link Configuration}.
240 * A Cluster will be created from the conf parameter only when it's needed.
241 *
242 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
243 * that any necessary internal modifications do not reflect on the incoming
244 * parameter.
245 *
246 * @param ignored
247 * @param conf job configuration
248 * @return the {@link Job} , with no connection to a cluster yet.
249 * @throws IOException
250 * @deprecated Use {@link #getInstance(Configuration)}
251 */
252 @Deprecated
253 public static Job getInstance(Cluster ignored, Configuration conf)
254 throws IOException {
255 return getInstance(conf);
256 }
257
258 /**
259 * Creates a new {@link Job} with no particular {@link Cluster} and given
260 * {@link Configuration} and {@link JobStatus}.
261 * A Cluster will be created from the conf parameter only when it's needed.
262 *
263 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
264 * that any necessary internal modifications do not reflect on the incoming
265 * parameter.
266 *
267 * @param cluster cluster
268 * @param status job status
269 * @param conf job configuration
270 * @return the {@link Job} , with no connection to a cluster yet.
271 * @throws IOException
272 */
273 @Private
274 public static Job getInstance(Cluster cluster, JobStatus status,
275 Configuration conf) throws IOException {
276 Job job = getInstance(status, conf);
277 job.setCluster(cluster);
278 return job;
279 }
280
281 private void ensureState(JobState state) throws IllegalStateException {
282 if (state != this.state) {
283 throw new IllegalStateException("Job in state "+ this.state +
284 " instead of " + state);
285 }
286
287 if (state == JobState.RUNNING && cluster == null) {
288 throw new IllegalStateException
289 ("Job in state " + this.state
290 + ", but it isn't attached to any job tracker!");
291 }
292 }
293
294 /**
295 * Some methods rely on having a recent job status object. Refresh
296 * it, if necessary
297 */
298 synchronized void ensureFreshStatus()
299 throws IOException, InterruptedException {
300 if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
301 updateStatus();
302 }
303 }
304
305 /** Some methods need to update status immediately. So, refresh
306 * immediately
307 * @throws IOException
308 */
309 synchronized void updateStatus() throws IOException, InterruptedException {
310 this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
311 @Override
312 public JobStatus run() throws IOException, InterruptedException {
313 return cluster.getClient().getJobStatus(status.getJobID());
314 }
315 });
316 if (this.status == null) {
317 throw new IOException("Job status not available ");
318 }
319 this.statustime = System.currentTimeMillis();
320 }
321
322 public JobStatus getStatus() throws IOException, InterruptedException {
323 ensureState(JobState.RUNNING);
324 updateStatus();
325 return status;
326 }
327
328 private void setStatus(JobStatus status) {
329 this.status = status;
330 }
331
332 /**
333 * Returns the current state of the Job.
334 *
335 * @return JobStatus#State
336 * @throws IOException
337 * @throws InterruptedException
338 */
339 public JobStatus.State getJobState()
340 throws IOException, InterruptedException {
341 ensureState(JobState.RUNNING);
342 updateStatus();
343 return status.getState();
344 }
345
346 /**
347 * Get the URL where some job progress information will be displayed.
348 *
349 * @return the URL where some job progress information will be displayed.
350 */
351 public String getTrackingURL(){
352 ensureState(JobState.RUNNING);
353 return status.getTrackingUrl().toString();
354 }
355
356 /**
357 * Get the path of the submitted job configuration.
358 *
359 * @return the path of the submitted job configuration.
360 */
361 public String getJobFile() {
362 ensureState(JobState.RUNNING);
363 return status.getJobFile();
364 }
365
366 /**
367 * Get start time of the job.
368 *
369 * @return the start time of the job
370 */
371 public long getStartTime() {
372 ensureState(JobState.RUNNING);
373 return status.getStartTime();
374 }
375
376 /**
377 * Get finish time of the job.
378 *
379 * @return the finish time of the job
380 */
381 public long getFinishTime() throws IOException, InterruptedException {
382 ensureState(JobState.RUNNING);
383 updateStatus();
384 return status.getFinishTime();
385 }
386
387 /**
388 * Get scheduling info of the job.
389 *
390 * @return the scheduling info of the job
391 */
392 public String getSchedulingInfo() {
393 ensureState(JobState.RUNNING);
394 return status.getSchedulingInfo();
395 }
396
397 /**
398 * Get scheduling info of the job.
399 *
400 * @return the scheduling info of the job
401 */
402 public JobPriority getPriority() throws IOException, InterruptedException {
403 ensureState(JobState.RUNNING);
404 updateStatus();
405 return status.getPriority();
406 }
407
408 /**
409 * The user-specified job name.
410 */
411 public String getJobName() {
412 if (state == JobState.DEFINE) {
413 return super.getJobName();
414 }
415 ensureState(JobState.RUNNING);
416 return status.getJobName();
417 }
418
419 public String getHistoryUrl() throws IOException, InterruptedException {
420 ensureState(JobState.RUNNING);
421 updateStatus();
422 return status.getHistoryFile();
423 }
424
425 public boolean isRetired() throws IOException, InterruptedException {
426 ensureState(JobState.RUNNING);
427 updateStatus();
428 return status.isRetired();
429 }
430
431 @Private
432 public Cluster getCluster() {
433 return cluster;
434 }
435
436 /** Only for mocks in unit tests. */
437 @Private
438 private void setCluster(Cluster cluster) {
439 this.cluster = cluster;
440 }
441
442 /**
443 * Dump stats to screen.
444 */
445 @Override
446 public String toString() {
447 ensureState(JobState.RUNNING);
448 String reasonforFailure = " ";
449 int numMaps = 0;
450 int numReduces = 0;
451 try {
452 updateStatus();
453 if (status.getState().equals(JobStatus.State.FAILED))
454 reasonforFailure = getTaskFailureEventString();
455 numMaps = getTaskReports(TaskType.MAP).length;
456 numReduces = getTaskReports(TaskType.REDUCE).length;
457 } catch (IOException e) {
458 } catch (InterruptedException ie) {
459 }
460 StringBuffer sb = new StringBuffer();
461 sb.append("Job: ").append(status.getJobID()).append("\n");
462 sb.append("Job File: ").append(status.getJobFile()).append("\n");
463 sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
464 sb.append("\n");
465 sb.append("Uber job : ").append(status.isUber()).append("\n");
466 sb.append("Number of maps: ").append(numMaps).append("\n");
467 sb.append("Number of reduces: ").append(numReduces).append("\n");
468 sb.append("map() completion: ");
469 sb.append(status.getMapProgress()).append("\n");
470 sb.append("reduce() completion: ");
471 sb.append(status.getReduceProgress()).append("\n");
472 sb.append("Job state: ");
473 sb.append(status.getState()).append("\n");
474 sb.append("retired: ").append(status.isRetired()).append("\n");
475 sb.append("reason for failure: ").append(reasonforFailure);
476 return sb.toString();
477 }
478
479 /**
480 * @return taskid which caused job failure
481 * @throws IOException
482 * @throws InterruptedException
483 */
484 String getTaskFailureEventString() throws IOException,
485 InterruptedException {
486 int failCount = 1;
487 TaskCompletionEvent lastEvent = null;
488 TaskCompletionEvent[] events = ugi.doAs(new
489 PrivilegedExceptionAction<TaskCompletionEvent[]>() {
490 @Override
491 public TaskCompletionEvent[] run() throws IOException,
492 InterruptedException {
493 return cluster.getClient().getTaskCompletionEvents(
494 status.getJobID(), 0, 10);
495 }
496 });
497 for (TaskCompletionEvent event : events) {
498 if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
499 failCount++;
500 lastEvent = event;
501 }
502 }
503 if (lastEvent == null) {
504 return "There are no failed tasks for the job. "
505 + "Job is failed due to some other reason and reason "
506 + "can be found in the logs.";
507 }
508 String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
509 String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
510 return (" task " + taskID + " failed " +
511 failCount + " times " + "For details check tasktracker at: " +
512 lastEvent.getTaskTrackerHttp());
513 }
514
515 /**
516 * Get the information of the current state of the tasks of a job.
517 *
518 * @param type Type of the task
519 * @return the list of all of the map tips.
520 * @throws IOException
521 */
522 public TaskReport[] getTaskReports(TaskType type)
523 throws IOException, InterruptedException {
524 ensureState(JobState.RUNNING);
525 final TaskType tmpType = type;
526 return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
527 public TaskReport[] run() throws IOException, InterruptedException {
528 return cluster.getClient().getTaskReports(getJobID(), tmpType);
529 }
530 });
531 }
532
533 /**
534 * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
535 * and 1.0. When all map tasks have completed, the function returns 1.0.
536 *
537 * @return the progress of the job's map-tasks.
538 * @throws IOException
539 */
540 public float mapProgress() throws IOException, InterruptedException {
541 ensureState(JobState.RUNNING);
542 ensureFreshStatus();
543 return status.getMapProgress();
544 }
545
546 /**
547 * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
548 * and 1.0. When all reduce tasks have completed, the function returns 1.0.
549 *
550 * @return the progress of the job's reduce-tasks.
551 * @throws IOException
552 */
553 public float reduceProgress() throws IOException, InterruptedException {
554 ensureState(JobState.RUNNING);
555 ensureFreshStatus();
556 return status.getReduceProgress();
557 }
558
559 /**
560 * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
561 * and 1.0. When all cleanup tasks have completed, the function returns 1.0.
562 *
563 * @return the progress of the job's cleanup-tasks.
564 * @throws IOException
565 */
566 public float cleanupProgress() throws IOException, InterruptedException {
567 ensureState(JobState.RUNNING);
568 ensureFreshStatus();
569 return status.getCleanupProgress();
570 }
571
572 /**
573 * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
574 * and 1.0. When all setup tasks have completed, the function returns 1.0.
575 *
576 * @return the progress of the job's setup-tasks.
577 * @throws IOException
578 */
579 public float setupProgress() throws IOException, InterruptedException {
580 ensureState(JobState.RUNNING);
581 ensureFreshStatus();
582 return status.getSetupProgress();
583 }
584
585 /**
586 * Check if the job is finished or not.
587 * This is a non-blocking call.
588 *
589 * @return <code>true</code> if the job is complete, else <code>false</code>.
590 * @throws IOException
591 */
592 public boolean isComplete() throws IOException, InterruptedException {
593 ensureState(JobState.RUNNING);
594 updateStatus();
595 return status.isJobComplete();
596 }
597
598 /**
599 * Check if the job completed successfully.
600 *
601 * @return <code>true</code> if the job succeeded, else <code>false</code>.
602 * @throws IOException
603 */
604 public boolean isSuccessful() throws IOException, InterruptedException {
605 ensureState(JobState.RUNNING);
606 updateStatus();
607 return status.getState() == JobStatus.State.SUCCEEDED;
608 }
609
610 /**
611 * Kill the running job. Blocks until all job tasks have been
612 * killed as well. If the job is no longer running, it simply returns.
613 *
614 * @throws IOException
615 */
616 public void killJob() throws IOException, InterruptedException {
617 ensureState(JobState.RUNNING);
618 cluster.getClient().killJob(getJobID());
619 }
620
621 /**
622 * Set the priority of a running job.
623 * @param priority the new priority for the job.
624 * @throws IOException
625 */
626 public void setPriority(JobPriority priority)
627 throws IOException, InterruptedException {
628 if (state == JobState.DEFINE) {
629 conf.setJobPriority(
630 org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
631 } else {
632 ensureState(JobState.RUNNING);
633 final JobPriority tmpPriority = priority;
634 ugi.doAs(new PrivilegedExceptionAction<Object>() {
635 @Override
636 public Object run() throws IOException, InterruptedException {
637 cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
638 return null;
639 }
640 });
641 }
642 }
643
644 /**
645 * Get events indicating completion (success/failure) of component tasks.
646 *
647 * @param startFrom index to start fetching events from
648 * @param numEvents number of events to fetch
649 * @return an array of {@link TaskCompletionEvent}s
650 * @throws IOException
651 */
652 public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
653 final int numEvents) throws IOException, InterruptedException {
654 ensureState(JobState.RUNNING);
655 return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
656 @Override
657 public TaskCompletionEvent[] run() throws IOException, InterruptedException {
658 return cluster.getClient().getTaskCompletionEvents(getJobID(),
659 startFrom, numEvents);
660 }
661 });
662 }
663
664 /**
665 * Kill indicated task attempt.
666 *
667 * @param taskId the id of the task to be terminated.
668 * @throws IOException
669 */
670 public boolean killTask(final TaskAttemptID taskId)
671 throws IOException, InterruptedException {
672 ensureState(JobState.RUNNING);
673 return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
674 public Boolean run() throws IOException, InterruptedException {
675 return cluster.getClient().killTask(taskId, false);
676 }
677 });
678 }
679
680 /**
681 * Fail indicated task attempt.
682 *
683 * @param taskId the id of the task to be terminated.
684 * @throws IOException
685 */
686 public boolean failTask(final TaskAttemptID taskId)
687 throws IOException, InterruptedException {
688 ensureState(JobState.RUNNING);
689 return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
690 @Override
691 public Boolean run() throws IOException, InterruptedException {
692 return cluster.getClient().killTask(taskId, true);
693 }
694 });
695 }
696
697 /**
698 * Gets the counters for this job. May return null if the job has been
699 * retired and the job is no longer in the completed job store.
700 *
701 * @return the counters for this job.
702 * @throws IOException
703 */
704 public Counters getCounters()
705 throws IOException, InterruptedException {
706 ensureState(JobState.RUNNING);
707 return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
708 @Override
709 public Counters run() throws IOException, InterruptedException {
710 return cluster.getClient().getJobCounters(getJobID());
711 }
712 });
713 }
714
715 /**
716 * Gets the diagnostic messages for a given task attempt.
717 * @param taskid
718 * @return the list of diagnostic messages for the task
719 * @throws IOException
720 */
721 public String[] getTaskDiagnostics(final TaskAttemptID taskid)
722 throws IOException, InterruptedException {
723 ensureState(JobState.RUNNING);
724 return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
725 @Override
726 public String[] run() throws IOException, InterruptedException {
727 return cluster.getClient().getTaskDiagnostics(taskid);
728 }
729 });
730 }
731
732 /**
733 * Set the number of reduce tasks for the job.
734 * @param tasks the number of reduce tasks
735 * @throws IllegalStateException if the job is submitted
736 */
737 public void setNumReduceTasks(int tasks) throws IllegalStateException {
738 ensureState(JobState.DEFINE);
739 conf.setNumReduceTasks(tasks);
740 }
741
742 /**
743 * Set the current working directory for the default file system.
744 *
745 * @param dir the new current working directory.
746 * @throws IllegalStateException if the job is submitted
747 */
748 public void setWorkingDirectory(Path dir) throws IOException {
749 ensureState(JobState.DEFINE);
750 conf.setWorkingDirectory(dir);
751 }
752
753 /**
754 * Set the {@link InputFormat} for the job.
755 * @param cls the <code>InputFormat</code> to use
756 * @throws IllegalStateException if the job is submitted
757 */
758 public void setInputFormatClass(Class<? extends InputFormat> cls
759 ) throws IllegalStateException {
760 ensureState(JobState.DEFINE);
761 conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
762 InputFormat.class);
763 }
764
765 /**
766 * Set the {@link OutputFormat} for the job.
767 * @param cls the <code>OutputFormat</code> to use
768 * @throws IllegalStateException if the job is submitted
769 */
770 public void setOutputFormatClass(Class<? extends OutputFormat> cls
771 ) throws IllegalStateException {
772 ensureState(JobState.DEFINE);
773 conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
774 OutputFormat.class);
775 }
776
777 /**
778 * Set the {@link Mapper} for the job.
779 * @param cls the <code>Mapper</code> to use
780 * @throws IllegalStateException if the job is submitted
781 */
782 public void setMapperClass(Class<? extends Mapper> cls
783 ) throws IllegalStateException {
784 ensureState(JobState.DEFINE);
785 conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
786 }
787
788 /**
789 * Set the Jar by finding where a given class came from.
790 * @param cls the example class
791 */
792 public void setJarByClass(Class<?> cls) {
793 ensureState(JobState.DEFINE);
794 conf.setJarByClass(cls);
795 }
796
797 /**
798 * Set the job jar
799 */
800 public void setJar(String jar) {
801 ensureState(JobState.DEFINE);
802 conf.setJar(jar);
803 }
804
805 /**
806 * Set the reported username for this job.
807 *
808 * @param user the username for this job.
809 */
810 public void setUser(String user) {
811 ensureState(JobState.DEFINE);
812 conf.setUser(user);
813 }
814
815 /**
816 * Set the combiner class for the job.
817 * @param cls the combiner to use
818 * @throws IllegalStateException if the job is submitted
819 */
820 public void setCombinerClass(Class<? extends Reducer> cls
821 ) throws IllegalStateException {
822 ensureState(JobState.DEFINE);
823 conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
824 }
825
826 /**
827 * Set the {@link Reducer} for the job.
828 * @param cls the <code>Reducer</code> to use
829 * @throws IllegalStateException if the job is submitted
830 */
831 public void setReducerClass(Class<? extends Reducer> cls
832 ) throws IllegalStateException {
833 ensureState(JobState.DEFINE);
834 conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
835 }
836
837 /**
838 * Set the {@link Partitioner} for the job.
839 * @param cls the <code>Partitioner</code> to use
840 * @throws IllegalStateException if the job is submitted
841 */
842 public void setPartitionerClass(Class<? extends Partitioner> cls
843 ) throws IllegalStateException {
844 ensureState(JobState.DEFINE);
845 conf.setClass(PARTITIONER_CLASS_ATTR, cls,
846 Partitioner.class);
847 }
848
849 /**
850 * Set the key class for the map output data. This allows the user to
851 * specify the map output key class to be different than the final output
852 * value class.
853 *
854 * @param theClass the map output key class.
855 * @throws IllegalStateException if the job is submitted
856 */
857 public void setMapOutputKeyClass(Class<?> theClass
858 ) throws IllegalStateException {
859 ensureState(JobState.DEFINE);
860 conf.setMapOutputKeyClass(theClass);
861 }
862
863 /**
864 * Set the value class for the map output data. This allows the user to
865 * specify the map output value class to be different than the final output
866 * value class.
867 *
868 * @param theClass the map output value class.
869 * @throws IllegalStateException if the job is submitted
870 */
871 public void setMapOutputValueClass(Class<?> theClass
872 ) throws IllegalStateException {
873 ensureState(JobState.DEFINE);
874 conf.setMapOutputValueClass(theClass);
875 }
876
877 /**
878 * Set the key class for the job output data.
879 *
880 * @param theClass the key class for the job output data.
881 * @throws IllegalStateException if the job is submitted
882 */
883 public void setOutputKeyClass(Class<?> theClass
884 ) throws IllegalStateException {
885 ensureState(JobState.DEFINE);
886 conf.setOutputKeyClass(theClass);
887 }
888
889 /**
890 * Set the value class for job outputs.
891 *
892 * @param theClass the value class for job outputs.
893 * @throws IllegalStateException if the job is submitted
894 */
895 public void setOutputValueClass(Class<?> theClass
896 ) throws IllegalStateException {
897 ensureState(JobState.DEFINE);
898 conf.setOutputValueClass(theClass);
899 }
900
901 /**
902 * Define the comparator that controls how the keys are sorted before they
903 * are passed to the {@link Reducer}.
904 * @param cls the raw comparator
905 * @throws IllegalStateException if the job is submitted
906 */
907 public void setSortComparatorClass(Class<? extends RawComparator> cls
908 ) throws IllegalStateException {
909 ensureState(JobState.DEFINE);
910 conf.setOutputKeyComparatorClass(cls);
911 }
912
913 /**
914 * Define the comparator that controls which keys are grouped together
915 * for a single call to
916 * {@link Reducer#reduce(Object, Iterable,
917 * org.apache.hadoop.mapreduce.Reducer.Context)}
918 * @param cls the raw comparator to use
919 * @throws IllegalStateException if the job is submitted
920 */
921 public void setGroupingComparatorClass(Class<? extends RawComparator> cls
922 ) throws IllegalStateException {
923 ensureState(JobState.DEFINE);
924 conf.setOutputValueGroupingComparator(cls);
925 }
926
927 /**
928 * Set the user-specified job name.
929 *
930 * @param name the job's new name.
931 * @throws IllegalStateException if the job is submitted
932 */
933 public void setJobName(String name) throws IllegalStateException {
934 ensureState(JobState.DEFINE);
935 conf.setJobName(name);
936 }
937
938 /**
939 * Turn speculative execution on or off for this job.
940 *
941 * @param speculativeExecution <code>true</code> if speculative execution
942 * should be turned on, else <code>false</code>.
943 */
944 public void setSpeculativeExecution(boolean speculativeExecution) {
945 ensureState(JobState.DEFINE);
946 conf.setSpeculativeExecution(speculativeExecution);
947 }
948
949 /**
950 * Turn speculative execution on or off for this job for map tasks.
951 *
952 * @param speculativeExecution <code>true</code> if speculative execution
953 * should be turned on for map tasks,
954 * else <code>false</code>.
955 */
956 public void setMapSpeculativeExecution(boolean speculativeExecution) {
957 ensureState(JobState.DEFINE);
958 conf.setMapSpeculativeExecution(speculativeExecution);
959 }
960
961 /**
962 * Turn speculative execution on or off for this job for reduce tasks.
963 *
964 * @param speculativeExecution <code>true</code> if speculative execution
965 * should be turned on for reduce tasks,
966 * else <code>false</code>.
967 */
968 public void setReduceSpeculativeExecution(boolean speculativeExecution) {
969 ensureState(JobState.DEFINE);
970 conf.setReduceSpeculativeExecution(speculativeExecution);
971 }
972
973 /**
974 * Specify whether job-setup and job-cleanup is needed for the job
975 *
976 * @param needed If <code>true</code>, job-setup and job-cleanup will be
977 * considered from {@link OutputCommitter}
978 * else ignored.
979 */
980 public void setJobSetupCleanupNeeded(boolean needed) {
981 ensureState(JobState.DEFINE);
982 conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
983 }
984
985 /**
986 * Set the given set of archives
987 * @param archives The list of archives that need to be localized
988 */
989 public void setCacheArchives(URI[] archives) {
990 ensureState(JobState.DEFINE);
991 DistributedCache.setCacheArchives(archives, conf);
992 }
993
994 /**
995 * Set the given set of files
996 * @param files The list of files that need to be localized
997 */
998 public void setCacheFiles(URI[] files) {
999 ensureState(JobState.DEFINE);
1000 DistributedCache.setCacheFiles(files, conf);
1001 }
1002
1003 /**
1004 * Add a archives to be localized
1005 * @param uri The uri of the cache to be localized
1006 */
1007 public void addCacheArchive(URI uri) {
1008 ensureState(JobState.DEFINE);
1009 DistributedCache.addCacheArchive(uri, conf);
1010 }
1011
1012 /**
1013 * Add a file to be localized
1014 * @param uri The uri of the cache to be localized
1015 */
1016 public void addCacheFile(URI uri) {
1017 ensureState(JobState.DEFINE);
1018 DistributedCache.addCacheFile(uri, conf);
1019 }
1020
1021 /**
1022 * Add an file path to the current set of classpath entries It adds the file
1023 * to cache as well.
1024 *
1025 * Files added with this method will not be unpacked while being added to the
1026 * classpath.
1027 * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
1028 * method instead.
1029 *
1030 * @param file Path of the file to be added
1031 */
1032 public void addFileToClassPath(Path file)
1033 throws IOException {
1034 ensureState(JobState.DEFINE);
1035 DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
1036 }
1037
1038 /**
1039 * Add an archive path to the current set of classpath entries. It adds the
1040 * archive to cache as well.
1041 *
1042 * Archive files will be unpacked and added to the classpath
1043 * when being distributed.
1044 *
1045 * @param archive Path of the archive to be added
1046 */
1047 public void addArchiveToClassPath(Path archive)
1048 throws IOException {
1049 ensureState(JobState.DEFINE);
1050 DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
1051 }
1052
1053 /**
1054 * Originally intended to enable symlinks, but currently symlinks cannot be
1055 * disabled.
1056 */
1057 @Deprecated
1058 public void createSymlink() {
1059 ensureState(JobState.DEFINE);
1060 DistributedCache.createSymlink(conf);
1061 }
1062
1063 /**
1064 * Expert: Set the number of maximum attempts that will be made to run a
1065 * map task.
1066 *
1067 * @param n the number of attempts per map task.
1068 */
1069 public void setMaxMapAttempts(int n) {
1070 ensureState(JobState.DEFINE);
1071 conf.setMaxMapAttempts(n);
1072 }
1073
1074 /**
1075 * Expert: Set the number of maximum attempts that will be made to run a
1076 * reduce task.
1077 *
1078 * @param n the number of attempts per reduce task.
1079 */
1080 public void setMaxReduceAttempts(int n) {
1081 ensureState(JobState.DEFINE);
1082 conf.setMaxReduceAttempts(n);
1083 }
1084
1085 /**
1086 * Set whether the system should collect profiler information for some of
1087 * the tasks in this job? The information is stored in the user log
1088 * directory.
1089 * @param newValue true means it should be gathered
1090 */
1091 public void setProfileEnabled(boolean newValue) {
1092 ensureState(JobState.DEFINE);
1093 conf.setProfileEnabled(newValue);
1094 }
1095
1096 /**
1097 * Set the profiler configuration arguments. If the string contains a '%s' it
1098 * will be replaced with the name of the profiling output file when the task
1099 * runs.
1100 *
1101 * This value is passed to the task child JVM on the command line.
1102 *
1103 * @param value the configuration string
1104 */
1105 public void setProfileParams(String value) {
1106 ensureState(JobState.DEFINE);
1107 conf.setProfileParams(value);
1108 }
1109
1110 /**
1111 * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
1112 * must also be called.
1113 * @param newValue a set of integer ranges of the map ids
1114 */
1115 public void setProfileTaskRange(boolean isMap, String newValue) {
1116 ensureState(JobState.DEFINE);
1117 conf.setProfileTaskRange(isMap, newValue);
1118 }
1119
1120 private void ensureNotSet(String attr, String msg) throws IOException {
1121 if (conf.get(attr) != null) {
1122 throw new IOException(attr + " is incompatible with " + msg + " mode.");
1123 }
1124 }
1125
1126 /**
1127 * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
1128 * tokens upon job completion. Defaults to true.
1129 */
1130 public void setCancelDelegationTokenUponJobCompletion(boolean value) {
1131 ensureState(JobState.DEFINE);
1132 conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
1133 }
1134
1135 /**
1136 * Default to the new APIs unless they are explicitly set or the old mapper or
1137 * reduce attributes are used.
1138 * @throws IOException if the configuration is inconsistant
1139 */
1140 private void setUseNewAPI() throws IOException {
1141 int numReduces = conf.getNumReduceTasks();
1142 String oldMapperClass = "mapred.mapper.class";
1143 String oldReduceClass = "mapred.reducer.class";
1144 conf.setBooleanIfUnset("mapred.mapper.new-api",
1145 conf.get(oldMapperClass) == null);
1146 if (conf.getUseNewMapper()) {
1147 String mode = "new map API";
1148 ensureNotSet("mapred.input.format.class", mode);
1149 ensureNotSet(oldMapperClass, mode);
1150 if (numReduces != 0) {
1151 ensureNotSet("mapred.partitioner.class", mode);
1152 } else {
1153 ensureNotSet("mapred.output.format.class", mode);
1154 }
1155 } else {
1156 String mode = "map compatability";
1157 ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
1158 ensureNotSet(MAP_CLASS_ATTR, mode);
1159 if (numReduces != 0) {
1160 ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
1161 } else {
1162 ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
1163 }
1164 }
1165 if (numReduces != 0) {
1166 conf.setBooleanIfUnset("mapred.reducer.new-api",
1167 conf.get(oldReduceClass) == null);
1168 if (conf.getUseNewReducer()) {
1169 String mode = "new reduce API";
1170 ensureNotSet("mapred.output.format.class", mode);
1171 ensureNotSet(oldReduceClass, mode);
1172 } else {
1173 String mode = "reduce compatability";
1174 ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
1175 ensureNotSet(REDUCE_CLASS_ATTR, mode);
1176 }
1177 }
1178 }
1179
1180 private synchronized void connect()
1181 throws IOException, InterruptedException, ClassNotFoundException {
1182 if (cluster == null) {
1183 cluster =
1184 ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
1185 public Cluster run()
1186 throws IOException, InterruptedException,
1187 ClassNotFoundException {
1188 return new Cluster(getConfiguration());
1189 }
1190 });
1191 }
1192 }
1193
1194 boolean isConnected() {
1195 return cluster != null;
1196 }
1197
1198 /** Only for mocking via unit tests. */
1199 @Private
1200 public JobSubmitter getJobSubmitter(FileSystem fs,
1201 ClientProtocol submitClient) throws IOException {
1202 return new JobSubmitter(fs, submitClient);
1203 }
1204 /**
1205 * Submit the job to the cluster and return immediately.
1206 * @throws IOException
1207 */
1208 public void submit()
1209 throws IOException, InterruptedException, ClassNotFoundException {
1210 ensureState(JobState.DEFINE);
1211 setUseNewAPI();
1212 connect();
1213 final JobSubmitter submitter =
1214 getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
1215 status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
1216 public JobStatus run() throws IOException, InterruptedException,
1217 ClassNotFoundException {
1218 return submitter.submitJobInternal(Job.this, cluster);
1219 }
1220 });
1221 state = JobState.RUNNING;
1222 LOG.info("The url to track the job: " + getTrackingURL());
1223 }
1224
1225 /**
1226 * Submit the job to the cluster and wait for it to finish.
1227 * @param verbose print the progress to the user
1228 * @return true if the job succeeded
1229 * @throws IOException thrown if the communication with the
1230 * <code>JobTracker</code> is lost
1231 */
1232 public boolean waitForCompletion(boolean verbose
1233 ) throws IOException, InterruptedException,
1234 ClassNotFoundException {
1235 if (state == JobState.DEFINE) {
1236 submit();
1237 }
1238 if (verbose) {
1239 monitorAndPrintJob();
1240 } else {
1241 // get the completion poll interval from the client.
1242 int completionPollIntervalMillis =
1243 Job.getCompletionPollInterval(cluster.getConf());
1244 while (!isComplete()) {
1245 try {
1246 Thread.sleep(completionPollIntervalMillis);
1247 } catch (InterruptedException ie) {
1248 }
1249 }
1250 }
1251 return isSuccessful();
1252 }
1253
1254 /**
1255 * Monitor a job and print status in real-time as progress is made and tasks
1256 * fail.
1257 * @return true if the job succeeded
1258 * @throws IOException if communication to the JobTracker fails
1259 */
1260 public boolean monitorAndPrintJob()
1261 throws IOException, InterruptedException {
1262 String lastReport = null;
1263 Job.TaskStatusFilter filter;
1264 Configuration clientConf = getConfiguration();
1265 filter = Job.getTaskOutputFilter(clientConf);
1266 JobID jobId = getJobID();
1267 LOG.info("Running job: " + jobId);
1268 int eventCounter = 0;
1269 boolean profiling = getProfileEnabled();
1270 IntegerRanges mapRanges = getProfileTaskRange(true);
1271 IntegerRanges reduceRanges = getProfileTaskRange(false);
1272 int progMonitorPollIntervalMillis =
1273 Job.getProgressPollInterval(clientConf);
1274 /* make sure to report full progress after the job is done */
1275 boolean reportedAfterCompletion = false;
1276 boolean reportedUberMode = false;
1277 while (!isComplete() || !reportedAfterCompletion) {
1278 if (isComplete()) {
1279 reportedAfterCompletion = true;
1280 } else {
1281 Thread.sleep(progMonitorPollIntervalMillis);
1282 }
1283 if (status.getState() == JobStatus.State.PREP) {
1284 continue;
1285 }
1286 if (!reportedUberMode) {
1287 reportedUberMode = true;
1288 LOG.info("Job " + jobId + " running in uber mode : " + isUber());
1289 }
1290 String report =
1291 (" map " + StringUtils.formatPercent(mapProgress(), 0)+
1292 " reduce " +
1293 StringUtils.formatPercent(reduceProgress(), 0));
1294 if (!report.equals(lastReport)) {
1295 LOG.info(report);
1296 lastReport = report;
1297 }
1298
1299 TaskCompletionEvent[] events =
1300 getTaskCompletionEvents(eventCounter, 10);
1301 eventCounter += events.length;
1302 printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
1303 }
1304 boolean success = isSuccessful();
1305 if (success) {
1306 LOG.info("Job " + jobId + " completed successfully");
1307 } else {
1308 LOG.info("Job " + jobId + " failed with state " + status.getState() +
1309 " due to: " + status.getFailureInfo());
1310 }
1311 Counters counters = getCounters();
1312 if (counters != null) {
1313 LOG.info(counters.toString());
1314 }
1315 return success;
1316 }
1317
1318 /**
1319 * @return true if the profile parameters indicate that this is using
1320 * hprof, which generates profile files in a particular location
1321 * that we can retrieve to the client.
1322 */
1323 private boolean shouldDownloadProfile() {
1324 // Check the argument string that was used to initialize profiling.
1325 // If this indicates hprof and file-based output, then we're ok to
1326 // download.
1327 String profileParams = getProfileParams();
1328
1329 if (null == profileParams) {
1330 return false;
1331 }
1332
1333 // Split this on whitespace.
1334 String [] parts = profileParams.split("[ \\t]+");
1335
1336 // If any of these indicate hprof, and the use of output files, return true.
1337 boolean hprofFound = false;
1338 boolean fileFound = false;
1339 for (String p : parts) {
1340 if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
1341 hprofFound = true;
1342
1343 // This contains a number of comma-delimited components, one of which
1344 // may specify the file to write to. Make sure this is present and
1345 // not empty.
1346 String [] subparts = p.split(",");
1347 for (String sub : subparts) {
1348 if (sub.startsWith("file=") && sub.length() != "file=".length()) {
1349 fileFound = true;
1350 }
1351 }
1352 }
1353 }
1354
1355 return hprofFound && fileFound;
1356 }
1357
1358 private void printTaskEvents(TaskCompletionEvent[] events,
1359 Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
1360 IntegerRanges reduceRanges) throws IOException, InterruptedException {
1361 for (TaskCompletionEvent event : events) {
1362 switch (filter) {
1363 case NONE:
1364 break;
1365 case SUCCEEDED:
1366 if (event.getStatus() ==
1367 TaskCompletionEvent.Status.SUCCEEDED) {
1368 LOG.info(event.toString());
1369 }
1370 break;
1371 case FAILED:
1372 if (event.getStatus() ==
1373 TaskCompletionEvent.Status.FAILED) {
1374 LOG.info(event.toString());
1375 // Displaying the task diagnostic information
1376 TaskAttemptID taskId = event.getTaskAttemptId();
1377 String[] taskDiagnostics = getTaskDiagnostics(taskId);
1378 if (taskDiagnostics != null) {
1379 for (String diagnostics : taskDiagnostics) {
1380 System.err.println(diagnostics);
1381 }
1382 }
1383 }
1384 break;
1385 case KILLED:
1386 if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
1387 LOG.info(event.toString());
1388 }
1389 break;
1390 case ALL:
1391 LOG.info(event.toString());
1392 break;
1393 }
1394 }
1395 }
1396
1397 /** The interval at which monitorAndPrintJob() prints status */
1398 public static int getProgressPollInterval(Configuration conf) {
1399 // Read progress monitor poll interval from config. Default is 1 second.
1400 int progMonitorPollIntervalMillis = conf.getInt(
1401 PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
1402 if (progMonitorPollIntervalMillis < 1) {
1403 LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
1404 " has been set to an invalid value; "
1405 + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
1406 progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
1407 }
1408 return progMonitorPollIntervalMillis;
1409 }
1410
1411 /** The interval at which waitForCompletion() should check. */
1412 public static int getCompletionPollInterval(Configuration conf) {
1413 int completionPollIntervalMillis = conf.getInt(
1414 COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
1415 if (completionPollIntervalMillis < 1) {
1416 LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
1417 " has been set to an invalid value; "
1418 + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
1419 completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
1420 }
1421 return completionPollIntervalMillis;
1422 }
1423
1424 /**
1425 * Get the task output filter.
1426 *
1427 * @param conf the configuration.
1428 * @return the filter level.
1429 */
1430 public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
1431 return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
1432 }
1433
1434 /**
1435 * Modify the Configuration to set the task output filter.
1436 *
1437 * @param conf the Configuration to modify.
1438 * @param newValue the value to set.
1439 */
1440 public static void setTaskOutputFilter(Configuration conf,
1441 TaskStatusFilter newValue) {
1442 conf.set(Job.OUTPUT_FILTER, newValue.toString());
1443 }
1444
1445 public boolean isUber() throws IOException, InterruptedException {
1446 ensureState(JobState.RUNNING);
1447 updateStatus();
1448 return status.isUber();
1449 }
1450
1451 }