001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021
022 import java.io.IOException;
023 import java.net.URL;
024 import java.net.URLDecoder;
025 import java.util.Enumeration;
026 import java.util.regex.Pattern;
027
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030 import org.apache.hadoop.classification.InterfaceAudience;
031 import org.apache.hadoop.classification.InterfaceAudience.Private;
032 import org.apache.hadoop.classification.InterfaceStability;
033 import org.apache.hadoop.conf.Configuration;
034 import org.apache.hadoop.fs.FileStatus;
035 import org.apache.hadoop.fs.FileSystem;
036 import org.apache.hadoop.fs.Path;
037 import org.apache.hadoop.io.LongWritable;
038 import org.apache.hadoop.io.RawComparator;
039 import org.apache.hadoop.io.Text;
040 import org.apache.hadoop.io.WritableComparable;
041 import org.apache.hadoop.io.WritableComparator;
042 import org.apache.hadoop.io.compress.CompressionCodec;
043 import org.apache.hadoop.mapred.lib.HashPartitioner;
044 import org.apache.hadoop.mapred.lib.IdentityMapper;
045 import org.apache.hadoop.mapred.lib.IdentityReducer;
046 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
047 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
048 import org.apache.hadoop.mapreduce.MRConfig;
049 import org.apache.hadoop.mapreduce.MRJobConfig;
050 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
051 import org.apache.hadoop.mapreduce.util.ConfigUtil;
052 import org.apache.hadoop.security.Credentials;
053 import org.apache.hadoop.util.ReflectionUtils;
054 import org.apache.hadoop.util.Tool;
055 import org.apache.log4j.Level;
056
057 /**
058 * A map/reduce job configuration.
059 *
060 * <p><code>JobConf</code> is the primary interface for a user to describe a
061 * map-reduce job to the Hadoop framework for execution. The framework tries to
062 * faithfully execute the job as-is described by <code>JobConf</code>, however:
063 * <ol>
064 * <li>
065 * Some configuration parameters might have been marked as
066 * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
067 * final</a> by administrators and hence cannot be altered.
068 * </li>
069 * <li>
 *   While some job parameters are straight-forward to set
 *   (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly
 *   with the rest of the framework and/or job-configuration and are
 *   relatively more complex for the user to control finely
 *   (e.g. {@link #setNumMapTasks(int)}).
074 * </li>
075 * </ol></p>
076 *
077 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
078 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
079 * {@link OutputFormat} implementations to be used etc.
080 *
081 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
082 * of the job such as <code>Comparator</code>s to be used, files to be put in
083 * the {@link DistributedCache}, whether or not intermediate and/or job outputs
 * are to be compressed (and how), debuggability via user-provided scripts
 * ({@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
 * for doing post-processing on task logs, the task's stdout, stderr,
 * syslog, etc.</p>
088 *
089 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
090 * <p><blockquote><pre>
091 * // Create a new JobConf
092 * JobConf job = new JobConf(new Configuration(), MyJob.class);
093 *
094 * // Specify various job-specific parameters
095 * job.setJobName("myjob");
096 *
097 * FileInputFormat.setInputPaths(job, new Path("in"));
098 * FileOutputFormat.setOutputPath(job, new Path("out"));
099 *
100 * job.setMapperClass(MyJob.MyMapper.class);
101 * job.setCombinerClass(MyJob.MyReducer.class);
102 * job.setReducerClass(MyJob.MyReducer.class);
103 *
104 * job.setInputFormat(SequenceFileInputFormat.class);
105 * job.setOutputFormat(SequenceFileOutputFormat.class);
106 * </pre></blockquote></p>
107 *
108 * @see JobClient
109 * @see ClusterStatus
110 * @see Tool
111 * @see DistributedCache
112 */
113 @InterfaceAudience.Public
114 @InterfaceStability.Stable
115 public class JobConf extends Configuration {
116
  /** Logger used for deprecation and configuration warnings in this class. */
  private static final Log LOG = LogFactory.getLog(JobConf.class);

  // Register the mapred default/site resources and deprecated-key mappings
  // before any JobConf instance is constructed.
  static{
    ConfigUtil.loadResources();
  }
122
  /**
   * Configuration key {@code mapred.task.maxvmem}.
   * @deprecated Use {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY} and
   * {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
    "mapred.task.maxvmem";

  /**
   * Configuration key {@code mapred.task.limit.maxvmem}.
   * @deprecated no replacement.
   */
  @Deprecated
  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
    "mapred.task.limit.maxvmem";

  /**
   * Configuration key {@code mapred.task.default.maxvmem}.
   * @deprecated no replacement.
   */
  @Deprecated
  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
    "mapred.task.default.maxvmem";

  /**
   * Configuration key {@code mapred.task.maxpmem}.
   * @deprecated no replacement.
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
    "mapred.task.maxpmem";

  /**
   * A value which if set for memory related configuration options,
   * indicates that the options are turned off.
   */
  public static final long DISABLED_MEMORY_LIMIT = -1L;

  /**
   * Property name for the configuration property mapreduce.cluster.local.dir
   */
  public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;

  /**
   * Name of the queue to which jobs will be submitted, if no queue
   * name is mentioned.
   */
  public static final String DEFAULT_QUEUE_NAME = "default";

  // Package-private: memory (in MB) requested for each map task.
  static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
      JobContext.MAP_MEMORY_MB;

  // Package-private: memory (in MB) requested for each reduce task.
  static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
      JobContext.REDUCE_MEMORY_MB;

  /** Pattern for the default unpacking behavior for job jars: only entries
   *  under {@code classes/} or {@code lib/} are unpacked. */
  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
    Pattern.compile("(?:classes/|lib/).*");
178
  /**
   * Configuration key to set the java command line options for the child
   * map and reduce tasks.
   *
   * Java opts for the task tracker child processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
   * other environment variables to the child processes.
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
   *             {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
   */
  @Deprecated
  public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";

  /**
   * Configuration key to set the java command line options for the map tasks.
   *
   * Java opts for the task tracker child map processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
   * other environment variables to the map processes.
   */
  public static final String MAPRED_MAP_TASK_JAVA_OPTS =
      JobContext.MAP_JAVA_OPTS;

  /**
   * Configuration key to set the java command line options for the reduce tasks.
   *
   * Java opts for the task tracker child reduce processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
   * pass process environment variables to the reduce processes.
   */
  public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
      JobContext.REDUCE_JAVA_OPTS;

  /** Default java opts applied when no task java opts are configured. */
  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
235
  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the child
   * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * map tasks (in kilo-bytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * reduce tasks (in kilo-bytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_REDUCE_TASK_ULIMIT =
      "mapreduce.reduce.ulimit";
263
264
  /**
   * Configuration key to set the environment of the child map/reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code>.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable. </li>
   * </ul>
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
   *             {@link #MAPRED_REDUCE_TASK_ENV}
   */
  @Deprecated
  public static final String MAPRED_TASK_ENV = "mapred.child.env";

  /**
   * Configuration key to set the environment of the child map tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code>.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable. </li>
   * </ul>
   */
  public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;

  /**
   * Configuration key to set the environment of the child reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code>.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable. </li>
   * </ul>
   */
  public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;

  // Credentials (e.g. tokens, secrets) for the job; shared by reference with
  // the source JobConf when this JobConf is copy-constructed from one.
  private Credentials credentials = new Credentials();
314
  /**
   * Configuration key to set the logging {@link Level} for the map task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_MAP_TASK_LOG_LEVEL =
      JobContext.MAP_LOG_LEVEL;

  /**
   * Configuration key to set the logging {@link Level} for the reduce task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
      JobContext.REDUCE_LOG_LEVEL;

  /**
   * Default logging level for map/reduce tasks.
   */
  public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
337
338
  /**
   * Construct a map/reduce job configuration.
   */
  public JobConf() {
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Class exampleClass) {
    setJarByClass(exampleClass);
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   */
  public JobConf(Configuration conf) {
    super(conf);

    if (conf instanceof JobConf) {
      JobConf that = (JobConf)conf;
      // Share (not copy) the credentials: tokens added through either
      // JobConf become visible through both.
      credentials = that.credentials;
    }

    checkAndWarnDeprecation();
  }


  /** Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Configuration conf, Class exampleClass) {
    this(conf);
    setJarByClass(exampleClass);
  }


  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(String config) {
    this(new Path(config));
  }

  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(Path config) {
    super();
    addResource(config);
    checkAndWarnDeprecation();
  }

  /** A new map/reduce configuration where the behavior of reading from the
   * default resources can be turned off.
   * <p/>
   * If the parameter {@code loadDefaults} is false, the new instance
   * will not load resources from the default files.
   *
   * @param loadDefaults specifies whether to load from the default files
   */
  public JobConf(boolean loadDefaults) {
    super(loadDefaults);
    checkAndWarnDeprecation();
  }
414
  /**
   * Get credentials for the job.
   * @return credentials for the job
   */
  public Credentials getCredentials() {
    return credentials;
  }

  /**
   * Set the credentials for the job.
   * @param credentials the credentials to associate with the job
   */
  @Private
  public void setCredentials(Credentials credentials) {
    this.credentials = credentials;
  }
427
  /**
   * Get the user jar for the map-reduce job.
   *
   * @return the user jar for the map-reduce job.
   */
  public String getJar() { return get(JobContext.JAR); }

  /**
   * Set the user jar for the map-reduce job.
   *
   * @param jar the user jar for the map-reduce job.
   */
  public void setJar(String jar) { set(JobContext.JAR, jar); }

  /**
   * Get the pattern for jar contents to unpack on the tasktracker.
   *
   * @return the configured pattern, or {@link #UNPACK_JAR_PATTERN_DEFAULT}
   *         if none was set.
   */
  public Pattern getJarUnpackPattern() {
    return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
  }
448
449
450 /**
451 * Set the job's jar file by finding an example class location.
452 *
453 * @param cls the example class.
454 */
455 public void setJarByClass(Class cls) {
456 String jar = findContainingJar(cls);
457 if (jar != null) {
458 setJar(jar);
459 }
460 }
461
  /**
   * Get the configured cluster local directories
   * ({@code mapreduce.cluster.local.dir}).
   *
   * @return the trimmed list of local directory names.
   * @throws IOException declared for API compatibility; reading the
   *         configuration itself does not perform I/O here.
   */
  public String[] getLocalDirs() throws IOException {
    return getTrimmedStrings(MRConfig.LOCAL_DIR);
  }
465
466 /**
467 * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
468 */
469 @Deprecated
470 public void deleteLocalFiles() throws IOException {
471 String[] localDirs = getLocalDirs();
472 for (int i = 0; i < localDirs.length; i++) {
473 FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
474 }
475 }
476
477 public void deleteLocalFiles(String subdir) throws IOException {
478 String[] localDirs = getLocalDirs();
479 for (int i = 0; i < localDirs.length; i++) {
480 FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
481 }
482 }
483
  /**
   * Constructs a local file name. Files are distributed among configured
   * local directories.
   *
   * @param pathString the requested path, relative to a local directory.
   * @return a path under one of the configured local directories.
   * @throws IOException if a local path cannot be allocated.
   */
  public Path getLocalPath(String pathString) throws IOException {
    return getLocalPath(MRConfig.LOCAL_DIR, pathString);
  }
491
  /**
   * Get the reported username for this job.
   *
   * @return the username
   */
  public String getUser() {
    return get(JobContext.USER_NAME);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    set(JobContext.USER_NAME, user);
  }
509
510
511
  /**
   * Set whether the framework should keep the intermediate files for
   * failed tasks.
   *
   * @param keep <code>true</code> if framework should keep the intermediate files
   *             for failed tasks, <code>false</code> otherwise.
   */
  public void setKeepFailedTaskFiles(boolean keep) {
    setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
  }

  /**
   * Should the temporary files for failed tasks be kept?
   *
   * @return should the files be kept? Defaults to <code>false</code>.
   */
  public boolean getKeepFailedTaskFiles() {
    return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
  }

  /**
   * Set a regular expression for task names that should be kept.
   * The regular expression ".*_m_000123_0" would keep the files
   * for the first instance of map 123 that ran.
   *
   * @param pattern the java.util.regex.Pattern to match against the
   *                task names.
   */
  public void setKeepTaskFilesPattern(String pattern) {
    set(JobContext.PRESERVE_FILES_PATTERN, pattern);
  }

  /**
   * Get the regular expression that is matched against the task names
   * to see if we need to keep the files.
   *
   * @return the pattern as a string, if it was set, otherwise null.
   */
  public String getKeepTaskFilesPattern() {
    return get(JobContext.PRESERVE_FILES_PATTERN);
  }
554
  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory; if relative, it is
   *            resolved against the existing working directory.
   */
  public void setWorkingDirectory(Path dir) {
    dir = new Path(getWorkingDirectory(), dir);
    set(JobContext.WORKING_DIR, dir.toString());
  }
564
565 /**
566 * Get the current working directory for the default file system.
567 *
568 * @return the directory name.
569 */
570 public Path getWorkingDirectory() {
571 String name = get(JobContext.WORKING_DIR);
572 if (name != null) {
573 return new Path(name);
574 } else {
575 try {
576 Path dir = FileSystem.get(this).getWorkingDirectory();
577 set(JobContext.WORKING_DIR, dir.toString());
578 return dir;
579 } catch (IOException e) {
580 throw new RuntimeException(e);
581 }
582 }
583 }
584
  /**
   * Sets the number of tasks that a spawned task JVM should run
   * before it exits
   * @param numTasks the number of tasks to execute; defaults to 1;
   *                 -1 signifies no limit
   */
  public void setNumTasksToExecutePerJvm(int numTasks) {
    setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
  }

  /**
   * Get the number of tasks that a spawned JVM should execute
   * @return the configured number of tasks per JVM; 1 if unset.
   */
  public int getNumTasksToExecutePerJvm() {
    return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
  }
601
  /**
   * Get the {@link InputFormat} implementation for the map-reduce job,
   * defaults to {@link TextInputFormat} if not specified explicitly.
   *
   * @return the {@link InputFormat} implementation for the map-reduce job.
   */
  public InputFormat getInputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
                                                TextInputFormat.class,
                                                InputFormat.class),
                                       this);
  }

  /**
   * Set the {@link InputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link InputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setInputFormat(Class<? extends InputFormat> theClass) {
    setClass("mapred.input.format.class", theClass, InputFormat.class);
  }
624
  /**
   * Get the {@link OutputFormat} implementation for the map-reduce job,
   * defaults to {@link TextOutputFormat} if not specified explicitly.
   *
   * @return the {@link OutputFormat} implementation for the map-reduce job.
   */
  public OutputFormat getOutputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
                                                TextOutputFormat.class,
                                                OutputFormat.class),
                                       this);
  }
