13. Job Arrays

Yahtzee Example

To sample the distribution 100,000 times, we can use the scheduler by submitting 10 jobs, each of which collects 10,000 samples.
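
The yahtzee.py script itself is not reproduced in this section. A minimal sketch of what it could look like is below, assuming each sample counts how many times five dice must be rolled before all five show the same face, and prints that count on its own line (so the output captured by the scheduler forms a one-column csv):

# yahtzee.py -- minimal sketch (assumed implementation, not the original script)
# prints one roll count per line; stdout is captured by the slurm -o option
import random
import sys

def rolls_until_yahtzee():
    """Roll five dice repeatedly; return how many rolls it took for all five to match."""
    rolls = 0
    while True:
        rolls += 1
        dice = [random.randint(1, 6) for _ in range(5)]
        if len(set(dice)) == 1:          # all five dice show the same face
            return rolls

if __name__ == "__main__":
    n_samples = int(sys.argv[1])         # e.g. 10000, passed from the slurm script
    for _ in range(n_samples):
        print(rolls_until_yahtzee())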

We’ll prepare a submission script called yahtzee-array.slurm that uses a job array, so a single submission runs the script 10 times.

#!/bin/bash

# Example of running python script as a job array

#SBATCH -J yahtzee
#SBATCH --array=1-10
#SBATCH -p normal
#SBATCH -c 1                            # CPU cores (up to 256 on normal partition)
#SBATCH -t 10:00
#SBATCH -o rollcount-array-%a.csv       # %a is replaced by the array task ID so each task writes its own file
#SBATCH --mail-type=ALL
#SBATCH --mail-user=your_email@stanford.edu

# Run python script
python3 yahtzee.py 10000

To submit the script, run:

$ sbatch yahtzee-array.slurm

You should then see 10 jobs in the queue:

$ squeue
            JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           44102_1    normal  yahtzee nrapstin  R       0:01      1 yen10
           44102_2    normal  yahtzee nrapstin  R       0:01      1 yen10
           44102_3    normal  yahtzee nrapstin  R       0:01      1 yen10
           44102_4    normal  yahtzee nrapstin  R       0:01      1 yen10
           44102_5    normal  yahtzee nrapstin  R       0:01      1 yen10
           44102_6    normal  yahtzee nrapstin  R       0:01      1 yen10
           44102_7    normal  yahtzee nrapstin  R       0:01      1 yen10
           44102_8    normal  yahtzee nrapstin  R       0:01      1 yen10
           44102_9    normal  yahtzee nrapstin  R       0:01      1 yen10
          44102_10    normal  yahtzee nrapstin  R       0:01      1 yen10

Each array task produces its own csv file with 10,000 lines. After all of the tasks have completed, we can combine the 10 csv files into one final csv file with 100,000 lines:

$ cat rollcount-array-*csv > rollcount-array.csv
$ wc -l rollcount-array.csv
100000 rollcount-array.csv
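
As a quick sanity check on the combined samples, a short pandas snippet like the one below summarizes the distribution (this assumes rollcount-array.csv contains one roll count per line and no header):

# summarize the 100,000 roll-count samples (assumes one integer per line, no header)
import pandas as pd

samples = pd.read_csv('rollcount-array.csv', header=None, names=['rolls'])
print(samples['rolls'].describe())    # count, mean and quartiles of the sampled distribution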


OCR Example

As an alternative to writing a python script that loops over batches of images, we can process independent batches of images concurrently. To keep it simple, we will run a serial OCR script that processes only a user-specified batch of images, removing the for loop over batches of images from the ocr-serial.py script.

Take a look at the ocr-job-array-task-serial.py python script:

##############################################################
### Activate conda env ocr and run the script
# The script accepts these arguments:
#            path to the data
#            out path to write the results
#            unique id of the batch of images to process in a job array
##############################################################
# Execute the script with:
#     module load anaconda3
#     source activate /zfs/gsb/intermediate-yens/conda/ocr
#     python ocr-job-array-task-serial.py /scratch/darc/intermediate-yens/data/ job_array_out/ $image_batch_id
##############################################################
# import sys library (needed for accepted command line args)
import sys
import numpy as np
import pandas as pd
from PIL import Image
import pytesseract # OCR with Tesseract
import glob, os, shutil, csv, json, time
from os import makedirs
from os.path import exists
from datetime import datetime

# start the timer
tmp = time.time()

# path to the data is the first command line argument
data_path = sys.argv[1]

# path to the out folder is the second command line argument
out_path = sys.argv[2]

# image batch id is the third argument (aka job array index)
batch_id = int(sys.argv[3])

# number of images each array task will process
intermediateFileNum = 100

# make out dir for OCR results
makedirs(out_path, exist_ok = True)

# check if file exists, if it doesn't, make a new CSV file
# we will make a unique csv file for each array task
# then combine them later
if not exists(out_path + 'processed_images' + str(batch_id) + '.csv'):
    # create a new csv file
    with open(out_path + 'processed_images' + str(batch_id) + '.csv', 'w') as csvfile:
        filewriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
        # write headers
        filewriter.writerow(['image_path', 'image_name', 'ocr_text', 'date'])

# csv exists so read it in
df = pd.read_csv(out_path + 'processed_images' + str(batch_id) + '.csv')

# make a list of image paths to OCR
image_paths = glob.glob(data_path + '*.jpg')
tot_images = len(image_paths)

# drop images we already processed
image_paths = [img for img in image_paths if img not in set(df['image_path'])]
print('image number to process: ', len(image_paths))

# batch_id is the batch number (passed as a command line arg)
# write intermediate results while processing all of the files
# make a temp df to append to result

df_tmp = pd.DataFrame(columns=['image_path', 'image_name', 'ocr_text', 'date'])

# each task processes a contiguous slice of intermediateFileNum images
start = batch_id * intermediateFileNum

# the last batch picks up any leftover images
if batch_id == int(len(image_paths) / intermediateFileNum):
    end = len(image_paths)
else:
    end = batch_id * intermediateFileNum + intermediateFileNum

image_paths_batch = image_paths[start: end]

image_names = []
text_from_ocr = []

# loop over this task's batch of images and convert each one to text
for img in image_paths_batch:
    # json.dumps escapes newlines and quotes so the OCR text stays on one csv row
    text_from_ocr.append(json.dumps(pytesseract.image_to_string(Image.open(img)).strip()))
    # keep just the file name (drop the directory path)
    image_names.append(os.path.basename(img))

# make a pd series for image paths
df_tmp['image_path'] = pd.Series(image_paths_batch, dtype='object')

# make a pd series for image names
df_tmp['image_name'] = pd.Series(image_names, dtype='object')

# make a pd series for OCR results
df_tmp['ocr_text'] = pd.Series(text_from_ocr, dtype='object')

# make a pd series for date
date_list = [datetime.today().strftime("%m/%d/%Y") for i in range(len(image_paths_batch))]
df_tmp['date'] = pd.Series(date_list, dtype='object')

# add data to df
df = pd.concat([df, df_tmp], ignore_index=True)

# save intermediate results after each batch of images
df.to_csv(out_path + 'processed_images' + str(batch_id) + '.csv', index = False)
print('processed', len(df), 'images out of', tot_images)

print('running in serial took: %s seconds' % (str(time.time() - tmp)))

Each task in the job array runs the same python script with a different chunk number, so each task processes a unique subset of the images and writes its own csv file; we then combine those files to get the final table for all images. A job array launches all of the tasks at once, which lets us process the entire test data set we have been working with in parallel (hence the power of job arrays).

We add the --array option to the slurm file and pass each task's ID to the python script as a command line argument. Look at the ocr-array.slurm script:

#!/bin/bash

# Example of running python script as a job array

#SBATCH -J ocr
#SBATCH -p normal
#SBATCH --array=0-9              # define tasks in the array (each task has a unique $SLURM_ARRAY_TASK_ID)
#SBATCH -c 1
#SBATCH -t 30:00
#SBATCH -o ocr-array-%A-%a.out
#SBATCH --mail-type=ALL
#SBATCH --mail-user=your_email@stanford.edu

# For safety, we deactivate any conda env that might be activated on interactive yens before submission and purge all loaded modules
source deactivate
module purge

# Load software
module load anaconda3

# Activate the environment
source activate /zfs/gsb/intermediate-yens/conda/ocr

# Run python script
# $SLURM_ARRAY_TASK_ID variable becomes 0, 1, 2 or whatever you specify in --array option
python ocr-job-array-task-serial.py /scratch/darc/intermediate-yens/data/ job_array_out/ $SLURM_ARRAY_TASK_ID

Note that in this case we specified the slurm option #SBATCH --array=0-9 to run 10 independent tasks in parallel, each processing 100 images on a single core. Each task also writes a unique log file, ocr-array-%A-%a.out, where %A is the job ID and %a is the array task ID.
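
If you point this workflow at a different data set, a small helper like the sketch below (hypothetical, not part of the tutorial scripts) tells you what --array range to request; data_path and images_per_task mirror the values used in this example:

# sketch: compute the --array range for an arbitrary number of images
import glob
import math

data_path = '/scratch/darc/intermediate-yens/data/'   # same data path as above
images_per_task = 100                                  # matches intermediateFileNum

n_images = len(glob.glob(data_path + '*.jpg'))
n_tasks = math.ceil(n_images / images_per_task)
print(f'#SBATCH --array=0-{n_tasks - 1}')              # prints 0-9 for 1,000 images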

Submit Job Array to Scheduler

We can now submit the job array slurm script to the scheduler to run on the Yen-Slurm server. All of the tasks are queued at the same time (some may run right away while others wait in the queue). To submit, run:

$ sbatch ocr-array.slurm 

Each task should take about 20 seconds to process its 100 images. Afterwards, we will combine the resulting csv files into one final csv.

Monitor your jobs with watch squeue -u $USER where $USER is your SUNet ID.

The queue will look similar to:


             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
       43316_[8-9]    normal      ocr nrapstin PD       0:00      1 (None)
           43316_1    normal      ocr nrapstin  R       0:06      1 yen10
           43316_2    normal      ocr nrapstin  R       0:06      1 yen10
           43316_3    normal      ocr nrapstin  R       0:06      1 yen10
           43316_4    normal      ocr nrapstin  R       0:06      1 yen10
           43316_5    normal      ocr nrapstin  R       0:06      1 yen10
           43316_6    normal      ocr nrapstin  R       0:06      1 yen10
           43316_7    normal      ocr nrapstin  R       0:06      1 yen10
           43316_0    normal      ocr nrapstin  R       0:08      1 yen10

After all of the tasks have finished, check that the .out log files look good and that each array task’s output csv looks correct as well.

$ cat ocr-array*out

The output from the job array tasks should look like:

/var/spool/slurmd/job159883/slurm_script: line 15: deactivate: No such file or directory
image number to process:  1000
processed 100 images out of 1000
running in serial took: 21.49007225036621 seconds
/var/spool/slurmd/job159884/slurm_script: line 15: deactivate: No such file or directory
image number to process:  1000
processed 100 images out of 1000
running in serial took: 21.055860996246338 seconds
/var/spool/slurmd/job159885/slurm_script: line 15: deactivate: No such file or directory
image number to process:  1000
processed 100 images out of 1000
running in serial took: 22.216635942459106 seconds
/var/spool/slurmd/job159886/slurm_script: line 15: deactivate: No such file or directory
image number to process:  1000
processed 100 images out of 1000
running in serial took: 22.403629302978516 seconds
/var/spool/slurmd/job159887/slurm_script: line 15: deactivate: No such file or directory
image number to process:  1000
processed 100 images out of 1000
running in serial took: 22.728941202163696 seconds
/var/spool/slurmd/job159888/slurm_script: line 15: deactivate: No such file or directory
image number to process:  1000
processed 100 images out of 1000
running in serial took: 22.54532027244568 seconds
/var/spool/slurmd/job159889/slurm_script: line 15: deactivate: No such file or directory
image number to process:  1000
processed 100 images out of 1000
running in serial took: 21.468422889709473 seconds
/var/spool/slurmd/job159890/slurm_script: line 15: deactivate: No such file or directory
image number to process:  1000
processed 100 images out of 1000
running in serial took: 22.16494607925415 seconds
/var/spool/slurmd/job159891/slurm_script: line 15: deactivate: No such file or directory
image number to process:  1000
processed 100 images out of 1000
running in serial took: 22.325437545776367 seconds
/var/spool/slurmd/job159882/slurm_script: line 15: deactivate: No such file or directory
image number to process:  1000
processed 100 images out of 1000
running in serial took: 22.093724727630615 seconds

List all output csv files:

$ ls -ltr job_array_out/

It should list 10 csv files.

See the number of lines in each csv file:

$ wc -l job_array_out/*.csv
   101 job_array_out/processed_images0.csv
   101 job_array_out/processed_images1.csv
   101 job_array_out/processed_images2.csv
   101 job_array_out/processed_images3.csv
   101 job_array_out/processed_images4.csv
   101 job_array_out/processed_images5.csv
   101 job_array_out/processed_images6.csv
   101 job_array_out/processed_images7.csv
   101 job_array_out/processed_images8.csv
   101 job_array_out/processed_images9.csv
  1010 total

If everything looks correct, we can combine the csv files by running a shell script that loops over the individual csv files and appends them into one final csv file.

Take a look at the combine_csv.sh shell script:

#!/bin/sh

# shell script to combine individual csv files into one final csv
# run this after ALL job array tasks have finished to produce one final csv

# write the header once, taken from the first per-task csv
head -1 job_array_out/processed_images0.csv > job_array_out/final_processed_images.csv

# append the data rows from each per-task csv, skipping each file's header
for filename in job_array_out/processed_images*.csv; do sed 1d $filename >> job_array_out/final_processed_images.csv; done

# clean up by removing individual csv files
rm job_array_out/processed_images*.csv

Run it with:

$ sh combine_csv.sh

Check that the final csv is what you expect.

$ wc -l job_array_out/final_processed_images.csv  # should be 1001 lines (1000 data rows + 1 header)

$ cat job_array_out/final_processed_images.csv
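
As an alternative to the shell script, the same combine step could be done with pandas. The sketch below is an assumed equivalent of combine_csv.sh (note that combine_csv.sh deletes the per-task files, so use one approach or the other):

# combine the per-task csv files with pandas (sketch, equivalent to combine_csv.sh)
import glob
import pandas as pd

parts = sorted(glob.glob('job_array_out/processed_images*.csv'))
final = pd.concat([pd.read_csv(p) for p in parts], ignore_index=True)
final.to_csv('job_array_out/final_processed_images.csv', index=False)
print(len(final), 'rows combined from', len(parts), 'files')   # expect 1000 rows from 10 files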

There are limits on the size of job arrays and the number of jobs that any one user can have in the queue at the same time. Array tasks can be indexed from 0 to 511, so the --array option in the slurm script can only use indices in that range.
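
If your data set would need more tasks than the index range allows at 100 images per task, one option is to put more images in each task. The arithmetic is sketched below (a hypothetical helper, reversing the calculation shown earlier):

# sketch: choose a batch size so the job array fits within indices 0-511
import math

n_images = 100_000            # hypothetical larger data set
max_tasks = 512               # array indices 0-511
images_per_task = math.ceil(n_images / max_tasks)
n_tasks = math.ceil(n_images / images_per_task)
print(f'process {images_per_task} images per task with --array=0-{n_tasks - 1}')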

We can check the user job limits for the normal partition with:

$ sacctmgr show qos normal
  • The MaxJobsPU column lists the maximum number of jobs each user can have running on the normal partition at once. That limit is 200 jobs per user.
  • The MaxSubmitPU column lists the maximum number of jobs a user can have submitted to the normal partition queue (pending or running). That limit is 500 jobs.