001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.mapred;
019
020 import java.io.IOException;
021
022 import org.apache.hadoop.classification.InterfaceAudience;
023 import org.apache.hadoop.classification.InterfaceStability;
024 import org.apache.hadoop.fs.FileSystem;
025 import org.apache.hadoop.fs.Path;
026 import org.apache.hadoop.io.BytesWritable;
027 import org.apache.hadoop.io.SequenceFile;
028 import org.apache.hadoop.io.SequenceFile.CompressionType;
029 import org.apache.hadoop.io.Writable;
030 import org.apache.hadoop.io.WritableComparable;
031 import org.apache.hadoop.io.compress.CompressionCodec;
032 import org.apache.hadoop.io.compress.DefaultCodec;
033 import org.apache.hadoop.util.Progressable;
034 import org.apache.hadoop.util.ReflectionUtils;
035
036 /**
037 * An {@link OutputFormat} that writes keys, values to
038 * {@link SequenceFile}s in binary(raw) format
039 */
040 @InterfaceAudience.Public
041 @InterfaceStability.Stable
042 public class SequenceFileAsBinaryOutputFormat
043 extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
044
045 /**
046 * Inner class used for appendRaw
047 */
048 static protected class WritableValueBytes extends org.apache.hadoop.mapreduce
049 .lib.output.SequenceFileAsBinaryOutputFormat.WritableValueBytes {
050 }
051
052 /**
053 * Set the key class for the {@link SequenceFile}
054 * <p>This allows the user to specify the key class to be different
055 * from the actual class ({@link BytesWritable}) used for writing </p>
056 *
057 * @param conf the {@link JobConf} to modify
058 * @param theClass the SequenceFile output key class.
059 */
060 static public void setSequenceFileOutputKeyClass(JobConf conf,
061 Class<?> theClass) {
062 conf.setClass(org.apache.hadoop.mapreduce.lib.output.
063 SequenceFileAsBinaryOutputFormat.KEY_CLASS, theClass, Object.class);
064 }
065
066 /**
067 * Set the value class for the {@link SequenceFile}
068 * <p>This allows the user to specify the value class to be different
069 * from the actual class ({@link BytesWritable}) used for writing </p>
070 *
071 * @param conf the {@link JobConf} to modify
072 * @param theClass the SequenceFile output key class.
073 */
074 static public void setSequenceFileOutputValueClass(JobConf conf,
075 Class<?> theClass) {
076 conf.setClass(org.apache.hadoop.mapreduce.lib.output.
077 SequenceFileAsBinaryOutputFormat.VALUE_CLASS, theClass, Object.class);
078 }
079
080 /**
081 * Get the key class for the {@link SequenceFile}
082 *
083 * @return the key class of the {@link SequenceFile}
084 */
085 static public Class<? extends WritableComparable> getSequenceFileOutputKeyClass(JobConf conf) {
086 return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
087 SequenceFileAsBinaryOutputFormat.KEY_CLASS,
088 conf.getOutputKeyClass().asSubclass(WritableComparable.class),
089 WritableComparable.class);
090 }
091
092 /**
093 * Get the value class for the {@link SequenceFile}
094 *
095 * @return the value class of the {@link SequenceFile}
096 */
097 static public Class<? extends Writable> getSequenceFileOutputValueClass(JobConf conf) {
098 return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
099 SequenceFileAsBinaryOutputFormat.VALUE_CLASS,
100 conf.getOutputValueClass().asSubclass(Writable.class), Writable.class);
101 }
102
103 @Override
104 public RecordWriter <BytesWritable, BytesWritable>
105 getRecordWriter(FileSystem ignored, JobConf job,
106 String name, Progressable progress)
107 throws IOException {
108 // get the path of the temporary output file
109 Path file = FileOutputFormat.getTaskOutputPath(job, name);
110
111 FileSystem fs = file.getFileSystem(job);
112 CompressionCodec codec = null;
113 CompressionType compressionType = CompressionType.NONE;
114 if (getCompressOutput(job)) {
115 // find the kind of compression to do
116 compressionType = getOutputCompressionType(job);
117
118 // find the right codec
119 Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
120 DefaultCodec.class);
121 codec = ReflectionUtils.newInstance(codecClass, job);
122 }
123 final SequenceFile.Writer out =
124 SequenceFile.createWriter(fs, job, file,
125 getSequenceFileOutputKeyClass(job),
126 getSequenceFileOutputValueClass(job),
127 compressionType,
128 codec,
129 progress);
130
131 return new RecordWriter<BytesWritable, BytesWritable>() {
132
133 private WritableValueBytes wvaluebytes = new WritableValueBytes();
134
135 public void write(BytesWritable bkey, BytesWritable bvalue)
136 throws IOException {
137
138 wvaluebytes.reset(bvalue);
139 out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
140 wvaluebytes.reset(null);
141 }
142
143 public void close(Reporter reporter) throws IOException {
144 out.close();
145 }
146
147 };
148
149 }
150
151 @Override
152 public void checkOutputSpecs(FileSystem ignored, JobConf job)
153 throws IOException {
154 super.checkOutputSpecs(ignored, job);
155 if (getCompressOutput(job) &&
156 getOutputCompressionType(job) == CompressionType.RECORD ){
157 throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
158 + "doesn't support Record Compression" );
159 }
160
161 }
162
163 }