Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ def _read(

# skip log stream until the last position
if metadata and "log_pos" in metadata:
islice(out_stream, metadata["log_pos"])
out_stream = islice(out_stream, metadata["log_pos"], None)
else:
# first time reading log, add messages before interleaved log stream
out_stream = chain(header, out_stream)
Expand Down
29 changes: 29 additions & 0 deletions airflow-core/tests/unit/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,35 @@ def test__read_when_local(self, mock_read_local, create_task_instance):
assert extract_events(log_handler_output_stream) == ["the log"]
assert metadata == {"end_of_log": True, "log_pos": 1}

@patch("airflow.utils.log.file_task_handler.FileTaskHandler._read_from_local")
def test__read_when_local_respects_log_pos_metadata(self, mock_read_local, create_task_instance):
path = Path(
"dag_id=dag_for_testing_local_log_read/run_id=scheduled__2016-01-01T00:00:00+00:00/task_id=task_for_testing_local_log_read/attempt=1.log"
)
mock_read_local.return_value = (
["the messages"],
[convert_list_to_stream(["line 1", "line 2", "line 3"])],
)
local_log_file_read = create_task_instance(
dag_id="dag_for_testing_local_log_read",
task_id="task_for_testing_local_log_read",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
)
fth = FileTaskHandler("")

log_handler_output_stream, metadata = fth._read(
ti=local_log_file_read,
try_number=1,
metadata={"log_pos": 2},
)

mock_read_local.assert_called_with(path)

# Should resume from the third line only.
assert extract_events(log_handler_output_stream) == ["line 3"]
assert metadata == {"end_of_log": True, "log_pos": 3}

def test__read_from_local(self, tmp_path):
"""Tests the behavior of method _read_from_local"""
path1 = tmp_path / "hello1.log"
Expand Down
Loading