52 changes: 6 additions & 46 deletions .github/workflows/local-setup-test.yaml
@@ -11,61 +11,21 @@ jobs:
       - uses: actions/setup-java@v4
         with:
           distribution: "zulu"
-          java-version: "11"
+          java-version: "17"
       - name: Install poetry
         run: pipx install poetry
       - uses: actions/setup-python@v5
         with:
-          python-version: "3.11"
+          python-version: "3.13"
           cache: "poetry"
       - name: Install Python Dependencies
         run: |
           poetry install
-      - name: Run local unit tests
-        run: |
-          poetry run python -m pytest tests/unit
-      - name: Run local integration tests
-        run: |
-          poetry run python -m pytest tests/integration
-
-  windows:
-    runs-on: windows-latest
-    steps:
-      - uses: actions/checkout@v4
-      - uses: actions/setup-java@v4
-        with:
-          distribution: "zulu"
-          java-version: "11"
-      - name: Install poetry
-        run: pipx install poetry
-      - uses: actions/setup-python@v5
-        with:
-          python-version: "3.11"
-          cache: "poetry"
-      - name: Install Hadoop for Windows
-        # See https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
-        # that recommends https://github.com/steveloughran/winutils
-        # that recommends https://github.com/cdarlint/winutils
-        #
-        # Setting environment variables, e.g.
-        #   $env:HADOOP_HOME = "$pwd\winutils\hadoop-3.3.5"
-        #   $env:Path += ";$pwd\winutils\hadoop-3.3.5\bin"
-        # requires special handling: https://stackoverflow.com/questions/61858388/how-do-i-set-an-enviroment-variable-in-github-action-on-a-windows-server
-        #
-        # Reading / writing to parquet through winutils requires Microsoft Visual C++ 2010 Service Pack 1
-        # https://stackoverflow.com/questions/45947375/why-does-starting-a-streaming-query-lead-to-exitcodeexception-exitcode-1073741
-        run: |
-          choco install vcredist2010
-          git clone --depth 1 -b master https://github.com/cdarlint/winutils.git
-          echo "HADOOP_HOME=$pwd\winutils\hadoop-3.3.5" >> $env:GITHUB_ENV
-          echo ";$pwd\winutils\hadoop-3.3.5\bin" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
-
-      - name: Test Hadoop Setup
+      - name: Run style and type checks
         run: |
-          winutils.exe chmod 777 D:\a\dataengineer-transformations-python\dataengineer-transformations-python
-      - name: Install Python Dependencies
-        run: |
-          poetry install
+          poetry run ruff format && poetry run ruff check
+          poetry run mypy --ignore-missing-imports --disallow-untyped-calls --disallow-untyped-defs --disallow-incomplete-defs \
+            data_transformations tests
       - name: Run local unit tests
         run: |
           poetry run python -m pytest tests/unit
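
The stricter mypy invocation added above turns missing annotations into CI failures. A minimal sketch of what `--disallow-untyped-defs` and `--disallow-incomplete-defs` reject (hypothetical functions, not from this repo):

```python
# Rejected by the flags above: no annotations at all.
def trip_duration_minutes(seconds):
    return seconds / 60


# Also rejected: annotations are incomplete (return type missing).
def trip_duration_hours(seconds: float):
    return seconds / 3600


# Accepted: fully annotated.
def trip_duration_days(seconds: float) -> float:
    return seconds / 86400
```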
9 changes: 0 additions & 9 deletions .pylintrc

This file was deleted.

8 changes: 4 additions & 4 deletions README.md
@@ -22,15 +22,15 @@ These jobs are using _PySpark_ to process larger volumes of data and are suppose
 
 Please make sure you have the following installed and can run them
 
-- Python (3.11.X), you can use for example [pyenv](https://github.com/pyenv/pyenv#installation) to manage your python versions locally
+- Python (3.13.X), you can use for example [pyenv](https://github.com/pyenv/pyenv#installation) to manage your python versions locally
 - [Poetry](https://python-poetry.org/docs/#installation)
-- Java (11), you can use [sdkman](https://sdkman.io/) to install and manage java locally
+- Java (17), you can use [sdkman](https://sdkman.io/) to install and manage java locally
 
 #### Windows users
 
 We recommend using WSL 2 on Windows for this exercise, due to the [lack of support](https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems) of windows paths from Hadoop/Spark.
 
-Follow instructions on the [Windows official page](https://learn.microsoft.com/en-us/windows/wsl/setup/environment)
+Follow instructions on the [Windows official page](https://learn.microsoft.com/en-us/windows/wsl/setup/environment) and then the Linux install.
 
 #### Install all dependencies
 
@@ -60,7 +60,7 @@ poetry run pytest tests/integration
 poetry run mypy --ignore-missing-imports --disallow-untyped-calls --disallow-untyped-defs --disallow-incomplete-defs \
   data_transformations tests
 
-poetry run pylint data_transformations tests
+poetry run ruff format && poetry run ruff check
 ```
 
 ### Anything else?
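For reference, `ruff format` is the Black-compatible formatter that produced most of the churn in the Python diffs below: single quotes become double quotes and over-long signatures get wrapped, while `ruff check` runs the linter. A hypothetical before/after sketch (names invented for illustration):

```python
# Before `ruff format`, this was one long single-quoted line:
#   def write_partition(spark_session, input_path, output_path, mode='append'): ...
# After: double quotes, and the over-long signature wrapped — the same
# shape as the `run(...)` reformat in distance_transformer.py below.
def write_partition(
    spark_session: object, input_path: str, output_path: str, mode: str = "append"
) -> None:
    print(f"writing {output_path} with mode={mode}")


write_partition(None, "in.parquet", "out.parquet")
```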
8 changes: 5 additions & 3 deletions data_transformations/citibike/distance_transformer.py
@@ -1,4 +1,4 @@
-from pyspark.sql import SparkSession, DataFrame
+from pyspark.sql import DataFrame, SparkSession
 
 METERS_PER_FOOT = 0.3048
 FEET_PER_MILE = 5280
@@ -10,11 +10,13 @@ def compute_distance(_spark: SparkSession, dataframe: DataFrame) -> DataFrame:
     return dataframe
 
 
-def run(spark: SparkSession, input_dataset_path: str, transformed_dataset_path: str) -> None:
+def run(
+    spark: SparkSession, input_dataset_path: str, transformed_dataset_path: str
+) -> None:
     input_dataset = spark.read.parquet(input_dataset_path)
     input_dataset.show()
 
     dataset_with_distances = compute_distance(spark, input_dataset)
     dataset_with_distances.show()
 
-    dataset_with_distances.write.parquet(transformed_dataset_path, mode='append')
+    dataset_with_distances.write.parquet(transformed_dataset_path, mode="append")
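
A minimal local sketch of how this module's `run` is invoked (the paths and master setting are invented for illustration). Note that `mode="append"` adds new part-files to an existing parquet directory instead of failing when the output path already exists:

```python
from pyspark.sql import SparkSession

from data_transformations.citibike import distance_transformer

# Local session for a quick smoke run; cluster submission would differ.
spark = (
    SparkSession.builder.master("local[*]")
    .appName("citibike-distance-local")
    .getOrCreate()
)
# Hypothetical paths: any parquet dataset in, a writable directory out.
distance_transformer.run(spark, "data/trips.parquet", "data/trips_with_distance")
```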
6 changes: 5 additions & 1 deletion data_transformations/citibike/ingest.py
@@ -10,7 +10,11 @@ def sanitize_columns(columns: List[str]) -> List[str]:
 
 def run(spark: SparkSession, ingest_path: str, transformation_path: str) -> None:
     logging.info("Reading text file from: %s", ingest_path)
-    input_df = spark.read.format("org.apache.spark.csv").option("header", True).csv(ingest_path)
+    input_df = (
+        spark.read.format("org.apache.spark.csv")
+        .option("header", True)
+        .csv(ingest_path)
+    )
     renamed_columns = sanitize_columns(input_df.columns)
     ref_df = input_df.toDF(*renamed_columns)
     ref_df.printSchema()
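
The parenthesized builder chain above is the ruff-formatted equivalent of the old one-liner; behavior is unchanged. A sketch of the rename step it feeds (the real `sanitize_columns` body is not shown in this diff, so the stand-in below is an assumption only):

```python
from typing import List

from pyspark.sql import SparkSession


def sanitize_columns_stub(columns: List[str]) -> List[str]:
    # Assumed behavior: snake_case the raw CSV headers.
    return [column.strip().lower().replace(" ", "_") for column in columns]


spark = SparkSession.builder.master("local[*]").getOrCreate()
# Hypothetical input with headers like "Start Station Name".
input_df = (
    spark.read.format("org.apache.spark.csv")
    .option("header", True)
    .csv("data/citibike.csv")
)
ref_df = input_df.toDF(*sanitize_columns_stub(input_df.columns))
ref_df.printSchema()  # columns now read e.g. start_station_name
```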
6 changes: 3 additions & 3 deletions jobs/citibike_distance_calculation.py
@@ -1,14 +1,14 @@
 import logging
 import sys
 
 from pyspark.sql import SparkSession
 
 from data_transformations.citibike import distance_transformer
 
-LOG_FILENAME = 'project.log'
+LOG_FILENAME = "project.log"
 APP_NAME = "Citibike Pipeline: Distance Calculation"
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
     arguments = sys.argv
     print(f"Argument list passed: {arguments}")
8 changes: 4 additions & 4 deletions jobs/citibike_ingest.py
@@ -1,18 +1,18 @@
 import logging
 import sys
 
 from pyspark.sql import SparkSession
 
 from data_transformations.citibike import ingest
 
-LOG_FILENAME = 'project.log'
+LOG_FILENAME = "project.log"
 APP_NAME = "Citibike Pipeline: Ingest"
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
     logging.info(sys.argv)
 
-    if len(sys.argv) is not 3:
+    if len(sys.argv) != 3:
         logging.warning("Input source and output path are required")
         sys.exit(1)
 
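The `!= 3` change here (repeated in `jobs/word_count.py` below) is a genuine bug fix, not a style tweak: `is` and `is not` compare object identity, so `len(sys.argv) is not 3` only worked because CPython happens to cache small integers, and CPython 3.8+ emits a SyntaxWarning for identity tests against literals. A quick demonstration:

```python
# `!=` compares values; `is not` compares identity. For small ints the
# two happen to agree in CPython, but that is an implementation detail
# (and the literal identity test below raises SyntaxWarning on 3.8+).
argc = len(["prog", "input_path", "output_path"])
print(argc != 3)      # False: correct value comparison
print(argc is not 3)  # also False here, but only via small-int caching
```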
8 changes: 4 additions & 4 deletions jobs/word_count.py
@@ -1,18 +1,18 @@
 import logging
 import sys
 
 from pyspark.sql import SparkSession
 
 from data_transformations.wordcount import word_count_transformer
 
-LOG_FILENAME = 'project.log'
+LOG_FILENAME = "project.log"
 APP_NAME = "WordCount"
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
     logging.info(sys.argv)
 
-    if len(sys.argv) is not 3:
+    if len(sys.argv) != 3:
         logging.warning("Input .txt file and output path are required")
         sys.exit(1)
 