Skip to content

Commit 37131ef

Browse files
sollhuidataroaring
authored andcommitted
fix streaming job statistic never update
1 parent aca98e1 commit 37131ef

File tree

8 files changed

+54
-16
lines changed

8 files changed

+54
-16
lines changed

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.doris.job.extensions.insert.InsertTask;
4242
import org.apache.doris.job.offset.SourceOffsetProvider;
4343
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
44+
import org.apache.doris.load.loadv2.LoadJob;
4445
import org.apache.doris.load.loadv2.LoadStatistic;
4546
import org.apache.doris.nereids.StatementContext;
4647
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
@@ -417,8 +418,12 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
417418
taskIds.add(runningStreamTask.getTaskId());
418419
// todo: Check whether the taskid of runningtask is consistent with the taskid associated with txn
419420

420-
// todo: need get loadStatistic, load manager statistic is empty
421-
LoadStatistic loadStatistic = new LoadStatistic();
421+
List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
422+
if (loadJobs.size() == 0) {
423+
throw new TransactionException("load job not found, insert job id is " + runningStreamTask.getTaskId());
424+
}
425+
LoadJob loadJob = loadJobs.get(0);
426+
LoadStatistic loadStatistic = loadJob.getLoadStatistic();
422427
txnState.setTxnCommitAttachment(new StreamingTaskTxnCommitAttachment(
423428
getJobId(),
424429
runningStreamTask.getTaskId(),

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ private void before() throws Exception {
127127

128128
this.runningOffset = offsetProvider.getNextOffset(jobProperties, originTvfProps);
129129
InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql);
130+
baseCommand.setJobId(getTaskId());
130131
StmtExecutor baseStmtExecutor =
131132
new StmtExecutor(ctx, new LogicalPlanAdapter(baseCommand, ctx.getStatementContext()));
132133
baseCommand.initPlan(ctx, baseStmtExecutor, false);
@@ -135,7 +136,6 @@ private void before() throws Exception {
135136
}
136137
this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, runningOffset);
137138
this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
138-
this.taskCommand.setJobId(getTaskId());
139139
this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
140140
}
141141

fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,10 @@ public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originComm
139139
}
140140
return plan;
141141
});
142-
return new InsertIntoTableCommand((LogicalPlan) rewritePlan, Optional.empty(), Optional.empty(),
143-
Optional.empty(), true, Optional.empty());
142+
InsertIntoTableCommand insertIntoTableCommand = new InsertIntoTableCommand((LogicalPlan) rewritePlan,
143+
Optional.empty(), Optional.empty(), Optional.empty(), true, Optional.empty());
144+
insertIntoTableCommand.setJobId(originCommand.getJobId());
145+
return insertIntoTableCommand;
144146
}
145147

146148
@Override

fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ public InsertLoadJob(long dbId, String label) {
5757
super(EtlJobType.INSERT, dbId, label);
5858
}
5959

60+
public InsertLoadJob(long dbId, String label, long jobId) {
61+
super(EtlJobType.INSERT, dbId, label, jobId);
62+
}
63+
6064
public InsertLoadJob(String label, long transactionId, long dbId, long tableId,
6165
long createTimestamp, String failMsg, String trackingUrl, String firstErrorMsg,
6266
UserIdentity userInfo) throws MetaNotFoundException {

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public abstract class AbstractInsertExecutor {
7171
protected long txnId = INVALID_TXN_ID;
7272

7373
/**
74-
* Constructor
74+
* Randomly generate job ID.
7575
*/
7676
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
7777
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
@@ -87,6 +87,24 @@ public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelNam
8787
this.emptyInsert = emptyInsert;
8888
}
8989

90+
/**
91+
* Specify job ID.
92+
*/
93+
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
94+
Optional<InsertCommandContext> insertCtx, boolean emptyInsert, long jobId) {
95+
this.ctx = ctx;
96+
this.database = table.getDatabase();
97+
this.insertLoadJob = new InsertLoadJob(database.getId(), labelName, jobId);
98+
ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
99+
this.coordinator = EnvFactory.getInstance().createCoordinator(
100+
ctx, planner, ctx.getStatsErrorEstimator(), insertLoadJob.getId());
101+
this.labelName = labelName;
102+
this.table = table;
103+
this.insertCtx = insertCtx;
104+
this.emptyInsert = emptyInsert;
105+
this.jobId = jobId;
106+
}
107+
90108
public Coordinator getCoordinator() {
91109
return coordinator;
92110
}

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ private ExecutorFactory selectInsertExecutorFactory(
377377
planner,
378378
dataSink,
379379
physicalSink,
380-
() -> new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert)
380+
() -> new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert, jobId)
381381
);
382382
}
383383

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,13 @@ public OlapInsertExecutor(ConnectContext ctx, Table table,
8989
this.olapTable = (OlapTable) table;
9090
}
9191

92+
public OlapInsertExecutor(ConnectContext ctx, Table table,
93+
String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
94+
long jobId) {
95+
super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
96+
this.olapTable = (OlapTable) table;
97+
}
98+
9299
@Override
93100
public void beginTransaction() {
94101
if (isGroupCommitHttpStream()) {

regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,22 +65,22 @@ suite("test_streaming_insert_job") {
6565
Awaitility.await().atMost(300, SECONDS)
6666
.pollInterval(1, SECONDS).until(
6767
{
68-
print("check success task count")
6968
def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='STREAMING' """
69+
log.info("jobSuccendCount: " + jobSuccendCount)
7070
// check job status and succeed task count larger than 2
7171
jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
7272
}
7373
)
7474
} catch (Exception ex){
7575
def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
7676
def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
77-
println("show job: " + showjob)
78-
println("show task: " + showtask)
77+
log.info("show job: " + showjob)
78+
log.info("show task: " + showtask)
7979
throw ex;
8080
}
8181

8282
def jobResult = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
83-
println("show success job: " + jobResult)
83+
log.info("show success job: " + jobResult)
8484

8585
qt_select """ SELECT * FROM ${tableName} order by c1 """
8686

@@ -96,12 +96,14 @@ suite("test_streaming_insert_job") {
9696
assert pauseShowTask.size() == 0
9797

9898

99-
def jobOffset = sql """
100-
select currentOffset, endoffset from jobs("type"="insert") where Name='${jobName}'
99+
def jobInfo = sql """
100+
select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
101101
"""
102-
assert jobOffset.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
103-
assert jobOffset.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
104-
102+
log.info("jobInfo: " + jobInfo)
103+
assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
104+
assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
105+
assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
106+
105107
// alter streaming job
106108
sql """
107109
ALTER JOB ${jobName}

0 commit comments

Comments
 (0)