-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcontext.go
186 lines (153 loc) · 4.58 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package raiden
import (
"context"
"encoding/json"
"errors"
"github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/trace"
)
type (
// The `Context` interface defines a set of methods that can be implemented by a struct to provide a
// context for handling HTTP requests in the Raiden framework.
Context interface {
Ctx() context.Context
SetCtx(ctx context.Context)
Config() *Config
SendRpc(Rpc) error
ExecuteRpc(Rpc) (any, error)
SendJson(data any) error
SendError(message string) error
SendErrorWithCode(statusCode int, err error) error
RequestContext() *fasthttp.RequestCtx
Span() trace.Span
SetSpan(span trace.Span)
Tracer() trace.Tracer
NewJobCtx() (JobContext, error)
Write(data []byte)
WriteError(err error)
Set(key string, value any)
Get(key string) any
Publish(ctx context.Context, provider PubSubProviderType, topic string, message []byte) error
}
// The `Ctx` struct is a struct that implements the `Context` interface in the Raiden framework. It
// embeds the `context.Context` and `*fasthttp.RequestCtx` types, which provide the context and request
// information for handling HTTP requests. Additionally, it has fields for storing the configuration
// (`config`), span (`span`), and tracer (`tracer`) for tracing and monitoring purposes.
Ctx struct {
context.Context
*fasthttp.RequestCtx
config *Config
span trace.Span
tracer trace.Tracer
jobChan chan JobParams
data map[string]any
pubSub PubSub
}
)
func (c *Ctx) Config() *Config {
return c.config
}
func (c *Ctx) SendRpc(rpc Rpc) error {
rs, err := c.ExecuteRpc(rpc)
if err != nil {
return err
}
return c.SendJson(rs)
}
func (c *Ctx) ExecuteRpc(rpc Rpc) (any, error) {
return ExecuteRpc(c, rpc)
}
func (c *Ctx) RequestContext() *fasthttp.RequestCtx {
return c.RequestCtx
}
func (c *Ctx) Span() trace.Span {
return c.span
}
func (c *Ctx) SetSpan(span trace.Span) {
c.span = span
}
func (c *Ctx) Tracer() trace.Tracer {
return c.tracer
}
func (c *Ctx) SetJobChan(jobChan chan JobParams) {
c.jobChan = jobChan
}
func (c *Ctx) NewJobCtx() (JobContext, error) {
if c.jobChan != nil {
jobCtx := newJobCtx(c.config, c.pubSub, c.jobChan, make(JobData))
spanCtx := trace.SpanContextFromContext(c.Context)
jobCtx.SetContext(trace.ContextWithSpanContext(context.Background(), spanCtx))
return jobCtx, nil
}
return nil, errors.New(("event channel not available, enable scheduler to use this feature"))
}
func (c *Ctx) Ctx() context.Context {
return c.Context
}
func (c *Ctx) SetCtx(ctx context.Context) {
c.Context = ctx
}
func (c *Ctx) Get(key string) any {
if c.data == nil {
c.data = make(map[string]any)
}
return c.data[key]
}
func (c *Ctx) Set(key string, value any) {
if c.data == nil {
c.data = make(map[string]any)
}
c.data[key] = value
}
func (c *Ctx) Publish(ctx context.Context, provider PubSubProviderType, topic string, message []byte) error {
if c.pubSub == nil {
return errors.New("unable to publish because pubsub not initialize")
}
return c.pubSub.Publish(ctx, provider, topic, message)
}
// The `SendJson` function is a method of the `Ctx` struct in the Raiden framework. It is responsible
// for sending a JSON response to the client.
func (c *Ctx) SendJson(data any) error {
c.Response.Header.SetContentType("application/json")
byteData, err := json.Marshal(data)
if err != nil {
return err
}
c.Write(byteData)
return nil
}
func (c *Ctx) SendError(message string) error {
return &ErrorResponse{
Message: message,
StatusCode: fasthttp.StatusInternalServerError,
}
}
func (c *Ctx) SendErrorWithCode(statusCode int, err error) error {
return &ErrorResponse{
Message: err.Error(),
StatusCode: statusCode,
Code: fasthttp.StatusMessage(statusCode),
}
}
// The `WriteError` function is a method of the `Ctx` struct in the Raiden framework. It is responsible
// for writing an error response to the HTTP response body.
func (c *Ctx) WriteError(err error) {
c.Response.Header.SetContentType("application/json")
if errResponse, ok := err.(*ErrorResponse); ok {
responseByte, errMarshall := json.Marshal(errResponse)
if errMarshall == nil {
c.Response.SetStatusCode(errResponse.StatusCode)
c.Response.AppendBody(responseByte)
return
}
err = errMarshall
}
c.Response.SetStatusCode(fasthttp.StatusInternalServerError)
c.Response.AppendBodyString(err.Error())
}
// The `Write` function is a method of the `Ctx` struct in the Raiden framework. It is responsible for
// writing the response body to the HTTP response.
func (c *Ctx) Write(data []byte) {
c.Response.SetStatusCode(fasthttp.StatusOK)
c.Response.AppendBody(data)
}