Skip to content

Commit

Permalink
Merge pull request #5 from pedropark99/transf-part-2
Browse files Browse the repository at this point in the history
Add a second chapter to present Spark DataFrame transformations
  • Loading branch information
pedropark99 authored Jun 18, 2023
2 parents 21e3780 + 86c7395 commit 14f045a
Show file tree
Hide file tree
Showing 41 changed files with 3,395 additions and 581 deletions.
12 changes: 11 additions & 1 deletion Chapters/04-dataframes.qmd
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ If you need to, you can easily collect a python list with the column names prese
students.columns
```


## Getting the number of rows

If you want to know the number of rows present in a Spark DataFrame, just use the `count()` method of this DataFrame. As a result, Spark will build this DataFrame, and count the number of rows present in it.

```{python}
students.count()
```


## Spark Data Types

Each column of your Spark DataFrame is associated with a specific data type. Spark supports a large number of different data types. You can see the full list at the official documentation page[^04-dataframes-1]. For now, we will focus on the most used data types, which are listed below:
Expand Down Expand Up @@ -200,7 +210,7 @@ s = StringType()
print(s)
```

## The DataFrame Schema
## The DataFrame Schema {#sec-dataframe-schema}

The schema of a Spark DataFrame is the combination of column names and the data types associated with each of these columns. Schemas can be set explicitly by you (that is, you can tell Spark how the schema of your DataFrame should look like), or, they can be automatically defined by Spark while reading or creating your data.

Expand Down
90 changes: 84 additions & 6 deletions Chapters/05-transforming.qmd
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

# Transforming your Spark DataFrame
# Transforming your Spark DataFrame - Part 1 {#sec-transforming-dataframes-part1}

## Introduction

Expand All @@ -21,11 +21,13 @@ This means that you always have to transform the initial data that you have, to

Spark DataFrames are **immutable**, meaning that, they cannot be directly changed. But you can use an existing DataFrame to create a new one, based on a set of transformations. In other words, you define a new DataFrame as a transformed version of an older DataFrame.

Basically every `pyspark` program that you write will have such transformations. Spark support many types of transformations, however, in this chapter, we will focus on four basic transformations that you can apply to a DataFrame:
Basically every `pyspark` program that you write will have such transformations. Spark support many types of transformations, however, in this chapter, we will focus on six basic transformations that you can apply to a DataFrame:

- Filtering rows;
- Sorting rows;
- Filtering rows based on a logical condition;
- Selecting a subset of rows;
- Selecting specific columns;
- Adding or deleting columns;
- Sorting rows;
- Calculating aggregates;

Therefore, when you apply one of the above transformations to an existing DataFrame, you will get a new DataFrame as a result. You usually combine multiple transformations together to get your desired result. As a first example, lets get back to the `df` DataFrame:
Expand Down Expand Up @@ -231,7 +233,11 @@ Now, to see the actual data of this DataFrame, we can use the `show()` action as
transf.show(5)
```

As you can see below, this `transf` DataFrame have 2421 rows in total:

```{python}
transf.count()
```

## Filtering rows of your DataFrame

Expand Down Expand Up @@ -479,7 +485,7 @@ transf\
```


### Filtering `null` values (i.e. missing data)
### Filtering `null` values (i.e. missing data) {#sec-filter-null-values}

Sometimes, the `null` values play an important role in your filter. You either want to collect all these `null` values, so you can investigate why they are null in the first place, or, you want to completely eliminate them from your DataFrame.

