Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.

Commit 8a7e67d

Browse files
authored
Merge pull request #744 from teawater/buffer
buffer: Handle the control operations (ContainerCreate) asynchronously
2 parents 900bc20 + 3b54092 commit 8a7e67d

File tree

9 files changed

+336
-25
lines changed

9 files changed

+336
-25
lines changed

daemon/buffer/buffer.go

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package buffer
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
7+
"github.com/golang/glog"
8+
"github.com/hyperhq/hyperd/daemon/pod"
9+
apitypes "github.com/hyperhq/hyperd/types"
10+
)
11+
12+
type Buffer struct {
13+
goroutinesLimit uint64
14+
goroutinesLock sync.Mutex
15+
ch chan *pod.ContainerBuffer
16+
}
17+
18+
const (
19+
DefaultBufferChannelSize = 1024
20+
)
21+
22+
func NewBuffer(cfg *apitypes.HyperConfig) *Buffer {
23+
if cfg.BufferGoroutinesMax == 0 {
24+
return nil
25+
}
26+
if cfg.BufferChannelSize == 0 {
27+
cfg.BufferChannelSize = DefaultBufferChannelSize
28+
}
29+
30+
daemon := &Buffer{
31+
goroutinesLimit: cfg.BufferGoroutinesMax,
32+
ch: make(chan *pod.ContainerBuffer, cfg.BufferChannelSize),
33+
}
34+
35+
return daemon
36+
}
37+
38+
func (b *Buffer) CreateContainerInPod(p *pod.XPod, c *apitypes.UserContainer) (string, error) {
39+
if !p.IsAlive() {
40+
err := fmt.Errorf("pod is not running")
41+
p.Log(pod.ERROR, "%v", err)
42+
return "", err
43+
}
44+
if err := p.ReserveContainerName(c); err != nil {
45+
return "", err
46+
}
47+
48+
cb, err := p.AddContainerBuffer(c)
49+
if err != nil {
50+
return "", err
51+
}
52+
53+
b.goroutinesLock.Lock()
54+
defer b.goroutinesLock.Unlock()
55+
if b.goroutinesLimit != 0 {
56+
b.goroutinesLimit--
57+
go b.containerHandler(cb)
58+
glog.V(3).Infof("Put %+v to containerHandler, current limit %v", c, b.goroutinesLimit)
59+
} else {
60+
select {
61+
case b.ch <- cb:
62+
glog.V(3).Infof("Put %+v to channel", c)
63+
default:
64+
err := fmt.Errorf("%+v dropped because channel is full", c)
65+
glog.Errorf("%s", err)
66+
p.RemoveContainerBufferAll(cb)
67+
return "", err
68+
}
69+
}
70+
71+
return cb.Id, nil
72+
}
73+
74+
func (b *Buffer) containerHandler(cb *pod.ContainerBuffer) {
75+
loop:
76+
for {
77+
glog.V(3).Infof("Buffer begin to handle %+v", cb)
78+
id, err := cb.P.DoContainerCreate(cb.Spec, cb.Id)
79+
if err == nil {
80+
cb.P.RemoveContainerBuffer(cb)
81+
glog.V(3).Infof("Buffer handle %+v done, new id is %s", cb, id)
82+
} else {
83+
cb.P.RemoveContainerBufferAll(cb)
84+
glog.Errorf("Buffer handle %+v failed %v", cb, err)
85+
}
86+
87+
b.goroutinesLock.Lock()
88+
select {
89+
case cb = <-b.ch:
90+
b.goroutinesLock.Unlock()
91+
default:
92+
defer b.goroutinesLock.Unlock()
93+
b.goroutinesLimit++
94+
break loop
95+
}
96+
}
97+
glog.V(3).Infof("Channel is empty, current limit %v", b.goroutinesLimit)
98+
}

daemon/daemon.go

+4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"path"
88
"strings"
99

10+
"github.com/hyperhq/hyperd/daemon/buffer"
1011
"github.com/hyperhq/hyperd/daemon/daemondb"
1112
"github.com/hyperhq/hyperd/daemon/pod"
1213
"github.com/hyperhq/hyperd/networking/portmapping"
@@ -41,6 +42,8 @@ type Daemon struct {
4142
Storage Storage
4243
Hypervisor string
4344
DefaultLog *pod.GlobalLogConfig
45+
46+
buffer *buffer.Buffer
4447
}
4548

