Skip to content

Commit 5731bb3

Browse files
authored
feat: add reactor http client (#23)
1 parent e2e84aa commit 5731bb3

File tree

3 files changed

+354
-0
lines changed

3 files changed

+354
-0
lines changed

hc/hc.go

+173
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package hc
2+
3+
import (
4+
"context"
5+
"io"
6+
"net/http"
7+
8+
"github.com/jjeffcaii/reactor-go/mono"
9+
)
10+
11+
// DefaultClient is the default Client.
12+
var DefaultClient = NewClient(http.DefaultClient)
13+
14+
// Option is the type of http client options.
15+
type Option func(*option)
16+
17+
// Mono is alias of mono.Mono.
18+
type Mono = mono.Mono
19+
20+
// Client is a reactor style http client which returns a Mono.
21+
type Client interface {
22+
// Do sends an HTTP request and returns a response Mono.
23+
Do(req *http.Request, opts ...Option) Mono
24+
// Get sends a GET request and returns a response Mono.
25+
Get(url string, opts ...Option) Mono
26+
// Delete sends a DELETE request and returns a response Mono.
27+
Delete(url string, opts ...Option) Mono
28+
// Post sends a POST request and returns a response Mono.
29+
Post(url string, contentType string, body io.Reader, opts ...Option) Mono
30+
// Put sends a PUT request and returns a response Mono.
31+
Put(url string, contentType string, body io.Reader, opts ...Option) Mono
32+
// Patch sends a PATCH request and returns a response Mono.
33+
Patch(url string, contentType string, body io.Reader, opts ...Option) Mono
34+
}
35+
36+
// Do sends an HTTP request and returns a response Mono.
37+
func Do(req *http.Request, opts ...Option) Mono {
38+
return DefaultClient.Do(req, opts...)
39+
}
40+
41+
// Get sends a GET request and returns a response Mono.
42+
func Get(url string, opts ...Option) Mono {
43+
return DefaultClient.Get(url, opts...)
44+
}
45+
46+
// Delete sends a DELETE request and returns a response Mono.
47+
func Delete(url string, opts ...Option) Mono {
48+
return DefaultClient.Delete(url, opts...)
49+
}
50+
51+
// Post sends a POST request and returns a response Mono.
52+
func Post(url string, contentType string, body io.Reader, opts ...Option) Mono {
53+
return DefaultClient.Post(url, contentType, body, opts...)
54+
}
55+
56+
// Put sends a PUT request and returns a response Mono.
57+
func Put(url string, contentType string, body io.Reader, opts ...Option) Mono {
58+
return DefaultClient.Put(url, contentType, body, opts...)
59+
}
60+
61+
// Patch sends a PATCH request and returns a response Mono.
62+
func Patch(url string, contentType string, body io.Reader, opts ...Option) Mono {
63+
return DefaultClient.Patch(url, contentType, body, opts...)
64+
}
65+
66+
// NewClient creates a new Client.
67+
func NewClient(cli *http.Client, defaultOptions ...Option) Client {
68+
return &client{
69+
c: cli,
70+
opts: defaultOptions,
71+
}
72+
}
73+
74+
var _ Client = (*client)(nil)
75+
76+
type client struct {
77+
c *http.Client
78+
opts []Option
79+
}
80+
81+
func (h *client) Do(req *http.Request, opts ...Option) Mono {
82+
gen := func(ctx context.Context) (*http.Request, error) {
83+
return req.WithContext(ctx), nil
84+
}
85+
return h.execute(gen, opts...)
86+
}
87+
88+
func (h *client) Get(url string, opts ...Option) Mono {
89+
return h.customExecute(http.MethodGet, url, "", nil, opts...)
90+
}
91+
92+
func (h *client) Delete(url string, opts ...Option) Mono {
93+
return h.customExecute(http.MethodDelete, url, "", nil, opts...)
94+
}
95+
96+
func (h *client) Post(url string, contentType string, body io.Reader, opts ...Option) Mono {
97+
return h.customExecute(http.MethodPost, url, contentType, body, opts...)
98+
}
99+
100+
func (h *client) Put(url string, contentType string, body io.Reader, opts ...Option) Mono {
101+
return h.customExecute(http.MethodPut, url, contentType, body, opts...)
102+
}
103+
104+
func (h *client) Patch(url string, contentType string, body io.Reader, opts ...Option) Mono {
105+
return h.customExecute(http.MethodPatch, url, contentType, body, opts...)
106+
}
107+
108+
func (h *client) customExecute(method string, url string, contentType string, body io.Reader, opts ...Option) Mono {
109+
return h.execute(func(ctx context.Context) (*http.Request, error) {
110+
req, err := http.NewRequestWithContext(ctx, method, url, body)
111+
if err != nil {
112+
return nil, err
113+
}
114+
if len(contentType) > 0 {
115+
req.Header.Set("Content-Type", contentType)
116+
}
117+
return req, nil
118+
}, opts...)
119+
}
120+
121+
func (h *client) execute(gen func(ctx context.Context) (*http.Request, error), opts ...Option) Mono {
122+
return mono.Create(func(ctx context.Context, s mono.Sink) {
123+
var (
124+
opt *option
125+
req *http.Request
126+
err error
127+
)
128+
129+
// init option
130+
if len(opts)+len(h.opts) > 0 {
131+
opt = new(option)
132+
for _, it := range h.opts {
133+
it(opt)
134+
}
135+
for _, it := range opts {
136+
it(opt)
137+
}
138+
}
139+
140+
req, err = gen(ctx)
141+
if err != nil {
142+
s.Error(err)
143+
return
144+
}
145+
146+
if opt != nil && len(opt.reqChain) > 0 {
147+
for _, it := range opt.reqChain {
148+
it(req)
149+
}
150+
}
151+
152+
var res *http.Response
153+
res, err = h.c.Do(req)
154+
if err != nil {
155+
s.Error(err)
156+
return
157+
}
158+
159+
if opt == nil || opt.bodyParser == nil {
160+
s.Success(res)
161+
return
162+
}
163+
164+
var body interface{}
165+
body, err = opt.bodyParser(res)
166+
if err != nil {
167+
s.Error(err)
168+
return
169+
}
170+
171+
s.Success(body)
172+
})
173+
}

hc/hc_test.go

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package hc_test
2+
3+
import (
4+
"context"
5+
"io"
6+
"net/http"
7+
"testing"
8+
9+
"github.com/jjeffcaii/reactor-go"
10+
"github.com/jjeffcaii/reactor-go/hc"
11+
"github.com/jjeffcaii/reactor-go/mono"
12+
"github.com/jjeffcaii/reactor-go/scheduler"
13+
"github.com/jjeffcaii/reactor-go/tuple"
14+
"github.com/stretchr/testify/assert"
15+
)
16+
17+
var httpBinUrl = "https://httpbin.org/anything"
18+
19+
func TestClient_Get(t *testing.T) {
20+
v, err := hc.Get("http://ipecho.net", hc.WithStringResponse()).Block(context.Background())
21+
assert.NoError(t, err, "request failed")
22+
assert.NotEmpty(t, v.(string), "should not be empty")
23+
24+
cli := hc.NewClient(http.DefaultClient, hc.WithStringResponse())
25+
26+
v, err = mono.
27+
Zip(
28+
cli.Get("http://ipecho.net/plain").SubscribeOn(scheduler.Parallel()),
29+
cli.Get("http://ipecho.net/json").SubscribeOn(scheduler.Parallel()),
30+
cli.Get("http://ipecho.net/xml").SubscribeOn(scheduler.Parallel()),
31+
).
32+
Block(context.Background())
33+
assert.NoError(t, err)
34+
35+
tup := v.(tuple.Tuple)
36+
37+
tup.ForEachWithIndex(func(v reactor.Any, e error, i int) bool {
38+
t.Logf("#%02d: value=%v, error=%v", i, v, e)
39+
return true
40+
})
41+
42+
_, err = tup.Last()
43+
assert.Error(t, err)
44+
}
45+
46+
type HttpBin struct {
47+
Method string `json:"method"`
48+
Url string `json:"url"`
49+
Headers map[string]string `json:"headers"`
50+
}
51+
52+
func TestWithJSONBody(t *testing.T) {
53+
fakeHeaderKey := "X-Fake-Header"
54+
fakeHeaderValue := "fake value"
55+
v, err := hc.DefaultClient.
56+
Get(httpBinUrl, hc.WithRequestHijack(func(req *http.Request) {
57+
req.Header.Set(fakeHeaderKey, fakeHeaderValue)
58+
}), hc.WithJSONResponse(func() interface{} {
59+
return &HttpBin{}
60+
})).
61+
Block(context.Background())
62+
assert.NoError(t, err)
63+
assert.Equal(t, http.MethodGet, v.(*HttpBin).Method)
64+
assert.Equal(t, fakeHeaderValue, v.(*HttpBin).Headers[fakeHeaderKey])
65+
}
66+
67+
func TestRequests(t *testing.T) {
68+
validate := func(v interface{}, err error) {
69+
assert.NoError(t, err)
70+
_, ok := v.(*http.Response)
71+
assert.True(t, ok)
72+
}
73+
74+
for _, next := range []func(string, string, io.Reader, ...hc.Option) mono.Mono{
75+
hc.Post, hc.Put, hc.Patch,
76+
} {
77+
v, err := next(httpBinUrl, "application/json", nil).Block(context.Background())
78+
validate(v, err)
79+
}
80+
validate(hc.Get(httpBinUrl).Block(context.Background()))
81+
validate(hc.Delete(httpBinUrl).Block(context.Background()))
82+
}
83+
84+
func TestClient_Do(t *testing.T) {
85+
req, _ := http.NewRequest(http.MethodGet, httpBinUrl, nil)
86+
v, err := hc.Do(req, hc.WithStringResponse()).Block(context.Background())
87+
assert.NoError(t, err, "should not return error")
88+
assert.NotEmpty(t, v, "should not be empty")
89+
}
90+
91+
func TestDo_Failed(t *testing.T) {
92+
req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1/not-exists-path", nil)
93+
_, err := hc.Do(req).Block(context.Background())
94+
assert.Error(t, err)
95+
}

hc/option.go

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package hc
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"io/ioutil"
7+
"net/http"
8+
)
9+
10+
type BodyParser func(*http.Response) (interface{}, error)
11+
12+
// WithStringResponse decodes the response body as string.
13+
func WithStringResponse() Option {
14+
return WithResponseBodyParser(func(res *http.Response) (interface{}, error) {
15+
defer func() {
16+
if res.Body != nil {
17+
_ = res.Body.Close()
18+
}
19+
}()
20+
21+
if !isSuccessful(res.StatusCode) {
22+
b, err := ioutil.ReadAll(res.Body)
23+
if err == nil {
24+
err = errors.New(string(b))
25+
}
26+
return nil, err
27+
}
28+
29+
b, err := ioutil.ReadAll(res.Body)
30+
if err != nil {
31+
return nil, err
32+
}
33+
return string(b), nil
34+
})
35+
}
36+
37+
// WithJSONResponse decodes the response body as json.
38+
func WithJSONResponse(gen func() (ptr interface{})) Option {
39+
return func(o *option) {
40+
o.bodyParser = func(res *http.Response) (interface{}, error) {
41+
defer func() {
42+
if res.Body != nil {
43+
_ = res.Body.Close()
44+
}
45+
}()
46+
47+
if !isSuccessful(res.StatusCode) {
48+
b, err := ioutil.ReadAll(res.Body)
49+
if err != nil {
50+
return nil, err
51+
}
52+
return nil, errors.New(string(b))
53+
}
54+
55+
value := gen()
56+
57+
if err := json.NewDecoder(res.Body).Decode(value); err != nil {
58+
return nil, err
59+
}
60+
return value, nil
61+
}
62+
}
63+
}
64+
65+
// WithRequestHijack can be used to customize the http request.
66+
func WithRequestHijack(h func(*http.Request)) Option {
67+
return func(o *option) {
68+
o.reqChain = append(o.reqChain, h)
69+
}
70+
}
71+
72+
// WithResponseBodyParser set the response body parser.
73+
func WithResponseBodyParser(bp BodyParser) Option {
74+
return func(o *option) {
75+
o.bodyParser = bp
76+
}
77+
}
78+
79+
type option struct {
80+
bodyParser BodyParser
81+
reqChain []func(*http.Request)
82+
}
83+
84+
func isSuccessful(code int) bool {
85+
return code > 199 && code < 300
86+
}

0 commit comments

Comments
 (0)