Parallelize pandas apply using dask and swifter

Posted on Feb 24, 2020 · 5 mins read

Using the pandas apply function to run a method along every row of a dataframe is slow, and if you have a large dataset to push through a CPU-intensive function, it can take several seconds or more.

In this post we will explore how to partition a dataframe and apply functions to those partitions using Dask, and look at two other ways to speed up the work: the Swifter library and vectorization.

What is Parallel processing?

Parallel computing is an approach in which a large chunk of data is divided into smaller parts that are processed simultaneously, taking advantage of the multiple CPUs and cores of modern hardware.
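As a rough illustration of the idea, the sketch below splits a list into chunks, hands each chunk to a worker, and combines the partial results. It uses only the standard library, and the names `chunked` and `sum_of_squares` are made up for this example. Threads are used to keep the example simple; for CPU-bound pure-Python work you would typically swap in `ProcessPoolExecutor` to actually use several cores.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(values, n_chunks):
    """Split a list into n_chunks contiguous pieces of near-equal size."""
    size, extra = divmod(len(values), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < extra else 0)
        chunks.append(values[start:end])
        start = end
    return chunks


def sum_of_squares(chunk):
    """The work done independently on each chunk."""
    return sum(v * v for v in chunk)


data = list(range(1, 101))
chunks = chunked(data, 4)

# Each chunk goes to a separate worker; partial results are combined at the end.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(sum_of_squares, chunks))

total = sum(partial_sums)
print(total)
```

The same split, process, combine shape is what the dataframe libraries later in this post automate.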

Why is the pandas apply function slow?

The apply function does not take advantage of vectorization. With axis=1 it effectively runs a Python for loop under the hood, calling the function once per row and passing each row in as a new Series object. On a large dataset, the overhead of creating those objects and making one Python call per row is what makes it slow.
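To make the "for loop under the hood" point concrete, the sketch below shows that a row-wise `apply` gives the same result as an explicit Python loop over the rows, and that a vectorized expression over whole columns produces the same values without the per-row calls. The small frame and the names `row_sum_sq`, `applied`, `looped`, and `vectorized` are only for illustration.

```python
import pandas as pd

small = pd.DataFrame({"X": [1, 2, 3], "Y": [4, 5, 6]})


def row_sum_sq(row):
    # Called once per row; `row` is a pandas Series holding that row's values.
    return row["X"] ** 2 + row["Y"] ** 2


# apply with axis=1 calls row_sum_sq once for every row.
applied = small.apply(row_sum_sq, axis=1)

# Roughly what that amounts to: a Python-level loop over the rows.
looped = pd.Series(
    [row_sum_sq(row) for _, row in small.iterrows()], index=small.index
)

# Vectorized: one expression over whole columns, evaluated in compiled code.
vectorized = small["X"] ** 2 + small["Y"] ** 2

assert applied.tolist() == looped.tolist() == vectorized.tolist()
```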

What is Dask?

Dask is a library for parallel computing in Python. It has two main components:

a) Task scheduling: a dynamic task scheduler optimized for computation, comparable to tools such as Celery or Luigi.

b) Parallel collections: arrays and dataframes that hold data in partitions and run on top of the task scheduler.

As per Dask Documentation:

Dask emphasizes the following virtues:

  • Familiar: Provides parallelized NumPy array and Pandas DataFrame objects
  • Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects.
  • Native: Enables distributed computing in pure Python with access to the PyData stack.
  • Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
  • Scales up: Runs resiliently on clusters with 1000s of cores
  • Scales down: Trivial to set up and run on a laptop in a single process
  • Responsive: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans

Create a Big Dataframe of 100K rows

Build a dataframe with 100K rows and two columns, with values selected randomly between 0 and 999 (np.random.randint(1000) excludes the upper bound)

import pandas as pd
import numpy as np
df = pd.DataFrame({'X':np.random.randint(1000, size=100000),'Y':np.random.randint(1000, size=100000)})

Function to add the sum of squares of each column

This function takes a dataframe (or a single row of one) and returns the sum of the squares of its two columns

def add_squares(df):
    # Column names are case-sensitive: the dataframe defines 'X' and 'Y'
    return df.X**2 + df.Y**2

Time the add_squares function on the dataframe

We will apply the add_squares function to the dataframe and use timeit to see how long it takes to calculate the sum of squares of the columns for each row

%%timeit
df['add_squares']=df.apply(add_squares,axis=1)

7.18 s ± 699 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

On average it took around 7.18 seconds per loop to apply the add_squares function to this dataframe of 100K rows and 2 columns
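`%%timeit` is an IPython/Jupyter cell magic, so it is not available in a plain Python script. A minimal sketch of a comparable measurement with the standard-library `timeit` module follows. It uses a smaller 1,000-row frame (an assumption made here to keep the run short) and a row-wise helper named `row_add_squares` that is not part of the original post.

```python
import timeit

import numpy as np
import pandas as pd

# Smaller frame than in the post so the measurement finishes quickly.
small_df = pd.DataFrame({
    "X": np.random.randint(1000, size=1000),
    "Y": np.random.randint(1000, size=1000),
})


def row_add_squares(row):
    return row["X"] ** 2 + row["Y"] ** 2


# Three repeats, each executing the statement once (number=1).
timings = timeit.repeat(
    lambda: small_df.apply(row_add_squares, axis=1), repeat=3, number=1
)
best_seconds = min(timings)
print(f"best of 3: {best_seconds * 1000:.1f} ms")
```

The absolute numbers depend on the machine and library versions, so treat them as relative indicators only.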

Let’s see if we can speed it up using Dask

Parallelize using Dask map_partitions

In this section we will see how to split this big dataframe into partitions and parallelize the add_squares function

We will construct a Dask dataframe from the pandas dataframe using the from_pandas function and specify the number of partitions (npartitions) to break it into

We will use 4 partitions, since my Windows laptop has 4 cores and 16GB of memory.

import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=4)

Next we will apply the add_squares function to each of these partitions

%%timeit
ddf['z'] = ddf.map_partitions(add_squares, meta=(None, 'int64')).compute()

24.3 ms ± 6.91 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

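The %%timeit magic captures the time it takes to apply the add_squares function to every partition

compute() at the end triggers the actual computation and returns the result as an in-memory pandas Series; until then Dask only builds a lazy task graph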
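To see what partitioning amounts to conceptually, here is a pandas-only sketch that mimics the split, apply, combine pattern by hand. It is not how Dask is implemented (Dask builds a lazy task graph and can run partitions in parallel); the helpers `split_into_partitions` and `add_squares_partition` are hypothetical, and the partitions are processed one after another here. Note that the function receives a whole partition, that is, a DataFrame, not a single row.

```python
import numpy as np
import pandas as pd


def split_into_partitions(frame, n_partitions):
    """Cut a DataFrame into contiguous row blocks of near-equal size."""
    bounds = np.linspace(0, len(frame), n_partitions + 1).astype(int)
    return [frame.iloc[start:stop] for start, stop in zip(bounds[:-1], bounds[1:])]


def add_squares_partition(part):
    # `part` is a DataFrame holding one partition; the arithmetic is vectorized.
    return part["X"] ** 2 + part["Y"] ** 2


rng = np.random.default_rng(0)
frame = pd.DataFrame({
    "X": rng.integers(0, 1000, size=10),
    "Y": rng.integers(0, 1000, size=10),
})

partitions = split_into_partitions(frame, 4)
pieces = [add_squares_partition(part) for part in partitions]

# Concatenating the per-partition results restores the original row order.
combined = pd.concat(pieces)
assert combined.equals(add_squares_partition(frame))
```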

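How fast is Dask map_partitions?

Dask map_partitions applied the add_squares function to this big dataframe in 24.3 ms, which is roughly 300 times faster than the 7.18 s taken by the pandas apply function. Much of that gain comes from add_squares receiving a whole partition and operating on entire columns at once, rather than being called once per row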

Swifter

Check the Swifter documentation for instructions on installing it with conda or pip

Its documentation describes Swifter as:

A package which efficiently applies any function to a pandas dataframe or series in the fastest available manner

import swifter  # registers the .swifter accessor on pandas objects

def add_squares(a, b):
    return a**2 + b**2

We have defined a new add_squares function, which takes two arguments and returns the sum of their squares

%%timeit
df['add_sq'] = df.swifter.apply(lambda row: add_squares(row.X, row.Y), axis=1)

23.4 ms ± 3.71 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Vectorization

Wherever possible you should try to vectorize the function; options like iterrows and explicit looping should be a last resort. Here we will see how to vectorize add_squares by passing it whole columns instead of individual rows

%%timeit
df['add_squares'] = add_squares(df['X'], df['Y'])

5.56 ms ± 980 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Compare the speed of all methods

We have used four different ways to apply the add_squares function to the 100K rows of our dataframe

a) Pandas apply

b) Dask map_partitions

c) Swifter

d) Vectorization

We will plot all four timings in a bar graph
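A text-only version of that comparison can be sketched as follows. It tabulates the mean timings reported in the `%%timeit` outputs in this post and the speedup of each method relative to pandas `apply`; the numbers come from the author's machine and will differ elsewhere.

```python
# Mean timings in milliseconds, taken from the %%timeit outputs in this post.
timings_ms = {
    "Pandas apply": 7180.0,
    "Dask map_partitions": 24.3,
    "Swifter": 23.4,
    "Vectorization": 5.56,
}

baseline = timings_ms["Pandas apply"]
speedups = {name: baseline / ms for name, ms in timings_ms.items()}

for name, ms in timings_ms.items():
    print(f"{name:<20} {ms:>8.2f} ms   {speedups[name]:>7.1f}x")
```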

Conclusion

It is evident from the above results that vectorization is the clear winner here, taking the least time to apply the add_squares function along the rows of the dataframe.

Dask map_partitions and Swifter take almost the same time to apply this function and compute the result for all the rows

So our first choice should be vectorization. If you are not able to vectorize your function, you can use Dask map_partitions or Swifter to split the dataframe into multiple partitions and run the function in parallel on all of them

If you know of any other method or library that can parallelize computation in pandas, please share it in the comments section below