Skip to content
This repository was archived by the owner on Oct 6, 2023. It is now read-only.

Commit e27d886

Browse files
committed
add wait step for kafka and zookeeper
1 parent 2e439d7 commit e27d886

File tree

8 files changed

+179
-131
lines changed

8 files changed

+179
-131
lines changed

.github/workflows/test.yaml

+25-1
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,39 @@ jobs:
1212
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
1313
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
1414
image: confluentinc/cp-kafka:latest
15+
ports:
16+
- 9092:9092
17+
zookeeper:
18+
env:
19+
ZOOKEEPER_CLIENT_PORT: "2181"
20+
ZOOKEEPER_TICK_TIME: "2000"
21+
image: confluentinc/cp-zookeeper:latest
22+
ports:
23+
- 2181:2181
1524
steps:
1625
- name: Install Go
1726
uses: actions/setup-go@v1
1827
with:
1928
go-version: ${{ matrix.go-version }}
2029
- name: Checkout code
2130
uses: actions/checkout@v1
31+
- name: Wait for Kafka
32+
run: "netstat -a || true\nwaitfor() {\n\twhile ! nc -v -z $1 $2\n\tdo sleep
33+
1\n\tdone\n}\nwaitfor localhost 9092\nwaitfor localhost 2181\n"
34+
shell: bash
35+
timeout-minutes: 1
2236
- name: Test
23-
run: go test ./...
37+
run: "nc -v -z localhost 9092\nnetstat -a || true\necho 'package main\nimport
38+
(\n\t\"fmt\"\n\t\"net\"\n\t\"os\"\n)\n\nfunc main() {\n\t_, err := net.Dial(\"tcp\",
39+
os.Args[1])\n\tfmt.Printf(\"dial %s: %v\\n\", os.Args[1], err)\n\tif err !=
40+
nil {\n\t\tos.Exit(1)\n\t}\n}\n' > /tmp/dial.go\ngo build -o /tmp/dial /tmp/dial.go\n/tmp/dial
41+
localhost:9092 || true\n\nmkdir /tmp/m\ncp go.mod client.go /tmp/m\necho '\npackage
42+
kafkatest_test\n\nimport (\n\t_ \"log\"\n\t\"net\"\n\t\"os\"\n\t\"testing\"\n\t_
43+
\"time\"\n\n\t_ \"github.com/Shopify/sarama\"\n\t_ \"github.com/frankban/quicktest\"\n\n\t_
44+
\"github.com/heetch/kafkatest\"\n)\n\nfunc TestFoo(t *testing.T) {\n\t_, err
45+
:= net.Dial(\"tcp\", os.Getenv(\"KAFKA_ADDRS\"))\n\tif err != nil {\n\t\tt.Errorf(\"cannot
46+
dial: %v\", err)\n\t}\n}\n' > /tmp/m/client_test.go\n(\n\tcd /tmp/m\n\tgo
47+
test\n)\nKAFKA_ADDRS=localhost:9092 go test ./...\n"
2448
strategy:
2549
matrix:
2650
go-version:

client.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ package kafkatest
55
import (
66
"crypto/rand"
77
"fmt"
8+
"log"
9+
"net"
810
"os"
911
"strconv"
1012
"strings"
@@ -54,6 +56,8 @@ func New() (*Kafka, error) {
5456
if addrsStr == "" {
5557
addrsStr = "localhost:9092"
5658
}
59+
_, err = net.Dial("tcp", addrsStr)
60+
log.Printf("net.Dial %q: %v", addrsStr, err)
5761
addrs := strings.Split(addrsStr, ",")
5862
useTLS, err := boolVar("KAFKA_USE_TLS")
5963
if err != nil {
@@ -84,7 +88,7 @@ func New() (*Kafka, error) {
8488
client.admin = admin
8589
break
8690
}
87-
if !a.Next() {
91+
if !a.More() {
8892
return nil, fmt.Errorf("cannot connect to Kafka cluster at %q after %v: %v", addrs, retryLimit, err)
8993
}
9094
}

client_test.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kafkatest_test
22

33
import (
4+
"log"
45
"testing"
56
"time"
67

@@ -11,17 +12,21 @@ import (
1112
)
1213

1314
func TestNew(t *testing.T) {
15+
//sarama.Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)
16+
1417
c := qt.New(t)
1518
k, err := kafkatest.New()
1619
c.Assert(err, qt.Equals, nil)
1720

21+
log.Printf("************** succeeded in making initial connection")
22+
1823
// Produce a message to a new topic.
1924
cfg := k.Config()
2025
cfg.Producer.Return.Successes = true
2126
cfg.Producer.Return.Errors = true
2227

2328
producer, err := sarama.NewSyncProducer(k.Addrs(), cfg)
24-
c.Assert(err, qt.Equals, nil)
29+
c.Assert(err, qt.Equals, nil, qt.Commentf("addrs: %q", k.Addrs()))
2530
defer producer.Close()
2631
topic := k.NewTopic()
2732

cue.mod/pkg/github.com/heetch/cue-schema/github/workflow/example/go/.github/workflows/test.yaml

-44
This file was deleted.

cue.mod/pkg/github.com/heetch/cue-schema/github/workflow/example/go/ci.yaml

-42
This file was deleted.

cue.mod/pkg/github.com/heetch/cue-schema/github/workflow/go/go.cue

+84-38
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
// for easy modification of some parameters.
77
package workflow
88

9+
import "list"
10+
911
on: _ | *["push", "pull_request"]
1012
name: _ | *"Test"
1113
jobs: test: {
@@ -14,34 +16,35 @@ jobs: test: {
1416
platform: _ | *[ "\(p)-latest" for p in Platforms ]
1517
}
1618
"runs-on": "${{ matrix.platform }}"
17-
steps: [{
19+
steps: list.FlattenN([{
1820
name: "Install Go"
1921
uses: "actions/setup-go@v1"
2022
with: "go-version": "${{ matrix.go-version }}"
21-
}, {
23+
},
24+
// {
25+
// name: "Module cache"
26+
// uses: "actions/cache@v1"
27+
// with: {
28+
// path: "~/go/pkg/mod"
29+
// key: "${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}"
30+
// "restore-keys": "${{ runner.os }}-go-"
31+
// }
32+
// },
33+
{
2234
name: "Checkout code"
2335
uses: "actions/checkout@v1"
24-
}, _ | *{
36+
},
37+
// Include setup steps for any services that require them,
38+
[ServiceConfig[name].SetupStep for name, enabled in Services if enabled && ServiceConfig[name].SetupStep != null],
39+
_ | *{
2540
name: "Test"
2641
run: RunTest
27-
}]
42+
}], 1)
2843
}
2944

3045
// Include all named services.
31-
for name in Services {
32-
jobs: test: services: "\(name)": ServiceConfig[name]
33-
}
34-
35-
jobs: test: services: kafka?: _ | *{
36-
image: "confluentinc/cp-kafka:latest"
37-
env: {
38-
KAFKA_BROKER_ID: "1"
39-
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
40-
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
41-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
42-
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
43-
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
44-
}
46+
for name, enabled in Services if enabled {
47+
jobs: test: services: "\(name)": ServiceConfig[name].Service
4548
}
4649

4750
// Platforms configures what platforms to run the tests on.
@@ -57,33 +60,76 @@ RunTest :: *"go test ./..." | string
5760
// Service configures which services to make available.
5861
// The configuration the service with name N is taken from
5962
// ServiceConfig[N]
60-
Services :: [... string]
63+
Services :: [_]: bool
6164

6265
// ServiceConfig holds the default configuration for services that
6366
// can be started by naming them in Services.
64-
ServiceConfig :: [_]: _
67+
ServiceConfig :: [_]: {
68+
// Service holds the contents of `jobs: test: services: "\(serviceName)"`
69+
"Service": Service
70+
71+
// SetupStep optionally holds a step to run to set up the service
72+
// before the main workflow action is run (for example to wait
73+
// for the service to become ready).
74+
SetupStep: JobStep | *null
75+
}
76+
77+
// Kafka requires zookeeper too.
78+
if Services["kafka"] != _|_ {
79+
Services :: zookeeper: true
80+
}
6581

6682
ServiceConfig :: kafka: {
67-
image: "confluentinc/cp-kafka:latest"
68-
env: {
69-
KAFKA_BROKER_ID: "1"
70-
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
71-
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
72-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
73-
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
74-
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
83+
Service: {
84+
image: "confluentinc/cp-kafka:latest"
85+
ports: ["9092:9092"]
86+
env: {
87+
KAFKA_BROKER_ID: "1"
88+
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
89+
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
90+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
91+
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
92+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
93+
}
94+
}
95+
SetupStep: {
96+
name: "Wait for Kafka"
97+
"timeout-minutes": 1
98+
shell: "bash"
99+
run: #"""
100+
netstat -a || true
101+
waitfor() {
102+
while ! nc -v -z $1 $2
103+
do sleep 1
104+
done
105+
}
106+
waitfor localhost 9092
107+
waitfor localhost 2181
108+
109+
"""#
110+
}
111+
}
112+
113+
ServiceConfig :: zookeeper: {
114+
Service: {
115+
image: "confluentinc/cp-zookeeper:latest"
116+
ports: ["2181:2181"]
117+
env: {
118+
ZOOKEEPER_CLIENT_PORT: "2181"
119+
ZOOKEEPER_TICK_TIME: "2000"
120+
}
75121
}
76122
}
77123

78-
ServiceConfig :: postgres: {
79-
env: {
80-
POSTGRES_DB: "postgres"
81-
POSTGRES_PASSWORD: "postgres"
82-
POSTGRES_USER: "postgres"
124+
ServiceConfig :: postgres: _ |*{
125+
Service: {
126+
image: "postgres:10.8"
127+
ports: ["5432:5432"]
128+
env: {
129+
POSTGRES_DB: "postgres"
130+
POSTGRES_PASSWORD: "postgres"
131+
POSTGRES_USER: "postgres"
132+
}
133+
options: "--health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5"
83134
}
84-
image: "postgres:10.8"
85-
options: "--health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5"
86-
ports: [
87-
"5432:5432",
88-
]
89135
}

cue.mod/pkg/github.com/heetch/cue-schema/github/workflow/workflow.cue

+1-3
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ package workflow
88

99
// TODO cross-verify against https://github.com/SchemaStore/schemastore/blob/master/src/schemas/json/github-workflow.json
1010

11-
import "regexp"
12-
1311
// name holds the name of your workflow. GitHub displays the
1412
// names of your workflows on your repository's actions page. If
1513
// you omit this field, GitHub sets the name to the workflow's
@@ -74,7 +72,7 @@ JobStep :: {
7472
"sh" |
7573
"cmd" |
7674
"powershell" |
77-
regexp.Match(#"\{0\}"#)
75+
=~#"\{0\}"#
7876

7977
// with holds a map of the input parameters defined by the action.
8078
// Each input parameter is a key/value pair. Input parameters are

0 commit comments

Comments
 (0)