HIVE-29088: Add a configuration option to skip corrupted files #5983
base: master
Conversation
Thank you for submitting a PR. I've left comments on every place where I have questions.
Could you share the stacktrace and the reproduction steps? They would be very helpful for the community to review your patch. I would also appreciate it if you could provide more information about the following questions:
- Why is this patch related to protobuf?
- When do we need to skip a corrupted file? I see that the patch modifies a general KV reader, and currently I don’t have a strong reason to support this kind of recovery.
- Why is this recovery logic implemented only in `HadoopShimsSecure$CombineFileRecordReader`? I think there are other classes implementing combined RecordReaders that might also be controlled by the new configuration.
@@ -814,6 +814,10 @@ public static enum ConfVars {
        "Whether to use a native APIs for load queries to non-native table(like iceberg), if false uses a Tez job for" +
        " load queries"),

    HIVE_SKIP_PROTOBUF_CORRUPTFILE("hive.exec.skip.protobuf.corruptfile", false,
- Could we start the config key with `hive.io` or something other than `hive.exec`? Since this does not affect the hive-exec module, I think the key should not start with `hive.exec`.
- We probably don't need the '\n' at the end of the line. Also, could we change it to use 4-space indentation, like this?
Suggested change:

    HIVE_SKIP_PROTOBUF_CORRUPTFILE("hive.exec.skip.protobuf.corruptfile", false,
        "Whether Hive skips Protocol Buffer files corrupted, " +
        "during parsing (e.g., due to data truncation or file incompletion) to allow task continuation."),
Thanks for the review. Sure, I'll modify it.
@@ -132,6 +133,7 @@ public static class CombineFileRecordReader<K, V> implements RecordReader<K, V>
    protected RecordReader<K, V> curReader;
    protected boolean isShrinked;
    protected long shrinkedLength;
    protected boolean skipCorruptfile;
Could we change this to `private final skipCorruptFile`?
Thanks for the review. Sure, I'll modify it.
    return (K)(new CombineHiveKey(newKey));
    skipCorruptfile = jc.getBoolean("hive.exec.skip.protobuf.corruptfile", false);
    K newKey = null;
    if (skipCorruptfile) {
Could we check `skipCorruptFile` inside the catch clause like this?
hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
Lines 2077 to 2083 in 5512ffd
    } catch (NullPointerException e) {
      if (!isTestVectorizerSuppressFatalExceptions) {
        // Re-throw without losing original stack trace.
        throw e;
      }
      setNodeIssue("exception: " + VectorizationContext.getStackTraceAsSingleLine(e));
      return false;
Thanks for the review. Sure, I'll modify it.
    try {
      newKey = curReader.createKey();
    } catch (NullPointerException e) {
      LOG.info("=================Caught exception: " + e.getMessage() + ", skipping file======================");
I don't think we need the extra '=' characters in the log message. Could you simplify the message? Also, there's no corresponding log message in `createValue()`. I think these methods should either both emit logs or neither, for consistency.
Thanks for the review. Sure, I'll modify it.
    curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(
        e, jc);
    }
    idx++;
It seems that the skipCorruptFile-enabled path does not properly update `idx`. Could you check this?
Thanks for the review. The idx++ is not within the skipCorruptFile-enabled path.
    jc.set("map.input.file", split.getPath(idx).toString());
    jc.setLong("map.input.start", split.getOffset(idx));
    jc.setLong("map.input.length", split.getLength(idx));
    } catch (InvocationTargetException ITe) {
- Could we use lowercase for `ITe`?
- It looks like we skip all remaining splits once we encounter an `InvocationTargetException`. Shouldn't we proceed with them instead of just returning false?
Thanks for the review. Sure, I'll modify it.
    } catch (InvocationTargetException ITe) {
      return false;
    } catch (Exception e) {
      curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jc);
It appears that Hive already provides a way to recover from exceptions thrown by RR via `HiveIOExceptionHandlerUtil#handleRecordReaderCreationException`. Could this be a solution to your issue?
Thanks for the review. The issue I’m facing is that some users don’t care about data integrity; they just want their jobs to run smoothly and hope to skip files corrupted during transmission.
    if (skipCorruptfile) {
      try {
        newKey = curReader.createKey();
      } catch (NullPointerException e) {
I'm not sure about handling `NullPointerException` at this point. Can we say with certainty that a `NullPointerException` always results from a corrupted file? If not, I don't think we could swallow it.
Thanks for the review. A NullPointerException always results from a corrupted file.
@Lucas61000, could you please elaborate a bit more why this change is needed, where do you see an exception, and what protobuf files are you mentioning here?
The stack trace involved has been included in the description. Thank you all.
@Lucas61000 , please check HIVE-28026 which I fixed for the similar stacktrace in tez apache/tez#334. Can you confirm?
@Aggarwal-Raghav, does it mean it's already fixed?
Yes, I think so because the stacktrace is similar and the error message is the same.
What changes were proposed in this pull request?
This PR introduces a new Hive configuration parameter hive.exec.skip.protobuf.corruptfile to control whether Hive skips parsing of protocol buffer files that are corrupted (e.g., due to data truncation or incomplete writes) during processing, allowing tasks to continue with valid files instead of failing.
Why are the changes needed?
https://issues.apache.org/jira/browse/HIVE-29088

Does this PR introduce any user-facing change?
Users can now set hive.exec.skip.protobuf.corruptfile=true to skip corrupted protocol buffer files during parsing, whereas previously such errors would cause task failures.
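For example, at the session level (assuming the key keeps the name proposed in this PR; reviewers suggested moving it under a `hive.io` prefix, and `my_protobuf_table` is a hypothetical table name):

```sql
-- Enable skipping of corrupted protobuf files for the current session.
SET hive.exec.skip.protobuf.corruptfile=true;

-- Subsequent queries over the affected table skip unreadable files
-- instead of failing the task.
SELECT COUNT(*) FROM my_protobuf_table;
```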
How was this patch tested?
Tested locally.