637
638 /**
639 * Get the {@link OutputCommitter} implementation for the map-reduce job,
640 * defaults to {@link FileOutputCommitter} if not specified explicitly.
641 *
642 * @return the {@link OutputCommitter} implementation for the map-reduce job.
643 */
644 public OutputCommitter getOutputCommitter() {
645 return (OutputCommitter)ReflectionUtils.newInstance(
646 getClass("mapred.output.committer.class", FileOutputCommitter.class,
647 OutputCommitter.class), this);
648 }
649
  /**
   * Set the {@link OutputCommitter} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputCommitter} implementation for the map-reduce
   *                 job.
   */
  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
  }

  /**
   * Set the {@link OutputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setOutputFormat(Class<? extends OutputFormat> theClass) {
    setClass("mapred.output.format.class", theClass, OutputFormat.class);
  }
669
  /**
   * Should the map outputs be compressed before transfer?
   * Uses the SequenceFile compression.
   *
   * @param compress should the map outputs be compressed?
   */
  public void setCompressMapOutput(boolean compress) {
    setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
  }

  /**
   * Are the outputs of the maps to be compressed?
   *
   * @return <code>true</code> if the outputs of the maps are to be compressed,
   *         <code>false</code> otherwise.
   */
  public boolean getCompressMapOutput() {
    return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
  }

  /**
   * Set the given class as the {@link CompressionCodec} for the map outputs.
   * Also turns map output compression on as a side effect.
   *
   * @param codecClass the {@link CompressionCodec} class that will compress
   *                   the map outputs.
   */
  public void
  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
    setCompressMapOutput(true);
    setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
             CompressionCodec.class);
  }
