Skip to content

支持多个字符作为列的分隔符 #68

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: release/0.48.x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class TextExtractor extends Extractor {

private InputStreamSet inputs;
private char delimiterChar;
private String delimiter;
private char linebreakChar;
private DataAttributes attributes;
private Reader currentReader;
Expand Down Expand Up @@ -64,14 +64,10 @@ public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes at
this.ctx = ctx;
// check if "delimiter" attribute is supplied via SQL query
String columnDelimiter = this.attributes.getValueByKey("delimiter");
if ( columnDelimiter != null) {
if (columnDelimiter.length() == 1){
this.delimiterChar = columnDelimiter.charAt(0);
} else{
throw new IllegalArgumentException("column delimiter cannot be more than one character, sees: " + columnDelimiter);
}
if (columnDelimiter != null) {
this.delimiter = columnDelimiter;
} else {
this.delimiterChar = ',';
this.delimiter = ",";
}
String lineTerminator = attributes.getValueByKey("line.terminator");
if (lineTerminator != null && !lineTerminator.isEmpty()) {
Expand Down Expand Up @@ -106,9 +102,9 @@ public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes at
}

System.out.println(
org.apache.commons.lang.StringEscapeUtils.escapeJava(("TextExtractor set up with delimiter [" + this.delimiterChar + "], " +
" line terminator [" + linebreakChar + "], with complex text flag set to "
+ this.complexText + " and reading gzip file set to " + this.isGzip)));
org.apache.commons.lang.StringEscapeUtils.escapeJava(("TextExtractor set up with delimiter [" + this.delimiter + "], " +
" line terminator [" + linebreakChar + "], with complex text flag set to "
+ this.complexText + " and reading gzip file set to " + this.isGzip)));
// note: more properties can be inited from attributes if needed
this.outputColumns = this.attributes.getRecordColumns();
this.outputTypes = new OdpsType[this.outputColumns.length];
Expand Down Expand Up @@ -248,7 +244,10 @@ public String[] parseLine(Reader r) throws IOException{
StringBuffer curPart = new StringBuffer();
boolean hasQuotes = false;
boolean quoteStarted = false;
StringBuilder readBuffer = new StringBuilder();
int delimiterIndex = 0;
while (ch >= 0) {
readBuffer.append((char) ch);
if (hasQuotes) {
quoteStarted = true;
if (ch == '\"') {
Expand All @@ -266,10 +265,12 @@ public String[] parseLine(Reader r) throws IOException{
curPart.append('\"');
}
}
else if (ch == this.delimiterChar && !quoteStarted) {
else if (readBuffer.toString().endsWith(this.delimiter) && !quoteStarted) {
readBuffer.setLength(readBuffer.length() - this.delimiter.length());
setLinePart(colIndx++, curPart.toString());
curPart = new StringBuffer();
quoteStarted = false;
readBuffer.setLength(0);
}
else if (ch == '\r' && ignoreLineFeed) {
//ignore LF characters
Expand Down Expand Up @@ -315,9 +316,9 @@ private void handleMismatchLine(int colIndx) {
ctx.getCounter("text.parse", "schema.oversize").increment(1);
}
String errorMsg = "SCHEMA MISMATCH: External Table schema specified a total of [" +
this.fullSchemaColumns.length + "] columns, but current text line parsed into ["
+ colIndx + "] columns delimited by [" + this.delimiterChar + "]. Current line is read as: "
+ StringUtils.join(this.lineParts, this.delimiterChar);
this.fullSchemaColumns.length + "] columns, but current text line parsed into ["
+ colIndx + "] columns delimited by [" + this.delimiter + "]. Current line is read as: "
+ StringUtils.join(this.lineParts, this.delimiter);
errorMsg = StringEscapeUtils.escapeJava(errorMsg);
if (strict) {
throw new RuntimeException(errorMsg);
Expand All @@ -343,14 +344,14 @@ private String[] readNextLine() throws IOException {
}
while (currentReader != null) {
if (this.complexText){
String[] parts = parseLine(currentReader);
if (parts != null) {
String[] parts = parseLine(currentReader);
if (parts != null) {
return parts;
}
} else {
String line = ((BufferedReader)currentReader).readLine();
if (line != null) {
return StringUtils.splitPreserveAllTokens(line, this.delimiterChar);
return splitPreserveAllTokens(line, this.delimiter);
}
}
currentReader = moveToNextStream();
Expand Down Expand Up @@ -388,4 +389,17 @@ private Reader moveToNextStream() throws IOException {
}
}

private String[] splitPreserveAllTokens(String str, String delimiter) {
ArrayList<String> parts = new ArrayList<>();
int pos = 0;
int end = str.indexOf(delimiter);
while (end != -1) {
parts.add(str.substring(pos, end));
pos = end + delimiter.length();
end = str.indexOf(delimiter, pos);
}
parts.add(str.substring(pos));
return parts.toArray(new String[0]);
}

}