diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 377daf0..b2c2b17 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -24,7 +24,7 @@ jobs: run: go build -v -buildmode=plugin -o plugin/postgresql/postgres.so ./plugin/postgresql - name: Test with coverage - run: go test ./... -race -coverprofile=coverage.txt -covermode=atomic + run: go test ./pkg/... -race -coverprofile=coverage.txt -covermode=atomic - name: Codecov uses: codecov/codecov-action@v4.1.0 diff --git a/pkg/config/queue.go b/pkg/config/queue.go index 543ddf3..9dc73bc 100644 --- a/pkg/config/queue.go +++ b/pkg/config/queue.go @@ -60,7 +60,7 @@ type RetryConfig struct { Strategy string `yaml:"strategy,omitempty" json:"strategy,omitempty"` // Interval is the interval between retries - Interval time.Duration `yaml:"interval" json:"interval"` + Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty"` // ThresholdStatus is the minimum status code that will trigger a retry, defaults to 500 ThresholdStatus int `yaml:"threshold_status,omitempty" json:"threshold_status,omitempty"` diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 09beb06..1db9135 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -98,7 +98,7 @@ func sendRequestWithStrategy(qCfg *config.QueueConfig, rCfg *config.RouteConfig, // shouldRetry determines whether a request should be retried based on the response and retry configuration func shouldRetry(resp *http.Response, retryConfig *config.RetryConfig) bool { - if retryConfig == nil && !retryConfig.Enabled { + if retryConfig == nil || !retryConfig.Enabled { return false } return resp == nil || resp.StatusCode >= retryConfig.ThresholdStatus diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go index c2505e1..c92c96a 100644 --- a/pkg/runner/runner_test.go +++ b/pkg/runner/runner_test.go @@ -302,6 +302,7 @@ func TestSendRequestWithStrategy(t *testing.T) { qCfg := &config.QueueConfig{ Name: "testQueue", Retry: &config.RetryConfig{ + Enabled: tt.retryEnabled, Strategy: tt.retryStrategy, MaxRetries: tt.maxRetries, Interval: tt.interval, @@ -315,7 +316,7 @@ func TestSendRequestWithStrategy(t *testing.T) { if mockHTTPRequester.CallCount != tt.expectedCalls { t.Errorf("Expected %d calls to SendRequest, got %d", tt.expectedCalls, mockHTTPRequester.CallCount) } - if qCfg.Retry != nil && qCfg.Retry.Enabled { + if qCfg.Retry != nil && qCfg.Retry.Enabled && qCfg.Retry.Strategy != common.RetryStrategyRand { if duration < qCfg.Retry.Interval*time.Duration(qCfg.Retry.MaxRetries) { t.Errorf("Expected duration to be greater than %d, got %d", qCfg.Retry.Interval*time.Duration(qCfg.Retry.MaxRetries), duration) } diff --git a/test/e2e/rabbitmq_http_test.go b/test/e2e/rabbitmq_http_test.go new file mode 100644 index 0000000..2ba166a --- /dev/null +++ b/test/e2e/rabbitmq_http_test.go @@ -0,0 +1,175 @@ +package e2e + +import ( + "fmt" + "os" + "testing" + "time" + + konsume "github.com/bugrakocabay/konsume/cmd" + "github.com/bugrakocabay/konsume/pkg/config" +) + +const ( + host = "localhost" + port = 5672 + username = "user" + password = "password" + testQueueName = "test-queue" +) + +type TestCase struct { + Description string + KonsumeConfig *config.Config + SetupMessage SetupMessage + ExpectedResult []HTTPRequestExpectation +} + +type SetupMessage struct { + QueueName string + Message []byte +} + +type HTTPRequestExpectation struct { + URL string + Method string + Body string +} + +func TestKonsumeWithRabbitMQHTTP(t *testing.T) { + mockServer, url, requestCapture := setupMockServer(t) + defer mockServer.Close() + + tests := []TestCase{ + { + Description: "Test with single message", + KonsumeConfig: &config.Config{ + Providers: []*config.ProviderConfig{ + { + Name: "rabbit-queue", + Type: "rabbitmq", + AMQPConfig: &config.AMQPConfig{ + Host: host, + Port: port, + Username: username, + Password: password, + }, + }, + }, + Queues: []*config.QueueConfig{ + { + Name: testQueueName + "-1", + Provider: "rabbit-queue", + Routes: []*config.RouteConfig{ + { + Name: "test-route", + URL: fmt.Sprintf("%s/200", url), + }, + }, + }, + }, + }, + SetupMessage: SetupMessage{ + QueueName: testQueueName + "-1", + Message: []byte("{\"id\": 0, \"name\": \"test\"}"), + }, + ExpectedResult: []HTTPRequestExpectation{ + { + URL: "/200", + Body: "{\"id\": 0, \"name\": \"test\"}", + Method: "POST", + }, + }, + }, + { + Description: "Test with single message dynamic body", + KonsumeConfig: &config.Config{ + Providers: []*config.ProviderConfig{ + { + Name: "rabbit-queue", + Type: "rabbitmq", + AMQPConfig: &config.AMQPConfig{ + Host: host, + Port: port, + Username: username, + Password: password, + }, + }, + }, + Queues: []*config.QueueConfig{ + { + Name: testQueueName + "-2", + Provider: "rabbit-queue", + Routes: []*config.RouteConfig{ + { + Name: "test-route", + URL: fmt.Sprintf("%s/200", url), + Body: map[string]interface{}{ + "some-id": "{{id}}", + "some-name": "{{name}}", + }, + }, + }, + }, + }, + }, + SetupMessage: SetupMessage{ + QueueName: testQueueName + "-2", + Message: []byte("{\"id\": 1, \"name\": \"test\"}"), + }, + ExpectedResult: []HTTPRequestExpectation{ + { + URL: "/200", + Body: "{\"some-id\":1,\"some-name\":\"test\"}", + Method: "POST", + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.Description, func(t *testing.T) { + requestCapture.ReceivedRequests = nil + // Setting up the config file + configFilePath, cleanup := writeConfigToFile(test.KonsumeConfig) + defer cleanup() + os.Setenv("KONSUME_CONFIG_PATH", configFilePath) + + // Running konsume and waiting for it to consume the message + go konsume.Execute() + time.Sleep(2 * time.Second) + + // Pushing the message to the queue + connString := fmt.Sprintf("amqp://%s:%s@%s:%d/", username, password, host, port) + conn, ch, err := connectToRabbitMQ(connString) + if err != nil { + t.Fatalf("Failed to connect to RabbitMQ: %v", err) + } + defer conn.Close() + defer ch.Close() + err = pushMessageToQueue(ch, test.SetupMessage.QueueName, test.SetupMessage.Message) + if err != nil { + t.Fatalf("Failed to push message to queue: %v", err) + } + time.Sleep(2 * time.Second) + + // Checking the captured requests + requestCapture.Mutex.Lock() + defer requestCapture.Mutex.Unlock() + if len(requestCapture.ReceivedRequests) != len(test.ExpectedResult) { + t.Fatalf("Expected %d HTTP requests, but got %d", len(test.ExpectedResult), len(requestCapture.ReceivedRequests)) + } + for i, expectedRequest := range test.ExpectedResult { + if requestCapture.ReceivedRequests[i].URL != expectedRequest.URL { + t.Errorf("Expected URL: %s, but got: %s", expectedRequest.URL, requestCapture.ReceivedRequests[i].URL) + } + if requestCapture.ReceivedRequests[i].Method != expectedRequest.Method { + t.Errorf("Expected method: %s, but got: %s", expectedRequest.Method, requestCapture.ReceivedRequests[i].Method) + } + if requestCapture.ReceivedRequests[i].Body != expectedRequest.Body { + t.Errorf("Expected body: %s, but got: %s", expectedRequest.Body, requestCapture.ReceivedRequests[i].Body) + } + } + }) + } +} diff --git a/test/e2e/test_util.go b/test/e2e/test_util.go new file mode 100644 index 0000000..310b5ac --- /dev/null +++ b/test/e2e/test_util.go @@ -0,0 +1,123 @@ +package e2e + +import ( + "context" + "io" + "log" + "net/http" + "net/http/httptest" + "os" + "sync" + "testing" + + "github.com/bugrakocabay/konsume/pkg/config" + + amqp "github.com/rabbitmq/amqp091-go" + "gopkg.in/yaml.v3" +) + +type RequestCapture struct { + Mutex sync.Mutex + ReceivedRequests []HTTPRequestExpectation +} + +// connectToRabbitMQ establishes a connection to RabbitMQ and returns the connection and channel +func connectToRabbitMQ(connectionString string) (*amqp.Connection, *amqp.Channel, error) { + conn, err := amqp.Dial(connectionString) + if err != nil { + return nil, nil, err + } + + ch, err := conn.Channel() + if err != nil { + return nil, nil, err + } + + return conn, ch, nil +} + +// pushMessageToQueue publishes a message to the specified queue in RabbitMQ +func pushMessageToQueue(ch *amqp.Channel, queueName string, body []byte) error { + q, err := ch.QueueDeclare( + queueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + return err + } + ctx := context.Background() + err = ch.PublishWithContext( + ctx, + "", // exchange + q.Name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + Body: body, + }) + if err != nil { + return err + } + + return nil +} + +// writeConfigToFile writes the given config to a temporary file and returns the file path and a cleanup function +func writeConfigToFile(cfg *config.Config) (string, func()) { + data, err := yaml.Marshal(cfg) + if err != nil { + log.Fatalf("Failed to marshal config: %v", err) + } + + tmpFile, err := os.CreateTemp("", "konsume-config-*.yaml") + if err != nil { + log.Fatalf("Failed to create temp file for config: %v", err) + } + + if _, err = tmpFile.Write(data); err != nil { + log.Fatalf("Failed to write to temp config file: %v", err) + } + if err = tmpFile.Close(); err != nil { + log.Fatalf("Failed to close temp config file: %v", err) + } + + return tmpFile.Name(), func() { os.Remove(tmpFile.Name()) } +} + +func setupMockServer(t *testing.T) (*httptest.Server, string, *RequestCapture) { + capture := &RequestCapture{} + + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatal("Failed to read request body") + } + defer r.Body.Close() + + receivedRequest := HTTPRequestExpectation{ + URL: r.URL.String(), + Method: r.Method, + Body: string(body), + } + + capture.Mutex.Lock() + capture.ReceivedRequests = append(capture.ReceivedRequests, receivedRequest) + capture.Mutex.Unlock() + + switch r.URL.Path { + case "/400": + w.WriteHeader(http.StatusBadRequest) + case "/500": + w.WriteHeader(http.StatusInternalServerError) + default: + w.WriteHeader(http.StatusOK) + } + })) + + return mockServer, mockServer.URL, capture +}