Skip to content

Commit

Permalink
Add run and params field (#573)
Browse files Browse the repository at this point in the history
* rename config variable command -> executable

* Add sub workflow feature

* Update API for adding 'run' and 'params'
  • Loading branch information
yohamta authored May 25, 2024
1 parent 3c6772c commit be528cd
Show file tree
Hide file tree
Showing 30 changed files with 321 additions and 70 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ dagu version
- ️[Quick Start Guide](https://dagu.readthedocs.io/en/latest/quickstart.html)
- [Command Line Interface](https://dagu.readthedocs.io/en/latest/cli.html)
- [Web User Interface](https://dagu.readthedocs.io/en/latest/web_interface.html)
- YAML Format
- Writing DAG
- [Minimal DAG Definition](https://dagu.readthedocs.io/en/latest/yaml_format.html#minimal-dag-definition)
- [Running Arbitrary Code Snippets](https://dagu.readthedocs.io/en/latest/yaml_format.html#running-arbitrary-code-snippets)
- [Environment Variables](https://dagu.readthedocs.io/en/latest/yaml_format.html#defining-environment-variables)
Expand All @@ -266,6 +266,8 @@ dagu version
- [Redirecting Stdout and Stderr](https://dagu.readthedocs.io/en/latest/yaml_format.html#redirecting-stdout-and-stderr)
- [Lifecycle Hooks](https://dagu.readthedocs.io/en/latest/yaml_format.html#adding-lifecycle-hooks)
- [Repeating Task](https://dagu.readthedocs.io/en/latest/yaml_format.html#repeating-a-task-at-regular-intervals)
- [Minimal DAG Definition](https://dagu.readthedocs.io/en/latest/yaml_format.html#minimal-dag-definition)
- [Running Sub-DAG](https://dagu.readthedocs.io/en/latest/yaml_format.html#running-sub-dag)
- [All Available Fields for a DAG](https://dagu.readthedocs.io/en/latest/yaml_format.html#all-available-fields-for-dags)
- [All Available Fields for a Step](https://dagu.readthedocs.io/en/latest/yaml_format.html#all-available-fields-for-steps)
- Example DAGs
Expand Down
2 changes: 1 addition & 1 deletion cmd/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func setupTest(t *testing.T) (string, engine.Engine, persistence.DataStoreFactor
DataDir: path.Join(tmpDir, ".dagu", "data"),
})

e := engine.NewFactory(ds, nil).Create()
e := engine.NewFactory(ds, config.Get()).Create()

return tmpDir, e, ds
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestRestartCommand(t *testing.T) {
require.NoError(t, err)

df := client.NewDataStoreFactory(config.Get())
e = engine.NewFactory(df, nil).Create()
e = engine.NewFactory(df, config.Get()).Create()

sts := e.GetRecentHistory(d, 2)
require.Len(t, sts, 2)
Expand Down
2 changes: 1 addition & 1 deletion cmd/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func retryCmd() *cobra.Command {

// TODO: use engine.Engine instead of client.DataStoreFactory
df := client.NewDataStoreFactory(config.Get())
e := engine.NewFactory(df, nil).Create()
e := engine.NewFactory(df, config.Get()).Create()

hs := df.NewHistoryStore()

Expand Down
19 changes: 18 additions & 1 deletion docs/source/yaml_format.rst
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,19 @@ Formatting JSON
.. _command-execution-over-ssh:


Running Sub-DAG
~~~~~~~~~~~~~~~~

You can run a sub-DAG from a DAG file. The sub-DAG is defined in a separate file and can be called using the `run` field.

.. code-block:: yaml
steps:
- name: A task
run: <DAG file name> # e.g., sub_dag, sub_dag.yaml, /path/to/sub_dag.yaml
params: "FOO=BAR" # optional
All Available Fields
--------------------

Expand Down Expand Up @@ -545,6 +558,8 @@ Each step can have its own set of configurations, including:
- ``repeatPolicy``: The repeat policy for the step.
- ``preconditions``: The conditions that must be met before a step can run.
- ``depends``: The step depends on the other step.
- ``run``: The sub-DAG to run.
- ``params``: The parameters to pass to the sub-DAG.

Example:

Expand Down Expand Up @@ -576,4 +591,6 @@ Example:
- condition: "`echo $1`"
expected: "param1"
depends:
- some task name step
- some task name step
run: sub_dag
params: "FOO=BAR"
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ require (
github.com/jessevdk/go-flags v1.5.0
github.com/mattn/go-shellwords v1.0.12
github.com/mitchellh/mapstructure v1.5.0
github.com/pkg/errors v0.9.1
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cobra v1.6.1
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.2
go.uber.org/fx v1.20.0
go.uber.org/goleak v1.3.0
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/text v0.12.0
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -53,7 +55,6 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
Expand All @@ -63,7 +64,6 @@ require (
go.mongodb.org/mongo-driver v1.11.3 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/mod v0.12.0 // indirect
Expand Down
11 changes: 10 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg=
Expand All @@ -47,6 +48,7 @@ github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef/go.mod h1:W
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand Down Expand Up @@ -75,6 +77,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/go-chi/chi/v5 v5.0.8 h1:lD+NLqFcAi1ovnVZpsnObHGW4xb4J8lNmoYVfECH1Y0=
Expand Down Expand Up @@ -185,6 +188,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
Expand Down Expand Up @@ -241,6 +245,7 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxv
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down Expand Up @@ -296,6 +301,7 @@ github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
Expand Down Expand Up @@ -366,7 +372,6 @@ go.uber.org/dig v1.17.0 h1:5Chju+tUvcC+N7N6EV08BJz41UZuO3BmHcN4A287ZLI=
go.uber.org/dig v1.17.0/go.mod h1:rTxpf7l5I0eBTlE6/9RL+lDybC7WFwY2QH55ZSjy1mU=
go.uber.org/fx v1.20.0 h1:ZMC/pnRvhsthOZh9MZjMq5U8Or3mA9zBSPaLnzs3ihQ=
go.uber.org/fx v1.20.0/go.mod h1:qCUj0btiR3/JnanEr1TYEePfSw6o/4qYJscgvzQ5Ub0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
Expand Down Expand Up @@ -481,6 +486,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -529,6 +535,7 @@ golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -544,6 +551,7 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down Expand Up @@ -695,6 +703,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
Expand Down
9 changes: 5 additions & 4 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func New(config *Config, e engine.Engine, ds persistence.DataStoreFactory) *Agen

// Config contains the configuration for an Agent.
type Config struct {
DAG *dag.DAG
Dry bool
DAG *dag.DAG
DAGsDir string
Dry bool

// RetryTarget is the status to retry.
RetryTarget *model.Status
Expand Down Expand Up @@ -337,7 +338,7 @@ func (a *Agent) run(ctx context.Context) error {
utils.LogErr("write status", a.historyStore.Write(a.Status()))
}()

ctx = dag.NewContext(ctx, a.DAG)
ctx = dag.NewContext(ctx, a.DAG, a.dataStoreFactory.NewDAGStore())

lastErr := a.scheduler.Schedule(ctx, a.graph, done)
status := a.Status()
Expand Down Expand Up @@ -369,7 +370,7 @@ func (a *Agent) dryRun() error {

log.Printf("***** Starting DRY-RUN *****")

ctx := dag.NewContext(context.Background(), a.DAG)
ctx := dag.NewContext(context.Background(), a.DAG, a.dataStoreFactory.NewDAGStore())

lastErr := a.scheduler.Schedule(ctx, a.graph, done)
status := a.Status()
Expand Down
2 changes: 1 addition & 1 deletion internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func setupTest(t *testing.T) (string, engine.Engine, persistence.DataStoreFactor
DataDir: path.Join(tmpDir, ".dagu", "data"),
})

e := engine.NewFactory(ds, nil).Create()
e := engine.NewFactory(ds, config.Get()).Create()

return tmpDir, e, ds
}
Expand Down
16 changes: 8 additions & 8 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Config struct {
Host string
Port int
DAGs string
Command string
Executable string
WorkDir string
IsBasicAuth bool
BasicAuthUsername string
Expand Down Expand Up @@ -81,7 +81,7 @@ func LoadConfig() error {
viper.SetEnvPrefix("dagu")
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))

_ = viper.BindEnv("command", "DAGU_EXECUTABLE")
_ = viper.BindEnv("executable", "DAGU_EXECUTABLE")
_ = viper.BindEnv("dags", "DAGU_DAGS_DIR")
_ = viper.BindEnv("workDir", "DAGU_WORK_DIR")
_ = viper.BindEnv("isBasicAuth", "DAGU_IS_BASICAUTH")
Expand All @@ -100,14 +100,15 @@ func LoadConfig() error {
_ = viper.BindEnv("isAuthToken", "DAGU_IS_AUTHTOKEN")
_ = viper.BindEnv("authToken", "DAGU_AUTHTOKEN")
_ = viper.BindEnv("latestStatusToday", "DAGU_LATEST_STATUS")
command := "dagu"
if ex, err := os.Executable(); err == nil {
command = ex

executable, err := os.Executable()
if err != nil {
return fmt.Errorf("failed to get executable path: %w", err)
}

viper.SetDefault("host", "127.0.0.1")
viper.SetDefault("port", "8080")
viper.SetDefault("command", command)
viper.SetDefault("executable", executable)
viper.SetDefault("dags", path.Join(appHome, "dags"))
viper.SetDefault("workDir", "")
viper.SetDefault("isBasicAuth", "0")
Expand All @@ -130,8 +131,7 @@ func LoadConfig() error {
_ = viper.ReadInConfig()

cfg := &Config{}
err := viper.Unmarshal(cfg)
if err != nil {
if err := viper.Unmarshal(cfg); err != nil {
return fmt.Errorf("failed to unmarshal cfg file: %w", err)
}
loadLegacyEnvs(cfg)
Expand Down
22 changes: 21 additions & 1 deletion internal/dag/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,29 @@ func buildStep(variables []string, def *stepDef, funcs []*funcDef, options Build
}
step.MailOnError = def.MailOnError
step.Preconditions = loadPreCondition(def.Preconditions)

if err := parseSubWorkflow(step, def.Run, def.Params); err != nil {
return nil, err
}

return step, nil
}

func parseSubWorkflow(step *Step, name, params string) error {
if name == "" {
return nil
}
step.SubWorkflow = &SubWorkflow{
Name: name,
Params: params,
}
step.ExecutorConfig.Type = ExecutorTypeSubWorkflow
step.Command = fmt.Sprintf("run")
step.Args = []string{name, params}
step.CmdWithArgs = fmt.Sprintf("%s %s", name, params)
return nil
}

func parseExecutor(step *Step, executor any) error {
if executor == nil {
return nil
Expand Down Expand Up @@ -687,7 +707,7 @@ func assertStepDef(def *stepDef, funcs []*funcDef) error {
return errStepNameRequired
}
// TODO: Refactor the validation check for each executor.
if def.Executor == nil && def.Command == nil && def.Call == nil {
if def.Executor == nil && def.Command == nil && def.Call == nil && def.Run == "" {
return errStepCommandOrCallRequired
}

Expand Down
29 changes: 21 additions & 8 deletions internal/dag/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,35 @@ import (
)

// NewContext sets the current DAG to the context.
func NewContext(ctx context.Context, dag *DAG) context.Context {
return context.WithValue(ctx, DAGContextKey{}, dag)
func NewContext(ctx context.Context, dag *DAG, finder DAGFinder) context.Context {
return context.WithValue(ctx, ctxKey{}, Context{
DAG: dag,
Finder: finder,
})
}

// DAGContextKey is used as the key for storing the DAG in the context.
type DAGContextKey struct{}
// DAGFinder is an interface for finding a DAG by name.
type DAGFinder interface {
FindByName(name string) (*DAG, error)
}

type Context struct {
DAG *DAG
Finder DAGFinder
}

// ctxKey is used as the key for storing the DAG in the context.
type ctxKey struct{}

var (
errFailedAssertion = errors.New("failed to assert DAG from context")
)

// GetDAGFromContext returns the DAG from the current context.
func GetDAGFromContext(ctx context.Context) (*DAG, error) {
dag, ok := ctx.Value(DAGContextKey{}).(*DAG)
// GetContext returns the DAG from the current context.
func GetContext(ctx context.Context) (Context, error) {
dag, ok := ctx.Value(ctxKey{}).(Context)
if !ok {
return nil, errFailedAssertion
return Context{}, errFailedAssertion
}
return dag, nil
}
2 changes: 2 additions & 0 deletions internal/dag/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type stepDef struct {
SignalOnStop *string
Env string
Call *callFuncDef
Run string // Run is a sub workflow to run
Params string // Params is a string of parameters to pass to the sub workflow
}

type funcDef struct {
Expand Down
Loading

0 comments on commit be528cd

Please sign in to comment.