Skip to content

Commit d8c1303

Browse files
committed
WIP: Log streaming API
1 parent 0b81110 commit d8c1303

File tree

6 files changed

+211
-2
lines changed

6 files changed

+211
-2
lines changed

cmd/icinga-kubernetes/main.go

+8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/icinga/icinga-go-library/driver"
88
"github.com/icinga/icinga-go-library/logging"
99
"github.com/icinga/icinga-kubernetes/internal"
10+
"github.com/icinga/icinga-kubernetes/pkg/api"
1011
"github.com/icinga/icinga-kubernetes/pkg/contracts"
1112
"github.com/icinga/icinga-kubernetes/pkg/schema"
1213
"github.com/icinga/icinga-kubernetes/pkg/sync"
@@ -157,6 +158,13 @@ func main() {
157158
return nodeMetricSync.Clean(ctx, forwardDeleteNodesToMetricChannel)
158159
})
159160

161+
// stream log api
162+
logStreamApi := api.NewLogStreamApi(k, logs.GetChildLogger("LogStreamApi"), &cfg.Api.Log)
163+
164+
g.Go(func() error {
165+
return logStreamApi.Stream(ctx)
166+
})
167+
160168
if err := g.Wait(); err != nil {
161169
logging.Fatal(errors.Wrap(err, "can't sync"))
162170
}

config.example.yml

+8
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,11 @@ logging:
3939
# Valid units are "ms", "s", "m", "h".
4040
# Defaults to "20s".
4141
# interval: 20s
42+
43+
api:
44+
log:
45+
# Address where the api is reachable
46+
address: '/logStream'
47+
48+
# Port where the api is reachable
49+
port: 8080

internal/config.go

+6
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package internal
33
import (
44
"github.com/icinga/icinga-go-library/database"
55
"github.com/icinga/icinga-go-library/logging"
6+
"github.com/icinga/icinga-kubernetes/pkg/api"
67
)
78

89
// Config defines Icinga Kubernetes config.
910
type Config struct {
1011
Database database.Config `yaml:"database"`
1112
Logging logging.Config `yaml:"logging"`
13+
Api api.Config `yaml:"api"`
1214
}
1315

1416
// Validate checks constraints in the supplied configuration and returns an error if they are violated.
@@ -21,5 +23,9 @@ func (c *Config) Validate() error {
2123
return err
2224
}
2325

26+
if err := c.Api.Validate(); err != nil {
27+
28+
}
29+
2430
return nil
2531
}

pkg/api/config.go

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package api
2+
3+
type Config struct {
4+
Log LogStreamApiConfig `yaml:"log"`
5+
}
6+
7+
func (c *Config) Validate() error {
8+
if err := c.Log.Validate(); err != nil {
9+
return err
10+
}
11+
12+
return nil
13+
}

pkg/api/log.go

