Skip to content

Commit

Permalink
feat: Flink savepoint
Browse files Browse the repository at this point in the history
  • Loading branch information
itinycheng committed Jul 30, 2024
1 parent cd34556 commit e5a93f3
Show file tree
Hide file tree
Showing 15 changed files with 225 additions and 49 deletions.
4 changes: 4 additions & 0 deletions flink-platform-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
package com.flink.platform.common.exception;

import com.flink.platform.common.enums.ResponseStatus;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

/** exception. */
@Data
@EqualsAndHashCode(callSuper = true)
@AllArgsConstructor
@NoArgsConstructor
public class DefinitionException extends RuntimeException {

protected int code;
protected String msg;
private ResponseStatus status;

public DefinitionException(ResponseStatus responseStatus) {
this.code = responseStatus.getCode();
this.msg = responseStatus.getDesc();
this.status = responseStatus;
}
}
14 changes: 13 additions & 1 deletion flink-platform-grpc/src/main/proto/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.flink.platform.grpc";
option java_outer_classname = "HelloWorldProto";
option java_outer_classname = "JobProto";

import public "google/protobuf/timestamp.proto";
import public "google/protobuf/empty.proto";
Expand All @@ -20,6 +20,10 @@ service JobGrpcService {
// kill job.
rpc KillJob (KillJobRequest) returns (KillJobReply) {
}

// savepoint job.
rpc SavepointJob (SavepointRequest) returns (SavepointReply) {
}
}

