Skip to content
This repository was archived by the owner on Oct 6, 2023. It is now read-only.

add CI #4

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Code generated by github.com/heetch/cue-schema/github/workflow/generate. DO NOT EDIT.
jobs:
test:
runs-on: ${{ matrix.platform }}
services:
kafka:
env:
KAFKA_ADVERTISED_LISTENERS: interbroker://kafka:29092,fromclient://localhost:9092
KAFKA_BROKER_ID: "1"
KAFKA_INTER_BROKER_LISTENER_NAME: interbroker
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: interbroker:PLAINTEXT,fromclient:PLAINTEXT
KAFKA_LOG4J_ROOT_LOGLEVEL: DEBUG
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: "10000"
image: confluentinc/cp-kafka:latest
ports:
- 9092:9092
zookeeper:
env:
ZOOKEEPER_CLIENT_PORT: "2181"
ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: DEBUG
ZOOKEEPER_TICK_TIME: "2000"
image: confluentinc/cp-zookeeper:latest
ports:
- 2181:2181
steps:
- name: Install Go
uses: actions/setup-go@v1
with:
go-version: ${{ matrix.go-version }}
- name: Checkout code
uses: actions/checkout@v1
- name: Wait for Kafka
run: "waitfor() {\n\twhile ! nc -v -z $1 $2\n\tdo sleep 1\n\tdone\n}\nwaitfor
localhost 9092\nwaitfor localhost 2181\n"
shell: bash
timeout-minutes: 1
- name: Test
run: "echo ip address of kafka:\ndocker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'
\"${{ job.services.kafka.id }}\"\necho ip address of zookeeper:\ndocker inspect
-f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' \"${{ job.services.zookeeper.id
}}\"\n\n\nwhile nc -v -z localhost 9092; do\n\techo ok so far\n\tsleep 1\ndone\necho
------------- kafka logs\ndocker logs \"${{ job.services.kafka.id }}\"\necho
--------------- end kafka logs\necho\necho ------------- zookeeper logs\ndocker
logs \"${{ job.services.zookeeper.id }}\"\necho ---------------- end zookeeper
logs\ndocker ps\nexit 1\nexport KAFKA_ADDRS=localhost:9092\n#go test ./...\ngo
run dial.go $KAFKA_ADDRS\ngo test -mod=vendor ./...\n"
strategy:
matrix:
go-version:
- 1.13.x
platform:
- ubuntu-latest
name: Test
"on":
- push
- pull_request
185 changes: 10 additions & 175 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,188 +1,23 @@
// Package kafkatest provides a package intended for running tests
// that require a Kafka backend.
package kafkatest
package main

import (
"crypto/rand"
"fmt"
"net"
"os"
"strconv"
"strings"
"time"

"github.com/Shopify/sarama"
"gopkg.in/retry.v1"
)

var ErrDisabled = fmt.Errorf("kafka tests are disabled")

