-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathquery.go
135 lines (116 loc) · 3.51 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package kubemq
import (
"context"
"fmt"
"time"
)
type Query struct {
Id string
Channel string
Metadata string
Body []byte
Timeout time.Duration
ClientId string
CacheKey string
CacheTTL time.Duration
Tags map[string]string
transport Transport
trace *Trace
}
func NewQuery() *Query {
return &Query{}
}
// SetId - set query requestId, otherwise new random uuid will be set
func (q *Query) SetId(id string) *Query {
q.Id = id
return q
}
// SetClientId - set query ClientId - mandatory if default client was not set
func (q *Query) SetClientId(clientId string) *Query {
q.ClientId = clientId
return q
}
// SetChannel - set query channel - mandatory if default channel was not set
func (q *Query) SetChannel(channel string) *Query {
q.Channel = channel
return q
}
// SetMetadata - set query metadata - mandatory if body field is empty
func (q *Query) SetMetadata(metadata string) *Query {
q.Metadata = metadata
return q
}
// SetBody - set query body - mandatory if metadata field is empty
func (q *Query) SetBody(body []byte) *Query {
q.Body = body
return q
}
// SetTags - set key value tags to query message
func (q *Query) SetTags(tags map[string]string) *Query {
q.Tags = map[string]string{}
for key, value := range tags {
q.Tags[key] = value
}
return q
}
// AddTag - add key value tags to query message
func (q *Query) AddTag(key, value string) *Query {
if q.Tags == nil {
q.Tags = map[string]string{}
}
q.Tags[key] = value
return q
}
// SetTimeout - set timeout for query to be returned. if timeout expired , send query will result with an error
func (q *Query) SetTimeout(timeout time.Duration) *Query {
q.Timeout = timeout
return q
}
// SetCacheKey - set cache key to retrieve already stored query response, otherwise the response for this query will be stored in cache for future query requests
func (q *Query) SetCacheKey(cacheKey string) *Query {
q.CacheKey = cacheKey
return q
}
// SetCacheTTL - set cache time to live for the this query cache key response to be retrieved already stored query response, if not set default cacheTTL will be set
func (q *Query) SetCacheTTL(ttl time.Duration) *Query {
q.CacheTTL = ttl
return q
}
// Send - sending query request , waiting for response or timeout
func (q *Query) Send(ctx context.Context) (*QueryResponse, error) {
if q.transport == nil {
return nil, ErrNoTransportDefined
}
return q.transport.SendQuery(ctx, q)
}
// AddTrace - add tracing support to query
func (q *Query) AddTrace(name string) *Trace {
q.trace = CreateTrace(name)
return q.trace
}
type QueryReceive struct {
Id string
Channel string
ClientId string
Metadata string
Body []byte
ResponseTo string
Tags map[string]string
}
func (qr *QueryReceive) String() string {
return fmt.Sprintf("Id: %s, ClientId: %s, Channel: %s, Metadata: %s, Body: %s, ResponseTo: %s, Tags: %v", qr.Id, qr.ClientId, qr.Channel, qr.Metadata, string(qr.Body), qr.ResponseTo, qr.Tags)
}
type QueryResponse struct {
QueryId string
Executed bool
ExecutedAt time.Time
Metadata string
ResponseClientId string
Body []byte
CacheHit bool
Error string
Tags map[string]string
}
func (qr *QueryResponse) String() string {
return fmt.Sprintf("QueryId: %s, Executed: %v, ExecutedAt: %s, Metadata: %s, ResponseClientId: %s, Body: %s, CacheHit: %v, Error: %s, Tags: %v", qr.QueryId, qr.Executed, qr.ExecutedAt.String(), qr.Metadata, qr.ResponseClientId, string(qr.Body), qr.CacheHit, qr.Error, qr.Tags)
}