Member

@ueshin ueshin commented Oct 21, 2025

What changes were proposed in this pull request?

Adds basic Python worker logging support.

Logs emitted from Python's standard logging module, as well as output written via print to stdout and stderr, are collected into the system.session.python_worker_logs view.

  • spark.sql.pyspark.worker.logging.enabled (False by default)
    When set to true, this configuration enables comprehensive logging within Python worker processes that execute User-Defined Functions (UDFs), User-Defined Table Functions (UDTFs), and other Python-based operations in Spark SQL.

For example:

>>> from pyspark.sql.functions import udf
>>> import logging
>>> import sys
>>>
>>> @udf
... def logging_test_udf(x):
...     logger = logging.getLogger("test")
...     logger.setLevel(logging.INFO)
...     logger.info(f"INFO level message: {x}")
...     print(f"PRINT(STDOUT): {x}")  # INFO level, logger is "stdout"
...     print(f"PRINT(STDERR): {x}", file=sys.stderr)  # ERROR level, logger is "stderr"
...     try:
...         1 / x
...     except:
...         logger.exception(f"1 / {x}")
...     return str(x)
...
>>> spark.conf.set("spark.sql.pyspark.worker.logging.enabled", True)
>>>
>>> spark.range(2).select(logging_test_udf("id")).show()
+-----+
|f(id)|
+-----+
|    0|
|    1|
+-----+

>>> spark.table("system.session.python_worker_logs").orderBy("ts").show(truncate=False)
+--------------------------+-----+---------------------+-------------------------------+-----------------------------------------------------------------------------+------+
|ts                        |level|msg                  |context                        |exception                                                                    |logger|
+--------------------------+-----+---------------------+-------------------------------+-----------------------------------------------------------------------------+------+
|2025-10-21 17:22:01.862654|INFO |INFO level message: 0|{func_name -> logging_test_udf}|NULL                                                                         |test  |
|2025-10-21 17:22:01.863826|INFO |INFO level message: 1|{func_name -> logging_test_udf}|NULL                                                                         |test  |
|2025-10-21 17:22:01.86505 |INFO |PRINT(STDOUT): 0     |{func_name -> logging_test_udf}|NULL                                                                         |stdout|
|2025-10-21 17:22:01.865827|INFO |PRINT(STDOUT): 1     |{func_name -> logging_test_udf}|NULL                                                                         |stdout|
|2025-10-21 17:22:01.87052 |ERROR|PRINT(STDERR): 0     |{func_name -> logging_test_udf}|NULL                                                                         |stderr|
|2025-10-21 17:22:01.871405|ERROR|PRINT(STDERR): 1     |{func_name -> logging_test_udf}|NULL                                                                         |stderr|
|2025-10-21 17:22:01.87188 |ERROR|1 / 0                |{func_name -> logging_test_udf}|{ZeroDivisionError, division by zero, [{NULL, logging_test_udf, <stdin>, 8}]}|test  |
+--------------------------+-----+---------------------+-------------------------------+-----------------------------------------------------------------------------+------+

Why are the changes needed?

Logs emitted from UDFs are currently difficult to collect, as they go to each executor's stderr file.
When there are many executors, the stderr files have to be checked one by one.

Does this PR introduce any user-facing change?

Yes, Python UDF logging is now available, and the logs can be collected via a system view.

How was this patch tested?

Added the related tests.

Was this patch authored or co-authored using generative AI tooling?

No.

Member

@HyukjinKwon HyukjinKwon left a comment

I love this!

@ueshin ueshin changed the title [SPARK-53975][PYTHON] Adds basic logging support [SPARK-53975][PYTHON] Adds basic Python worker logging support Oct 22, 2025
@cloud-fan
Copy link
Contributor

cloud-fan commented Oct 22, 2025

should the log be query-centric instead of worker-centric? How can I find logs for a certain query?

Comment on lines +219 to +220
- func_name: Name of the function that initiated the logging
- class_name: Name of the class that initiated the logging if available
Contributor

Should we consider adding the module name to the context?

Member Author

I think we can add it if necessary.

Contributor

@allisonwang-db allisonwang-db left a comment

This is super awesome! Thanks for working on it!

Comment on lines +125 to +126
writer.writeLog(
PythonWorkerLogLine(System.currentTimeMillis(), seqId.getAndIncrement(), json)
Contributor

Do we want to limit the number of lines written to the block manager?

Member Author

cc @ivoson

Contributor

cc @ivoson @cloud-fan this is important, as we don't want users to write an unlimited number of logs into the block manager.
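The concern here is that a chatty UDF could write log lines without bound; a cap along these lines would address it. This is a minimal sketch in plain Python (the class name BoundedLogWriter and its default limit are hypothetical, not the PR's actual code, which is on the Scala side):

```python
class BoundedLogWriter:
    """Hypothetical sketch of the cap discussed above: retain at most
    max_lines log lines and count anything beyond that as dropped, so a
    chatty UDF cannot grow block manager storage without bound."""

    def __init__(self, max_lines=10_000):
        self.max_lines = max_lines
        self.lines = []
        self.dropped = 0

    def write_log(self, line):
        if len(self.lines) < self.max_lines:
            self.lines.append(line)
        else:
            # Past the cap, drop the line instead of storing it.
            self.dropped += 1
```

Whether dropped lines should be counted, sampled, or surfaced to the user is a separate design choice not settled in this thread.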

Member Author

@ueshin ueshin left a comment

@cloud-fan

should the log be query-centric instead of worker-centric? How can I find logs for a certain query?

Do we have any info in the executor to identify a query, such as a query_id?
If so, I can add it to the context, and then the logs can be queried with context.query_id = 'xxx'.


Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM (Pending CIs). Thank you for updating the PR.

Please rebase the PR onto the master branch, because the master branch was broken for a while today and has now recovered, @ueshin.

Contributor

@allisonwang-db allisonwang-db left a comment

Looks great! Very excited for this feature!


Member Author

ueshin commented Oct 24, 2025

cc @cloud-fan for another look.
