-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathcommand.go
118 lines (101 loc) · 2.9 KB
/
command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package kubemq
import (
"context"
"fmt"
"time"
)
type Command struct {
Id string
Channel string
Metadata string
Body []byte
Timeout time.Duration
ClientId string
Tags map[string]string
transport Transport
trace *Trace
}
func NewCommand() *Command {
return &Command{}
}
// SetId - set command requestId, otherwise new random uuid will be set
func (c *Command) SetId(id string) *Command {
c.Id = id
return c
}
// SetClientId - set command ClientId - mandatory if default client was not set
func (c *Command) SetClientId(clientId string) *Command {
c.ClientId = clientId
return c
}
// SetChannel - set command channel - mandatory if default channel was not set
func (c *Command) SetChannel(channel string) *Command {
c.Channel = channel
return c
}
// SetMetadata - set command metadata - mandatory if body field is empty
func (c *Command) SetMetadata(metadata string) *Command {
c.Metadata = metadata
return c
}
// SetBody - set command body - mandatory if metadata field is empty
func (c *Command) SetBody(body []byte) *Command {
c.Body = body
return c
}
// SetTimeout - set timeout for command to be returned. if timeout expired , send command will result with an error
func (c *Command) SetTimeout(timeout time.Duration) *Command {
c.Timeout = timeout
return c
}
// SetTags - set key value tags to command message
func (c *Command) SetTags(tags map[string]string) *Command {
c.Tags = map[string]string{}
for key, value := range tags {
c.Tags[key] = value
}
return c
}
// AddTag - add key value tags to command message
func (c *Command) AddTag(key, value string) *Command {
if c.Tags == nil {
c.Tags = map[string]string{}
}
c.Tags[key] = value
return c
}
// AddTrace - add tracing support to command
func (c *Command) AddTrace(name string) *Trace {
c.trace = CreateTrace(name)
return c.trace
}
// Send - sending command , waiting for response or timeout
func (c *Command) Send(ctx context.Context) (*CommandResponse, error) {
if c.transport == nil {
return nil, ErrNoTransportDefined
}
return c.transport.SendCommand(ctx, c)
}
type CommandReceive struct {
Id string
ClientId string
Channel string
Metadata string
Body []byte
ResponseTo string
Tags map[string]string
}
func (cr *CommandReceive) String() string {
return fmt.Sprintf("Id: %s, ClientId: %s, Channel: %s, Metadata: %s, Body: %s, ResponseTo: %s, Tags: %v", cr.Id, cr.ClientId, cr.Channel, cr.Metadata, string(cr.Body), cr.ResponseTo, cr.Tags)
}
type CommandResponse struct {
CommandId string
ResponseClientId string
Executed bool
ExecutedAt time.Time
Error string
Tags map[string]string
}
func (cr *CommandResponse) String() string {
return fmt.Sprintf("CommandId: %s, ResponseClientId: %s, Executed: %v, ExecutedAt: %s, Error: %s, Tags: %v", cr.CommandId, cr.ResponseClientId, cr.Executed, cr.ExecutedAt.String(), cr.Error, cr.Tags)
}