Skip to content

Commit 9ad180b

Browse files
authored
Merge pull request #437 from hookdeck/destazureservicebus
feat: implement destazureservicebus
2 parents 9a95f28 + 4bec835 commit 9ad180b

File tree

15 files changed

+1270
-13
lines changed

15 files changed

+1270
-13
lines changed

build/dev/azure/config.json

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@
4141
}
4242
]
4343
},
44+
// test destination
45+
{
46+
"Name": "destination-test",
47+
"Subscriptions": [
48+
{
49+
"Name": "destination-test-sub"
50+
}
51+
]
52+
},
4453

4554
// tests
4655
{
@@ -53,6 +62,17 @@
5362
"Rules": []
5463
}
5564
]
65+
},
66+
{
67+
"Name": "TestDestinationAzureServiceBusSuite-topic",
68+
"Properties": {},
69+
"Subscriptions": [
70+
{
71+
"Name": "TestDestinationAzureServiceBusSuite-subscription",
72+
"Properties": {},
73+
"Rules": []
74+
}
75+
]
5676
}
5777
]
5878
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log"
8+
"os"
9+
"os/signal"
10+
"syscall"
11+
12+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
13+
)
14+
15+
// local
16+
const (
17+
TOPIC_NAME = "destination-test"
18+
SUBSCRIPTION_NAME = "destination-test-sub"
19+
CONNECTION_STRING = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
20+
)
21+
22+
func main() {
23+
if err := run(); err != nil {
24+
panic(err)
25+
}
26+
}
27+
28+
func run() error {
29+
// Create client
30+
client, err := azservicebus.NewClientFromConnectionString(CONNECTION_STRING, nil)
31+
if err != nil {
32+
return fmt.Errorf("failed to create client: %w", err)
33+
}
34+
defer client.Close(context.Background())
35+
36+
// Create receiver for the subscription
37+
receiver, err := client.NewReceiverForSubscription(TOPIC_NAME, SUBSCRIPTION_NAME, nil)
38+
if err != nil {
39+
return fmt.Errorf("failed to create receiver: %w", err)
40+
}
41+
defer receiver.Close(context.Background())
42+
43+
// Set up signal handling for graceful shutdown
44+
termChan := make(chan os.Signal, 1)
45+
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
46+
47+
// Start consuming messages
48+
ctx, cancel := context.WithCancel(context.Background())
49+
defer cancel()
50+
51+
go func() {
52+
for {
53+
messages, err := receiver.ReceiveMessages(ctx, 1, nil)
54+
if err != nil {
55+
if ctx.Err() != nil {
56+
// Context cancelled, exit gracefully
57+
return
58+
}
59+
log.Printf("[x] Error receiving messages: %v", err)
60+
continue
61+
}
62+
63+
for _, msg := range messages {
64+
// Log message details
65+
log.Printf("[x] Received message:")
66+
log.Printf(" Message ID: %s", msg.MessageID)
67+
log.Printf(" Sequence Number: %d", *msg.SequenceNumber)
68+
69+
// Log application properties (metadata)
70+
if len(msg.ApplicationProperties) > 0 {
71+
log.Printf(" Metadata:")
72+
for k, v := range msg.ApplicationProperties {
73+
log.Printf(" %s: %v", k, v)
74+
}
75+
}
76+
77+
// Log message body
78+
var data interface{}
79+
if err := json.Unmarshal(msg.Body, &data); err == nil {
80+
// Pretty print JSON
81+
pretty, _ := json.MarshalIndent(data, " ", " ")
82+
log.Printf(" Body (JSON):\n %s", string(pretty))
83+
} else {
84+
// Raw body
85+
log.Printf(" Body (Raw): %s", string(msg.Body))
86+
}
87+
88+
// Complete the message
89+
if err := receiver.CompleteMessage(ctx, msg, nil); err != nil {
90+
log.Printf("[x] Error completing message: %v", err)
91+
}
92+
}
93+
}
94+
}()
95+
96+
// Log configuration
97+
log.Printf("[*] Azure Service Bus Consumer")
98+
log.Printf("[*] Topic: %s", TOPIC_NAME)
99+
log.Printf("[*] Subscription: %s", SUBSCRIPTION_NAME)
100+
log.Printf("[*] Namespace: %s", extractNamespace(CONNECTION_STRING))
101+
log.Printf("[*] Ready to receive messages. Press Ctrl+C to exit.")
102+
103+
// Wait for termination signal
104+
<-termChan
105+
log.Printf("[*] Shutting down...")
106+
cancel()
107+
108+
return nil
109+
}
110+
111+
func extractNamespace(connStr string) string {
112+
// Simple extraction for display purposes
113+
start := len("Endpoint=sb://")
114+
if len(connStr) > start {
115+
end := start
116+
for end < len(connStr) && connStr[end] != '.' {
117+
end++
118+
}
119+
if end > start {
120+
return connStr[start:end]
121+
}
122+
}
123+
return "unknown"
124+
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
# AzureServiceBus Destination configuration
2+
3+
Here's a rough document explaining how AzureServiceBus works and how the destination is implemented with Outpost.
4+
5+
# PubSub vs Queue
6+
7+
Azure ServiceBus supports both PubSub (Topic & Subscription) and Queue. From the Publisher (Azure's term is Sender) perspective, it doesn't really care whether it's publishing to a Topic or to a Queue. So, from the destination config, all we need is a single "name" field.
8+
9+
## Message
10+
11+
Whether it's publishing to Topic or Queue, the Publisher needs to send an Azure's Message. Here's the full Golang SDK Message struct:
12+
13+
```golang
14+
// Message is a message with a body and commonly used properties.
15+
// Properties that are pointers are optional.
16+
type Message struct {
17+
// ApplicationProperties can be used to store custom metadata for a message.
18+
ApplicationProperties map[string]any
19+
20+
// Body corresponds to the first []byte array in the Data section of an AMQP message.
21+
Body []byte
22+
23+
// ContentType describes the payload of the message, with a descriptor following
24+
// the format of Content-Type, specified by RFC2045 (ex: "application/json").
25+
ContentType *string
26+
27+
// CorrelationID allows an application to specify a context for the message for the purposes of
28+
// correlation, for example reflecting the MessageID of a message that is being
29+
// replied to.
30+
CorrelationID *string
31+
32+
// MessageID is an application-defined value that uniquely identifies
33+
// the message and its payload. The identifier is a free-form string.
34+
//
35+
// If enabled, the duplicate detection feature identifies and removes further submissions
36+
// of messages with the same MessageId.
37+
MessageID *string
38+
39+
// PartitionKey is used with a partitioned entity and enables assigning related messages
40+
// to the same internal partition. This ensures that the submission sequence order is correctly
41+
// recorded. The partition is chosen by a hash function in Service Bus and cannot be chosen
42+
// directly.
43+
//
44+
// For session-aware entities, the ReceivedMessage.SessionID overrides this value.
45+
PartitionKey *string
46+
47+
// ReplyTo is an application-defined value specify a reply path to the receiver of the message. When
48+
// a sender expects a reply, it sets the value to the absolute or relative path of the queue or topic
49+
// it expects the reply to be sent to.
50+
ReplyTo *string
51+
52+
// ReplyToSessionID augments the ReplyTo information and specifies which SessionId should
53+
// be set for the reply when sent to the reply entity.
54+
ReplyToSessionID *string
55+
56+
// ScheduledEnqueueTime specifies a time when a message will be enqueued. The message is transferred
57+
// to the broker but will not available until the scheduled time.
58+
ScheduledEnqueueTime *time.Time
59+
60+
// SessionID is used with session-aware entities and associates a message with an application-defined
61+
// session ID. Note that an empty string is a valid session identifier.
62+
// Messages with the same session identifier are subject to summary locking and enable
63+
// exact in-order processing and demultiplexing. For session-unaware entities, this value is ignored.
64+
SessionID *string
65+
66+
// Subject enables an application to indicate the purpose of the message, similar to an email subject line.
67+
Subject *string
68+
69+
// TimeToLive is the duration after which the message expires, starting from the instant the
70+
// message has been accepted and stored by the broker, found in the ReceivedMessage.EnqueuedTime
71+
// property.
72+
//
73+
// When not set explicitly, the assumed value is the DefaultTimeToLive for the queue or topic.
74+
// A message's TimeToLive cannot be longer than the entity's DefaultTimeToLive is silently
75+
// adjusted if it does.
76+
TimeToLive *time.Duration
77+
78+
// To is reserved for future use in routing scenarios but is not currently used by Service Bus.
79+
// Applications can use this value to indicate the logical destination of the message.
80+
To *string
81+
}
82+
```
83+
84+
Here are a few notable configuration, especially on the destination level that we may want to support:
85+
86+
- MessageID --> MessageIDTemplate, similar to AWS Kinesis Parition Key approach
87+
- CorrelationID --> CorrelationIDTemplate, similar to AWS Kinesis Parition Key approach
88+
- PartitionKey --> PartitionKeyTemplate, similar to AWS Kinesis Parition Key approach
89+
90+
- ScheduledEnqueueTime
91+
- TimeToLive
92+
93+
The current implementation doesn't support any of these. So when create destination, it's super straightforward:
94+
95+
```golang
96+
type Config struct {
97+
Name string
98+
}
99+
```
100+
101+
If we want to support these, we can either add them to Config, such as `Config.TTL`, or we can also add a suffix like `Config.MessageTTL` to specify that these config would apply to the Message.
102+
103+
## Authentication
104+
105+
For authentication, we currently support "connection_string" which by default have access to the full Namespace.
106+
107+
## Creating Topic/Queue-Specific Access Policy
108+
109+
### For a Topic (Send-only access):
110+
111+
Create a Send-only policy for a specific topic
112+
113+
az servicebus topic authorization-rule create \
114+
--resource-group outpost-demo-rg \
115+
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
116+
--topic-name events \
117+
--name SendOnlyPolicy \
118+
--rights Send
119+
120+
Get the Topic-Specific Connection String:
121+
122+
az servicebus topic authorization-rule keys list \
123+
--resource-group outpost-demo-rg \
124+
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
125+
--topic-name events \
126+
--name SendOnlyPolicy \
127+
--query primaryConnectionString \
128+
--output tsv
129+
130+
This returns a connection string that can only send to the events topic:
131+
Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=Send
132+
OnlyPolicy;SharedAccessKey=xyz789...;EntityPath=events
133+
134+
### For Queues (similar approach):
135+
136+
Create a Send-only policy for a specific queue
137+
az servicebus queue authorization-rule create \
138+
--resource-group outpost-demo-rg \
139+
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
140+
--queue-name myqueue \
141+
--name SendOnlyPolicy \
142+
--rights Send
143+
144+
Available Permission Rights:
145+
146+
- Send - Can only send messages
147+
- Listen - Can only receive messages
148+
- Manage - Full control (send, receive, manage)
149+
150+
You can combine multiple rights:
151+
--rights Send Listen # Can both send and receive
152+
153+
Benefits of Entity-Level Access:
154+
155+
1. Security: Limits blast radius if credentials are compromised
156+
2. Principle of Least Privilege: Outpost only needs Send permission
157+
3. Audit Trail: Can track which policy is being used
158+
4. Rotation: Can rotate entity-specific keys without affecting other services
159+
160+
Important Notes:
161+
162+
- Entity-level connection strings include EntityPath parameter
163+
- These policies are scoped to a single topic/queue
164+
- Perfect for production where you want to limit Outpost to only sending to specific
165+
topics
166+
- The connection string format is the same, just with limited scope
167+
168+
This is the recommended approach for production use - give Outpost only the minimum
169+
permissions it needs (Send) and only to the specific topic/queue it should access.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Test AzureServiceBus Destination
2+
3+
Assuming you have an Azure account & an authenticated Azure CLI, here are the steps you can do to set up a test ServiceBus Topic & Subscription for testing.
4+
5+
Here are the resources you'll set up:
6+
7+
1. Resource Group - A container that holds related Azure resources
8+
- Name: outpost-demo-rg
9+
2. Service Bus Namespace - The messaging service container (must be globally unique)
10+
- Name: outpost-demo-sb-[RANDOM] (e.g., outpost-demo-sb-a3f2b1)
11+
3. Topic - A message distribution mechanism (pub/sub pattern)
12+
- Name: destination-test
13+
4. Subscription - A receiver for messages sent to the topic
14+
- Name: destination-test-sub
15+
16+
Step 1: Create a Resource Group
17+
18+
az group create \
19+
--name outpost-demo-rg \
20+
--location eastus
21+
22+
Step 2: Create a Service Bus Namespace
23+
24+
# Generate a random suffix for uniqueness
25+
RANDOM_SUFFIX=$(openssl rand -hex 3)
26+
27+
# Create the namespace
28+
az servicebus namespace create \
29+
--resource-group outpost-demo-rg \
30+
--name outpost-demo-sb-${RANDOM_SUFFIX} \
31+
--location eastus \
32+
--sku Standard
33+
34+
Note: The namespace name must be globally unique. The Standard SKU is required for topics (Basic only
35+
supports queues).
36+
37+
Step 3: Create a Topic
38+
39+
az servicebus topic create \
40+
--resource-group outpost-demo-rg \
41+
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
42+
--name destination-test
43+
44+
Step 4: Create a Subscription
45+
46+
az servicebus topic subscription create \
47+
--resource-group outpost-demo-rg \
48+
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
49+
--topic-name destination-test \
50+
--name destination-test-sub
51+
52+
Step 5: Get the Connection String
53+
54+
az servicebus namespace authorization-rule keys list \
55+
--resource-group outpost-demo-rg \
56+
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
57+
--name RootManageSharedAccessKey \
58+
--query primaryConnectionString \
59+
--output tsv
60+
61+
Example Output:
62+
Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;Sh
63+
aredAccessKey=abcd1234...
64+
65+
You can then create an Azure destination with:
66+
- name: "destination-test"
67+
- connection string: "Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;Sh
68+
aredAccessKey=abcd1234..."

0 commit comments

Comments
 (0)