Skip to content

Commit dfb725f

Browse files
authored
Merge branch 'master' into master
2 parents 2dfe61b + 1028750 commit dfb725f

File tree

3 files changed

+22
-15
lines changed

3 files changed

+22
-15
lines changed

chunjun-core/src/main/java/com/dtstack/chunjun/source/DtInputFormatSourceFunction.java

+17-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
package com.dtstack.chunjun.source;
2020

21+
import com.dtstack.chunjun.constants.Metrics;
22+
import com.dtstack.chunjun.dirty.manager.DirtyManager;
23+
import com.dtstack.chunjun.metrics.AccumulatorCollector;
2124
import com.dtstack.chunjun.restore.FormatState;
2225
import com.dtstack.chunjun.source.format.BaseRichInputFormat;
2326
import com.dtstack.chunjun.util.ExceptionUtil;
@@ -112,19 +115,28 @@ public void run(SourceContext<OUT> ctx) throws Exception {
112115
if (isRunning && format instanceof RichInputFormat) {
113116
((RichInputFormat) format).openInputFormat();
114117
}
115-
116118
OUT nextElement = serializer.createInstance();
117119
while (isRunning) {
118120
format.open(splitIterator.next());
119-
121+
AccumulatorCollector accumulatorCollector =
122+
((BaseRichInputFormat) format).getAccumulatorCollector();
123+
DirtyManager dirtyManager = ((BaseRichInputFormat) format).getDirtyManager();
120124
// for each element we also check if cancel
121125
// was called by checking the isRunning flag
122126

123127
while (isRunning && !format.reachedEnd()) {
124128
synchronized (ctx.getCheckpointLock()) {
125-
nextElement = format.nextRecord(nextElement);
126-
if (nextElement != null) {
127-
ctx.collect(nextElement);
129+
try {
130+
nextElement = format.nextRecord(nextElement);
131+
if (nextElement != null) {
132+
ctx.collect(nextElement);
133+
}
134+
} catch (Exception e) {
135+
// 脏数据总数应是所有slot的脏数据总数,而不是单个的
136+
long globalErrors =
137+
accumulatorCollector.getAccumulatorValue(
138+
Metrics.NUM_ERRORS, false);
139+
dirtyManager.collect(nextElement, e, null, globalErrors);
128140
}
129141
}
130142
}

chunjun-core/src/main/java/com/dtstack/chunjun/source/format/BaseRichInputFormat.java

+2-9
Original file line numberDiff line numberDiff line change
@@ -190,18 +190,11 @@ public void openInputFormat() throws IOException {
190190
}
191191

192192
@Override
193-
public RowData nextRecord(RowData rowData) {
193+
public RowData nextRecord(RowData rowData) throws ReadRecordException {
194194
if (byteRateLimiter != null) {
195195
byteRateLimiter.acquire();
196196
}
197-
RowData internalRow = null;
198-
try {
199-
internalRow = nextRecordInternal(rowData);
200-
} catch (ReadRecordException e) {
201-
// 脏数据总数应是所有slot的脏数据总数,而不是单个的
202-
long globalErrors = accumulatorCollector.getAccumulatorValue(Metrics.NUM_ERRORS, false);
203-
dirtyManager.collect(e.getRowData(), e, null, globalErrors);
204-
}
197+
RowData internalRow = nextRecordInternal(rowData);
205198
if (internalRow != null) {
206199
updateDuration();
207200
if (numReadCounter != null) {

chunjun-core/src/main/java/com/dtstack/chunjun/throwable/ReadRecordException.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
package com.dtstack.chunjun.throwable;
2020

21+
import java.io.IOException;
22+
2123
/** The Exception describing errors when read a record */
22-
public class ReadRecordException extends Exception {
24+
public class ReadRecordException extends IOException {
2325

2426
private static final long serialVersionUID = 453087894656079820L;
2527
private final int colIndex;

0 commit comments

Comments
 (0)