// New connects to a Kafka instance and returns a Kafka
// instance that uses it.
//
// The following environment variables can be used to
// configure the connection parameters:
//
// - $KAFKA_DISABLE
// A boolean as parsed by strconv.ParseBool. If this is true,
// New will return ErrDisabled.
// - $KAFKA_ADDRS
// A comma-separate list of Kafka broker addresses in host:port
// form. If this is empty, localhost:9092 will be used.
// The list of address can be discovered by calling Client.Addrs.
// - $KAFKA_USERNAME, $KAFKA_PASWORD
// The username and password to use for SASL authentication.
// When $KAFKA_USERNAME is non-empty, SASL will be
// enabled.
// - $KAFKA_USE_TLS
// A boolean as parsed by strconv.ParseBool. If this
// is true, a secure TLS connection will be used.
//
// - $KAFKA_TIMEOUT
// The maximum duration to wait when trying to connect
// to Kakfa. Defaults to "30s".
//
// The returned Kafka instance must be closed after use.
func New() (*Kafka, error) {
disabled, err := boolVar("KAFKA_DISABLE")
func main() {
err := DialIt(os.Args[1])
fmt.Printf("dial %s: %v\n", os.Args[1], err)
if err != nil {
return nil, fmt.Errorf("bad value for $KAFKA_DISABLE: %v", err)
}
if disabled {
return nil, ErrDisabled
}
addrsStr := os.Getenv("KAFKA_ADDRS")
if addrsStr == "" {
addrsStr = "localhost:9092"
}
addrs := strings.Split(addrsStr, ",")
useTLS, err := boolVar("KAFKA_USE_TLS")
if err != nil {
return nil, fmt.Errorf("bad value for KAFKA_USE_TLS: %v", err)
}
client := &Kafka{
addrs: addrs,
useTLS: useTLS,
saslUser: os.Getenv("KAFKA_USERNAME"),
saslPassword: os.Getenv("KAFKA_PASSWORD"),
}
// The cluster might not be available immediately, so try
// for a while before giving up.
retryLimit := 30 * time.Second
if limit := os.Getenv("KAFKA_TIMEOUT"); limit != "" {
retryLimit, err = time.ParseDuration(limit)
if err != nil {
return nil, fmt.Errorf("bad value for KAFKA_TIMEOUT: %v", err)
}
}
retryStrategy := retry.LimitTime(retryLimit, retry.Exponential{
Initial: time.Millisecond,
MaxDelay: time.Second,
})
for a := retry.Start(retryStrategy, nil); a.Next(); {
admin, err := sarama.NewClusterAdmin(addrs, client.Config())
if err == nil {
client.admin = admin
break
}
if !a.Next() {
return nil, fmt.Errorf("cannot connect to Kafka cluster at %q after %v: %v", addrs, retryLimit, err)
}
}
return client, nil
}

// Kafka represents a connection to a Kafka cluster.
type Kafka struct {
addrs []string
useTLS bool
saslUser string
saslPassword string
admin sarama.ClusterAdmin
topics []string
}

// Config returns a sarama configuration that will
// use connection parameters defined in the environment
// variables described in New.
func (k *Kafka) Config() *sarama.Config {
cfg := sarama.NewConfig()
k.InitConfig(cfg)
return cfg
}

// InitConfig is similar to Config, except that instead of
// returning a new configuration, it configures an existing
// one.
func (k *Kafka) InitConfig(cfg *sarama.Config) {
if cfg.Version == sarama.MinVersion {
// R
cfg.Version = sarama.V1_0_0_0
}
cfg.Net.TLS.Enable = k.useTLS
if k.saslUser != "" {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = k.saslUser
cfg.Net.SASL.Password = k.saslPassword
os.Exit(1)
}
}

// Addrs returns the configured Kakfa broker addresses.
func (k *Kafka) Addrs() []string {
return k.addrs
}

// NewTopic creates a new Kafka topic with a random name and
// single partition. It returns the topic's name. The topic will be deleted
// when c.Close is called.
//
// NewTopic panics if the topic cannot be created.
func (k *Kafka) NewTopic() string {
if k.admin == nil {
panic("cannot create topic with closed kafkatest.Kafka instance")
}
topic := randomName("kafkatest-")
if err := k.admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
}, false); err != nil {
panic(fmt.Errorf("cannot create topic %q: %v", topic, err))
}
k.topics = append(k.topics, topic)
return topic
}

// Close closes the client connection and removes any topics
// created by Topic. This method may be called more than once.
func (k *Kafka) Close() error {
if k.admin == nil {
return nil
}
for ; len(k.topics) != 0; k.topics = k.topics[1:] {
if err := k.admin.DeleteTopic(k.topics[0]); err != nil {
return fmt.Errorf("cannot delete topic %q: %v", k.topics[0], err)
}
}
k.admin.Close()
k.admin = nil
return nil
}

