Skip to content

Commit

Permalink
Merge pull request #106 from bugrakocabay/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
bugrakocabay authored Mar 11, 2024
2 parents 9c0c098 + a1c90f6 commit 8bdea6d
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
run: go build -v -buildmode=plugin -o plugin/postgresql/postgres.so ./plugin/postgresql

- name: Test with coverage
run: go test ./... -race -coverprofile=coverage.txt -covermode=atomic
run: go test ./pkg/... -race -coverprofile=coverage.txt -covermode=atomic

- name: Codecov
uses: codecov/[email protected]
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type RetryConfig struct {
Strategy string `yaml:"strategy,omitempty" json:"strategy,omitempty"`

// Interval is the interval between retries
Interval time.Duration `yaml:"interval" json:"interval"`
Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty"`

// ThresholdStatus is the minimum status code that will trigger a retry, defaults to 500
ThresholdStatus int `yaml:"threshold_status,omitempty" json:"threshold_status,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func sendRequestWithStrategy(qCfg *config.QueueConfig, rCfg *config.RouteConfig,

// shouldRetry determines whether a request should be retried based on the response and retry configuration
func shouldRetry(resp *http.Response, retryConfig *config.RetryConfig) bool {
if retryConfig == nil && !retryConfig.Enabled {
if retryConfig == nil || !retryConfig.Enabled {
return false
}
return resp == nil || resp.StatusCode >= retryConfig.ThresholdStatus
Expand Down
3 changes: 2 additions & 1 deletion pkg/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func TestSendRequestWithStrategy(t *testing.T) {
qCfg := &config.QueueConfig{
Name: "testQueue",
Retry: &config.RetryConfig{
Enabled: tt.retryEnabled,
Strategy: tt.retryStrategy,
MaxRetries: tt.maxRetries,
Interval: tt.interval,
Expand All @@ -315,7 +316,7 @@ func TestSendRequestWithStrategy(t *testing.T) {
if mockHTTPRequester.CallCount != tt.expectedCalls {
t.Errorf("Expected %d calls to SendRequest, got %d", tt.expectedCalls, mockHTTPRequester.CallCount)
}
if qCfg.Retry != nil && qCfg.Retry.Enabled {
if qCfg.Retry != nil && qCfg.Retry.Enabled && qCfg.Retry.Strategy != common.RetryStrategyRand {
if duration < qCfg.Retry.Interval*time.Duration(qCfg.Retry.MaxRetries) {
t.Errorf("Expected duration to be greater than %d, got %d", qCfg.Retry.Interval*time.Duration(qCfg.Retry.MaxRetries), duration)
}
Expand Down
175 changes: 175 additions & 0 deletions test/e2e/rabbitmq_http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package e2e

import (
"fmt"
"os"
"testing"
"time"

konsume "github.com/bugrakocabay/konsume/cmd"
"github.com/bugrakocabay/konsume/pkg/config"
)

const (
host = "localhost"
port = 5672
username = "user"
password = "password"
testQueueName = "test-queue"
)

type TestCase struct {
Description string
KonsumeConfig *config.Config
SetupMessage SetupMessage
ExpectedResult []HTTPRequestExpectation
}

type SetupMessage struct {
QueueName string
Message []byte
}

type HTTPRequestExpectation struct {
URL string
Method string
Body string
}

func TestKonsumeWithRabbitMQHTTP(t *testing.T) {
mockServer, url, requestCapture := setupMockServer(t)
defer mockServer.Close()

tests := []TestCase{
{
Description: "Test with single message",
KonsumeConfig: &config.Config{
Providers: []*config.ProviderConfig{
{
Name: "rabbit-queue",
Type: "rabbitmq",
AMQPConfig: &config.AMQPConfig{
Host: host,
Port: port,
Username: username,
Password: password,
},
},
},
Queues: []*config.QueueConfig{
{
Name: testQueueName + "-1",
Provider: "rabbit-queue",
Routes: []*config.RouteConfig{
{
Name: "test-route",
URL: fmt.Sprintf("%s/200", url),
},
},
},
},
},
SetupMessage: SetupMessage{
QueueName: testQueueName + "-1",
Message: []byte("{\"id\": 0, \"name\": \"test\"}"),
},
ExpectedResult: []HTTPRequestExpectation{
{
URL: "/200",
Body: "{\"id\": 0, \"name\": \"test\"}",
Method: "POST",
},
},
},
{
Description: "Test with single message dynamic body",
KonsumeConfig: &config.Config{
Providers: []*config.ProviderConfig{
{
Name: "rabbit-queue",
Type: "rabbitmq",
AMQPConfig: &config.AMQPConfig{
Host: host,
Port: port,
Username: username,
Password: password,
},
},
},
Queues: []*config.QueueConfig{
{
Name: testQueueName + "-2",
Provider: "rabbit-queue",
Routes: []*config.RouteConfig{
{
Name: "test-route",
URL: fmt.Sprintf("%s/200", url),
Body: map[string]interface{}{
"some-id": "{{id}}",
"some-name": "{{name}}",
},
},
},
},
},
},
SetupMessage: SetupMessage{
QueueName: testQueueName + "-2",
Message: []byte("{\"id\": 1, \"name\": \"test\"}"),
},
ExpectedResult: []HTTPRequestExpectation{
{
URL: "/200",
Body: "{\"some-id\":1,\"some-name\":\"test\"}",
Method: "POST",
},
},
},
}

for _, test := range tests {
t.Run(test.Description, func(t *testing.T) {
requestCapture.ReceivedRequests = nil
// Setting up the config file
configFilePath, cleanup := writeConfigToFile(test.KonsumeConfig)
defer cleanup()
os.Setenv("KONSUME_CONFIG_PATH", configFilePath)

// Running konsume and waiting for it to consume the message
go konsume.Execute()
time.Sleep(2 * time.Second)

// Pushing the message to the queue
connString := fmt.Sprintf("amqp://%s:%s@%s:%d/", username, password, host, port)
conn, ch, err := connectToRabbitMQ(connString)
if err != nil {
t.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
defer ch.Close()
err = pushMessageToQueue(ch, test.SetupMessage.QueueName, test.SetupMessage.Message)
if err != nil {
t.Fatalf("Failed to push message to queue: %v", err)
}
time.Sleep(2 * time.Second)

// Checking the captured requests
requestCapture.Mutex.Lock()
defer requestCapture.Mutex.Unlock()
if len(requestCapture.ReceivedRequests) != len(test.ExpectedResult) {
t.Fatalf("Expected %d HTTP requests, but got %d", len(test.ExpectedResult), len(requestCapture.ReceivedRequests))
}
for i, expectedRequest := range test.ExpectedResult {
if requestCapture.ReceivedRequests[i].URL != expectedRequest.URL {
t.Errorf("Expected URL: %s, but got: %s", expectedRequest.URL, requestCapture.ReceivedRequests[i].URL)
}
if requestCapture.ReceivedRequests[i].Method != expectedRequest.Method {
t.Errorf("Expected method: %s, but got: %s", expectedRequest.Method, requestCapture.ReceivedRequests[i].Method)
}
if requestCapture.ReceivedRequests[i].Body != expectedRequest.Body {
t.Errorf("Expected body: %s, but got: %s", expectedRequest.Body, requestCapture.ReceivedRequests[i].Body)
}
}
})
}
}
123 changes: 123 additions & 0 deletions test/e2e/test_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package e2e

import (
"context"
"io"
"log"
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"

"github.com/bugrakocabay/konsume/pkg/config"

amqp "github.com/rabbitmq/amqp091-go"
"gopkg.in/yaml.v3"
)

type RequestCapture struct {
Mutex sync.Mutex
ReceivedRequests []HTTPRequestExpectation
}

// connectToRabbitMQ establishes a connection to RabbitMQ and returns the connection and channel
func connectToRabbitMQ(connectionString string) (*amqp.Connection, *amqp.Channel, error) {
conn, err := amqp.Dial(connectionString)
if err != nil {
return nil, nil, err
}

ch, err := conn.Channel()
if err != nil {
return nil, nil, err
}

return conn, ch, nil
}

// pushMessageToQueue publishes a message to the specified queue in RabbitMQ
func pushMessageToQueue(ch *amqp.Channel, queueName string, body []byte) error {
q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
ctx := context.Background()
err = ch.PublishWithContext(
ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: body,
})
if err != nil {
return err
}

return nil
}

// writeConfigToFile writes the given config to a temporary file and returns the file path and a cleanup function
func writeConfigToFile(cfg *config.Config) (string, func()) {
data, err := yaml.Marshal(cfg)
if err != nil {
log.Fatalf("Failed to marshal config: %v", err)
}

tmpFile, err := os.CreateTemp("", "konsume-config-*.yaml")
if err != nil {
log.Fatalf("Failed to create temp file for config: %v", err)
}

if _, err = tmpFile.Write(data); err != nil {
log.Fatalf("Failed to write to temp config file: %v", err)
}
if err = tmpFile.Close(); err != nil {
log.Fatalf("Failed to close temp config file: %v", err)
}

return tmpFile.Name(), func() { os.Remove(tmpFile.Name()) }
}

func setupMockServer(t *testing.T) (*httptest.Server, string, *RequestCapture) {
capture := &RequestCapture{}

mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatal("Failed to read request body")
}
defer r.Body.Close()

receivedRequest := HTTPRequestExpectation{
URL: r.URL.String(),
Method: r.Method,
Body: string(body),
}

capture.Mutex.Lock()
capture.ReceivedRequests = append(capture.ReceivedRequests, receivedRequest)
capture.Mutex.Unlock()

switch r.URL.Path {
case "/400":
w.WriteHeader(http.StatusBadRequest)
case "/500":
w.WriteHeader(http.StatusInternalServerError)
default:
w.WriteHeader(http.StatusOK)
}
}))

return mockServer, mockServer.URL, capture
}

0 comments on commit 8bdea6d

Please sign in to comment.