Skip to content

Commit

Permalink
PIG-4496: Fix CBZip2InputStream to close underlying stream
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1678878 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Jianyong Dai committed May 12, 2015
1 parent 6e297a5 commit fba27de
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 45 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ PIG-4333: Split BigData tests into multiple groups (rohini)

BUG FIXES

PIG-4496: Fix CBZip2InputStream to close underlying stream (petersla via daijy)

PIG-4528: Fix a typo in src/docs/src/documentation/content/xdocs/basic.xml (namusyaka via daijy)

PIG-4532: Pig Documentation contains typo for AvroStorage (fredericschmaljohann via daijy)
Expand Down
27 changes: 16 additions & 11 deletions lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ public CBZip2InputStream(FSDataInputStream zStream, int blockSize, long end)

@Override
public int read() throws IOException {
if (this.innerBsStream == null) {
throw new IOException("stream closed");
}

if (streamEnd) {
return -1;
} else {
Expand Down Expand Up @@ -264,6 +268,18 @@ public int read() throws IOException {
}
}

/**
 * Closes the wrapped stream and drops the reference to it.
 *
 * <p>Calling this method when the reference has already been cleared is a
 * no-op. The reference is cleared even when closing the wrapped stream
 * throws, so a failed close is not retried on a later call.
 *
 * @throws IOException if closing the wrapped stream fails
 */
@Override
public void close() throws IOException {
    if (this.innerBsStream != null) {
        try {
            this.innerBsStream.close();
        } finally {
            // Always forget the stream, whether or not close() succeeded.
            this.innerBsStream = null;
        }
    }
}

/**
* getPos is used by the caller to know when the processing of the current
* {@link InputSplit} is complete. In this method, as we read each bzip
Expand Down Expand Up @@ -291,7 +307,6 @@ private void initialize(int blockSize) throws IOException {
magic4 = bsGetUChar();
if (magic1 != 'B' || magic2 != 'Z' ||
magic3 != 'h' || magic4 < '1' || magic4 > '9') {
bsFinishedWithStream();
streamEnd = true;
return;
}
Expand All @@ -308,7 +323,6 @@ private void initialize(int blockSize) throws IOException {

private void initBlock(boolean searchForMagic) throws IOException {
if (readCount >= readLimit) {
bsFinishedWithStream();
streamEnd = true;
return;
}
Expand Down Expand Up @@ -408,7 +422,6 @@ private void complete() throws IOException {
throw new IOException("Encountered additional bytes in the filesplit past the crc block. "
+ "Loading of concatenated bz2 files is not supported");
}
bsFinishedWithStream();
streamEnd = true;
}

Expand All @@ -424,14 +437,6 @@ private static void crcError() throws IOException {
cadvise("CRC error");
}

/**
 * Forgets the wrapped stream without closing it. The reference is left
 * untouched when it is already null or when it is {@code System.in}.
 */
private void bsFinishedWithStream() {
    final boolean hasStream = this.innerBsStream != null;
    if (hasStream && this.innerBsStream != System.in) {
        this.innerBsStream = null;
    }
}

private void bsSetStream(FSDataInputStream f) {
innerBsStream = f;
bsLive = 0;
Expand Down
71 changes: 37 additions & 34 deletions test/org/apache/pig/test/TestBZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,23 @@
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.test.utils.CloseAwareFSDataInputStream;
import org.apache.pig.test.utils.CloseAwareOutputStream;
import org.apache.tools.bzip2r.CBZip2InputStream;
import org.apache.tools.bzip2r.CBZip2OutputStream;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestBZip {
private static Properties properties;
private static MiniGenericCluster cluster;

@Rule
public TemporaryFolder folder = new TemporaryFolder();

@BeforeClass
public static void oneTimeSetUp() throws Exception {
cluster = MiniGenericCluster.buildCluster();
Expand All @@ -73,10 +80,9 @@ public static void oneTimeTearDown() throws Exception {
public void testBzipInPig() throws Exception {
PigServer pig = new PigServer(cluster.getExecType(), properties);

File in = File.createTempFile("junit", ".bz2");
in.deleteOnExit();
File in = folder.newFile("junit-in.bz2");

File out = File.createTempFile("junit", ".bz2");
File out = folder.newFile("junit-out.bz2");
out.delete();
String clusterOutput = Util.removeColon(out.getAbsolutePath());

Expand Down Expand Up @@ -121,9 +127,6 @@ public void testBzipInPig() throws Exception {
for (int j = 1; j < 100; j++) {
assertEquals(new Integer(j), map.get(j));
}

in.delete();
Util.deleteFile(cluster, clusterOutput);
}

/**
Expand All @@ -133,10 +136,9 @@ public void testBzipInPig() throws Exception {
public void testBzipInPig2() throws Exception {
PigServer pig = new PigServer(cluster.getExecType(), properties);

File in = File.createTempFile("junit", ".bz2");
in.deleteOnExit();
File in = folder.newFile("junit-in.bz2");

File out = File.createTempFile("junit", ".bz2");
File out = folder.newFile("junit-out.bz2");
out.delete();
String clusterOutput = Util.removeColon(out.getAbsolutePath());

Expand Down Expand Up @@ -181,9 +183,6 @@ public void testBzipInPig2() throws Exception {
for (int j = 1; j < 100; j++) {
assertEquals(new Integer(j), map.get(j));
}

in.delete();
out.delete();
}

//see PIG-2391
Expand All @@ -197,10 +196,9 @@ public void testBz2() throws Exception {
};

// bzip compressed input
File in = File.createTempFile("junit", ".bz2");
File in = folder.newFile("junit-in.bz2");
String compressedInputFileName = in.getAbsolutePath();
String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
in.deleteOnExit();

try {
CBZip2OutputStream cos =
Expand Down Expand Up @@ -230,7 +228,6 @@ public void testBz2() throws Exception {
it2.next();
}
} finally {
in.delete();
Util.deleteFile(cluster, "intermediate.bz");
Util.deleteFile(cluster, "final.bz");
}
Expand All @@ -249,9 +246,8 @@ public void testRecordDelims() throws Exception {
};

// bzip compressed input
File in = File.createTempFile("junit", ".bz2");
File in = folder.newFile("junit-in.bz2");
String compressedInputFileName = in.getAbsolutePath();
in.deleteOnExit();
String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);

String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
Expand Down Expand Up @@ -291,7 +287,6 @@ public void testRecordDelims() throws Exception {
assertFalse(it2.hasNext());

} finally {
in.delete();
Util.deleteFile(cluster, unCompressedInputFileName);
Util.deleteFile(cluster, clusterCompressedFilePath);
}
Expand All @@ -305,10 +300,9 @@ public void testRecordDelims() throws Exception {
public void testEmptyBzipInPig() throws Exception {
PigServer pig = new PigServer(cluster.getExecType(), properties);

File in = File.createTempFile("junit", ".tmp");
in.deleteOnExit();
File in = folder.newFile("junit-in.tmp");

File out = File.createTempFile("junit", ".bz2");
File out = folder.newFile("junit-out.bz2");
out.delete();
String clusterOutputFilePath = Util.removeColon(out.getAbsolutePath());

Expand Down Expand Up @@ -336,19 +330,14 @@ public void testEmptyBzipInPig() throws Exception {

pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutputFilePath) + "';");
pig.openIterator("B");

in.delete();
Util.deleteFile(cluster, clusterOutputFilePath);

}

/**
* Tests the writing and reading of an empty BZip file.
*/
@Test
public void testEmptyBzip() throws Exception {
File tmp = File.createTempFile("junit", ".tmp");
tmp.deleteOnExit();
File tmp = folder.newFile("junit.tmp");
CBZip2OutputStream cos = new CBZip2OutputStream(new FileOutputStream(
tmp));
cos.close();
Expand All @@ -358,7 +347,25 @@ public void testEmptyBzip() throws Exception {
fs.open(new Path(tmp.getAbsolutePath())), -1, tmp.length());
assertEquals(-1, cis.read(new byte[100]));
cis.close();
tmp.delete();
}

/**
 * Checks that closing a bzip2 wrapper stream also closes the stream it
 * wraps, for both the output and the input direction.
 */
@Test
public void testInnerStreamGetsClosed() throws Exception {
    File bzFile = folder.newFile("junit.tmp");

    // Output side: the tracked stream must only report closed after the compressor is closed.
    CloseAwareOutputStream trackedOut =
            new CloseAwareOutputStream(new FileOutputStream(bzFile));
    CBZip2OutputStream compressor = new CBZip2OutputStream(trackedOut);
    assertFalse(trackedOut.isClosed());
    compressor.close();
    assertTrue(trackedOut.isClosed());

    // Input side: same expectation for the decompressor and its underlying stream.
    FileSystem localFs = FileSystem.getLocal(new Configuration(false));
    CloseAwareFSDataInputStream trackedIn =
            new CloseAwareFSDataInputStream(localFs.open(new Path(bzFile.getAbsolutePath())));
    CBZip2InputStream decompressor = new CBZip2InputStream(trackedIn, -1, bzFile.length());
    assertFalse(trackedIn.isClosed());
    decompressor.close();
    assertTrue(trackedIn.isClosed());
}

/**
Expand Down Expand Up @@ -556,14 +563,12 @@ public void testBZ2Concatenation() throws Exception {
};

// bzip compressed input file1
File in1 = File.createTempFile("junit", ".bz2");
File in1 = folder.newFile("junit-in1.bz2");
String compressedInputFileName1 = in1.getAbsolutePath();
in1.deleteOnExit();

// file2
File in2 = File.createTempFile("junit", ".bz2");
File in2 = folder.newFile("junit-in2.bz2");
String compressedInputFileName2 = in2.getAbsolutePath();
in1.deleteOnExit();

String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
Util.createInputFile(cluster, unCompressedInputFileName, inputDataMerged);
Expand Down Expand Up @@ -614,8 +619,6 @@ public void testBZ2Concatenation() throws Exception {
assertFalse(it2.hasNext());

} finally {
in1.delete();
in2.delete();
Util.deleteFile(cluster, unCompressedInputFileName);
}

Expand Down
43 changes: 43 additions & 0 deletions test/org/apache/pig/test/utils/CloseAwareFSDataInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test.utils;

import org.apache.hadoop.fs.FSDataInputStream;

import java.io.IOException;
import java.io.InputStream;

/**
 * An {@link FSDataInputStream} that records whether {@link #close()} has
 * completed, so tests can check that a wrapper closed it.
 */
public class CloseAwareFSDataInputStream extends FSDataInputStream {

    /** Set to {@code true} once {@code super.close()} has returned normally. */
    private boolean closed = false;

    /**
     * @param in the stream handed to the {@link FSDataInputStream} constructor
     * @throws IOException if the superclass constructor fails
     */
    public CloseAwareFSDataInputStream(InputStream in) throws IOException {
        super(in);
    }

    /**
     * Closes the stream via the superclass, then marks this instance as
     * closed. The flag stays {@code false} if the superclass close throws.
     */
    @Override
    public void close() throws IOException {
        super.close();
        this.closed = true;
    }

    /** @return {@code true} if {@link #close()} has completed successfully */
    public boolean isClosed() {
        return this.closed;
    }
}
47 changes: 47 additions & 0 deletions test/org/apache/pig/test/utils/CloseAwareOutputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test.utils;

import java.io.IOException;
import java.io.OutputStream;

/**
 * An {@link OutputStream} decorator that forwards all operations to a
 * wrapped stream and records whether {@link #close()} has completed, so
 * tests can check that a wrapper closed it.
 */
public class CloseAwareOutputStream extends OutputStream {

    /** The stream every operation is forwarded to. */
    private final OutputStream out;

    /** Set to {@code true} once the wrapped stream's close() has returned normally. */
    private boolean isClosed;

    /**
     * @param out the stream to wrap
     */
    public CloseAwareOutputStream(OutputStream out) {
        this.out = out;
        this.isClosed = false;
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    /**
     * Forwards the bulk write directly instead of falling back to the
     * inherited byte-at-a-time loop over {@link #write(int)}.
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    /**
     * Forwards the flush to the wrapped stream. The inherited
     * {@link OutputStream#flush()} does nothing, which would leave data
     * stuck in a buffering wrapped stream.
     */
    @Override
    public void flush() throws IOException {
        out.flush();
    }

    /**
     * Closes the wrapped stream, then marks this instance as closed. The
     * flag stays {@code false} if the wrapped stream's close throws.
     */
    @Override
    public void close() throws IOException {
        out.close();
        isClosed = true;
    }

    /** @return {@code true} if {@link #close()} has completed successfully */
    public boolean isClosed() {
        return isClosed;
    }
}

0 comments on commit fba27de

Please sign in to comment.