diff --git a/_examples/queues/main.go b/_examples/queues/main.go index 80d0f4a..7aa9258 100644 --- a/_examples/queues/main.go +++ b/_examples/queues/main.go @@ -105,7 +105,7 @@ func produceBytes(q *queues.Producer, req *http.Request) error { return nil } -func consumeBatch(batch *queues.ConsumerMessageBatch) error { +func consumeBatch(batch *queues.MessageBatch) error { for _, msg := range batch.Messages { log.Printf("Received message: %v\n", msg.Body.Get("name").String()) } diff --git a/cloudflare/queues/batchmessage.go b/cloudflare/queues/batchmessage.go index c54ba3c..18185c0 100644 --- a/cloudflare/queues/batchmessage.go +++ b/cloudflare/queues/batchmessage.go @@ -6,6 +6,8 @@ import ( "github.com/syumai/workers/internal/jsutil" ) +// FIXME: rename to MessageSendRequest +// see: https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagesendrequest type BatchMessage struct { body js.Value options *sendOptions diff --git a/cloudflare/queues/consumer.go b/cloudflare/queues/consumer.go index 54f8099..606dbf8 100644 --- a/cloudflare/queues/consumer.go +++ b/cloudflare/queues/consumer.go @@ -12,7 +12,7 @@ import ( // A returned error will cause the batch to be retried (unless the batch or individual messages are acked). // NOTE: to do long-running message processing task within the Consumer, use cloudflare.WaitUntil, this will postpone the message // acknowledgment until the task is completed witout blocking the queue consumption. -type Consumer func(batch *ConsumerMessageBatch) error +type Consumer func(batch *MessageBatch) error var consumer Consumer @@ -44,7 +44,7 @@ func init() { } func consumeBatch(batch js.Value) error { - b, err := newConsumerMessageBatch(batch) + b, err := newMessageBatch(batch) if err != nil { return fmt.Errorf("failed to parse message batch: %v", err) } diff --git a/cloudflare/queues/consumermessage.go b/cloudflare/queues/message.go similarity index 54% rename from cloudflare/queues/consumermessage.go rename to cloudflare/queues/message.go index e1f2bec..bf41a48 100644 --- a/cloudflare/queues/consumermessage.go +++ b/cloudflare/queues/message.go @@ -8,14 +8,14 @@ import ( "github.com/syumai/workers/internal/jsutil" ) -// ConsumerMessage represents a message of the batch received by the consumer. +// Message represents a message of the batch received by the consumer. // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message -type ConsumerMessage struct { +type Message struct { // instance - The underlying instance of the JS message object passed by the cloudflare instance js.Value - // Id - The unique Cloudflare-generated identifier of the message - Id string + // ID - The unique Cloudflare-generated identifier of the message + ID string // Timestamp - The time when the message was enqueued Timestamp time.Time // Body - The message body. Could be accessed directly or using converting helpers as StringBody, BytesBody, IntBody, FloatBody. @@ -24,15 +24,15 @@ type ConsumerMessage struct { Attempts int } -func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) { +func newMessage(obj js.Value) (*Message, error) { timestamp, err := jsutil.DateToTime(obj.Get("timestamp")) if err != nil { return nil, fmt.Errorf("failed to parse message timestamp: %v", err) } - return &ConsumerMessage{ + return &Message{ instance: obj, - Id: obj.Get("id").String(), + ID: obj.Get("id").String(), Body: obj.Get("body"), Attempts: obj.Get("attempts").Int(), Timestamp: timestamp, @@ -41,13 +41,13 @@ func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) { // Ack acknowledges the message as successfully delivered despite the result returned from the consuming function. // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message -func (m *ConsumerMessage) Ack() { +func (m *Message) Ack() { m.instance.Call("ack") } // Retry marks the message to be re-delivered. // The message will be retried after the optional delay configured with RetryOption. -func (m *ConsumerMessage) Retry(opts ...RetryOption) { +func (m *Message) Retry(opts ...RetryOption) { var o *retryOptions if len(opts) > 0 { o = &retryOptions{} @@ -59,40 +59,19 @@ func (m *ConsumerMessage) Retry(opts ...RetryOption) { m.instance.Call("retry", o.toJS()) } -func (m *ConsumerMessage) StringBody() (string, error) { +func (m *Message) StringBody() (string, error) { if m.Body.Type() != js.TypeString { return "", fmt.Errorf("message body is not a string: %v", m.Body) } return m.Body.String(), nil } -func (m *ConsumerMessage) BytesBody() ([]byte, error) { - switch m.Body.Type() { - case js.TypeString: - return []byte(m.Body.String()), nil - case js.TypeObject: - if m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass) { - b := make([]byte, m.Body.Get("byteLength").Int()) - js.CopyBytesToGo(b, m.Body) - return b, nil - } - } - - return nil, fmt.Errorf("message body is not a byte array: %v", m.Body) -} - -func (m *ConsumerMessage) IntBody() (int, error) { - if m.Body.Type() == js.TypeNumber { - return m.Body.Int(), nil +func (m *Message) BytesBody() ([]byte, error) { + if m.Body.Type() != js.TypeObject || + !(m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass)) { + return nil, fmt.Errorf("message body is not a byte array: %v", m.Body) } - - return 0, fmt.Errorf("message body is not a number: %v", m.Body) -} - -func (m *ConsumerMessage) FloatBody() (float64, error) { - if m.Body.Type() == js.TypeNumber { - return m.Body.Float(), nil - } - - return 0, fmt.Errorf("message body is not a number: %v", m.Body) + b := make([]byte, m.Body.Get("byteLength").Int()) + js.CopyBytesToGo(b, m.Body) + return b, nil } diff --git a/cloudflare/queues/consumermessage_test.go b/cloudflare/queues/message_test.go similarity index 66% rename from cloudflare/queues/consumermessage_test.go rename to cloudflare/queues/message_test.go index c285ee2..ab21208 100644 --- a/cloudflare/queues/consumermessage_test.go +++ b/cloudflare/queues/message_test.go @@ -20,17 +20,17 @@ func TestNewConsumerMessage(t *testing.T) { "attempts": 1, } - got, err := newConsumerMessage(js.ValueOf(m)) + got, err := newMessage(js.ValueOf(m)) if err != nil { - t.Fatalf("newConsumerMessage failed: %v", err) + t.Fatalf("newMessage failed: %v", err) } if body := got.Body.String(); body != "hello" { t.Fatalf("Body() = %v, want %v", body, "hello") } - if got.Id != id { - t.Fatalf("Id = %v, want %v", got.Id, id) + if got.ID != id { + t.Fatalf("ID = %v, want %v", got.ID, id) } if got.Attempts != 1 { @@ -49,7 +49,7 @@ func TestConsumerMessage_Ack(t *testing.T) { ackCalled = true return nil })) - m := &ConsumerMessage{ + m := &Message{ instance: jsObj, } @@ -67,7 +67,7 @@ func TestConsumerMessage_Retry(t *testing.T) { retryCalled = true return nil })) - m := &ConsumerMessage{ + m := &Message{ instance: jsObj, } @@ -99,7 +99,7 @@ func TestConsumerMessage_RetryWithDelay(t *testing.T) { return nil })) - m := &ConsumerMessage{ + m := &Message{ instance: jsObj, } @@ -151,7 +151,7 @@ func TestNewConsumerMessage_StringBody(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - m := &ConsumerMessage{ + m := &Message{ Body: tt.body(), } @@ -174,13 +174,6 @@ func TestConsumerMessage_BytesBody(t *testing.T) { want []byte wantErr bool }{ - { - name: "string", - body: func() js.Value { - return js.ValueOf("hello") - }, - want: []byte("hello"), - }, { name: "uint8 array", body: func() js.Value { @@ -202,7 +195,7 @@ func TestConsumerMessage_BytesBody(t *testing.T) { { name: "incorrect type", body: func() js.Value { - return js.ValueOf(42) + return js.ValueOf("hello") }, wantErr: true, }, @@ -210,7 +203,7 @@ func TestConsumerMessage_BytesBody(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - m := &ConsumerMessage{ + m := &Message{ Body: tt.body(), } @@ -225,97 +218,3 @@ func TestConsumerMessage_BytesBody(t *testing.T) { }) } } - -func TestConsumerMessage_IntBody(t *testing.T) { - tests := []struct { - name string - body js.Value - want int - wantErr bool - }{ - { - name: "int", - body: js.ValueOf(42), - want: 42, - }, - { - name: "float", - body: js.ValueOf(42.5), - want: 42, - }, - { - name: "string", - body: js.ValueOf("42"), - wantErr: true, - }, - { - name: "undefined", - body: js.Undefined(), - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - m := &ConsumerMessage{ - Body: tt.body, - } - - got, err := m.IntBody() - if (err != nil) != tt.wantErr { - t.Fatalf("IntBody() error = %v, wantErr %v", err, tt.wantErr) - } - - if got != tt.want { - t.Fatalf("IntBody() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestConsumerMessage_FloatBody(t *testing.T) { - tests := []struct { - name string - body js.Value - want float64 - wantErr bool - }{ - { - name: "int", - body: js.ValueOf(42), - want: 42.0, - }, - { - name: "float", - body: js.ValueOf(42.5), - want: 42.5, - }, - { - name: "string", - body: js.ValueOf("42"), - wantErr: true, - }, - { - name: "undefined", - body: js.Undefined(), - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - m := &ConsumerMessage{ - Body: tt.body, - } - - got, err := m.FloatBody() - if (err != nil) != tt.wantErr { - t.Fatalf("FloatBody() error = %v, wantErr %v", err, tt.wantErr) - } - - if got != tt.want { - t.Fatalf("FloatBody() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/cloudflare/queues/consumermessagebatch.go b/cloudflare/queues/messagebatch.go similarity index 73% rename from cloudflare/queues/consumermessagebatch.go rename to cloudflare/queues/messagebatch.go index 976b0bf..b21055f 100644 --- a/cloudflare/queues/consumermessagebatch.go +++ b/cloudflare/queues/messagebatch.go @@ -5,11 +5,11 @@ import ( "syscall/js" ) -// ConsumerMessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the +// MessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the // worker configuration. // - https://developers.cloudflare.com/queues/configuration/configure-queues/#consumer // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch -type ConsumerMessageBatch struct { +type MessageBatch struct { // instance - The underlying instance of the JS message object passed by the cloudflare instance js.Value @@ -17,21 +17,21 @@ type ConsumerMessageBatch struct { Queue string // Messages - The messages in the batch - Messages []*ConsumerMessage + Messages []*Message } -func newConsumerMessageBatch(obj js.Value) (*ConsumerMessageBatch, error) { +func newMessageBatch(obj js.Value) (*MessageBatch, error) { msgArr := obj.Get("messages") - messages := make([]*ConsumerMessage, msgArr.Length()) + messages := make([]*Message, msgArr.Length()) for i := 0; i < msgArr.Length(); i++ { - m, err := newConsumerMessage(msgArr.Index(i)) + m, err := newMessage(msgArr.Index(i)) if err != nil { return nil, fmt.Errorf("failed to parse message %d: %v", i, err) } messages[i] = m } - return &ConsumerMessageBatch{ + return &MessageBatch{ instance: obj, Queue: obj.Get("queue").String(), Messages: messages, @@ -40,14 +40,14 @@ func newConsumerMessageBatch(obj js.Value) (*ConsumerMessageBatch, error) { // AckAll acknowledges all messages in the batch as successfully delivered despite the result returned from the consuming function. // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch -func (b *ConsumerMessageBatch) AckAll() { +func (b *MessageBatch) AckAll() { b.instance.Call("ackAll") } // RetryAll marks all messages in the batch to be re-delivered. // The messages will be retried after the optional delay configured with RetryOption. // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch -func (b *ConsumerMessageBatch) RetryAll(opts ...RetryOption) { +func (b *MessageBatch) RetryAll(opts ...RetryOption) { var o *retryOptions if len(opts) > 0 { o = &retryOptions{} diff --git a/cloudflare/queues/consumermessagebatch_test.go b/cloudflare/queues/messagebatch_test.go similarity index 90% rename from cloudflare/queues/consumermessagebatch_test.go rename to cloudflare/queues/messagebatch_test.go index ec76dc6..0f847dc 100644 --- a/cloudflare/queues/consumermessagebatch_test.go +++ b/cloudflare/queues/messagebatch_test.go @@ -24,9 +24,9 @@ func TestNewConsumerMessageBatch(t *testing.T) { }, } - got, err := newConsumerMessageBatch(js.ValueOf(m)) + got, err := newMessageBatch(js.ValueOf(m)) if err != nil { - t.Fatalf("newConsumerMessageBatch failed: %v", err) + t.Fatalf("newMessageBatch failed: %v", err) } if got.Queue != "some-queue" { @@ -42,8 +42,8 @@ func TestNewConsumerMessageBatch(t *testing.T) { t.Fatalf("Body() = %v, want %v", body, "hello") } - if msg.Id != id { - t.Fatalf("Id = %v, want %v", msg.Id, id) + if msg.ID != id { + t.Fatalf("ID = %v, want %v", msg.ID, id) } if msg.Attempts != 1 { @@ -62,7 +62,7 @@ func TestConsumerMessageBatch_AckAll(t *testing.T) { ackAllCalled = true return nil })) - b := &ConsumerMessageBatch{ + b := &MessageBatch{ instance: jsObj, } @@ -80,7 +80,7 @@ func TestConsumerMessageBatch_RetryAll(t *testing.T) { retryAllCalled = true return nil })) - b := &ConsumerMessageBatch{ + b := &MessageBatch{ instance: jsObj, } @@ -112,7 +112,7 @@ func TestConsumerMessageBatch_RetryAllWithRetryOption(t *testing.T) { return nil })) - b := &ConsumerMessageBatch{ + b := &MessageBatch{ instance: jsObj, }