Skip to content

Commit

Permalink
ci: fix kafka e2e test (numaproj#16)
Browse files Browse the repository at this point in the history
* ci: fix kafka e2e test

Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored May 27, 2022
1 parent 5c9433f commit be6b441
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ jobs:
max-parallel: 4
matrix:
driver: [jetstream]
case: [e2e]
case: [e2e, kafka-e2e]
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
15 changes: 13 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,22 @@ test-e2e:
test-kafka-e2e:
test-%:
$(MAKE) image e2eapi-image
kubectl -n numaflow-system delete po -lapp=controller-manager,app.kubernetes.io/part-of=numaflow
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=controller-manager,app.kubernetes.io/part-of=numaflow
kubectl -n numaflow-system delete po e2e-api-pod --ignore-not-found=true
cat test/manifests/e2e-api-pod.yaml | sed '[email protected]/numaproj/@$(IMAGE_NAMESPACE)/@' | sed 's/:$(BASE_VERSION)/:$(VERSION)/' | kubectl -n numaflow-system apply -f -
go generate $(shell find ./test/$* -name '*.go')
go test -v -timeout 10m -count 1 --tags test -p 1 ./test/$*
-go test -v -timeout 10m -count 1 --tags test -p 1 ./test/$*
$(MAKE) cleanup-e2e


.PHONY: cleanup-e2e
cleanup-e2e:
kubectl -n numaflow-system delete svc -lnumaflow-e2e=true --ignore-not-found=true
kubectl -n numaflow-system delete sts -lnumaflow-e2e=true --ignore-not-found=true
kubectl -n numaflow-system delete deploy -lnumaflow-e2e=true --ignore-not-found=true
kubectl -n numaflow-system delete cm -lnumaflow-e2e=true --ignore-not-found=true
kubectl -n numaflow-system delete secret -lnumaflow-e2e=true --ignore-not-found=true
kubectl -n numaflow-system delete po -lnumaflow-e2e=true --ignore-not-found=true

.PHONY: image
image: clean dist/$(BINARY_NAME)-linux-amd64
Expand Down
4 changes: 2 additions & 2 deletions config/apps/kafka/kafka-minimal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ spec:
spec:
containers:
- name: main
image: wurstmeister/kafka
image: wurstmeister/kafka:2.13-2.8.1
imagePullPolicy: IfNotPresent
ports:
- containerPort: 9092
Expand All @@ -81,7 +81,7 @@ spec:
- name: KAFKA_BROKER_ID
value: "0"
- name: KAFKA_CREATE_TOPICS
value: "input-topic:1:2,middle-topic:1:2,output-topic:1:2"
value: "input-topic:1:1,middle-topic:1:1,output-topic:1:1"
readinessProbe:
tcpSocket:
port: 9092
Expand Down
4 changes: 4 additions & 0 deletions config/apps/kafka/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
resources:
- kafka-minimal.yaml

commonLabels:
"numaflow-e2e": "true"

12 changes: 11 additions & 1 deletion test/e2e-api/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package main

import (
"fmt"
"github.com/Shopify/sarama"
"io/ioutil"
"log"
"net/http"
"strconv"
"time"

"github.com/Shopify/sarama"
)

func init() {
Expand All @@ -17,12 +18,14 @@ func init() {
topic := r.URL.Query().Get("topic")
admin, err := sarama.NewClusterAdmin(brokers, sarama.NewConfig())
if err != nil {
log.Println(err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
}
defer admin.Close()
if err = admin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, true); err != nil {
log.Println(err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
Expand All @@ -34,6 +37,7 @@ func init() {
topic := r.URL.Query().Get("topic")
admin, err := sarama.NewClusterAdmin(brokers, sarama.NewConfig())
if err != nil {
log.Println(err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
Expand All @@ -49,13 +53,15 @@ func init() {
http.HandleFunc("/kafka/list-topics", func(w http.ResponseWriter, r *http.Request) {
consumer, err := sarama.NewConsumer(brokers, sarama.NewConfig())
if err != nil {
log.Println(err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
}
defer consumer.Close()
topics, err := consumer.Topics()
if err != nil {
log.Println(err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
Expand All @@ -68,6 +74,7 @@ func init() {
topic := r.URL.Query().Get("topic")
count, err := strconv.Atoi(r.URL.Query().Get("count"))
if err != nil {
log.Println(err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
Expand All @@ -81,6 +88,7 @@ func init() {
defer consumer.Close()
pConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
if err != nil {
log.Println(err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
Expand All @@ -104,6 +112,7 @@ func init() {
key := r.URL.Query().Get("key")
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
return
Expand Down Expand Up @@ -135,6 +144,7 @@ func init() {
mf := newMessageFactory(r.URL.Query())
duration, err := time.ParseDuration(r.URL.Query().Get("sleep"))
if err != nil {
log.Println(err)
w.WriteHeader(400)
_, _ = w.Write([]byte(err.Error()))
return
Expand Down
12 changes: 8 additions & 4 deletions test/kafka-e2e/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"time"
)

//go:generate kubectl -n numaflow-system delete statefulset zookeeper kafka-broker --ignore-not-found=true
//go:generate kubectl apply -k ../../config/apps/kafka -n numaflow-system
//go:generate kubectl -n numaflow-system delete statefulset zookeeper kafka-broker --ignore-not-found=true
//go:generate kubectl apply -k ../../config/apps/kafka -n numaflow-system
// Wait for zookeeper to come up
//go:generate sleep 60

type KafkaSuite struct {
fixtures.E2ESuite
Expand Down Expand Up @@ -73,6 +75,7 @@ func (ks *KafkaSuite) TestKafkaSink() {
fixtures.ExpectKafkaTopicCount(outputTopic, 15, 3*time.Second)

}

func (ks *KafkaSuite) TestKafkaSourceSink() {
inputTopic := fixtures.CreateKafkaTopic()
outputTopic := fixtures.CreateKafkaTopic()
Expand All @@ -86,8 +89,9 @@ func (ks *KafkaSuite) TestKafkaSourceSink() {
Name: "input",
Source: &dfv1.Source{
Kafka: &dfv1.KafkaSource{
Brokers: []string{"kafka-broker:9092"},
Topic: inputTopic,
Brokers: []string{"kafka-broker:9092"},
Topic: inputTopic,
ConsumerGroupName: "test-group",
},
},
},
Expand Down
2 changes: 2 additions & 0 deletions test/manifests/e2e-api-pod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: v1
kind: Pod
metadata:
name: e2e-api-pod
labels:
numaflow-e2e: "true"
spec:
containers:
- name: main
Expand Down

0 comments on commit be6b441

Please sign in to comment.