12. Parallel Script

We can parallelize the OCR step across images and then combine the results using Python's multiprocessing package. In the script, the ncore variable sets the number of cores to use in parallel. Remember to follow the community guidelines: limit yourself to 12 cores and 320 GB of memory on the interactive yen1 (and the JupyterHub on that yen), and to 48 cores and 192 GB of RAM on yen2-5 and their JupyterHubs. We will also connect to the same yen in a second terminal to monitor our resource usage while the script is running.
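
Before reading the full script, it may help to see the same pattern on a toy task. This is a minimal sketch, not part of ocr-parallel.py: the names square and cap_cores and the default limit of 12 are illustrative assumptions (the limit simply mirrors the yen1 guideline quoted above).

```python
import os
from multiprocessing import Pool


def square(x):
    # stand-in for an expensive per-item task such as OCR on one image
    return x * x


def cap_cores(requested, limit=12):
    # never ask for more workers than the guideline limit or the machine has
    available = os.cpu_count() or 1
    return max(1, min(requested, limit, available))


if __name__ == "__main__":
    ncore = cap_cores(10)
    with Pool(processes=ncore) as pool:
        # results arrive in completion order, so sort them before use
        results = sorted(pool.imap_unordered(square, range(8)))
    print(results)
```

The worker function is defined at module level so that the pool can send it to the worker processes; the OCR script follows the same rule with process_image.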

Let’s look at the ocr-parallel.py script, which processes the images in parallel.

##############################################################
### Activate conda env ocr and run the script
#     module load anaconda3
#     source activate /zfs/gsb/intermediate-yens/conda/ocr
#     python ocr-parallel.py
##############################################################
import numpy as np
import pandas as pd
from PIL import Image
import pytesseract # OCR with Tesseract
import glob, os, shutil, csv, json, time
from os import makedirs
from os.path import exists, isdir
from datetime import datetime
from tqdm import tqdm   # progress bar
from multiprocessing import Pool # for parallel run

# start the timer
tmp = time.time()

# store OCR results
out_path = 'ocr_parallel_out/'

# remove output directory if it already exists
if exists(out_path) and isdir(out_path):
    shutil.rmtree(out_path)

# make empty dir for OCR results
makedirs(out_path)

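# make a master df -> CSV file to keep track of what we have already processed

# the output directory was just recreated, so the CSV never exists at this point;
# this check only matters if the rmtree step above is removed to allow resuming a run
# check if file exists, if it doesn't, make a new CSV file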
if not exists(out_path + 'processed_images.csv'):
    # create a new csv file
    with open(out_path + 'processed_images.csv', 'w') as csvfile:
        filewriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
        # write headers
        filewriter.writerow(['image_path', 'image_name', 'ocr_text', 'date'])

# csv exists so read it in
df = pd.read_csv(out_path + 'processed_images.csv')

# path to the data
data_path = '/scratch/darc/intermediate-yens/data/'

# make a list of image paths to OCR
image_paths = glob.glob(data_path + '*.jpg')

tot_images = len(image_paths)

# drop images we already processed (build the lookup set once, not once per image)
processed = set(df['image_path'])
image_paths = [img for img in image_paths if img not in processed]

print('image number to process: ', len(image_paths))

intermediateFileNum = 100

def process_image(input_path):
    # returns a dictionary of image name, image path, ocr text and processed date
    # to be inserted as a row into the results csv
    return {'image_path' : input_path,
            'image_name' : os.path.basename(input_path),  # file name without the directory
            'ocr_text' : json.dumps(pytesseract.image_to_string(Image.open(input_path)).strip()),
            'date' : datetime.today().strftime("%m/%d/%Y")
            }

# spin up a pool of processes
ncore = 10

# number of image batches
if len(image_paths) % intermediateFileNum == 0:
    nbatches = int(len(image_paths) / intermediateFileNum)
else:
    nbatches = int(len(image_paths) / intermediateFileNum) + 1

# start a pool of processes to OCR images in parallel
p = Pool(processes = ncore)

# make progress bar for the for loop over batches of images
for i in tqdm( range(nbatches) ):
    # i is the batch number
    # write intermediate results while processing all of the files
    start = i * intermediateFileNum

    if i == int(len(image_paths) / intermediateFileNum):
        end = len(image_paths)
    else:
        end = i * intermediateFileNum + intermediateFileNum

    image_paths_batch = image_paths[start: end]

    # loop over images to convert them into text on multiple cores
    # return the dictionaries with results in any order
    for d in p.imap_unordered(process_image, image_paths_batch):
        # add data to df (DataFrame.append was removed in pandas 2.0, so use concat)
        df = pd.concat([df, pd.DataFrame([d])], ignore_index = True)

    # save intermediate results after each batch of images
    df.to_csv(out_path + 'processed_images.csv', index = False)
    print('processed', len(df), 'images out of', tot_images)

# no more work will be submitted; wait for the worker processes to exit
p.close()
p.join()

print('running with %d core(s) took: %s seconds' % (ncore, str(time.time() - tmp)))
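
The batch arithmetic in the loop above (nbatches, start, end) can be checked in isolation. The following is a minimal sketch rather than the script's own code; batch_bounds is a hypothetical helper that produces the same slices by stepping through the indices with range.

```python
def batch_bounds(n_items, batch_size):
    # yield (start, end) index pairs that cover range(n_items) in order
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, n_items, batch_size):
        # the last batch is shorter when n_items is not a multiple of batch_size
        yield start, min(start + batch_size, n_items)


bounds = list(batch_bounds(250, 100))
print(bounds)  # [(0, 100), (100, 200), (200, 250)]
```

With 1000 images and a batch size of 100 this gives 10 batches, which matches the 10 steps of the progress bar in the output shown later.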

Let’s run the parallel version of the script after activating the conda environment:

$ source activate /zfs/gsb/intermediate-yens/conda/ocr
$ python ocr-parallel.py

While the script is running, ssh to the same yen in a different terminal window and monitor your CPU and memory usage like we did with the serial script:

$ htop -u $USER

htop should show multiple python processes spawned by the main python process. The script sets the ncore variable to 10, so expect 10 worker processes.

The output should look like:

image number to process:  1000
  0%|                                                                                                    | 0/10 [00:00<?, ?it/s]processed 100 images out of 1000
 10%|█████████▏                                                                                  | 1/10 [00:02<00:20,  2.23s/it]processed 200 images out of 1000
 20%|██████████████████▍                                                                         | 2/10 [00:04<00:16,  2.12s/it]processed 300 images out of 1000
 30%|███████████████████████████▌                                                                | 3/10 [00:06<00:14,  2.12s/it]processed 400 images out of 1000
 40%|████████████████████████████████████▊                                                       | 4/10 [00:08<00:12,  2.11s/it]processed 500 images out of 1000
 50%|██████████████████████████████████████████████                                              | 5/10 [00:10<00:10,  2.11s/it]processed 600 images out of 1000
 60%|███████████████████████████████████████████████████████▏                                    | 6/10 [00:12<00:08,  2.08s/it]processed 700 images out of 1000
 70%|████████████████████████████████████████████████████████████████▍                           | 7/10 [00:14<00:06,  2.09s/it]processed 800 images out of 1000
 80%|█████████████████████████████████████████████████████████████████████████▌                  | 8/10 [00:16<00:04,  2.10s/it]processed 900 images out of 1000
 90%|██████████████████████████████████████████████████████████████████████████████████▊         | 9/10 [00:18<00:02,  2.10s/it]processed 1000 images out of 1000
100%|███████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:21<00:00,  2.11s/it]
running with 10 core(s) took: 35.90546727180481 seconds

With 10 cores, the run took about 36 seconds, compared with 183 seconds on 1 core: roughly a 5x speedup.
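
The speedup and the parallel efficiency follow directly from the two timings. This short sketch only redoes that arithmetic; the variable names are illustrative, and the 36 seconds is the rounded wall time from the run above.

```python
serial_seconds = 183.0    # 1 core, from the serial run
parallel_seconds = 36.0   # 10 cores, rounded from the parallel run
ncore = 10

speedup = serial_seconds / parallel_seconds
efficiency = speedup / ncore

print(f"speedup: {speedup:.2f}x, parallel efficiency: {efficiency:.0%}")
# speedup: 5.08x, parallel efficiency: 51%
```

An efficiency below 100% is expected here: some of the wall time is likely spent on work that is not parallel, such as starting the worker processes, writing the CSV after every batch, and waiting for the slowest image in each batch.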

Check that the resulting csv table and the number of rows are as expected.

Look at the result:

$ wc -l ocr_parallel_out/processed_images.csv   # should return 1001 (1000 images + 1 header line)
$ cat ocr_parallel_out/processed_images.csv
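
The same checks can be done in Python. This is a minimal sketch using only the standard library; check_results is a hypothetical helper, and it assumes the column names written by the script above (image_path, ocr_text) and that ocr_text was stored with json.dumps.

```python
import csv
import json
import os


def check_results(csv_path, expected_rows):
    # read every data row (the header line is consumed by DictReader)
    with open(csv_path, newline="") as f:
        rows = list(csv.DictReader(f))
    paths = [row["image_path"] for row in rows]
    return {
        "row_count_ok": len(rows) == expected_rows,
        "no_duplicates": len(paths) == len(set(paths)),
        # ocr_text was written with json.dumps, so json.loads recovers the text
        "texts": [json.loads(row["ocr_text"]) for row in rows],
    }


csv_path = "ocr_parallel_out/processed_images.csv"
if os.path.exists(csv_path):
    report = check_results(csv_path, expected_rows=1000)
    print(report["row_count_ok"], report["no_duplicates"])
```

Checking for duplicate paths is worthwhile because results come back in arbitrary order, so a visual scan of the file will not reveal a repeated image.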


Parallel Example with Command Line Arguments

We can modify the code to take command line arguments so that we do not have to hard-code these values in the script. Here we want to vary four things: the number of cores, the number of images in a batch between intermediate saves, the path to the data, and the path where the results are stored. We make these four variables command line arguments, which the user supplies when running the python script.

Take a look at the script called ocr-parallel-command-line-args.py.

##############################################################
### Activate conda env ocr and run the script
# The script accepts four arguments:
#            path to the data
#            out path to write the results
#            number of cores to run the script in parallel
#            number of images in a batch to save intermediate results
##############################################################
# Execute the script with:
#    module load anaconda3
#    source activate /zfs/gsb/intermediate-yens/conda/ocr
#    python ocr-parallel-command-line-args.py /scratch/darc/intermediate-yens/data/ ocr_parallel_args_out/ 10 100
##############################################################
# import sys library (needed to accept command line args)
import sys
import numpy as np
import pandas as pd
from PIL import Image
import pytesseract # OCR with Tesseract
import glob, os, shutil, csv, json, time
from os import makedirs
from os.path import exists, isdir
from datetime import datetime
from tqdm import tqdm   # progress bar
from multiprocessing import Pool # for parallel run

# start the timer
tmp = time.time()

# path to the data is the first command line argument
data_path = sys.argv[1]

# path to the out folder is the second command line argument
out_path = sys.argv[2]

# number of images in a batch is the fourth argument
intermediateFileNum = int(sys.argv[4])

# remove output directory if it already exists
if exists(out_path) and isdir(out_path):
    shutil.rmtree(out_path)

# make empty dir for OCR results
makedirs(out_path)

# check if file exists, if it doesn't, make a new CSV file
if not exists(out_path + 'processed_images.csv'):
    # create a new csv file
    with open(out_path + 'processed_images.csv', 'w') as csvfile:
        filewriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
        # write headers
        filewriter.writerow(['image_path', 'image_name', 'ocr_text', 'date'])

# csv exists so read it in
df = pd.read_csv(out_path + 'processed_images.csv')

# make a list of image paths to OCR
image_paths = glob.glob(data_path + '*.jpg')
tot_images = len(image_paths)

# drop images we already processed (build the lookup set once, not once per image)
processed = set(df['image_path'])
image_paths = [img for img in image_paths if img not in processed]
print('image number to process: ', len(image_paths))

def process_image(input_path):
    # returns a dictionary of image name, image path, ocr text and processed date
    # to be inserted as a row into the results csv
    return {'image_path' : input_path,
            'image_name' : os.path.basename(input_path),  # works for any data path
            'ocr_text' : json.dumps(pytesseract.image_to_string(Image.open(input_path)).strip()),
            'date' : datetime.today().strftime("%m/%d/%Y")
            }

ncore = int(sys.argv[3])

# number of image batches
if len(image_paths) % intermediateFileNum == 0:
    nbatches = int(len(image_paths) / intermediateFileNum)
else:
    nbatches = int(len(image_paths) / intermediateFileNum) + 1

# start a pool of processes to OCR images in parallel
p = Pool(processes = ncore)

# make progress bar for the for loop over batches of images
for i in tqdm( range(nbatches) ):
    # i is the batch number
    # write intermediate results while processing all of the files
    start = i * intermediateFileNum

    if i == int(len(image_paths) / intermediateFileNum):
        end = len(image_paths)
    else:
        end = i * intermediateFileNum + intermediateFileNum

    image_paths_batch = image_paths[start: end]

    # loop over images to convert them into text on multiple cores
    # return the dictionaries with results in any order
    for d in p.imap_unordered(process_image, image_paths_batch):
        # add data to df (DataFrame.append was removed in pandas 2.0, so use concat)
        df = pd.concat([df, pd.DataFrame([d])], ignore_index = True)
    # save intermediate results after each batch of images
    df.to_csv(out_path + 'processed_images.csv', index = False)
    print('processed', len(df), 'images out of', tot_images)

# no more work will be submitted; wait for the worker processes to exit
p.close()
p.join()

print('running with %d core(s) took: %s seconds' % (ncore, str(time.time() - tmp)))
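
The script above reads its arguments positionally from sys.argv and does not validate them. As an alternative (not what the script does), the standard argparse module can check types and print a usage message. This is a minimal sketch; parse_args and the argument names are assumptions chosen to mirror the four positional arguments.

```python
import argparse


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="OCR images in parallel")
    parser.add_argument("data_path", help="directory containing the .jpg images")
    parser.add_argument("out_path", help="directory for the results CSV")
    parser.add_argument("ncore", type=int, help="number of worker processes")
    parser.add_argument("batch_size", type=int,
                        help="images per batch between intermediate saves")
    args = parser.parse_args(argv)
    if args.ncore < 1 or args.batch_size < 1:
        # parser.error prints the usage message and exits with status 2
        parser.error("ncore and batch_size must be positive integers")
    return args


# same positional order as the sys.argv version of the script
args = parse_args(["/scratch/darc/intermediate-yens/data/",
                   "ocr_parallel_args_out/", "10", "100"])
print(args.data_path, args.out_path, args.ncore, args.batch_size)
```

In a real script, parse_args() would be called without a list so that it reads the arguments given on the command line.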

Run with:

$ python ocr-parallel-command-line-args.py /scratch/darc/intermediate-yens/data/ ocr_parallel_args_out/ 10 100

Monitor CPU usage with htop -u $USER in a second terminal.

Look at the result:

$ wc -l ocr_parallel_args_out/processed_images.csv
$ cat ocr_parallel_args_out/processed_images.csv


Parallel Run via Slurm

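Finally, let’s create a slurm submission script that will run our parallel python script with the command line arguments. Since this code runs several worker processes at once, we request multiple CPU cores with -c, and we can also give slurm a hint about using hardware threads with --hint=multithread in the submission script.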

Look at the ocr-parallel.slurm submission script and edit it to include your email address:

#!/bin/bash

# Example of running a parallel python script in batch mode

#SBATCH -J ocr
#SBATCH -p normal
#SBATCH -c 10                           # CPU cores (up to 256)
#SBATCH -t 30:00
#SBATCH -o ocr-parallel-%j.out
#SBATCH --mail-type=ALL
#SBATCH --mail-user=your_email@stanford.edu
#SBATCH --hint=multithread

# For safety, we deactivate any conda env that might be activated on interactive yens before submission and purge all loaded modules
source deactivate
module purge

# Load software
module load anaconda3

# Activate the environment
source activate /zfs/gsb/intermediate-yens/conda/ocr

# Run python script
python ocr-parallel-command-line-args.py /scratch/darc/intermediate-yens/data/ ocr_parallel_args_out_slurm/ 10 100

Submit the slurm script to the yen-slurm server:

$ sbatch ocr-parallel.slurm

Monitor the queue of jobs:

$ squeue
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
             40417    normal      ocr nrapstin  R       0:02      1 yen11

or with additional columns:

$ squeue -o "%.18i %.9P %.8j %.8u %.8T %.10M %.10l %.4C %.7m %.15R"

While the job is running, you can monitor its progress with:

$ tail -f ocr-parallel-*.out

After the job is finished, look at the output and the emails from slurm:

$ cat ocr-parallel-*.out
$ wc -l ocr_parallel_args_out_slurm/processed_images.csv
$ cat ocr_parallel_args_out_slurm/processed_images.csv

The last modification to the slurm script is to use the environment variable $SLURM_CPUS_PER_TASK for the number of cores. That way you specify the number of CPU cores only once, in the #SBATCH -c line, and slurm passes the same number to the python script as an argument.

Look at the slurm file called ocr-parallel-final.slurm and edit it to include your email address:

#!/bin/bash

# Example of running a parallel python script in batch mode

#SBATCH -J ocr
#SBATCH -p normal
#SBATCH -c 10                           # CPU cores (up to 256)
#SBATCH -t 30:00
#SBATCH -o ocr-parallel-%j.out
#SBATCH --mail-type=ALL
#SBATCH --mail-user=your_email@stanford.edu
#SBATCH --hint=multithread

# For safety, we deactivate any conda env that might be activated on interactive yens before submission and purge all loaded modules
source deactivate
module purge

# Load software
module load anaconda3

# Activate the environment
source activate /zfs/gsb/intermediate-yens/conda/ocr

# Run python script
python ocr-parallel-command-line-args.py /scratch/darc/intermediate-yens/data/ out_parallel_final/ ${SLURM_CPUS_PER_TASK} 100
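
The same environment variable can also be read inside Python, for example to choose a default when no core count is given. This is a minimal sketch, not part of the scripts above; default_ncore is a hypothetical helper, and the fallback on os.sched_getaffinity is available on Linux only.

```python
import os


def default_ncore(environ=os.environ):
    # Slurm sets SLURM_CPUS_PER_TASK when the job requests cores with -c
    value = environ.get("SLURM_CPUS_PER_TASK")
    if value is not None:
        return int(value)
    # outside Slurm, fall back to the CPUs this process may run on (Linux only)
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1


print(default_ncore({"SLURM_CPUS_PER_TASK": "10"}))  # 10
print(default_ncore({}) >= 1)                        # True
```

On the interactive yens the fallback can return the full core count of the machine, so the community limits still have to be applied by hand there.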

Submit and monitor:

$ sbatch ocr-parallel-final.slurm
$ squeue -o "%.18i %.9P %.8j %.8u %.8T %.10M %.10l %.4C %.7m %.15R"

After the job is finished, look at the output to make sure the results are as expected.