Yet Another SPark Framework
An easy and lightweight tool for data engineering, built on top of Apache Spark for ETL/ELT processes.
Yasp was created to help data engineers working with Apache Spark to reduce their pipeline development time by using a no-code/less-code approach.
With Yasp you can configure an ETL/ELT job that fetches data from multiple sources, executes multiple transformations, and writes to multiple destinations.
It is written in Scala (2.12.15) on top of Apache Spark 3.3.0 and managed as an SBT (1.4.9) multi-module project.
Yasp comes with the following modules:
- YaspApp provides the highest level of abstraction for your ETL job and comes with an executable main class. It allows you to manage complex big data ETL jobs with a simple YAML file (see the sketch after this list).
- YaspService provides all the yasp operations for your ETL job.
- YaspCore provides the Spark primitives used by yasp operations.
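As a quick orientation, here is a minimal sketch of the two higher-level entry points; both are shown in full, working form later in this README. Imports from the yasp-app and yasp-service modules are omitted because the exact package names are not listed here, and the empty plan is only there to keep the sketch short; a real job needs at least one action.

```scala
// Minimal sketch of the two higher-level entry points.
// Imports from the yasp-app / yasp-service modules are omitted; see the full examples below.
object YaspEntryPointsSketch {
  def main(args: Array[String]): Unit = {
    // 1. YaspApp: describe the whole job as a YAML document.
    YaspApp.fromYaml(
      """|session:
         |  kind: local
         |  name: sketch-app
         |plan:
         |  actions: []   # a real plan defines sources, processes and sinks here
         |""".stripMargin
    )

    // 2. YaspService: build the same YaspExecution model programmatically.
    YaspService().run(
      YaspExecution(
        session = Session(Local, "sketch-app", Map.empty),
        plan    = YaspPlan(actions = Seq.empty)
      )
    )
  }
}
```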
Only Apache Spark 3.x versions are supported
- JDK 8 Installed
- SBT Installed
- Windows users only: this is an Apache Spark based framework; to run it locally you should configure the Hadoop winutils on your laptop. Check here for more details.
There are no packages already built and distributed that you can download and use. You have to build your own yasp package.
Yasp can be built with a bundled version of Spark and all required dependencies (Delta, Iceberg, etc.). The result of this build is a FAT jar that can be executed locally or on your cluster.
However, it is also possible to build yasp in LIGHT mode, which excludes all the required Spark dependencies. If you're planning to use the LIGHT package, make sure yasp can find the required libraries on your local machine or cluster.
FAT package
- Clone: `git clone https://github.com/giucris/yasp.git`
- Build: `sbt -Dyasp.spark.version=3.3.0 assembly`
- Go to the `yasp-app/target/scala-2.12/` folder, where you will find an executable jar file `yasp-app-spark-3.3.0-0.0.1.jar`
LIGHT package
- Clone: `git clone https://github.com/giucris/yasp.git`
- Build: `sbt -Dyasp.spark.version=3.3.0 -Dyasp.build.type=LIGHT assembly`
- Go to the `yasp-app/target/scala-2.12/` folder, where you will find an executable jar file `yasp-app-light-spark-3.3.0-0.0.1.jar`
To run a yasp app you have to provide a single YAML file (JSON is fine too) that contains all your Extract, Transform and Load tasks.
A yasp.yaml is a file that defines a YaspExecution. A YaspExecution is a model that defines an end-to-end ETL/EL job in terms of a Session and a YaspPlan. A YaspPlan is a model that defines a list of YaspAction.
For example:
```yaml
# Session field
session:
  kind: local        # Define a local spark session. Use distributed for non local execution.
  name: example-app  # Define a Spark Application name.

# YaspPlan field
plan:
  # List of all actions
  actions:
    - id: csv_load                 # Id of the source action.
      dataset: my_csv              # Name of the dataset
      source:                      # Source field, contains all the configuration to read the data
        format: csv                # Standard Spark format
        options:                   # Standard Spark format options
          header: 'true'
          path: path/to/input/csv/
    - id: filter_csv               # Id of the process action.
      dataset: my_csv_filtered     # Name of the dataset
      process:                     # Process field, contains all the Process configuration to transform the data
        query: >-                  # A sql process configuration
          SELECT *
          FROM my_csv
          WHERE id=1
      dependsOn:                   # Depends on to enforce dependency
        - csv_load
    - id: sink_data                # Id of the sink action.
      dataset: my_csv_filtered     # Name of the dataset to sink.
      dest:                        # Destination field, contains all the Dest configuration to write the data
        format: csv                # Standard Spark format
        options:                   # Standard Spark format options
          header: 'true'
          path: path/to/out/csv/
      dependsOn:
        - filter_csv               # Depends on to enforce dependency
```
Take a look at the detailed user documentation for Session, YaspExecution, and YaspPlan.
Yasp supports all the Apache Spark sources and destinations, with the addition of Iceberg and Delta tables. Yasp supports only Spark SQL processes.
NB: There is no specific Dest for Iceberg and Delta tables; any such write action should be described as a SQL process.
NB: You have to build yasp before using it; please follow the related section.
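To illustrate the note about Delta and Iceberg, here is a hedged sketch of a Delta table write expressed as a SQL process, using the programmatic YaspProcess/Sql API shown in the YaspService example further down (the same query could equally be placed in a process action of a yasp.yaml). It assumes a FAT build with Delta bundled and a session configured with the Delta SQL extensions; the table name, paths, and source dataset are made up for the example:

```scala
// Hedged sketch: persisting a dataset to a Delta table via a SQL process,
// since there is no dedicated Dest for Delta/Iceberg. Assumes Delta is on the
// classpath (FAT build) and enabled in the session conf; names/paths are illustrative.
// This is a single action you would add to a YaspPlan's actions sequence.
YaspProcess(
  "write_users_delta",   // id of the process action
  "users_delta",         // name of the resulting dataset
  Sql(
    """CREATE TABLE IF NOT EXISTS users_delta
      |USING DELTA
      |LOCATION 'path/to/delta/users'
      |AS SELECT * FROM my_csv_filtered""".stripMargin
  ),
  partitions = None,
  cache      = None
)
```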
You can use Yasp in three different ways.
Execute an ETL/EL
- Create a yasp.yaml file.
- [OPTIONAL] Execute a dry-run: `java -jar yasp-app-spark-x.y.z-i.j.k.jar --file <PATH_TO_YASP_YAML_FILE> --dry-run`
- Run: `java -jar yasp-app-spark-x.y.z-i.j.k.jar --file <PATH_TO_YASP_YAML_FILE>`
DRY RUN: The dry-run does not execute any Spark action; it just provides you with the YaspPlan that will be executed.
- Create a yasp.yaml file and make it available to your cluster
- Then run yasp as the main class of your spark-submit: `spark-submit --class it.yasp.app.Yasp yasp-app-spark-x.y.z-i.j.k.jar --file yasp.yaml`
Add the yasp-app reference to the dependencies in your build.sbt or pom.xml, then build and start using it in your code.
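For example, with sbt the dependency might look like the line below. Since yasp is not distributed to a public repository (you build it yourself, as described in the build section), the organization and exact coordinates shown here are assumptions; adjust them to match however you build and publish your own yasp artifacts:

```scala
// build.sbt sketch - hypothetical coordinates; point them at your locally built/published yasp artifacts.
libraryDependencies += "it.yasp" %% "yasp-app" % "0.0.1"
```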
```scala
// NOTE: the YaspApp import from the yasp-app module is omitted here.
object MyUsersByCitiesReport {
  def main(args: Array[String]): Unit = {
    // Assumption: DB credentials come from environment variables so that the interpolated YAML below compiles.
    val db_username = sys.env("DB_USERNAME")
    val db_password = sys.env("DB_PASSWORD")
    YaspApp.fromYaml(
      s"""
         |session:
         |  kind: Local
         |  name: example-app
         |  conf: {}
         |plan:
         |  actions:
         |    - id: users
         |      source:
         |        format: csv
         |        options:
         |          path: users.csv
         |    - id: addresses
         |      source:
         |        format: json
         |        options:
         |          path: addresses.jsonl
         |    - id: cities
         |      source:
         |        format: jdbc
         |        options:
         |          url: my-db-url
         |          user: ${db_username}
         |          password: ${db_password}
         |          dbTable: my-table
         |    - id: user_with_address
         |      process:
         |        query: >-
         |          SELECT u.name,u.surname,a.address,a.city,a.country
         |          FROM users u
         |          JOIN addresses a ON u.id = a.user_id
         |          JOIN cities c ON a.city_id = c.id
         |    - id: user_with_address
         |      dest:
         |        format: parquet
         |        options:
         |          path: user_with_address
         |        partitionBy:
         |          - country_id
         |""".stripMargin
    )
  }
}
```
```scala
// NOTE: imports of the YaspService / YaspCore model classes are omitted here.
object MyUsersByCitiesReport {
  def main(args: Array[String]): Unit = {
    YaspService().run(
      YaspExecution(
        session = Session(Local, "my-app-name", Map.empty),
        plan = YaspPlan(
          actions = Seq(
            YaspSource("1", "users", Source.Format("csv", options = Map("path" -> "users/", "header" -> "true")), partitions = None, cache = None),
            YaspSource("2", "addresses", Source.Format("json", options = Map("path" -> "addresses/")), partitions = None, cache = None),
            YaspSource("3", "cities", Source.Format("parquet", options = Map("path" -> "cities/", "mergeSchema" -> "false")), partitions = None, cache = None),
            YaspProcess("4", "users_addresses", Sql("SELECT u.*,a.address,a.city_id FROM users u JOIN addresses a ON u.address_id=a.id"), partitions = None, cache = None),
            YaspProcess("5", "users_addresses_cities", Sql("SELECT u.*,a.address,a.city_id FROM users_addresses u JOIN cities c ON u.city_id=c.id"), partitions = None, cache = None),
            YaspProcess("6", "users_by_city", Sql("SELECT city,count(*) FROM users_addresses_cities GROUP BY city"), partitions = None, cache = None),
            YaspSink("7", "users_by_city", Dest.Format("parquet", Map("path" -> "user-by-city"), partitionBy = Seq("city")))
          )
        )
      )
    )
  }
}
```
- Add BuiltIn Processor
- Add support for Hudi as Source/Dest
- Add data quality process
- Add Structured Streaming Execution
See the related files for more information:
Yasp was originally created just for fun, to avoid repeating myself, and to help data engineers (I am one of them) working with Apache Spark reduce their pipeline development time.
I rewrote it from scratch and made it available to the open source community.