Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion internal/dcp/bootstrap/dcp_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,14 @@ func DcpRun(
// If we are in server-only mode (no standard controllers) such as when running tests,
// there is no point trying to clean up all resources on shutdown because no actual resources are involved,
// it is all test mocks. Another case to avoid full cleanup is when shutdown request explicitly disables it.
// Hosted services still need to shut down before we return so child processes can release session files.
if serverOnly || !resourceCleanup.IsFull() {
return nil // No cleanup needed, just return
err = shutdownHost()
if err != nil {
log.Error(err, "Failed to shut down hosted services")
return err
}
return nil
}

err = appmgmt.CleanupAllResources(log)
Expand Down
97 changes: 63 additions & 34 deletions internal/logs/containerlogs/container_logstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"os"
"sync"
"sync/atomic"

"github.com/go-logr/logr"
apiserver_resource "github.com/tilt-dev/tilt-apiserver/pkg/server/builder/resource"
Expand All @@ -31,7 +32,7 @@ import (
)

var (
lastLogStreamID logs.LogStreamID // Used to generate unique log stream IDs for each log stream
lastLogStreamID atomic.Uint64 // Used to generate unique log stream IDs for each log stream
)

type containerLogStreamer struct {
Expand All @@ -50,6 +51,12 @@ type containerLogStreamer struct {
lock *sync.Mutex
}

type containerStreamsToStop struct {
streamKind string
streams []*usvc_io.FollowWriter
cancelStreams func([]*usvc_io.FollowWriter)
}

func NewLogStreamer(log logr.Logger) *containerLogStreamer {
return &containerLogStreamer{
startupLogStreams: make(logs.LogStreamMop),
Expand Down Expand Up @@ -213,13 +220,12 @@ func (c *containerLogStreamer) StreamLogs(
// to account for the fact that ContainerLogSource.CaptureContainerLogs() is an asynchronous operation.
// A few no-data stop retries will allow us to get log lines from a container that we just started capturing logs for, regardless of follow/non-follow mode.
const noDataStopRetries = 3
followWriter := usvc_io.NewFollowWriter(ctx, src, dest, usvc_io.WithNoDataStopRetries(noDataStopRetries))
followWriter := usvc_io.NewFollowWriter(ctx, src, dest, usvc_io.WithNoDataStopRetries(noDataStopRetries), usvc_io.WithCloseSourceOnCancel())

c.lock.Lock()
defer c.lock.Unlock()

streamID := lastLogStreamID + 1
lastLogStreamID = streamID
streamID := logs.LogStreamID(lastLogStreamID.Add(1))

var streams logs.LogStreamMop
switch opts.Source {
Expand All @@ -240,7 +246,6 @@ func (c *containerLogStreamer) StreamLogs(

go func() {
defer cleanup()
defer src.Close()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this removed and why is it OK not to close the file even though it is referenced by local variable only?

Copy link
Copy Markdown
Collaborator

@karolz-ms karolz-ms May 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I know after reading the whole change... but you tell me 😄

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FollowWriter is closing the source now, but I'm about to make that an explicit FollowWriterOption so that it's more clear why this isn't necessary.


streamLog.V(1).Info("Starting streaming logs to destination ...")
<-followWriter.Done()
Expand Down Expand Up @@ -274,62 +279,91 @@ func (c *containerLogStreamer) OnResourceUpdated(evt apiv1.ResourceWatcherEvent,
return
}

c.lock.Lock()
defer c.lock.Unlock()
var streamsToStop []containerStreamsToStop
var containerLogsToRelease *logs.LogDescriptorSet

if c.containerLogSource == nil {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this removed and why is it OK to remove?

// We haven't completed initialization for any resources yet
return
}
c.lock.Lock()

switch evt.Type {

case watch.Added, watch.Modified:
// "watch.Added" does not necessarily mean that the resource was just created.
// It really means that the resource was added to the watch stream (has been observed for the first time).
deletionRequested := ctr.DeletionTimestamp != nil && !ctr.DeletionTimestamp.IsZero()
if deletionRequested {
if c.containerLogs != nil {
containerLogsToRelease = c.containerLogs
}
}

if ctr.Status.State != apiv1.ContainerStateStarting && ctr.Status.State != apiv1.ContainerStateBuilding {
// If done starting the container, ensure startup logs stop streaming once they reach EOF
stopLogStreamsForContainer(c.startupLogStreams, ctr, "startup", log)
streamsToStop = append(streamsToStop, takeLogStreamsForContainer(c.startupLogStreams, ctr, "startup", logs.DelayStopFollowing))
}

if ctr.Done() {
// If the container is done, ensure standard and system logs stop streaming once they reach EOF
stopLogStreamsForContainer(c.stdioLogStreams, ctr, "stdio", log)
stopLogStreamsForContainer(c.systemLogStreams, ctr, "system", log)
streamsToStop = append(streamsToStop,
takeLogStreamsForContainer(c.stdioLogStreams, ctr, "stdio", logs.DelayStopFollowing),
takeLogStreamsForContainer(c.systemLogStreams, ctr, "system", logs.DelayStopFollowing),
)
}

case watch.Deleted:
// The resource was deleted, ensure any following log streams stop and cleanup their resources
stopLogStreamsForContainer(c.startupLogStreams, ctr, "startup", log)
stopLogStreamsForContainer(c.stdioLogStreams, ctr, "stdio", log)
stopLogStreamsForContainer(c.systemLogStreams, ctr, "system", log)
streamsToStop = append(streamsToStop,
takeLogStreamsForContainer(c.startupLogStreams, ctr, "startup", logs.DelayStopFollowing),
takeLogStreamsForContainer(c.stdioLogStreams, ctr, "stdio", logs.DelayStopFollowing),
takeLogStreamsForContainer(c.systemLogStreams, ctr, "system", logs.DelayStopFollowing),
)

if c.containerLogs != nil {
// Need to stop the log streamer and any log watchers for this container (if any) as it is being deleted.
// It is OK to call ReleaseForResource() if the resource is not in the set, it is a no-op in that case.
c.containerLogs.ReleaseForResource(ctr.UID)
containerLogsToRelease = c.containerLogs
}
}

c.lock.Unlock()

stopLogStreamsForContainer(ctr, log, streamsToStop)
if containerLogsToRelease != nil {
containerLogsToRelease.ReleaseForResource(ctr.UID)
}
}

func stopLogStreamsForContainer(
func takeLogStreamsForContainer(
streams logs.LogStreamMop,
ctr *apiv1.Container,
streamKind string,
log logr.Logger,
) {
cancelStreams func([]*usvc_io.FollowWriter),
) containerStreamsToStop {
if ctrStreams, found := streams[ctr.UID]; found {
delete(streams, ctr.UID)
return containerStreamsToStop{
streamKind: streamKind,
streams: maps.Values(ctrStreams),
cancelStreams: cancelStreams,
}
}

return containerStreamsToStop{}
}

func stopLogStreamsForContainer(ctr *apiv1.Container, log logr.Logger, streamsToStop []containerStreamsToStop) {
for _, streamGroup := range streamsToStop {
if len(streamGroup.streams) == 0 {
continue
}

if log.V(1).Enabled() {
log.V(1).Info(fmt.Sprintf("Stopping %s follow logs for container", streamKind),
log.V(1).Info(fmt.Sprintf("Stopping %s follow logs for container", streamGroup.streamKind),
"Container", ctr.Status.ContainerID,
"StreamCount", len(ctrStreams),
"StreamCount", len(streamGroup.streams),
)
}

logs.DelayCancelFollowStreams(maps.Values(ctrStreams), (*usvc_io.FollowWriter).StopFollow)
streamGroup.cancelStreams(streamGroup.streams)
}
}

Expand All @@ -352,31 +386,26 @@ func appropriatelyCancel(fw *usvc_io.FollowWriter, ctr *apiv1.Container, opts *a
if !follow {
fw.StopFollow() // Will stop writing after first EOF is reached
} else if ctr.Done() {
logs.DelayCancelFollowStreams([]*usvc_io.FollowWriter{fw}, (*usvc_io.FollowWriter).StopFollow)
logs.DelayStopFollowing([]*usvc_io.FollowWriter{fw})
}
}

func (c *containerLogStreamer) Dispose() error {
c.lock.Lock()

for _, w := range maps.FlattenValues(c.startupLogStreams) {
w.StopFollow()
}
streamsToCancel := maps.FlattenValues(c.startupLogStreams)
c.startupLogStreams = make(logs.LogStreamMop)
for _, w := range maps.FlattenValues(c.stdioLogStreams) {
w.StopFollow()
}
streamsToCancel = append(streamsToCancel, maps.FlattenValues(c.stdioLogStreams)...)
c.stdioLogStreams = make(logs.LogStreamMop)
for _, w := range maps.FlattenValues(c.systemLogStreams) {
w.StopFollow()
}
streamsToCancel = append(streamsToCancel, maps.FlattenValues(c.systemLogStreams)...)
c.systemLogStreams = make(logs.LogStreamMop)

lds := c.containerLogs
c.containerLogs = nil

c.lock.Unlock()

logs.CancelFollowStreams(streamsToCancel)
if lds != nil {
return lds.Dispose()
} else {
Expand Down
Loading
Loading