[Backport to v2.2.0 release branch] Soak tests on Kubernetes (#1821) #1869

Open · wants to merge 1 commit into base: release/v2.2.x
12 changes: 12 additions & 0 deletions tests/soak/Dockerfile
@@ -0,0 +1,12 @@
FROM debian:bookworm
ARG LK_VERSION
ARG CKPY_VERSION
RUN test -n "${LK_VERSION}" || (echo "LK_VERSION env variable required" && exit 1)
RUN test -n "${CKPY_VERSION}" || (echo "CKPY_VERSION env variable required" && exit 1)
ENV DEBIAN_FRONTEND=noninteractive
RUN apt update && apt install -y sudo
RUN mkdir -p /soaktests
COPY bootstrap.sh /soaktests
WORKDIR /soaktests
RUN /soaktests/bootstrap.sh ${CKPY_VERSION} ${LK_VERSION}
ENTRYPOINT [ "/soaktests/confluent-kafka-python/tests/soak/run.sh" ]
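
For reference, a minimal local build of this image might look like the following, run from tests/soak; the v2.2.0 values and image tag are illustrative, and install.sh below wraps the same invocation:

    $ docker build . \
        --build-arg LK_VERSION=v2.2.0 \
        --build-arg CKPY_VERSION=v2.2.0 \
        -t njc-py-soak-tests:v2.2.0-v2.2.0
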
30 changes: 9 additions & 21 deletions tests/soak/README.md
@@ -6,26 +6,14 @@ of time, typically 2+ weeks, to vet out any resource leaks, etc.
The soak testing client is made up of a producer, producing messages to
the configured topic, and a consumer, consuming the same messages back.

DataDog reporting is supported by setting datadog.api_key and datadog.app_key
in the soak client configuration file.
OpenTelemetry reporting is supported through OTLP.

# Installation

There are some convenience scripts to get you started.

On the host (EC2) where you aim to run the soak test, do:

$ git clone https://github.com/confluentinc/librdkafka
$ git clone https://github.com/confluentinc/confluent-kafka-python

# Build librdkafka and python
$ ~/confluent-kafka-python/tests/soak/build.sh <librdkafka-version> <cfl-python-version>

# Set up config:
$ cp ~/confluent-kafka-python/tests/soak/ccloud.config.example ~/confluent-kafka-python/ccloud.config

# Start a screen session
$ screen bash

# Within the screen session, run the soak client
(screen)$ ~/run.sh
(screen)$ Ctrl-A d # to detach
TESTID=normal \
LK_VERSION=v2.2.0 \
CKPY_VERSION=v2.2.0 \
CC_BOOSTRAP_SERVERS=_ \
CC_USERNAME=_ \
CC_PASSWORD=_ \
DOCKER_REPOSITORY=_ ./install.sh
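
After install.sh completes, one way to check on the deployment (names assume TESTID=normal and the njc-soak-tests namespace that install.sh creates):

    $ kubectl get pods -n njc-soak-tests
    $ kubectl logs -f deployment/njc-py-soak-tests-normal -n njc-soak-tests
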
17 changes: 7 additions & 10 deletions tests/soak/ubuntu-bootstrap.sh → tests/soak/bootstrap.sh
@@ -16,12 +16,11 @@ fi

python_branch=$1
librdkafka_branch=$2
venv=$PWD/venv

sudo apt update
sudo apt install -y make gcc g++ zlib1g-dev libssl-dev libzstd-dev screen \
python3.6-dev python3-pip python3-virtualenv

pushd $HOME
sudo apt install -y git curl make gcc g++ zlib1g-dev libssl-dev libzstd-dev \
python3-dev python3-pip python3-venv

if [[ ! -d confluent-kafka-python ]]; then
git clone https://github.com/confluentinc/confluent-kafka-python
@@ -33,14 +32,14 @@ git checkout $python_branch

echo "Installing librdkafka $librdkafka_branch"
tools/bootstrap-librdkafka.sh --require-ssl $librdkafka_branch /usr
rm -rf tmp-build

echo "Installing interceptors"
tools/install-interceptors.sh
# echo "Installing interceptors"
# tools/install-interceptors.sh

venv=$HOME/venv
echo "Setting up virtualenv in $venv"
if [[ ! -d $venv ]]; then
virtualenv -p python3.6 $venv
python3 -m venv $venv
fi
source $venv/bin/activate

@@ -57,8 +56,6 @@ python -c "import confluent_kafka; print(confluent_kafka.version(), confluent_ka

deactivate

popd # $HOME

echo "All done, activate the virtualenv in $venv before running the client:"
echo "source $venv/bin/activate"

14 changes: 0 additions & 14 deletions tests/soak/ccloud.config.example

This file was deleted.

48 changes: 48 additions & 0 deletions tests/soak/install.sh
@@ -0,0 +1,48 @@
#!/bin/bash
set -e

DOCKER_REPOSITORY_DEFAULT=${DOCKER_REPOSITORY:-docker.io/library/njc-py-soak-tests}
NAMESPACE=njc-soak-tests
NOCACHE=${NOCACHE:---no-cache}

for var in LK_VERSION CKPY_VERSION CC_BOOSTRAP_SERVERS CC_USERNAME CC_PASSWORD TESTID \
DOCKER_REPOSITORY_DEFAULT; do
VAR_VALUE=$(eval echo \$$var)
if [ -z "$VAR_VALUE" ]; then
echo "env variable $var is required"
exit 1
fi
done

TAG=${LK_VERSION}-${CKPY_VERSION}

COMMAND="docker build . $NOCACHE --build-arg LK_VERSION=${LK_VERSION} \
--build-arg CKPY_VERSION=${CKPY_VERSION} \
-t ${DOCKER_REPOSITORY_DEFAULT}:${TAG}"
echo $COMMAND
$COMMAND

if [ ! -z "$DOCKER_REPOSITORY" ]; then
COMMAND="docker push ${DOCKER_REPOSITORY}:${TAG}"
echo $COMMAND
$COMMAND
fi

if [ "$(uname -p)" = "x86_64" ]; then
NODE_ARCH="amd64"
else
NODE_ARCH="arm64"
fi

COMMAND="helm upgrade --install njc-py-soak-tests-${TESTID} ./njc-py-soak-tests \
--set "cluster.bootstrapServers=${CC_BOOSTRAP_SERVERS}" \
--set "cluster.username=${CC_USERNAME}" \
--set "cluster.password=${CC_PASSWORD}" \
--set "image.repository=${DOCKER_REPOSITORY_DEFAULT}" \
--set "testid=${TESTID}" \
--set "fullnameOverride=njc-py-soak-tests-${TESTID}" \
--set "image.tag=${TAG}" \
--set "nodeSelector.kubernetes\\.io/arch=${NODE_ARCH}" \
--namespace "${NAMESPACE}" --create-namespace"
echo $COMMAND
$COMMAND
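
The release name follows the njc-py-soak-tests-${TESTID} pattern above, so tearing a finished test down again is a plain helm uninstall (TESTID=normal shown as an example):

    $ helm uninstall njc-py-soak-tests-normal --namespace njc-soak-tests
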
23 changes: 23 additions & 0 deletions tests/soak/njc-py-soak-tests/.helmignore
@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/
24 changes: 24 additions & 0 deletions tests/soak/njc-py-soak-tests/Chart.yaml
@@ -0,0 +1,24 @@
apiVersion: v2
name: njc-py-soak-tests
description: A Helm chart for Kubernetes

# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application

# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.0.0"
1 change: 1 addition & 0 deletions tests/soak/njc-py-soak-tests/templates/NOTES.txt
@@ -0,0 +1 @@
NJC Python soak tests installed!
62 changes: 62 additions & 0 deletions tests/soak/njc-py-soak-tests/templates/_helpers.tpl
@@ -0,0 +1,62 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "njc-py-soak-tests.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}

{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "njc-py-soak-tests.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}

{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "njc-py-soak-tests.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}

{{/*
Common labels
*/}}
{{- define "njc-py-soak-tests.labels" -}}
helm.sh/chart: {{ include "njc-py-soak-tests.chart" . }}
{{ include "njc-py-soak-tests.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}

{{/*
Selector labels
*/}}
{{- define "njc-py-soak-tests.selectorLabels" -}}
app.kubernetes.io/name: {{ include "njc-py-soak-tests.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}

{{/*
Create the name of the service account to use
*/}}
{{- define "njc-py-soak-tests.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "njc-py-soak-tests.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}
87 changes: 87 additions & 0 deletions tests/soak/njc-py-soak-tests/templates/deployment.yaml
@@ -0,0 +1,87 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "njc-py-soak-tests.fullname" . }}
labels:
{{- include "njc-py-soak-tests.labels" . | nindent 4 }}
spec:
replicas: {{ .Values.replicaCount }}
selector:
matchLabels:
{{- include "njc-py-soak-tests.selectorLabels" . | nindent 6 }}
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 0
maxUnavailable: 1
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "njc-py-soak-tests.selectorLabels" . | nindent 8 }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "njc-py-soak-tests.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
volumes:
- name: secret
secret:
secretName: {{ include "njc-py-soak-tests.fullname" $ }}-secret
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
env:
- name: NODEIP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.hostIP
- name: OTEL_METRICS_EXPORTER
value: "otlp"
- name: OTEL_RESOURCE_ATTRIBUTES
value: "service.name={{ include "njc-py-soak-tests.fullname" . }},service.version={{ .Values.image.tag | default .Chart.AppVersion }}"
- name: OTEL_TRACES_EXPORTER
value: "none"
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "http://$(NODEIP):14317"
- name: OTEL_METRIC_EXPORT_INTERVAL
value: "10000"
- name: TESTID
value: "{{ .Values.testid }}"
volumeMounts:
- name: "secret"
mountPath: "/soaktests/confluent-kafka-python/ccloud.config"
subPath: ccloud.config
readOnly: true
# livenessProbe:
# httpGet:
# path: /
# port: http
# readinessProbe:
# httpGet:
# path: /
# port: http
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
35 changes: 35 additions & 0 deletions tests/soak/njc-py-soak-tests/templates/secrets.yaml
@@ -0,0 +1,35 @@
{{- range $nameSuffix, $values := .Values.secrets }}
---
apiVersion: v1
kind: Secret
metadata:
name: {{ include "njc-py-soak-tests.fullname" $ }}-{{ $nameSuffix }}
{{- with $values.annotations }}
annotations:
{{- range $key, $value := . }}
{{- printf "%s: %s" $key (tpl $value $ | quote) | nindent 4 }}
{{- end }}
{{- end }}
labels:
{{- range $key, $value := $values.labels }}
{{- printf "%s: %s" $key (tpl $value $ | quote) | nindent 4 }}
{{- end }}
type: {{ default "Opaque" $values.type }}
{{- with $values.data }}
data:
{{- toYaml . | nindent 2 }}
{{- end }}
stringData:
ccloud.config: |-
bootstrap.servers={{ $.Values.cluster.bootstrapServers }}
sasl.mechanisms=PLAIN
security.protocol=SASL_SSL
sasl.username={{ $.Values.cluster.username }}
sasl.password={{ $.Values.cluster.password }}
{{- $.Values.properties | nindent 4 -}}
{{- with $values.stringData }}
{{- range $key, $value := . }}
{{- printf "%s: %s" $key (tpl $value $ | quote) | nindent 2 }}
{{- end }}
{{- end }}
{{- end -}}
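
Kubernetes stores the rendered stringData base64-encoded under .data, so the generated ccloud.config can be inspected with a jsonpath query (secret name assumes the install.sh defaults with TESTID=normal):

    $ kubectl get secret njc-py-soak-tests-normal-secret -n njc-soak-tests \
        -o jsonpath='{.data.ccloud\.config}' | base64 -d
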
12 changes: 12 additions & 0 deletions tests/soak/njc-py-soak-tests/templates/serviceaccount.yaml
@@ -0,0 +1,12 @@
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "njc-py-soak-tests.serviceAccountName" . }}
labels:
{{- include "njc-py-soak-tests.labels" . | nindent 4 }}
{{- with .Values.serviceAccount.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- end }}
66 changes: 66 additions & 0 deletions tests/soak/njc-py-soak-tests/values.yaml
@@ -0,0 +1,66 @@
# Default values for njc-py-soak-tests.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.

replicaCount: 1

image:
# To use in minikube
repository: docker.io/library/njc-py-soak-tests
pullPolicy: Always
# Overrides the image tag whose default is the chart appVersion.
tag: ""

imagePullSecrets: []
nameOverride: ""
fullnameOverride: "njc-py-soak-tests"

serviceAccount:
# Specifies whether a service account should be created
create: true
# Annotations to add to the service account
annotations: {}
# The name of the service account to use.
# If not set and create is true, a name is generated using the fullname template
name: ""

podAnnotations: {}

podSecurityContext: {}
# fsGroup: 2000

securityContext: {}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000

resources:
limits:
cpu: 100m
memory: 512Mi
requests:
cpu: 100m
memory: 128Mi

cluster:
bootstrapServers: ""
username: ""
password: ""

properties: |-
enable.idempotence=true
debug=eos,generic,broker,security,consumer
linger.ms=2
compression.type=lz4
secrets:
secret: {}

nodeSelector: {}

tolerations: []

affinity: {}
3 changes: 2 additions & 1 deletion tests/soak/requirements.txt
@@ -1,2 +1,3 @@
datadog
psutil
opentelemetry-distro
opentelemetry-exporter-otlp
13 changes: 3 additions & 10 deletions tests/soak/run.sh
@@ -11,19 +11,12 @@ if [[ -z $librdkafka_version ]]; then
exit 1
fi

if [[ -z $STY ]]; then
echo "This script should be run from inside a screen session"
exit 1
fi

set -u
topic="pysoak-$librdkafka_version"
logfile="${topic}.log.bz2"
topic="pysoak-$TESTID-$librdkafka_version"

echo "Starting soak client using topic $topic with logs written to $logfile"
echo "Starting soak client using topic $topic"
set +x
time confluent-kafka-python/tests/soak/soakclient.py -t $topic -r 80 -f confluent-kafka-python/ccloud.config 2>&1 \
| tee /dev/stderr | bzip2 > $logfile
time opentelemetry-instrument confluent-kafka-python/tests/soak/soakclient.py -i $TESTID -t $topic -r 80 -f confluent-kafka-python/ccloud.config 2>&1
ret=$?
echo "Python client exited with status $ret"
exit $ret
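
opentelemetry-instrument picks its exporter configuration up from the standard OTEL_* environment variables, which deployment.yaml above sets per pod. A hand-run equivalent outside Kubernetes, assuming an OTLP collector listening on the default gRPC port, might be:

    $ export OTEL_METRICS_EXPORTER=otlp
    $ export OTEL_TRACES_EXPORTER=none
    $ export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
    $ opentelemetry-instrument tests/soak/soakclient.py \
          -i normal -t pysoak-normal-v2.2.0 -r 80 -f ccloud.config
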
161 changes: 97 additions & 64 deletions tests/soak/soakclient.py
@@ -20,7 +20,7 @@
# long term validation testing.
#
# Usage:
# tests/soak/soakclient.py -t <topic> -r <produce-rate> -f <client-conf-file>
# tests/soak/soakclient.py -i <testid> -t <topic> -r <produce-rate> -f <client-conf-file>
#
# A unique topic should be used for each soakclient instance.
#
@@ -30,6 +30,7 @@
from confluent_kafka.admin import AdminClient, NewTopic
from collections import defaultdict
from builtins import int
from opentelemetry import metrics
import argparse
import threading
import time
@@ -40,7 +41,6 @@
import resource
import os
import psutil
import datadog


class SoakRecord (object):
@@ -72,26 +72,27 @@ class SoakClient (object):
The producer and consumer run in separate background threads.
"""

# DataDog metric name prefix
DD_PFX = "kafka.client.soak.python."
# metric name prefix
METRIC_PFX = "kafka.client.soak.python."

def dr_cb(self, err, msg):
""" Producer delivery report callback """
if err is not None:
self.logger.warning("producer: delivery failed: {} [{}]: {}".
format(msg.topic(), msg.partition(), err))
self.dr_err_cnt += 1
self.dd_incr("producer.drerr", 1)
self.dd.event("Message delivery failure",
"Message delivery failed: {} [{}]: {}".
format(msg.topic(), msg.partition(), err),
hostname=self.hostname)
self.incr_counter("producer.drerr", 1)
self.incr_counter("producer.delivery.failure", 1, {
"topic": msg.topic(),
"partition": str(msg.partition()),
"err": str(err)
})

else:
self.dr_cnt += 1
self.dd_incr("producer.drok", 1)
self.dd_gauge("producer.latency", msg.latency(),
tags=["partition:{}".format(msg.partition())])
self.incr_counter("producer.drok", 1)
self.set_gauge("producer.latency", msg.latency(),
tags={"partition": "{}".format(msg.partition())})
if (self.dr_cnt % self.disprate) == 0:
self.logger.debug("producer: delivered message to {} [{}] at offset {} in {}s".format(
msg.topic(), msg.partition(), msg.offset(), msg.latency()))
@@ -118,7 +119,7 @@ def produce_record(self):
continue

self.producer_msgid += 1
self.dd_incr("producer.send", 1)
self.incr_counter("producer.send", 1)

def producer_status(self):
""" Print producer status """
@@ -218,7 +219,7 @@ def consumer_run(self):
if msg.error() is not None:
self.logger.error("consumer: error: {}".format(msg.error()))
self.consumer_err_cnt += 1
self.dd_incr("consumer.error", 1)
self.incr_counter("consumer.error", 1)
continue

try:
@@ -229,18 +230,18 @@ def consumer_run(self):
"{} [{}] at offset {} (headers {}): {}".format(
msg.topic(), msg.partition(), msg.offset(), msg.headers(), ex))
self.msg_err_cnt += 1
self.dd_incr("consumer.msgerr", 1)
self.incr_counter("consumer.msgerr", 1)

self.msg_cnt += 1
self.dd_incr("consumer.msg", 1)
self.incr_counter("consumer.msg", 1)

# end-to-end latency
headers = dict(msg.headers())
txtime = headers.get('time', None)
if txtime is not None:
latency = time.time() - float(txtime)
self.dd_gauge("consumer.e2e_latency", latency,
tags=["partition:{}".format(msg.partition())])
self.set_gauge("consumer.e2e_latency", latency,
tags={"partition": "{}".format(msg.partition())})
else:
latency = None

@@ -265,7 +266,7 @@ def consumer_run(self):
msg.offset(), msg.headers(), hw,
self.last_committed))
self.msg_dup_cnt += (hw + 1) - msg.offset()
self.dd_incr("consumer.msgdup", 1)
self.incr_counter("consumer.msgdup", 1)
elif msg.offset() > hw + 1:
self.logger.warning("consumer: Lost messages, now at {} "
"[{}] at offset {} (headers {}): "
@@ -274,7 +275,7 @@ def consumer_run(self):
msg.offset(), msg.headers(), hw,
self.last_committed))
self.msg_miss_cnt += msg.offset() - (hw + 1)
self.dd_incr("consumer.missedmsg", 1)
self.incr_counter("consumer.missedmsg", 1)

hwmarks[hwkey] = msg.offset()

@@ -297,22 +298,22 @@ def consumer_error_cb(self, err):
""" Consumer error callback """
self.logger.error("consumer: error_cb: {}".format(err))
self.consumer_error_cb_cnt += 1
self.dd_incr("consumer.errorcb", 1)
self.incr_counter("consumer.errorcb", 1)

def consumer_commit_cb(self, err, partitions):
""" Auto commit result callback """
if err is not None:
self.logger.error("consumer: offset commit failed for {}: {}".format(partitions, err))
self.consumer_err_cnt += 1
self.dd_incr("consumer.error", 1)
self.incr_counter("consumer.error", 1)
else:
self.last_committed = partitions

def producer_error_cb(self, err):
""" Producer error callback """
self.logger.error("producer: error_cb: {}".format(err))
self.producer_error_cb_cnt += 1
self.dd_incr("producer.errorcb", 1)
self.incr_counter("producer.errorcb", 1)

def rtt_stats(self, d):
""" Extract broker rtt statistics from the stats dict in @param d """
@@ -324,14 +325,14 @@ def rtt_stats(self, d):

parts = ','.join([str(x['partition']) for x in broker['toppars'].values()])

tags = ["broker:{}".format(broker['nodeid']),
"partitions:{}".format(parts),
"type:{}".format(d['type'])]
tags = {"broker": "{}".format(broker['nodeid']),
"partitions": "{}".format(parts),
"type": "{}".format(d['type'])}

self.dd_gauge("broker.rtt.p99",
float(broker['rtt']['p99']) / 1000000.0, tags=tags)
self.dd_gauge("broker.rtt.avg",
float(broker['rtt']['avg']) / 1000000.0, tags=tags)
self.set_gauge("broker.rtt.p99",
float(broker['rtt']['p99']) / 1000000.0, tags=tags)
self.set_gauge("broker.rtt.avg",
float(broker['rtt']['avg']) / 1000000.0, tags=tags)

def stats_cb(self, json_str):
""" Common statistics callback. """
@@ -350,16 +351,15 @@ def stats_cb(self, json_str):
self.logger.info("{} stats: {}/{} brokers UP, {} partition leaders: {}".format(
d['name'], len(up_brokers), broker_cnt, self.topic, leaders))

# Emit the full raw stats every now and then for troubleshooting.
# Emit the full raw stats for troubleshooting.
self.stats_cnt[d['type']] += 1
if (self.stats_cnt[d['type']] % 11) == 0:
self.logger.info("{} raw stats: {}".format(d['name'], json_str))
self.logger.info("{} raw stats: {}".format(d['name'], json_str))

self.rtt_stats(d)

# Sample the producer queue length
if d['type'] == 'producer':
self.dd_gauge("producer.outq", len(self.producer))
self.set_gauge("producer.outq", len(self.producer))

def create_topic(self, topic, conf):
""" Create the topic if it doesn't already exist """
@@ -374,7 +374,7 @@ def create_topic(self, topic, conf):
else:
raise

def __init__(self, topic, rate, conf):
def __init__(self, testid, topic, rate, conf):
""" SoakClient constructor. conf is the client configuration """
self.topic = topic
self.rate = rate
@@ -383,6 +383,12 @@ def __init__(self, topic, rate, conf):
self.stats_cnt = {'producer': 0, 'consumer': 0}
self.start_time = time.time()

# OTEL instruments
self.counters = {}
self.gauges = {}
self.gauge_cbs = {}
self.gauge_values = {}

self.last_rusage = None
self.last_rusage_time = None
self.proc = psutil.Process(os.getpid())
@@ -395,8 +401,10 @@ def __init__(self, topic, rate, conf):

# Construct a unique id to use for metrics hostname so that
# multiple instances of the SoakClient can run on the same machine.
hostname = datadog.util.hostname.get_hostname()
hostname = os.environ["HOSTNAME"]
self.hostname = "py-{}-{}".format(hostname, self.topic)
self.testid = testid
self.meter = metrics.get_meter("njc.python.soak.tests")

self.logger.info("SoakClient id {}".format(self.hostname))

@@ -405,13 +413,7 @@ def __init__(self, topic, rate, conf):
conf['group.id'] = 'soakclient-{}-{}-{}'.format(
self.hostname, version()[0], sys.version.split(' ')[0])

# Separate datadog config from client config
datadog_conf = {k[len("datadog."):]: conf[k]
for k in conf.keys() if k.startswith("datadog.")}
conf = {k: v for k, v in conf.items() if not k.startswith("datadog.")}

# Set up datadog agent
self.init_datadog(datadog_conf)
conf = {k: v for k, v in conf.items()}

def filter_config(conf, filter_out, strip_prefix):
len_sp = len(strip_prefix)
@@ -432,7 +434,7 @@ def filter_config(conf, filter_out, strip_prefix):
# Create Producer and Consumer, each running in its own thread.
#
conf['stats_cb'] = self.stats_cb
conf['statistics.interval.ms'] = 10000
conf['statistics.interval.ms'] = 120000

# Producer
pconf = filter_config(conf, ["consumer.", "admin."], "producer.")
@@ -465,36 +467,66 @@ def terminate(self):
# Final resource usage
soak.get_rusage()

def init_datadog(self, options):
""" Initialize datadog agent """
datadog.initialize(**options)

self.dd = datadog.ThreadStats()
self.dd.start()

def dd_incr(self, metric_name, incrval):
""" Increment datadog metric counter by incrval """
self.dd.increment(self.DD_PFX + metric_name, incrval, host=self.hostname)

def dd_gauge(self, metric_name, val, tags=None):
""" Set datadog metric gauge to val """
self.dd.gauge(self.DD_PFX + metric_name, val,
tags=tags, host=self.hostname)
def incr_counter(self, metric_name, incrval, tags=None):
""" Increment metric counter by incrval """
if not tags:
tags = {}
tags.update({
"host": self.hostname,
"testid": self.testid
})

full_metric_name = self.METRIC_PFX + metric_name
if full_metric_name not in self.counters:
self.counters[full_metric_name] = self.meter.create_counter(
full_metric_name,
description=full_metric_name,
)
counter = self.counters[full_metric_name]
counter.add(incrval, tags)

def set_gauge(self, metric_name, val, tags=None):
""" Set metric gauge to val """
if not tags:
tags = {}
tags.update({
"host": self.hostname,
"testid": self.testid
})

full_metric_name = self.METRIC_PFX + metric_name
if full_metric_name not in self.gauge_values:
self.gauge_values[full_metric_name] = []

self.gauge_values[full_metric_name].append([val, tags])

if full_metric_name not in self.gauges:
def cb(_):
for value in self.gauge_values[full_metric_name]:
yield metrics.Observation(value[0], value[1])
self.gauge_values[full_metric_name] = []

self.gauge_cbs[full_metric_name] = cb
self.gauges[full_metric_name] = self.meter.create_observable_gauge(
callbacks=[self.gauge_cbs[full_metric_name]],
name=full_metric_name,
description=full_metric_name
)

def calc_rusage_deltas(self, curr, prev, elapsed):
""" Calculate deltas between previous and current resource usage """

# User CPU %
user_cpu = ((curr.ru_utime - prev.ru_utime) / elapsed) * 100.0
self.dd_gauge("cpu.user", user_cpu)
self.set_gauge("cpu.user", user_cpu)

# System CPU %
sys_cpu = ((curr.ru_stime - prev.ru_stime) / elapsed) * 100.0
self.dd_gauge("cpu.system", sys_cpu)
self.set_gauge("cpu.system", sys_cpu)

# Max RSS memory (monotonic)
max_rss = curr.ru_maxrss / 1024.0
self.dd_gauge("memory.rss.max", max_rss)
self.set_gauge("memory.rss.max", max_rss)

self.logger.info("User CPU: {:.1f}%, System CPU: {:.1f}%, MaxRSS {:.3f}MiB".format(
user_cpu, sys_cpu, max_rss))
@@ -513,12 +545,13 @@ def get_rusage(self):

# Current RSS memory
rss = float(self.proc.memory_info().rss) / (1024.0*1024.0)
self.dd_gauge("memory.rss", rss)
self.set_gauge("memory.rss", rss)


if __name__ == '__main__':

parser = argparse.ArgumentParser(description='Kafka client soak test')
parser.add_argument('-i', dest='testid', type=str, required=True, help='Test id')
parser.add_argument('-b', dest='brokers', type=str, default=None, help='Bootstrap servers')
parser.add_argument('-t', dest='topic', type=str, required=True, help='Topic to use')
parser.add_argument('-r', dest='rate', type=float, default=10, help='Message produce rate per second')
@@ -554,7 +587,7 @@ def get_rusage(self):
conf['enable.partition.eof'] = False

# Create SoakClient
soak = SoakClient(args.topic, args.rate, conf)
soak = SoakClient(args.testid, args.topic, args.rate, conf)

# Get initial resource usage
soak.get_rusage()