Skip to content

Commit

Permalink
Implemented Basic EKS Integration (apache#16571)
Browse files Browse the repository at this point in the history
* Implemented Basic EKS Integration

* Remove explicit region defaulting

cr https://code.amazon.com/reviews/CR-52973030

* Refactor the token generation and remove the AWS CLI dependency.

* move kubeconfig generator into `hooks/eks.py`

* EKS List hooks return all results

* Use a tempfile to store kubeconfig data

* Move kube config into Hook class as a contextmanager

* Removed random traits in tests

* Rework the eks.rst doc file

* Implemented Jinja templates for operators

- Added jinja template fields
- Refactored fields to snake_case since they are now exposed
- Removed a couple of straggling pylint instructions; pylint is no longer used

* conn_id refactor

- Refactored using IDE magic:
 - any field named `self.conn_id` is now `self.aws_conn_id`
 - any param named `conn_id` is now `aws_conn_id`
 - any constant named `CONN_ID` is now `DEFAULT_CONN_ID`
- Sensors were missing template fields, added those.

* Remove try/log blocks from hooks

* Implemented EKS system tests

* Remove List and Describe Operators and supporting code.

* Fixed `nextToken` final result bug

* Remove some nesting and some unnecessary logging before raising an exception

* Use the force

* Improved docs and samples

* Additional jinja templating

* Corrected some misused Optionals.

* Doc formatting fix

* Corrected logo

* Corrected logo - background and size
  • Loading branch information
ferruzzi authored Aug 19, 2021
1 parent 3b41bb4 commit bee48f3
Show file tree
Hide file tree
Showing 19 changed files with 2,989 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ Here is the list of packages and their extras:
Package Extras
========================== ===========================
airbyte http
amazon apache.hive,exasol,ftp,google,imap,mongo,mysql,postgres,salesforce,ssh
amazon apache.hive,cncf.kubernetes,exasol,ftp,google,imap,mongo,mysql,postgres,salesforce,ssh
apache.beam google
apache.druid apache.hive
apache.hive amazon,microsoft.mssql,mysql,presto,samba,vertica
Expand Down
131 changes: 131 additions & 0 deletions airflow/providers/amazon/aws/example_dags/example_eks_templated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.hooks.eks import ClusterStates, NodegroupStates
from airflow.providers.amazon.aws.operators.eks import (
EKSCreateClusterOperator,
EKSCreateNodegroupOperator,
EKSDeleteClusterOperator,
EKSDeleteNodegroupOperator,
EKSPodOperator,
)
from airflow.providers.amazon.aws.sensors.eks import EKSClusterStateSensor, EKSNodegroupStateSensor
from airflow.utils.dates import days_ago

# Example Jinja Template format, substitute your values:
"""
{
    "cluster_name": "templated-cluster",
    "cluster_role_arn": "arn:aws:iam::123456789012:role/role_name",
    "nodegroup_subnets": ["subnet-12345ab", "subnet-67890cd"],
    "resources_vpc_config": {
        "subnetIds": ["subnet-12345ab", "subnet-67890cd"],
        "endpointPublicAccess": true,
        "endpointPrivateAccess": false
    },
    "nodegroup_name": "templated-nodegroup",
    "nodegroup_role_arn": "arn:aws:iam::123456789012:role/role_name"
}
"""

with DAG(
    # Fixed: was 'to-publish-manuals-templated', an internal placeholder that did not
    # follow the 'example_*' naming convention used by the sibling EKS example DAGs.
    dag_id='example_eks_templated',
    schedule_interval=None,
    start_date=days_ago(2),
    max_active_runs=1,
    tags=['example', 'templated'],
    # render_template_as_native_obj=True is what converts the rendered Jinja output
    # into native Python objects (dicts/lists/bools) instead of strings.
    render_template_as_native_obj=True,
) as dag:

    # Create an Amazon EKS cluster control plane without attaching a compute service.
    create_cluster = EKSCreateClusterOperator(
        task_id='create_eks_cluster',
        compute=None,
        cluster_name="{{ dag_run.conf['cluster_name'] }}",
        cluster_role_arn="{{ dag_run.conf['cluster_role_arn'] }}",
        resources_vpc_config="{{ dag_run.conf['resources_vpc_config'] }}",
    )

    # Block until the cluster control plane reports ACTIVE.
    await_create_cluster = EKSClusterStateSensor(
        task_id='wait_for_create_cluster',
        cluster_name="{{ dag_run.conf['cluster_name'] }}",
        target_state=ClusterStates.ACTIVE,
    )

    # Attach a managed nodegroup (compute) to the now-active cluster.
    create_nodegroup = EKSCreateNodegroupOperator(
        task_id='create_eks_nodegroup',
        cluster_name="{{ dag_run.conf['cluster_name'] }}",
        nodegroup_name="{{ dag_run.conf['nodegroup_name'] }}",
        nodegroup_subnets="{{ dag_run.conf['nodegroup_subnets'] }}",
        nodegroup_role_arn="{{ dag_run.conf['nodegroup_role_arn'] }}",
    )

    # Block until the nodegroup reports ACTIVE.
    await_create_nodegroup = EKSNodegroupStateSensor(
        task_id='wait_for_create_nodegroup',
        cluster_name="{{ dag_run.conf['cluster_name'] }}",
        nodegroup_name="{{ dag_run.conf['nodegroup_name'] }}",
        target_state=NodegroupStates.ACTIVE,
    )

    # Run a trivial pod on the cluster to demonstrate the EKSPodOperator.
    start_pod = EKSPodOperator(
        task_id="run_pod",
        cluster_name="{{ dag_run.conf['cluster_name'] }}",
        image="amazon/aws-cli:latest",
        cmds=["sh", "-c", "ls"],
        labels={"demo": "hello_world"},
        get_logs=True,
        # Delete the pod when it reaches its final state, or the execution is interrupted.
        is_delete_operator_pod=True,
    )

    # Tear down in reverse order: nodegroup first, since a cluster with
    # attached compute can not be deleted.
    delete_nodegroup = EKSDeleteNodegroupOperator(
        task_id='delete_eks_nodegroup',
        cluster_name="{{ dag_run.conf['cluster_name'] }}",
        nodegroup_name="{{ dag_run.conf['nodegroup_name'] }}",
    )

    # Block until the nodegroup is fully gone.
    await_delete_nodegroup = EKSNodegroupStateSensor(
        task_id='wait_for_delete_nodegroup',
        cluster_name="{{ dag_run.conf['cluster_name'] }}",
        nodegroup_name="{{ dag_run.conf['nodegroup_name'] }}",
        target_state=NodegroupStates.NONEXISTENT,
    )

    delete_cluster = EKSDeleteClusterOperator(
        task_id='delete_eks_cluster',
        cluster_name="{{ dag_run.conf['cluster_name'] }}",
    )

    # Block until the cluster is fully gone.
    await_delete_cluster = EKSClusterStateSensor(
        task_id='wait_for_delete_cluster',
        cluster_name="{{ dag_run.conf['cluster_name'] }}",
        target_state=ClusterStates.NONEXISTENT,
    )

    (
        create_cluster
        >> await_create_cluster
        >> create_nodegroup
        >> await_create_nodegroup
        >> start_pod
        >> delete_nodegroup
        >> await_delete_nodegroup
        >> delete_cluster
        >> await_delete_cluster
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from os import environ

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.hooks.eks import ClusterStates, NodegroupStates
from airflow.providers.amazon.aws.operators.eks import (
EKSCreateClusterOperator,
EKSDeleteClusterOperator,
EKSPodOperator,
)
from airflow.providers.amazon.aws.sensors.eks import EKSClusterStateSensor, EKSNodegroupStateSensor
from airflow.utils.dates import days_ago

# Cluster/nodegroup naming for this example; the nodegroup name is derived
# from the cluster name so the two are easy to correlate in the AWS console.
CLUSTER_NAME = 'eks-demo'
NODEGROUP_SUFFIX = '-nodegroup'
NODEGROUP_NAME = CLUSTER_NAME + NODEGROUP_SUFFIX
# IAM role ARN and subnet IDs can be supplied via environment variables;
# the defaults below are placeholders and must be replaced for a real run.
ROLE_ARN = environ.get('EKS_DEMO_ROLE_ARN', 'arn:aws:iam::123456789012:role/role_name')
# EKS_DEMO_SUBNETS is expected to be a single space-separated string of subnet IDs.
SUBNETS = environ.get('EKS_DEMO_SUBNETS', 'subnet-12345ab subnet-67890cd').split(' ')
# VPC configuration forwarded to the EKS CreateCluster API (boto3 camelCase keys).
VPC_CONFIG = {
    'subnetIds': SUBNETS,
    'endpointPublicAccess': True,
    'endpointPrivateAccess': False,
}


with DAG(
    dag_id='example_eks_using_defaults_dag',
    schedule_interval=None,
    start_date=days_ago(2),
    max_active_runs=1,
    tags=['example'],
) as dag:

    # [START howto_operator_eks_create_cluster_with_nodegroup]
    # Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step.
    create_cluster_and_nodegroup = EKSCreateClusterOperator(
        task_id='create_eks_cluster_and_nodegroup',
        cluster_name=CLUSTER_NAME,
        nodegroup_name=NODEGROUP_NAME,
        cluster_role_arn=ROLE_ARN,
        nodegroup_role_arn=ROLE_ARN,
        # Opting to use the same ARN for the cluster and the nodegroup here,
        # but a different ARN could be configured and passed if desired.
        resources_vpc_config=VPC_CONFIG,
        # Compute defaults to 'nodegroup' but is called out here for the purposes of the example.
        compute='nodegroup',
    )
    # [END howto_operator_eks_create_cluster_with_nodegroup]

    # Wait for the nodegroup to become ACTIVE; this implies the cluster is up as well.
    await_create_nodegroup = EKSNodegroupStateSensor(
        task_id='wait_for_create_nodegroup',
        cluster_name=CLUSTER_NAME,
        nodegroup_name=NODEGROUP_NAME,
        target_state=NodegroupStates.ACTIVE,
    )

    # Run a trivial pod on the cluster to demonstrate the EKSPodOperator.
    start_pod = EKSPodOperator(
        task_id="run_pod",
        cluster_name=CLUSTER_NAME,
        image="amazon/aws-cli:latest",
        cmds=["sh", "-c", "ls"],
        labels={"demo": "hello_world"},
        get_logs=True,
        # Delete the pod when it reaches its final state, or the execution is interrupted.
        is_delete_operator_pod=True,
    )

    # [START howto_operator_eks_force_delete_cluster]
    # An Amazon EKS cluster can not be deleted with attached resources.
    # Setting the `force` to `True` will delete any attached resources before deleting the cluster.
    delete_all = EKSDeleteClusterOperator(
        task_id='delete_nodegroup_and_cluster', cluster_name=CLUSTER_NAME, force_delete_compute=True
    )
    # [END howto_operator_eks_force_delete_cluster]

    # Wait until the cluster (and its forcibly-deleted compute) is fully gone.
    await_delete_cluster = EKSClusterStateSensor(
        task_id='wait_for_delete_cluster',
        cluster_name=CLUSTER_NAME,
        target_state=ClusterStates.NONEXISTENT,
    )

    create_cluster_and_nodegroup >> await_create_nodegroup >> start_pod >> delete_all >> await_delete_cluster
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from os import environ

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.hooks.eks import ClusterStates, NodegroupStates
from airflow.providers.amazon.aws.operators.eks import (
EKSCreateClusterOperator,
EKSCreateNodegroupOperator,
EKSDeleteClusterOperator,
EKSDeleteNodegroupOperator,
EKSPodOperator,
)
from airflow.providers.amazon.aws.sensors.eks import EKSClusterStateSensor, EKSNodegroupStateSensor
from airflow.utils.dates import days_ago

# Names used throughout this example; the nodegroup name is derived from the
# cluster name so the resources are easy to correlate.
CLUSTER_NAME = 'eks-demo'
NODEGROUP_SUFFIX = '-nodegroup'
NODEGROUP_NAME = f"{CLUSTER_NAME}{NODEGROUP_SUFFIX}"
# Role ARN and subnets may be overridden through environment variables; the
# fallbacks are placeholders and must be replaced before a real run.
ROLE_ARN = environ.get('EKS_DEMO_ROLE_ARN', 'arn:aws:iam::123456789012:role/role_name')
SUBNETS = environ.get('EKS_DEMO_SUBNETS', 'subnet-12345ab subnet-67890cd').split(' ')
# VPC settings handed to the EKS CreateCluster call (boto3 camelCase keys).
VPC_CONFIG = dict(
    subnetIds=SUBNETS,
    endpointPublicAccess=True,
    endpointPrivateAccess=False,
)


with DAG(
    dag_id='example_eks_with_nodegroups_dag',
    schedule_interval=None,
    start_date=days_ago(2),
    max_active_runs=1,
    tags=['example'],
) as dag:

    # [START howto_operator_eks_create_cluster]
    # Stand up only the EKS control plane; compute is attached in a later step.
    create_cluster = EKSCreateClusterOperator(
        task_id='create_eks_cluster',
        cluster_name=CLUSTER_NAME,
        cluster_role_arn=ROLE_ARN,
        resources_vpc_config=VPC_CONFIG,
        compute=None,
    )
    # [END howto_operator_eks_create_cluster]

    # Hold until the control plane reports ACTIVE.
    await_create_cluster = EKSClusterStateSensor(
        task_id='wait_for_create_cluster',
        cluster_name=CLUSTER_NAME,
        target_state=ClusterStates.ACTIVE,
    )

    # [START howto_operator_eks_create_nodegroup]
    create_nodegroup = EKSCreateNodegroupOperator(
        task_id='create_eks_nodegroup',
        cluster_name=CLUSTER_NAME,
        nodegroup_name=NODEGROUP_NAME,
        nodegroup_subnets=SUBNETS,
        nodegroup_role_arn=ROLE_ARN,
    )
    # [END howto_operator_eks_create_nodegroup]

    # Hold until the nodegroup reports ACTIVE.
    await_create_nodegroup = EKSNodegroupStateSensor(
        task_id='wait_for_create_nodegroup',
        cluster_name=CLUSTER_NAME,
        nodegroup_name=NODEGROUP_NAME,
        target_state=NodegroupStates.ACTIVE,
    )

    # [START howto_operator_eks_pod_operator]
    start_pod = EKSPodOperator(
        task_id="run_pod",
        cluster_name=CLUSTER_NAME,
        image="amazon/aws-cli:latest",
        cmds=["sh", "-c", "ls"],
        labels={"demo": "hello_world"},
        get_logs=True,
        # Delete the pod when it reaches its final state, or the execution is interrupted.
        is_delete_operator_pod=True,
    )
    # [END howto_operator_eks_pod_operator]

    # [START howto_operator_eks_delete_nodegroup]
    delete_nodegroup = EKSDeleteNodegroupOperator(
        task_id='delete_eks_nodegroup', cluster_name=CLUSTER_NAME, nodegroup_name=NODEGROUP_NAME
    )
    # [END howto_operator_eks_delete_nodegroup]

    # Hold until the nodegroup is fully removed.
    await_delete_nodegroup = EKSNodegroupStateSensor(
        task_id='wait_for_delete_nodegroup',
        cluster_name=CLUSTER_NAME,
        nodegroup_name=NODEGROUP_NAME,
        target_state=NodegroupStates.NONEXISTENT,
    )

    # [START howto_operator_eks_delete_cluster]
    delete_cluster = EKSDeleteClusterOperator(task_id='delete_eks_cluster', cluster_name=CLUSTER_NAME)
    # [END howto_operator_eks_delete_cluster]

    # Hold until the cluster is fully removed.
    await_delete_cluster = EKSClusterStateSensor(
        task_id='wait_for_delete_cluster',
        cluster_name=CLUSTER_NAME,
        target_state=ClusterStates.NONEXISTENT,
    )

    # Dependency graph, expressed stage by stage: provision, exercise, tear down.
    create_cluster >> await_create_cluster >> create_nodegroup
    create_nodegroup >> await_create_nodegroup >> start_pod
    start_pod >> delete_nodegroup >> await_delete_nodegroup
    await_delete_nodegroup >> delete_cluster >> await_delete_cluster
Loading

0 comments on commit bee48f3

Please sign in to comment.