4649
func (daemon *Daemon) Restore() error {
@@ -117,6 +120,7 @@ func NewDaemon(cfg *apitypes.HyperConfig) (*Daemon, error) {
117120
db: db,
118121
PodList: pod.NewPodList(),
119122
Host: cfg.Host,
123+
buffer: buffer.NewBuffer(cfg),
120124
}
121125

122126
daemon.Daemon, err = docker.NewDaemon(dockerCfg, registryCfg)

daemon/list.go

+2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func (daemon *Daemon) ListContainers(podId, vmId string) ([]*apitypes.ContainerL
5555
result = append(result, status)
5656
}
5757
}
58+
result = p.AppendContainerBufferStatus(result)
5859
}
5960
return result, nil
6061
}
@@ -118,6 +119,7 @@ func (daemon *Daemon) List(item, podId, vmId string) (map[string][]string, error
118119
if status != "" {
119120
containerJsonResponse = append(containerJsonResponse, status)
120121
}
122+
containerJsonResponse = p.AppendContainerBufferStatusString(containerJsonResponse)
121123
}
122124
}
123125
}

daemon/pod/buffer.go

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package pod
2+
3+
import (
4+
"github.com/docker/docker/pkg/stringid"
5+
apitypes "github.com/hyperhq/hyperd/types"
6+
"strings"
7+
)
8+
9+
type ContainerBuffer struct {
10+
Id string
11+
P *XPod
12+
Spec *apitypes.UserContainer
13+
}
14+
15+
func (cb *ContainerBuffer) info() *apitypes.Container {
16+
cinfo := &apitypes.Container{
17+
Name: "/" + cb.Spec.Name,
18+
ContainerID: cb.Id,
19+
Image: cb.Spec.Image,
20+
Commands: cb.Spec.Command,
21+
WorkingDir: cb.Spec.Workdir,
22+
Labels: cb.Spec.Labels,
23+
Ports: make([]*apitypes.ContainerPort, 0, len(cb.Spec.Ports)),
24+
VolumeMounts: make([]*apitypes.VolumeMount, 0, len(cb.Spec.Volumes)),
25+
Env: make([]*apitypes.EnvironmentVar, 0, len(cb.Spec.Envs)),
26+
Tty: cb.Spec.Tty,
27+
ImagePullPolicy: "",
28+
}
29+
for _, port := range cb.Spec.Ports {
30+
cinfo.Ports = append(cinfo.Ports, &apitypes.ContainerPort{
31+
HostPort: port.HostPort,
32+
ContainerPort: port.ContainerPort,
33+
Protocol: port.Protocol,
34+
})
35+
}
36+
for _, vol := range cb.Spec.Volumes {
37+
cinfo.VolumeMounts = append(cinfo.VolumeMounts, &apitypes.VolumeMount{
38+
Name: vol.Volume,
39+
MountPath: vol.Path,
40+
ReadOnly: vol.ReadOnly,
41+
})
42+
}
43+
for _, env := range cb.Spec.Envs {
44+
cinfo.Env = append(cinfo.Env, &apitypes.EnvironmentVar{
45+
Env: env.Env,
46+
Value: env.Value,
47+
})
48+
}
49+
return cinfo
50+
}
51+
52+
func (cb *ContainerBuffer) infoStatus() *apitypes.ContainerStatus {
53+
s := &apitypes.ContainerStatus{
54+
Name: cb.Spec.Name,
55+
ContainerID: cb.Id,
56+
Waiting: &apitypes.WaitingStatus{Reason: "Pending"},
57+
Running: &apitypes.RunningStatus{StartedAt: ""},
58+
Terminated: &apitypes.TermStatus{},
59+
Phase: "pending",
60+
}
61+
return s
62+
}
63+
64+
func (p *XPod) AddContainerBuffer(c *apitypes.UserContainer) (*ContainerBuffer, error) {
65+
cid := stringid.GenerateNonCryptoID()
66+
67+
cb := &ContainerBuffer{
68+
Id: cid,
69+
P: p,
70+
Spec: c,
71+
}
72+
p.statusLock.Lock()
73+
p.containerBuffers[cid] = cb
74+
p.statusLock.Unlock()
75+
76+
err := p.factory.registry.ReserveContainer(cid, c.Name, p.Id())
77+
if err != nil {
78+
p.RemoveContainerBuffer(cb)
79+
return nil, err
80+
}
81+
82+
return cb, nil
83+
}
84+
85+
func (p *XPod) RemoveContainerBuffer(cb *ContainerBuffer) {
86+
p.statusLock.Lock()
87+
defer p.statusLock.Unlock()
88+
delete(p.containerBuffers, cb.Id)
89+
}
90+
91+
func (p *XPod) RemoveContainerBufferAll(cb *ContainerBuffer) {
92+
p.factory.registry.ReleaseContainer(cb.Id, cb.Spec.Name)
93+
94+
p.RemoveContainerBuffer(cb)
95+
}
96+
97+
func (p *XPod) AppendContainerBufferStatus(result []*apitypes.ContainerListResult) []*apitypes.ContainerListResult {
98+
p.statusLock.RLock()
99+
defer p.statusLock.RUnlock()
100+
for id, cb := range p.containerBuffers {
101+
result = append(result, &apitypes.ContainerListResult{
102+
ContainerID: id,
103+
ContainerName: cb.Spec.Name,
104+
PodID: p.Id(),
105+
Status: "pending",
106+
})
107+
}
108+
return result
109+
}
110+
111+
func (p *XPod) AppendContainerBufferStatusString(result []string) []string {
112+
p.statusLock.RLock()
113+
defer p.statusLock.RUnlock()
114+
for id, cb := range p.containerBuffers {
115+
result = append(result, strings.Join([]string{id, cb.Spec.Name, p.Id(), "pending"}, ":"))
116+
}
117+
return result
118+
}
119+
120+
func (p *XPod) ContainerBufferInfo(cid string) *apitypes.ContainerInfo {
121+
p.statusLock.RLock()
122+
defer p.statusLock.RUnlock()
123+
if cb, ok := p.containerBuffers[cid]; ok {
124+
//Not set CreatedAt
125+
ci := &apitypes.ContainerInfo{
126+
PodID: p.Id(),
127+
Container: cb.info(),
128+
Status: cb.infoStatus(),
129+
}
130+
return ci
131+
}
132+
return nil
133+
}