func boolVar(envVar string) (bool, error) {
s := os.Getenv(envVar)
if s == "" {
return false, nil
}
b, err := strconv.ParseBool(s)
func DialIt(s string) error {
_, err := net.Dial("tcp", s)
if err != nil {
return false, fmt.Errorf("invalid boolean value %q (possible values are: 1, t, T, TRUE, true, True, 0, f, F, FALSE)", s)
return fmt.Errorf("dialit: %v", err)
}
return b, nil
}

func randomName(prefix string) string {
buf := make([]byte, 8)
if _, err := rand.Read(buf); err != nil {
panic(err)
}
return fmt.Sprintf("%s%x", prefix, buf)
return nil
}
85 changes: 6 additions & 79 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -1,86 +1,13 @@
package kafkatest_test
package main

import (
"os"
"testing"
"time"

"github.com/Shopify/sarama"
qt "github.com/frankban/quicktest"

"github.com/heetch/kafkatest"
)

func TestNew(t *testing.T) {
c := qt.New(t)
k, err := kafkatest.New()
c.Assert(err, qt.Equals, nil)

// Produce a message to a new topic.
cfg := k.Config()
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true

producer, err := sarama.NewSyncProducer(k.Addrs(), cfg)
c.Assert(err, qt.Equals, nil)
defer producer.Close()
topic := k.NewTopic()

// Check that the topic has actually been created.
admin, err := sarama.NewClusterAdmin(k.Addrs(), cfg)
c.Assert(err, qt.Equals, nil)
defer admin.Close()
topics, err := admin.ListTopics()
c.Assert(err, qt.Equals, nil)
_, ok := topics[topic]
c.Assert(ok, qt.Equals, true)

// Produce a message to the topic.
_, offset, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("key"),
Value: sarama.StringEncoder("value"),
})
c.Assert(err, qt.Equals, nil)
c.Assert(offset, qt.Equals, int64(0))

// Check that we can consume the message we just produced.
consumer, err := sarama.NewConsumer(k.Addrs(), cfg)
c.Assert(err, qt.Equals, nil)
defer consumer.Close()
pconsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
c.Assert(err, qt.Equals, nil)
defer pconsumer.Close()
select {
case m := <-pconsumer.Messages():
c.Check(string(m.Key), qt.Equals, "key")
c.Check(string(m.Value), qt.Equals, "value")
case <-time.After(10 * time.Second):
c.Fatal("timed out waiting for message")
func TestFoo(t *testing.T) {
err := DialIt(os.Getenv("KAFKA_ADDRS"))
if err != nil {
t.Errorf("cannot dial %q: %v", os.Getenv("KAFKA_ADDRS"), err)
}

// Close the Kafka instance and check that the
// new topic has been removed.
err = k.Close()
c.Assert(err, qt.Equals, nil)
topics, err = admin.ListTopics()
c.Assert(err, qt.Equals, nil)
_, ok = topics[topic]
c.Assert(ok, qt.Equals, false)

// Check we can call Close again.
err = k.Close()
c.Assert(err, qt.Equals, nil)
}

func TestDisabled(t *testing.T) {
c := qt.New(t)
c.Setenv("KAFKA_DISABLE", "1")
k, err := kafkatest.New()
c.Assert(err, qt.Equals, kafkatest.ErrDisabled)
c.Assert(k, qt.IsNil)

c.Setenv("KAFKA_DISABLE", "bad")
k, err = kafkatest.New()
c.Assert(err, qt.ErrorMatches, `bad value for \$KAFKA_DISABLE: invalid boolean value "bad" \(possible values are: 1, t, T, TRUE, true, True, 0, f, F, FALSE\)`)
c.Assert(k, qt.IsNil)
}
1 change: 1 addition & 0 deletions cue.mod/module.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module: ""
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module: "github.com/heetch/cue-schema/github"
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package generate

import (
"tool/file"
"encoding/yaml"
"github.com/heetch/cue-schema/github/workflow"
)

Workflow :: workflow

command: generateworkflow: {
task: write: file.Create & {
filename: ".github/workflows/test.yaml"
contents: """
# Code generated by github.com/heetch/cue-schema/github/workflow/generate. DO NOT EDIT.
\(yaml.Marshal(Workflow))
"""
}
}
Loading