Intermediate Yens
12. Parallel Script
We can parallelize the OCR process over the images and then combine the results using Python's multiprocessing package.
In the script, we specify the number of cores to use in parallel with the ncore variable. Remember to follow the community guidelines: limit yourself to 12 cores and 320 GB of memory on the interactive yen1 (and the JupyterHub running on it) and to 48 cores and 192 GB of RAM on yen2-5 and their JupyterHubs.
We will also connect to the same yen to monitor our resource usage while the script is running.
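Under the hood, the pattern is simple: hand a worker function and a list of inputs to a multiprocessing.Pool and collect the results as the workers finish. Here is a minimal, self-contained sketch of that pattern (the ocr_one function and the four-core pool are illustrative placeholders, not part of the course scripts):
from multiprocessing import Pool

def ocr_one(path):
    # placeholder for the real per-image OCR call
    return {'image_path': path, 'ocr_text': '...'}

if __name__ == '__main__':
    paths = ['img1.jpg', 'img2.jpg', 'img3.jpg']
    # four worker processes; imap_unordered yields results as soon as each finishes
    with Pool(processes = 4) as pool:
        for result in pool.imap_unordered(ocr_one, paths):
            print(result['image_path'])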
Let’s look at the ocr-parallel.py script that processes the images in parallel.
##############################################################
### Activate conda env ocr and run the script
# module load anaconda3
# source activate /zfs/gsb/intermediate-yens/conda/ocr
# python ocr-parallel.py
##############################################################
import numpy as np
import pandas as pd
from PIL import Image
import pytesseract # OCR with Tesseract
import glob, os, shutil, csv, json, time
from os import makedirs
from os.path import exists, isdir
from datetime import datetime
from tqdm import tqdm # progress bar
from multiprocessing import Pool # for parallel run
# start the timer
tmp = time.time()
# store OCR results
out_path = 'ocr_parallel_out/'
# remove output directory if it already exists
if exists(out_path) and isdir(out_path):
    shutil.rmtree(out_path)
# make empty dir for OCR results
makedirs(out_path)
# make a master df -> CSV file to keep track of what we have already processed
# check if file exists, if it doesn't, make a new CSV file
if not exists(out_path + 'processed_images.csv'):
    # create a new csv file
    with open(out_path + 'processed_images.csv', 'w') as csvfile:
        filewriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
        # write headers
        filewriter.writerow(['image_path', 'image_name', 'ocr_text', 'date'])
# csv exists so read it in
df = pd.read_csv(out_path + 'processed_images.csv')
# path to the data
data_path = '/scratch/darc/intermediate-yens/data/'
# make a list of image paths to OCR
image_paths = glob.glob(data_path + '*.jpg')
tot_images = len(image_paths)
# drop images we already processed
processed_set = set(df['image_path'])
image_paths = [img for img in image_paths if img not in processed_set]
print('image number to process: ', len(image_paths))
intermediateFileNum = 100
def process_image(input_path):
    # returns a dictionary of image name, image path, ocr text and processed date
    # to be inserted as a row into the results csv
    return {'image_path' : input_path,
            'image_name' : os.path.basename(input_path),  # file name only
            'ocr_text' : json.dumps(pytesseract.image_to_string(Image.open(input_path)).strip()),
            'date' : datetime.today().strftime("%m/%d/%Y")
            }
# spin up a pool of processes
ncore = 10
# number of image batches
if len(image_paths) % intermediateFileNum == 0:
    nbatches = int(len(image_paths) / intermediateFileNum)
else:
    nbatches = int(len(image_paths) / intermediateFileNum) + 1
# start a pool of processes to OCR images in parallel
p = Pool(processes = ncore)
# make progress bar for the for loop over batches of images
for i in tqdm( range(nbatches) ):
    # i is the batch number
    # write intermediate results while processing all of the files
    start = i * intermediateFileNum
    if i == int(len(image_paths) / intermediateFileNum):
        end = len(image_paths)
    else:
        end = i * intermediateFileNum + intermediateFileNum
    image_paths_batch = image_paths[start: end]
    # loop over images to convert them into text on multiple cores
    # return the dictionaries with results in any order
    for d in p.imap_unordered(process_image, image_paths_batch):
        # add data to df
        df = df.append(d, ignore_index = True)
    # save intermediate results after each batch of images
    df.to_csv(out_path + 'processed_images.csv', index = False)
    print('processed', len(df), 'images out of', tot_images)
p.close()
print('running with %d cores took: %s seconds' % (ncore, str(time.time() - tmp)))
Let’s run the parallel version of the script after activating the conda environment:
$ source activate /zfs/gsb/intermediate-yens/conda/ocr
$ python ocr-parallel.py
While the script is running, ssh to the same yen in a different terminal window and monitor your CPU and memory usage like we did with the serial script:
$ htop -u $USER
This should show multiple python tasks spawned by the main python process. The script sets the ncore variable to 10.
The output should look like:
image number to process: 1000
0%| | 0/10 [00:00<?, ?it/s]processed 100 images out of 1000
10%|█████████▏ | 1/10 [00:02<00:20, 2.23s/it]processed 200 images out of 1000
20%|██████████████████▍ | 2/10 [00:04<00:16, 2.12s/it]processed 300 images out of 1000
30%|███████████████████████████▌ | 3/10 [00:06<00:14, 2.12s/it]processed 400 images out of 1000
40%|████████████████████████████████████▊ | 4/10 [00:08<00:12, 2.11s/it]processed 500 images out of 1000
50%|██████████████████████████████████████████████ | 5/10 [00:10<00:10, 2.11s/it]processed 600 images out of 1000
60%|███████████████████████████████████████████████████████▏ | 6/10 [00:12<00:08, 2.08s/it]processed 700 images out of 1000
70%|████████████████████████████████████████████████████████████████▍ | 7/10 [00:14<00:06, 2.09s/it]processed 800 images out of 1000
80%|█████████████████████████████████████████████████████████████████████████▌ | 8/10 [00:16<00:04, 2.10s/it]processed 900 images out of 1000
90%|██████████████████████████████████████████████████████████████████████████████████▊ | 9/10 [00:18<00:02, 2.10s/it]processed 1000 images out of 1000
100%|███████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:21<00:00, 2.11s/it]
running with 10 core(s) took: 35.90546727180481 seconds
With 10 cores, the run took 36 seconds as opposed to 183 seconds with 1 core, roughly a 5x speedup rather than the ideal 10x (process startup, batching, and the repeated CSV writes add overhead that keeps the scaling from being perfectly linear).
Check that the resulting CSV table and the number of rows are as expected.
Look at the result:
$ wc -l ocr_parallel_out/processed_images.csv # should return 1001
$ cat ocr_parallel_out/processed_images.csv
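To check the output programmatically rather than by eye, a quick pandas sanity check along these lines should work (assuming the same ocr_parallel_out/ output directory as above):
import pandas as pd

df = pd.read_csv('ocr_parallel_out/processed_images.csv')
# 1000 data rows are expected; wc -l reports 1001 because it also counts the header
print(len(df), 'rows')
print(df['image_name'].nunique(), 'unique images')
print(df.head())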
Parallel Example with Command Line Arguments
We can modify the code to accept command line arguments so we do not have to hard-code values in the script. For example, we want to vary the number of cores, the number of images in a batch for intermediate saves, the path to the data, and the path where the results are stored, so it makes sense to make these four variables command line arguments instead. The user supplies these values when executing the Python script.
Take a look at the script called ocr-parallel-command-line-args.py.
##############################################################
### Activate conda env ocr and run the script
# The script accepts four arguments:
# path to the data
# out path to write the results
# number of cores to run the script in parallel
# number of images in a batch to save intermediate results
##############################################################
# Execute the script with:
# module load anaconda3
# source activate /zfs/gsb/intermediate-yens/conda/ocr
# python ocr-parallel-command-line-args.py /scratch/darc/intermediate-yens/data/ ocr_parallel_args_out/ 10 100
##############################################################
# import sys library (needed for accepted command line args)
import sys
import numpy as np
import pandas as pd
from PIL import Image
import pytesseract # OCR with Tesseract
import glob, os, shutil, csv, json, time
from os import makedirs
from os.path import exists, isdir
from datetime import datetime
from tqdm import tqdm # progress bar
from multiprocessing import Pool # for parallel run
# start the timer
tmp = time.time()
# path to the data is the first command line argument
data_path = sys.argv[1]
# path to the out folder is the second command line argument
out_path = sys.argv[2]
# number of images in a batch is the fourth argument
intermediateFileNum = int(sys.argv[4])
# remove output directory if it already exists
if exists(out_path) and isdir(out_path):
    shutil.rmtree(out_path)
# make empty dir for OCR results
makedirs(out_path)
# check if file exists, if it doesn't, make a new CSV file
if not exists(out_path + 'processed_images.csv'):
    # create a new csv file
    with open(out_path + 'processed_images.csv', 'w') as csvfile:
        filewriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
        # write headers
        filewriter.writerow(['image_path', 'image_name', 'ocr_text', 'date'])
# csv exists so read it in
df = pd.read_csv(out_path + 'processed_images.csv')
# make a list of image paths to OCR
image_paths = glob.glob(data_path + '*.jpg')
tot_images = len(image_paths)
# drop images we already processed
processed_set = set(df['image_path'])
image_paths = [img for img in image_paths if img not in processed_set]
print('image number to process: ', len(image_paths))
def process_image(input_path):
    # returns a dictionary of image name, image path, ocr text and processed date
    # to be inserted as a row into the results csv
    return {'image_path' : input_path,
            'image_name' : os.path.basename(input_path),  # file name only, since data_path can vary
            'ocr_text' : json.dumps(pytesseract.image_to_string(Image.open(input_path)).strip()),
            'date' : datetime.today().strftime("%m/%d/%Y")
            }
ncore = int(sys.argv[3])
# number of image batches
if len(image_paths) % intermediateFileNum == 0:
    nbatches = int(len(image_paths) / intermediateFileNum)
else:
    nbatches = int(len(image_paths) / intermediateFileNum) + 1
# start a pool of processes to OCR images in parallel
p = Pool(processes = ncore)
# make progress bar for the for loop over batches of images
for i in tqdm( range(nbatches) ):
    # i is the batch number
    # write intermediate results while processing all of the files
    start = i * intermediateFileNum
    if i == int(len(image_paths) / intermediateFileNum):
        end = len(image_paths)
    else:
        end = i * intermediateFileNum + intermediateFileNum
    image_paths_batch = image_paths[start: end]
    # loop over images to convert them into text on multiple cores
    # return the dictionaries with results in any order
    for d in p.imap_unordered(process_image, image_paths_batch):
        # add data to df
        df = df.append(d, ignore_index = True)
    # save intermediate results after each batch of images
    df.to_csv(out_path + 'processed_images.csv', index = False)
    print('processed', len(df), 'images out of', tot_images)
p.close()
print('running with %d cores took: %s seconds' % (ncore, str(time.time() - tmp)))
Run with:
$ python ocr-parallel-command-line-args.py /scratch/darc/intermediate-yens/data/ ocr_parallel_args_out/ 10 100
And monitor CPU usage with htop -u $USER in the second terminal.
Look at the result:
$ wc -l ocr_parallel_args_out/processed_images.csv
$ cat ocr_parallel_args_out/processed_images.csv
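Positional sys.argv arguments are easy to mix up. One alternative, not used in the course scripts, is Python's argparse module, which names each argument and generates a --help message. A sketch of how the same four inputs could be parsed (the argument names here are illustrative):
import argparse

parser = argparse.ArgumentParser(description = 'OCR images in parallel')
parser.add_argument('data_path', help = 'directory containing the input .jpg images')
parser.add_argument('out_path', help = 'directory to write the results to')
parser.add_argument('--ncore', type = int, default = 10, help = 'number of cores to use')
parser.add_argument('--batch-size', type = int, default = 100,
                    help = 'number of images per intermediate save')
args = parser.parse_args()
print(args.data_path, args.out_path, args.ncore, args.batch_size)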
Parallel Run via Slurm
Finally, let’s create a slurm submission script that will run our parallel python script with the command line arguments. Since this code uses multiple CPU cores, we can give slurm a hint in the submission script (--hint=multithread below).
Look at the ocr-parallel.slurm submission script and edit it to include your email address:
#!/bin/bash
# Example of running parallel python script in a batch mode
#SBATCH -J ocr
#SBATCH -p normal
#SBATCH -c 10 # CPU cores (up to 256)
#SBATCH -t 30:00
#SBATCH -o ocr-parallel-%j.out
#SBATCH --mail-type=ALL
#SBATCH --mail-user=your_email@stanford.edu
#SBATCH --hint=multithread
# For safety, we deactivate any conda env that might be activated on interactive yens before submission and purge all loaded modules
source deactivate
module purge
# Load software
module load anaconda3
# Activate the environment
source activate /zfs/gsb/intermediate-yens/conda/ocr
# Run python script
python ocr-parallel-command-line-args.py /scratch/darc/intermediate-yens/data/ ocr_parallel_args_out_slurm/ 10 100
Submit the slurm script to the yen-slurm server:
$ sbatch ocr-parallel.slurm
Monitor the queue of jobs:
$ squeue
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
40417 normal ocr nrapstin R 0:02 1 yen11
or with additional columns:
$ squeue -o "%.18i %.9P %.8j %.8u %.8T %.10M %.10l %.4C %.7m %.15R"
While the job is running, you can monitor its progress with:
$ tail -f ocr-parallel-*.out
After the job is finished, look at the output and the emails from slurm:
$ cat ocr-parallel-*.out
$ wc -l ocr_parallel_args_out_slurm/processed_images.csv
$ cat ocr_parallel_args_out_slurm/processed_images.csv
The last modification to the slurm script is to use the environment variable $SLURM_CPUS_PER_TASK for the number of cores, so you only have to specify how many CPU cores to run on in the #SBATCH -c line; that number is then passed to the python script as an argument.
Look at the slurm file called ocr-parallel-final.slurm and edit it to include your email address:
#!/bin/bash
# Example of running parallel python script in a batch mode
#SBATCH -J ocr
#SBATCH -p normal
#SBATCH -c 10 # CPU cores (up to 256)
#SBATCH -t 30:00
#SBATCH -o ocr-parallel-%j.out
#SBATCH --mail-type=ALL
#SBATCH --mail-user=your_email@stanford.edu
#SBATCH --hint=multithread
# For safety, we deactivate any conda env that might be activated on interactive yens before submission and purge all loaded modules
source deactivate
module purge
# Load software
module load anaconda3
# Activate the environment
source activate /zfs/gsb/intermediate-yens/conda/ocr
# Run python script
python ocr-parallel-command-line-args.py /scratch/darc/intermediate-yens/data/ out_parallel_final/ ${SLURM_CPUS_PER_TASK} 100
Submit and monitor:
$ sbatch ocr-parallel-final.slurm
$ squeue -o "%.18i %.9P %.8j %.8u %.8T %.10M %.10l %.4C %.7m %.15R"
After the job is finished, look at the output to make sure the results are as expected.
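As an aside, instead of passing the core count on the command line, the Python script could read it from Slurm's environment directly, since Slurm sets SLURM_CPUS_PER_TASK whenever the job requests -c N. A minimal sketch, with a fallback of 1 core for runs outside of Slurm:
import os

# set by Slurm when the job is submitted with #SBATCH -c N
ncore = int(os.environ.get('SLURM_CPUS_PER_TASK', 1))
print('running with', ncore, 'cores')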