702
703 /**
704 * Get the {@link CompressionCodec} for compressing the map outputs.
705 *
706 * @param defaultValue the {@link CompressionCodec} to return if not set
707 * @return the {@link CompressionCodec} class that should be used to compress the
708 * map outputs.
709 * @throws IllegalArgumentException if the class was specified, but not found
710 */
711 public Class<? extends CompressionCodec>
712 getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
713 Class<? extends CompressionCodec> codecClass = defaultValue;
714 String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
715 if (name != null) {
716 try {
717 codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
718 } catch (ClassNotFoundException e) {
719 throw new IllegalArgumentException("Compression codec " + name +
720 " was not found.", e);
721 }
722 }
723 return codecClass;
724 }
725
726 /**
727 * Get the key class for the map output data. If it is not set, use the
728 * (final) output key class. This allows the map output key class to be
729 * different than the final output key class.
730 *
731 * @return the map output key class.
732 */
733 public Class<?> getMapOutputKeyClass() {
734 Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
735 if (retv == null) {
736 retv = getOutputKeyClass();
737 }
738 return retv;
739 }
740
  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   */
  public void setMapOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
  }
751
752 /**
753 * Get the value class for the map output data. If it is not set, use the
754 * (final) output value class This allows the map output value class to be
755 * different than the final output value class.
756 *
757 * @return the map output value class.
758 */
759 public Class<?> getMapOutputValueClass() {
760 Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
761 Object.class);
762 if (retv == null) {
763 retv = getOutputValueClass();
764 }
765 return retv;
766 }
767
  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   */
  public void setMapOutputValueClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
  }
778
  /**
   * Get the key class for the job output data.
   *
   * @return the key class for the job output data; defaults to
   *         {@link LongWritable}.
   */
  public Class<?> getOutputKeyClass() {
    return getClass(JobContext.OUTPUT_KEY_CLASS,
                    LongWritable.class, Object.class);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   */
  public void setOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
  }
797
798 /**
799 * Get the {@link RawComparator} comparator used to compare keys.
800 *
801 * @return the {@link RawComparator} comparator used to compare keys.
802 */
803 public RawComparator getOutputKeyComparator() {
804 Class<? extends RawComparator> theClass = getClass(
805 JobContext.KEY_COMPARATOR, null, RawComparator.class);
806 if (theClass != null)
807 return ReflectionUtils.newInstance(theClass, this);
808 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
809 }
810
  /**
   * Set the {@link RawComparator} comparator used to compare keys.
   *
   * @param theClass the {@link RawComparator} comparator used to
   *                 compare keys.
   * @see #setOutputValueGroupingComparator(Class)
   */
  public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
    setClass(JobContext.KEY_COMPARATOR,
             theClass, RawComparator.class);
  }
822
  /**
   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
   * Also sets {@link KeyFieldBasedComparator} as the output key comparator
   * class as a side effect.
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where,
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character positions are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field). opts are ordering options. The supported options
   *  are:
   *    -n, (Sort numerically)
   *    -r, (Reverse the result of comparison)
   */
  public void setKeyFieldComparatorOptions(String keySpec) {
    setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
    set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedComparator} options.
   *
   * @return the comparator options string, or null if not set.
   */
  public String getKeyFieldComparatorOption() {
    return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
  }

  /**
   * Set the {@link KeyFieldBasedPartitioner} options used for
   * {@link Partitioner}. Also sets {@link KeyFieldBasedPartitioner} as the
   * partitioner class as a side effect.
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where,
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character positions are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field).
   */
  public void setKeyFieldPartitionerOptions(String keySpec) {
    setPartitionerClass(KeyFieldBasedPartitioner.class);
    set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedPartitioner} options.
   *
   * @return the partitioner options string, or null if not set.
   */
  public String getKeyFieldPartitionerOption() {
    return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
  }
