Added S3 offloading technique for RDD #51042


Open · wants to merge 1 commit into master
@Rafig9501 Rafig9501 commented May 28, 2025

What changes were proposed in this pull request?

This PR introduces a new feature to offload the intermediate results of RDD.collect() to S3-compatible storage (e.g., MinIO) to reduce driver memory pressure during large result collection. The enhancement adds logic to:

  • Write partition results to S3 in the task phase.
  • Stream data from S3 in the driver phase to construct the final collected array.
  • Clean up temporary session directories based on configuration.
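The three phases above can be sketched, outside of Spark itself, as a minimal Python example in which a local temporary directory stands in for the S3 session path and plain functions stand in for the task and driver sides. All names here are illustrative, not the PR's actual code:

```python
import os
import pickle
import shutil
import tempfile

# A local directory stands in for the S3 session path (illustrative only).
session_dir = tempfile.mkdtemp(prefix="collect-offload-")

def task_phase(partition_id, partition_data):
    """Task side: write one partition's results to the offload store."""
    path = os.path.join(session_dir, f"part-{partition_id:05d}")
    with open(path, "wb") as f:
        pickle.dump(partition_data, f)

def driver_phase():
    """Driver side: stream partition files back and build the final array."""
    result = []
    for name in sorted(os.listdir(session_dir)):
        with open(os.path.join(session_dir, name), "rb") as f:
            result.extend(pickle.load(f))
    return result

def cleanup():
    """Remove the temporary session directory (the cleanup step)."""
    shutil.rmtree(session_dir, ignore_errors=True)

# Simulate two tasks writing partitions, then the driver collecting them.
task_phase(0, [1, 2, 3])
task_phase(1, [4, 5, 6])
collected = driver_phase()
cleanup()
```

The key property the sketch preserves is that the driver only ever holds one partition's stream at a time while appending, rather than receiving all partitions at once from the executors.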

The implementation introduces the following new configuration flags:

  • spark.rdd.collect.offloadToS3.enabled – enables/disables the offloading logic.
  • spark.rdd.collect.s3.path – sets the target S3 path for temporary data.
  • spark.rdd.collect.s3.cleanup – controls whether S3 offload data should be cleaned up after collection.
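Assuming the flags behave as described, enabling the feature from the command line might look like this (the bucket path is a placeholder, not part of the PR):

```shell
spark-shell \
  --conf spark.rdd.collect.offloadToS3.enabled=true \
  --conf spark.rdd.collect.s3.path=s3a://my-bucket/collect-offload \
  --conf spark.rdd.collect.s3.cleanup=true
```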

Fallback to the default collect() logic is implemented for error scenarios (e.g., an S3 write failure), ensuring reliability.
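The fallback behavior can be illustrated with a small Python sketch (hypothetical function names, not the PR's code): the offload path is attempted first, and any storage error drops back to the default in-memory path.

```python
def collect_with_offload(partitions, offload_enabled, write_to_store):
    """Try the offload path; fall back to plain in-memory collect on failure."""
    def default_collect():
        # Default behavior: materialize every partition on the driver.
        return [item for part in partitions for item in part]

    if not offload_enabled:
        return default_collect()
    try:
        staged = [write_to_store(part) for part in partitions]  # task phase
        return [item for blob in staged for item in blob]       # driver phase
    except OSError:
        # e.g., an S3 write failure: fall back to default collect() logic.
        return default_collect()

# A store stub that always fails, to exercise the fallback.
def failing_store(_part):
    raise OSError("simulated S3 write failure")

data = [[1, 2], [3, 4]]
assert collect_with_offload(data, True, failing_store) == [1, 2, 3, 4]
```

The final assertion shows that even with a store that always fails, the caller still receives the correct collected result via the fallback.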

Why are the changes needed?

The default RDD.collect() behavior places the burden of materializing all partition results on the driver, which may lead to OOM errors when collecting large datasets. By offloading partition results to S3 during task execution and streaming them back in the driver, we significantly reduce the driver's memory footprint.

This is especially helpful for:

  • Collecting very large RDDs (e.g., tens of millions of records).
  • Environments with memory-constrained drivers.
  • Scenarios where partial failure resilience (via fallback) is desirable.

Does this PR introduce any user-facing change?

Yes.

This PR introduces three new user-facing Spark configuration properties:

  • spark.rdd.collect.offloadToS3.enabled (default: false)
  • spark.rdd.collect.s3.path (no default; must be explicitly set)
  • spark.rdd.collect.s3.cleanup (default: true)

If offloading is enabled and properly configured, the driver no longer receives all partitions' data in memory directly from the executors.

How was this patch tested?

This feature was tested via a custom multi-phase validation suite using spark-shell, structured as follows:

  • Basic Functional Tests: Validate offloading behavior with small RDDs and toggle settings (offload, cleanup).
  • Edge Cases:
    • Empty RDDs
    • RDDs with case class objects (serialization validation)
    • Disabling offloading to check fallback behavior
  • Error Handling:
    • Simulate misconfigured or inaccessible S3 paths to verify safe fallback to default collect() logic.

Testing included checking:

  • MinIO (S3) for session directory creation and cleanup
  • Logs for correct phase transitions and fallback messages
  • Collection result accuracy using assert()s on output
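As a concrete shape for those result-accuracy checks, the assert()-based comparison amounts to verifying that the offloaded result matches the default path (illustrative Python, not the PR's actual test code):

```python
def check_equivalence(default_result, offloaded_result):
    """Result-accuracy check: offloaded collect must match default collect."""
    assert offloaded_result == default_result, "offloaded collect diverged"
    return True

# Edge cases from the test plan: empty RDDs and object (record-like) elements.
assert check_equivalence([], [])
assert check_equivalence([(1, "a"), (2, "b")], [(1, "a"), (2, "b")])
```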

A full testing guide is included in the project documentation for reproducibility.

Was this patch authored or co-authored using generative AI tooling?

No.

Label: CORE