Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to latest cron lib. #17

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions cmd/daprd/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Options struct {
DaprBlockShutdownDuration *time.Duration
ActorsService string
RemindersService string
SchedulerAddress *string
SchedulerAddress string
DaprAPIListenAddresses string
AppHealthProbeInterval int
AppHealthProbeTimeout int
Expand All @@ -79,8 +79,6 @@ type Options struct {
AppChannelAddress string
Logger logger.Options
Metrics *metrics.Options

schedulerAddressFlag string
}

func New(origArgs []string) (*Options, error) {
Expand Down Expand Up @@ -134,7 +132,6 @@ func New(origArgs []string) (*Options, error) {
fs.StringVar(&opts.SentryAddress, "sentry-address", "", "Address for the Sentry CA service")
fs.StringVar(&opts.ControlPlaneTrustDomain, "control-plane-trust-domain", "localhost", "Trust domain of the Dapr control plane")
fs.StringVar(&opts.ControlPlaneNamespace, "control-plane-namespace", "default", "Namespace of the Dapr control plane")
fs.StringVar(&opts.schedulerAddressFlag, "scheduler-host-address", "", "Addresses for Dapr Scheduler servers")
fs.StringVar(&opts.AllowedOrigins, "allowed-origins", cors.DefaultAllowedOrigins, "Allowed HTTP origins")
fs.BoolVar(&opts.EnableProfiling, "enable-profiling", false, "Enable profiling")
fs.BoolVar(&opts.RuntimeVersion, "version", false, "Prints the runtime version")
Expand Down Expand Up @@ -166,6 +163,7 @@ func New(origArgs []string) (*Options, error) {
// --placement-host-address is a legacy (but not deprecated) flag that is translated to the actors-service flag
var placementServiceHostAddr string
fs.StringVar(&placementServiceHostAddr, "placement-host-address", "", "Addresses for Dapr Actor Placement servers (overrides actors-service)")
fs.StringVar(&opts.SchedulerAddress, "scheduler-host-address", "", "Address(es) of the scheduler service instance(s), as comma separated host:port pairs")
fs.StringVar(&opts.ActorsService, "actors-service", "", "Type and address of the actors service, in the format 'type:address'")
fs.StringVar(&opts.RemindersService, "reminders-service", "", "Type and address of the reminders service, in the format 'type:address'")

Expand Down Expand Up @@ -248,10 +246,6 @@ func New(origArgs []string) (*Options, error) {
opts.DaprBlockShutdownDuration = nil
}

if fs.Changed("scheduler-host-address") {
opts.SchedulerAddress = &opts.schedulerAddressFlag
}

return &opts, nil
}

Expand Down
30 changes: 0 additions & 30 deletions dapr/proto/runtime/v1/dapr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,6 @@ service Dapr {

// Delete a job
rpc DeleteJob(DeleteJobRequest) returns (google.protobuf.Empty) {}

// Get a job
rpc GetJob(GetJobRequest) returns (GetJobResponse) {}

// List all jobs by app
rpc ListJobs(ListJobsRequest) returns (ListJobsResponse) {}
}

// InvokeServiceRequest represents the request message for Service invocation.
Expand Down Expand Up @@ -1153,28 +1147,4 @@ message ScheduleJobRequest {
message DeleteJobRequest {
// The name of the job.
string name = 1;
}

// GetJobRequest is the message to get the job by name.
message GetJobRequest {
// The name of the job.
string name = 1;
}

// GetJobResponse is the response message to convey the job.
message GetJobResponse {
// The job details.
Job job = 1;
}

// ListJobsRequest is the message to list jobs by app_id.
message ListJobsRequest {
// The id of the application (app_id) for which to list jobs.
string app_id = 1;
}

// ListJobsResponse is the response message to convey the list of jobs.
message ListJobsResponse {
// List of jobs that match the request criteria.
repeated Job jobs = 1;
}
21 changes: 1 addition & 20 deletions dapr/proto/scheduler/v1/scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ service Scheduler {
rpc ScheduleJob(ScheduleJobRequest) returns (ScheduleJobResponse) {}
// DeleteJob is used by the daprd sidecar to delete a job.
rpc DeleteJob(JobRequest) returns (DeleteJobResponse) {}
// GetJob is used by the daprd sidecar to get details of a job.
rpc GetJob(JobRequest) returns (GetJobResponse) {}
// ListJobs is used by the daprd sidecar to list jobs by app_id.
rpc ListJobs(ListJobsRequest) returns (ListJobsResponse) {}
}

message ConnectHostRequest {
Expand Down Expand Up @@ -67,25 +63,10 @@ message ScheduleJobResponse {
// JobRequest is the message used by the daprd sidecar to delete or get a job.
message JobRequest {
string job_name = 1;
string namespace = 2;
}

message DeleteJobResponse {
// Empty as of now
}

// GetJobResponse is the response message to convey the details of a job.
message GetJobResponse {
runtime.v1.Job job = 1;
}

// ListJobsRequest is the message to list jobs by app_id.
message ListJobsRequest {
// The id of the application (app_id) for which to list jobs.
string app_id = 1;
}
// ListJobsResponse is the response message to convey the list of jobs.
message ListJobsResponse {
// List of jobs that match the request criteria.
repeated runtime.v1.Job jobs = 1;
}

33 changes: 0 additions & 33 deletions dapr/proto/scheduler/v1/scheduler_callback.proto

This file was deleted.

6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/PaesslerAG/jsonpath v0.1.1
github.com/PuerkitoBio/purell v1.2.1
github.com/Scalingo/go-etcd-cron v1.3.2
github.com/alphadose/haxmap v1.3.1
github.com/argoproj/argo-rollouts v1.4.1
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/dapr/components-contrib v1.13.0-rc.10
github.com/dapr/kit v0.13.0
github.com/diagridio/go-etcd-cron v0.0.0-20240321201514-c0f44b7f5d89
github.com/evanphx/json-patch/v5 v5.8.1
github.com/go-chi/chi/v5 v5.0.11
github.com/go-chi/cors v1.2.1
Expand Down Expand Up @@ -51,6 +51,7 @@ require (
github.com/spiffe/go-spiffe/v2 v2.1.6
github.com/stretchr/testify v1.9.0
github.com/valyala/fasthttp v1.51.0
go.etcd.io/etcd/api/v3 v3.5.12
go.etcd.io/etcd/client/v3 v3.5.12
go.etcd.io/etcd/server/v3 v3.5.11
go.mongodb.org/mongo-driver v1.12.1
Expand Down Expand Up @@ -279,7 +280,6 @@ require (
github.com/http-wasm/http-wasm-host-go v0.5.1 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.4+incompatible // indirect
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.56 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/influxdata/influxdb-client-go/v2 v2.12.3 // indirect
Expand Down Expand Up @@ -407,7 +407,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
github.com/zeebo/errs v1.3.0 // indirect
go.etcd.io/bbolt v1.3.8 // indirect
go.etcd.io/etcd/api/v3 v3.5.12 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.12 // indirect
go.etcd.io/etcd/client/v2 v2.305.11 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.11 // indirect
Expand Down Expand Up @@ -489,7 +488,6 @@ replace github.com/microcosm-cc/bluemonday => github.com/microcosm-cc/bluemonday
//
// replace github.com/dapr/components-contrib => ../components-contrib
// replace github.com/dapr/kit => ../kit
replace github.com/Scalingo/go-etcd-cron => github.com/cicoyle/go-etcd-cron v0.0.0-20240212132024-691d2e9fb3f1

//
// Then, run `make modtidy-all` in this repository.
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,6 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86cAH8qUic=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cicoyle/go-etcd-cron v0.0.0-20240212132024-691d2e9fb3f1 h1:u2JCV1Y2DpQ1cULuNzbz2dYl1onJ5eZBus8DWFgH28I=
github.com/cicoyle/go-etcd-cron v0.0.0-20240212132024-691d2e9fb3f1/go.mod h1:h4rx0m29dWSqSm9Oikw83ldkxyeXKH19aAlj4vagiek=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/clbanning/mxj/v2 v2.5.5/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s=
Expand Down Expand Up @@ -467,6 +465,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA=
github.com/diagridio/go-etcd-cron v0.0.0-20240321201514-c0f44b7f5d89 h1:mcF9KZDBpLjzO6XIiV6bBg6R2VxPzf8F5RuXgwBjA08=
github.com/diagridio/go-etcd-cron v0.0.0-20240321201514-c0f44b7f5d89/go.mod h1:tRZ3z7Mr4Rp9fYdxmCB63V7zrwqtUINR6QHriC1bauw=
github.com/didip/tollbooth/v7 v7.0.1 h1:TkT4sBKoQoHQFPf7blQ54iHrZiTDnr8TceU+MulVAog=
github.com/didip/tollbooth/v7 v7.0.1/go.mod h1:VZhDSGl5bDSPj4wPsih3PFa4Uh9Ghv8hgacaTm5PRT4=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
Expand Down Expand Up @@ -967,8 +967,6 @@ github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.56 h1:ULzGSSe95hkOdh17NsiPV3lw
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.56/go.mod h1:bsqx6o47Kl4YsniIjPwuoeqiIB5Fc3JbSpB2b3o3WFQ=
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI=
github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
Expand Down
29 changes: 22 additions & 7 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,21 +1210,19 @@ func (a *actorsRuntime) CreateReminder(ctx context.Context, req *CreateReminderR
return err
}

// TODO: change the 3rd party library to take our format
jobSchedule := "@every " + req.Period
internalScheduleJobReq := &schedulerv1pb.ScheduleJobRequest{
Job: &runtimev1pb.Job{
Name: a.actorsConfig.AppID + "||" + jobName,
Schedule: jobSchedule,
Name: jobName,
Schedule: req.Period,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the request period is the schedule?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of sending it as-is and letting the library handle period too. Now, thinking about it, I will put the "@every" back. So, I can extend the format for "@every x"

Data: &anypb.Any{
TypeUrl: "type.googleapis.com/google.protobuf.BytesValue",
Value: data,
Value: data, // TODO: this should go to actorStateStore
},
DueTime: req.DueTime,
Ttl: req.TTL,
},
Namespace: "", // TODO
Metadata: metadata, // TODO: this should generate key if jobStateStore is configured
Namespace: a.actorsConfig.Namespace,
Metadata: metadata,
}

_, err = a.scheduler.ScheduleJob(ctx, internalScheduleJobReq)
Expand Down Expand Up @@ -1258,6 +1256,23 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest
}

func (a *actorsRuntime) DeleteReminder(ctx context.Context, req *DeleteReminderRequest) error {
if a.scheduler != nil {
jobName := constructCompositeKey(
"reminder",
req.ActorType,
req.ActorID,
req.Name,
)

internalDeleteJobReq := &schedulerv1pb.JobRequest{
JobName: jobName,
Namespace: a.actorsConfig.Namespace,
}

_, err := a.scheduler.DeleteJob(ctx, internalDeleteJobReq)
return err
}

if !a.actorsConfig.Config.HostedActorTypes.IsActorTypeHosted(req.ActorType) {
return ErrReminderOpActorNotHosted
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/actors/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ConfigOpts struct {
AppID string
ActorsService string
RemindersService string
SchedulerService string
SchedulerAddress string
Port int
Namespace string
AppConfig daprAppConfig.ApplicationConfig
Expand All @@ -59,7 +59,7 @@ func NewConfig(opts ConfigOpts) Config {
AppID: opts.AppID,
ActorsService: opts.ActorsService,
RemindersService: opts.RemindersService,
SchedulerService: opts.SchedulerService,
SchedulerService: opts.SchedulerAddress,
Port: opts.Port,
Namespace: opts.Namespace,
DrainRebalancedActors: opts.AppConfig.DrainRebalancedActors,
Expand Down
60 changes: 0 additions & 60 deletions pkg/api/http/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,6 @@ func (a *api) constructSchedulerEndpoints() []endpoints.Endpoint {
Name: "DeleteJob",
},
},
{
Methods: []string{http.MethodGet},
Route: "job/{name}",
Version: apiVersionV1,
Group: endpointGroupSchedulerV1Alpha1,
Handler: a.onGetJobHandler(),
Settings: endpoints.EndpointSettings{
Name: "GetJob",
},
},
{
Methods: []string{http.MethodGet},
Route: "jobs/{app_id}",
Version: apiVersionV1,
Group: endpointGroupSchedulerV1Alpha1,
Handler: a.onListJobsHandler(),
Settings: endpoints.EndpointSettings{
Name: "ListJobs",
},
},
}
}

Expand Down Expand Up @@ -122,43 +102,3 @@ func (a *api) onDeleteJobHandler() http.HandlerFunc {
},
)
}

func (a *api) onGetJobHandler() http.HandlerFunc {
return UniversalHTTPHandler(
a.universal.GetJob,
UniversalHTTPHandlerOpts[*runtimev1pb.GetJobRequest, *runtimev1pb.GetJobResponse]{
SkipInputBody: true,
InModifier: func(r *http.Request, in *runtimev1pb.GetJobRequest) (*runtimev1pb.GetJobRequest, error) {
name := chi.URLParam(r, "name")
in.Name = name
return in, nil
},
OutModifier: func(out *runtimev1pb.GetJobResponse) (any, error) {
if out == nil || out.GetJob() == nil {
return nil, nil // empty body
}
return out.GetJob(), nil // empty body
},
},
)
}

func (a *api) onListJobsHandler() http.HandlerFunc {
return UniversalHTTPHandler(
a.universal.ListJobs,
UniversalHTTPHandlerOpts[*runtimev1pb.ListJobsRequest, *runtimev1pb.ListJobsResponse]{
SkipInputBody: true,
InModifier: func(r *http.Request, in *runtimev1pb.ListJobsRequest) (*runtimev1pb.ListJobsRequest, error) {
appID := chi.URLParam(r, "app_id")
in.AppId = appID
return in, nil
},
OutModifier: func(out *runtimev1pb.ListJobsResponse) (any, error) {
if out == nil || out.GetJobs() == nil {
return nil, nil // empty body
}
return out.GetJobs(), nil // empty body
},
},
)
}
Loading