message ProcessJobRequest {
Expand Down Expand Up @@ -50,3 +54,11 @@ message KillJobRequest {
message KillJobReply {
int64 jobRunId = 1;
}

message SavepointRequest {
int64 jobRunId = 1;
}

message SavepointReply {
int64 jobRunId = 1;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flink.platform.web.common;

import lombok.Getter;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
Expand All @@ -13,6 +14,7 @@
public class SpringContext implements ApplicationContextAware {

/** Spring application context. */
@Getter
private static ApplicationContext applicationContext;

/** set application context. */
Expand All @@ -21,11 +23,6 @@ public void setApplicationContext(ApplicationContext applicationContext) {
SpringContext.applicationContext = applicationContext;
}

/** return ApplicationContext. */
public static ApplicationContext getApplicationContext() {
return applicationContext;
}

/** get bean from applicationContext. */
public static Object getBean(String name) throws BeansException {
return applicationContext.getBean(name);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flink.platform.web.config;

import com.flink.platform.common.exception.DefinitionException;
import com.flink.platform.common.exception.UncaughtException;
import com.flink.platform.web.entity.response.ResultInfo;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -22,6 +23,10 @@ public ResultInfo<Object> exceptionHandler(Exception e) {
throw (UncaughtException) e;
}

if (e instanceof DefinitionException exception) {
return failure(exception.getStatus());
}

log.error("Exception: ", e);
return failure(SERVICE_ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.springframework.context.annotation.Configuration;

/** Create flink env config instance. */
@Configuration
@Setter
@Getter
public class FlinkConfig {
Expand All @@ -22,27 +21,31 @@ public class FlinkConfig {

private String libDirs;

@Bean("flink112")
@ConfigurationProperties(prefix = "flink.sql112")
public FlinkConfig createFlinkConfig112() {
return new FlinkConfig();
}

@Bean("flink113")
@ConfigurationProperties(prefix = "flink.sql113")
public FlinkConfig createFlinkConfig113() {
return new FlinkConfig();
}

@Bean("flink115")
@ConfigurationProperties(prefix = "flink.sql115")
public FlinkConfig createFlinkConfig115() {
return new FlinkConfig();
}

@Bean("flink117")
@ConfigurationProperties(prefix = "flink.sql117")
public FlinkConfig createFlinkConfig117() {
return new FlinkConfig();
@Configuration
public static class FlinkConfigLoader {

@Bean("flink112")
@ConfigurationProperties(prefix = "flink.sql112")
public FlinkConfig createFlinkConfig112() {
return new FlinkConfig();
}

@Bean("flink113")
@ConfigurationProperties(prefix = "flink.sql113")
public FlinkConfig createFlinkConfig113() {
return new FlinkConfig();
}

@Bean("flink115")
@ConfigurationProperties(prefix = "flink.sql115")
public FlinkConfig createFlinkConfig115() {
return new FlinkConfig();
}

@Bean("flink117")
@ConfigurationProperties(prefix = "flink.sql117")
public FlinkConfig createFlinkConfig117() {
return new FlinkConfig();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.flink.platform.web.controller.extension;

import com.flink.platform.dao.entity.JobRunInfo;
import com.flink.platform.dao.entity.result.JobCallback;
import com.flink.platform.dao.service.JobRunInfoService;
import com.flink.platform.grpc.JobGrpcServiceGrpc.JobGrpcServiceBlockingStub;
import com.flink.platform.grpc.SavepointReply;
import com.flink.platform.grpc.SavepointRequest;
import com.flink.platform.web.entity.response.ResultInfo;
import com.flink.platform.web.grpc.JobGrpcClient;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import static com.flink.platform.common.constants.Constant.FLINK;
import static com.flink.platform.common.enums.ExecutionStatus.RUNNING;
import static com.flink.platform.common.enums.ExecutionStatus.SUCCESS;
import static com.flink.platform.common.enums.ResponseStatus.OPERATION_NOT_ALLOWED;
import static com.flink.platform.web.entity.response.ResultInfo.failure;
import static com.flink.platform.web.entity.response.ResultInfo.success;

/**
* flink job controller.
*/
@RestController
@RequestMapping("/flink")
public class FlinkJobController {

@Autowired
private JobRunInfoService jobRunService;

@Autowired
private JobGrpcClient jobGrpcClient;

@GetMapping(value = "/savepoint/{jobRunId}")
public ResultInfo<Long> savepoint(@PathVariable Long jobRunId) {
JobRunInfo jobRun = jobRunService.getById(jobRunId);
if (!FLINK.equals(jobRun.getType().getClassification())) {
return failure(OPERATION_NOT_ALLOWED, "Only flink job can be savepoint");
}

if (!RUNNING.equals(jobRun.getStatus()) && !SUCCESS.equals(jobRun.getStatus())) {
return failure(OPERATION_NOT_ALLOWED, "Job is not running");
}

JobCallback callback = jobRun.getBackInfo();
if (callback == null || StringUtils.isEmpty(callback.getJobId()) || StringUtils.isEmpty(callback.getAppId())) {
return failure(OPERATION_NOT_ALLOWED, "AppId or JobId not found");
}

JobGrpcServiceBlockingStub jobGrpcService = jobGrpcClient.grpcClient(jobRun.getHost());
SavepointRequest request =
SavepointRequest.newBuilder().setJobRunId(jobRunId).build();
SavepointReply savepoint = jobGrpcService.savepointJob(request);
return success(savepoint.getJobRunId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public static <T> ResultInfo<T> success(T result) {
/** 自定义异常返回的结果. */
public static <T> ResultInfo<T> defineError(DefinitionException de) {
ResultInfo<T> result = new ResultInfo<>();
result.setCode(de.getCode());
result.setDesc(de.getMsg());
ResponseStatus status = de.getStatus();
result.setCode(status.getCode());
result.setDesc(status.getDesc());
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/** Job processing grpc client. */
@Slf4j
@Service
public class JobProcessGrpcClient {
public class JobGrpcClient {

@Autowired
private WorkerService workerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import com.flink.platform.grpc.KillJobRequest;
import com.flink.platform.grpc.ProcessJobReply;
import com.flink.platform.grpc.ProcessJobRequest;
import com.flink.platform.grpc.SavepointReply;
import com.flink.platform.grpc.SavepointRequest;
import com.flink.platform.web.service.FlinkJobService;
import com.flink.platform.web.service.KillJobService;
import com.flink.platform.web.service.ProcessJobService;
import com.flink.platform.web.service.ProcessJobStatusService;
Expand All @@ -21,7 +24,7 @@
/** Job process grpc service. */
@Slf4j
@GrpcService
public class JobProcessGrpcServer extends JobGrpcServiceGrpc.JobGrpcServiceImplBase {
public class JobGrpcServer extends JobGrpcServiceGrpc.JobGrpcServiceImplBase {

@Autowired
private ProcessJobService processJobService;
Expand All @@ -32,6 +35,9 @@ public class JobProcessGrpcServer extends JobGrpcServiceGrpc.JobGrpcServiceImplB
@Autowired
private KillJobService killJobService;

@Autowired
private FlinkJobService flinkJobService;

@Override
public void processJob(ProcessJobRequest request, StreamObserver<ProcessJobReply> responseObserver) {
try {
Expand Down Expand Up @@ -73,6 +79,21 @@ public void killJob(KillJobRequest request, StreamObserver<KillJobReply> respons
responseObserver.onCompleted();
}

@Override
public void savepointJob(SavepointRequest request, StreamObserver<SavepointReply> responseObserver) {
try {
flinkJobService.savepoint(request.getJobRunId());
SavepointReply reply = SavepointReply.newBuilder()
.setJobRunId(request.getJobRunId())
.build();
responseObserver.onNext(reply);
} catch (Exception e) {
log.error("flink job savepoint via grpc failed", e);
responseObserver.onError(buildGrpcException(e));
}
responseObserver.onCompleted();
}

private Exception buildGrpcException(Exception e) {
Status status = e instanceof UnrecoverableException ? Status.UNAVAILABLE : Status.INTERNAL;
return status.withCause(e).withDescription(ExceptionUtil.stackTrace(e)).asRuntimeException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.flink.platform.web.common.SpringContext;
import com.flink.platform.web.config.AppRunner;
import com.flink.platform.web.config.WorkerConfig;
import com.flink.platform.web.grpc.JobProcessGrpcClient;
import com.flink.platform.web.grpc.JobGrpcClient;
import com.flink.platform.web.monitor.StatusInfo;
import com.flink.platform.web.service.JobRunExtraService;
import com.flink.platform.web.util.ThreadUtil;
Expand Down Expand Up @@ -69,7 +69,7 @@ public class JobExecuteThread implements Supplier<JobResponse> {

private final JobFlowRunService jobFlowRunService;

private final JobProcessGrpcClient jobProcessGrpcClient;
private final JobGrpcClient jobGrpcClient;

private Long jobRunId;

Expand All @@ -84,7 +84,7 @@ public JobExecuteThread(Long flowRunId, JobVertex jobVertex, WorkerConfig worker
this.jobRunInfoService = SpringContext.getBean(JobRunInfoService.class);
this.jobRunExtraService = SpringContext.getBean(JobRunExtraService.class);
this.jobFlowRunService = SpringContext.getBean(JobFlowRunService.class);
this.jobProcessGrpcClient = SpringContext.getBean(JobProcessGrpcClient.class);
this.jobGrpcClient = SpringContext.getBean(JobGrpcClient.class);
}

@Override
Expand Down Expand Up @@ -169,7 +169,7 @@ public void callOnce() {
jobRunStatus = jobRun.getStatus();

// Get a grpc client.
JobGrpcServiceBlockingStub stub = jobProcessGrpcClient.grpcClient(jobRun.getHost());
JobGrpcServiceBlockingStub stub = jobGrpcClient.grpcClient(jobRun.getHost());

// Process job.
if (CREATED.equals(jobRunStatus)) {
Expand Down
Loading

0 comments on commit e5a93f3

Please sign in to comment.