Skip to content

Commit 5ca0069

Browse files
committed
WIP: Log streaming API
1 parent 0b81110 commit 5ca0069

File tree

6 files changed

+213
-2
lines changed

6 files changed

+213
-2
lines changed

cmd/icinga-kubernetes/main.go

+8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/icinga/icinga-go-library/driver"
88
"github.com/icinga/icinga-go-library/logging"
99
"github.com/icinga/icinga-kubernetes/internal"
10+
"github.com/icinga/icinga-kubernetes/pkg/api"
1011
"github.com/icinga/icinga-kubernetes/pkg/contracts"
1112
"github.com/icinga/icinga-kubernetes/pkg/schema"
1213
"github.com/icinga/icinga-kubernetes/pkg/sync"
@@ -157,6 +158,13 @@ func main() {
157158
return nodeMetricSync.Clean(ctx, forwardDeleteNodesToMetricChannel)
158159
})
159160

161+
// stream log api
162+
logStreamApi := api.NewLogStreamApi(k, logs.GetChildLogger("LogStreamApi"), &cfg.Api.Log)
163+
164+
g.Go(func() error {
165+
return logStreamApi.Stream(ctx)
166+
})
167+
160168
if err := g.Wait(); err != nil {
161169
logging.Fatal(errors.Wrap(err, "can't sync"))
162170
}

config.example.yml

+8
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,11 @@ logging:
3939
# Valid units are "ms", "s", "m", "h".
4040
# Defaults to "20s".
4141
# interval: 20s
42+
43+
api:
44+
log:
45+
# Address where the api is reachable
46+
address: '/logStream'
47+
48+
# Port where the api is reachable
49+
port: 8080

internal/config.go

+6
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package internal
33
import (
44
"github.com/icinga/icinga-go-library/database"
55
"github.com/icinga/icinga-go-library/logging"
6+
"github.com/icinga/icinga-kubernetes/pkg/api"
67
)
78

89
// Config defines Icinga Kubernetes config.
910
type Config struct {
1011
Database database.Config `yaml:"database"`
1112
Logging logging.Config `yaml:"logging"`
13+
Api api.Config `yaml:"api"`
1214
}
1315

1416
// Validate checks constraints in the supplied configuration and returns an error if they are violated.
@@ -21,5 +23,9 @@ func (c *Config) Validate() error {
2123
return err
2224
}
2325

26+
if err := c.Api.Validate(); err != nil {
27+
28+
}
29+
2430
return nil
2531
}

pkg/api/config.go

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package api
2+
3+
type Config struct {
4+
Log LogStreamApiConfig `yaml:"log"`
5+
}
6+
7+
func (c *Config) Validate() error {
8+
if err := c.Log.Validate(); err != nil {
9+
return err
10+
}
11+
12+
return nil
13+
}

pkg/api/log.go