874
875 /**
876 * Get the user defined {@link WritableComparable} comparator for
877 * grouping keys of inputs to the reduce.
878 *
879 * @return comparator set by the user for grouping values.
880 * @see #setOutputValueGroupingComparator(Class) for details.
881 */
882 public RawComparator getOutputValueGroupingComparator() {
883 Class<? extends RawComparator> theClass = getClass(
884 JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
885 if (theClass == null) {
886 return getOutputKeyComparator();
887 }
888
889 return ReflectionUtils.newInstance(theClass, this);
890 }
891
  /**
   * Set the user defined {@link RawComparator} comparator for
   * grouping keys in the input to the reduce.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the reduce sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the reduce being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys.
   *                 It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   */
  public void setOutputValueGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.GROUP_COMPARATOR_CLASS,
             theClass, RawComparator.class);
  }
922
  /**
   * Should the framework use the new context-object code for running
   * the mapper?
   * @return true, if the new api should be used
   */
  public boolean getUseNewMapper() {
    return getBoolean("mapred.mapper.new-api", false);
  }

  /**
   * Set whether the framework should use the new api for the mapper.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewMapper(boolean flag) {
    setBoolean("mapred.mapper.new-api", flag);
  }

  /**
   * Should the framework use the new context-object code for running
   * the reducer?
   * @return true, if the new api should be used
   */
  public boolean getUseNewReducer() {
    return getBoolean("mapred.reducer.new-api", false);
  }

  /**
   * Set whether the framework should use the new api for the reducer.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewReducer(boolean flag) {
    setBoolean("mapred.reducer.new-api", flag);
  }
956
957 /**
958 * Get the value class for job outputs.
959 *
960 * @return the value class for job outputs.
961 */
962 public Class<?> getOutputValueClass() {
963 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
964 }
965
966 /**
967 * Set the value class for job outputs.
968 *
969 * @param theClass the value class for job outputs.
970 */
971 public void setOutputValueClass(Class<?> theClass) {
972 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
973 }
974
975 /**
976 * Get the {@link Mapper} class for the job.
977 *
978 * @return the {@link Mapper} class for the job.
979 */
980 public Class<? extends Mapper> getMapperClass() {
981 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
982 }
983
984 /**
985 * Set the {@link Mapper} class for the job.
986 *
987 * @param theClass the {@link Mapper} class for the job.
988 */
989 public void setMapperClass(Class<? extends Mapper> theClass) {
990 setClass("mapred.mapper.class", theClass, Mapper.class);
991 }
992
993 /**
994 * Get the {@link MapRunnable} class for the job.
995 *
996 * @return the {@link MapRunnable} class for the job.
997 */
998 public Class<? extends MapRunnable> getMapRunnerClass() {
999 return getClass("mapred.map.runner.class",
1000 MapRunner.class, MapRunnable.class);
1001 }
1002
1003 /**
1004 * Expert: Set the {@link MapRunnable} class for the job.
1005 *
1006 * Typically used to exert greater control on {@link Mapper}s.
1007 *
1008 * @param theClass the {@link MapRunnable} class for the job.
1009 */
1010 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
1011 setClass("mapred.map.runner.class", theClass, MapRunnable.class);
1012 }
1013
1014 /**
1015 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
1016 * to be sent to the {@link Reducer}s.
1017 *
1018 * @return the {@link Partitioner} used to partition map-outputs.
1019 */
1020 public Class<? extends Partitioner> getPartitionerClass() {
1021 return getClass("mapred.partitioner.class",
1022 HashPartitioner.class, Partitioner.class);
1023 }
1024
1025 /**
1026 * Set the {@link Partitioner} class used to partition
1027 * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
1028 *
1029 * @param theClass the {@link Partitioner} used to partition map-outputs.
1030 */
1031 public void setPartitionerClass(Class<? extends Partitioner> theClass) {
1032 setClass("mapred.partitioner.class", theClass, Partitioner.class);
1033 }
1034
1035 /**
1036 * Get the {@link Reducer} class for the job.
1037 *
1038 * @return the {@link Reducer} class for the job.
1039 */
1040 public Class<? extends Reducer> getReducerClass() {
1041 return getClass("mapred.reducer.class",
1042 IdentityReducer.class, Reducer.class);
1043 }
1044
1045 /**
1046 * Set the {@link Reducer} class for the job.
1047 *
1048 * @param theClass the {@link Reducer} class for the job.
1049 */
1050 public void setReducerClass(Class<? extends Reducer> theClass) {
1051 setClass("mapred.reducer.class", theClass, Reducer.class);
1052 }
1053
1054 /**
1055 * Get the user-defined <i>combiner</i> class used to combine map-outputs
1056 * before being sent to the reducers. Typically the combiner is same as the
1057 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
1058 *
1059 * @return the user-defined combiner class used to combine map-outputs.
1060 */
1061 public Class<? extends Reducer> getCombinerClass() {
1062 return getClass("mapred.combiner.class", null, Reducer.class);
1063 }
1064
1065 /**
1066 * Set the user-defined <i>combiner</i> class used to combine map-outputs
1067 * before being sent to the reducers.
1068 *
1069 * <p>The combiner is an application-specified aggregation operation, which
1070 * can help cut down the amount of data transferred between the
1071 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
1072 *
1073 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
1074 * the mapper and reducer tasks. In general, the combiner is called as the
1075 * sort/merge result is written to disk. The combiner must:
1076 * <ul>
1077 * <li> be side-effect free</li>
1078 * <li> have the same input and output key types and the same input and
1079 * output value types</li>
1080 * </ul></p>
1081 *
1082 * <p>Typically the combiner is same as the <code>Reducer</code> for the
1083 * job i.e. {@link #setReducerClass(Class)}.</p>
1084 *
1085 * @param theClass the user-defined combiner class used to combine
1086 * map-outputs.
1087 */
1088 public void setCombinerClass(Class<? extends Reducer> theClass) {
1089 setClass("mapred.combiner.class", theClass, Reducer.class);
1090 }
1091
1092 /**
1093 * Should speculative execution be used for this job?
1094 * Defaults to <code>true</code>.
1095 *
1096 * @return <code>true</code> if speculative execution be used for this job,
1097 * <code>false</code> otherwise.
1098 */
1099 public boolean getSpeculativeExecution() {
1100 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
1101 }
1102
1103 /**
1104 * Turn speculative execution on or off for this job.
1105 *
1106 * @param speculativeExecution <code>true</code> if speculative execution
1107 * should be turned on, else <code>false</code>.
1108 */
1109 public void setSpeculativeExecution(boolean speculativeExecution) {
1110 setMapSpeculativeExecution(speculativeExecution);
1111 setReduceSpeculativeExecution(speculativeExecution);
1112 }
1113
1114 /**
1115 * Should speculative execution be used for this job for map tasks?
1116 * Defaults to <code>true</code>.
1117 *
1118 * @return <code>true</code> if speculative execution be
1119 * used for this job for map tasks,
1120 * <code>false</code> otherwise.
1121 */
1122 public boolean getMapSpeculativeExecution() {
1123 return getBoolean(JobContext.MAP_SPECULATIVE, true);
1124 }
1125
1126 /**
1127 * Turn speculative execution on or off for this job for map tasks.
1128 *
1129 * @param speculativeExecution <code>true</code> if speculative execution
1130 * should be turned on for map tasks,
1131 * else <code>false</code>.
1132 */
1133 public void setMapSpeculativeExecution(boolean speculativeExecution) {
1134 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
1135 }
1136
1137 /**
1138 * Should speculative execution be used for this job for reduce tasks?
1139 * Defaults to <code>true</code>.
1140 *
1141 * @return <code>true</code> if speculative execution be used
1142 * for reduce tasks for this job,
1143 * <code>false</code> otherwise.
1144 */
1145 public boolean getReduceSpeculativeExecution() {
1146 return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
1147 }
1148
1149 /**
1150 * Turn speculative execution on or off for this job for reduce tasks.
1151 *
1152 * @param speculativeExecution <code>true</code> if speculative execution
1153 * should be turned on for reduce tasks,
1154 * else <code>false</code>.
1155 */
1156 public void setReduceSpeculativeExecution(boolean speculativeExecution) {
1157 setBoolean(JobContext.REDUCE_SPECULATIVE,
1158 speculativeExecution);
1159 }
1160
1161 /**
1162 * Get configured the number of reduce tasks for this job.
1163 * Defaults to <code>1</code>.
1164 *
1165 * @return the number of reduce tasks for this job.
1166 */
1167 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
1168
1169 /**
1170 * Set the number of map tasks for this job.
1171 *
1172 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual
1173 * number of spawned map tasks depends on the number of {@link InputSplit}s
1174 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
1175 *
1176 * A custom {@link InputFormat} is typically used to accurately control
1177 * the number of map tasks for the job.</p>
1178 *
1179 * <h4 id="NoOfMaps">How many maps?</h4>
1180 *
1181 * <p>The number of maps is usually driven by the total size of the inputs
1182 * i.e. total number of blocks of the input files.</p>
1183 *
1184 * <p>The right level of parallelism for maps seems to be around 10-100 maps
1185 * per-node, although it has been set up to 300 or so for very cpu-light map
1186 * tasks. Task setup takes awhile, so it is best if the maps take at least a
1187 * minute to execute.</p>
1188 *
1189 * <p>The default behavior of file-based {@link InputFormat}s is to split the
1190 * input into <i>logical</i> {@link InputSplit}s based on the total size, in
1191 * bytes, of input files. However, the {@link FileSystem} blocksize of the
1192 * input files is treated as an upper bound for input splits. A lower bound
1193 * on the split size can be set via
1194 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
1195 * mapreduce.input.fileinputformat.split.minsize</a>.</p>
1196 *
1197 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB,
1198 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
1199 * used to set it even higher.</p>
1200 *
1201 * @param n the number of map tasks for this job.
1202 * @see InputFormat#getSplits(JobConf, int)
1203 * @see FileInputFormat
1204 * @see FileSystem#getDefaultBlockSize()
1205 * @see FileStatus#getBlockSize()
1206 */
1207 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
1208
1209 /**
1210 * Get configured the number of reduce tasks for this job. Defaults to
1211 * <code>1</code>.
1212 *
1213 * @return the number of reduce tasks for this job.
1214 */
1215 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
1216
1217 /**
1218 * Set the requisite number of reduce tasks for this job.
1219 *
1220 * <h4 id="NoOfReduces">How many reduces?</h4>
1221 *
1222 * <p>The right number of reduces seems to be <code>0.95</code> or
1223 * <code>1.75</code> multiplied by (<<i>no. of nodes</i>> *
1224 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
1225 * mapreduce.tasktracker.reduce.tasks.maximum</a>).
1226 * </p>
1227 *
1228 * <p>With <code>0.95</code> all of the reduces can launch immediately and
1229 * start transfering map outputs as the maps finish. With <code>1.75</code>
1230 * the faster nodes will finish their first round of reduces and launch a
1231 * second wave of reduces doing a much better job of load balancing.</p>
1232 *
1233 * <p>Increasing the number of reduces increases the framework overhead, but
1234 * increases load balancing and lowers the cost of failures.</p>
1235 *
1236 * <p>The scaling factors above are slightly less than whole numbers to
1237 * reserve a few reduce slots in the framework for speculative-tasks, failures
1238 * etc.</p>
1239 *
1240 * <h4 id="ReducerNone">Reducer NONE</h4>
1241 *
1242 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
1243 *
1244 * <p>In this case the output of the map-tasks directly go to distributed
1245 * file-system, to the path set by
1246 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the
1247 * framework doesn't sort the map-outputs before writing it out to HDFS.</p>
1248 *
1249 * @param n the number of reduce tasks for this job.
1250 */
1251 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
1252
1253 /**
1254 * Get the configured number of maximum attempts that will be made to run a
1255 * map task, as specified by the <code>mapreduce.map.maxattempts</code>
1256 * property. If this property is not already set, the default is 4 attempts.
1257 *
1258 * @return the max number of attempts per map task.
1259 */
1260 public int getMaxMapAttempts() {
1261 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
1262 }
1263
1264 /**
1265 * Expert: Set the number of maximum attempts that will be made to run a
1266 * map task.
1267 *
1268 * @param n the number of attempts per map task.
1269 */
1270 public void setMaxMapAttempts(int n) {
1271 setInt(JobContext.MAP_MAX_ATTEMPTS, n);
1272 }
1273
1274 /**
1275 * Get the configured number of maximum attempts that will be made to run a
1276 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
1277 * property. If this property is not already set, the default is 4 attempts.
1278 *
1279 * @return the max number of attempts per reduce task.
1280 */
1281 public int getMaxReduceAttempts() {
1282 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
1283 }
1284 /**
1285 * Expert: Set the number of maximum attempts that will be made to run a
1286 * reduce task.
1287 *
1288 * @param n the number of attempts per reduce task.
1289 */
1290 public void setMaxReduceAttempts(int n) {
1291 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
1292 }
1293
1294 /**
1295 * Get the user-specified job name. This is only used to identify the
1296 * job to the user.
1297 *
1298 * @return the job's name, defaulting to "".
1299 */
1300 public String getJobName() {
1301 return get(JobContext.JOB_NAME, "");
1302 }
1303
1304 /**
1305 * Set the user-specified job name.
1306 *
1307 * @param name the job's new name.
1308 */
1309 public void setJobName(String name) {
1310 set(JobContext.JOB_NAME, name);
1311 }
1312
1313 /**
1314 * Get the user-specified session identifier. The default is the empty string.
1315 *
1316 * The session identifier is used to tag metric data that is reported to some
1317 * performance metrics system via the org.apache.hadoop.metrics API. The
1318 * session identifier is intended, in particular, for use by Hadoop-On-Demand
1319 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently.
1320 * HOD will set the session identifier by modifying the mapred-site.xml file
1321 * before starting the cluster.
1322 *
1323 * When not running under HOD, this identifer is expected to remain set to
1324 * the empty string.
1325 *
1326 * @return the session identifier, defaulting to "".
1327 */
1328 @Deprecated
1329 public String getSessionId() {
1330 return get("session.id", "");
1331 }
1332
1333 /**
1334 * Set the user-specified session identifier.
1335 *
1336 * @param sessionId the new session id.
1337 */
1338 @Deprecated
1339 public void setSessionId(String sessionId) {
1340 set("session.id", sessionId);
1341 }
1342
1343 /**
1344 * Set the maximum no. of failures of a given job per tasktracker.
1345 * If the no. of task failures exceeds <code>noFailures</code>, the
1346 * tasktracker is <i>blacklisted</i> for this job.
1347 *
1348 * @param noFailures maximum no. of failures of a given job per tasktracker.
1349 */
1350 public void setMaxTaskFailuresPerTracker(int noFailures) {
1351 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
1352 }
1353
1354 /**
1355 * Expert: Get the maximum no. of failures of a given job per tasktracker.
1356 * If the no. of task failures exceeds this, the tasktracker is
1357 * <i>blacklisted</i> for this job.
1358 *
1359 * @return the maximum no. of failures of a given job per tasktracker.
1360 */
1361 public int getMaxTaskFailuresPerTracker() {
1362 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
1363 }
1364
1365 /**
1366 * Get the maximum percentage of map tasks that can fail without
1367 * the job being aborted.
1368 *
1369 * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
1370 * attempts before being declared as <i>failed</i>.
1371 *
1372 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
1373 * the job being declared as {@link JobStatus#FAILED}.
1374 *
1375 * @return the maximum percentage of map tasks that can fail without
1376 * the job being aborted.
1377 */
1378 public int getMaxMapTaskFailuresPercent() {
1379 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
1380 }
1381
1382 /**
1383 * Expert: Set the maximum percentage of map tasks that can fail without the
1384 * job being aborted.
1385 *
1386 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts
1387 * before being declared as <i>failed</i>.
1388 *
1389 * @param percent the maximum percentage of map tasks that can fail without
1390 * the job being aborted.
1391 */
1392 public void setMaxMapTaskFailuresPercent(int percent) {
1393 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1394 }
1395
1396 /**
1397 * Get the maximum percentage of reduce tasks that can fail without
1398 * the job being aborted.
1399 *
1400 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
1401 * attempts before being declared as <i>failed</i>.
1402 *
1403 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results
1404 * in the job being declared as {@link JobStatus#FAILED}.
1405 *
1406 * @return the maximum percentage of reduce tasks that can fail without
1407 * the job being aborted.
1408 */
1409 public int getMaxReduceTaskFailuresPercent() {
1410 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1411 }
1412
1413 /**
1414 * Set the maximum percentage of reduce tasks that can fail without the job
1415 * being aborted.
1416 *
1417 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
1418 * attempts before being declared as <i>failed</i>.
1419 *
1420 * @param percent the maximum percentage of reduce tasks that can fail without
1421 * the job being aborted.
1422 */
1423 public void setMaxReduceTaskFailuresPercent(int percent) {
1424 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1425 }
1426
1427 /**
1428 * Set {@link JobPriority} for this job.
1429 *
1430 * @param prio the {@link JobPriority} for this job.
1431 */
1432 public void setJobPriority(JobPriority prio) {
1433 set(JobContext.PRIORITY, prio.toString());
1434 }
1435
1436 /**
1437 * Get the {@link JobPriority} for this job.
1438 *
1439 * @return the {@link JobPriority} for this job.
1440 */
1441 public JobPriority getJobPriority() {
1442 String prio = get(JobContext.PRIORITY);
1443 if(prio == null) {
1444 return JobPriority.NORMAL;
1445 }
1446
1447 return JobPriority.valueOf(prio);
1448 }
1449
1450 /**
1451 * Set JobSubmitHostName for this job.
1452 *
1453 * @param hostname the JobSubmitHostName for this job.
1454 */
1455 void setJobSubmitHostName(String hostname) {
1456 set(MRJobConfig.JOB_SUBMITHOST, hostname);
1457 }
1458
1459 /**
1460 * Get the JobSubmitHostName for this job.
1461 *
1462 * @return the JobSubmitHostName for this job.
1463 */
1464 String getJobSubmitHostName() {
1465 String hostname = get(MRJobConfig.JOB_SUBMITHOST);
1466
1467 return hostname;
1468 }
1469
1470 /**
1471 * Set JobSubmitHostAddress for this job.
1472 *
1473 * @param hostadd the JobSubmitHostAddress for this job.
1474 */
1475 void setJobSubmitHostAddress(String hostadd) {
1476 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1477 }
1478
1479 /**
1480 * Get JobSubmitHostAddress for this job.
1481 *
1482 * @return JobSubmitHostAddress for this job.
1483 */
1484 String getJobSubmitHostAddress() {
1485 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR);
1486
1487 return hostadd;
1488 }
1489
1490 /**
1491 * Get whether the task profiling is enabled.
1492 * @return true if some tasks will be profiled
1493 */
1494 public boolean getProfileEnabled() {
1495 return getBoolean(JobContext.TASK_PROFILE, false);
1496 }
1497
1498 /**
1499 * Set whether the system should collect profiler information for some of
1500 * the tasks in this job? The information is stored in the user log
1501 * directory.
1502 * @param newValue true means it should be gathered
1503 */
1504 public void setProfileEnabled(boolean newValue) {
1505 setBoolean(JobContext.TASK_PROFILE, newValue);
1506 }
1507
1508 /**
1509 * Get the profiler configuration arguments.
1510 *
1511 * The default value for this property is
1512 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1513 *
1514 * @return the parameters to pass to the task child to configure profiling
1515 */
1516 public String getProfileParams() {
1517 return get(JobContext.TASK_PROFILE_PARAMS,
1518 "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
1519 "verbose=n,file=%s");
1520 }
1521
1522 /**
1523 * Set the profiler configuration arguments. If the string contains a '%s' it
1524 * will be replaced with the name of the profiling output file when the task
1525 * runs.
1526 *
1527 * This value is passed to the task child JVM on the command line.
1528 *
1529 * @param value the configuration string
1530 */
1531 public void setProfileParams(String value) {
1532 set(JobContext.TASK_PROFILE_PARAMS, value);
1533 }
1534
1535 /**
1536 * Get the range of maps or reduces to profile.
1537 * @param isMap is the task a map?
1538 * @return the task ranges
1539 */
1540 public IntegerRanges getProfileTaskRange(boolean isMap) {
1541 return getRange((isMap ? JobContext.NUM_MAP_PROFILES :
1542 JobContext.NUM_REDUCE_PROFILES), "0-2");
1543 }
1544
1545 /**
1546 * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
1547 * must also be called.
1548 * @param newValue a set of integer ranges of the map ids
1549 */
1550 public void setProfileTaskRange(boolean isMap, String newValue) {
1551 // parse the value to make sure it is legal
1552 new Configuration.IntegerRanges(newValue);
1553 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES),
1554 newValue);
1555 }
1556
1557 /**
1558 * Set the debug script to run when the map tasks fail.
1559 *
1560 * <p>The debug script can aid debugging of failed map tasks. The script is
1561 * given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1562 *
1563 * <p>The debug command, run on the node where the map failed, is:</p>
1564 * <p><pre><blockquote>
1565 * $script $stdout $stderr $syslog $jobconf.
1566 * </blockquote></pre></p>
1567 *
1568 * <p> The script file is distributed through {@link DistributedCache}
1569 * APIs. The script needs to be symlinked. </p>
1570 *
1571 * <p>Here is an example on how to submit a script
1572 * <p><blockquote><pre>
1573 * job.setMapDebugScript("./myscript");
1574 * DistributedCache.createSymlink(job);
1575 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1576 * </pre></blockquote></p>
1577 *
1578 * @param mDbgScript the script name
1579 */
1580 public void setMapDebugScript(String mDbgScript) {
1581 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1582 }
1583
1584 /**
1585 * Get the map task's debug script.
1586 *
1587 * @return the debug Script for the mapred job for failed map tasks.
1588 * @see #setMapDebugScript(String)
1589 */
1590 public String getMapDebugScript() {
1591 return get(JobContext.MAP_DEBUG_SCRIPT);
1592 }
1593
1594 /**
1595 * Set the debug script to run when the reduce tasks fail.
1596 *
1597 * <p>The debug script can aid debugging of failed reduce tasks. The script
1598 * is given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1599 *
1600 * <p>The debug command, run on the node where the map failed, is:</p>
1601 * <p><pre><blockquote>
1602 * $script $stdout $stderr $syslog $jobconf.
1603 * </blockquote></pre></p>
1604 *
1605 * <p> The script file is distributed through {@link DistributedCache}
1606 * APIs. The script file needs to be symlinked </p>
1607 *
1608 * <p>Here is an example on how to submit a script
1609 * <p><blockquote><pre>
1610 * job.setReduceDebugScript("./myscript");
1611 * DistributedCache.createSymlink(job);
1612 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1613 * </pre></blockquote></p>
1614 *
1615 * @param rDbgScript the script name
1616 */
1617 public void setReduceDebugScript(String rDbgScript) {
1618 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1619 }
1620
1621 /**
1622 * Get the reduce task's debug Script
1623 *
1624 * @return the debug script for the mapred job for failed reduce tasks.
1625 * @see #setReduceDebugScript(String)
1626 */
1627 public String getReduceDebugScript() {
1628 return get(JobContext.REDUCE_DEBUG_SCRIPT);
1629 }
1630
1631 /**
1632 * Get the uri to be invoked in-order to send a notification after the job
1633 * has completed (success/failure).
1634 *
1635 * @return the job end notification uri, <code>null</code> if it hasn't
1636 * been set.
1637 * @see #setJobEndNotificationURI(String)
1638 */
1639 public String getJobEndNotificationURI() {
1640 return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1641 }
1642
1643 /**
1644 * Set the uri to be invoked in-order to send a notification after the job
1645 * has completed (success/failure).
1646 *
1647 * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and
1648 * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's
1649 * identifier and completion-status respectively.</p>
1650 *
1651 * <p>This is typically used by application-writers to implement chaining of
1652 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
1653 *
1654 * @param uri the job end notification uri
1655 * @see JobStatus
1656 * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#
1657 * JobCompletionAndChaining">Job Completion and Chaining</a>
1658 */
1659 public void setJobEndNotificationURI(String uri) {
1660 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1661 }
1662
1663 /**
1664 * Get job-specific shared directory for use as scratch space
1665 *
1666 * <p>
1667 * When a job starts, a shared directory is created at location
1668 * <code>
1669 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
1670 * This directory is exposed to the users through
1671 * <code>mapreduce.job.local.dir </code>.
1672 * So, the tasks can use this space
1673 * as scratch space and share files among them. </p>
1674 * This value is available as System property also.
1675 *
1676 * @return The localized job specific shared directory
1677 */
1678 public String getJobLocalDir() {
1679 return get(JobContext.JOB_LOCAL_DIR);
1680 }
1681
1682 /**
1683 * Get memory required to run a map task of the job, in MB.
1684 *
1685 * If a value is specified in the configuration, it is returned.
1686 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1687 * <p/>
1688 * For backward compatibility, if the job configuration sets the
1689 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1690 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1691 * after converting it from bytes to MB.
1692 * @return memory required to run a map task of the job, in MB,
1693 * or {@link #DISABLED_MEMORY_LIMIT} if unset.
1694 */
1695 public long getMemoryForMapTask() {
1696 long value = getDeprecatedMemoryValue();
1697 if (value == DISABLED_MEMORY_LIMIT) {
1698 value = normalizeMemoryConfigValue(
1699 getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
1700 DISABLED_MEMORY_LIMIT));
1701 }
1702 return value;
1703 }
1704
  /**
   * Set the memory required to run a map task of the job, in MB.
   *
   * @param mem memory required for a map task, in MB.
   */
  public void setMemoryForMapTask(long mem) {
    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
  }
