Merging Big Data Sets with Python Dask

Using dask instead of pandas to merge large data sets

If you are running out of memory on your desktop for your data processing tasks, the Yen servers are a good place to try: the Yen{1,2,3,4,5} servers each have at least 1 TB of RAM, and the Yen-Slurm nodes have 1-3 TB of RAM each. Note that, per the Community Guidelines, you should limit your memory usage to 192 GB on most of the interactive Yens.

dask is a powerful Python package that lets you do data analytics in parallel and out of core, which can make it both faster and far more memory efficient than pandas. It follows pandas syntax closely and can speed up common data processing tasks usually done in pandas, such as merging big data sets.
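
To get a feel for what that looks like, here is a minimal sketch on toy data (not the EDGAR files used below); the dask calls mirror pandas almost one-to-one, but nothing is computed until you ask for a result:

import pandas as pd
import dask.dataframe as dd

# toy example: the dask API mirrors the pandas API
left = pd.DataFrame({'cik': [1, 2, 3], 'requests': [10, 20, 30]})
right = pd.DataFrame({'cik': [1, 2], 'name': ['A Corp', 'B Corp']})

# pandas: eager, fully in-memory merge
merged_pd = left.merge(right, on='cik')

# dask: the same merge on partitioned dataframes, evaluated lazily
merged_dd = dd.from_pandas(left, npartitions=2).merge(
    dd.from_pandas(right, npartitions=1), on='cik')

# compute() triggers the actual work and returns a pandas DataFrame
print(merged_dd.compute())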

Example

We are going to use public EDGAR data from 2013. We will merge two data sets - EDGAR log files from 2013 and financial statements from 2013.

Data download

The EDGAR log files are downloaded by grabbing a list of URLs, one per log file, from the SEC website.

Let’s make a data directory where the two datasets will live:

mkdir data
cd data

Here are the URLs for the logs we are going to use in this example (any 10 zip files of 2013 logs would do). Save the following list to 10_log_list_2013.txt:

www.sec.gov/dera/data/Public-EDGAR-log-file-data/2013/Qtr1/log20130217.zip
www.sec.gov/dera/data/Public-EDGAR-log-file-data/2013/Qtr1/log20130225.zip
www.sec.gov/dera/data/Public-EDGAR-log-file-data/2013/Qtr2/log20130605.zip
www.sec.gov/dera/data/Public-EDGAR-log-file-data/2013/Qtr2/log20130606.zip
www.sec.gov/dera/data/Public-EDGAR-log-file-data/2013/Qtr3/log20130712.zip
www.sec.gov/dera/data/Public-EDGAR-log-file-data/2013/Qtr3/log20130804.zip
www.sec.gov/dera/data/Public-EDGAR-log-file-data/2013/Qtr3/log20130812.zip
www.sec.gov/dera/data/Public-EDGAR-log-file-data/2013/Qtr3/log20130907.zip
www.sec.gov/dera/data/Public-EDGAR-log-file-data/2013/Qtr3/log20130915.zip
www.sec.gov/dera/data/Public-EDGAR-log-file-data/2013/Qtr4/log20131020.zip

Download the data for these 10 URLs by running the following commands on the command line (xargs runs up to 10 wget downloads in parallel):

mkdir raw_edgar_logs2013
cat 10_log_list_2013.txt | xargs -n 1 -P 10 wget -nc -P raw_edgar_logs2013

Now you should have 10 zip files downloaded in the raw_edgar_logs2013 directory:

ls -ltrh raw_edgar_logs2013

You should see:

total 1.3G
-rwxrwxr-x 1 nrapstin  97M Nov 13  2017 log20130217.zip
-rwxrwxr-x 1 nrapstin 106M Nov 13  2017 log20130225.zip
-rwxrwxr-x 1 nrapstin 120M Nov 13  2017 log20130606.zip
-rwxrwxr-x 1 nrapstin 122M Nov 13  2017 log20130605.zip
-rwxrwxr-x 1 nrapstin 143M Nov 13  2017 log20130712.zip
-rwxrwxr-x 1 nrapstin  88M Nov 13  2017 log20130804.zip
-rwxrwxr-x 1 nrapstin 136M Nov 13  2017 log20130812.zip
-rwxrwxr-x 1 nrapstin  76M Nov 13  2017 log20130907.zip
-rwxrwxr-x 1 nrapstin  77M Nov 13  2017 log20130915.zip
-rwxrwxr-x 1 nrapstin  79M Nov 13  2017 log20131020.zip
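
If you want to check the layout before combining the logs, the optional sketch below (not required for the rest of the example) opens one of the zips and prints the column names; among them are the cik column we will merge on and the browser column we will later give an explicit dtype:

import pandas as pd
from zipfile import ZipFile

# each zip contains a single csv with the same base name as the zip
zip_file = ZipFile('raw_edgar_logs2013/log20130217.zip')

# peek at the first few rows to list the columns without loading the whole file
sample = pd.read_csv(zip_file.open('log20130217.csv'), nrows=5)
print(sample.columns.tolist())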

Next, we will combine all 10 logs into one big csv file. Save the following into a new script combine_logs.py.

# Python script to combine 10 EDGAR log files from 2013 into one csv file
import os
import pandas as pd
from zipfile import ZipFile
import glob

def process_your_file(f):
    '''
    Helper function to read in zip file and
    return a pandas df
    '''
    zip_file = ZipFile(f)

    # derive the csv name inside the zip from the zip archive name,
    # e.g. raw_edgar_logs2013/log20130217.zip -> log20130217
    filename = os.path.splitext(os.path.basename(zip_file.filename))[0]

    # read in csv
    df = pd.read_csv(zip_file.open(filename + '.csv'))
    return df

# combine all 2013 logs into one df
files = glob.glob('raw_edgar_logs2013/log2013*.zip')

# make a list of df's to combine
frames = [ process_your_file(f) for f in files ]

# combine all df's in the list
result = pd.concat(frames)

# write the 10 combined 2013 logs out as one csv
# the resulting csv file is about 7.8 GB on disk
result.to_csv('logs2013-10logs.csv')

Run the Python script to combine the logs into one csv file (this takes about 10 minutes):

python combine_logs.py

The second dataset is the 2013 financial statements, which we download from the SEC website and also combine into one csv file.

Similar to the log data, we have a list of URLs that we want to download the data from. Save this file to financial_2013_urls_list.txt:

https://www.sec.gov/files/dera/data/financial-statement-data-sets/2013q1.zip
https://www.sec.gov/files/dera/data/financial-statement-data-sets/2013q2.zip
https://www.sec.gov/files/dera/data/financial-statement-data-sets/2013q3.zip
https://www.sec.gov/files/dera/data/financial-statement-data-sets/2013q4.zip

Then run the following commands to download the 2013 financial statements into the financial2013 directory:

mkdir financial2013
cat financial_2013_urls_list.txt | xargs -n 1 -P 12 wget -nc -P financial2013

Now you should have 4 zip files downloaded in the financial2013 directory:

ls -ltrh financial2013

You should see:

total 242M
-rwxrwxr-x 1 nrapstin  50M Sep  5  2017 2013q1.zip
-rwxrwxr-x 1 nrapstin  50M Sep  5  2017 2013q2.zip
-rwxrwxr-x 1 nrapstin  47M Sep  5  2017 2013q3.zip
-rwxrwxr-x 1 nrapstin  47M Sep  5  2017 2013q4.zip

Next, we will combine the 4 zip files into one big csv file. Save the following into a new script combine_financial.py.

# Python script to combine EDGAR financial statement files from 2013 into one csv file
import pandas as pd
from zipfile import ZipFile
import glob

# combine all 2013 financial statements into one df
files = glob.glob('financial2013/*.zip')

def process_your_file(f):
    '''
    Helper function to read the sub.txt table
    from one quarterly zip file into a pandas df
    '''
    zip_file = ZipFile(f)

    # read in sub.txt for each quarter
    df = pd.read_csv(zip_file.open('sub.txt'), sep='\t')
    return df


# make a list of df's to combine
frames = [ process_your_file(f) for f in files ]

# combine all df's in the list
result = pd.concat(frames)

# write out the combined 2013 financial data
result.to_csv('finance2013.csv')

Run the script:

python combine_financial.py

Now we have the two csv files we are going to merge with the pandas and dask Python packages - logs2013-10logs.csv and finance2013.csv - inside the data directory.

cd ..
ls -ltrh data/*csv

You should see the output:

-rwxrwxr-x 1 nrapstin operator 7.8G Jan 14 09:33 data/logs2013-10logs.csv
-rwxrwxr-x 1 nrapstin operator 9.4M Jan 14 09:59 data/finance2013.csv

Profiling in Python

Before we run any script, let’s talk about profiling.

Profiling memory

We will use the memory_profiler Python package to generate a memory footprint profile and compare the pandas and dask versions of the code. The profiler reports how much memory each line of your script uses. After installing the package, add the @profile decorator to the function you want to profile in your Python script, then generate the profile by running python -m memory_profiler my_script.py. If you want to profile the entire script, as we do in this example, wrap the whole script in one function and profile that function.
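
As a minimal illustration of that pattern (a hypothetical my_script.py, not part of this example):

# run with: python -m memory_profiler my_script.py
# the explicit import also lets the script run normally without the profiler
from memory_profiler import profile

@profile
def main():
    # allocate something sizable so the Increment column shows a jump
    data = list(range(10_000_000))
    return sum(data)

if __name__ == '__main__':
    main()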

Profiling two data sets merge

pandas

We can use pandas to merge the two datasets. Remember to wrap the whole script in one function decorated with @profile so the memory profiler can run.

Save the following script to pandas-mem-profile.py:

#########################################
# Merge example using pandas
#########################################
@profile
def profile_func():

    # packages
    import numpy as np
    import pandas as pd
    import time

    # read in two data sets to merge
    logdata = pd.read_csv('data/logs2013-10logs.csv')

    # need to convert cik to int - otherwise get wrong merged results
    logdata['cik'] = logdata['cik'].astype('int')

    findata = pd.read_csv('data/finance2013.csv')

    # merge in pandas
    merged = logdata.merge(right=findata, on=['cik'])

    # write the merged data out
    merged.to_csv('data/mem-merged_df.csv')

if __name__ == '__main__':
    profile_func()

dask

Now let's use dask to do the same merge, substituting dask dataframes where we previously used pandas dataframes. Save the following to a new file called dask-mem-profile.py.

#########################################
# Merge example using dask
#########################################
@profile
def profile_func():

    # packages
    import numpy as np
    import dask.dataframe as dd
    import time

    # read in two data sets to merge
    logdata = dd.read_csv('data/logs2013-10logs.csv', dtype={'browser': 'object'})

    # need to convert cik to int - otherwise get wrong merged results
    logdata['cik'] = logdata['cik'].astype('int')
    findata = dd.read_csv('data/finance2013.csv')

    # merge in dask
    merged_dd = logdata.merge(right=findata, on=['cik'])

    # write the merged data out
    merged_dd.to_csv('data/mem-merged_dd.csv', single_file = True)

if __name__ == '__main__':
    profile_func()

We are going to run both memory profile scripts on the yen-slurm servers using the Slurm scheduler. Here is the Slurm submission script (save it to memory_profile.slurm):

#!/bin/bash

#SBATCH -J profile
#SBATCH -o mem-profile-%j.out
#SBATCH --time=10:00:00
#SBATCH --mem=500G
#SBATCH --cpus-per-task=12
#SBATCH --mail-type=ALL
#SBATCH --mail-user=USER@stanford.edu

module load intel-python3
source activate dask

# memory profile
python -m memory_profiler pandas-mem-profile.py
python -m memory_profiler dask-mem-profile.py

Adding your email address to the Slurm script ensures you get an email when your job starts and another when it finishes, including the exit code (exit code 0 means everything ran normally; any other number means the job failed).

Before you can run this script on yen-slurm, let's install the Python packages the scripts need. You can load the existing Python module on the Yens and then create a conda environment. You only have to create the environment once; after that, simply activate it whenever you want to use it.

To create a conda environment with the pandas, dask, and memory_profiler packages installed, run:

module load intel-python3

conda create -n dask python=3.8
source activate dask
pip install pandas dask memory_profiler

Now we are ready to submit the profiling scripts. To submit:

sbatch memory_profile.slurm

Then monitor your job with:

squeue

You should see your job running:

           JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
             21749    normal  profile nrapstin  R       0:07      1 yen10

Once the job finishes, display the .out file to see the profiling results. The name of the .out file includes the JOBID you saw in the squeue output.

cat mem-profile-21749.out

You should see something like:

Filename: pandas-mem-profile.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     4   19.043 MiB   19.043 MiB           1   @profile
     5                                         def profile_func():
     6
     7                                             # packages
     8   33.203 MiB   14.160 MiB           1       import numpy as np
     9   58.402 MiB   25.199 MiB           1       import pandas as pd
    10   58.402 MiB    0.000 MiB           1       import time
    11
    12                                             # read in two data sets to merge
    13 85244.812 MiB 85186.410 MiB           1       logdata = pd.read_csv('data/logs2013-10logs.csv')
    14
    15                                             # need to convert cik to int - otherwise get wrong merged results
    16 85245.402 MiB    0.590 MiB           1       logdata['cik'] = logdata['cik'].astype('int')
    17
    18 85267.191 MiB   21.789 MiB           1       findata = pd.read_csv('data/finance2013.csv')
    19
    20                                             # merge in pandas
    21 149995.457 MiB 64728.266 MiB           1       merged = logdata.merge(right=findata, on=['cik'])
    22
    23                                             # write the merged data out
    24 149995.469 MiB    0.012 MiB           1       merged.to_csv('data/mem-merged_df.csv')


Filename: dask-mem-profile.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     4   19.027 MiB   19.027 MiB           1   @profile
     5                                         def profile_func():
     6
     7                                             # packages
     8   33.605 MiB   14.578 MiB           1       import numpy as np
     9   67.973 MiB   34.367 MiB           1       import dask.dataframe as dd
    10   67.973 MiB    0.000 MiB           1       import time
    11
    12                                             # read in two data sets to merge
    13   73.801 MiB    5.828 MiB           1       logdata = dd.read_csv('data/logs2013-10logs.csv', dtype={'browser': 'object'})
    14
    15                                             # need to convert cik to int - otherwise get wrong merged results
    16   73.801 MiB    0.000 MiB           1       logdata['cik'] = logdata['cik'].astype('int')
    17   75.332 MiB    1.531 MiB           1       findata = dd.read_csv('data/finance2013.csv')
    18
    19                                             # merge in dask
    20   75.332 MiB    0.000 MiB           1       merged_dd = logdata.merge(right=findata, on=['cik'])
    21
    22                                             # write the merged data out
    23 34784.383 MiB 34709.051 MiB           1       merged_dd.to_csv('data/mem-merged_dd.csv', single_file = True)

The first column (Line #) gives the line number in the Python script, the second column (Mem usage) gives the total RAM in use after that line executed, and the third column (Increment) gives the additional memory allocated by that line. Comparing the two profiles line by line, the dask version peaks at about 35 GB versus roughly 150 GB for pandas, and dask does not actually load the data sets into memory until it needs them - here, when the merged result is written to csv.
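
This deferred behavior is the heart of how dask saves memory: each operation only extends a task graph, and the data is read and processed partition by partition when a result is finally requested. A minimal sketch (with a hypothetical file name) of that laziness:

import dask.dataframe as dd

# returns almost instantly - no data is read yet
df = dd.read_csv('data/some_large_file.csv')

# still lazy - this only adds a step to the task graph
subset = df[df['cik'] > 0]

# the csv is actually read and filtered here, partition by partition,
# and compute() returns an in-memory pandas DataFrame
result = subset.compute()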

Profiling speed

Similarly, we want to see how long each step of our script takes. We will use the time package to manually time the data loading, merging, and writing-to-disk calls. Python also has excellent profiling visualization tools that come in handy when you want to understand where your code is slow and optimize those parts. One good option is the SnakeViz package, which lets you visualize and interact with a script's profile in your browser. Although we will not use it in this example, I encourage you to look it up for your next Python project, especially when you are thinking about optimizing or rewriting a script.
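
The scripts below simply wrap each step in a pair of time.time() calls. If you prefer, the same pattern can be packaged as a small context manager; this is just an optional sketch, not what the scripts below use:

import time
from contextlib import contextmanager

@contextmanager
def timer(label):
    '''Print how long the wrapped block took.'''
    start = time.time()
    yield
    print('%s took: %.2f seconds' % (label, time.time() - start))

# hypothetical usage:
# with timer('loading log data'):
#     logdata = pd.read_csv('data/logs2013-10logs.csv')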

pandas

Again, we use pandas to do all of the data loading, merging, and writing to disk. Save the following Python script as pandas-speed-profile.py:

#########################################
# Merge example using pandas
#########################################
# packages
import numpy as np
import pandas as pd
import time

# read in two data sets to merge
print('loading log data in pandas...')
tmp = time.time()
logdata = pd.read_csv('data/logs2013-10logs.csv', dtype={'browser': 'object'})
print('log data loading took: %s seconds' % (str(time.time() - tmp)))

# need to convert cik to int - otherwise get wrong merged results
logdata['cik'] = logdata['cik'].astype('int')

print('loading finance data in pandas...')
tmp = time.time()
findata = pd.read_csv('data/finance2013.csv')
print('finance data loading took: %s seconds' % (str(time.time() - tmp)))

# merge in pandas
print('merge data sets in pandas...')
tmp = time.time()
merged = logdata.merge(right=findata, on=['cik'])
print('merging datasets took: %s seconds' % (str(time.time() - tmp)))

# write out the merged df
print('writing data sets in pandas...')
tmp = time.time()
merged.to_csv('data/merged_df.csv')
print('writing datasets took: %s seconds' % (str(time.time() - tmp)))

dask

Then we use dask to do the same merge. Save the following as dask-speed-profile.py:

#########################################
# Merge example using dask
#########################################
# packages
import numpy as np
import dask.dataframe as dd
import time

# read in two data sets to merge
print('loading log data in dask...')
tmp = time.time()
logdata = dd.read_csv('data/logs2013-10logs.csv', dtype={'browser': 'object'})
print('log data loading took: %s seconds' % (str(time.time() - tmp)))

# need to convert cik to int - otherwise get wrong merged results
logdata['cik'] = logdata['cik'].astype('int')

print('loading finance data in dask...')
tmp = time.time()
findata = dd.read_csv('data/finance2013.csv')
print('finance data loading took: %s seconds' % (str(time.time() - tmp)))

# merge in dask
print('merge data sets in dask...')
tmp = time.time()
merged_dd = logdata.merge(right=findata, on=['cik'])
print('merging datasets took: %s seconds' % (str(time.time() - tmp)))

# write out the merged df
print('writing data sets in dask...')
tmp = time.time()
merged_dd.to_csv('data/merged_dd.csv', single_file=True)
print('writing datasets took: %s seconds' % (str(time.time() - tmp)))

We are going to run both speed profile scripts using the same dask conda environment and the following Slurm submission script (save it to speed_profile.slurm):

#!/bin/bash

#SBATCH -J speed-profile
#SBATCH -o speed-profile-%j.out
#SBATCH --time=10:00:00
#SBATCH --mem=500G
#SBATCH --cpus-per-task=12
#SBATCH --mail-type=ALL
#SBATCH --mail-user=USER@stanford.edu

module load intel-python3
source activate dask

# speed profile
python pandas-speed-profile.py
python dask-speed-profile.py

To submit, run:

sbatch speed_profile.slurm

Then monitor your job with:

squeue
           JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           24591    normal speed-pr nrapstin  R    1:19:16      1 yen10

Once the job finishes, display the .out file to see the profiling results. The name of the .out file includes the JOBID you saw in the squeue output.

cat speed-profile-24591.out

You should see something like:

loading log data in pandas...
log data loading took: 175.0192587375641 seconds
loading finance data in pandas...
finance data loading took: 0.917410135269165 seconds
merge data sets in pandas...
merging datasets took: 633.4607284069061 seconds
writing data sets in pandas...
writing datasets took: 2910.702185153961 seconds
loading log data in dask...
log data loading took: 0.029045820236206055 seconds
loading finance data in dask...
finance data loading took: 0.018599510192871094 seconds
merge data sets in dask...
merging datasets took: 0.03328442573547363 seconds
writing data sets in dask...
writing datasets took: 3696.9374153614044 seconds

Let me summarize both output files in the tables below. The comparison shows that dask's big win here is memory: every step before the write is essentially free because dask defers the work, and peak memory drops from roughly 150 GB with pandas to about 35 GB with dask. Total wall time ends up comparable, since the dask to_csv step is where the deferred loading and merging actually happen, but the memory savings make the job feasible on machines where the pandas version would run out of RAM.

pandas step                   Memory used   Time taken
Loading log data              85 GB         175.0 sec
Loading finance data          22 MB         0.92 sec
Merge data sets               65 GB         633.5 sec
Writing merged data to disk   0.01 MB       2910.7 sec

In comparison:

dask step                     Memory used   Time taken
Loading log data              6 MB          0.03 sec
Loading finance data          1.5 MB        0.02 sec
Merge data sets               0 MB          0.03 sec
Writing merged data to disk   35 GB         3696.9 sec

Finally, I would like to acknowledge Sara Malik and Professor Jung Ho Choi, who inspired this dask vs pandas comparison example.