daemon/pod/pod.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ type XPod struct {
7878
// on it too.
7979
stoppedChan chan bool
8080
initCond *sync.Cond
81+
82+
containerBuffers map[string]*ContainerBuffer
8183
}
8284

8385
// The Log infrastructure, to add pod name as prefix of the log message.
@@ -291,6 +293,12 @@ func (p *XPod) ContainerInfo(cid string) (*apitypes.ContainerInfo, error) {
291293
}
292294
return ci, nil
293295
}
296+
297+
ci := p.ContainerBufferInfo(cid)
298+
if ci != nil {
299+
return ci, nil
300+
}
301+
294302
err := fmt.Errorf("container %s does not existing", cid)
295303
p.Log(ERROR, err)
296304
return nil, err
@@ -343,7 +351,7 @@ func (p *XPod) updatePodInfo() error {
343351
defer p.statusLock.RUnlock()
344352

345353
var (
346-
containers = make([]*apitypes.Container, 0, len(p.containers))
354+
containers = make([]*apitypes.Container, 0, len(p.containers)+len(p.containerBuffers))
347355
volumes = make([]*apitypes.PodVolume, 0, len(p.volumes))
348356
containerStatus = make([]*apitypes.ContainerStatus, 0, len(p.containers))
349357
)
@@ -365,6 +373,12 @@ func (p *XPod) updatePodInfo() error {
365373
succeeeded = "Failed"
366374
}
367375
}
376+
for _, cb := range p.containerBuffers {
377+
ci := cb.info()
378+
cs := cb.infoStatus()
379+
containers = append(containers, ci)
380+
containerStatus = append(containerStatus, cs)
381+
}
368382
p.info.Spec.Containers = containers
369383
p.info.Status.ContainerStatus = containerStatus
370384

daemon/pod/podlist.go

+14
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,20 @@ func (pl *PodList) ReserveContainer(id, name, pod string) error {
7171
return nil
7272
}
7373

74+
func (pl *PodList) ChangeContainerId(oldId, newId, pod string) error {
75+
pl.mu.Lock()
76+
defer pl.mu.Unlock()
77+
if _, ok := pl.pods[pod]; !ok {
78+
return fmt.Errorf("pod %s not exist for Changing container %s", pod, oldId)
79+
}
80+
if pn, ok := pl.containers[newId]; ok && pn != pod {
81+
return fmt.Errorf("the container id %s has already taken by pod %s", newId, pn)
82+
}
83+
delete(pl.containers, oldId)
84+
pl.containers[newId] = pod
85+
return nil
86+
}
87+
7488
func (pl *PodList) ReservePod(p *XPod) error {
7589
pl.mu.Lock()
7690
defer pl.mu.Unlock()

0 commit comments

Comments
 (0)