Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run dag-processor separate from scheduler in local dev #1788

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions airflow/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ const (

warningTriggererDisabledNoVersionDetectedMsg = "warning: could not find the version of Airflow or Runtime you're using, are you using an official Docker image for your project ? Disabling Airflow triggerer."

WebserverDockerContainerName = "webserver"
SchedulerDockerContainerName = "scheduler"
TriggererDockerContainerName = "triggerer"
PostgresDockerContainerName = "postgres"
WebserverDockerContainerName = "webserver"
SchedulerDockerContainerName = "scheduler"
TriggererDockerContainerName = "triggerer"
DagProcessorDockerContainerName = "dag-processor"
PostgresDockerContainerName = "postgres"
)

var (
Expand Down
28 changes: 28 additions & 0 deletions airflow/include/composeyml.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ x-common-env-vars: &common-env-vars
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql://{{ .PostgresUser }}:{{ .PostgresPassword }}@{{ .PostgresHost }}:5432
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
AIRFLOW__CORE__FERNET_KEY: "d6Vefz3G9U_ynXB3cr7y_Ak35tAHkEGAVxuz_B-jzWw="
AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR: "True"
AIRFLOW__WEBSERVER__SECRET_KEY: "{{ .ProjectName }}"
AIRFLOW__WEBSERVER__RBAC: "True"
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
Expand Down Expand Up @@ -133,3 +134,30 @@ services:
{{end}}
{{ .AirflowEnvFile }}
{{end}}

dag-processor:
image: {{ .AirflowImage }}
command: >
bash -c "(airflow db upgrade || airflow upgradedb) && airflow dag-processor"
restart: unless-stopped
networks:
- airflow
user: {{ .AirflowUser }}
labels:
io.astronomer.docker: "true"
io.astronomer.docker.cli: "true"
io.astronomer.docker.component: "airflow-dag-processor"
depends_on:
- postgres
environment: *common-env-vars
volumes:
- {{ .AirflowHome }}/dags:/usr/local/airflow/dags:{{ .MountLabel }}
- {{ .AirflowHome }}/plugins:/usr/local/airflow/plugins:{{ .MountLabel }}
- {{ .AirflowHome }}/include:/usr/local/airflow/include:{{ .MountLabel }}
{{if .SettingsFileExist}}
- {{ .AirflowHome }}/{{ .SettingsFile }}:/usr/local/airflow/{{ .SettingsFile }}:{{ .MountLabel }}
{{end}}
{{if .DuplicateImageVolumes}}
- airflow_logs:/usr/local/airflow/logs
{{end}}
{{ .AirflowEnvFile }}
26 changes: 18 additions & 8 deletions cmd/airflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ var (
schedulerLogs bool
webserverLogs bool
triggererLogs bool
dagProcessorLogs bool
noCache bool
schedulerExec bool
postgresExec bool
webserverExec bool
triggererExec bool
dagProcessorExec bool
connections bool
variables bool
pools bool
Expand Down Expand Up @@ -236,7 +238,7 @@ func newAirflowStartCmd(astroCoreClient astrocore.CoreClient) *cobra.Command {
cmd := &cobra.Command{
Use: "start",
Short: "Start a local Airflow environment",
Long: "Start a local Airflow environment. This command will spin up 4 Docker containers on your machine, each for a different Airflow component: Webserver, scheduler, triggerer and metadata database.",
Long: "Start a local Airflow environment. This command will spin up 5 Docker containers on your machine, each for a different Airflow component: Webserver, scheduler, triggerer, dag processor, and metadata database.",
Args: cobra.MaximumNArgs(1),
PreRunE: EnsureRuntime,
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -274,7 +276,7 @@ func newAirflowRunCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "run",
Short: "Run Airflow CLI commands within your local Airflow environment",
Long: "Run Airflow CLI commands within your local Airflow environment. These commands run in the webserver container but can interact with your local scheduler, workers, and metadata database.",
Long: "Run Airflow CLI commands within your local Airflow environment. These commands run in the webserver container but can interact with your local scheduler, dag processor, workers, and metadata database.",
PreRunE: EnsureRuntime,
RunE: airflowRun,
Example: RunExample,
Expand All @@ -287,14 +289,15 @@ func newAirflowLogsCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "logs",
Short: "Display component logs for your local Airflow environment",
Long: "Display scheduler, worker, and webserver logs for your local Airflow environment",
Long: "Display scheduler, worker, webserver, or dag-processor logs for your local Airflow environment",
PreRunE: SetRuntimeIfExists,
RunE: airflowLogs,
}
cmd.Flags().BoolVarP(&followLogs, "follow", "f", false, "Follow log output")
cmd.Flags().BoolVarP(&schedulerLogs, "scheduler", "s", false, "Output scheduler logs")
cmd.Flags().BoolVarP(&webserverLogs, "webserver", "w", false, "Output webserver logs")
cmd.Flags().BoolVarP(&triggererLogs, "triggerer", "t", false, "Output triggerer logs")
cmd.Flags().BoolVarP(&dagProcessorLogs, "dag-processor", "d", false, "Output dag-processor logs")
return cmd
}

Expand Down Expand Up @@ -392,8 +395,8 @@ func newAirflowUpgradeCheckCmd() *cobra.Command {
func newAirflowBashCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "bash",
Short: "Exec into a running an Airflow container",
Long: "Use this command to Exec into either the Webserver, Sechduler, Postgres, or Triggerer Container to run bash commands",
Short: "Exec into a container",
Long: "Use this command to exec into either the Webserver, Scheduler, Postgres, Triggerer, or Dag-Processor Container to run bash commands",
Args: cobra.MaximumNArgs(1),
PreRunE: EnsureRuntime,
RunE: airflowBash,
Expand All @@ -402,6 +405,7 @@ func newAirflowBashCmd() *cobra.Command {
cmd.Flags().BoolVarP(&webserverExec, "webserver", "w", false, "Exec into the webserver container")
cmd.Flags().BoolVarP(&postgresExec, "postgres", "p", false, "Exec into the postgres container")
cmd.Flags().BoolVarP(&triggererExec, "triggerer", "t", false, "Exec into the triggerer container")
cmd.Flags().BoolVarP(&dagProcessorExec, "dag-processor", "d", false, "Exec into the dag-processor container")
return cmd
}

Expand Down Expand Up @@ -730,8 +734,8 @@ func airflowLogs(cmd *cobra.Command, args []string) error {
// default is to display all logs
containersNames := make([]string, 0)

if !schedulerLogs && !webserverLogs && !triggererLogs {
containersNames = append(containersNames, []string{airflow.WebserverDockerContainerName, airflow.SchedulerDockerContainerName, airflow.TriggererDockerContainerName}...)
if !schedulerLogs && !webserverLogs && !triggererLogs && !dagProcessorLogs {
containersNames = append(containersNames, []string{airflow.WebserverDockerContainerName, airflow.SchedulerDockerContainerName, airflow.TriggererDockerContainerName, airflow.DagProcessorDockerContainerName}...)
}
if webserverLogs {
containersNames = append(containersNames, []string{airflow.WebserverDockerContainerName}...)
Expand All @@ -742,6 +746,9 @@ func airflowLogs(cmd *cobra.Command, args []string) error {
if triggererLogs {
containersNames = append(containersNames, []string{airflow.TriggererDockerContainerName}...)
}
if dagProcessorLogs {
containersNames = append(containersNames, []string{airflow.DagProcessorDockerContainerName}...)
}

// Silence Usage as we have now validated command input
cmd.SilenceUsage = true
Expand Down Expand Up @@ -918,7 +925,10 @@ func airflowBash(cmd *cobra.Command, args []string) error {
if schedulerExec {
container = airflow.SchedulerDockerContainerName
}
// exec into secheduler by default
if dagProcessorExec {
container = airflow.DagProcessorDockerContainerName
}
// exec into scheduler by default
if container == "" {
container = airflow.SchedulerDockerContainerName
}
Expand Down