Expand Down Expand Up @@ -659,6 +665,78 @@ transf\
.show(5)
```



## Selecting a subset of rows from your DataFrame

At some point, you might need to use just a small piece of your DataFrame over the next steps of your pipeline, and not the entire thing. For example, you may want to select just the fisrt (or last) 5 rows of this DataFrame, or, maybe, you need to take a random sample of rows from it.

In this section I will discuss the main methods offered by Spark to deal with these scenarios. Each method returns a subset of rows from the original DataFrame as a result. But each method works differently from the other, and uses a different strategy to retrieve this subset.

### Limiting the number of rows in your DataFrame

The `limit()` method is very similar to the `LIMIT` SQL keyword. It limits the number of rows present in your DataFrame to a specific amount. So, if I run `transf.limit(1)` I get a new DataFrame as a result, which have only a single row from the `transf` DataFrame. As you can see below:

```{python}
single_transfer = transf.limit(1)
single_transfer.show()
```

Is worth mentioning that the `limit()` method will always try to limit your original DataFrame, to the first $n$ rows. This means that the command `df.limit(430)` tries to limit the `df` DataFrame to its first 430 rows.

This also means that 430 is the maximum number of rows that will be taken from the `df` DataFrame. So, if `df` DataFrame has less than 430 lines, like 14 rows, than, nothing will happen, i.e. the result of `df.limit(430)` will be equivalent to the `df` DataFrame itself.


### Getting the first/last $n$ rows of your DataFrame

The methods `head()` and `tail()` allows you to collect the first/last $n$ rows of your DataFrame, respectively. One key aspect from these methods, is that they return a list of `Row` values, instead of a new DataFrame (such as the `limit()` method). You can compare these methods to the `take()` and `collect()` methods that we introduced at @sec-dataframe-actions, because they both produce a list of `Row` values as well.

Now, the `head()` method produce the same output as the `take()` method. However, these two methods work very differently under the hoods, and, are recommended to be used in different scenarios.

More specifically, if you have a big DataFrame (i.e. a DataFrame with many rows) is recommended to use `take()` (instead of `head()`) to collect the first $n$ rows from it. Because the `head()` method makes Spark to load the entire DataFrame into the driver's memory, and this can easily cause an "out of memory" situation for big DataFrames. So, use the `head()` method only for small DataFrames.

In the example below, we are using these methods to get the first and last 2 rows of the `transf` DataFrame:

```{python}
# First 2 rows of `transf` DataFrame:
first2 = transf.head(2)
# Last 2 rows of `transf` DataFrame:
last2 = transf.tail(2)
print(last2)
```

### Taking a random sample of your DataFrame

With the `sample()` you can take a random sample of rows from your DataFrame. In other words, this method returns a new DataFrame with a random subset of rows from the original DataFrame.

This method have three main arguments, which are:

- `withReplacement`: a boolean value indicating if the samples are with replacement or not. Defaults to `False`;
- `fraction`: the fraction of rows you want to sample from the DataFrame. Have to be a positive float value, from 0 to 1;
- `seed`: an integer representing the seed for the sampling process. This is an optional argument;


In the example below, we are trying to get a sample that represents 15% of the original `transf` DataFrame, and using the integer 24 as our sampling seed:

```{python}
transf_sample = transf.sample(fraction = 0.15, seed = 24)
transf_sample.show(5)
```

In other words, the `fraction` argument represents a fraction of the total number of rows in the original DataFrame. Since the `transf` DataFrame have 2421 rows in total, by setting the `fraction` argument to 0.15, we are asking Spark to collect a sample from `transf` that have approximately $0.15 \times 2421 \approx 363$ rows.

If we calculate the number of rows in `transf_sample` DataFrame, we can see that this DataFrame have a number of rows close to 363:

```{python}
transf_sample.count()
```

Furthermore, the sampling seed is just a way to ask Spark to produce the same random sample of the original DataFrame. That is, the sampling seed makes the result sample fixed. You always get the same random sample when you run the `sample()` method.

On the other hand, when you do not set the `seed` argument, then, Spark will likely produce a different random sample of the original DataFrame every time you run the `sample()` method.


## Managing the columns of your DataFrame

Sometimes, you need manage or transform the columns you have. For example, you might need to change the order of these columns, or, to delete/rename some of them. To do this, you can use the `select()` and `drop()` methods of your DataFrame.
Expand Down Expand Up @@ -869,7 +947,7 @@ In the other hand, when we define groups in a DataFrame (by using the `groupby()
This means that each row in the resulting DataFrame describes a specific group in the original DataFrame, and, `agg()` usually produces a DataFrame with more than one single row when its calculations are performed by group. Because our DataFrames usually have more than one single group.


### Calculating aggregates per group in your DataFrame
### Calculating aggregates per group in your DataFrame {#sec-group-by}

But how you define the groups inside your DataFrame? To do this, we use the `groupby()` and `groupBy()` methods. These methods are both synonymous (they do the same thing).

Expand Down
4 changes: 2 additions & 2 deletions Chapters/06-dataframes-sql.qmd
Original file line number Diff line number Diff line change
Expand Up @@ -621,12 +621,12 @@ ORDER BY bill_length_mm
'''
# The same result of the example above
spark.sql(query)
spark.sql(query).show()
```



### Spark functions are usually translated into SQL functions
### Spark functions are usually translated into SQL functions {#sec-sql-expr}

Every function from the `pyspark.sql.functions` module you might use to describe your transformations in python, can be directly used in Spark SQL. In other words, every Spark function that is accesible in python, is also accesible in Spark SQL.

Expand Down
Loading

0 comments on commit 14f045a

Please sign in to comment.