[RayDP 2.0] Support Spark 4.1, Java 17, and Scala 2.13#460
rexminnis wants to merge 16 commits into ray-project:master from
Conversation
- Upgrade core POMs to Spark 4.1.1, Java 17, Scala 2.13.12
- Add spark411 shim module (SparkShims411, Spark411Helper, Spark411SQLHelper)
with proper TaskContextImpl, ArrowConverters, and ClassicSparkSession APIs
- Implement toDataFrame for Arrow batch deserialization via shim layer
- Add RayCoarseGrainedExecutorBackend for Spark 4.1 executor backend
- Disable legacy shim modules (spark322, spark330, spark340, spark350)
- Add pyproject.toml (PEP 517) and simplify setup.py
- Update CI workflow for Spark 4.1.1
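A PEP 517 setup of the shape described above might look like the following. This is a hypothetical minimal pyproject.toml, not the actual file from the PR; the package name is real, but the version and pins are placeholders.

```toml
[build-system]
requires = ["setuptools>=61", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "raydp"
version = "2.0.0"          # placeholder
requires-python = ">=3.9"  # placeholder
```

With a `[build-system]` table like this in place, `python -m build` produces the sdist and wheel without invoking setup.py directly.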
Force-pushed from a106075 to 8e9fe61
- Add 13-test ScalaTest suite validating shim descriptor, SPI loading, Arrow schema mapping, and Arrow round-trip conversions (including nulls, timestamps, decimals, empty DataFrames, and multi-batch)
- Fix bug in Spark411SQLHelper.toArrowBatchRdd where df.schema was captured inside the mapPartitions closure, causing CANNOT_INVOKE_IN_TRANSFORMATIONS on Spark 4.1
- Bump Scala from 2.13.12 to 2.13.17 to match Spark 4.1.1
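The closure-capture bug can be modeled in plain Python. Everything here (FakeDF, map_partitions) is an illustrative stand-in, not Spark's API: the point is that reading df.schema lazily inside the task fails, while capturing the value into a driver-side local first works.

```python
class FakeDF:
    """Stand-in for a DataFrame whose .schema may only be read on the driver."""
    def __init__(self, schema):
        self._schema = schema
        self.in_task = False  # flipped while "running" a task

    @property
    def schema(self):
        if self.in_task:
            # Spark 4.1 raises CANNOT_INVOKE_IN_TRANSFORMATIONS in this case
            raise RuntimeError("CANNOT_INVOKE_IN_TRANSFORMATIONS")
        return self._schema


def map_partitions(df, fn, rows):
    """Run fn over rows as if inside a Spark task."""
    df.in_task = True
    try:
        return [fn(row) for row in rows]
    finally:
        df.in_task = False


df = FakeDF(schema=["a", "b"])

# Buggy closure: df.schema is evaluated lazily, inside the task.
buggy = lambda row: (df.schema, row)

# Fixed closure: evaluate once on the driver, capture only the value.
schema = df.schema
fixed = lambda row: (schema, row)
```

The fix in the shim follows the same shape: hoist the attribute read out of the closure so only the plain value is serialized into the task.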
Spark 4.1.1's BlockInfoManager requires tasks to be registered via
registerTask() before they can acquire read locks. getRDDPartition was
creating a dummy TaskContext without registering, causing an NPE in
lockForReading when converting DataFrames to Ray Datasets.
- Generate unique taskAttemptIds via AtomicLong (starting at 1000000L)
to avoid collisions with real Spark tasks and concurrent calls
- Register task with BlockManager before block access
- Release all locks and unset TaskContext in finally block
- Add regression test confirming the NPE without registration
- Add lifecycle test validating register/read/cleanup flow
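The register/read/cleanup flow above can be sketched in plain Python. BlockStore and its methods are toy stand-ins for Spark's BlockInfoManager, not its real API; only the shape mirrors the fix: unique ids from a counter, register before locking, release in a finally block.

```python
import itertools

class BlockStore:
    """Toy stand-in for BlockInfoManager: reads require a registered task."""
    def __init__(self):
        self._registered = set()
        self._blocks = {}
        self._read_locks = {}  # task_id -> block ids locked for reading

    def put(self, block_id, data):
        self._blocks[block_id] = data

    def register_task(self, task_id):
        self._registered.add(task_id)
        self._read_locks[task_id] = set()

    def lock_for_reading(self, task_id, block_id):
        if task_id not in self._registered:
            # Spark 4.1 fails here when the task was never registered
            raise RuntimeError(f"task {task_id} not registered")
        self._read_locks[task_id].add(block_id)
        return self._blocks[block_id]

    def release_all_locks_for_task(self, task_id):
        self._read_locks.pop(task_id, None)
        self._registered.discard(task_id)


# Unique ids starting at 1_000_000, as the AtomicLong in the fix does,
# to avoid colliding with real Spark task attempt ids.
_task_ids = itertools.count(1_000_000)

def get_rdd_partition(store, block_id):
    task_id = next(_task_ids)
    store.register_task(task_id)  # must happen before any read lock
    try:
        return store.lock_for_reading(task_id, block_id)
    finally:
        # always release locks and drop the dummy task context
        store.release_all_locks_for_task(task_id)
```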
- Default Arrow IPC zstd compression (spark.sql.execution.arrow.compression.codec)
- Pre-size ByteArrayOutputStream in getRDDPartition using BlockResult.bytes()
- Read arrowUseLargeVarTypes from SQLConf instead of hardcoding false
- Bump Ray compile-time dependency from 2.34.0 to 2.47.1 (latest on Maven Central)
RayDP 2.0 only targets Spark 4.1. These modules were already excluded from the build but cluttered the source tree.
The old fromSparkRDD/RayDatasetRDD pipeline produced bytes blocks and was already @deprecated with zero callers. Remove it along with all supporting dead code: _convert_by_rdd (Python), RayDatasetRDD.scala, ObjectRefHolder, ObjectStoreWriter instance class, and the wasteful ray.get(blocks[0]) type-check in ray_dataset_to_spark_dataframe.
df.sql_ctx is deprecated in PySpark 4.1 and emits warnings on every access. All usages followed the pattern df.sql_ctx.sparkSession.X which simplifies to df.sparkSession.X. Also remove the dead _wrapped guard (Spark 3.x internal) and stale TODO/comments in RayAppMaster for fractional resources that are already supported.
Eliminates the Arrow→pandas→Arrow round-trip in _convert_by_udf by switching from mapInPandas to mapInArrow (stable since Spark 3.4). Data stays in Arrow format throughout: pa.concat_tables + to_batches yields RecordBatches directly with no serialization overhead.
…removal
- Default Arrow compression codec from zstd to lz4 (faster for intra-cluster)
- Set maxRecordsPerBatch=0 (unlimited) since RayDP converts whole partitions
- Skip pa.concat_tables when a partition has a single block (avoids a copy)
- Use _ray_global_worker to avoid deprecation warning in get_locations
- Remove dead RecordPiece/RayObjectPiece classes and unused pandas import
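The Arrow-native partition path can be sketched as a generator. Plain Python lists stand in for pyarrow RecordBatches here, and the list merge stands in for pa.concat_tables + to_batches, so this shows only the control flow: yield batches straight through, and skip the merge copy when a partition holds a single block.

```python
def convert_partition(batches):
    """Yield merged output for one partition without any pandas round-trip."""
    blocks = list(batches)
    if len(blocks) == 1:
        # Single block: pass it through untouched, skipping the concat copy.
        yield blocks[0]
        return
    # Multiple blocks: merge once (stands in for pa.concat_tables + to_batches).
    yield [row for block in blocks for row in block]
```

The single-block fast path matters because most partitions arrive as one block, so the common case does no copying at all.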
double-copy in Arrow transfers
- Ray→Spark: batch actor lookups and data fetches into two ray.get() calls instead of 2N sequential calls, letting Ray pipeline transfers.
- Spark→Ray: replace ByteArrayOutputStream + toByteArray (two full copies of partition data) with a single pre-allocated byte[] and direct System.arraycopy (one copy), halving memory and CPU overhead in getRDDPartition.
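The Spark→Ray half of this change is a copy-count argument, shown here with stdlib stand-ins (chunks of bytes in place of Arrow batches): the stream-based path copies the data twice, while the pre-sized buffer path copies it once.

```python
import io

def copy_via_stream(chunks):
    """Old shape: growing buffer, then getvalue() makes a second full copy."""
    buf = io.BytesIO()
    for chunk in chunks:
        buf.write(chunk)
    return buf.getvalue()

def copy_preallocated(chunks, total_size):
    """New shape: size known up front, a single copy into a pre-sized buffer."""
    out = bytearray(total_size)  # ~ new byte[size] sized from BlockResult.bytes()
    pos = 0
    for chunk in chunks:
        out[pos:pos + len(chunk)] = chunk  # ~ System.arraycopy
        pos += len(chunk)
    return bytes(out)
```

Both return identical bytes; the difference is that the second never materializes the partition twice.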
Ray fetching in Spark→Ray conversion

Instead of blocking on rdd.count() until ALL partitions are materialized before submitting any Ray fetch tasks, run materialization in a background thread and poll BlockManager for completed partitions. As each partition becomes available, its Ray fetch task is dispatched immediately, overlapping Spark computation with Ray data transfer.
- JVM: add StreamingRecoverableRDD handle with getReadyPartitions() that queries BlockManager.blockIdsToLocations for newly cached blocks.
- Python: from_spark_recoverable now polls the handle in a loop, dispatching fetch tasks incrementally as partitions complete.
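A minimal threading model of this overlap, with hypothetical names (stream_fetch, materialize, fetch; none of them are RayDP's API): a background thread marks partitions ready, standing in for Spark caching a block, while the main loop polls and dispatches the fetch for each partition as soon as it appears rather than waiting for all of them.

```python
import threading
import time

def stream_fetch(num_parts, materialize, fetch, poll_interval=0.005):
    ready = set()
    done = threading.Event()
    lock = threading.Lock()

    def run():
        for part in range(num_parts):
            materialize(part)  # stands in for Spark caching the block
            with lock:
                ready.add(part)
        done.set()

    threading.Thread(target=run, daemon=True).start()

    dispatched, results = set(), {}
    while len(dispatched) < num_parts:
        with lock:
            newly = ready - dispatched
        for part in sorted(newly):
            results[part] = fetch(part)  # dispatch as soon as it's ready
            dispatched.add(part)
        if not newly and not done.is_set():
            time.sleep(poll_interval)  # nothing new yet; poll again
    return results
```

The real implementation polls getReadyPartitions() on the JVM handle instead of a shared set, but the dispatch-on-ready loop has the same structure.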
Force-pushed from 7f8049f to 1b12991
- Quote python-version strings in matrix to prevent YAML float
parsing (3.10 → 3.1)
- Remove stale dependency pins incompatible with Python 3.12
(numpy<1.24, pydantic<2.0, click<8.3, tensorflow==2.13.1)
- Skip TF and XGBoost tests until their estimators are updated
- Use PEP 517 build (python -m build) instead of setup.py bdist_wheel
- Update GitHub Actions to current versions (checkout v4,
setup-python v5, setup-java v4, cache v4)
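The 3.10 → 3.1 pitfall in the matrix comes from YAML's scalar typing: a bare 3.10 is parsed as a float, and floats carry no trailing zero. Python floats behave the same way, which makes the failure easy to see without a YAML parser:

```python
# An unquoted YAML scalar like 3.10 is parsed as a float...
unquoted = float("3.10")
assert str(unquoted) == "3.1"   # ...and the trailing zero is gone

# Quoting in the matrix ("3.10") keeps it a string, preserving the version.
quoted = "3.10"
assert quoted.endswith(".10")
```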
Force-pushed from 08bdfe9 to 2e10492
2.53.0)
- Replace deprecated ray.air.session and ray.air.config imports with ray.train equivalents
- Remove TorchCheckpoint, TensorflowCheckpoint, and XGBoostCheckpoint in favor of direct model loading
- Rewrite XGBoost estimator from declarative to functional API with train_loop_per_worker and RayTrainReportCallback
Force-pushed from 2e10492 to 103d81f
@pang-wu @carsonwang - when you have a moment, could I please get a review?
Thanks for the PR, but I am not sure about the direction this PR is pointing to. It is better to split this into 3 different PRs:
Thanks for the feedback @pang-wu! Happy to split this up; I agree the PR is large. The driving motivation behind this work is that RayDP needs to modernize to stay relevant: Spark 4.x is Java 17-only and Scala 2.12 is EOL. I took a look at #450, and it seems we've converged on a very similar approach for the core Spark 4.x shim (JSON4S → Jackson,
I can split the Spark 4.x core changes out of this PR into a focused PR so it's easier to review. For the other two pieces:
What works best for you?
Converting this to draft per @pang-wu's feedback. Agreed this should be split into focused PRs:
Keeping this as draft for reference — the working code here can inform the individual PRs.
Closes #459
Overview
This PR introduces RayDP 2.0, a major architectural upgrade migrating the core runtime to Apache Spark 4.1, Java 17, and Scala 2.13. This modernization addresses the removal of Scala 2.12/Java 8 support in Spark 4.x and aligns RayDP with modern infrastructure standards.
Key Changes
1. Core Runtime & Dependencies
- Spark: 4.1.1.
- Java: 17 (strict requirement).
- Scala: 2.13.17.
- Ray: 2.53.0.

2. Build System Overhaul
- core/pom.xml updated to target Java 17/Scala 2.13.
- pyproject.toml added for build configuration.
- setup.py updated to correctly package JARs into raydp/jars during pip install -e ., fixing local development workflows.

3. Shim Layer Architecture
- raydp-shims-spark411 added to handle Spark 4.1 API drifts.
- Spark411Helper and Spark411SQLHelper introduced to bridge package-private Spark APIs (e.g., TaskContextImpl, ArrowConverters) that are no longer accessible directly.

4. Code Migration
- scala.collection.JavaConverters -> scala.jdk.CollectionConverters.
- RayCoarseGrainedSchedulerBackend updated to handle removed APIs (e.g., Utils.sparkJavaOpts).
- toDataFrame implemented in the new shim layer to support ray.data.from_spark and ds.to_spark.

5. ML Estimator Migration to Ray Train V2 API
Migrated all ML estimators (Torch, TensorFlow, XGBoost) to the Ray Train V2 API, which is enabled by default in Ray 2.53.0. The deprecated ray.air module is no longer used.

- Torch: replaced ray.air.session and ray.air.config imports with ray.train equivalents. Removed deprecated TorchCheckpoint in favor of direct state_dict loading.
- TensorFlow: replaced ray.air.session imports, removed deprecated TensorflowCheckpoint, and dropped MultiWorkerMirroredStrategy, which is incompatible with Keras 3 (ray-project/ray#47464). Each Ray worker now trains independently on its data shard. Updated model save/load to use the .keras format (Keras 3 requirement).
- XGBoost: rewrote to the train_loop_per_worker API with RayTrainReportCallback. Removed deprecated XGBoostCheckpoint in favor of direct xgboost.Booster loading.

Verification
A comprehensive sanity-check suite (examples/test_raydp_sanity.py) was added and verified locally on macOS (M1/M2).

Verified Capabilities:
- Basic operations (count, range).

Breaking Changes
- ray.air imports are no longer used.