001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io.compress;
020
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
035
036 /**
037 * This class provides CompressionOutputStream and CompressionInputStream for
038 * compression and decompression. Currently we dont have an implementation of
039 * the Compressor and Decompressor interfaces, so those methods of
040 * CompressionCodec which have a Compressor or Decompressor type argument, throw
041 * UnsupportedOperationException.
042 */
043 @InterfaceAudience.Public
044 @InterfaceStability.Evolving
045 public class BZip2Codec implements SplittableCompressionCodec {
046
047 private static final String HEADER = "BZ";
048 private static final int HEADER_LEN = HEADER.length();
049 private static final String SUB_HEADER = "h9";
050 private static final int SUB_HEADER_LEN = SUB_HEADER.length();
051
052 /**
053 * Creates a new instance of BZip2Codec
054 */
055 public BZip2Codec() { }
056
057 /**
058 * Creates CompressionOutputStream for BZip2
059 *
060 * @param out
061 * The output Stream
062 * @return The BZip2 CompressionOutputStream
063 * @throws java.io.IOException
064 * Throws IO exception
065 */
066 @Override
067 public CompressionOutputStream createOutputStream(OutputStream out)
068 throws IOException {
069 return new BZip2CompressionOutputStream(out);
070 }
071
072 /**
073 * Creates a compressor using given OutputStream.
074 *
075 * @return CompressionOutputStream
076 @throws java.io.IOException
077 */
078 @Override
079 public CompressionOutputStream createOutputStream(OutputStream out,
080 Compressor compressor) throws IOException {
081 return createOutputStream(out);
082 }
083
084 /**
085 * This functionality is currently not supported.
086 *
087 * @return BZip2DummyCompressor.class
088 */
089 @Override
090 public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
091 return BZip2DummyCompressor.class;
092 }
093
094 /**
095 * This functionality is currently not supported.
096 *
097 * @return Compressor
098 */
099 @Override
100 public Compressor createCompressor() {
101 return new BZip2DummyCompressor();
102 }
103
104 /**
105 * Creates CompressionInputStream to be used to read off uncompressed data.
106 *
107 * @param in
108 * The InputStream
109 * @return Returns CompressionInputStream for BZip2
110 * @throws java.io.IOException
111 * Throws IOException
112 */
113 @Override
114 public CompressionInputStream createInputStream(InputStream in)
115 throws IOException {
116 return new BZip2CompressionInputStream(in);
117 }
118
119 /**
120 * This functionality is currently not supported.
121 *
122 * @return CompressionInputStream
123 */
124 @Override
125 public CompressionInputStream createInputStream(InputStream in,
126 Decompressor decompressor) throws IOException {
127 return createInputStream(in);
128 }
129
130 /**
131 * Creates CompressionInputStream to be used to read off uncompressed data
132 * in one of the two reading modes. i.e. Continuous or Blocked reading modes
133 *
134 * @param seekableIn The InputStream
135 * @param start The start offset into the compressed stream
136 * @param end The end offset into the compressed stream
137 * @param readMode Controls whether progress is reported continuously or
138 * only at block boundaries.
139 *
140 * @return CompressionInputStream for BZip2 aligned at block boundaries
141 */
142 @Override
143 public SplitCompressionInputStream createInputStream(InputStream seekableIn,
144 Decompressor decompressor, long start, long end, READ_MODE readMode)
145 throws IOException {
146
147 if (!(seekableIn instanceof Seekable)) {
148 throw new IOException("seekableIn must be an instance of " +
149 Seekable.class.getName());
150 }
151
152 //find the position of first BZip2 start up marker
153 ((Seekable)seekableIn).seek(0);
154
155 // BZip2 start of block markers are of 6 bytes. But the very first block
156 // also has "BZh9", making it 10 bytes. This is the common case. But at
157 // time stream might start without a leading BZ.
158 final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
159 CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
160 long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);
161
162 ((Seekable)seekableIn).seek(adjStart);
163 SplitCompressionInputStream in =
164 new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);
165
166
167 // The following if clause handles the following case:
168 // Assume the following scenario in BZip2 compressed stream where
169 // . represent compressed data.
170 // .....[48 bit Block].....[48 bit Block].....[48 bit Block]...
171 // ........................[47 bits][1 bit].....[48 bit Block]...
172 // ................................^[Assume a Byte alignment here]
173 // ........................................^^[current position of stream]
174 // .....................^^[We go back 10 Bytes in stream and find a Block marker]
175 // ........................................^^[We align at wrong position!]
176 // ...........................................................^^[While this pos is correct]
177
178 if (in.getPos() <= start) {
179 ((Seekable)seekableIn).seek(start);
180 in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
181 }
182
183 return in;
184 }
185
186 /**
187 * This functionality is currently not supported.
188 *
189 * @return BZip2DummyDecompressor.class
190 */
191 @Override
192 public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
193 return BZip2DummyDecompressor.class;
194 }
195
196 /**
197 * This functionality is currently not supported.
198 *
199 * @return Decompressor
200 */
201 @Override
202 public Decompressor createDecompressor() {
203 return new BZip2DummyDecompressor();
204 }
205
206 /**
207 * .bz2 is recognized as the default extension for compressed BZip2 files
208 *
209 * @return A String telling the default bzip2 file extension
210 */
211 @Override
212 public String getDefaultExtension() {
213 return ".bz2";
214 }
215
216 private static class BZip2CompressionOutputStream extends
217 CompressionOutputStream {
218
219 // class data starts here//
220 private CBZip2OutputStream output;
221 private boolean needsReset;
222 // class data ends here//
223
224 public BZip2CompressionOutputStream(OutputStream out)
225 throws IOException {
226 super(out);
227 needsReset = true;
228 }
229
230 private void writeStreamHeader() throws IOException {
231 if (super.out != null) {
232 // The compressed bzip2 stream should start with the
233 // identifying characters BZ. Caller of CBZip2OutputStream
234 // i.e. this class must write these characters.
235 out.write(HEADER.getBytes());
236 }
237 }
238
239 @Override
240 public void finish() throws IOException {
241 if (needsReset) {
242 // In the case that nothing is written to this stream, we still need to
243 // write out the header before closing, otherwise the stream won't be
244 // recognized by BZip2CompressionInputStream.
245 internalReset();
246 }
247 this.output.finish();
248 needsReset = true;
249 }
250
251 private void internalReset() throws IOException {
252 if (needsReset) {
253 needsReset = false;
254 writeStreamHeader();
255 this.output = new CBZip2OutputStream(out);
256 }
257 }
258
259 @Override
260 public void resetState() throws IOException {
261 // Cannot write to out at this point because out might not be ready
262 // yet, as in SequenceFile.Writer implementation.
263 needsReset = true;
264 }
265
266 @Override
267 public void write(int b) throws IOException {
268 if (needsReset) {
269 internalReset();
270 }
271 this.output.write(b);
272 }
273
274 @Override
275 public void write(byte[] b, int off, int len) throws IOException {
276 if (needsReset) {
277 internalReset();
278 }
279 this.output.write(b, off, len);
280 }
281
282 @Override
283 public void close() throws IOException {
284 if (needsReset) {
285 // In the case that nothing is written to this stream, we still need to
286 // write out the header before closing, otherwise the stream won't be
287 // recognized by BZip2CompressionInputStream.
288 internalReset();
289 }
290 this.output.flush();
291 this.output.close();
292 needsReset = true;
293 }
294
295 }// end of class BZip2CompressionOutputStream
296
297 /**
298 * This class is capable to de-compress BZip2 data in two modes;
299 * CONTINOUS and BYBLOCK. BYBLOCK mode makes it possible to
300 * do decompression starting any arbitrary position in the stream.
301 *
302 * So this facility can easily be used to parallelize decompression
303 * of a large BZip2 file for performance reasons. (It is exactly
304 * done so for Hadoop framework. See LineRecordReader for an
305 * example). So one can break the file (of course logically) into
306 * chunks for parallel processing. These "splits" should be like
307 * default Hadoop splits (e.g as in FileInputFormat getSplit metod).
308 * So this code is designed and tested for FileInputFormat's way
309 * of splitting only.
310 */
311
312 private static class BZip2CompressionInputStream extends
313 SplitCompressionInputStream {
314
315 // class data starts here//
316 private CBZip2InputStream input;
317 boolean needsReset;
318 private BufferedInputStream bufferedIn;
319 private boolean isHeaderStripped = false;
320 private boolean isSubHeaderStripped = false;
321 private READ_MODE readMode = READ_MODE.CONTINUOUS;
322 private long startingPos = 0L;
323
324 // Following state machine handles different states of compressed stream
325 // position
326 // HOLD : Don't advertise compressed stream position
327 // ADVERTISE : Read 1 more character and advertise stream position
328 // See more comments about it before updatePos method.
329 private enum POS_ADVERTISEMENT_STATE_MACHINE {
330 HOLD, ADVERTISE
331 };
332
333 POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
334 long compressedStreamPosition = 0;
335
336 // class data ends here//
337
338 public BZip2CompressionInputStream(InputStream in) throws IOException {
339 this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
340 }
341
342 public BZip2CompressionInputStream(InputStream in, long start, long end,
343 READ_MODE readMode) throws IOException {
344 super(in, start, end);
345 needsReset = false;
346 bufferedIn = new BufferedInputStream(super.in);
347 this.startingPos = super.getPos();
348 this.readMode = readMode;
349 if (this.startingPos == 0) {
350 // We only strip header if it is start of file
351 bufferedIn = readStreamHeader();
352 }
353 input = new CBZip2InputStream(bufferedIn, readMode);
354 if (this.isHeaderStripped) {
355 input.updateReportedByteCount(HEADER_LEN);
356 }
357
358 if (this.isSubHeaderStripped) {
359 input.updateReportedByteCount(SUB_HEADER_LEN);
360 }
361
362 this.updatePos(false);
363 }
364
365 private BufferedInputStream readStreamHeader() throws IOException {
366 // We are flexible enough to allow the compressed stream not to
367 // start with the header of BZ. So it works fine either we have
368 // the header or not.
369 if (super.in != null) {
370 bufferedIn.mark(HEADER_LEN);
371 byte[] headerBytes = new byte[HEADER_LEN];
372 int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
373 if (actualRead != -1) {
374 String header = new String(headerBytes);
375 if (header.compareTo(HEADER) != 0) {
376 bufferedIn.reset();
377 } else {
378 this.isHeaderStripped = true;
379 // In case of BYBLOCK mode, we also want to strip off
380 // remaining two character of the header.
381 if (this.readMode == READ_MODE.BYBLOCK) {
382 actualRead = bufferedIn.read(headerBytes, 0,
383 SUB_HEADER_LEN);
384 if (actualRead != -1) {
385 this.isSubHeaderStripped = true;
386 }
387 }
388 }
389 }
390 }
391
392 if (bufferedIn == null) {
393 throw new IOException("Failed to read bzip2 stream.");
394 }
395
396 return bufferedIn;
397
398 }// end of method
399
400 @Override
401 public void close() throws IOException {
402 if (!needsReset) {
403 input.close();
404 needsReset = true;
405 }
406 }
407
408 /**
409 * This method updates compressed stream position exactly when the
410 * client of this code has read off at least one byte passed any BZip2
411 * end of block marker.
412 *
413 * This mechanism is very helpful to deal with data level record
414 * boundaries. Please see constructor and next methods of
415 * org.apache.hadoop.mapred.LineRecordReader as an example usage of this
416 * feature. We elaborate it with an example in the following:
417 *
418 * Assume two different scenarios of the BZip2 compressed stream, where
419 * [m] represent end of block, \n is line delimiter and . represent compressed
420 * data.
421 *
422 * ............[m]......\n.......
423 *
424 * ..........\n[m]......\n.......
425 *
426 * Assume that end is right after [m]. In the first case the reading
427 * will stop at \n and there is no need to read one more line. (To see the
428 * reason of reading one more line in the next() method is explained in LineRecordReader.)
429 * While in the second example LineRecordReader needs to read one more line
430 * (till the second \n). Now since BZip2Codecs only update position
431 * at least one byte passed a maker, so it is straight forward to differentiate
432 * between the two cases mentioned.
433 *
434 */
435
436 @Override
437 public int read(byte[] b, int off, int len) throws IOException {
438 if (needsReset) {
439 internalReset();
440 }
441
442 int result = 0;
443 result = this.input.read(b, off, len);
444 if (result == BZip2Constants.END_OF_BLOCK) {
445 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
446 }
447
448 if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
449 result = this.input.read(b, off, off + 1);
450 // This is the precise time to update compressed stream position
451 // to the client of this code.
452 this.updatePos(true);
453 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
454 }
455
456 return result;
457
458 }
459
460 @Override
461 public int read() throws IOException {
462 byte b[] = new byte[1];
463 int result = this.read(b, 0, 1);
464 return (result < 0) ? result : (b[0] & 0xff);
465 }
466
467 private void internalReset() throws IOException {
468 if (needsReset) {
469 needsReset = false;
470 BufferedInputStream bufferedIn = readStreamHeader();
471 input = new CBZip2InputStream(bufferedIn, this.readMode);
472 }
473 }
474
475 @Override
476 public void resetState() throws IOException {
477 // Cannot read from bufferedIn at this point because bufferedIn
478 // might not be ready
479 // yet, as in SequenceFile.Reader implementation.
480 needsReset = true;
481 }
482
483 @Override
484 public long getPos() {
485 return this.compressedStreamPosition;
486 }
487
488 /*
489 * As the comments before read method tell that
490 * compressed stream is advertised when at least
491 * one byte passed EOB have been read off. But
492 * there is an exception to this rule. When we
493 * construct the stream we advertise the position
494 * exactly at EOB. In the following method
495 * shouldAddOn boolean captures this exception.
496 *
497 */
498 private void updatePos(boolean shouldAddOn) {
499 int addOn = shouldAddOn ? 1 : 0;
500 this.compressedStreamPosition = this.startingPos
501 + this.input.getProcessedByteCount() + addOn;
502 }
503
504 }// end of BZip2CompressionInputStream
505
506 }