Updated Flink sample for HDInsight on AKS #130

Draft pull request: wants to merge 31 commits into main.
Changes from 1 commit
algattik committed Jul 20, 2023
commit 4aa6ecff149203d1c822471327abbc8aff1feffd
README.md (2 changes: 1 addition & 1 deletion)

@@ -211,7 +211,7 @@ Implement a stream processing architecture using:
 Implement a stream processing architecture using:
 
 - Event Hubs Kafka (Ingest / Immutable Log)
-- Flink on HDInsight or Azure Kubernetes Service (Stream Process)
+- Flink on HDInsight (Stream Process)
 - Event Hubs Kafka (Serve)
 
 ### [Event Hubs Kafka + Azure Functions + Cosmos DB](https://github.com/Azure-Samples/streaming-at-scale/tree/main/eventhubskafka-functions-cosmosdb)
components/azure-event-hubs/create-event-hub.sh (2 changes: 1 addition & 1 deletion)

@@ -29,7 +29,7 @@ echo ". name: $eventHubName"
 echo ". partitions: $EVENTHUB_PARTITIONS"
 
 az eventhubs eventhub create -n $eventHubName -g $RESOURCE_GROUP \
-    --message-retention 1 --partition-count $EVENTHUB_PARTITIONS --namespace-name $eventHubsNamespace \
+    --partition-count $EVENTHUB_PARTITIONS --namespace-name $eventHubsNamespace \
     --enable-capture "$EVENTHUB_CAPTURE" --capture-interval 300 --capture-size-limit 314572800 \
     --archive-name-format 'capture/{Namespace}/{EventHub}/{Year}_{Month}_{Day}_{Hour}_{Minute}_{Second}_{PartitionId}' \
     --blob-container streamingatscale \
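For reference, a minimal sketch of the environment this component script expects when it is sourced. The variable names come from the diff above; the concrete values and resource names are illustrative assumptions, not taken from the PR:

    # Illustrative setup before sourcing create-event-hub.sh (values are assumptions)
    export RESOURCE_GROUP=streamingrg      # assumed resource group name
    export EVENTHUB_PARTITIONS=1           # matches the 1,000 msg/s test type
    export EVENTHUB_CAPTURE=False          # forwarded to --enable-capture
    eventHubName=streamingdata             # assumed; normally set by the calling script
    eventHubsNamespace=streamingns         # assumed; normally set by the calling script
    source ../components/azure-event-hubs/create-event-hub.sh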
eventhubskafka-flink-eventhubskafka/README.md (22 changes: 3 additions & 19 deletions)

@@ -3,7 +3,7 @@ topic: sample
 languages:
 - azurecli
 - json
-- sql
+- java
 products:
 - azure
 - azure-container-instances
@@ -18,8 +18,6 @@ statusNotificationTargets:
 
 This sample uses Apache Flink to process streaming data from Event Hubs Kafka and uses another Event Hubs Kafka as a sink to store JSON data. This is done to analyze pure streaming performance of Flink; no aggregation is done and data is passed as fast as possible from the input to the output. Data is augmented by adding additional fields.
 
-The sample provides a choice among options for hosting Flink: Azure Kubernetes Service, or Azure HDInsight Hadoop (using YARN).
-
 To support very high throughput, two different Event Hubs namespaces are deployed by the template. Event Hubs capacity is limited to up to 20 units of 1 MB/s each (although this limit can be increased through a support ticket). If incoming throughput is under 10 MB/s, you could deploy two Event Hub instances under a single namespace instead.
 
 The provided scripts will create an end-to-end solution complete with load test client.
@@ -40,8 +38,6 @@ The following tools/languages are also needed:
   - Install: `sudo apt install jq`
 - [Maven](https://maven.apache.org/install.html)
   - Install: `sudo apt install maven`
-- [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/)
-- [helm](https://helm.sh/docs/using_helm/#installing-helm)
 
 ## Setup Solution
 
@@ -78,9 +74,9 @@ To make sure that name collisions will be unlikely, you should use a random stri
 
 The script will create the following resources:
 
-- **Azure Container Instances** to host Spark Load Test Clients: by default one client will be created, generating a load of 1000 events/second
+- **Azure Container Instances** to host Load Test Clients: by default one client will be created, generating a load of 1000 events/second
 - **Event Hubs** Namespace, Hub and Consumer Group: to ingest data incoming from test clients and to store data generated by Apache Flink
-- **HDInsight** or **Azure Kubernetes Service**: to host the Apache Flink job that processes event data
+- **HDInsight**: to host the Apache Flink job that processes event data
 - **Azure Monitor**: to monitor HDInsight, Azure Kubernetes Service and Flink
 
 ## Streamed Data
@@ -126,9 +122,6 @@ If you want to change some setting of the solution, like number of load test cli
 # settings for AKS (-p aks)
 export AKS_NODES=3
 export AKS_VM_SIZE=Standard_D2s_v3
-# settings for HDInsight YARN (-p hdinsight)
-export HDINSIGHT_HADOOP_WORKERS=3
-export HDINSIGHT_HADOOP_WORKER_SIZE=Standard_D3_V2
 
 The above settings have been chosen to sustain a 1,000 msg/s stream. The script also contains settings for 5,000 msg/s and 10,000 msg/s.
 
@@ -179,15 +172,6 @@ The Flink Job Manager UI shows information about the current running job. The IP
 
 ![Flink Job Manager Web UI](../_doc/_images/flink-job-manager.png)
 
-### Flink deployment on AKS
-
-Deployment on Azure Kubernetes Service is done in single-job, highly available mode. The deployment includes:
-* A Zookeeper cluster for maintaining quorum
-* A pod for the (per-job) Flink Job Manager and
-* A pod for each Flink Task Manager deployed as part of the job
-
-In HA mode, the Flink JobManager exposes a dynamically allocated port. Together with the JobManager, we run a custom sidecar container containing a small shell script. The script calls the JobManager REST API (running on fixed port 8081) to discover the JobManager RPC port, then calls the Kubernetes API to update the port exposed in the Kubernetes Service. RBAC is used to grant the sidecar container permissions to only this specific operation in the API.
-
 ### Flink deployment on HDInsight
 
 Deployment on HDInsight is done in job server, highly available mode. The deployment runs a YARN job for the Flink Job Manager, then submits a JAR job to the Job Manager. The Job Manager creates a YARN application per job.
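With the platform choice removed from this README, running the sample reduces to the remaining flags. A sketch of a typical invocation, using only options visible in the create-solution.sh diff below (the deployment name is an illustrative assumption):

    # Deploy the end-to-end solution; "myhdi01" is an illustrative deployment name
    ./create-solution.sh -d myhdi01 -t 1 -a simple-relay -l eastus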
eventhubskafka-flink-eventhubskafka/create-solution.sh (50 changes: 3 additions & 47 deletions)

@@ -7,11 +7,11 @@ export PREFIX=''
 export LOCATION="eastus"
 export TESTTYPE="1"
 export STEPS="CIPTM"
-export FLINK_PLATFORM='aks'
+export FLINK_PLATFORM='hdinsight-aks'
 export FLINK_JOBTYPE='simple-relay'
 
 usage() {
-echo "Usage: $0 -d <deployment-name> [-s <steps>] [-t <test-type>] [-l <location>] [-p <platform>]"
+echo "Usage: $0 -d <deployment-name> [-s <steps>] [-t <test-type>] [-l <location>]"
 echo "-s: specify which steps should be executed. Default=$STEPS"
 echo "    Possible values:"
 echo "      C=COMMON"
@@ -21,7 +21,6 @@ usage() {
 echo "      M=METRICS reporting"
 echo "      V=VERIFY deployment"
 echo "-t: test 1,5,10 thousands msgs/sec. Default=$TESTTYPE"
-echo "-p: platform: 'aks', 'hdinsight-aks' or 'hdinsight'. Default=$FLINK_PLATFORM"
 echo "-a: type of job: 'simple-relay' or 'complex-processing'. Default=$FLINK_JOBTYPE"
 echo "-l: where to create the resources. Default=$LOCATION"
 exit 1;
@@ -42,9 +41,6 @@ while getopts ":d:s:t:l:p:a:" arg; do
 l)
 LOCATION=${OPTARG}
 ;;
-p)
-FLINK_PLATFORM=${OPTARG}
-;;
 a)
 FLINK_JOBTYPE=${OPTARG}
 ;;
@@ -65,14 +61,7 @@ if [ "$TESTTYPE" == "10" ]; then
 export EVENTHUB_PARTITIONS=8
 export FLINK_PARALLELISM=8
 export SIMULATOR_INSTANCES=5
-# settings for AKS (-p aks)
-export AKS_NODES=4
-export AKS_VM_SIZE=Standard_D4s_v3
-# settings for HDInsight AKS (-p hdinsight-aks)
 export HDINSIGHT_AKS_WORKER_SIZE=Standard_D8ds_v5
-# settings for HDInsight YARN (-p hdinsight)
-export HDINSIGHT_HADOOP_WORKERS=3
-export HDINSIGHT_HADOOP_WORKER_SIZE=Standard_D3_V2
 fi
 
 # 5000 messages/sec
@@ -81,14 +70,7 @@ if [ "$TESTTYPE" == "5" ]; then
 export EVENTHUB_PARTITIONS=4
 export FLINK_PARALLELISM=4
 export SIMULATOR_INSTANCES=3
-# settings for AKS (-p aks)
-export AKS_NODES=5
-export AKS_VM_SIZE=Standard_D2s_v3
-# settings for HDInsight AKS (-p hdinsight-aks)
 export HDINSIGHT_AKS_WORKER_SIZE=Standard_D8ds_v5
-# settings for HDInsight YARN (-p hdinsight)
-export HDINSIGHT_HADOOP_WORKERS=3
-export HDINSIGHT_HADOOP_WORKER_SIZE=Standard_D3_V2
 fi
 
 # 1000 messages/sec
@@ -97,14 +79,7 @@ if [ "$TESTTYPE" == "1" ]; then
 export EVENTHUB_PARTITIONS=1
 export FLINK_PARALLELISM=1
 export SIMULATOR_INSTANCES=1
-# settings for AKS (-p aks)
-export AKS_NODES=3
-export AKS_VM_SIZE=Standard_D2s_v3
-# settings for HDInsight AKS (-p hdinsight-aks)
 export HDINSIGHT_AKS_WORKER_SIZE=Standard_D8ds_v5
-# settings for HDInsight YARN (-p hdinsight)
-export HDINSIGHT_HADOOP_WORKERS=3
-export HDINSIGHT_HADOOP_WORKER_SIZE=Standard_D3_V2
 fi
 
 # last checks and variables setup
@@ -138,18 +113,9 @@ echo "Configuration: "
 echo ". Resource Group  => $RESOURCE_GROUP"
 echo ". Region          => $LOCATION"
 echo ". EventHubs       => TU: $EVENTHUB_CAPACITY, Partitions: $EVENTHUB_PARTITIONS"
-if [ "$FLINK_PLATFORM" == "hdinsight" ]; then
-echo ". HDInsight YARN  => VM: $HDINSIGHT_HADOOP_WORKER_SIZE, Workers: $HDINSIGHT_HADOOP_WORKERS"
-elif [ "$FLINK_PLATFORM" == "hdinsight-aks" ]; then
-echo ". HDInsight on AKS => VM: $HDINSIGHT_AKS_WORKER_SIZE"
-else
-echo ". AKS             => VM: $AKS_VM_SIZE, Workers: $AKS_NODES"
-fi
+echo ". HDInsight on AKS => VM: $HDINSIGHT_AKS_WORKER_SIZE"
 echo ". Flink           => Parallelism: $FLINK_PARALLELISM"
 echo ". Simulators      => $SIMULATOR_INSTANCES"
-if [[ -n ${AD_SP_APP_ID:-} && -n ${AD_SP_SECRET:-} ]]; then
-echo ". Service Principal => $AD_SP_APP_ID"
-fi
 echo
 
 echo "Deployment started..."
@@ -158,13 +124,11 @@ echo
 echo "***** [C] Setting up COMMON resources"
 
 export AZURE_STORAGE_ACCOUNT=$PREFIX"storage"
-export VNET_NAME=$PREFIX"-vnet"
 
 RUN=`echo $STEPS | grep C -o || true`
 if [ ! -z "$RUN" ]; then
 source ../components/azure-common/create-resource-group.sh
 source ../components/azure-storage/create-storage-account.sh
-source ../components/azure-common/create-virtual-network.sh
 fi
 echo
 
@@ -187,16 +151,8 @@ echo
 
 echo "***** [P] Setting up PROCESSING"
 
-export APPINSIGHTS_NAME=$PREFIX"appmon"
 export HDINSIGHT_AKS_NAME=$PREFIX"hdi"
 export HDINSIGHT_AKS_RESOURCE_PREFIX=$PREFIX
-# Creating multiple HDInsight clusters in the same Virtual Network requires each cluster to have unique first six characters.
-export HDINSIGHT_YARN_NAME="yarn"$PREFIX"hdi"
-export HDINSIGHT_PASSWORD="Strong_Passw0rd!"
-export AKS_CLUSTER=$PREFIX"aks"
-export SERVICE_PRINCIPAL_KV_NAME=$AKS_CLUSTER
-export SERVICE_PRINCIPAL_KEYVAULT=$PREFIX"spkv"
-export ACR_NAME=$PREFIX"acr"
 
 source ../components/azure-monitor/generate-workspace-name.sh
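One detail worth flagging in review: the getopts option string in the hunk context above still contains p: even though the p) case was removed, so -p is still parsed but no longer handled. A minimal consistent loop might look like this sketch; the d), s) and t) case bodies and the fallback are assumptions inferred from the usage text, while only l) and a) are visible in the diff:

    # Sketch: drop the orphaned p: from the option string as well
    while getopts ":d:s:t:l:a:" arg; do
      case "$arg" in
        d) PREFIX=${OPTARG} ;;          # assumed: -d sets the deployment prefix
        s) STEPS=${OPTARG} ;;           # assumed from the usage text
        t) TESTTYPE=${OPTARG} ;;        # assumed from the usage text
        l) LOCATION=${OPTARG} ;;        # visible in the diff
        a) FLINK_JOBTYPE=${OPTARG} ;;   # visible in the diff
        *) usage ;;                     # assumed fallback to the usage message
      esac
    done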