Skip to content

Commit b5ed4a3

Browse files
committed
fix streaming job statistic never update
1 parent 484731e commit b5ed4a3

File tree

14 files changed

+106
-33
lines changed

14 files changed

+106
-33
lines changed

fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -623,16 +623,18 @@ private void commitTransactionWithoutLock(long dbId, List<Table> tableList, long
623623
} else if (txnCommitAttachment instanceof StreamingTaskTxnCommitAttachment) {
624624
StreamingTaskTxnCommitAttachment streamingTaskTxnCommitAttachment =
625625
(StreamingTaskTxnCommitAttachment) txnCommitAttachment;
626-
TxnStateChangeCallback cb = callbackFactory.getCallback(streamingTaskTxnCommitAttachment.getTaskId());
626+
TxnStateChangeCallback cb = callbackFactory.getCallback(streamingTaskTxnCommitAttachment.getJobId());
627+
TxnCommitAttachment commitAttachment = null;
627628
if (cb != null) {
628629
// use a temporary transaction state to do before commit check,
629630
// what actually works is the transactionId
630631
TransactionState tmpTxnState = new TransactionState();
631632
tmpTxnState.setTransactionId(transactionId);
632633
cb.beforeCommitted(tmpTxnState);
634+
commitAttachment = tmpTxnState.getTxnCommitAttachment();
633635
}
634636
builder.setCommitAttachment(TxnUtil
635-
.streamingTaskTxnCommitAttachmentToPb(streamingTaskTxnCommitAttachment));
637+
.streamingTaskTxnCommitAttachmentToPb((StreamingTaskTxnCommitAttachment) commitAttachment));
636638
} else {
637639
throw new UserException("invalid txnCommitAttachment");
638640
}
@@ -676,6 +678,11 @@ private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest, long tra
676678
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
677679
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
678680
callbackId = rlTaskTxnCommitAttachment.getJobId();
681+
} else if (txnCommitAttachment != null
682+
&& txnCommitAttachment instanceof StreamingTaskTxnCommitAttachment) {
683+
StreamingTaskTxnCommitAttachment streamingTaskTxnCommitAttachment =
684+
(StreamingTaskTxnCommitAttachment) txnCommitAttachment;
685+
callbackId = streamingTaskTxnCommitAttachment.getJobId();
679686
} else if (txnState != null) {
680687
callbackId = txnState.getCallbackId();
681688
}

fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,12 @@ public static TransactionState transactionStateFromPb(TxnInfoPB txnInfo) {
373373
commitAttachment =
374374
TxnUtil.rtTaskTxnCommitAttachmentFromPb(txnInfo.getCommitAttachment());
375375
}
376+
377+
if (txnInfo.getCommitAttachment().getType()
378+
== TxnCommitAttachmentPB.Type.STREAMING_TASK_TXN_COMMIT_ATTACHMENT) {
379+
commitAttachment =
380+
TxnUtil.streamingTaskTxnCommitAttachmentFromPb(txnInfo.getCommitAttachment());
381+
}
376382
}
377383

378384
long prepareTime = txnInfo.hasPrepareTime() ? txnInfo.getPrepareTime() : -1;

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.doris.job.extensions.insert.InsertTask;
4242
import org.apache.doris.job.offset.SourceOffsetProvider;
4343
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
44+
import org.apache.doris.load.loadv2.LoadJob;
4445
import org.apache.doris.load.loadv2.LoadStatistic;
4546
import org.apache.doris.nereids.StatementContext;
4647
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
@@ -295,8 +296,6 @@ public void onStreamTaskSuccess(StreamingInsertTask task) {
295296
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
296297
StreamingInsertTask nextTask = createStreamingInsertTask();
297298
this.runningStreamTask = nextTask;
298-
//todo: maybe fetch from txn attachment?
299-
offsetProvider.updateOffset(task.getRunningOffset());
300299
} finally {
301300
writeUnlock();
302301
}
@@ -417,8 +416,12 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
417416
taskIds.add(runningStreamTask.getTaskId());
418417
// todo: Check whether the taskid of runningtask is consistent with the taskid associated with txn
419418

420-
// todo: need get loadStatistic, load manager statistic is empty
421-
LoadStatistic loadStatistic = new LoadStatistic();
419+
List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
420+
if (loadJobs.size() == 0) {
421+
throw new TransactionException("load job not found, insert job id is " + runningStreamTask.getTaskId());
422+
}
423+
LoadJob loadJob = loadJobs.get(0);
424+
LoadStatistic loadStatistic = loadJob.getLoadStatistic();
422425
txnState.setTxnCommitAttachment(new StreamingTaskTxnCommitAttachment(
423426
getJobId(),
424427
runningStreamTask.getTaskId(),

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ private void before() throws Exception {
127127

128128
this.runningOffset = offsetProvider.getNextOffset(jobProperties, originTvfProps);
129129
InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql);
130+
baseCommand.setJobId(getTaskId());
130131
StmtExecutor baseStmtExecutor =
131132
new StmtExecutor(ctx, new LogicalPlanAdapter(baseCommand, ctx.getStatementContext()));
132133
baseCommand.initPlan(ctx, baseStmtExecutor, false);
@@ -135,7 +136,6 @@ private void before() throws Exception {
135136
}
136137
this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, runningOffset);
137138
this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
138-
this.taskCommand.setJobId(getTaskId());
139139
this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
140140
}
141141

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,14 @@
2323

