Skip to content

Commit

Permalink
Implement DeleteReminder
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Souza <[email protected]>
  • Loading branch information
artursouza committed Mar 21, 2024
1 parent d175a7a commit 400d7fb
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 34 deletions.
1 change: 1 addition & 0 deletions dapr/proto/scheduler/v1/scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ message ScheduleJobResponse {
// JobRequest is the message used by the daprd sidecar to delete or get a job.
message JobRequest {
string job_name = 1;
string namespace = 2;
}

message DeleteJobResponse {
Expand Down
17 changes: 17 additions & 0 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,23 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest
}

func (a *actorsRuntime) DeleteReminder(ctx context.Context, req *DeleteReminderRequest) error {
if a.scheduler != nil {
jobName := constructCompositeKey(
"reminder",
req.ActorType,
req.ActorID,
req.Name,
)

internalDeleteJobReq := &schedulerv1pb.JobRequest{
JobName: jobName,
Namespace: a.actorsConfig.Namespace,
}

_, err := a.scheduler.DeleteJob(ctx, internalDeleteJobReq)
return err
}

if !a.actorsConfig.Config.HostedActorTypes.IsActorTypeHosted(req.ActorType) {
return ErrReminderOpActorNotHosted
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/universal/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (a *Universal) ScheduleJob(ctx context.Context, inReq *runtimev1pb.Schedule
DueTime: inReq.GetJob().GetDueTime(),
Ttl: inReq.GetJob().GetTtl(),
},
Namespace: "", // TODO
Namespace: a.globalConfig.Namespace,
Metadata: nil, // TODO: this should generate key if jobStateStore is configured
}

Expand All @@ -79,7 +79,8 @@ func (a *Universal) DeleteJob(ctx context.Context, inReq *runtimev1pb.DeleteJobR

jobName := a.AppID() + "||" + inReq.GetName()
internalDeleteJobReq := &schedulerv1pb.JobRequest{
JobName: jobName,
JobName: jobName,
Namespace: a.globalConfig.Namespace,
}

_, err := a.schedulerClient.DeleteJob(ctx, internalDeleteJobReq)
Expand Down
66 changes: 38 additions & 28 deletions pkg/proto/scheduler/v1/scheduler.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions pkg/scheduler/server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func (s *Server) ConnectHost(context.Context, *schedulerv1pb.ConnectHostRequest)
return nil, fmt.Errorf("not implemented")
}

// ScheduleJob is a placeholder method that needs to be implemented
func (s *Server) ScheduleJob(ctx context.Context, req *schedulerv1pb.ScheduleJobRequest) (*schedulerv1pb.ScheduleJobResponse, error) {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -101,17 +100,17 @@ func (s *Server) triggerJob(ctx context.Context, metadata map[string]string, pay
return etcdcron.Failure, nil
}

// DeleteJob is a placeholder method that needs to be implemented
func (s *Server) DeleteJob(ctx context.Context, req *schedulerv1pb.JobRequest) (*schedulerv1pb.DeleteJobResponse, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-s.readyCh:
}

err := s.cron.DeleteJob(ctx, req.GetJobName())
jobName := composeJobName(req.GetNamespace(), req.GetJobName())
err := s.cron.DeleteJob(ctx, jobName)
if err != nil {
log.Errorf("error deleting job %s: %s", req.GetJobName(), err)
log.Errorf("error deleting job %s: %s", jobName, err)
return nil, err
}

Expand Down

0 comments on commit 400d7fb

Please sign in to comment.