Skip to content

Commit

Permalink
Add queue
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Jan 17, 2025
1 parent 997fbce commit cd897d8
Show file tree
Hide file tree
Showing 19 changed files with 2,212 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cmd/crproxy/cluster/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func runE(ctx context.Context, flags *flagpole) error {
}

handler = handlers.CORS(
handlers.AllowedMethods([]string{http.MethodHead, http.MethodGet, http.MethodPost, http.MethodPut, http.MethodDelete}),
handlers.AllowedMethods([]string{http.MethodHead, http.MethodGet, http.MethodPost, http.MethodPatch, http.MethodPut, http.MethodDelete}),
handlers.AllowedHeaders([]string{"Authorization", "Accept", "Content-Type", "Origin"}),
handlers.AllowedOrigins([]string{"*"}),
)(handler)
Expand Down
5 changes: 5 additions & 0 deletions cmd/crproxy/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/daocloud/crproxy/cmd/crproxy/cluster/agent"
"github.com/daocloud/crproxy/cmd/crproxy/cluster/auth"
"github.com/daocloud/crproxy/cmd/crproxy/cluster/gateway"
"github.com/daocloud/crproxy/cmd/crproxy/cluster/queue"
"github.com/daocloud/crproxy/cmd/crproxy/cluster/runner"
)

func NewCommand() *cobra.Command {
Expand All @@ -20,5 +22,8 @@ func NewCommand() *cobra.Command {
cmd.AddCommand(agent.NewCommand())
cmd.AddCommand(gateway.NewCommand())
cmd.AddCommand(auth.NewCommand())

cmd.AddCommand(queue.NewCommand())
cmd.AddCommand(runner.NewCommand())
return cmd
}
114 changes: 114 additions & 0 deletions cmd/crproxy/cluster/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package queue

import (
"context"
"database/sql"
"fmt"
"log/slog"
"net/http"
"os"

_ "github.com/go-sql-driver/mysql"

"github.com/daocloud/crproxy/internal/server"
"github.com/daocloud/crproxy/queue"
"github.com/emicklei/go-restful/v3"
"github.com/gorilla/handlers"
"github.com/spf13/cobra"
)

type flagpole struct {
Behind bool
Address string
AcmeHosts []string
AcmeCacheDir string
CertFile string
PrivateKeyFile string

TokenPublicKeyFile string

SimpleAuthUserpass map[string]string

AdminToken string

DBURL string
}

func NewCommand() *cobra.Command {
flags := &flagpole{
Address: ":18010",
}

cmd := &cobra.Command{
Use: "queue",
Short: "Queue",
RunE: func(cmd *cobra.Command, args []string) error {
return runE(cmd.Context(), flags)
},
}

cmd.Flags().BoolVar(&flags.Behind, "behind", flags.Behind, "Behind")
cmd.Flags().StringVar(&flags.Address, "address", flags.Address, "Address")
cmd.Flags().StringSliceVar(&flags.AcmeHosts, "acme-hosts", flags.AcmeHosts, "Acme hosts")
cmd.Flags().StringVar(&flags.AcmeCacheDir, "acme-cache-dir", flags.AcmeCacheDir, "Acme cache dir")
cmd.Flags().StringVar(&flags.CertFile, "cert-file", flags.CertFile, "Cert file")
cmd.Flags().StringVar(&flags.PrivateKeyFile, "private-key-file", flags.PrivateKeyFile, "Private key file")

cmd.Flags().StringVar(&flags.TokenPublicKeyFile, "token-public-key-file", "", "public key file")

cmd.Flags().StringVar(&flags.AdminToken, "admin-token", flags.AdminToken, "Admin token")

cmd.Flags().StringVar(&flags.DBURL, "db-url", flags.DBURL, "Database URL")

return cmd
}

func runE(ctx context.Context, flags *flagpole) error {
logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))

container := restful.NewContainer()

var mgr *queue.QueueManager
if flags.DBURL != "" {
dburl := flags.DBURL
db, err := sql.Open("mysql", dburl)
if err != nil {
return fmt.Errorf("failed to connect to database: %w", err)
}
defer db.Close()

if err = db.Ping(); err != nil {
return fmt.Errorf("failed to ping database: %w", err)
}

logger.Info("Connected to DB")

mgr = queue.NewQueueManager(flags.AdminToken, db)

mgr.Register(container)

mgr.InitTable(ctx)

mgr.Schedule(ctx, logger)
}

var handler http.Handler = container

handler = handlers.LoggingHandler(os.Stderr, handler)

if flags.Behind {
handler = handlers.ProxyHeaders(handler)
}

handler = handlers.CORS(
handlers.AllowedMethods([]string{http.MethodHead, http.MethodGet, http.MethodPost, http.MethodPatch, http.MethodPut, http.MethodDelete}),
handlers.AllowedHeaders([]string{"Authorization", "Accept", "Content-Type", "Origin"}),
handlers.AllowedOrigins([]string{"*"}),
)(handler)