2424
import com.google.gson.annotations.SerializedName;
2525
import lombok.Getter;
26+
import lombok.Setter;
2627

2728
public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
2829

30+
public StreamingTaskTxnCommitAttachment() {
31+
super(TransactionState.LoadJobSourceType.STREAMING_JOB);
32+
}
33+
2934
public StreamingTaskTxnCommitAttachment(long jobId, long taskId,
3035
long scannedRows, long loadBytes, long numFiles, long fileBytes, String offset) {
3136
super(TransactionState.LoadJobSourceType.STREAMING_JOB);
@@ -48,6 +53,7 @@ public StreamingTaskTxnCommitAttachment(StreamingTaskCommitAttachmentPB pb) {
4853
}
4954

5055
@Getter
56+
@Setter
5157
private long jobId;
5258
@Getter
5359
private long taskId;

fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,10 @@ public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originComm
139139
}
140140
return plan;
141141
});
142-
return new InsertIntoTableCommand((LogicalPlan) rewritePlan, Optional.empty(), Optional.empty(),
143-
Optional.empty(), true, Optional.empty());
142+
InsertIntoTableCommand insertIntoTableCommand = new InsertIntoTableCommand((LogicalPlan) rewritePlan,
143+
Optional.empty(), Optional.empty(), Optional.empty(), true, Optional.empty());
144+
insertIntoTableCommand.setJobId(originCommand.getJobId());
145+
return insertIntoTableCommand;
144146
}
145147

146148
@Override

fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ public InsertLoadJob(long dbId, String label) {
5757
super(EtlJobType.INSERT, dbId, label);
5858
}
5959

60+
public InsertLoadJob(long dbId, String label, long jobId) {
61+
super(EtlJobType.INSERT, dbId, label, jobId);
62+
}
63+
6064
public InsertLoadJob(String label, long transactionId, long dbId, long tableId,
6165
long createTimestamp, String failMsg, String trackingUrl, String firstErrorMsg,
6266
UserIdentity userInfo) throws MetaNotFoundException {

fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ protected void createLoadJob(LoadJob loadJob) {
194194

195195
public void addLoadJob(LoadJob loadJob) {
196196
// Insert label may be null in txn mode, we add txn insert job after success.
197+
LOG.info("add load job: {}, label: {}", loadJob.getId(), loadJob.getLabel());
197198
if (loadJob.getLabel() != null) {
198199
idToLoadJob.put(loadJob.getId(), loadJob);
199200
long dbId = loadJob.getDbId();

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public abstract class AbstractInsertExecutor {
7171
protected long txnId = INVALID_TXN_ID;
7272

7373
/**
74-
* Constructor
74+
* Randomly generate job ID.
7575
*/
7676
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
7777
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
@@ -87,6 +87,24 @@ public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelNam
8787
this.emptyInsert = emptyInsert;
8888
}
8989

90+
/**
91+
* Specify job ID.
92+
*/
93+
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
94+
Optional<InsertCommandContext> insertCtx, boolean emptyInsert, long jobId) {
95+
this.ctx = ctx;
96+
this.database = table.getDatabase();
97+
this.insertLoadJob = new InsertLoadJob(database.getId(), labelName, jobId);
98+
ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
99+
this.coordinator = EnvFactory.getInstance().createCoordinator(
100+
ctx, planner, ctx.getStatsErrorEstimator(), insertLoadJob.getId());
101+
this.labelName = labelName;
102+
this.table = table;
103+
this.insertCtx = insertCtx;
104+
this.emptyInsert = emptyInsert;
105+
this.jobId = jobId;
106+
}
107+
90108
public Coordinator getCoordinator() {
91109
return coordinator;
92110
}

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelNa
132132
this.cte = cte;
133133
this.needNormalizePlan = needNormalizePlan;
134134
this.branchName = branchName;
135+
this.jobId = Env.getCurrentEnv().getNextId();
135136
}
136137

137138
/**
@@ -356,7 +357,8 @@ private ExecutorFactory selectInsertExecutorFactory(
356357
planner,
357358
dataSink,
358359
physicalSink,
359-
() -> new OlapTxnInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert)
360+
() -> new OlapTxnInsertExecutor(
361+
ctx, olapTable, label, planner, insertCtx, emptyInsert, jobId)
360362
);
361363
} else if (ctx.isGroupCommit()) {
362364
Backend groupCommitBackend = Env.getCurrentEnv()
@@ -369,15 +371,15 @@ private ExecutorFactory selectInsertExecutorFactory(
369371
dataSink,
370372
physicalSink,
371373
() -> new OlapGroupCommitInsertExecutor(
372-
ctx, olapTable, label, planner, insertCtx, emptyInsert, groupCommitBackend
374+
ctx, olapTable, label, planner, insertCtx, emptyInsert, groupCommitBackend, jobId
373375
)
374376
);
375377
} else {
376378
executorFactory = ExecutorFactory.from(
377379
planner,
378380
dataSink,
379381
physicalSink,
380-
() -> new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert)
382+
() -> new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert, jobId)
381383
);
382384
}
383385

0 commit comments

Comments
 (0)