1708
1709 /**
1710 * Get memory required to run a reduce task of the job, in MB.
1711 *
1712 * If a value is specified in the configuration, it is returned.
1713 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1714 * <p/>
1715 * For backward compatibility, if the job configuration sets the
1716 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1717 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1718 * after converting it from bytes to MB.
1719 * @return memory required to run a reduce task of the job, in MB,
1720 * or {@link #DISABLED_MEMORY_LIMIT} if unset.
1721 */
1722 public long getMemoryForReduceTask() {
1723 long value = getDeprecatedMemoryValue();
1724 if (value == DISABLED_MEMORY_LIMIT) {
1725 value = normalizeMemoryConfigValue(
1726 getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
1727 DISABLED_MEMORY_LIMIT));
1728 }
1729 return value;
1730 }
1731
1732 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
1733 // converted into MBs.
1734 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
1735 // value.
1736 private long getDeprecatedMemoryValue() {
1737 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
1738 DISABLED_MEMORY_LIMIT);
1739 oldValue = normalizeMemoryConfigValue(oldValue);
1740 if (oldValue != DISABLED_MEMORY_LIMIT) {
1741 oldValue /= (1024*1024);
1742 }
1743 return oldValue;
1744 }
1745
  /**
   * Set the memory required to run a reduce task of the job, in MB.
   *
   * @param mem memory required for a reduce task, in MB.
   */
  public void setMemoryForReduceTask(long mem) {
    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
  }
