Skip to content

Commit

Permalink
Update to latest cron lib.
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Souza <[email protected]>
  • Loading branch information
artursouza committed Mar 21, 2024
1 parent 46e607f commit b028b4f
Show file tree
Hide file tree
Showing 29 changed files with 730 additions and 2,121 deletions.
10 changes: 2 additions & 8 deletions cmd/daprd/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Options struct {
DaprBlockShutdownDuration *time.Duration
ActorsService string
RemindersService string
SchedulerAddress *string
SchedulerAddress string
DaprAPIListenAddresses string
AppHealthProbeInterval int
AppHealthProbeTimeout int
Expand All @@ -79,8 +79,6 @@ type Options struct {
AppChannelAddress string
Logger logger.Options
Metrics *metrics.Options

schedulerAddressFlag string
}

func New(origArgs []string) (*Options, error) {
Expand Down Expand Up @@ -134,7 +132,6 @@ func New(origArgs []string) (*Options, error) {
fs.StringVar(&opts.SentryAddress, "sentry-address", "", "Address for the Sentry CA service")
fs.StringVar(&opts.ControlPlaneTrustDomain, "control-plane-trust-domain", "localhost", "Trust domain of the Dapr control plane")
fs.StringVar(&opts.ControlPlaneNamespace, "control-plane-namespace", "default", "Namespace of the Dapr control plane")
fs.StringVar(&opts.schedulerAddressFlag, "scheduler-host-address", "", "Addresses for Dapr Scheduler servers")
fs.StringVar(&opts.AllowedOrigins, "allowed-origins", cors.DefaultAllowedOrigins, "Allowed HTTP origins")
fs.BoolVar(&opts.EnableProfiling, "enable-profiling", false, "Enable profiling")
fs.BoolVar(&opts.RuntimeVersion, "version", false, "Prints the runtime version")
Expand Down Expand Up @@ -168,6 +165,7 @@ func New(origArgs []string) (*Options, error) {
fs.StringVar(&placementServiceHostAddr, "placement-host-address", "", "Addresses for Dapr Actor Placement servers (overrides actors-service)")
fs.StringVar(&opts.ActorsService, "actors-service", "", "Type and address of the actors service, in the format 'type:address'")
fs.StringVar(&opts.RemindersService, "reminders-service", "", "Type and address of the reminders service, in the format 'type:address'")
fs.StringVar(&opts.SchedulerAddress, "scheduler-address", "", "Address(es) of the scheduler service instance(s), as comma separated host:port pairs")

// Add flags for logger and metrics
opts.Logger = logger.DefaultOptions()
Expand Down Expand Up @@ -248,10 +246,6 @@ func New(origArgs []string) (*Options, error) {
opts.DaprBlockShutdownDuration = nil
}

if fs.Changed("scheduler-host-address") {
opts.SchedulerAddress = &opts.schedulerAddressFlag
}

return &opts, nil
}

Expand Down
30 changes: 0 additions & 30 deletions dapr/proto/runtime/v1/dapr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,6 @@ service Dapr {

// Delete a job
rpc DeleteJob(DeleteJobRequest) returns (google.protobuf.Empty) {}

// Get a job
rpc GetJob(GetJobRequest) returns (GetJobResponse) {}

// List all jobs by app
rpc ListJobs(ListJobsRequest) returns (ListJobsResponse) {}
}

// InvokeServiceRequest represents the request message for Service invocation.
Expand Down Expand Up @@ -1153,28 +1147,4 @@ message ScheduleJobRequest {
message DeleteJobRequest {
// The name of the job.
string name = 1;
}

// GetJobRequest is the message to get the job by name.
message GetJobRequest {
// The name of the job.
string name = 1;
}

// GetJobResponse is the response message to convey the job.
message GetJobResponse {
// The job details.
Job job = 1;
}

// ListJobsRequest is the message to list jobs by app_id.
message ListJobsRequest {
// The id of the application (app_id) for which to list jobs.
string app_id = 1;
}

// ListJobsResponse is the response message to convey the list of jobs.
message ListJobsResponse {
// List of jobs that match the request criteria.
repeated Job jobs = 1;
}
20 changes: 0 additions & 20 deletions dapr/proto/scheduler/v1/scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ service Scheduler {
rpc ScheduleJob(ScheduleJobRequest) returns (ScheduleJobResponse) {}
// DeleteJob is used by the daprd sidecar to delete a job.
rpc DeleteJob(JobRequest) returns (DeleteJobResponse) {}
// GetJob is used by the daprd sidecar to get details of a job.
rpc GetJob(JobRequest) returns (GetJobResponse) {}
// ListJobs is used by the daprd sidecar to list jobs by app_id.
rpc ListJobs(ListJobsRequest) returns (ListJobsResponse) {}
}

message ConnectHostRequest {
Expand Down Expand Up @@ -73,19 +69,3 @@ message DeleteJobResponse {
// Empty as of now
}

// GetJobResponse is the response message to convey the details of a job.
message GetJobResponse {
runtime.v1.Job job = 1;
}

// ListJobsRequest is the message to list jobs by app_id.
message ListJobsRequest {
// The id of the application (app_id) for which to list jobs.
string app_id = 1;
}
// ListJobsResponse is the response message to convey the list of jobs.
message ListJobsResponse {
// List of jobs that match the request criteria.
repeated runtime.v1.Job jobs = 1;
}

33 changes: 0 additions & 33 deletions dapr/proto/scheduler/v1/scheduler_callback.proto

This file was deleted.

6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/PaesslerAG/jsonpath v0.1.1
github.com/PuerkitoBio/purell v1.2.1
github.com/Scalingo/go-etcd-cron v1.3.2
github.com/alphadose/haxmap v1.3.1
github.com/argoproj/argo-rollouts v1.4.1
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/dapr/components-contrib v1.13.0-rc.10
github.com/dapr/kit v0.13.0
github.com/diagridio/go-etcd-cron v0.0.0-20240321201514-c0f44b7f5d89
github.com/evanphx/json-patch/v5 v5.8.1
github.com/go-chi/chi/v5 v5.0.11
github.com/go-chi/cors v1.2.1
Expand Down Expand Up @@ -51,6 +51,7 @@ require (
github.com/spiffe/go-spiffe/v2 v2.1.6
github.com/stretchr/testify v1.9.0
github.com/valyala/fasthttp v1.51.0
go.etcd.io/etcd/api/v3 v3.5.12
go.etcd.io/etcd/client/v3 v3.5.12
go.etcd.io/etcd/server/v3 v3.5.11
go.mongodb.org/mongo-driver v1.12.1
Expand Down Expand Up @@ -279,7 +280,6 @@ require (
github.com/http-wasm/http-wasm-host-go v0.5.1 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.4+incompatible // indirect
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.56 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/influxdata/influxdb-client-go/v2 v2.12.3 // indirect
Expand Down Expand Up @@ -407,7 +407,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
github.com/zeebo/errs v1.3.0 // indirect
go.etcd.io/bbolt v1.3.8 // indirect
go.etcd.io/etcd/api/v3 v3.5.12 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.12 // indirect
go.etcd.io/etcd/client/v2 v2.305.11 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.11 // indirect
Expand Down Expand Up @@ -489,7 +488,6 @@ replace github.com/microcosm-cc/bluemonday => github.com/microcosm-cc/bluemonday
//
// replace github.com/dapr/components-contrib => ../components-contrib
// replace github.com/dapr/kit => ../kit
replace github.com/Scalingo/go-etcd-cron => github.com/cicoyle/go-etcd-cron v0.0.0-20240212132024-691d2e9fb3f1

//
// Then, run `make modtidy-all` in this repository.
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,6 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86cAH8qUic=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cicoyle/go-etcd-cron v0.0.0-20240212132024-691d2e9fb3f1 h1:u2JCV1Y2DpQ1cULuNzbz2dYl1onJ5eZBus8DWFgH28I=
github.com/cicoyle/go-etcd-cron v0.0.0-20240212132024-691d2e9fb3f1/go.mod h1:h4rx0m29dWSqSm9Oikw83ldkxyeXKH19aAlj4vagiek=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/clbanning/mxj/v2 v2.5.5/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s=
Expand Down Expand Up @@ -467,6 +465,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA=
github.com/diagridio/go-etcd-cron v0.0.0-20240321201514-c0f44b7f5d89 h1:mcF9KZDBpLjzO6XIiV6bBg6R2VxPzf8F5RuXgwBjA08=
github.com/diagridio/go-etcd-cron v0.0.0-20240321201514-c0f44b7f5d89/go.mod h1:tRZ3z7Mr4Rp9fYdxmCB63V7zrwqtUINR6QHriC1bauw=
github.com/didip/tollbooth/v7 v7.0.1 h1:TkT4sBKoQoHQFPf7blQ54iHrZiTDnr8TceU+MulVAog=
github.com/didip/tollbooth/v7 v7.0.1/go.mod h1:VZhDSGl5bDSPj4wPsih3PFa4Uh9Ghv8hgacaTm5PRT4=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
Expand Down Expand Up @@ -967,8 +967,6 @@ github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.56 h1:ULzGSSe95hkOdh17NsiPV3lw
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.56/go.mod h1:bsqx6o47Kl4YsniIjPwuoeqiIB5Fc3JbSpB2b3o3WFQ=
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI=
github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
Expand Down
12 changes: 5 additions & 7 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,21 +1210,19 @@ func (a *actorsRuntime) CreateReminder(ctx context.Context, req *CreateReminderR
return err
}

// TODO: change the 3rd party library to take our format
jobSchedule := "@every " + req.Period
internalScheduleJobReq := &schedulerv1pb.ScheduleJobRequest{
Job: &runtimev1pb.Job{
Name: a.actorsConfig.AppID + "||" + jobName,
Schedule: jobSchedule,
Name: jobName,
Schedule: req.Period,
Data: &anypb.Any{
TypeUrl: "type.googleapis.com/google.protobuf.BytesValue",
Value: data,
Value: data, // TODO: this should go to actorStateStore
},
DueTime: req.DueTime,
Ttl: req.TTL,
},
Namespace: "", // TODO
Metadata: metadata, // TODO: this should generate key if jobStateStore is configured
Namespace: a.actorsConfig.Namespace,
Metadata: metadata,
}

_, err = a.scheduler.ScheduleJob(ctx, internalScheduleJobReq)
Expand Down
4 changes: 2 additions & 2 deletions pkg/actors/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ConfigOpts struct {
AppID string
ActorsService string
RemindersService string
SchedulerService string
SchedulerAddress string
Port int
Namespace string
AppConfig daprAppConfig.ApplicationConfig
Expand All @@ -59,7 +59,7 @@ func NewConfig(opts ConfigOpts) Config {
AppID: opts.AppID,
ActorsService: opts.ActorsService,
RemindersService: opts.RemindersService,
SchedulerService: opts.SchedulerService,
SchedulerService: opts.SchedulerAddress,
Port: opts.Port,
Namespace: opts.Namespace,
DrainRebalancedActors: opts.AppConfig.DrainRebalancedActors,
Expand Down
60 changes: 0 additions & 60 deletions pkg/api/http/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,6 @@ func (a *api) constructSchedulerEndpoints() []endpoints.Endpoint {
Name: "DeleteJob",
},
},
{
Methods: []string{http.MethodGet},
Route: "job/{name}",
Version: apiVersionV1,
Group: endpointGroupSchedulerV1Alpha1,
Handler: a.onGetJobHandler(),
Settings: endpoints.EndpointSettings{
Name: "GetJob",
},
},
{
Methods: []string{http.MethodGet},
Route: "jobs/{app_id}",
Version: apiVersionV1,
Group: endpointGroupSchedulerV1Alpha1,
Handler: a.onListJobsHandler(),
Settings: endpoints.EndpointSettings{
Name: "ListJobs",
},
},
}
}

Expand Down Expand Up @@ -122,43 +102,3 @@ func (a *api) onDeleteJobHandler() http.HandlerFunc {
},
)
}

func (a *api) onGetJobHandler() http.HandlerFunc {
return UniversalHTTPHandler(
a.universal.GetJob,
UniversalHTTPHandlerOpts[*runtimev1pb.GetJobRequest, *runtimev1pb.GetJobResponse]{
SkipInputBody: true,
InModifier: func(r *http.Request, in *runtimev1pb.GetJobRequest) (*runtimev1pb.GetJobRequest, error) {
name := chi.URLParam(r, "name")
in.Name = name
return in, nil
},
OutModifier: func(out *runtimev1pb.GetJobResponse) (any, error) {
if out == nil || out.GetJob() == nil {
return nil, nil // empty body
}
return out.GetJob(), nil // empty body
},
},
)
}

func (a *api) onListJobsHandler() http.HandlerFunc {
return UniversalHTTPHandler(
a.universal.ListJobs,
UniversalHTTPHandlerOpts[*runtimev1pb.ListJobsRequest, *runtimev1pb.ListJobsResponse]{
SkipInputBody: true,
InModifier: func(r *http.Request, in *runtimev1pb.ListJobsRequest) (*runtimev1pb.ListJobsRequest, error) {
appID := chi.URLParam(r, "app_id")
in.AppId = appID
return in, nil
},
OutModifier: func(out *runtimev1pb.ListJobsResponse) (any, error) {
if out == nil || out.GetJobs() == nil {
return nil, nil // empty body
}
return out.GetJobs(), nil // empty body
},
},
)
}
Loading

0 comments on commit b028b4f

Please sign in to comment.