Skip to content

Commit 602a2ea

Browse files
julienstraefiker
authored andcommitted
Adds mirroring service
1 parent fd24b18 commit 602a2ea

File tree

10 files changed

+465
-10
lines changed

10 files changed

+465
-10
lines changed

docs/content/routing/services/index.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,53 @@ http:
353353
- url: "http://private-ip-server-2/"
354354
```
355355
356+
### Mirroring (service)
357+
358+
The mirroring is able to mirror requests sent to a service to other services.
359+
360+
This strategy can be defined only with [File](../../providers/file.md).
361+
362+
```toml tab="TOML"
363+
[http.services]
364+
[http.services.mirroring]
365+
[http.services.mirroring.mirroring]
366+
service = "app"
367+
[[http.services.mirroring.mirroring.mirrors]]
368+
name = "mirror"
369+
percent = 10
370+
371+
[http.services.app]
372+
[http.services.app.loadBalancer]
373+
[[http.services.appv1.loadBalancer.servers]]
374+
url = "http://private-ip-server-1/"
375+
376+
[http.services.mirror]
377+
[http.services.mirror.loadBalancer]
378+
[[http.services.mirror.loadBalancer.servers]]
379+
url = "http://private-ip-server-2/"
380+
```
381+
382+
```yaml tab="YAML"
383+
http:
384+
services:
385+
mirroring:
386+
mirroring:
387+
service: app
388+
mirrors:
389+
- name: mirror
390+
percent: 10
391+
392+
app:
393+
loadBalancer:
394+
servers:
395+
- url: "http://private-ip-server-1/"
396+
397+
mirror:
398+
loadBalancer:
399+
servers:
400+
- url: "http://private-ip-server-2/"
401+
```
402+
356403
## Configuring TCP Services
357404
358405
### General

integration/fixtures/mirror.toml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
[global]
2+
checkNewVersion = false
3+
sendAnonymousUsage = false
4+
5+
[api]
6+
7+
[log]
8+
level = "DEBUG"
9+
10+
[entryPoints]
11+
12+
[entryPoints.web]
13+
address = ":8000"
14+
15+
[providers.file]
16+
filename = "{{ .SelfFilename }}"
17+
18+
## dynamic configuration ##
19+
20+
[http.routers]
21+
[http.routers.router]
22+
service = "mirror"
23+
rule = "Path(`/whoami`)"
24+
25+
[http.services]
26+
[http.services.mirror.mirroring]
27+
service = "service1"
28+
[[http.services.mirror.mirroring.mirrors]]
29+
name = "mirror1"
30+
percent = 10
31+
[[http.services.mirror.mirroring.mirrors]]
32+
name = "mirror2"
33+
percent = 50
34+
35+
[http.services.service1.loadBalancer]
36+
[[http.services.service1.loadBalancer.servers]]
37+
url = "{{ .MainServer }}"
38+
[http.services.mirror1.loadBalancer]
39+
[[http.services.mirror1.loadBalancer.servers]]
40+
url = "{{ .Mirror1Server }}"
41+
[http.services.mirror2.loadBalancer]
42+
[[http.services.mirror2.loadBalancer.servers]]
43+
url = "{{ .Mirror2Server }}"
44+

integration/simple_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ package integration
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/json"
67
"fmt"
78
"io/ioutil"
89
"net/http"
910
"net/http/httptest"
1011
"os"
1112
"strings"
13+
"sync/atomic"
1214
"syscall"
1315
"time"
1416

@@ -671,3 +673,111 @@ func (s *SimpleSuite) TestWRRSticky(c *check.C) {
671673
c.Assert(repartition[server1], checker.Equals, 4)
672674
c.Assert(repartition[server2], checker.Equals, 0)
673675
}
676+
677+
func (s *SimpleSuite) TestMirror(c *check.C) {
678+
var count, countMirror1, countMirror2 int32
679+
680+
main := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
681+
atomic.AddInt32(&count, 1)
682+
}))
683+
684+
mirror1 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
685+
atomic.AddInt32(&countMirror1, 1)
686+
}))
687+
688+
mirror2 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
689+
atomic.AddInt32(&countMirror2, 1)
690+
}))
691+
692+
mainServer := main.URL
693+
mirror1Server := mirror1.URL
694+
mirror2Server := mirror2.URL
695+
696+
file := s.adaptFile(c, "fixtures/mirror.toml", struct {
697+
MainServer string
698+
Mirror1Server string
699+
Mirror2Server string
700+
}{MainServer: mainServer, Mirror1Server: mirror1Server, Mirror2Server: mirror2Server})
701+
defer os.Remove(file)
702+
703+
cmd, output := s.traefikCmd(withConfigFile(file))
704+
defer output(c)
705+
706+
err := cmd.Start()
707+
c.Assert(err, checker.IsNil)
708+
defer cmd.Process.Kill()
709+
710+
err = try.GetRequest("http://127.0.0.1:8080/api/http/services", 1000*time.Millisecond, try.BodyContains("mirror1", "mirror2", "service1"))
711+
c.Assert(err, checker.IsNil)
712+
713+
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/whoami", nil)
714+
c.Assert(err, checker.IsNil)
715+
for i := 0; i < 10; i++ {
716+
response, err := http.DefaultClient.Do(req)
717+
c.Assert(err, checker.IsNil)
718+
c.Assert(response.StatusCode, checker.Equals, http.StatusOK)
719+
}
720+
721+
countTotal := atomic.LoadInt32(&count)
722+
val1 := atomic.LoadInt32(&countMirror1)
723+
val2 := atomic.LoadInt32(&countMirror2)
724+
725+
c.Assert(countTotal, checker.Equals, int32(10))
726+
c.Assert(val1, checker.Equals, int32(1))
727+
c.Assert(val2, checker.Equals, int32(5))
728+
}
729+
730+
func (s *SimpleSuite) TestMirrorCanceled(c *check.C) {
731+
var count, countMirror1, countMirror2 int32
732+
733+
main := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
734+
atomic.AddInt32(&count, 1)
735+
time.Sleep(time.Second * 2)
736+
}))
737+
738+
mirror1 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
739+
atomic.AddInt32(&countMirror1, 1)
740+
}))
741+
742+
mirror2 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
743+
atomic.AddInt32(&countMirror2, 1)
744+
}))
745+
746+
mainServer := main.URL
747+
mirror1Server := mirror1.URL
748+
mirror2Server := mirror2.URL
749+
750+
file := s.adaptFile(c, "fixtures/mirror.toml", struct {
751+
MainServer string
752+
Mirror1Server string
753+
Mirror2Server string
754+
}{MainServer: mainServer, Mirror1Server: mirror1Server, Mirror2Server: mirror2Server})
755+
defer os.Remove(file)
756+
757+
cmd, output := s.traefikCmd(withConfigFile(file))
758+
defer output(c)
759+
760+
err := cmd.Start()
761+
c.Assert(err, checker.IsNil)
762+
defer cmd.Process.Kill()
763+
764+
err = try.GetRequest("http://127.0.0.1:8080/api/http/services", 1000*time.Millisecond, try.BodyContains("mirror1", "mirror2", "service1"))
765+
c.Assert(err, checker.IsNil)
766+
767+
for i := 0; i < 5; i++ {
768+
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/whoami", nil)
769+
c.Assert(err, checker.IsNil)
770+
771+
newCtx, _ := context.WithTimeout(req.Context(), time.Second)
772+
req = req.WithContext(newCtx)
773+
http.DefaultClient.Do(req)
774+
}
775+
776+
countTotal := atomic.LoadInt32(&count)
777+
val1 := atomic.LoadInt32(&countMirror1)
778+
val2 := atomic.LoadInt32(&countMirror2)
779+
780+
c.Assert(countTotal, checker.Equals, int32(5))
781+
c.Assert(val1, checker.Equals, int32(0))
782+
c.Assert(val2, checker.Equals, int32(0))
783+
}

pkg/config/dynamic/http_config.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type HTTPConfiguration struct {
2121
type Service struct {
2222
LoadBalancer *ServersLoadBalancer `json:"loadBalancer,omitempty" toml:"loadBalancer,omitempty" yaml:"loadBalancer,omitempty"`
2323
Weighted *WeightedRoundRobin `json:"weighted,omitempty" toml:"weighted,omitempty" yaml:"weighted,omitempty" label:"-"`
24+
Mirroring *Mirroring `json:"mirroring,omitempty" toml:"mirroring,omitempty" yaml:"mirroring,omitempty" label:"-"`
2425
}
2526

2627
// +k8s:deepcopy-gen=true
@@ -46,6 +47,22 @@ type RouterTLSConfig struct {
4647

4748
// +k8s:deepcopy-gen=true
4849

50+
// Mirroring holds the Mirroring configuration.
51+
type Mirroring struct {
52+
Service string `json:"service,omitempty" toml:"service,omitempty" yaml:"service,omitempty"`
53+
Mirrors []MirrorService `json:"mirrors,omitempty" toml:"mirrors,omitempty" yaml:"mirrors,omitempty"`
54+
}
55+
56+
// +k8s:deepcopy-gen=true
57+
58+
// MirrorService holds the MirrorService configuration.
59+
type MirrorService struct {
60+
Name string `json:"name,omitempty" toml:"name,omitempty" yaml:"name,omitempty"`
61+
Percent int `json:"percent,omitempty" toml:"percent,omitempty" yaml:"percent,omitempty"`
62+
}
63+
64+
// +k8s:deepcopy-gen=true
65+
4966
// WeightedRoundRobin is a weighted round robin load-balancer of services.
5067
type WeightedRoundRobin struct {
5168
Services []WRRService `json:"services,omitempty" toml:"services,omitempty" yaml:"services,omitempty"`

pkg/server/router/router_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func TestRouterManager_Get(t *testing.T) {
306306
Middlewares: test.middlewaresConfig,
307307
},
308308
})
309-
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil)
309+
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil, nil)
310310
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager)
311311
responseModifierFactory := responsemodifiers.NewBuilder(rtConf.Middlewares)
312312
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, responseModifierFactory)
@@ -407,7 +407,7 @@ func TestAccessLog(t *testing.T) {
407407
Middlewares: test.middlewaresConfig,
408408
},
409409
})
410-
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil)
410+
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil, nil)
411411
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager)
412412
responseModifierFactory := responsemodifiers.NewBuilder(rtConf.Middlewares)
413413
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, responseModifierFactory)
@@ -693,7 +693,7 @@ func TestRuntimeConfiguration(t *testing.T) {
693693
Middlewares: test.middlewareConfig,
694694
},
695695
})
696-
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil)
696+
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil, nil)
697697
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager)
698698
responseModifierFactory := responsemodifiers.NewBuilder(map[string]*runtime.MiddlewareInfo{})
699699
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, responseModifierFactory)
@@ -767,7 +767,7 @@ func BenchmarkRouterServe(b *testing.B) {
767767
Middlewares: map[string]*dynamic.Middleware{},
768768
},
769769
})
770-
serviceManager := service.NewManager(rtConf.Services, &staticTransport{res}, nil)
770+
serviceManager := service.NewManager(rtConf.Services, &staticTransport{res}, nil, nil)
771771
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager)
772772
responseModifierFactory := responsemodifiers.NewBuilder(rtConf.Middlewares)
773773
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, responseModifierFactory)
@@ -808,7 +808,7 @@ func BenchmarkService(b *testing.B) {
808808
Services: serviceConfig,
809809
},
810810
})
811-
serviceManager := service.NewManager(rtConf.Services, &staticTransport{res}, nil)
811+
serviceManager := service.NewManager(rtConf.Services, &staticTransport{res}, nil, nil)
812812
w := httptest.NewRecorder()
813813
req := testhelpers.MustNewRequest(http.MethodGet, "http://foo.bar/", nil)
814814

pkg/server/server_configuration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (s *Server) createTCPRouters(ctx context.Context, configuration *runtime.Co
9797

9898
// createHTTPHandlers returns, for the given configuration and entryPoints, the HTTP handlers for non-TLS connections, and for the TLS ones. the given configuration must not be nil. its fields will get mutated.
9999
func (s *Server) createHTTPHandlers(ctx context.Context, configuration *runtime.Configuration, entryPoints []string) (map[string]http.Handler, map[string]http.Handler) {
100-
serviceManager := service.NewManager(configuration.Services, s.defaultRoundTripper, s.metricsRegistry)
100+
serviceManager := service.NewManager(configuration.Services, s.defaultRoundTripper, s.metricsRegistry, s.routinesPool)
101101
middlewaresBuilder := middleware.NewBuilder(configuration.Middlewares, serviceManager)
102102
responseModifierFactory := responsemodifiers.NewBuilder(configuration.Middlewares)
103103
routerManager := router.NewManager(configuration, serviceManager, middlewaresBuilder, responseModifierFactory)

0 commit comments

Comments
 (0)