Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,14 @@ else if(ORIGINAL_PATTERN_COPY.matcher(bucketFileName).matches()) {
return new BucketMetaData(bucketId, copyNumber);
}
else if (bucketFileName.startsWith(BUCKET_PREFIX)) {
return new BucketMetaData(Integer
.parseInt(bucketFileName.substring(bucketFileName.indexOf('_') + 1)), 0);
String rest = bucketFileName.substring(BUCKET_PREFIX.length());
int idxOfCopy = rest.indexOf(Utilities.COPY_KEYWORD);
int copyNumber = 0;
if (idxOfCopy >= 0) {
copyNumber = Integer.parseInt(rest.substring(idxOfCopy + Utilities.COPY_KEYWORD.length()));
rest = rest.substring(0, idxOfCopy);
}
return new BucketMetaData(Integer.parseInt(rest), copyNumber);
}
return INVALID;
}
Expand Down Expand Up @@ -936,6 +942,10 @@ public static ParsedBaseLight parseBase(Path path) {
if (!filename.startsWith(BASE_PREFIX)) {
throw new IllegalArgumentException(filename + " does not start with " + BASE_PREFIX);
}
int idxOfCopy = filename.indexOf(Utilities.COPY_KEYWORD);
if (idxOfCopy >= 0) {
filename = filename.substring(0, idxOfCopy);
}
int idxOfv = filename.indexOf(VISIBILITY_PREFIX);
if (idxOfv < 0) {
return new ParsedBaseLight(Long.parseLong(filename.substring(BASE_PREFIX.length())), path);
Expand Down Expand Up @@ -1063,6 +1073,10 @@ public static class ParsedDeltaLight implements Comparable<ParsedDeltaLight> {

public static ParsedDeltaLight parse(Path deltaDir) {
String filename = deltaDir.getName();
int idxOfCopy = filename.indexOf(Utilities.COPY_KEYWORD);
if (idxOfCopy >= 0) {
filename = filename.substring(0, idxOfCopy);
}
int idxOfVis = filename.indexOf(VISIBILITY_PREFIX);
long visibilityTxnId = 0; // visibilityTxnId:0 is always visible
if (idxOfVis >= 0) {
Expand Down Expand Up @@ -1435,7 +1449,7 @@ private static void findBestWorkingDeltas(ValidWriteIdList writeIdList, AcidDire
//and we want to end up with the best set containing all relevant data: delta_5_20 delta_51_60,
//subject to list of 'exceptions' in 'writeIdList' (not show in above example).
List<ParsedDelta> deltas = new ArrayList<>();
long current = directory.getBase() == null ? 0 : directory.getBase().getWriteId();
long current = directory.getBase() == null ? -1 : directory.getBase().getWriteId();
int lastStmtId = -1;
ParsedDelta prev = null;
for(ParsedDelta next: directory.getCurrentDirectories()) {
Expand Down Expand Up @@ -1935,6 +1949,9 @@ private static void processDeltaDir(Path deltadir, ValidWriteIdList writeIdList,
* checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot
*/
private static boolean isDirUsable(Path child, long visibilityTxnId, List<Path> aborted, ValidTxnList validTxnList) {
if (visibilityTxnId <= 0) {
return true;
}
if (validTxnList == null) {
throw new IllegalArgumentException("No ValidTxnList for " + child);
}
Expand Down
49 changes: 49 additions & 0 deletions ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,31 @@ public void testParsing() {
assertEquals(2, opts.getMaximumWriteId());
}

@Test
public void testParsingCopySuffix() throws Exception {
  // Delta dir names may carry a trailing _copy_N suffix (e.g. after a file-move
  // name collision); parsing must still recover the write-id range and statement id.
  Path copySuffixedDelta = new Path("mock:/tmp/delta_0000001_0000001_0000_copy_1");
  AcidUtils.ParsedDeltaLight delta = AcidUtils.ParsedDeltaLight.parse(copySuffixedDelta);
  assertEquals(1, delta.getMinWriteId());
  assertEquals(1, delta.getMaxWriteId());
  assertEquals(0, delta.getStatementId());
}

@Test
public void testParsingBaseCopySuffix() throws Exception {
  // A base directory with a trailing _copy_N suffix must still yield its write id.
  Path copySuffixedBase = new Path("mock:/tmp/base_0000123_copy_1");
  AcidUtils.ParsedBaseLight base = AcidUtils.ParsedBaseLight.parseBase(copySuffixedBase);
  assertEquals(123, base.getWriteId());
}

@Test
public void testBucketMetaDataParsing() throws Exception {
  // Negative bucket id combined with a copy suffix.
  AcidUtils.BucketMetaData negativeBucket = AcidUtils.BucketMetaData.parse("bucket_-001_copy_1");
  assertEquals(-1, negativeBucket.bucketId);
  assertEquals(1, negativeBucket.copyNumber);

  // Zero-padded bucket id with a higher copy number.
  AcidUtils.BucketMetaData paddedBucket = AcidUtils.BucketMetaData.parse("bucket_00123_copy_5");
  assertEquals(123, paddedBucket.bucketId);
  assertEquals(5, paddedBucket.copyNumber);
}

@Test
public void testOriginal() throws Exception {
Configuration conf = new Configuration();
Expand Down Expand Up @@ -198,6 +223,30 @@ public void testOriginal() throws Exception {
result.get(6).getFileStatus().getPath().toString());
}

@Test
public void testGetAcidStateWithNullValidTxnList() throws Exception {
  // VALID_TXNS_KEY is deliberately not set in conf, so the ValidTxnList handed to
  // getAcidState is null; a delta without a visibility suffix must still be usable.
  Configuration conf = new Configuration();
  MockFileSystem mockFs = new MockFileSystem(conf,
      new MockFile("mock:/tbl/part1/delta_0000001_0000001/bucket_0", 500, new byte[0]));
  AcidDirectory state = AcidUtils.getAcidState(mockFs, new MockPath(mockFs, "mock:/tbl/part1"), conf,
      new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false);
  assertEquals(1, state.getCurrentDirectories().size());
  assertEquals("mock:/tbl/part1/delta_0000001_0000001",
      state.getCurrentDirectories().get(0).getPath().toString());
}

@Test
public void testGetAcidStateWithDeltaZero() throws Exception {
Configuration conf = new Configuration();
MockFileSystem fs = new MockFileSystem(conf,
new MockFile("mock:/tbl/part1/delta_0000000_0000000_-001/bucket_0", 500, new byte[0]));
// For non-transactional tables, the writeIdList might be empty or start from 0
// NOTE(review): "tbl:100:0:" presumably encodes table:highWatermark:minOpen — confirm format.
AcidDirectory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
new ValidReaderWriteIdList("tbl:100:0:"), null, false);
// The writeId-0 delta is visible now that the "no base" sentinel in
// findBestWorkingDeltas is -1 instead of 0, so it no longer masks writeId 0.
assertEquals(1, dir.getCurrentDirectories().size());
}

@Test
public void testOriginalDeltas() throws Exception {
Configuration conf = new Configuration();
Expand Down
40 changes: 40 additions & 0 deletions ql/src/test/queries/clientpositive/hive_29481.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- HIVE-29481: AcidUtils must tolerate "_copy_N" suffixes on ACID directory names
-- (base / delta / delete_delta) instead of failing with NumberFormatException.
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

-- 1. Test Insert-Only Table (Reproduce HIVE-29481)
CREATE TABLE hive_29481_io(id INT) STORED AS ORC
LOCATION '${system:test.tmp.dir}/hive_29481_io'
TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only');

INSERT INTO hive_29481_io VALUES (1);
INSERT INTO hive_29481_io VALUES (4),(5),(6);

-- Manually create a directory with a _copy_ suffix (common during name collisions in moves)
-- AcidUtils used to fail with NumberFormatException on the suffix or ignore it if writeId was 0.
dfs -mkdir ${system:test.tmp.dir}/hive_29481_io/delta_0000000_0000000_copy_1;
-- Create a dummy file inside to ensure it's processed
dfs -touchz ${system:test.tmp.dir}/hive_29481_io/delta_0000000_0000000_copy_1/bucket_00000;

SELECT * FROM hive_29481_io;

-- 2. Test Full ACID Table (CRUD)
CREATE TABLE hive_29481_crud(id INT, val STRING) STORED AS ORC
LOCATION '${system:test.tmp.dir}/hive_29481_crud'
TBLPROPERTIES('transactional'='true');

INSERT INTO hive_29481_crud VALUES (1, 'a'), (2, 'b');

-- Update a row (creates delta directory)
UPDATE hive_29481_crud SET val = 'updated' WHERE id = 1;

-- Delete a row (creates delete_delta directory)
DELETE FROM hive_29481_crud WHERE id = 2;

-- Simulate a _copy_ suffix on a delete_delta directory
-- (empty dir; presumably only directory-name parsing is exercised here)
dfs -mkdir ${system:test.tmp.dir}/hive_29481_crud/delete_delta_0000004_0000004_0000_copy_1;

SELECT * FROM hive_29481_crud;

-- Clean up
DROP TABLE hive_29481_io;
DROP TABLE hive_29481_crud;
113 changes: 113 additions & 0 deletions ql/src/test/results/clientpositive/hive_29481.q.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
PREHOOK: query: CREATE TABLE hive_29481_io(id INT) STORED AS ORC
#### A masked pattern was here ####
TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only')
PREHOOK: type: CREATETABLE
#### A masked pattern was here ####
PREHOOK: Output: database:default
PREHOOK: Output: default@hive_29481_io
POSTHOOK: query: CREATE TABLE hive_29481_io(id INT) STORED AS ORC
#### A masked pattern was here ####
TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only')
POSTHOOK: type: CREATETABLE
#### A masked pattern was here ####
POSTHOOK: Output: database:default
POSTHOOK: Output: default@hive_29481_io
PREHOOK: query: INSERT INTO hive_29481_io VALUES (1)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@hive_29481_io
POSTHOOK: query: INSERT INTO hive_29481_io VALUES (1)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@hive_29481_io
POSTHOOK: Lineage: hive_29481_io.id SCRIPT []
PREHOOK: query: INSERT INTO hive_29481_io VALUES (4),(5),(6)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@hive_29481_io
POSTHOOK: query: INSERT INTO hive_29481_io VALUES (4),(5),(6)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@hive_29481_io
POSTHOOK: Lineage: hive_29481_io.id SCRIPT []
PREHOOK: query: SELECT * FROM hive_29481_io
PREHOOK: type: QUERY
PREHOOK: Input: default@hive_29481_io
#### A masked pattern was here ####
POSTHOOK: query: SELECT * FROM hive_29481_io
POSTHOOK: type: QUERY
POSTHOOK: Input: default@hive_29481_io
#### A masked pattern was here ####
1
4
5
6
PREHOOK: query: CREATE TABLE hive_29481_crud(id INT, val STRING) STORED AS ORC
#### A masked pattern was here ####
TBLPROPERTIES('transactional'='true')
PREHOOK: type: CREATETABLE
#### A masked pattern was here ####
PREHOOK: Output: database:default
PREHOOK: Output: default@hive_29481_crud
POSTHOOK: query: CREATE TABLE hive_29481_crud(id INT, val STRING) STORED AS ORC
#### A masked pattern was here ####
TBLPROPERTIES('transactional'='true')
POSTHOOK: type: CREATETABLE
#### A masked pattern was here ####
POSTHOOK: Output: database:default
POSTHOOK: Output: default@hive_29481_crud
PREHOOK: query: INSERT INTO hive_29481_crud VALUES (1, 'a'), (2, 'b')
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@hive_29481_crud
POSTHOOK: query: INSERT INTO hive_29481_crud VALUES (1, 'a'), (2, 'b')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@hive_29481_crud
POSTHOOK: Lineage: hive_29481_crud.id SCRIPT []
POSTHOOK: Lineage: hive_29481_crud.val SCRIPT []
PREHOOK: query: UPDATE hive_29481_crud SET val = 'updated' WHERE id = 1
PREHOOK: type: QUERY
PREHOOK: Input: default@hive_29481_crud
PREHOOK: Output: default@hive_29481_crud
PREHOOK: Output: default@hive_29481_crud
POSTHOOK: query: UPDATE hive_29481_crud SET val = 'updated' WHERE id = 1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@hive_29481_crud
POSTHOOK: Output: default@hive_29481_crud
POSTHOOK: Output: default@hive_29481_crud
POSTHOOK: Lineage: hive_29481_crud.id SIMPLE []
POSTHOOK: Lineage: hive_29481_crud.val SIMPLE []
PREHOOK: query: DELETE FROM hive_29481_crud WHERE id = 2
PREHOOK: type: QUERY
PREHOOK: Input: default@hive_29481_crud
PREHOOK: Output: default@hive_29481_crud
POSTHOOK: query: DELETE FROM hive_29481_crud WHERE id = 2
POSTHOOK: type: QUERY
POSTHOOK: Input: default@hive_29481_crud
POSTHOOK: Output: default@hive_29481_crud
PREHOOK: query: SELECT * FROM hive_29481_crud
PREHOOK: type: QUERY
PREHOOK: Input: default@hive_29481_crud
#### A masked pattern was here ####
POSTHOOK: query: SELECT * FROM hive_29481_crud
POSTHOOK: type: QUERY
POSTHOOK: Input: default@hive_29481_crud
#### A masked pattern was here ####
1 updated
PREHOOK: query: DROP TABLE hive_29481_io
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@hive_29481_io
PREHOOK: Output: default@hive_29481_io
POSTHOOK: query: DROP TABLE hive_29481_io
POSTHOOK: type: DROPTABLE
POSTHOOK: Input: default@hive_29481_io
POSTHOOK: Output: default@hive_29481_io
PREHOOK: query: DROP TABLE hive_29481_crud
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@hive_29481_crud
PREHOOK: Output: default@hive_29481_crud
POSTHOOK: query: DROP TABLE hive_29481_crud
POSTHOOK: type: DROPTABLE
POSTHOOK: Input: default@hive_29481_crud
POSTHOOK: Output: default@hive_29481_crud
Loading