Skip to content

Commit

Permalink
Merge pull request kata-containers#3553 from fgiudici/kata-monitor_ca…
Browse files Browse the repository at this point in the history
…chefix

kata-monitor: simplify sandbox cache management and attach kubernetes POD metadata to metrics
  • Loading branch information
fidencio authored Feb 21, 2022
2 parents 031da99 + ab44728 commit 1e9f3c8
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 97 deletions.
53 changes: 29 additions & 24 deletions src/runtime/pkg/kata-monitor/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func getAddressAndDialer(endpoint string) (string, func(ctx context.Context, add

func getConnection(endPoint string) (*grpc.ClientConn, error) {
var conn *grpc.ClientConn
monitorLog.Debugf("connect using endpoint '%s' with '%s' timeout", endPoint, defaultTimeout)
addr, dialer, err := getAddressAndDialer(endPoint)
if err != nil {
return nil, err
Expand All @@ -51,7 +50,7 @@ func getConnection(endPoint string) (*grpc.ClientConn, error) {
errMsg := errors.Wrapf(err, "connect endpoint '%s', make sure you are running as root and the endpoint has been started", endPoint)
return nil, errMsg
}
monitorLog.Debugf("connected successfully using endpoint: %s", endPoint)
monitorLog.Tracef("connected successfully using endpoint: %s", endPoint)
return conn, nil
}

Expand Down Expand Up @@ -114,15 +113,15 @@ func parseEndpoint(endpoint string) (string, string, error) {
}
}

// getSandboxes gets ready sandboxes from the container engine and returns an updated sandboxMap
func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool, error) {
newMap := make(map[string]bool)
// syncSandboxes gets pods metadata from the container manager and updates the sandbox cache.
func (km *KataMonitor) syncSandboxes(sandboxList []string) ([]string, error) {
runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
if err != nil {
return newMap, err
return sandboxList, err
}
defer closeConnection(runtimeConn)

// TODO: if len(sandboxList) is 1, better we just runtimeClient.PodSandboxStatus(...) targeting the single sandbox
filter := &pb.PodSandboxFilter{
State: &pb.PodSandboxStateValue{
State: pb.PodSandboxState_SANDBOX_READY,
Expand All @@ -132,29 +131,35 @@ func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool
request := &pb.ListPodSandboxRequest{
Filter: filter,
}
monitorLog.Debugf("ListPodSandboxRequest: %v", request)
monitorLog.Tracef("ListPodSandboxRequest: %v", request)
r, err := runtimeClient.ListPodSandbox(context.Background(), request)
if err != nil {
return newMap, err
return sandboxList, err
}
monitorLog.Debugf("ListPodSandboxResponse: %v", r)
monitorLog.Tracef("ListPodSandboxResponse: %v", r)

for _, pod := range r.Items {
// Use the cached data if available
if isKata, ok := sandboxMap[pod.Id]; ok {
newMap[pod.Id] = isKata
continue
for _, sandbox := range sandboxList {
if pod.Id == sandbox {
km.sandboxCache.setMetadata(sandbox, sandboxKubeData{
uid: pod.Metadata.Uid,
name: pod.Metadata.Name,
namespace: pod.Metadata.Namespace,
})

sandboxList = removeFromSandboxList(sandboxList, sandbox)

monitorLog.WithFields(logrus.Fields{
"Pod Name": pod.Metadata.Name,
"Pod Namespace": pod.Metadata.Namespace,
"Pod UID": pod.Metadata.Uid,
}).Debugf("Synced KATA POD %s", pod.Id)

break
}
}

// Check if a directory associated with the POD ID exist on the kata fs:
// if so we know that the POD is a kata one.
newMap[pod.Id] = checkSandboxFSExists(pod.Id)
monitorLog.WithFields(logrus.Fields{
"id": pod.Id,
"is kata": newMap[pod.Id],
"pod": pod,
}).Debug("")
}

return newMap, nil
	// TODO: here we should mark the sandboxes we failed to retrieve info from: we should try a finite number of times
	// to retrieve their metadata: if we keep failing, give up and remove them from the sandbox cache (with a Warning log).
return sandboxList, nil
}
42 changes: 30 additions & 12 deletions src/runtime/pkg/kata-monitor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func encodeMetricFamily(mfs []*dto.MetricFamily, encoder expfmt.Encoder) error {
// aggregateSandboxMetrics will get metrics from one sandbox and do some process
func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
// get all kata sandboxes from cache
sandboxes := km.sandboxCache.getKataSandboxes()
sandboxes := km.sandboxCache.getSandboxList()
// save running kata pods as a metrics.
runningShimCount.Set(float64(len(sandboxes)))

Expand All @@ -156,21 +156,25 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
// used to receive response
results := make(chan []*dto.MetricFamily, len(sandboxes))

monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count")
monitorLog.WithField("sandboxes count", len(sandboxes)).Debugf("aggregate sandbox metrics")

// get metrics from sandbox's shim
for _, sandboxID := range sandboxes {
sandboxMetadata, ok := km.sandboxCache.getMetadata(sandboxID)
if !ok { // likely the sandbox has been just removed
continue
}
wg.Add(1)
go func(sandboxID string, results chan<- []*dto.MetricFamily) {
sandboxMetrics, err := getParsedMetrics(sandboxID)
go func(sandboxID string, sandboxMetadata sandboxKubeData, results chan<- []*dto.MetricFamily) {
sandboxMetrics, err := getParsedMetrics(sandboxID, sandboxMetadata)
if err != nil {
monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
}

results <- sandboxMetrics
wg.Done()
monitorLog.WithField("sandbox_id", sandboxID).Debug("job finished")
}(sandboxID, results)
}(sandboxID, sandboxMetadata, results)

monitorLog.WithField("sandbox_id", sandboxID).Debug("job started")
}
Expand Down Expand Up @@ -219,13 +223,13 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {

}

func getParsedMetrics(sandboxID string) ([]*dto.MetricFamily, error) {
func getParsedMetrics(sandboxID string, sandboxMetadata sandboxKubeData) ([]*dto.MetricFamily, error) {
body, err := doGet(sandboxID, defaultTimeout, "metrics")
if err != nil {
return nil, err
}

return parsePrometheusMetrics(sandboxID, body)
return parsePrometheusMetrics(sandboxID, sandboxMetadata, body)
}

// GetSandboxMetrics will get sandbox's metrics from shim
Expand All @@ -240,7 +244,7 @@ func GetSandboxMetrics(sandboxID string) (string, error) {

// parsePrometheusMetrics will decode metrics from Prometheus text format
// and return array of *dto.MetricFamily with an ASC order
func parsePrometheusMetrics(sandboxID string, body []byte) ([]*dto.MetricFamily, error) {
func parsePrometheusMetrics(sandboxID string, sandboxMetadata sandboxKubeData, body []byte) ([]*dto.MetricFamily, error) {
reader := bytes.NewReader(body)
decoder := expfmt.NewDecoder(reader, expfmt.FmtText)

Expand All @@ -258,10 +262,24 @@ func parsePrometheusMetrics(sandboxID string, body []byte) ([]*dto.MetricFamily,
metricList := mf.Metric
for j := range metricList {
metric := metricList[j]
metric.Label = append(metric.Label, &dto.LabelPair{
Name: mutils.String2Pointer("sandbox_id"),
Value: mutils.String2Pointer(sandboxID),
})
metric.Label = append(metric.Label,
&dto.LabelPair{
Name: mutils.String2Pointer("sandbox_id"),
Value: mutils.String2Pointer(sandboxID),
},
&dto.LabelPair{
Name: mutils.String2Pointer("kube_uid"),
Value: mutils.String2Pointer(sandboxMetadata.uid),
},
&dto.LabelPair{
Name: mutils.String2Pointer("kube_name"),
Value: mutils.String2Pointer(sandboxMetadata.name),
},
&dto.LabelPair{
Name: mutils.String2Pointer("kube_namespace"),
Value: mutils.String2Pointer(sandboxMetadata.namespace),
},
)
}

// Kata shim are using prometheus go client, add a prefix for metric name to avoid confusing
Expand Down
12 changes: 10 additions & 2 deletions src/runtime/pkg/kata-monitor/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ ttt 999
func TestParsePrometheusMetrics(t *testing.T) {
assert := assert.New(t)
sandboxID := "sandboxID-abc"
sandboxMetadata := sandboxKubeData{"123", "pod-name", "pod-namespace"}

// parse metrics
list, err := parsePrometheusMetrics(sandboxID, []byte(shimMetricBody))
list, err := parsePrometheusMetrics(sandboxID, sandboxMetadata, []byte(shimMetricBody))
assert.Nil(err, "parsePrometheusMetrics should not return error")

assert.Equal(4, len(list), "should return 3 metric families")
Expand All @@ -56,9 +57,16 @@ func TestParsePrometheusMetrics(t *testing.T) {

// get the metric
m := mf.Metric[0]
assert.Equal(1, len(m.Label), "should have only 1 labels")
assert.Equal(4, len(m.Label), "should have 4 labels")
assert.Equal("sandbox_id", *m.Label[0].Name, "label name should be sandbox_id")
assert.Equal(sandboxID, *m.Label[0].Value, "label value should be", sandboxID)
assert.Equal("kube_uid", *m.Label[1].Name, "label name should be kube_uid")
assert.Equal(sandboxMetadata.uid, *m.Label[1].Value, "label value should be", sandboxMetadata.uid)

assert.Equal("kube_name", *m.Label[2].Name, "label name should be kube_name")
assert.Equal(sandboxMetadata.name, *m.Label[2].Value, "label value should be", sandboxMetadata.name)
assert.Equal("kube_namespace", *m.Label[3].Name, "label name should be kube_namespace")
assert.Equal(sandboxMetadata.namespace, *m.Label[3].Value, "label value should be", sandboxMetadata.namespace)

summary := m.Summary
assert.NotNil(summary, "summary should not be nil")
Expand Down
70 changes: 50 additions & 20 deletions src/runtime/pkg/kata-monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
runtimeEndpoint: runtimeEndpoint,
sandboxCache: &sandboxCache{
Mutex: &sync.Mutex{},
sandboxes: make(map[string]bool),
sandboxes: make(map[string]sandboxKubeData),
},
}

Expand All @@ -65,6 +65,15 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
return km, nil
}

// removeFromSandboxList returns sandboxList with the first occurrence of
// sandboxToRemove deleted. If the id is not present the slice is returned
// unchanged. The original backing array is reused (in-place delete), so the
// caller should use the returned slice and drop the old reference.
func removeFromSandboxList(sandboxList []string, sandboxToRemove string) []string {
	idx := -1
	for i := range sandboxList {
		if sandboxList[i] == sandboxToRemove {
			idx = i
			break
		}
	}
	if idx < 0 {
		// Nothing to remove: hand back the slice untouched.
		return sandboxList
	}
	return append(sandboxList[:idx], sandboxList[idx+1:]...)
}

// startPodCacheUpdater will boot a thread to manage sandbox cache
func (km *KataMonitor) startPodCacheUpdater() {
sbsWatcher, err := fsnotify.NewWatcher()
Expand All @@ -84,9 +93,24 @@ func (km *KataMonitor) startPodCacheUpdater() {
monitorLog.Debugf("started fs monitoring @%s", getSandboxFS())
break
}
// we refresh the pod cache once if we get multiple add/delete pod events in a short time (< podCacheRefreshDelaySeconds)
// Initial sync with the kata sandboxes already running
sbsFile, err := os.Open(getSandboxFS())
if err != nil {
monitorLog.WithError(err).Fatal("cannot open sandboxes fs")
os.Exit(1)
}
sandboxList, err := sbsFile.Readdirnames(0)
if err != nil {
monitorLog.WithError(err).Fatal("cannot read sandboxes fs")
os.Exit(1)
}
monitorLog.Debug("initial sync of sbs directory completed")
monitorLog.Tracef("pod list from sbs: %v", sandboxList)

// We should get kubernetes metadata from the container manager for each new kata sandbox we detect.
// It may take a while for data to be available, so we always wait podCacheRefreshDelaySeconds before checking.
cacheUpdateTimer := time.NewTimer(podCacheRefreshDelaySeconds * time.Second)
cacheUpdateTimerWasSet := false
cacheUpdateTimerIsSet := true
for {
select {
case event, ok := <-sbsWatcher.Events:
Expand All @@ -99,11 +123,18 @@ func (km *KataMonitor) startPodCacheUpdater() {
case fsnotify.Create:
splitPath := strings.Split(event.Name, string(os.PathSeparator))
id := splitPath[len(splitPath)-1]
if !km.sandboxCache.putIfNotExists(id, true) {
if !km.sandboxCache.putIfNotExists(id, sandboxKubeData{}) {
monitorLog.WithField("pod", id).Warn(
"CREATE event but pod already present in the sandbox cache")
}
sandboxList = append(sandboxList, id)
monitorLog.WithField("pod", id).Info("sandbox cache: added pod")
if !cacheUpdateTimerIsSet {
cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
cacheUpdateTimerIsSet = true
monitorLog.Debugf(
"cache update timer fires in %d secs", podCacheRefreshDelaySeconds)
}

case fsnotify.Remove:
splitPath := strings.Split(event.Name, string(os.PathSeparator))
Expand All @@ -112,28 +143,27 @@ func (km *KataMonitor) startPodCacheUpdater() {
monitorLog.WithField("pod", id).Warn(
"REMOVE event but pod was missing from the sandbox cache")
}
sandboxList = removeFromSandboxList(sandboxList, id)
monitorLog.WithField("pod", id).Info("sandbox cache: removed pod")

default:
monitorLog.WithField("event", event).Warn("got unexpected fs event")
}

// While we process fs events directly to update the sandbox cache we need to sync with the
// container engine to ensure we are on sync with it: we can get out of sync in environments
// where kata workloads can be started by other processes than the container engine.
cacheUpdateTimerWasSet = cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
monitorLog.WithField("was reset", cacheUpdateTimerWasSet).Debugf(
"cache update timer fires in %d secs", podCacheRefreshDelaySeconds)

case <-cacheUpdateTimer.C:
sandboxes, err := km.getSandboxes(km.sandboxCache.getAllSandboxes())
cacheUpdateTimerIsSet = false
monitorLog.WithField("pod list", sandboxList).Debugf(
"retrieve pods metadata from the container manager")
sandboxList, err = km.syncSandboxes(sandboxList)
if err != nil {
monitorLog.WithError(err).Error("failed to get sandboxes")
monitorLog.WithError(err).Error("failed to get sandboxes metadata")
continue
}
monitorLog.WithField("count", len(sandboxes)).Info("synced sandbox cache with the container engine")
monitorLog.WithField("sandboxes", sandboxes).Debug("dump sandbox cache")
km.sandboxCache.set(sandboxes)
if len(sandboxList) > 0 {
monitorLog.WithField("sandboxes", sandboxList).Debugf(
"%d sandboxes still miss metadata", len(sandboxList))
cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
cacheUpdateTimerIsSet = true
}

monitorLog.WithField("sandboxes", km.sandboxCache.getSandboxList()).Trace("dump sandbox cache")
}
}
}
Expand All @@ -157,7 +187,7 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {

// ListSandboxes list all sandboxes running in Kata
func (km *KataMonitor) ListSandboxes(w http.ResponseWriter, r *http.Request) {
sandboxes := km.sandboxCache.getKataSandboxes()
sandboxes := km.sandboxCache.getSandboxList()
for _, s := range sandboxes {
w.Write([]byte(fmt.Sprintf("%s\n", s)))
}
Expand Down
Loading

0 comments on commit 1e9f3c8

Please sign in to comment.