1749
1750 /**
1751 * Return the name of the queue to which this job is submitted.
1752 * Defaults to 'default'.
1753 *
1754 * @return name of the queue
1755 */
1756 public String getQueueName() {
1757 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1758 }
1759
1760 /**
1761 * Set the name of the queue to which this job should be submitted.
1762 *
1763 * @param queueName Name of the queue
1764 */
1765 public void setQueueName(String queueName) {
1766 set(JobContext.QUEUE_NAME, queueName);
1767 }
1768
1769 /**
1770 * Normalize the negative values in configuration
1771 *
1772 * @param val
1773 * @return normalized value
1774 */
1775 public static long normalizeMemoryConfigValue(long val) {
1776 if (val < 0) {
1777 val = DISABLED_MEMORY_LIMIT;
1778 }
1779 return val;
1780 }
1781
1782 /**
1783 * Compute the number of slots required to run a single map task-attempt
1784 * of this job.
1785 * @param slotSizePerMap cluster-wide value of the amount of memory required
1786 * to run a map-task
1787 * @return the number of slots required to run a single map task-attempt
1788 * 1 if memory parameters are disabled.
1789 */
1790 int computeNumSlotsPerMap(long slotSizePerMap) {
1791 if ((slotSizePerMap==DISABLED_MEMORY_LIMIT) ||
1792 (getMemoryForMapTask()==DISABLED_MEMORY_LIMIT)) {
1793 return 1;
1794 }
1795 return (int)(Math.ceil((float)getMemoryForMapTask() / (float)slotSizePerMap));
1796 }
1797
1798 /**
1799 * Compute the number of slots required to run a single reduce task-attempt
1800 * of this job.
1801 * @param slotSizePerReduce cluster-wide value of the amount of memory
1802 * required to run a reduce-task
1803 * @return the number of slots required to run a single reduce task-attempt
1804 * 1 if memory parameters are disabled
1805 */
1806 int computeNumSlotsPerReduce(long slotSizePerReduce) {
1807 if ((slotSizePerReduce==DISABLED_MEMORY_LIMIT) ||
1808 (getMemoryForReduceTask()==DISABLED_MEMORY_LIMIT)) {
1809 return 1;
1810 }
1811 return
1812 (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
1813 }
1814
1815 /**
1816 * Find a jar that contains a class of the same name, if any.
1817 * It will return a jar file, even if that is not the first thing
1818 * on the class path that has a class with the same name.
1819 *
1820 * @param my_class the class to find.
1821 * @return a jar file that contains the class, or null.
1822 * @throws IOException
1823 */
1824 public static String findContainingJar(Class my_class) {
1825 ClassLoader loader = my_class.getClassLoader();
1826 String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
1827 try {
1828 for(Enumeration itr = loader.getResources(class_file);
1829 itr.hasMoreElements();) {
1830 URL url = (URL) itr.nextElement();
1831 if ("jar".equals(url.getProtocol())) {
1832 String toReturn = url.getPath();
1833 if (toReturn.startsWith("file:")) {
1834 toReturn = toReturn.substring("file:".length());
1835 }
1836 // URLDecoder is a misnamed class, since it actually decodes
1837 // x-www-form-urlencoded MIME type rather than actual
1838 // URL encoding (which the file path has). Therefore it would
1839 // decode +s to ' 's which is incorrect (spaces are actually
1840 // either unencoded or encoded as "%20"). Replace +s first, so
1841 // that they are kept sacred during the decoding process.
1842 toReturn = toReturn.replaceAll("\\+", "%2B");
1843 toReturn = URLDecoder.decode(toReturn, "UTF-8");
1844 return toReturn.replaceAll("!.*$", "");
1845 }
1846 }
1847 } catch (IOException e) {
1848 throw new RuntimeException(e);
1849 }
1850 return null;
1851 }
1852
1853
1854 /**
1855 * Get the memory required to run a task of this job, in bytes. See
1856 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1857 * <p/>
1858 * This method is deprecated. Now, different memory limits can be
1859 * set for map and reduce tasks of a job, in MB.
1860 * <p/>
1861 * For backward compatibility, if the job configuration sets the
1862 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1863 * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned.
1864 * Otherwise, this method will return the larger of the values returned by
1865 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
1866 * after converting them into bytes.
1867 *
1868 * @return Memory required to run a task of this job, in bytes,
1869 * or {@link #DISABLED_MEMORY_LIMIT}, if unset.
1870 * @see #setMaxVirtualMemoryForTask(long)
1871 * @deprecated Use {@link #getMemoryForMapTask()} and
1872 * {@link #getMemoryForReduceTask()}
1873 */
1874 @Deprecated
1875 public long getMaxVirtualMemoryForTask() {
1876 LOG.warn(
1877 "getMaxVirtualMemoryForTask() is deprecated. " +
1878 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
1879
1880 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
1881 value = normalizeMemoryConfigValue(value);
1882 if (value == DISABLED_MEMORY_LIMIT) {
1883 value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
1884 value = normalizeMemoryConfigValue(value);
1885 if (value != DISABLED_MEMORY_LIMIT) {
1886 value *= 1024*1024;
1887 }
1888 }
1889 return value;
1890 }
1891
1892 /**
1893 * Set the maximum amount of memory any task of this job can use. See
1894 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1895 * <p/>
1896 * mapred.task.maxvmem is split into
1897 * mapreduce.map.memory.mb
1898 * and mapreduce.map.memory.mb,mapred
1899 * each of the new key are set
1900 * as mapred.task.maxvmem / 1024
1901 * as new values are in MB
1902 *
1903 * @param vmem Maximum amount of virtual memory in bytes any task of this job
1904 * can use.
1905 * @see #getMaxVirtualMemoryForTask()
1906 * @deprecated
1907 * Use {@link #setMemoryForMapTask(long mem)} and
1908 * Use {@link #setMemoryForReduceTask(long mem)}
1909 */
1910 @Deprecated
1911 public void setMaxVirtualMemoryForTask(long vmem) {
1912 LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+
1913 "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
1914 if(vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
1915 setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
1916 setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
1917 }
1918
1919 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
1920 setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb
1921 setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb
1922 }else{
1923 this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem);
1924 }
1925 }
1926
1927 /**
1928 * @deprecated this variable is deprecated and nolonger in use.
1929 */
1930 @Deprecated
1931 public long getMaxPhysicalMemoryForTask() {
1932 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
1933 + " Refer to the APIs getMemoryForMapTask() and"
1934 + " getMemoryForReduceTask() for details.");
1935 return -1;
1936 }
1937
1938 /*
1939 * @deprecated this
1940 */
1941 @Deprecated
1942 public void setMaxPhysicalMemoryForTask(long mem) {
1943 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
1944 + " The value set is ignored. Refer to "
1945 + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
1946 }
1947
1948 static String deprecatedString(String key) {
1949 return "The variable " + key + " is no longer used.";
1950 }
1951
1952 private void checkAndWarnDeprecation() {
1953 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
1954 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
1955 + " Instead use " + JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY
1956 + " and " + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
1957 }
1958 if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) {
1959 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
1960 }
1961 if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) {
1962 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
1963 }
1964 if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) {
1965 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
1966 }
1967 }
1968
1969
1970 }
1971