GSoC: Partitioning a Dask-DataFrame
June 12, 2021 GSoC Partitioning
Dask DataFrame structure
Internally, a Dask DataFrame is split into many partitions, where each partition is one Pandas DataFrame. These DataFrames are split row-wise, along the index.
There are numerous strategies for partitioning a Dask DataFrame, which determine how the rows of the DataFrame are divided among the resulting partitions. Common strategies include using a fixed number of partitions or partitioning based on column values.
By default, Dask DataFrames are partitioned based on the index. Like Pandas DataFrames, the index of a Dask DataFrame can be set from a column, so Dask DataFrames can make use of a column and its ordering to partition the data. We begin by comparing four (non-spatial) partitioning methods.
1. Partitioning a DataFrame using a fixed number of partitions
We begin by using a fixed number of partitions. This approach does not consider the internal structure of the data and may not efficiently partition the data.
import pandas as pd
import datetime as dt
import dask
import numpy as np
import dask.dataframe as dd
# Define a datetime range at 10-minute frequency, spanning January to the start of May 2015
ts = pd.date_range("2015-01-01 00:00", "2015-05-01 23:50", freq="10min")
# Create a DataFrame of random integers between 0 and 100, with four columns
df = pd.DataFrame(np.random.randint(0, 100, size=(len(ts), 4)), columns=list('ABCD'))
df.head()
|   | A | B | C | D |
|---|---|---|---|---|
| 0 | 30 | 4 | 12 | 88 |
| 1 | 20 | 0 | 83 | 35 |
| 2 | 53 | 40 | 15 | 45 |
| 3 | 84 | 25 | 40 | 84 |
| 4 | 87 | 45 | 14 | 27 |
# Partition DataFrame by fixed number
ddf = dd.from_pandas(df, npartitions=2)
ddf
|   | A | B | C | D |
|---|---|---|---|---|
| npartitions=2 |  |  |  |  |
| 0 | int64 | int64 | int64 | int64 |
| 8712 | ... | ... | ... | ... |
| 17423 | ... | ... | ... | ... |
print(ddf.divisions)
(0, 8712, 17423)
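To confirm that each partition really is a Pandas DataFrame, we can pull one out with get_partition (a minimal sketch; the row count assumes the divisions shown above):
# Each partition is a lazy, single-partition Dask DataFrame;
# computing it yields an ordinary Pandas DataFrame
part0 = ddf.get_partition(0).compute()
print(type(part0))  # <class 'pandas.core.frame.DataFrame'>
print(len(part0))   # 8712 rows, matching the first division boundary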
2. Partition based on time
Suppose we had a column with natural groupings, such as a date-time column, and we wanted to perform an expensive groupby operation for each month. In that case, we may want to partition based on this information.
# Create a DataFrame of random integers between 0 and 100, with four columns, indexed by time
df = pd.DataFrame(np.random.randint(0, 100, size=(len(ts), 4)), columns=list('ABCD'), index=ts)
df.head()
|   | A | B | C | D |
|---|---|---|---|---|
| 2015-01-01 00:00:00 | 43 | 74 | 0 | 70 |
| 2015-01-01 00:10:00 | 71 | 31 | 95 | 92 |
| 2015-01-01 00:20:00 | 72 | 76 | 20 | 74 |
| 2015-01-01 00:30:00 | 11 | 3 | 24 | 44 |
| 2015-01-01 00:40:00 | 47 | 79 | 25 | 30 |
# Partition DataFrame by fixed number
ddf = dd.from_pandas(df, npartitions=2)
ddf
|   | A | B | C | D |
|---|---|---|---|---|
| npartitions=2 |  |  |  |  |
| 2015-01-01 00:00:00 | int64 | int64 | int64 | int64 |
| 2015-03-02 12:00:00 | ... | ... | ... | ... |
| 2015-05-01 23:50:00 | ... | ... | ... | ... |
Above we arbitrarily defined the number of partitions as 2, but below we partition our data based on the starting month. This is a great improvement if we are interested in queries that make use of this information, such as groupbys. However, if we were interested in groupbys based on a specific column, this would not be appropriate.
ddf_time = ddf.repartition(freq='MS') # Month start
# We can check the number of partitions
ddf_time.npartitions
5
# We can check whether the partitions are divided
ddf_time.divisions
(Timestamp('2015-01-01 00:00:00'), Timestamp('2015-02-01 00:00:00'), Timestamp('2015-03-01 00:00:00'), Timestamp('2015-04-01 00:00:00'), Timestamp('2015-05-01 00:00:00'), Timestamp('2015-05-01 23:50:00'))
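Because each partition now covers roughly one calendar month, month-level work lines up with partition boundaries. As a hedged sketch (assuming the monthly divisions shown above), we can check the per-partition row counts and compute a month-start aggregation that Dask can answer largely partition by partition:
# Row count per (monthly) partition
ddf_time.map_partitions(len).compute()
# Monthly means, aligned with the month-start partitions
ddf_time.resample("MS").mean().compute()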
3. Partition based on a column
As discussed, Dask DataFrames partition data based on the index, and we can set a particular column as the index. The previous DataFrame only contained columns of random integers, so partitioning on one of them would not produce meaningful groups. To highlight the use case of partitioning based on a column, I add a new categorical column.
def my_user_function1(x):
    """Function to transform a numerical value into a categorical label based on if/elif statements"""
    if 0 < x <= 10:
        return 'x'
    elif 10 < x <= 25:
        return 'xx'
    elif 25 < x <= 50:
        return 'xxx'
    # everything else
    return 'xxxx'
# Create a DataFrame of random integers between 0 and 100, with four columns
df = pd.DataFrame(np.random.randint(0, 100, size=(len(ts), 4)), columns=list('ABCD'))
# Apply the function to create a new categorical column
df['my_cat_col'] = df['A'].apply(my_user_function1)
df.head()
|   | A | B | C | D | my_cat_col |
|---|---|---|---|---|---|
| 0 | 36 | 37 | 29 | 61 | xxx |
| 1 | 94 | 19 | 44 | 69 | xxxx |
| 2 | 28 | 3 | 35 | 43 | xxx |
| 3 | 86 | 26 | 92 | 2 | xxxx |
| 4 | 84 | 62 | 63 | 5 | xxxx |
df['my_cat_col'].value_counts()
my_cat_col
xxxx    8787
xxx     4287
xx      2634
x       1716
Name: count, dtype: int64
Here we sort the data and set the categorical column as the index. This ensures the DataFrame can be partitioned by the unique category values.
# Sort and set our index as our categorical column
df = df.sort_values('my_cat_col').set_index('my_cat_col', drop=False)
Given we have 4 unique values for our categorical column, I want to partition the pandas DataFrame into 4 partitions.
cat_len = len(df['my_cat_col'].unique())
cat_len
4
However, npartitions is only a request: when we first tried to create 4 partitions, before sorting and re-indexing, we only got 2! After reading the documentation, according to dask.dataframe.io, npartitions is optional and:
... depending on the size and index of the dataframe, the output may have fewer partitions than requested.
ddf = dd.from_pandas(df, npartitions=cat_len)
ddf
|   | A | B | C | D | my_cat_col |
|---|---|---|---|---|---|
| npartitions=4 |  |  |  |  |  |
| x | int64 | int64 | int64 | int64 | object |
| xx | ... | ... | ... | ... | ... |
| xxx | ... | ... | ... | ... | ... |
| xxxx | ... | ... | ... | ... | ... |
| xxxx | ... | ... | ... | ... | ... |
print(ddf.npartitions)
print(ddf.divisions)
4
('x', 'xx', 'xxx', 'xxxx', 'xxxx')
The partitions are not evenly balanced (one partition contains the majority of the data), which can hurt computational performance: the advantages of parallelism, and of using Dask at all, are effectively lost.
ddf.map_partitions(len).compute()
0    1716
1    2634
2    4287
3    8787
dtype: int64
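If this imbalance matters for a given workload, one option is to repartition by an approximate target size instead of by category value. This is a sketch rather than part of the workflow above; the partition_size argument and the "2MB" target are assumptions that may need adjusting for your Dask version and data:
# Trade the category-per-partition layout for roughly equally sized partitions
ddf_balanced = ddf.repartition(partition_size="2MB")
ddf_balanced.map_partitions(len).compute()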
4. Partition data based on a function
We can also work with our partitioned data using a user-provided function. This is handled by map_partitions, which maps the function across every underlying Pandas DataFrame.
# Define a function
def my_user_function2(col1, col2):
    """A function that multiplies column 1 by column 2"""
    return col1 * col2

# Apply function lazily to two DataFrame cols
ddf['result'] = dd.map_partitions(my_user_function2, ddf['A'], ddf['B'])
ddf.compute()
| my_cat_col | A | B | C | D | my_cat_col | result |
|---|---|---|---|---|---|---|
| x | 8 | 14 | 82 | 0 | x | 112 |
| x | 7 | 61 | 51 | 75 | x | 427 |
| x | 3 | 46 | 26 | 56 | x | 138 |
| x | 4 | 71 | 8 | 34 | x | 284 |
| x | 6 | 1 | 19 | 85 | x | 6 |
| ... | ... | ... | ... | ... | ... | ... |
| xxxx | 98 | 30 | 17 | 68 | xxxx | 2940 |
| xxxx | 87 | 96 | 21 | 29 | xxxx | 8352 |
| xxxx | 57 | 82 | 34 | 78 | xxxx | 4674 |
| xxxx | 78 | 79 | 79 | 7 | xxxx | 6162 |
| xxxx | 85 | 70 | 98 | 78 | xxxx | 5950 |
17424 rows × 6 columns
Conclusion
Partitioning data involves a re-shuffle to sort our data along a new index. In distributed computing, the shuffle is the process of redistributing all partitions across all workers. Shuffling the data is necessary when performing sorting, grouping, and indexing operations, because each row needs to be compared to every other row in the DataFrame to determine its correct relative position. Whilst a shuffle is necessary when we need to re-sort our data along a new index, it is time-expensive; if Pandas can handle the operation (in terms of memory and computation time) without partitioning via Dask, Pandas should be adopted.
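As a minimal illustration of this cost (a sketch using the column-partitioned ddf from above), re-indexing on a different column forces exactly this kind of shuffle:
# Setting a new index shuffles rows between partitions so that each
# partition ends up owning a contiguous range of 'B' values
ddf_by_b = ddf.set_index('B')
print(ddf_by_b.divisions)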
Up until now, we have partitioned our data along a single dimension. Data with two or more dimensions (like spatial data) requires different techniques, which I will discuss over the next few blog posts.