diff --git a/README.md b/README.md index cbd9d5c33..8fc99f6a5 100644 --- a/README.md +++ b/README.md @@ -255,7 +255,7 @@ dagu version - ️[Quick Start Guide](https://dagu.readthedocs.io/en/latest/quickstart.html) - [Command Line Interface](https://dagu.readthedocs.io/en/latest/cli.html) - [Web User Interface](https://dagu.readthedocs.io/en/latest/web_interface.html) -- YAML Format +- Writing DAG - [Minimal DAG Definition](https://dagu.readthedocs.io/en/latest/yaml_format.html#minimal-dag-definition) - [Running Arbitrary Code Snippets](https://dagu.readthedocs.io/en/latest/yaml_format.html#running-arbitrary-code-snippets) - [Environment Variables](https://dagu.readthedocs.io/en/latest/yaml_format.html#defining-environment-variables) @@ -266,6 +266,8 @@ dagu version - [Redirecting Stdout and Stderr](https://dagu.readthedocs.io/en/latest/yaml_format.html#redirecting-stdout-and-stderr) - [Lifecycle Hooks](https://dagu.readthedocs.io/en/latest/yaml_format.html#adding-lifecycle-hooks) - [Repeating Task](https://dagu.readthedocs.io/en/latest/yaml_format.html#repeating-a-task-at-regular-intervals) + - [Minimal DAG Definition](https://dagu.readthedocs.io/en/latest/yaml_format.html#minimal-dag-definition) + - [Running Sub-DAG](https://dagu.readthedocs.io/en/latest/yaml_format.html#running-sub-dag) - [All Available Fields for a DAG](https://dagu.readthedocs.io/en/latest/yaml_format.html#all-available-fields-for-dags) - [All Available Fields for a Step](https://dagu.readthedocs.io/en/latest/yaml_format.html#all-available-fields-for-steps) - Example DAGs diff --git a/cmd/common_test.go b/cmd/common_test.go index 30038184d..566b21f16 100644 --- a/cmd/common_test.go +++ b/cmd/common_test.go @@ -30,7 +30,7 @@ func setupTest(t *testing.T) (string, engine.Engine, persistence.DataStoreFactor DataDir: path.Join(tmpDir, ".dagu", "data"), }) - e := engine.NewFactory(ds, nil).Create() + e := engine.NewFactory(ds, config.Get()).Create() return tmpDir, e, ds } diff --git a/cmd/restart_test.go b/cmd/restart_test.go index 4343f8d1c..46033a854 100644 --- a/cmd/restart_test.go +++ b/cmd/restart_test.go @@ -55,7 +55,7 @@ func TestRestartCommand(t *testing.T) { require.NoError(t, err) df := client.NewDataStoreFactory(config.Get()) - e = engine.NewFactory(df, nil).Create() + e = engine.NewFactory(df, config.Get()).Create() sts := e.GetRecentHistory(d, 2) require.Len(t, sts, 2) diff --git a/cmd/retry.go b/cmd/retry.go index f705af00b..6f385884c 100644 --- a/cmd/retry.go +++ b/cmd/retry.go @@ -26,7 +26,7 @@ func retryCmd() *cobra.Command { // TODO: use engine.Engine instead of client.DataStoreFactory df := client.NewDataStoreFactory(config.Get()) - e := engine.NewFactory(df, nil).Create() + e := engine.NewFactory(df, config.Get()).Create() hs := df.NewHistoryStore() diff --git a/docs/source/yaml_format.rst b/docs/source/yaml_format.rst index 8e61290a6..74771b39f 100644 --- a/docs/source/yaml_format.rst +++ b/docs/source/yaml_format.rst @@ -461,6 +461,19 @@ Formatting JSON .. _command-execution-over-ssh: + +Running Sub-DAG +~~~~~~~~~~~~~~~~ + +You can run a sub-DAG from a DAG file. The sub-DAG is defined in a separate file and can be called using the `run` field. + +.. code-block:: yaml + + steps: + - name: A task + run: # e.g., sub_dag, sub_dag.yaml, /path/to/sub_dag.yaml + params: "FOO=BAR" # optional + All Available Fields -------------------- @@ -545,6 +558,8 @@ Each step can have its own set of configurations, including: - ``repeatPolicy``: The repeat policy for the step. - ``preconditions``: The conditions that must be met before a step can run. - ``depends``: The step depends on the other step. +- ``run``: The sub-DAG to run. +- ``params``: The parameters to pass to the sub-DAG. Example: @@ -576,4 +591,6 @@ Example: - condition: "`echo $1`" expected: "param1" depends: - - some task name step + - some task name step + run: sub_dag + params: "FOO=BAR" diff --git a/go.mod b/go.mod index 6132385b4..ee25a3844 100644 --- a/go.mod +++ b/go.mod @@ -21,11 +21,13 @@ require ( github.com/jessevdk/go-flags v1.5.0 github.com/mattn/go-shellwords v1.0.12 github.com/mitchellh/mapstructure v1.5.0 + github.com/pkg/errors v0.9.1 github.com/robfig/cron/v3 v3.0.1 github.com/spf13/cobra v1.6.1 github.com/spf13/viper v1.15.0 github.com/stretchr/testify v1.8.2 go.uber.org/fx v1.20.0 + go.uber.org/goleak v1.3.0 golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 golang.org/x/text v0.12.0 gopkg.in/yaml.v2 v2.4.0 @@ -53,7 +55,6 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spf13/afero v1.9.3 // indirect github.com/spf13/cast v1.5.0 // indirect @@ -63,7 +64,6 @@ require ( go.mongodb.org/mongo-driver v1.11.3 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/dig v1.17.0 // indirect - go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.23.0 // indirect golang.org/x/mod v0.12.0 // indirect diff --git a/go.sum b/go.sum index 684876fd1..eb46a32f1 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,7 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg= @@ -47,6 +48,7 @@ github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef/go.mod h1:W github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so= github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -75,6 +77,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= +github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/go-chi/chi/v5 v5.0.8 h1:lD+NLqFcAi1ovnVZpsnObHGW4xb4J8lNmoYVfECH1Y0= @@ -185,6 +188,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -241,6 +245,7 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxv github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -296,6 +301,7 @@ github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= @@ -366,7 +372,6 @@ go.uber.org/dig v1.17.0 h1:5Chju+tUvcC+N7N6EV08BJz41UZuO3BmHcN4A287ZLI= go.uber.org/dig v1.17.0/go.mod h1:rTxpf7l5I0eBTlE6/9RL+lDybC7WFwY2QH55ZSjy1mU= go.uber.org/fx v1.20.0 h1:ZMC/pnRvhsthOZh9MZjMq5U8Or3mA9zBSPaLnzs3ihQ= go.uber.org/fx v1.20.0/go.mod h1:qCUj0btiR3/JnanEr1TYEePfSw6o/4qYJscgvzQ5Ub0= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= @@ -481,6 +486,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -529,6 +535,7 @@ golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -544,6 +551,7 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA= +golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -695,6 +703,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/internal/agent/agent.go b/internal/agent/agent.go index b45955dff..52f137b8f 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -63,8 +63,9 @@ func New(config *Config, e engine.Engine, ds persistence.DataStoreFactory) *Agen // Config contains the configuration for an Agent. type Config struct { - DAG *dag.DAG - Dry bool + DAG *dag.DAG + DAGsDir string + Dry bool // RetryTarget is the status to retry. RetryTarget *model.Status @@ -337,7 +338,7 @@ func (a *Agent) run(ctx context.Context) error { utils.LogErr("write status", a.historyStore.Write(a.Status())) }() - ctx = dag.NewContext(ctx, a.DAG) + ctx = dag.NewContext(ctx, a.DAG, a.dataStoreFactory.NewDAGStore()) lastErr := a.scheduler.Schedule(ctx, a.graph, done) status := a.Status() @@ -369,7 +370,7 @@ func (a *Agent) dryRun() error { log.Printf("***** Starting DRY-RUN *****") - ctx := dag.NewContext(context.Background(), a.DAG) + ctx := dag.NewContext(context.Background(), a.DAG, a.dataStoreFactory.NewDAGStore()) lastErr := a.scheduler.Schedule(ctx, a.graph, done) status := a.Status() diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index 0501ada1d..b67cf57b7 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -36,7 +36,7 @@ func setupTest(t *testing.T) (string, engine.Engine, persistence.DataStoreFactor DataDir: path.Join(tmpDir, ".dagu", "data"), }) - e := engine.NewFactory(ds, nil).Create() + e := engine.NewFactory(ds, config.Get()).Create() return tmpDir, e, ds } diff --git a/internal/config/config.go b/internal/config/config.go index 64b8ea54e..474b6b185 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,7 +14,7 @@ type Config struct { Host string Port int DAGs string - Command string + Executable string WorkDir string IsBasicAuth bool BasicAuthUsername string @@ -81,7 +81,7 @@ func LoadConfig() error { viper.SetEnvPrefix("dagu") viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) - _ = viper.BindEnv("command", "DAGU_EXECUTABLE") + _ = viper.BindEnv("executable", "DAGU_EXECUTABLE") _ = viper.BindEnv("dags", "DAGU_DAGS_DIR") _ = viper.BindEnv("workDir", "DAGU_WORK_DIR") _ = viper.BindEnv("isBasicAuth", "DAGU_IS_BASICAUTH") @@ -100,14 +100,15 @@ func LoadConfig() error { _ = viper.BindEnv("isAuthToken", "DAGU_IS_AUTHTOKEN") _ = viper.BindEnv("authToken", "DAGU_AUTHTOKEN") _ = viper.BindEnv("latestStatusToday", "DAGU_LATEST_STATUS") - command := "dagu" - if ex, err := os.Executable(); err == nil { - command = ex + + executable, err := os.Executable() + if err != nil { + return fmt.Errorf("failed to get executable path: %w", err) } viper.SetDefault("host", "127.0.0.1") viper.SetDefault("port", "8080") - viper.SetDefault("command", command) + viper.SetDefault("executable", executable) viper.SetDefault("dags", path.Join(appHome, "dags")) viper.SetDefault("workDir", "") viper.SetDefault("isBasicAuth", "0") @@ -130,8 +131,7 @@ func LoadConfig() error { _ = viper.ReadInConfig() cfg := &Config{} - err := viper.Unmarshal(cfg) - if err != nil { + if err := viper.Unmarshal(cfg); err != nil { return fmt.Errorf("failed to unmarshal cfg file: %w", err) } loadLegacyEnvs(cfg) diff --git a/internal/dag/builder.go b/internal/dag/builder.go index 4bed39b56..c7440845f 100644 --- a/internal/dag/builder.go +++ b/internal/dag/builder.go @@ -426,9 +426,29 @@ func buildStep(variables []string, def *stepDef, funcs []*funcDef, options Build } step.MailOnError = def.MailOnError step.Preconditions = loadPreCondition(def.Preconditions) + + if err := parseSubWorkflow(step, def.Run, def.Params); err != nil { + return nil, err + } + return step, nil } +func parseSubWorkflow(step *Step, name, params string) error { + if name == "" { + return nil + } + step.SubWorkflow = &SubWorkflow{ + Name: name, + Params: params, + } + step.ExecutorConfig.Type = ExecutorTypeSubWorkflow + step.Command = fmt.Sprintf("run") + step.Args = []string{name, params} + step.CmdWithArgs = fmt.Sprintf("%s %s", name, params) + return nil +} + func parseExecutor(step *Step, executor any) error { if executor == nil { return nil @@ -687,7 +707,7 @@ func assertStepDef(def *stepDef, funcs []*funcDef) error { return errStepNameRequired } // TODO: Refactor the validation check for each executor. - if def.Executor == nil && def.Command == nil && def.Call == nil { + if def.Executor == nil && def.Command == nil && def.Call == nil && def.Run == "" { return errStepCommandOrCallRequired } diff --git a/internal/dag/context.go b/internal/dag/context.go index e7808aa42..b94203a08 100644 --- a/internal/dag/context.go +++ b/internal/dag/context.go @@ -6,22 +6,35 @@ import ( ) // NewContext sets the current DAG to the context. -func NewContext(ctx context.Context, dag *DAG) context.Context { - return context.WithValue(ctx, DAGContextKey{}, dag) +func NewContext(ctx context.Context, dag *DAG, finder DAGFinder) context.Context { + return context.WithValue(ctx, ctxKey{}, Context{ + DAG: dag, + Finder: finder, + }) } -// DAGContextKey is used as the key for storing the DAG in the context. -type DAGContextKey struct{} +// DAGFinder is an interface for finding a DAG by name. +type DAGFinder interface { + FindByName(name string) (*DAG, error) +} + +type Context struct { + DAG *DAG + Finder DAGFinder +} + +// ctxKey is used as the key for storing the DAG in the context. +type ctxKey struct{} var ( errFailedAssertion = errors.New("failed to assert DAG from context") ) -// GetDAGFromContext returns the DAG from the current context. -func GetDAGFromContext(ctx context.Context) (*DAG, error) { - dag, ok := ctx.Value(DAGContextKey{}).(*DAG) +// GetContext returns the DAG from the current context. +func GetContext(ctx context.Context) (Context, error) { + dag, ok := ctx.Value(ctxKey{}).(Context) if !ok { - return nil, errFailedAssertion + return Context{}, errFailedAssertion } return dag, nil } diff --git a/internal/dag/definition.go b/internal/dag/definition.go index cbffde69e..e2f886143 100644 --- a/internal/dag/definition.go +++ b/internal/dag/definition.go @@ -55,6 +55,8 @@ type stepDef struct { SignalOnStop *string Env string Call *callFuncDef + Run string // Run is a sub workflow to run + Params string // Params is a string of parameters to pass to the sub workflow } type funcDef struct { diff --git a/internal/dag/step.go b/internal/dag/step.go index 77a5e3719..16fb26b7e 100644 --- a/internal/dag/step.go +++ b/internal/dag/step.go @@ -11,26 +11,32 @@ import ( // Step represents a step in a DAG. type Step struct { - Name string - Description string - Variables []string - OutputVariables *utils.SyncMap - Dir string - ExecutorConfig ExecutorConfig - CmdWithArgs string - Command string - Script string - Stdout string - Stderr string - Output string - Args []string - Depends []string - ContinueOn ContinueOn - RetryPolicy *RetryPolicy - RepeatPolicy RepeatPolicy - MailOnError bool - Preconditions []*Condition - SignalOnStop string + Name string `json:"Name"` + Description string `json:"Description,omitempty"` + Variables []string `json:"Variables,omitempty"` + OutputVariables *utils.SyncMap `json:"OutputVariables,omitempty"` + Dir string `json:"Dir,omitempty"` + ExecutorConfig ExecutorConfig `json:"ExecutorConfig,omitempty"` + CmdWithArgs string `json:"CmdWithArgs,omitempty"` + Command string `json:"Command,omitempty"` + Script string `json:"Script,omitempty"` + Stdout string `json:"Stdout,omitempty"` + Stderr string `json:"Stderr,omitempty"` + Output string `json:"Output,omitempty"` + Args []string `json:"Args,omitempty"` + Depends []string `json:"Depends,omitempty"` + ContinueOn ContinueOn `json:"ContinueOn,omitempty"` + RetryPolicy *RetryPolicy `json:"RetryPolicy,omitempty"` + RepeatPolicy RepeatPolicy `json:"RepeatPolicy,omitempty"` + MailOnError bool `json:"MailOnError,omitempty"` + Preconditions []*Condition `json:"Preconditions,omitempty"` + SignalOnStop string `json:"SignalOnStop,omitempty"` + SubWorkflow *SubWorkflow `json:"SubWorkflow,omitempty"` +} + +type SubWorkflow struct { + Name string + Params string } // ExecutorConfig represents the configuration for the executor of a step. @@ -39,6 +45,10 @@ type ExecutorConfig struct { Config map[string]interface{} } +const ( + ExecutorTypeSubWorkflow = "subworkflow" +) + // RetryPolicy represents the retry policy for a step. type RetryPolicy struct { Limit int diff --git a/internal/engine/engine_test.go b/internal/engine/engine_test.go index 7007e4e1b..3b0ccc9fc 100644 --- a/internal/engine/engine_test.go +++ b/internal/engine/engine_test.go @@ -39,7 +39,7 @@ func setupTest(t *testing.T) (string, engine.Engine, persistence.DataStoreFactor }) e := engine.NewFactory(ds, &config.Config{ - Command: path.Join(utils.MustGetwd(), "../../bin/dagu"), + Executable: path.Join(utils.MustGetwd(), "../../bin/dagu"), }).Create() return tmpDir, e, ds @@ -58,7 +58,7 @@ func setupTestTmpDir(t *testing.T) (string, engine.Engine, persistence.DataStore }) e := engine.NewFactory(ds, &config.Config{ - Command: path.Join(utils.MustGetwd(), "../../bin/dagu"), + Executable: path.Join(utils.MustGetwd(), "../../bin/dagu"), }).Create() return tmpDir, e, ds diff --git a/internal/engine/factory.go b/internal/engine/factory.go index 4047a28fa..6519201a0 100644 --- a/internal/engine/factory.go +++ b/internal/engine/factory.go @@ -18,11 +18,8 @@ type factoryImpl struct { func NewFactory(ds persistence.DataStoreFactory, cfg *config.Config) Factory { impl := &factoryImpl{ dataStoreFactory: ds, + executable: cfg.Executable, } - if cfg == nil { - cfg = config.Get() - } - impl.executable = cfg.Command return impl } diff --git a/internal/executor/mail.go b/internal/executor/mail.go index a41c853f5..d2cc89e61 100644 --- a/internal/executor/mail.go +++ b/internal/executor/mail.go @@ -70,16 +70,16 @@ func CreateMailExecutor(ctx context.Context, step dag.Step) (Executor, error) { exec := &MailExecutor{cfg: &cfg} - d, err := dag.GetDAGFromContext(ctx) + dagCtx, err := dag.GetContext(ctx) if err != nil { return nil, err } m := &mailer.Mailer{ Config: &mailer.Config{ - Host: d.Smtp.Host, - Port: d.Smtp.Port, - Username: d.Smtp.Username, - Password: d.Smtp.Password, + Host: dagCtx.DAG.Smtp.Host, + Port: dagCtx.DAG.Smtp.Port, + Username: dagCtx.DAG.Smtp.Username, + Password: dagCtx.DAG.Smtp.Password, }} exec.mailer = m diff --git a/internal/executor/subworkflow.go b/internal/executor/subworkflow.go new file mode 100644 index 000000000..1833c2e65 --- /dev/null +++ b/internal/executor/subworkflow.go @@ -0,0 +1,95 @@ +package executor + +import ( + "context" + "fmt" + "github.com/dagu-dev/dagu/internal/utils" + "io" + "os" + "os/exec" + "sync" + "syscall" + + "github.com/dagu-dev/dagu/internal/dag" +) + +type SubWorkflowExecutor struct { + cmd *exec.Cmd + lock sync.Mutex +} + +func (e *SubWorkflowExecutor) Run() error { + e.lock.Lock() + err := e.cmd.Start() + e.lock.Unlock() + if err != nil { + return err + } + return e.cmd.Wait() +} + +func (e *SubWorkflowExecutor) SetStdout(out io.Writer) { + e.cmd.Stdout = out +} + +func (e *SubWorkflowExecutor) SetStderr(out io.Writer) { + e.cmd.Stderr = out +} + +func (e *SubWorkflowExecutor) Kill(sig os.Signal) error { + e.lock.Lock() + defer e.lock.Unlock() + if e.cmd == nil || e.cmd.Process == nil { + return nil + } + return syscall.Kill(-e.cmd.Process.Pid, sig.(syscall.Signal)) +} + +func CreateSubWorkflowExecutor(ctx context.Context, step dag.Step) (Executor, error) { + executable, err := os.Executable() + if err != nil { + return nil, fmt.Errorf("failed to get executable path: %w", err) + } + + dagCtx, err := dag.GetContext(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get dag context: %w", err) + } + + d, err := dagCtx.Finder.FindByName(step.SubWorkflow.Name) + if err != nil { + return nil, fmt.Errorf("failed to find subworkflow %q: %w", step.SubWorkflow.Name, err) + } + + params := os.ExpandEnv(step.SubWorkflow.Params) + + args := []string{ + "start", + fmt.Sprintf("--params=%q", params), + d.Location, + } + + cmd := exec.CommandContext(ctx, executable, args...) + if len(step.Dir) > 0 && !utils.FileExists(step.Dir) { + return nil, fmt.Errorf("directory %q does not exist", step.Dir) + } + cmd.Dir = step.Dir + cmd.Env = append(cmd.Env, os.Environ()...) + cmd.Env = append(cmd.Env, step.Variables...) + step.OutputVariables.Range(func(key, value interface{}) bool { + cmd.Env = append(cmd.Env, value.(string)) + return true + }) + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + Pgid: 0, + } + + return &SubWorkflowExecutor{ + cmd: cmd, + }, nil +} + +func init() { + Register(dag.ExecutorTypeSubWorkflow, CreateSubWorkflowExecutor) +} diff --git a/internal/persistence/interface.go b/internal/persistence/interface.go index 689865976..b3fba783b 100644 --- a/internal/persistence/interface.go +++ b/internal/persistence/interface.go @@ -47,6 +47,7 @@ type ( Rename(oldName, newName string) error GetSpec(name string) (string, error) UpdateSpec(name string, spec []byte) error + FindByName(name string) (*dag.DAG, error) } FlagStore interface { diff --git a/internal/persistence/local/dag_store.go b/internal/persistence/local/dag_store.go index 3feba3e36..93862f7e3 100644 --- a/internal/persistence/local/dag_store.go +++ b/internal/persistence/local/dag_store.go @@ -251,3 +251,60 @@ func (d *dagStoreImpl) Rename(oldDAGPath, newDAGPath string) error { } return os.Rename(oldLoc, newLoc) } + +func (d *dagStoreImpl) FindByName(name string) (*dag.DAG, error) { + file, err := d.resolve(name) + if err != nil { + return nil, err + } + cl := dag.Loader{} + return cl.Load(file, "") +} + +func (d *dagStoreImpl) resolve(name string) (string, error) { + // check if the name is a file path + if strings.Contains(name, string(filepath.Separator)) { + if !utils.FileExists(name) { + return "", fmt.Errorf("workflow %s not found", name) + } + return name, nil + } + + // check if the name is a file path + if strings.Contains(name, string(filepath.Separator)) { + foundPath, err := find(name) + if err != nil { + return "", fmt.Errorf("workflow %s not found", name) + } + return foundPath, nil + } + + // find the DAG definition + for _, dir := range []string{".", d.dir} { + subWorkflowPath := filepath.Join(dir, name) + foundPath, err := find(subWorkflowPath) + if err == nil { + return foundPath, nil + } + } + + // DAG not found + return "", fmt.Errorf("workflow %s not found", name) +} + +// find finds the sub workflow file with the given name. +func find(name string) (string, error) { + ext := path.Ext(name) + if ext == "" { + // try all supported extensions + for _, ext := range dag.EXTENSIONS { + if utils.FileExists(name + ext) { + return filepath.Abs(name + ext) + } + } + } else if utils.FileExists(name) { + // the name has an extension + return filepath.Abs(name) + } + return "", fmt.Errorf("sub workflow %s not found", name) +} diff --git a/internal/scheduler/node.go b/internal/scheduler/node.go index f88efcd4e..eebcf8b04 100644 --- a/internal/scheduler/node.go +++ b/internal/scheduler/node.go @@ -130,6 +130,7 @@ func (n *Node) setupExec(ctx context.Context) (executor.Executor, error) { defer n.mu.Unlock() ctx, fn := context.WithCancel(ctx) + n.cancelFunc = fn if n.step.CmdWithArgs != "" { diff --git a/service/frontend/handlers/response/step.go b/service/frontend/handlers/response/step.go index f9eec2ea4..41c1079f0 100644 --- a/service/frontend/handlers/response/step.go +++ b/service/frontend/handlers/response/step.go @@ -7,7 +7,7 @@ import ( ) func ToStepObject(step dag.Step) *models.StepObject { - return &models.StepObject{ + so := &models.StepObject{ Args: step.Args, CmdWithArgs: lo.ToPtr(step.CmdWithArgs), Command: lo.ToPtr(step.Command), @@ -24,6 +24,11 @@ func ToStepObject(step dag.Step) *models.StepObject { Script: lo.ToPtr(step.Script), Variables: step.Variables, } + if step.SubWorkflow != nil { + so.Run = step.SubWorkflow.Name + so.Params = step.SubWorkflow.Params + } + return so } func ToRepeatPolicy(repeatPolicy dag.RepeatPolicy) *models.RepeatPolicy { diff --git a/service/frontend/models/step_object.go b/service/frontend/models/step_object.go index 763bd7828..ab7d0df47 100644 --- a/service/frontend/models/step_object.go +++ b/service/frontend/models/step_object.go @@ -56,6 +56,9 @@ type StepObject struct { // Required: true Output *string `json:"Output"` + // params + Params string `json:"Params,omitempty"` + // preconditions // Required: true Preconditions []*Condition `json:"Preconditions"` @@ -64,6 +67,9 @@ type StepObject struct { // Required: true RepeatPolicy *RepeatPolicy `json:"RepeatPolicy"` + // run + Run string `json:"Run,omitempty"` + // script // Required: true Script *string `json:"Script"` diff --git a/service/frontend/restapi/embedded_spec.go b/service/frontend/restapi/embedded_spec.go index ed3ae4ff1..3ee0d8a69 100644 --- a/service/frontend/restapi/embedded_spec.go +++ b/service/frontend/restapi/embedded_spec.go @@ -977,6 +977,9 @@ func init() { "Output": { "type": "string" }, + "Params": { + "type": "string" + }, "Preconditions": { "type": "array", "items": { @@ -986,6 +989,9 @@ func init() { "RepeatPolicy": { "$ref": "#/definitions/repeatPolicy" }, + "Run": { + "type": "string" + }, "Script": { "type": "string" }, @@ -1965,6 +1971,9 @@ func init() { "Output": { "type": "string" }, + "Params": { + "type": "string" + }, "Preconditions": { "type": "array", "items": { @@ -1974,6 +1983,9 @@ func init() { "RepeatPolicy": { "$ref": "#/definitions/repeatPolicy" }, + "Run": { + "type": "string" + }, "Script": { "type": "string" }, diff --git a/service/scheduler/entry_reader/entry_reader_test.go b/service/scheduler/entry_reader/entry_reader_test.go index db008c9bf..5a8134757 100644 --- a/service/scheduler/entry_reader/entry_reader_test.go +++ b/service/scheduler/entry_reader/entry_reader_test.go @@ -43,9 +43,7 @@ func setupTest(t *testing.T) (string, engine.Factory) { SuspendFlagsDir: tmpDir, }) - ef := engine.NewFactory(ds, &config.Config{ - Command: path.Join(utils.MustGetwd(), "../../bin/dagu"), - }) + ef := engine.NewFactory(ds, &config.Config{}) return tmpDir, ef } diff --git a/service/scheduler/factory.go b/service/scheduler/factory.go index 623ebfc37..ab26edf73 100644 --- a/service/scheduler/factory.go +++ b/service/scheduler/factory.go @@ -10,7 +10,7 @@ import ( ) type jobFactory struct { - Command string + Executable string WorkDir string EngineFactory engine.Factory } @@ -18,7 +18,7 @@ type jobFactory struct { func (jf jobFactory) NewJob(d *dag.DAG, next time.Time) scheduler.Job { return &job.Job{ DAG: d, - Command: jf.Command, + Executable: jf.Executable, WorkDir: jf.WorkDir, Next: next, EngineFactory: jf.EngineFactory, diff --git a/service/scheduler/fx.go b/service/scheduler/fx.go index 8c0dd959a..f042f458a 100644 --- a/service/scheduler/fx.go +++ b/service/scheduler/fx.go @@ -2,7 +2,6 @@ package scheduler import ( "context" - "github.com/dagu-dev/dagu/internal/config" "github.com/dagu-dev/dagu/internal/engine" dagulogger "github.com/dagu-dev/dagu/internal/logger" @@ -42,9 +41,9 @@ func EntryReaderProvider( func JobFactoryProvider(cfg *config.Config, engineFactory engine.Factory) entry_reader.JobFactory { return &jobFactory{ - Command: cfg.Command, WorkDir: cfg.WorkDir, EngineFactory: engineFactory, + Executable: cfg.Executable, } } diff --git a/service/scheduler/job/job.go b/service/scheduler/job/job.go index dc691cf24..1ea986431 100644 --- a/service/scheduler/job/job.go +++ b/service/scheduler/job/job.go @@ -13,7 +13,7 @@ import ( // TODO: write tests type Job struct { DAG *dag.DAG - Command string + Executable string WorkDir string Next time.Time EngineFactory engine.Factory diff --git a/swagger.yaml b/swagger.yaml index d75f7cf58..67d7ec14a 100644 --- a/swagger.yaml +++ b/swagger.yaml @@ -628,6 +628,10 @@ definitions: type: array items: type: string + Run: + type: string + Params: + type: string Depends: type: array items: diff --git a/ui/src/models/index.ts b/ui/src/models/index.ts index 5c46432a8..9757e5d8e 100644 --- a/ui/src/models/index.ts +++ b/ui/src/models/index.ts @@ -200,6 +200,8 @@ export type Step = { RepeatPolicy: RepeatPolicy; MailOnError: boolean; Preconditions: Condition[]; + Run: string; + Params: string; }; export type RetryPolicy = {