+176
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package api
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"github.com/icinga/icinga-go-library/logging"
9+
"github.com/pkg/errors"
10+
"io"
11+
v1 "k8s.io/api/core/v1"
12+
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/client-go/kubernetes"
14+
"net"
15+
"net/http"
16+
"strconv"
17+
"time"
18+
)
19+
20+
// LogStreamApiConfig stores config for log streaming api
21+
type LogStreamApiConfig struct {
22+
Address string `yaml:"address"`
23+
Port int `yaml:"port"`
24+
}
25+
26+
// Validate validates LogStreamApiConfig
27+
func (c *LogStreamApiConfig) Validate() error {
28+
29+
if c.Address == "" {
30+
return errors.New("address missing")
31+
}
32+
33+
if c.Port < 1 || c.Port > 65536 {
34+
return errors.New("invalid port number")
35+
}
36+
37+
return nil
38+
}
39+
40+
// LogStreamApi streams log per http rest api
41+
type LogStreamApi struct {
42+
clientset *kubernetes.Clientset
43+
logger *logging.Logger
44+
config *LogStreamApiConfig
45+
}
46+
47+
// NewLogStreamApi creates new LogStreamApi initialized with clienset, logger and config
48+
func NewLogStreamApi(clientset *kubernetes.Clientset, logger *logging.Logger, config *LogStreamApiConfig) *LogStreamApi {
49+
return &LogStreamApi{
50+
clientset: clientset,
51+
logger: logger,
52+
config: config,
53+
}
54+
}
55+
56+
// Handle returns HandlerFunc that handles the api
57+
func (lsa *LogStreamApi) Handle() http.HandlerFunc {
58+
return func(w http.ResponseWriter, r *http.Request) {
59+
lsa.HandleHttp(w, r)
60+
lsa.logger.Debug("Finished sending response...")
61+
}
62+
}
63+
64+
// HandleHttp handles the api
65+
func (lsa *LogStreamApi) HandleHttp(w http.ResponseWriter, r *http.Request) {
66+
query := r.URL.Query()
67+
namespace := query.Get("namespace")
68+
podName := query.Get("podName")
69+
containerName := query.Get("containerName")
70+
lastTimestampParam, err := strconv.ParseInt(query.Get("lastTimestamp"), 10, 64)
71+
if err != nil {
72+
fmt.Println("error getting last timestamp from url:", err)
73+
}
74+
75+
lastTimestamp := kmetav1.Time{Time: time.Unix(lastTimestampParam, 0)}
76+
77+
ctx := r.Context()
78+
flusher, ok := w.(http.Flusher)
79+
if !ok {
80+
http.NotFound(w, r)
81+
return
82+
}
83+
84+
podLogOpts := v1.PodLogOptions{
85+
Container: containerName,
86+
Timestamps: true,
87+
Follow: true,
88+
SinceTime: &lastTimestamp,
89+
}
90+
91+
reader, err := lsa.clientset.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts).Stream(ctx)
92+
if err != nil {
93+
w.WriteHeader(http.StatusInternalServerError)
94+
flusher.Flush()
95+
return
96+
}
97+
98+
// Send the initial headers saying we're gonna stream the response.
99+
w.Header().Set("Transfer-Encoding", "chunked")
100+
w.WriteHeader(http.StatusOK)
101+
flusher.Flush()
102+
103+
enc := json.NewEncoder(w)
104+
105+
stringReader := bufio.NewReader(reader)
106+
107+
lsa.logger.Debug("Connected")
108+
109+
for {
110+
select {
111+
case <-ctx.Done():
112+
lsa.logger.Debug("Connection closed")
113+
return
114+
default:
115+
msg, err := stringReader.ReadString('\n')
116+
if err != nil {
117+
if err == io.EOF {
118+
break
119+
}
120+
break
121+
}
122+
123+
// Send some data
124+
err = enc.Encode(msg)
125+
if err != nil {
126+
lsa.logger.Fatal(err)
127+
}
128+
129+
flusher.Flush()
130+
}
131+
}
132+
}
133+
134+
// Stream starts a local webserver and provides the api
135+
func (lsa *LogStreamApi) Stream(ctx context.Context) (err error) {
136+
137+
mux := http.NewServeMux()
138+
mux.Handle(lsa.config.Address, lsa.Handle())
139+
140+
srv := &http.Server{
141+
Addr: ":" + strconv.Itoa(lsa.config.Port),
142+
Handler: mux,
143+
BaseContext: func(net.Listener) context.Context {
144+
return ctx
145+
},
146+
}
147+
148+
go func() {
149+
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
150+
lsa.logger.Fatalf("listen:%+s\n", err)
151+
}
152+
}()
153+
154+
lsa.logger.Info("Starting stream")
155+
156+
select {
157+
case <-ctx.Done():
158+
lsa.logger.Info("Stopped")
159+
//
160+
ctxShutDown, cancel := context.WithCancel(context.Background())
161+
defer cancel()
162+
163+
err := srv.Shutdown(ctxShutDown)
164+
if err != nil {
165+
lsa.logger.Fatalf("Shutdown Failed:%+s", err)
166+
}
167+
168+
lsa.logger.Info("Exited properly")
169+
170+
if errors.Is(err, http.ErrServerClosed) {
171+
err = nil
172+
}
173+
174+
return err
175+
}
176+
}

schema/mysql/schema.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ CREATE TABLE log
4343
id BINARY(20) NOT NULL,
4444
reference_id BINARY(20) NOT NULL,
4545
container_name VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL,
46-
time LONGTEXT NOT NULL,
47-
log LONGTEXT NOT NULL,
46+
time LONGTEXT,
47+
log LONGTEXT,
4848
PRIMARY KEY (id)
4949
) ENGINE = InnoDB
5050
DEFAULT CHARSET = utf8mb4

0 commit comments

Comments
 (0)