Khalid Mammadov

Parallel processing (Pandas example)

If you find yourself working with Pandas and need to process lots of files in parallel, then keep reading.

Here I explain how to perform parallel processing in Python in general, using Pandas for a concrete example. As you know, CPython runs your program's bytecode on a single thread at a time (because of the Global Interpreter Lock), but Python provides the multiprocessing module to help us spread work across multiple processes.
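To make the process-versus-thread point concrete, here is a tiny standalone sketch (not part of the exchange-rates example) that maps a function over a few values with a Pool; printing the worker process IDs shows the calls are handled by separate processes:

import os
from multiprocessing import Pool


def whoami(x: int) -> str:
    # each worker is a separate OS process with its own interpreter and PID
    return f"value {x} handled by PID {os.getpid()}"


if __name__ == "__main__":
    with Pool() as pool:  # defaults to os.cpu_count() worker processes
        for line in pool.map(whoami, range(4)):
            print(line)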

Let's put together some simple code that we can run in both single-process and parallel modes.

Code

I have the following test files in a local folder. These are exchange rates covering different periods.

https://github.com/datasets/exchange-rates

I intend to write code that processes them and filters for a specific date.

First, the imports used throughout these examples and the function that processes a single file:

import multiprocessing as mpp
from pathlib import Path
from typing import List

import pandas as pd


def process_file(f: Path, filter_string: str) -> pd.DataFrame:
    """
    Process a single file:
    load it as a Pandas DataFrame and filter rows by matching filter_string
    """
    df = pd.read_csv(f, header='infer')
    return df[df["Date"] == filter_string]

As you can see, it takes a file, loads it with Pandas and keeps only the rows that match the given date.
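For example, assuming one of the downloaded files is called daily.csv (the file name here is just an illustration), a single call would look like this:

single = process_file(
    Path('~/dev/test/exchange-rates/data/daily.csv').expanduser(),  # hypothetical file name
    '1982-01-01',
)
print(single)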

Sequential

The function below loops over the exchange-rates directory and processes the files one after another:

def process_files_sequentially(p: Path) -> List:
    """
    Process files one at a time by looping over the given directory
    """
    all_dfs = []
    for f in p.glob('*.csv'):
        df = process_file(f, '1982-01-01')
        all_dfs.append(df)

    return all_dfs
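The function returns a list of DataFrames, one per file. If a single combined result is more convenient, the list can be concatenated afterwards; a small follow-up sketch:

all_dfs = process_files_sequentially(Path('~/dev/test/exchange-rates/data/').expanduser())
combined = pd.concat(all_dfs, ignore_index=True)  # one DataFrame holding all matching rows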

Parallel

Now, to convert this to parallel processing I am going to use the Pool class from the multiprocessing module. This class is very easy to use and provides the map and starmap methods for parallelization. The main difference between the two is that map, like the built-in map function, works on an iterable of single arguments (just in parallel), whereas starmap takes an iterable of argument collections (lists, tuples etc.) and unpacks each one into multiple arguments.
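As a quick toy illustration of that difference (separate from the file-processing code):

from multiprocessing import Pool


def square(x):
    return x * x


def power(base, exp):
    return base ** exp


if __name__ == "__main__":
    with Pool() as pool:
        print(pool.map(square, [1, 2, 3]))            # each item passed as a single argument -> [1, 4, 9]
        print(pool.starmap(power, [(2, 3), (3, 2)]))  # each tuple unpacked into two arguments -> [8, 9]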

Below is the parallel version of the code:

def process_files_parallel(p: Path) -> List:
    """
    Process files in the folder by creating parallel loads.
    It will create N parallel processes based on the CPU count (vertical scaling).
    """
    p_args = []
    for f in p.glob('*.csv'):
        p_args.append([f, '1982-01-01'])

    with mpp.Pool() as pool:
        all_dfs = pool.starmap(process_file, p_args)

    return all_dfs

So here we loop just as in the single-process version, but this time we build a list of parameter lists (List[List]). Then we pass this list of arguments, along with the function, to the starmap method. The way Pool works is that it creates a "pool" of worker processes that can be utilised. If we don't pass a number to the Pool constructor, it uses the number of CPUs we have and creates that many worker processes. In my example I have 4 CPUs (cores), so it creates the same number of processes. Pool then runs our process_file function on the first four argument sets from the list in parallel and, as workers finish, moves on to the remaining ones (in whatever order they free up).
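If you want to set the number of workers explicitly rather than rely on the CPU count, Pool accepts a processes argument. On my 4-core machine the call below (reusing mpp and p_args from the function above) is equivalent to the default:

with mpp.Pool(processes=4) as pool:
    all_dfs = pool.starmap(process_file, p_args)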

Below are the test scripts I ran in my local REPL using the timeit module, with 1000 iterations, to show how the sequential version compares with the parallel one.

Sequential test

import timeit 
timeit.timeit("pp.process_files_sequentially(Path('~/dev/test/exchange-rates/data/').expanduser())", "from parallel_pandas import parallel as pp; from pathlib import Path", number=1000)

Below is the screenshot from my process monitor. As you can see, only one CPU (CPU1) is at 100% utilization while all the others are idling.

Parallel test

In the example below I use the parallel version of the processing method and again run it 1000 times:

import timeit 
timeit.timeit("pp.process_files_parallel(Path('~/dev/test/exchange-rates/data/').expanduser())", "from parallel_pandas import parallel as pp; from pathlib import Path", number=1000)

Here is the screenshot from the process monitor taken while it was processing.

As you can see, this time I am making use of all the CPU power of my PC and processing the data in parallel. This approach cuts processing time roughly in proportion to the number of CPUs/cores, provided the work is CPU-bound and large enough to outweigh the cost of spawning the worker processes.

Git

Please find below the source code:

https://github.com/khalidmammadov/python_code/tree/master/parallel_pandas