+174
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package api
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"github.com/icinga/icinga-go-library/logging"
9+
"github.com/pkg/errors"
10+
"io"
11+
v1 "k8s.io/api/core/v1"
12+
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/client-go/kubernetes"
14+
"net/http"
15+
"strconv"
16+
"time"
17+
)
18+
19+
// LogStreamApiConfig stores config for log streaming api
20+
type LogStreamApiConfig struct {
21+
Address string `yaml:"address"`
22+
Port int `yaml:"port"`
23+
}
24+
25+
// Validate validates LogStreamApiConfig
26+
func (c *LogStreamApiConfig) Validate() error {
27+
28+
if c.Address == "" {
29+
return errors.New("address missing")
30+
}
31+
32+
if c.Port < 1 || c.Port > 65536 {
33+
return errors.New("invalid port number")
34+
}
35+
36+
return nil
37+
}
38+
39+
// LogStreamApi streams log per http rest api
40+
type LogStreamApi struct {
41+
clientset *kubernetes.Clientset
42+
logger *logging.Logger
43+
config *LogStreamApiConfig
44+
}
45+
46+
// NewLogStreamApi creates new LogStreamApi initialized with clienset, logger and config
47+
func NewLogStreamApi(clientset *kubernetes.Clientset, logger *logging.Logger, config *LogStreamApiConfig) *LogStreamApi {
48+
return &LogStreamApi{
49+
clientset: clientset,
50+
logger: logger,
51+
config: config,
52+
}
53+
}
54+
55+
// Handle returns HandlerFunc that handles the api
56+
func (lsa *LogStreamApi) Handle() http.HandlerFunc {
57+
return func(w http.ResponseWriter, r *http.Request) {
58+
lsa.HandleHttp(w, r)
59+
lsa.logger.Info("Finished sending response...")
60+
}
61+
}
62+
63+
// HandleHttp handles the api
64+
func (lsa *LogStreamApi) HandleHttp(w http.ResponseWriter, r *http.Request) {
65+
query := r.URL.Query()
66+
namespace := query.Get("namespace")
67+
podName := query.Get("podName")
68+
containerName := query.Get("containerName")
69+
lastTimestampParam, err := strconv.ParseInt(query.Get("lastTimestamp"), 10, 64)
70+
if err != nil {
71+
fmt.Println("error getting last timestamp from url:", err)
72+
}
73+
74+
lastTimestamp := kmetav1.Time{Time: time.Unix(lastTimestampParam, 0)}
75+
76+
ctx := r.Context()
77+
flusher, ok := w.(http.Flusher)
78+
if !ok {
79+
http.NotFound(w, r)
80+
return
81+
}
82+
83+
podLogOpts := v1.PodLogOptions{
84+
Container: containerName,
85+
Timestamps: true,
86+
Follow: true,
87+
SinceTime: &lastTimestamp,
88+
}
89+
90+
reader, err := lsa.clientset.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts).Stream(ctx)
91+
if err != nil {
92+
w.WriteHeader(http.StatusInternalServerError)
93+
flusher.Flush()
94+
return
95+
}
96+
97+
// Send the initial headers saying we're gonna stream the response.
98+
w.Header().Set("Transfer-Encoding", "chunked")
99+
w.WriteHeader(http.StatusOK)
100+
flusher.Flush()
101+
102+
enc := json.NewEncoder(w)
103+
104+
stringReader := bufio.NewReader(reader)
105+
106+
lsa.logger.Info("Client started listening...")
107+
108+
for {
109+
select {
110+
case <-ctx.Done():
111+
lsa.logger.Info("Client stopped listening")
112+
return
113+
default:
114+
msg, err := stringReader.ReadString('\n')
115+
if err != nil {
116+
if err == io.EOF {
117+
break
118+
}
119+
break
120+
}
121+
122+
// Send some data
123+
err = enc.Encode(msg)
124+
if err != nil {
125+
lsa.logger.Fatal(err)
126+
}
127+
128+
flusher.Flush()
129+
}
130+
}
131+
}
132+
133+
// Stream starts a local webserver and provides the api
134+
func (lsa *LogStreamApi) Stream(ctx context.Context) (err error) {
135+
136+
mux := http.NewServeMux()
137+
mux.Handle(lsa.config.Address, lsa.Handle())
138+
139+
srv := &http.Server{
140+
Addr: ":" + strconv.Itoa(lsa.config.Port),
141+
Handler: mux,
142+
}
143+
144+
go func() {
145+
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
146+
lsa.logger.Fatalf("listen:%+s\n", err)
147+
}
148+
}()
149+
150+
lsa.logger.Info("Starting stream")
151+
152+
select {
153+
case <-ctx.Done():
154+
lsa.logger.Info("Stopped")
155+
//
156+
ctxShutDown, cancel := context.WithCancel(context.Background())
157+
defer func() {
158+
cancel()
159+
}()
160+
161+
err := srv.Shutdown(ctxShutDown)
162+
if err != nil {
163+
lsa.logger.Fatalf("Shutdown Failed:%+s", err)
164+
}
165+
166+
lsa.logger.Info("Exited properly")
167+
168+
if err == http.ErrServerClosed {
169+
err = nil
170+
}
171+
172+
return err
173+
}
174+
}

schema/mysql/schema.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ CREATE TABLE log
4343
id BINARY(20) NOT NULL,
4444
reference_id BINARY(20) NOT NULL,
4545
container_name VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL,
46-
time LONGTEXT NOT NULL,
47-
log LONGTEXT NOT NULL,
46+
time LONGTEXT,
47+
log LONGTEXT,
4848
PRIMARY KEY (id)
4949
) ENGINE = InnoDB
5050
DEFAULT CHARSET = utf8mb4

0 commit comments

Comments
 (0)