err := server.Run(ctx, flags.Address, handler, flags.AcmeHosts, flags.AcmeCacheDir, flags.CertFile, flags.PrivateKeyFile)
if err != nil {
return fmt.Errorf("failed to run server: %w", err)
}
return nil
}
144 changes: 144 additions & 0 deletions cmd/crproxy/cluster/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package runner

import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"time"

"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/runner"
"github.com/daocloud/crproxy/storage"
csync "github.com/daocloud/crproxy/sync"
"github.com/daocloud/crproxy/transport"
"github.com/docker/distribution/manifest/manifestlist"
"github.com/spf13/cobra"
)

type flagpole struct {
QueueURL string

AdminToken string

StorageURL []string
Deep bool
Quick bool
Platform []string
Userpass []string

Duration time.Duration
}

func NewCommand() *cobra.Command {
flags := &flagpole{
Platform: []string{
"linux/amd64",
"linux/arm64",
},
}

cmd := &cobra.Command{
Use: "runner",
Short: "Runner",
RunE: func(cmd *cobra.Command, args []string) error {
return runE(cmd.Context(), flags)
},
}

cmd.Flags().StringVar(&flags.AdminToken, "admin-token", flags.AdminToken, "Admin token")

cmd.Flags().StringVar(&flags.QueueURL, "queue-url", flags.QueueURL, "Queue URL")

cmd.Flags().StringArrayVar(&flags.StorageURL, "storage-url", flags.StorageURL, "Storage driver url")
cmd.Flags().BoolVar(&flags.Deep, "deep", flags.Deep, "Deep sync with blob")
cmd.Flags().BoolVar(&flags.Quick, "quick", flags.Quick, "Quick sync with tags")
cmd.Flags().StringSliceVar(&flags.Platform, "platform", flags.Platform, "Platform")
cmd.Flags().StringArrayVarP(&flags.Userpass, "user", "u", flags.Userpass, "host and username and password -u user:pwd@host")

cmd.Flags().DurationVar(&flags.Duration, "duration", flags.Duration, "Duration of the running")

return cmd
}

func runE(ctx context.Context, flags *flagpole) error {
logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))

opts := []csync.Option{}

var caches []*cache.Cache
for _, s := range flags.StorageURL {
sd, err := storage.NewStorage(s)
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}

cache, err := cache.NewCache(cache.WithStorageDriver(sd))
if err != nil {
return fmt.Errorf("create cache failed: %w", err)
}

caches = append(caches, cache)
}

transportOpts := []transport.Option{
transport.WithLogger(logger),
}

if len(flags.Userpass) != 0 {
transportOpts = append(transportOpts, transport.WithUserAndPass(flags.Userpass))
}

tp, err := transport.NewTransport(transportOpts...)
if err != nil {
return fmt.Errorf("create transport failed: %w", err)
}

opts = append(opts,
csync.WithCaches(caches...),
csync.WithDeep(flags.Deep),
csync.WithQuick(flags.Quick),
csync.WithTransport(tp),
csync.WithLogger(logger),
csync.WithFilterPlatform(filterPlatform(flags.Platform)),
)

sm, err := csync.NewSyncManager(opts...)
if err != nil {
return fmt.Errorf("create sync manager failed: %w", err)
}

runner, err := runner.NewRunner(http.DefaultClient, flags.QueueURL, flags.AdminToken, sm)
if err != nil {
return err
}

if flags.Duration > 0 {
ctx, _ = context.WithTimeout(ctx, flags.Duration)
}

err = runner.Run(ctx, logger)
if err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
return err
}
}
return nil
}

func filterPlatform(ps []string) func(pf manifestlist.PlatformSpec) bool {
platforms := map[string]struct{}{}
for _, p := range ps {
platforms[p] = struct{}{}
}
return func(pf manifestlist.PlatformSpec) bool {
p := fmt.Sprintf("%s/%s", pf.OS, pf.Architecture)

if _, ok := platforms[p]; ok {
return true
}
return false
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/go-openapi/spec v0.20.9
github.com/go-sql-driver/mysql v1.8.1
github.com/google/go-containerregistry v0.20.2
github.com/google/uuid v1.6.0
github.com/gorilla/handlers v1.5.2
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.24.6+incompatible
github.com/opencontainers/go-digest v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/go-containerregistry v0.20.2 h1:B1wPJ1SN/S7pB+ZAimcciVD+r+yV/l/DSArMxlbwseo=
github.com/google/go-containerregistry v0.20.2/go.mod h1:z38EKdKh4h7IP2gSfUUqEvalZBqs6AoLeWfUy34nQC8=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
Expand Down
2 changes: 1 addition & 1 deletion manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (m *Manager) Register(container *restful.Container) {
PostBuildSwaggerObjectHandler: func(s *spec.Swagger) {
s.Info = &spec.Info{}
s.Info.Title = "CRProxy Manager"
s.Schemes = []string{"http", "https"}
s.Schemes = []string{"https", "http"}
s.SecurityDefinitions = spec.SecurityDefinitions{
"BearerHeader": {
SecuritySchemeProps: spec.SecuritySchemeProps{
Expand Down
Loading

0 comments on commit cd897d8

Please sign in to comment.