refactor
yohamta committed Feb 6, 2025
1 parent 552649a · commit 9797272
Showing 7 changed files with 190 additions and 167 deletions.
2 changes: 1 addition & 1 deletion cmd/setup.go
@@ -143,7 +143,7 @@ func (s *setup) scheduler() (*scheduler.Scheduler, error) {
return nil, fmt.Errorf("failed to initialize client: %w", err)
}

manager := scheduler.NewDAGManager(s.cfg.Paths.DAGsDir, cli, s.cfg.Paths.Executable, s.cfg.WorkDir)
manager := scheduler.NewDAGJobManager(s.cfg.Paths.DAGsDir, cli, s.cfg.Paths.Executable, s.cfg.WorkDir)
return scheduler.New(s.cfg, manager), nil
}
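
Only the constructor's name changes here. As a reference sketch (not itself part of the diff), the resulting wiring in this function reads:

// Abbreviated from the hunk above; s.cfg and cli come from the surrounding
// setup code in cmd/setup.go.
manager := scheduler.NewDAGJobManager(s.cfg.Paths.DAGsDir, cli, s.cfg.Paths.Executable, s.cfg.WorkDir)
return scheduler.New(s.cfg, manager), nil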

10 changes: 5 additions & 5 deletions internal/scheduler/job.go
@@ -7,7 +7,7 @@ import (

"github.com/dagu-org/dagu/internal/client"
"github.com/dagu-org/dagu/internal/digraph"
dagscheduler "github.com/dagu-org/dagu/internal/digraph/scheduler"
"github.com/dagu-org/dagu/internal/digraph/scheduler"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/persistence/model"
"github.com/dagu-org/dagu/internal/stringutil"
@@ -47,7 +47,7 @@ func (job *dagJob) Start(ctx context.Context) error {
}

// Guard against already running jobs.
if latestStatus.Status == dagscheduler.StatusRunning {
if latestStatus.Status == scheduler.StatusRunning {
return ErrJobRunning
}

@@ -63,7 +63,7 @@ func (job *dagJob) Start(ctx context.Context) error {
// ready checks whether the job can be safely started based on the latest status.
func (job *dagJob) ready(ctx context.Context, latestStatus model.Status) error {
// Prevent starting if it's already running.
if latestStatus.Status == dagscheduler.StatusRunning {
if latestStatus.Status == scheduler.StatusRunning {
return ErrJobRunning
}

@@ -88,7 +88,7 @@ func (job *dagJob) ready(ctx context.Context, latestStatus model.Status) error {
// If so, the current run is skipped.
func (job *dagJob) skipIfSuccessful(ctx context.Context, latestStatus model.Status, latestStartedAt time.Time) error {
// If skip is not configured, or the DAG is not currently successful, do nothing.
if !job.DAG.SkipIfSuccessful || latestStatus.Status != dagscheduler.StatusSuccess {
if !job.DAG.SkipIfSuccessful || latestStatus.Status != scheduler.StatusSuccess {
return nil
}

@@ -115,7 +115,7 @@ func (job *dagJob) Stop(ctx context.Context) error {
if err != nil {
return err
}
if latestStatus.Status != dagscheduler.StatusRunning {
if latestStatus.Status != scheduler.StatusRunning {
return ErrJobIsNotRunning
}
return job.Client.Stop(ctx, job.DAG)
42 changes: 22 additions & 20 deletions internal/scheduler/manager.go
@@ -27,13 +27,15 @@ type JobManager interface {
Next(ctx context.Context, now time.Time) ([]*ScheduledJob, error)
}

// ScheduledJob stores the next time a job should be run and the job itself.
type ScheduledJob struct {
Next time.Time // Next is the time when the job should be run.
Job Job
Type ScheduleType
Type ScheduleType // start, stop, restart
}

func NewSchedule(next time.Time, job Job, typ ScheduleType) *ScheduledJob {
// NewScheduledJob creates a new ScheduledJob.
func NewScheduledJob(next time.Time, job Job, typ ScheduleType) *ScheduledJob {
return &ScheduledJob{next, job, typ}
}
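
A minimal sketch of building a ScheduledJob with the renamed constructor (illustrative only, not part of this commit — the cron spec is a placeholder, and myJob / typ stand in for a concrete Job implementation and a ScheduleType value):

// Illustrative only: constructing a ScheduledJob by hand. In this package the
// manager's Next() builds these entries from parsed DAG schedules.
func exampleScheduledJob(myJob Job, typ ScheduleType) (*ScheduledJob, error) {
	parsed, err := cron.ParseStandard("0 * * * *") // assumes the robfig/cron package that provides cron.Schedule used below
	if err != nil {
		return nil, err
	}
	return NewScheduledJob(parsed.Next(time.Now()), myJob, typ), nil
}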

@@ -49,8 +51,8 @@ type dagJobManager struct {
workDir string
}

// NewDAGManager creates a new DAG manager with the given configuration.
func NewDAGManager(dir string, client client.Client, executable, workDir string) JobManager {
// NewDAGJobManager creates a new DAG manager with the given configuration.
func NewDAGJobManager(dir string, client client.Client, executable, workDir string) JobManager {
return &dagJobManager{
targetDir: dir,
lock: sync.Mutex{},
@@ -61,17 +63,6 @@ func NewDAGManager(dir string, client client.Client, executable, workDir string)
}
}

func (m *dagJobManager) createJob(dag *digraph.DAG, next time.Time, schedule cron.Schedule) Job {
return &dagJob{
DAG: dag,
Executable: m.executable,
WorkDir: m.workDir,
Next: next,
Schedule: schedule,
Client: m.client,
}
}

func (m *dagJobManager) Start(ctx context.Context, done chan any) error {
if err := m.initialize(ctx); err != nil {
return fmt.Errorf("failed to initialize DAGs: %w", err)
@@ -106,7 +97,7 @@ func (m *dagJobManager) Next(ctx context.Context, now time.Time) ([]*ScheduledJo
for _, s := range schedules {
for _, schedule := range s.items {
next := schedule.Parsed.Next(now)
job := NewSchedule(next, m.createJob(dag, next, schedule.Parsed), s.typ)
job := NewScheduledJob(next, m.createJob(dag, next, schedule.Parsed), s.typ)
jobs = append(jobs, job)
}
}
@@ -115,6 +106,17 @@ func (m *dagJobManager) Next(ctx context.Context, now time.Time) ([]*ScheduledJo
return jobs, nil
}

func (m *dagJobManager) createJob(dag *digraph.DAG, next time.Time, schedule cron.Schedule) Job {
return &dagJob{
DAG: dag,
Executable: m.executable,
WorkDir: m.workDir,
Next: next,
Schedule: schedule,
Client: m.client,
}
}

func (m *dagJobManager) initialize(ctx context.Context) error {
m.lock.Lock()
defer m.lock.Unlock()
@@ -129,15 +131,15 @@ func (m *dagJobManager) initialize(ctx context.Context) error {
if fileutil.IsYAMLFile(fi.Name()) {
dag, err := digraph.Load(ctx, filepath.Join(m.targetDir, fi.Name()), digraph.OnlyMetadata(), digraph.WithoutEval())
if err != nil {
logger.Error(ctx, "DAG load failed", "err", err, "DAG", fi.Name())
logger.Error(ctx, "DAG load failed", "err", err, "name", fi.Name())
continue
}
m.registry[fi.Name()] = dag
dags = append(dags, fi.Name())
}
}

logger.Info(ctx, "Scheduler initialized", "specs", strings.Join(dags, ","))
logger.Info(ctx, "Scheduler initialized", "dags", strings.Join(dags, ","))
return nil
}

@@ -176,12 +178,12 @@ func (m *dagJobManager) watchDags(ctx context.Context, done chan any) {
logger.Error(ctx, "DAG load failed", "err", err, "file", event.Name)
} else {
m.registry[filepath.Base(event.Name)] = dag
logger.Info(ctx, "DAG added/updated", "DAG", filepath.Base(event.Name))
logger.Info(ctx, "DAG added/updated", "name", filepath.Base(event.Name))
}
}
if event.Op == fsnotify.Rename || event.Op == fsnotify.Remove {
delete(m.registry, filepath.Base(event.Name))
logger.Info(ctx, "DAG removed", "DAG", filepath.Base(event.Name))
logger.Info(ctx, "DAG removed", "name", filepath.Base(event.Name))
}
m.lock.Unlock()

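
How a caller drives the JobManager is not part of this diff. Purely as an illustration (the one-minute tick and the handle callback are placeholders; Start returning promptly after initialization is inferred from the test below), a polling loop over Next could look like:

// Hypothetical consumer of JobManager; not taken from this commit.
func runLoop(ctx context.Context, m JobManager, handle func(*ScheduledJob)) error {
	done := make(chan any)
	defer close(done)
	if err := m.Start(ctx, done); err != nil {
		return err
	}
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case now := <-ticker.C:
			jobs, err := m.Next(ctx, now)
			if err != nil {
				return err
			}
			for _, job := range jobs {
				handle(job) // e.g. start, stop or restart job.Job depending on job.Type
			}
		}
	}
}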
123 changes: 48 additions & 75 deletions internal/scheduler/manger_test.go
@@ -2,105 +2,78 @@ package scheduler

import (
"context"
"os"
"path/filepath"
"testing"
"time"

"github.com/dagu-org/dagu/internal/build"
"github.com/dagu-org/dagu/internal/client"
"github.com/dagu-org/dagu/internal/fileutil"
"github.com/dagu-org/dagu/internal/persistence/jsondb"
"github.com/dagu-org/dagu/internal/persistence/local"
"github.com/dagu-org/dagu/internal/persistence/local/storage"

"github.com/stretchr/testify/require"

"github.com/dagu-org/dagu/internal/config"
)

func TestReadEntries(t *testing.T) {
t.Run("ReadEntries", func(t *testing.T) {
tmpDir, cli, _ := setupTest(t)
defer func() {
_ = os.RemoveAll(tmpDir)
}()

now := time.Date(2020, 1, 1, 1, 0, 0, 0, time.UTC).Add(-time.Second)
entryReader := NewDAGManager(
filepath.Join(testdataDir, "invalid_directory"),
cli,
"",
"",
)

entries, err := entryReader.Next(context.Background(), now)
expectedNext := time.Date(2020, 1, 1, 1, 0, 0, 0, time.UTC)
now := expectedNext.Add(-time.Second)

t.Run("InvalidDirectory", func(t *testing.T) {
manager := NewDAGJobManager("invalid_directory", nil, "", "")
jobs, err := manager.Next(context.Background(), expectedNext)
require.NoError(t, err)
require.Len(t, jobs, 0)
})
t.Run("StartAndNext", func(t *testing.T) {
th := setupTest(t)
ctx := context.Background()

done := make(chan any)
defer close(done)

err := th.manager.Start(ctx, done)
require.NoError(t, err)
require.Len(t, entries, 0)

entryReader = NewDAGManager(
testdataDir,
cli,
"",
"",
)
jobs, err := th.manager.Next(ctx, now)
require.NoError(t, err)
require.NotEmpty(t, jobs, "jobs should not be empty")

job := jobs[0]
next := job.Next
require.Equal(t, expectedNext, next)
})
t.Run("SuspendedJob", func(t *testing.T) {
th := setupTest(t)
ctx := context.Background()

done := make(chan any)
defer close(done)
err = entryReader.Start(context.Background(), done)

err := th.manager.Start(ctx, done)
require.NoError(t, err)

entries, err = entryReader.Next(context.Background(), now)
beforeSuspend, err := th.manager.Next(ctx, now)
require.NoError(t, err)
require.GreaterOrEqual(t, len(entries), 1)

next := entries[0].Next
require.Equal(t, now.Add(time.Second), next)

// suspend
var j Job
for _, e := range entries {
jj := e.Job
if jj.GetDAG(context.Background()).Name == "scheduled_job" {
j = jj
break
}
}

err = cli.ToggleSuspend(context.Background(), j.GetDAG(context.Background()).Name, true)
// find the job and suspend it
job := findJobByName(t, beforeSuspend, "scheduled_job").Job
dagJob, ok := job.(*dagJob)
require.True(t, ok)
dag := dagJob.DAG
err = th.client.ToggleSuspend(ctx, dag.Name, true)
require.NoError(t, err)

// check if the job is suspended
lives, err := entryReader.Next(context.Background(), now)
// check if the job is suspended and not returned
afterSuspend, err := th.manager.Next(ctx, now)
require.NoError(t, err)
require.Equal(t, len(entries)-1, len(lives))
require.Equal(t, len(afterSuspend), len(beforeSuspend)-1, "suspended job should not be returned")
})
}

var testdataDir = filepath.Join(fileutil.MustGetwd(), "testdata")

func setupTest(t *testing.T) (string, client.Client, *config.Config) {
func findJobByName(t *testing.T, jobs []*ScheduledJob, name string) *ScheduledJob {
t.Helper()

tmpDir := fileutil.MustTempDir("test")

err := os.Setenv("HOME", tmpDir)
require.NoError(t, err)

cfg := &config.Config{
Paths: config.PathsConfig{
DataDir: filepath.Join(tmpDir, "."+build.Slug, "data"),
DAGsDir: testdataDir,
SuspendFlagsDir: tmpDir,
},
WorkDir: tmpDir,
for _, job := range jobs {
dagJob, ok := job.Job.(*dagJob)
if ok && dagJob.DAG.Name == name {
return job
}
}

dagStore := local.NewDAGStore(cfg.Paths.DAGsDir)
historyStore := jsondb.New(cfg.Paths.DataDir)
flagStore := local.NewFlagStore(
storage.NewStorage(cfg.Paths.SuspendFlagsDir),
)

return tmpDir, client.New(dagStore, historyStore, flagStore, "", cfg.WorkDir), cfg
t.Fatalf("job %s not found", name)
return nil
}
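
The reworked setupTest helper that the new tests call (th.manager, th.client) is defined in a file not loaded in this excerpt. A plausible shape for it, as a hypothetical reconstruction based on the old helper deleted above — struct name, field names, and directory layout are guesses:

// Hypothetical reconstruction, not from this commit. The store constructors
// mirror the ones used by the old setupTest above.
type testHarness struct {
	manager JobManager
	client  client.Client
}

func setupTest(t *testing.T) testHarness {
	t.Helper()
	tmpDir := t.TempDir()

	dagStore := local.NewDAGStore(testdataDir)
	historyStore := jsondb.New(filepath.Join(tmpDir, "data"))
	flagStore := local.NewFlagStore(storage.NewStorage(tmpDir))
	cli := client.New(dagStore, historyStore, flagStore, "", tmpDir)

	return testHarness{
		manager: NewDAGJobManager(testdataDir, cli, "", ""),
		client:  cli,
	}
}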