Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Stream container logs #53

Open
wants to merge 1 commit into
base: metrics
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"github.com/icinga/icinga-go-library/driver"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-kubernetes/internal"
"github.com/icinga/icinga-kubernetes/pkg/api"
"github.com/icinga/icinga-kubernetes/pkg/contracts"
"github.com/icinga/icinga-kubernetes/pkg/schema"
"github.com/icinga/icinga-kubernetes/pkg/sync"
@@ -157,6 +158,13 @@ func main() {
return nodeMetricSync.Clean(ctx, forwardDeleteNodesToMetricChannel)
})

// stream log api
logStreamApi := api.NewLogStreamApi(k, logs.GetChildLogger("LogStreamApi"), &cfg.Api.Log)

g.Go(func() error {
return logStreamApi.Stream(ctx)
})

if err := g.Wait(); err != nil {
logging.Fatal(errors.Wrap(err, "can't sync"))
}
8 changes: 8 additions & 0 deletions config.example.yml
Original file line number Diff line number Diff line change
@@ -39,3 +39,11 @@ logging:
# Valid units are "ms", "s", "m", "h".
# Defaults to "20s".
# interval: 20s

api:
log:
# Address where the api is reachable
address: '/logStream'

# Port where the api is reachable
port: 8080
6 changes: 6 additions & 0 deletions internal/config.go
Original file line number Diff line number Diff line change
@@ -3,12 +3,14 @@ package internal
import (
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-kubernetes/pkg/api"
)

// Config defines Icinga Kubernetes config.
type Config struct {
Database database.Config `yaml:"database"`
Logging logging.Config `yaml:"logging"`
Api api.Config `yaml:"api"`
}

// Validate checks constraints in the supplied configuration and returns an error if they are violated.
@@ -21,5 +23,9 @@ func (c *Config) Validate() error {
return err
}

if err := c.Api.Validate(); err != nil {

}

return nil
}
13 changes: 13 additions & 0 deletions pkg/api/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package api

type Config struct {
Log LogStreamApiConfig `yaml:"log"`
}

func (c *Config) Validate() error {
if err := c.Log.Validate(); err != nil {
return err
}

return nil
}
176 changes: 176 additions & 0 deletions pkg/api/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package api

import (
"bufio"
"context"
"encoding/json"
"fmt"
"github.com/icinga/icinga-go-library/logging"
"github.com/pkg/errors"
"io"
v1 "k8s.io/api/core/v1"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"net"
"net/http"
"strconv"
"time"
)

// LogStreamApiConfig stores config for log streaming api
type LogStreamApiConfig struct {
Address string `yaml:"address"`
Port int `yaml:"port"`
}

// Validate validates LogStreamApiConfig
func (c *LogStreamApiConfig) Validate() error {

if c.Address == "" {
return errors.New("address missing")
}

if c.Port < 1 || c.Port > 65536 {
return errors.New("invalid port number")
}

return nil
}

// LogStreamApi streams log per http rest api
type LogStreamApi struct {
clientset *kubernetes.Clientset
logger *logging.Logger
config *LogStreamApiConfig
}

// NewLogStreamApi creates new LogStreamApi initialized with clienset, logger and config
func NewLogStreamApi(clientset *kubernetes.Clientset, logger *logging.Logger, config *LogStreamApiConfig) *LogStreamApi {
return &LogStreamApi{
clientset: clientset,
logger: logger,
config: config,
}
}

// Handle returns HandlerFunc that handles the api
func (lsa *LogStreamApi) Handle() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
lsa.HandleHttp(w, r)
lsa.logger.Debug("Finished sending response...")
}
}

// HandleHttp handles the api
func (lsa *LogStreamApi) HandleHttp(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
namespace := query.Get("namespace")
podName := query.Get("podName")
containerName := query.Get("containerName")
lastTimestampParam, err := strconv.ParseInt(query.Get("lastTimestamp"), 10, 64)
if err != nil {
fmt.Println("error getting last timestamp from url:", err)
}

lastTimestamp := kmetav1.Time{Time: time.Unix(lastTimestampParam, 0)}

ctx := r.Context()
flusher, ok := w.(http.Flusher)
if !ok {
http.NotFound(w, r)
return
}

podLogOpts := v1.PodLogOptions{
Container: containerName,
Timestamps: true,
Follow: true,
SinceTime: &lastTimestamp,
}

reader, err := lsa.clientset.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts).Stream(ctx)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
flusher.Flush()
return
}

// Send the initial headers saying we're gonna stream the response.
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()

enc := json.NewEncoder(w)

stringReader := bufio.NewReader(reader)

lsa.logger.Debug("Connected")

for {
select {
case <-ctx.Done():
lsa.logger.Debug("Connection closed")
return
default:
msg, err := stringReader.ReadString('\n')
if err != nil {
if err == io.EOF {
break
}
break
}

// Send some data
err = enc.Encode(msg)
if err != nil {
lsa.logger.Fatal(err)
}

flusher.Flush()
}
}
}

// Stream starts a local webserver and provides the api
func (lsa *LogStreamApi) Stream(ctx context.Context) (err error) {

mux := http.NewServeMux()
mux.Handle(lsa.config.Address, lsa.Handle())

srv := &http.Server{
Addr: ":" + strconv.Itoa(lsa.config.Port),
Handler: mux,
BaseContext: func(net.Listener) context.Context {
return ctx
},
}

go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
lsa.logger.Fatalf("listen:%+s\n", err)
}
}()

lsa.logger.Info("Starting stream")

select {
case <-ctx.Done():
lsa.logger.Info("Stopped")
//
ctxShutDown, cancel := context.WithCancel(context.Background())
defer cancel()

err := srv.Shutdown(ctxShutDown)
if err != nil {
lsa.logger.Fatalf("Shutdown Failed:%+s", err)
}

lsa.logger.Info("Exited properly")

if errors.Is(err, http.ErrServerClosed) {
err = nil
}

return err
}
}
4 changes: 2 additions & 2 deletions schema/mysql/schema.sql
Original file line number Diff line number Diff line change
@@ -43,8 +43,8 @@ CREATE TABLE log
id BINARY(20) NOT NULL,
reference_id BINARY(20) NOT NULL,
container_name VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL,
time LONGTEXT NOT NULL,
log LONGTEXT NOT NULL,
time LONGTEXT,
log LONGTEXT,
PRIMARY KEY (id)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4