From 85790d319c7b6b9c33ac381a437effbce3f13b7e Mon Sep 17 00:00:00 2001 From: syumai Date: Wed, 15 Jan 2025 20:57:29 +0900 Subject: [PATCH 1/5] remove IntBody and FloatBody from consumer message --- cloudflare/queues/consumermessage.go | 16 ---- cloudflare/queues/consumermessage_test.go | 94 ----------------------- 2 files changed, 110 deletions(-) diff --git a/cloudflare/queues/consumermessage.go b/cloudflare/queues/consumermessage.go index e1f2bec..b0e1203 100644 --- a/cloudflare/queues/consumermessage.go +++ b/cloudflare/queues/consumermessage.go @@ -80,19 +80,3 @@ func (m *ConsumerMessage) BytesBody() ([]byte, error) { return nil, fmt.Errorf("message body is not a byte array: %v", m.Body) } - -func (m *ConsumerMessage) IntBody() (int, error) { - if m.Body.Type() == js.TypeNumber { - return m.Body.Int(), nil - } - - return 0, fmt.Errorf("message body is not a number: %v", m.Body) -} - -func (m *ConsumerMessage) FloatBody() (float64, error) { - if m.Body.Type() == js.TypeNumber { - return m.Body.Float(), nil - } - - return 0, fmt.Errorf("message body is not a number: %v", m.Body) -} diff --git a/cloudflare/queues/consumermessage_test.go b/cloudflare/queues/consumermessage_test.go index c285ee2..909c0cd 100644 --- a/cloudflare/queues/consumermessage_test.go +++ b/cloudflare/queues/consumermessage_test.go @@ -225,97 +225,3 @@ func TestConsumerMessage_BytesBody(t *testing.T) { }) } } - -func TestConsumerMessage_IntBody(t *testing.T) { - tests := []struct { - name string - body js.Value - want int - wantErr bool - }{ - { - name: "int", - body: js.ValueOf(42), - want: 42, - }, - { - name: "float", - body: js.ValueOf(42.5), - want: 42, - }, - { - name: "string", - body: js.ValueOf("42"), - wantErr: true, - }, - { - name: "undefined", - body: js.Undefined(), - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - m := &ConsumerMessage{ - Body: tt.body, - } - - got, err := m.IntBody() - if (err != nil) != tt.wantErr { - t.Fatalf("IntBody() error = %v, wantErr %v", err, tt.wantErr) - } - - if got != tt.want { - t.Fatalf("IntBody() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestConsumerMessage_FloatBody(t *testing.T) { - tests := []struct { - name string - body js.Value - want float64 - wantErr bool - }{ - { - name: "int", - body: js.ValueOf(42), - want: 42.0, - }, - { - name: "float", - body: js.ValueOf(42.5), - want: 42.5, - }, - { - name: "string", - body: js.ValueOf("42"), - wantErr: true, - }, - { - name: "undefined", - body: js.Undefined(), - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - m := &ConsumerMessage{ - Body: tt.body, - } - - got, err := m.FloatBody() - if (err != nil) != tt.wantErr { - t.Fatalf("FloatBody() error = %v, wantErr %v", err, tt.wantErr) - } - - if got != tt.want { - t.Fatalf("FloatBody() = %v, want %v", got, tt.want) - } - }) - } -} From 0d08d319d69fcf16b2f1af2d091e9cd9834d08e0 Mon Sep 17 00:00:00 2001 From: syumai Date: Wed, 15 Jan 2025 21:03:29 +0900 Subject: [PATCH 2/5] stop treating string body as bytes body --- cloudflare/queues/consumermessage.go | 17 ++++++----------- cloudflare/queues/consumermessage_test.go | 9 +-------- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/cloudflare/queues/consumermessage.go b/cloudflare/queues/consumermessage.go index b0e1203..a0ebba2 100644 --- a/cloudflare/queues/consumermessage.go +++ b/cloudflare/queues/consumermessage.go @@ -67,16 +67,11 @@ func (m *ConsumerMessage) StringBody() (string, error) { } func (m *ConsumerMessage) BytesBody() ([]byte, error) { - switch m.Body.Type() { - case js.TypeString: - return []byte(m.Body.String()), nil - case js.TypeObject: - if m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass) { - b := make([]byte, m.Body.Get("byteLength").Int()) - js.CopyBytesToGo(b, m.Body) - return b, nil - } + if m.Body.Type() != js.TypeObject || + !(m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass)) { + return nil, fmt.Errorf("message body is not a byte array: %v", m.Body) } - - return nil, fmt.Errorf("message body is not a byte array: %v", m.Body) + b := make([]byte, m.Body.Get("byteLength").Int()) + js.CopyBytesToGo(b, m.Body) + return b, nil } diff --git a/cloudflare/queues/consumermessage_test.go b/cloudflare/queues/consumermessage_test.go index 909c0cd..8c229c1 100644 --- a/cloudflare/queues/consumermessage_test.go +++ b/cloudflare/queues/consumermessage_test.go @@ -174,13 +174,6 @@ func TestConsumerMessage_BytesBody(t *testing.T) { want []byte wantErr bool }{ - { - name: "string", - body: func() js.Value { - return js.ValueOf("hello") - }, - want: []byte("hello"), - }, { name: "uint8 array", body: func() js.Value { @@ -202,7 +195,7 @@ func TestConsumerMessage_BytesBody(t *testing.T) { { name: "incorrect type", body: func() js.Value { - return js.ValueOf(42) + return js.ValueOf("hello") }, wantErr: true, }, From 304350426f2bd73ed4cb443cbacaa56a3a1f6daa Mon Sep 17 00:00:00 2001 From: syumai Date: Wed, 15 Jan 2025 21:09:14 +0900 Subject: [PATCH 3/5] add comment to rename batch message --- cloudflare/queues/batchmessage.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cloudflare/queues/batchmessage.go b/cloudflare/queues/batchmessage.go index c54ba3c..18185c0 100644 --- a/cloudflare/queues/batchmessage.go +++ b/cloudflare/queues/batchmessage.go @@ -6,6 +6,8 @@ import ( "github.com/syumai/workers/internal/jsutil" ) +// FIXME: rename to MessageSendRequest +// see: https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagesendrequest type BatchMessage struct { body js.Value options *sendOptions From 13b10a693c71200560d999062f484613ab90a0a4 Mon Sep 17 00:00:00 2001 From: syumai Date: Wed, 15 Jan 2025 21:11:32 +0900 Subject: [PATCH 4/5] remove name prefix from ConsumerMessage / ConsumerMessageBatch --- _examples/queues/main.go | 2 +- cloudflare/queues/consumer.go | 4 ++-- .../queues/{consumermessage.go => message.go} | 16 ++++++++-------- ...consumermessage_test.go => message_test.go} | 14 +++++++------- ...consumermessagebatch.go => messagebatch.go} | 18 +++++++++--------- ...ssagebatch_test.go => messagebatch_test.go} | 10 +++++----- 6 files changed, 32 insertions(+), 32 deletions(-) rename cloudflare/queues/{consumermessage.go => message.go} (82%) rename cloudflare/queues/{consumermessage_test.go => message_test.go} (94%) rename cloudflare/queues/{consumermessagebatch.go => messagebatch.go} (73%) rename cloudflare/queues/{consumermessagebatch_test.go => messagebatch_test.go} (92%) diff --git a/_examples/queues/main.go b/_examples/queues/main.go index 80d0f4a..7aa9258 100644 --- a/_examples/queues/main.go +++ b/_examples/queues/main.go @@ -105,7 +105,7 @@ func produceBytes(q *queues.Producer, req *http.Request) error { return nil } -func consumeBatch(batch *queues.ConsumerMessageBatch) error { +func consumeBatch(batch *queues.MessageBatch) error { for _, msg := range batch.Messages { log.Printf("Received message: %v\n", msg.Body.Get("name").String()) } diff --git a/cloudflare/queues/consumer.go b/cloudflare/queues/consumer.go index 54f8099..606dbf8 100644 --- a/cloudflare/queues/consumer.go +++ b/cloudflare/queues/consumer.go @@ -12,7 +12,7 @@ import ( // A returned error will cause the batch to be retried (unless the batch or individual messages are acked). // NOTE: to do long-running message processing task within the Consumer, use cloudflare.WaitUntil, this will postpone the message // acknowledgment until the task is completed witout blocking the queue consumption. -type Consumer func(batch *ConsumerMessageBatch) error +type Consumer func(batch *MessageBatch) error var consumer Consumer @@ -44,7 +44,7 @@ func init() { } func consumeBatch(batch js.Value) error { - b, err := newConsumerMessageBatch(batch) + b, err := newMessageBatch(batch) if err != nil { return fmt.Errorf("failed to parse message batch: %v", err) } diff --git a/cloudflare/queues/consumermessage.go b/cloudflare/queues/message.go similarity index 82% rename from cloudflare/queues/consumermessage.go rename to cloudflare/queues/message.go index a0ebba2..dfdb7ac 100644 --- a/cloudflare/queues/consumermessage.go +++ b/cloudflare/queues/message.go @@ -8,9 +8,9 @@ import ( "github.com/syumai/workers/internal/jsutil" ) -// ConsumerMessage represents a message of the batch received by the consumer. +// Message represents a message of the batch received by the consumer. // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message -type ConsumerMessage struct { +type Message struct { // instance - The underlying instance of the JS message object passed by the cloudflare instance js.Value @@ -24,13 +24,13 @@ type ConsumerMessage struct { Attempts int } -func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) { +func newMessage(obj js.Value) (*Message, error) { timestamp, err := jsutil.DateToTime(obj.Get("timestamp")) if err != nil { return nil, fmt.Errorf("failed to parse message timestamp: %v", err) } - return &ConsumerMessage{ + return &Message{ instance: obj, Id: obj.Get("id").String(), Body: obj.Get("body"), @@ -41,13 +41,13 @@ func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) { // Ack acknowledges the message as successfully delivered despite the result returned from the consuming function. // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message -func (m *ConsumerMessage) Ack() { +func (m *Message) Ack() { m.instance.Call("ack") } // Retry marks the message to be re-delivered. // The message will be retried after the optional delay configured with RetryOption. -func (m *ConsumerMessage) Retry(opts ...RetryOption) { +func (m *Message) Retry(opts ...RetryOption) { var o *retryOptions if len(opts) > 0 { o = &retryOptions{} @@ -59,14 +59,14 @@ func (m *ConsumerMessage) Retry(opts ...RetryOption) { m.instance.Call("retry", o.toJS()) } -func (m *ConsumerMessage) StringBody() (string, error) { +func (m *Message) StringBody() (string, error) { if m.Body.Type() != js.TypeString { return "", fmt.Errorf("message body is not a string: %v", m.Body) } return m.Body.String(), nil } -func (m *ConsumerMessage) BytesBody() ([]byte, error) { +func (m *Message) BytesBody() ([]byte, error) { if m.Body.Type() != js.TypeObject || !(m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass)) { return nil, fmt.Errorf("message body is not a byte array: %v", m.Body) diff --git a/cloudflare/queues/consumermessage_test.go b/cloudflare/queues/message_test.go similarity index 94% rename from cloudflare/queues/consumermessage_test.go rename to cloudflare/queues/message_test.go index 8c229c1..f461970 100644 --- a/cloudflare/queues/consumermessage_test.go +++ b/cloudflare/queues/message_test.go @@ -20,9 +20,9 @@ func TestNewConsumerMessage(t *testing.T) { "attempts": 1, } - got, err := newConsumerMessage(js.ValueOf(m)) + got, err := newMessage(js.ValueOf(m)) if err != nil { - t.Fatalf("newConsumerMessage failed: %v", err) + t.Fatalf("newMessage failed: %v", err) } if body := got.Body.String(); body != "hello" { @@ -49,7 +49,7 @@ func TestConsumerMessage_Ack(t *testing.T) { ackCalled = true return nil })) - m := &ConsumerMessage{ + m := &Message{ instance: jsObj, } @@ -67,7 +67,7 @@ func TestConsumerMessage_Retry(t *testing.T) { retryCalled = true return nil })) - m := &ConsumerMessage{ + m := &Message{ instance: jsObj, } @@ -99,7 +99,7 @@ func TestConsumerMessage_RetryWithDelay(t *testing.T) { return nil })) - m := &ConsumerMessage{ + m := &Message{ instance: jsObj, } @@ -151,7 +151,7 @@ func TestNewConsumerMessage_StringBody(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - m := &ConsumerMessage{ + m := &Message{ Body: tt.body(), } @@ -203,7 +203,7 @@ func TestConsumerMessage_BytesBody(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - m := &ConsumerMessage{ + m := &Message{ Body: tt.body(), } diff --git a/cloudflare/queues/consumermessagebatch.go b/cloudflare/queues/messagebatch.go similarity index 73% rename from cloudflare/queues/consumermessagebatch.go rename to cloudflare/queues/messagebatch.go index 976b0bf..b21055f 100644 --- a/cloudflare/queues/consumermessagebatch.go +++ b/cloudflare/queues/messagebatch.go @@ -5,11 +5,11 @@ import ( "syscall/js" ) -// ConsumerMessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the +// MessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the // worker configuration. // - https://developers.cloudflare.com/queues/configuration/configure-queues/#consumer // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch -type ConsumerMessageBatch struct { +type MessageBatch struct { // instance - The underlying instance of the JS message object passed by the cloudflare instance js.Value @@ -17,21 +17,21 @@ type ConsumerMessageBatch struct { Queue string // Messages - The messages in the batch - Messages []*ConsumerMessage + Messages []*Message } -func newConsumerMessageBatch(obj js.Value) (*ConsumerMessageBatch, error) { +func newMessageBatch(obj js.Value) (*MessageBatch, error) { msgArr := obj.Get("messages") - messages := make([]*ConsumerMessage, msgArr.Length()) + messages := make([]*Message, msgArr.Length()) for i := 0; i < msgArr.Length(); i++ { - m, err := newConsumerMessage(msgArr.Index(i)) + m, err := newMessage(msgArr.Index(i)) if err != nil { return nil, fmt.Errorf("failed to parse message %d: %v", i, err) } messages[i] = m } - return &ConsumerMessageBatch{ + return &MessageBatch{ instance: obj, Queue: obj.Get("queue").String(), Messages: messages, @@ -40,14 +40,14 @@ func newConsumerMessageBatch(obj js.Value) (*ConsumerMessageBatch, error) { // AckAll acknowledges all messages in the batch as successfully delivered despite the result returned from the consuming function. // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch -func (b *ConsumerMessageBatch) AckAll() { +func (b *MessageBatch) AckAll() { b.instance.Call("ackAll") } // RetryAll marks all messages in the batch to be re-delivered. // The messages will be retried after the optional delay configured with RetryOption. // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch -func (b *ConsumerMessageBatch) RetryAll(opts ...RetryOption) { +func (b *MessageBatch) RetryAll(opts ...RetryOption) { var o *retryOptions if len(opts) > 0 { o = &retryOptions{} diff --git a/cloudflare/queues/consumermessagebatch_test.go b/cloudflare/queues/messagebatch_test.go similarity index 92% rename from cloudflare/queues/consumermessagebatch_test.go rename to cloudflare/queues/messagebatch_test.go index ec76dc6..2c58777 100644 --- a/cloudflare/queues/consumermessagebatch_test.go +++ b/cloudflare/queues/messagebatch_test.go @@ -24,9 +24,9 @@ func TestNewConsumerMessageBatch(t *testing.T) { }, } - got, err := newConsumerMessageBatch(js.ValueOf(m)) + got, err := newMessageBatch(js.ValueOf(m)) if err != nil { - t.Fatalf("newConsumerMessageBatch failed: %v", err) + t.Fatalf("newMessageBatch failed: %v", err) } if got.Queue != "some-queue" { @@ -62,7 +62,7 @@ func TestConsumerMessageBatch_AckAll(t *testing.T) { ackAllCalled = true return nil })) - b := &ConsumerMessageBatch{ + b := &MessageBatch{ instance: jsObj, } @@ -80,7 +80,7 @@ func TestConsumerMessageBatch_RetryAll(t *testing.T) { retryAllCalled = true return nil })) - b := &ConsumerMessageBatch{ + b := &MessageBatch{ instance: jsObj, } @@ -112,7 +112,7 @@ func TestConsumerMessageBatch_RetryAllWithRetryOption(t *testing.T) { return nil })) - b := &ConsumerMessageBatch{ + b := &MessageBatch{ instance: jsObj, } From 9047d7f30d127388054666385d77e24f355db1ff Mon Sep 17 00:00:00 2001 From: syumai Date: Wed, 15 Jan 2025 21:13:20 +0900 Subject: [PATCH 5/5] rename Id to ID --- cloudflare/queues/message.go | 6 +++--- cloudflare/queues/message_test.go | 4 ++-- cloudflare/queues/messagebatch_test.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cloudflare/queues/message.go b/cloudflare/queues/message.go index dfdb7ac..bf41a48 100644 --- a/cloudflare/queues/message.go +++ b/cloudflare/queues/message.go @@ -14,8 +14,8 @@ type Message struct { // instance - The underlying instance of the JS message object passed by the cloudflare instance js.Value - // Id - The unique Cloudflare-generated identifier of the message - Id string + // ID - The unique Cloudflare-generated identifier of the message + ID string // Timestamp - The time when the message was enqueued Timestamp time.Time // Body - The message body. Could be accessed directly or using converting helpers as StringBody, BytesBody, IntBody, FloatBody. @@ -32,7 +32,7 @@ func newMessage(obj js.Value) (*Message, error) { return &Message{ instance: obj, - Id: obj.Get("id").String(), + ID: obj.Get("id").String(), Body: obj.Get("body"), Attempts: obj.Get("attempts").Int(), Timestamp: timestamp, diff --git a/cloudflare/queues/message_test.go b/cloudflare/queues/message_test.go index f461970..ab21208 100644 --- a/cloudflare/queues/message_test.go +++ b/cloudflare/queues/message_test.go @@ -29,8 +29,8 @@ func TestNewConsumerMessage(t *testing.T) { t.Fatalf("Body() = %v, want %v", body, "hello") } - if got.Id != id { - t.Fatalf("Id = %v, want %v", got.Id, id) + if got.ID != id { + t.Fatalf("ID = %v, want %v", got.ID, id) } if got.Attempts != 1 { diff --git a/cloudflare/queues/messagebatch_test.go b/cloudflare/queues/messagebatch_test.go index 2c58777..0f847dc 100644 --- a/cloudflare/queues/messagebatch_test.go +++ b/cloudflare/queues/messagebatch_test.go @@ -42,8 +42,8 @@ func TestNewConsumerMessageBatch(t *testing.T) { t.Fatalf("Body() = %v, want %v", body, "hello") } - if msg.Id != id { - t.Fatalf("Id = %v, want %v", msg.Id, id) + if msg.ID != id { + t.Fatalf("ID = %v, want %v", msg.ID, id) } if msg.Attempts != 1 {