Intermediate Yens
13. Job Arrays
Yahtzee Example
To sample the distribution 100,000 times, we can use the scheduler by submitting 10 jobs, each of which collects 10,000 samples.
We’ll prepare a submission script called yahtzee-array.slurm that uses a job array to run the sampling script 10 times.
#!/bin/bash
# Example of running python script as a job array
#SBATCH -J yahtzee
#SBATCH --array=1-10
#SBATCH -p normal
#SBATCH -c 1 # CPU cores (up to 256 on normal partition)
#SBATCH -t 10:00
#SBATCH -o rollcount-array-%a.csv # each array task writes its own csv (%a is the array task ID)
#SBATCH --mail-type=ALL
#SBATCH --mail-user=your_email@stanford.edu
# Run python script
python3 yahtzee.py 10000
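Note that every task in the array runs the exact same command; the 10 sets of samples differ only because each Python process gets its own random seed. For reference, here is a minimal sketch of what yahtzee.py might look like (the actual script is provided with the course materials; the simulation details here are an assumption). It takes the number of samples as a command line argument and prints one roll count per line:
# yahtzee.py (sketch): count how many rolls of 5 dice it takes to get a Yahtzee
# (all five dice showing the same face), keeping the most common face between rolls
import random
import sys

def rolls_to_yahtzee():
    dice = [random.randint(1, 6) for _ in range(5)]
    rolls = 1
    while len(set(dice)) > 1:
        # keep the most common face and re-roll the rest
        target = max(set(dice), key=dice.count)
        dice = [d if d == target else random.randint(1, 6) for d in dice]
        rolls += 1
    return rolls

n_samples = int(sys.argv[1])  # e.g. 10000
for _ in range(n_samples):
    print(rolls_to_yahtzee())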
To submit the script, run:
$ sbatch yahtzee-array.slurm
You should then see 10 jobs in the queue:
$ squeue
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
44102_1 normal yahtzee nrapstin R 0:01 1 yen10
44102_2 normal yahtzee nrapstin R 0:01 1 yen10
44102_3 normal yahtzee nrapstin R 0:01 1 yen10
44102_4 normal yahtzee nrapstin R 0:01 1 yen10
44102_5 normal yahtzee nrapstin R 0:01 1 yen10
44102_6 normal yahtzee nrapstin R 0:01 1 yen10
44102_7 normal yahtzee nrapstin R 0:01 1 yen10
44102_8 normal yahtzee nrapstin R 0:01 1 yen10
44102_9 normal yahtzee nrapstin R 0:01 1 yen10
44102_10 normal yahtzee nrapstin R 0:01 1 yen10
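Note that all 10 tasks share the same job ID (44102 here) and are distinguished by the array index after the underscore.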
Each job task produces a csv file with 10,000 lines. After all of the job tasks have completed, we can combine the 10 csv files into one final csv file with 100,000 lines:
$ cat rollcount-array-*csv > rollcount-array.csv
$ wc -l rollcount-array.csv
100000 rollcount-array.csv
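With all 100,000 samples in one file, we can summarize the sampled distribution; a minimal sketch in Python, assuming the combined csv contains one roll count per line with no header:
import pandas as pd

# read the combined roll counts (one integer per line, no header assumed)
rolls = pd.read_csv('rollcount-array.csv', header=None, names=['rolls'])['rolls']
print('mean rolls to a Yahtzee:', rolls.mean())
print(rolls.describe())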
OCR Example
As an alternative to writing a python script that loops over batches of images, we can process the image batches independently and concurrently.
To keep it simple, we will run a serial OCR script that processes only a user-specified batch of images, removing the for loop over batches of images from the ocr-serial.py script.
Take a look at the ocr-job-array-task-serial.py python script:
##############################################################
### Activate conda env ocr and run the script
# The script accepts these arguments:
# path to the data
# out path to write the results
# unique id of the batch of images to process in a job array
##############################################################
# Execute the script with:
# module load anaconda3
# source activate /zfs/gsb/intermediate-yens/conda/ocr
# python ocr-job-array-task-serial.py /scratch/darc/intermediate-yens/data/ job_array_out/ $image_batch_id
##############################################################
# import sys library (needed for accepted command line args)
import sys
import numpy as np
import pandas as pd
from PIL import Image
import pytesseract # OCR with Tesseract
import glob, os, shutil, csv, json, time
from os import makedirs
from os.path import exists
from datetime import datetime
# start the timer
tmp = time.time()
# path to the data is the first command line argument
data_path = sys.argv[1]
# path to the out folder is the second command line argument
out_path = sys.argv[2]
# image batch id is the third argument (aka job array index)
batch_id = int(sys.argv[3])
intermediateFileNum = 100 # number of images each array task processes
# make out dir for OCR results
makedirs(out_path, exist_ok = True)
# check if file exists, if it doesn't, make a new CSV file
# we will make a unique csv file for each array task
# then combine them later
if not exists(out_path + 'processed_images' + str(batch_id) + '.csv'):
    # create a new csv file
    with open(out_path + 'processed_images' + str(batch_id) + '.csv', 'w') as csvfile:
        filewriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
        # write headers
        filewriter.writerow(['image_path', 'image_name', 'ocr_text', 'date'])
# csv exists so read it in
df = pd.read_csv(out_path + 'processed_images' + str(batch_id) + '.csv')
# make a list of image paths to OCR
image_paths = glob.glob(data_path + '*.jpg')
tot_images = len(image_paths)
# drop images we already processed
already_processed = set(df['image_path'])
image_paths = [img for img in image_paths if img not in already_processed]
print('image number to process: ', len(image_paths))
# batch_id is the batch number (passed as a command line arg)
# write intermediate results while processing all of the files
# make a temp df to append to result
df_tmp = pd.DataFrame(columns=['image_path', 'image_name', 'ocr_text', 'date'])
start = batch_id * intermediateFileNum
if batch_id == int(len(image_paths) / intermediateFileNum):
    end = len(image_paths)
else:
    end = batch_id * intermediateFileNum + intermediateFileNum
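# e.g. with 1,000 images still to process and intermediateFileNum = 100, batch_id 3 selects images 300-399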
image_paths_batch = image_paths[start: end]
image_names = []
text_from_ocr = []
# loop over images to convert them into text in batches
for img in image_paths_batch:
    text_from_ocr.append(json.dumps(pytesseract.image_to_string(Image.open(img)).strip()))
    # strip a fixed-length path prefix to shorten the stored image name
    # (this assumes the example data path; os.path.basename(img) would be more general)
    image_names.append(img[32:])
# make a pd series for image paths
df_tmp['image_path'] = pd.Series(image_paths_batch, dtype='object')
# make a pd series for image names
df_tmp['image_name'] = pd.Series(image_names, dtype='object')
# make a pd series for OCR results
df_tmp['ocr_text'] = pd.Series(text_from_ocr, dtype='object')
# make a pd series for date
date_list = [datetime.today().strftime("%m/%d/%Y") for i in range(len(image_paths_batch))]
df_tmp['date'] = pd.Series(date_list, dtype='object')
# add data to df
# note: DataFrame.append is deprecated (removed in pandas 2.0+); on newer pandas use
# df = pd.concat([df, df_tmp], ignore_index=True)
df = df.append(df_tmp)
# save intermediate results after each batch of images
df.to_csv(out_path + 'processed_images' + str(batch_id) + '.csv', index = False)
print('processed', len(df), 'images out of', tot_images)
print('running in serial took: %s seconds' % (str(time.time() - tmp)))
Each task in the job array runs the same python script but receives a different batch number, so it processes a unique subset of the images and writes its own csv file; after all tasks finish, we combine those files to get the final table for all images. Launching the tasks as a job array lets us process the entire test data set we have been working with all at once (hence the power of job arrays).
We add the --array option to the slurm file and pass each task's array index to the python script as a command line argument.
Look at the ocr-array.slurm script:
#!/bin/bash
# Example of running python script as a job array
#SBATCH -J ocr
#SBATCH -p normal
#SBATCH --array=0-9 # define tasks in the array (each task has a unique $SLURM_ARRAY_TASK_ID)
#SBATCH -c 1
#SBATCH -t 30:00
#SBATCH -o ocr-array-%A-%a.out
#SBATCH --mail-type=ALL
#SBATCH --mail-user=your_email@stanford.edu
# For safety, deactivate any conda env that might still be active from the interactive yens and purge all loaded modules
source deactivate
module purge
# Load software
module load anaconda3
# Activate the environment
source activate /zfs/gsb/intermediate-yens/conda/ocr
# Run python script
# $SLURM_ARRAY_TASK_ID variable becomes 0, 1, 2 or whatever you specify in --array option
python ocr-job-array-task-serial.py /scratch/darc/intermediate-yens/data/ job_array_out/ $SLURM_ARRAY_TASK_ID
Note that in this case, we specified the slurm option #SBATCH --array=0-9 to run 10 independent tasks in parallel, each processing 100 images. Each task uses 1 core and generates a unique log file, ocr-array-%A-%a.out, where %A is the job ID and %a is the array task ID.
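For example, if the scheduler assigns job ID 43316 (as in the queue shown below), array task 3 will write its log to ocr-array-43316-3.out.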
Submit Job Array to Scheduler
We can now submit our job array slurm script to the scheduler to run the job array on the Yen-Slurm server. The scheduler queues all of the tasks at the same time (some might wait in the queue while others start running right away). To submit, run:
$ sbatch ocr-array.slurm
Each task should take about 20 seconds to process its 100 images. Once all tasks have finished, we will combine the resulting csv files into one final csv.
Monitor your jobs with watch squeue -u $USER, where $USER is your SUNet ID.
The queue will look similar to:
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
43316_[8-9] normal ocr nrapstin PD 0:00 1 (None)
43316_1 normal ocr nrapstin R 0:06 1 yen10
43316_2 normal ocr nrapstin R 0:06 1 yen10
43316_3 normal ocr nrapstin R 0:06 1 yen10
43316_4 normal ocr nrapstin R 0:06 1 yen10
43316_5 normal ocr nrapstin R 0:06 1 yen10
43316_6 normal ocr nrapstin R 0:06 1 yen10
43316_7 normal ocr nrapstin R 0:06 1 yen10
43316_0 normal ocr nrapstin R 0:08 1 yen10
After all of the tasks have finished, make sure the log (.out) files look good and that each array task's output csv looks correct as well.
$ cat ocr-array*out
The output from the job array tasks should look like:
/var/spool/slurmd/job159883/slurm_script: line 15: deactivate: No such file or directory
/zfs/gsb/intermediate-yens/nrapstin/scripts/ocr-job-array-task-serial.py:99: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
df = df.append(df_tmp)
image number to process: 1000
processed 100 images out of 1000
running in serial took: 21.49007225036621 seconds
/var/spool/slurmd/job159884/slurm_script: line 15: deactivate: No such file or directory
/zfs/gsb/intermediate-yens/nrapstin/scripts/ocr-job-array-task-serial.py:99: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
df = df.append(df_tmp)
image number to process: 1000
processed 100 images out of 1000
running in serial took: 21.055860996246338 seconds
/var/spool/slurmd/job159885/slurm_script: line 15: deactivate: No such file or directory
/zfs/gsb/intermediate-yens/nrapstin/scripts/ocr-job-array-task-serial.py:99: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
df = df.append(df_tmp)
image number to process: 1000
processed 100 images out of 1000
running in serial took: 22.216635942459106 seconds
/var/spool/slurmd/job159886/slurm_script: line 15: deactivate: No such file or directory
/zfs/gsb/intermediate-yens/nrapstin/scripts/ocr-job-array-task-serial.py:99: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
df = df.append(df_tmp)
image number to process: 1000
processed 100 images out of 1000
running in serial took: 22.403629302978516 seconds
/var/spool/slurmd/job159887/slurm_script: line 15: deactivate: No such file or directory
/zfs/gsb/intermediate-yens/nrapstin/scripts/ocr-job-array-task-serial.py:99: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
df = df.append(df_tmp)
image number to process: 1000
processed 100 images out of 1000
running in serial took: 22.728941202163696 seconds
/var/spool/slurmd/job159888/slurm_script: line 15: deactivate: No such file or directory
/zfs/gsb/intermediate-yens/nrapstin/scripts/ocr-job-array-task-serial.py:99: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
df = df.append(df_tmp)
image number to process: 1000
processed 100 images out of 1000
running in serial took: 22.54532027244568 seconds
/var/spool/slurmd/job159889/slurm_script: line 15: deactivate: No such file or directory
/zfs/gsb/intermediate-yens/nrapstin/scripts/ocr-job-array-task-serial.py:99: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
df = df.append(df_tmp)
image number to process: 1000
processed 100 images out of 1000
running in serial took: 21.468422889709473 seconds
/var/spool/slurmd/job159890/slurm_script: line 15: deactivate: No such file or directory
/zfs/gsb/intermediate-yens/nrapstin/scripts/ocr-job-array-task-serial.py:99: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
df = df.append(df_tmp)
image number to process: 1000
processed 100 images out of 1000
running in serial took: 22.16494607925415 seconds
/var/spool/slurmd/job159891/slurm_script: line 15: deactivate: No such file or directory
/zfs/gsb/intermediate-yens/nrapstin/scripts/ocr-job-array-task-serial.py:99: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
df = df.append(df_tmp)
image number to process: 1000
processed 100 images out of 1000
running in serial took: 22.325437545776367 seconds
/var/spool/slurmd/job159882/slurm_script: line 15: deactivate: No such file or directory
/zfs/gsb/intermediate-yens/nrapstin/scripts/ocr-job-array-task-serial.py:99: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
df = df.append(df_tmp)
image number to process: 1000
processed 100 images out of 1000
running in serial took: 22.093724727630615 seconds
List all output csv files:
$ ls -ltr job_array_out/
Should list 10 files.
See the number of lines in each csv file:
$ wc -l job_array_out/*.csv
101 job_array_out/processed_images0.csv
101 job_array_out/processed_images1.csv
101 job_array_out/processed_images2.csv
101 job_array_out/processed_images3.csv
101 job_array_out/processed_images4.csv
101 job_array_out/processed_images5.csv
101 job_array_out/processed_images6.csv
101 job_array_out/processed_images7.csv
101 job_array_out/processed_images8.csv
101 job_array_out/processed_images9.csv
1010 total
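Each file has 101 lines because it holds 100 data rows plus its own header row, which is why the total is 1,010; once the extra headers are dropped during the merge, the final csv should have 1,000 data rows plus a single header (1,001 lines).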
If everything looks correct, we can combine the csv files by running the combine_csv.sh shell script, which loops over the individual csv files and concatenates them into one final csv file:
#!/bin/sh
# shell script to combine individual csv files into one final csv
# run this after ALL job array tasks have finished to produce one final csv
# copy the header row from the first csv
head -1 job_array_out/processed_images0.csv > job_array_out/final_processed_images.csv
# append the data rows (skipping each file's header) from every csv
for filename in job_array_out/processed_images*.csv; do sed 1d $filename >> job_array_out/final_processed_images.csv; done
# clean up by removing the individual csv files
rm job_array_out/processed_images*.csv
Run it with:
$ sh combine_csv.sh
Check that the final csv is what you expect.
$ wc -l job_array_out/final_processed_images.csv # should be 1001 lines long
$ cat job_array_out/final_processed_images.csv
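If you prefer to stay in Python, the same combination can be done with pandas; a minimal sketch, assuming the per-task csv files are still in job_array_out/:
import glob
import pandas as pd

# read each per-task csv (each has its own header) and stack them into one table
parts = [pd.read_csv(f) for f in sorted(glob.glob('job_array_out/processed_images*.csv'))]
combined = pd.concat(parts, ignore_index=True)
combined.to_csv('job_array_out/final_processed_images.csv', index=False)
print(len(combined), 'rows combined')  # expect 1000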
There are limits on the size of a job array and on the number of jobs any user can have in the queue at the same time.
Array tasks can be indexed from 0 to 511, so the --array option in the slurm script can use indices between 0 and 511 (for example, --array=0-511).
We can check user job limits for the normal partition with:
$ sacctmgr show qos normal
- The MaxJobsPU column lists the maximum number of jobs each user can have running in the normal partition at the same time. That limit is 200 jobs per user.
- The MaxSubmitPU column lists the maximum number of jobs a user can have submitted to the normal partition queue (pending or running). That limit is set to 500 jobs.
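If your array is larger than what can run at once under these limits, slurm's array syntax also lets you throttle the number of simultaneously running tasks with a % suffix: for example, #SBATCH --array=0-511%100 submits 512 tasks but runs at most 100 of them at a time.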