Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions metaflow/runner/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import tempfile
import select
import fcntl
from contextlib import contextmanager
from subprocess import CalledProcessError
from typing import Any, Dict, TYPE_CHECKING, ContextManager, Tuple
Expand Down Expand Up @@ -129,30 +130,32 @@ def read_from_fifo_when_ready(
data = os.read(fifo_fd, 8192)
if data:
content += data
# We got data! Now switch to blocking mode for guaranteed complete reads.
# In blocking mode, read() won't return 0 until writer closes AND all
# kernel buffers are drained - this is POSIX guaranteed.
flags = fcntl.fcntl(fifo_fd, fcntl.F_GETFL)
fcntl.fcntl(fifo_fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)

# Now do blocking reads until true EOF
while True:
chunk = os.read(fifo_fd, 8192)
if not chunk:
# True EOF - all data drained
break
content += chunk
# All data read, exit main loop
break
else:
if len(events):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When does this happen? Does it mean poll reported some event (e.g. the writer closing the file) but the read returned no data?

# We read an EOF -- consider the file done
break
else:
# We had no events (just a timeout) and the read didn't return
# an exception so the file is still open; we continue waiting for data
# On some systems (notably MacOS), even after the file is closed on the
# other end, we may not get a BlockingIOError or proper EOF signal.
# Instead of using an arbitrary timeout, check if the writer process
# has actually exited. If it has and we have content, we can safely
# assume EOF. If the process is still running, continue waiting.
if content and check_process_exited(command_obj):
# Process has exited and we got an empty read with no poll events.
# This is EOF - break out to return the content we've collected.
break
# else: process is still running, continue waiting for more data
pass
except BlockingIOError:
has_blocking_error = True
if content:
# The file was closed
break
# else, if we have no content, we continue waiting for the file to be open
# and written to.
# File not ready yet, continue waiting
pass

if not content and check_process_exited(command_obj):
raise CalledProcessError(command_obj.process.returncode, command_obj.command)
Expand Down
Loading