Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 45 additions & 14 deletions airflow-core/tests/integration/otel/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def dag_execution_for_testing_metrics(self, capfd):
try:
# Start the processes here and not as fixtures or in a common setup,
# so that the test can capture their output.
scheduler_process, apiserver_process = self.start_scheduler()
scheduler_process, apiserver_process = self.start_scheduler(capture_output=True)

dag_id = "otel_test_dag"

Expand Down Expand Up @@ -363,10 +363,7 @@ def dag_execution_for_testing_metrics(self, capfd):
"The apiserver process status is None, which means that it hasn't terminated as expected."
)

out, err = capfd.readouterr()
log.info("out-start --\n%s\n-- out-end", out)
log.info("err-start --\n%s\n-- err-end", err)

out, _err = capfd.readouterr()
return out, dag

def _get_ti(self, dag_id: str, run_id: str, task_id: str) -> Any | None:
Expand Down Expand Up @@ -482,9 +479,7 @@ def test_dag_execution_succeeds(self, capfd):
"The apiserver process status is None, which means that it hasn't terminated as expected."
)

out, err = capfd.readouterr()
log.info("out-start --\n%s\n-- out-end", out)
log.info("err-start --\n%s\n-- err-end", err)
capfd.readouterr()

host = "jaeger"
service_name = os.environ.get("OTEL_SERVICE_NAME", "test")
Expand All @@ -508,24 +503,60 @@ def get_parent_span_id(span):

nested = get_span_hierarchy()
assert nested == {
"sub_span1": "task_run.task1",
"hook.on_starting": "startup",
"get_bundle": "parse",
"initialize": "parse",
"_verify_bundle_access": "parse",
"make BundleDagBag": "parse",
"parse": "startup",
"get_template_context": "startup",
"startup": "task_run.task1",
"delete xcom": "run",
"get_template_env": "_prepare",
"render_templates": "_prepare",
"_serialize_rendered_fields": "_prepare",
"set_rendered_fields": "_prepare",
"set_rendered_map_index": "_prepare",
"_validate_task_inlets_and_outlets": "_prepare",
"listener.on_task_instance_running": "_prepare",
"_prepare": "run",
"prepare context": "_execute_task",
"pre-execute": "_execute_task",
"on_execute_callback": "_execute_task",
"execute": "_execute_task",
"post_execute_hook": "_execute_task",
"_execute_task": "run",
"render_map_index": "run",
"push xcom": "run",
"handle success": "run",
"handle_extra_links": "finalize",
"success_callback": "finalize",
"listener.success_callback": "finalize",
"listener.before_stopping": "finalize",
"finalize": "run",
"run": "task_run.task1",
"close_socket": "task_run.task1",
"task_run.task1": "dag_run.otel_test_dag",
"sub_span1": "prepare context",
"dag_run.otel_test_dag": None,
}

def start_scheduler(self):
def start_scheduler(self, capture_output: bool = False):
stdout = None if capture_output else subprocess.DEVNULL
stderr = None if capture_output else subprocess.DEVNULL

scheduler_process = subprocess.Popen(
self.scheduler_command_args,
env=os.environ.copy(),
stdout=None,
stderr=None,
stdout=stdout,
stderr=stderr,
)

apiserver_process = subprocess.Popen(
self.apiserver_command_args,
env=os.environ.copy(),
stdout=None,
stderr=None,
stdout=stdout,
stderr=stderr,
)

# Wait to ensure both processes have started.
Expand Down
Loading
Loading