Skip to content

Commit 88d11f8

Browse files
authored
✨ Add live pod/container log collection. (#770)
Add _live_ pod/container log collector. The overhead of appending the container logs is low and is helpful when troubleshooting. The logs are collected as they are written instead of at the end after the pod has terminated. The log collection needs to be _resumable_ for cases where the hub is restarted while collecting. Flow: 1. Task manager creates the pod. 2. Each time task pod's status is evaluated, manager will ensure a collector is running (until the task has completed) for each container. 3. The collector (goroutine) will self-terminate when getting EOF tailing the log. --- Also, improves the pod.yaml generated for the pod snapshot. Example pod.yaml. ``` --- metadata: creationTimestamp: "2024-12-18T21:19:20Z" generateName: task-24- labels: app: tackle role: task task: "24" name: task-24-qq7xz namespace: konveyor-tackle ownerReferences: - apiVersion: tackle.konveyor.io/v1alpha1 kind: Tackle name: tackle uid: ff2b1d47-83a0-494c-a998-1a5cee4c40c1 resourceVersion: "173511" uid: c4a0baf1-aaab-4d6b-89c4-90f23cf32fef spec: containers: - env: - name: ADDON_HOME value: /addon - name: SHARED_PATH value: /shared - name: CACHE_PATH value: /cache - name: HUB_BASE_URL value: http://tackle-hub.konveyor-tackle.svc:8080 - name: TASK value: "24" - name: TOKEN valueFrom: secretKeyRef: key: TOKEN name: task-24-sk82v image: quay.io/konveyor/tackle2-addon-discovery:latest imagePullPolicy: Always name: addon resources: limits: cpu: 500m memory: 512Mi requests: cpu: 500m memory: 512Mi securityContext: runAsUser: 1001 terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /addon name: addon - mountPath: /shared name: shared - mountPath: /cache name: cache - mountPath: /var/run/secrets/kubernetes.io/serviceaccount name: kube-api-access-drxg4 readOnly: true dnsPolicy: ClusterFirst enableServiceLinks: true nodeName: main preemptionPolicy: PreemptLowerPriority priority: 0 restartPolicy: Never schedulerName: default-scheduler 
securityContext: {} serviceAccount: tackle-hub serviceAccountName: tackle-hub terminationGracePeriodSeconds: 30 tolerations: - effect: NoExecute key: node.kubernetes.io/not-ready operator: Exists tolerationSeconds: 300 - effect: NoExecute key: node.kubernetes.io/unreachable operator: Exists tolerationSeconds: 300 volumes: - emptyDir: {} name: addon - emptyDir: {} name: shared - name: cache persistentVolumeClaim: claimName: tackle-cache-volume-claim - name: kube-api-access-drxg4 projected: defaultMode: 420 sources: - serviceAccountToken: expirationSeconds: 3607 path: token - configMap: items: - key: ca.crt path: ca.crt name: kube-root-ca.crt - downwardAPI: items: - fieldRef: apiVersion: v1 fieldPath: metadata.namespace path: namespace status: conditions: - lastProbeTime: null lastTransitionTime: "2024-12-18T21:19:23Z" status: "True" type: PodReadyToStartContainers - lastProbeTime: null lastTransitionTime: "2024-12-18T21:19:20Z" reason: PodCompleted status: "True" type: Initialized - lastProbeTime: null lastTransitionTime: "2024-12-18T21:19:25Z" reason: PodCompleted status: "False" type: Ready - lastProbeTime: null lastTransitionTime: "2024-12-18T21:19:25Z" reason: PodCompleted status: "False" type: ContainersReady - lastProbeTime: null lastTransitionTime: "2024-12-18T21:19:20Z" status: "True" type: PodScheduled containerStatuses: - containerID: docker://21d9f93d9b25a54b96e58dcb30127c8a54553f370cff5b52982735314c530d31 image: quay.io/konveyor/tackle2-addon-discovery:latest imageID: docker-pullable://quay.io/konveyor/tackle2-addon-discovery@sha256:74aeb44254736057a52085aa6d070c3ec62614338e680acb75cdf33e5b06802e lastState: {} name: addon ready: false restartCount: 0 started: false state: terminated: containerID: docker://21d9f93d9b25a54b96e58dcb30127c8a54553f370cff5b52982735314c530d31 exitCode: 0 finishedAt: "2024-12-18T21:19:24Z" reason: Completed startedAt: "2024-12-18T21:19:22Z" hostIP: 192.168.49.2 phase: Running podIP: 10.244.0.81 podIPs: - ip: 10.244.0.81 
qosClass: Guaranteed startTime: "2024-12-18T21:19:20Z" --- Events: | Type Reason Age Reporter Message ------- ---------- ----- ------------------ ------- Normal Scheduled 0s default-scheduler Successfully assigned konveyor-tackle/task-24-qq7xz to main Normal Pulling 0s kubelet Pulling image "quay.io/konveyor/tackle2-addon-discovery:latest" Normal Pulled 0s kubelet Successfully pulled image "quay.io/konveyor/tackle2-addon-discovery:latest" in 1.93s (1.93s including waiting) Normal Created 0s kubelet Created container addon Normal Started 0s kubelet Started container addon ``` --------- Signed-off-by: Jeff Ortel <[email protected]>
1 parent 5040af2 commit 88d11f8

File tree

3 files changed

+354
-93
lines changed

3 files changed

+354
-93
lines changed

task/collector.go

+242
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
package task
2+
3+
import (
4+
"context"
5+
"io"
6+
"os"
7+
"sync"
8+
9+
liberr "github.com/jortel/go-utils/error"
10+
k8s2 "github.com/konveyor/tackle2-hub/k8s"
11+
"github.com/konveyor/tackle2-hub/model"
12+
"gorm.io/gorm"
13+
core "k8s.io/api/core/v1"
14+
)
15+
16+
// LogManager manages log collectors.
// Tracks one LogCollector per pod container so collection can be
// (re)ensured each time the task pod is evaluated without starting
// duplicate collectors.
type LogManager struct {
	// collector registry.
	// Keyed by LogCollector.key() (pod-name "." container-name).
	// NOTE(review): assumed to be initialized (made) by the owner
	// before EnsureCollection is called — confirm against setup code.
	collector map[string]*LogCollector
	// mutex guards the collector registry.
	mutex sync.Mutex
	// DB client used to create/find attached log file records.
	DB *gorm.DB
}
25+
26+
// EnsureCollection - ensure each container has a log collector.
27+
func (m *LogManager) EnsureCollection(task *Task, pod *core.Pod, ctx context.Context) (err error) {
28+
m.mutex.Lock()
29+
defer m.mutex.Unlock()
30+
for _, container := range pod.Status.ContainerStatuses {
31+
if container.State.Waiting != nil {
32+
continue
33+
}
34+
collector := &LogCollector{
35+
Owner: m,
36+
Registry: m.collector,
37+
DB: m.DB,
38+
Pod: pod,
39+
Container: &container,
40+
}
41+
key := collector.key()
42+
if _, found := m.collector[key]; found {
43+
continue
44+
}
45+
err = collector.Begin(task, ctx)
46+
if err != nil {
47+
return
48+
}
49+
m.collector[key] = collector
50+
}
51+
return
52+
}
53+
54+
// terminated provides notification that a collector has terminated.
55+
func (m *LogManager) terminated(collector *LogCollector) {
56+
m.mutex.Lock()
57+
defer m.mutex.Unlock()
58+
for i := range m.collector {
59+
if collector == m.collector[i] {
60+
key := collector.key()
61+
delete(m.collector, key)
62+
break
63+
}
64+
}
65+
}
66+
67+
// LogCollector collect and report container logs.
// Streams (follows) a container's log into a file attached to the
// task as the log is written, rather than after pod termination.
// Collection is resumable: bytes already collected are skipped.
type LogCollector struct {
	// Owner is the manager notified when collection terminates.
	Owner *LogManager
	// Registry of collectors (shared with the owner).
	Registry map[string]*LogCollector
	// DB client used for file records.
	DB *gorm.DB
	// Pod is the task pod.
	Pod *core.Pod
	// Container is the status of the container being tailed.
	Container *core.ContainerStatus
	//
	// nBuf is the copy buffer size (defaulted when < 1).
	nBuf int
	// nSkip is the count of already-collected bytes to discard
	// from the (re)requested stream when resuming.
	nSkip int64
}
78+
79+
// Begin - get container log and store in file.
80+
// - Request logs.
81+
// - Create file resource and attach to the task.
82+
// - Register collector.
83+
// - Write (copy) log.
84+
// - Unregister collector.
85+
func (r *LogCollector) Begin(task *Task, ctx context.Context) (err error) {
86+
reader, err := r.request(ctx)
87+
if err != nil {
88+
return
89+
}
90+
f, err := r.file(task)
91+
if err != nil {
92+
return
93+
}
94+
go func() {
95+
defer func() {
96+
_ = reader.Close()
97+
_ = f.Close()
98+
r.Owner.terminated(r)
99+
}()
100+
err := r.copy(reader, f)
101+
Log.Error(err, "")
102+
}()
103+
return
104+
}
105+
106+
// key returns the collector key.
107+
func (r *LogCollector) key() (key string) {
108+
key = r.Pod.Name + "." + r.Container.Name
109+
return
110+
}
111+
112+
// request logs from k8s.
113+
func (r *LogCollector) request(ctx context.Context) (reader io.ReadCloser, err error) {
114+
options := &core.PodLogOptions{
115+
Container: r.Container.Name,
116+
Follow: true,
117+
}
118+
clientSet, err := k8s2.NewClientSet()
119+
if err != nil {
120+
return
121+
}
122+
podClient := clientSet.CoreV1().Pods(Settings.Hub.Namespace)
123+
req := podClient.GetLogs(r.Pod.Name, options)
124+
reader, err = req.Stream(ctx)
125+
if err != nil {
126+
err = liberr.Wrap(err)
127+
return
128+
}
129+
return
130+
}
131+
132+
// name returns the canonical name for the container log.
133+
func (r *LogCollector) name() (s string) {
134+
s = r.Container.Name + ".log"
135+
return
136+
}
137+
138+
// file returns an attached log file for writing.
139+
func (r *LogCollector) file(task *Task) (f *os.File, err error) {
140+
f, found, err := r.find(task)
141+
if found || err != nil {
142+
return
143+
}
144+
f, err = r.create(task)
145+
return
146+
}
147+
148+
// find finds and opens an attached log file by name.
149+
func (r *LogCollector) find(task *Task) (f *os.File, found bool, err error) {
150+
var file model.File
151+
name := r.name()
152+
for _, attached := range task.Attached {
153+
if attached.Name == name {
154+
found = true
155+
err = r.DB.First(&file, attached.ID).Error
156+
if err != nil {
157+
err = liberr.Wrap(err)
158+
return
159+
}
160+
}
161+
}
162+
if !found {
163+
return
164+
}
165+
f, err = os.OpenFile(file.Path, os.O_RDONLY|os.O_APPEND, 0666)
166+
if err != nil {
167+
err = liberr.Wrap(err)
168+
return
169+
}
170+
st, err := f.Stat()
171+
if err != nil {
172+
err = liberr.Wrap(err)
173+
return
174+
}
175+
r.nSkip = st.Size()
176+
return
177+
}
178+
179+
// create creates and attaches the log file.
180+
func (r *LogCollector) create(task *Task) (f *os.File, err error) {
181+
file := &model.File{Name: r.name()}
182+
err = r.DB.Create(file).Error
183+
if err != nil {
184+
err = liberr.Wrap(err)
185+
return
186+
}
187+
f, err = os.Create(file.Path)
188+
if err != nil {
189+
_ = r.DB.Delete(file)
190+
err = liberr.Wrap(err)
191+
return
192+
}
193+
task.attach(file)
194+
return
195+
}
196+
197+
// copy data.
198+
// The read bytes are discarded when smaller than nSkip.
199+
// The offset is adjusted when to account for the buffer
200+
// containing bytes to be skipped and written.
201+
func (r *LogCollector) copy(reader io.ReadCloser, writer io.Writer) (err error) {
202+
if r.nBuf < 1 {
203+
r.nBuf = 4096
204+
}
205+
buf := make([]byte, r.nBuf)
206+
for {
207+
n := 0
208+
n, err = reader.Read(buf)
209+
if err != nil {
210+
if err == io.EOF {
211+
err = nil
212+
}
213+
break
214+
}
215+
nRead := int64(n)
216+
if nRead == 0 {
217+
continue
218+
}
219+
offset := int64(0)
220+
if r.nSkip > 0 {
221+
if nRead > r.nSkip {
222+
offset = r.nSkip
223+
r.nSkip = 0
224+
} else {
225+
r.nSkip -= nRead
226+
continue
227+
}
228+
}
229+
b := buf[offset:nRead]
230+
_, err = writer.Write(b)
231+
if err != nil {
232+
return
233+
}
234+
if f, cast := writer.(*os.File); cast {
235+
err = f.Sync()
236+
if err != nil {
237+
return
238+
}
239+
}
240+
}
241+
return
242+
}

0 commit comments

Comments
 (0)