Skip to content

Commit 9a47018

Browse files
committed
Decouple: Add a output queue
Without any output queue, decouple drop a packet, before sink.http have time to increase the number of workers, to handle more requests.
1 parent 7dd4d6b commit 9a47018

File tree

2 files changed

+12
-3
lines changed

2 files changed

+12
-3
lines changed

README.md

+5-2
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,9 @@ In the following example we decouple the sink from the source to avoid slowing d
204204
},
205205
{
206206
"type": "control.decouple",
207-
"config": {}
207+
"config": {
208+
"queue_size": 500
209+
}
208210
},
209211
{
210212
"type": "sink.http",
@@ -218,7 +220,8 @@ In the following example we decouple the sink from the source to avoid slowing d
218220

219221
| Param | Value |
220222
| ------- | ----------------------------------------------------- |
221-
| `quiet` | Do not log dropped messages summary. Default: `false` |
223+
| `quiet` | Do not log dropped messages summary. Default: `false` |
224+
| `queue_size` | Size of the output queue. Default: `100` |
222225

223226
#### control.rate_limit
224227

mirror/modules/control/decouple.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func init() {
3030

3131
type DecoupleConfig struct {
3232
Quiet bool `json:"log"`
33+
QueueSize int `json:"queue_size"`
3334
}
3435

3536
type Decouple struct {
@@ -45,9 +46,14 @@ func NewDecouple(ctx *mirror.ModuleContext, cfg []byte) (mirror.Module, error) {
4546
return nil, err
4647
}
4748

49+
queueSize := 100
50+
if c.QueueSize > 0 {
51+
queueSize = c.QueueSize
52+
}
53+
4854
mod := &Decouple{
4955
ctx: ctx,
50-
out: make(chan mirror.Request),
56+
out: make(chan mirror.Request, queueSize),
5157
quiet: c.Quiet,
5258
}
5359

0 commit comments

Comments
 (0)