JobBatch.py

Developer Documentation Only Below This Point:

Batch management package for PBS, extended for use on a local machine or on LSF

There are two little naming bugbears in here to be aware of.

First, LOGFILE is a variable that controls where the batch processing log goes, whereas logfile() is a method that identifies where the standard output and error (and any specifically written log files) of a given job execution go.

Second, “queue” refers to three things: the queue of PENDING jobs in the batch, the job processing management software, and the named queue supported by that software. The first is maintained by a directory named PENDING and referenced by pendingcount(). The second is implemented as an interface to either a LOCAL or PBS queue manager. The third is identified by qm.QUEUESPEC. The second was originally called an interface manager, which is more appropriate, but it was too hard to write the code without using the term queue, because “PBS queue” is how it is always described in conversation. Insisting on “interface” would have been somewhat less frustrating than trying to get a non-systems person to say “workflow” instead of “pipeline,” but it was ultimately not worth the hassle, since even the author had difficulty using the term “interface” consistently, in code or in conversation.

JobBatch.CONTINUEFLAG = -1

Whether to re-iterate on an incomplete batch.

class JobBatch.JobBatch(metadata, resume=None, jobs_to_be_done=0, local=None, logit=None)

Manager for a collection of jobs provided as shell scripts. Submits and monitors the jobs in the batch, moving them through a small list of possible states represented as sub-directories of the task directory.

Invocation is through BatchRunner.py.

metadata The task metadata provided by the BatchRunner

jobs_to_be_done Number of jobs, to help with management.

local If provided, selects whether to run on the local machine or under a cluster manager.

logit A logging method that duck-types Reporting.logit()

activecount()

Return the number of jobs that are not pending, done, or failed.

batching_loop(parallel=False, prototyping=False)

Main driver loop.

As long as there are pending jobs, feed the next bolus (which may be the whole batch in many instances) into the queue and then loop waiting for progress. As jobs finish, any remaining jobs are fed into the queue to maintain a full queue.

parallel => return a continuing status so that other batches in a replicate may start

prototyping => run a single job and await its success before running the entire bolus
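The feed-and-refill behaviour described above can be sketched as follows. This is a simplified, in-memory illustration, not the module's implementation: the function signature, the `poll` callable (standing in for the queue manager), and the `max_bolus` argument are assumptions made for the example, and the sleep between polls that a real loop would need is omitted.

```python
from collections import deque


def batching_loop(jobs, max_bolus, poll):
    """Feed jobs in boluses of at most max_bolus, refilling as jobs finish.

    jobs: iterable of job names (stands in for the PENDING directory).
    poll: callable that takes the set of running jobs and returns the ones
          that finished since the last check (hypothetical queue manager).
    """
    pending = deque(jobs)
    running, done = set(), []
    while pending or running:
        # Top the queue back up so that at most max_bolus jobs are running.
        while pending and len(running) < max_bolus:
            running.add(pending.popleft())
        # Copy the result so the caller may safely return `running` itself.
        finished = set(poll(running))
        running -= finished
        done.extend(sorted(finished))
    return done


if __name__ == "__main__":
    # Toy queue manager: exactly one running job finishes per poll.
    print(batching_loop(["a", "b", "c"], 2, lambda running: {min(running)}))
```

With `max_bolus` equal to the batch size, the first pass submits everything at once, which matches the "whole batch in many instances" case.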

count(q)

Return the number of jobs currently in the queue (directory) named q

donecount()

Return the number of jobs that are DONE

donejobs()

Return a list of jobs that are DONE

errorcount()

Return the number of jobs that have encountered an error

errorjobs()

Return a list of jobs that have encountered an ERROR

examine_runbatch()

Determine and log the state of each running job. Return the count of jobs that have changed state since the last check.

feed_batch(delta=0, prototyping=False)

Log the start of the JobBatch and then submit some or all jobs in PENDING

self.max_bolus allows a user to force a lesser queue injection rate than the default behavior would permit.

self.max_bolus of 1 effectively causes sequential behavior

self.max_bolus was changed to an instance attribute to allow this module to change it based on runtime conditions, such as OSErrors caused by too many submissions

delta is the number that have recently finished
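A minimal sketch of how an instance-level max_bolus might throttle submissions and shrink after an OSError. The `Feeder` class, the halving policy, and the use of delta as a refill limit are all assumptions made for illustration; the documentation above does not specify them.

```python
class Feeder:
    """Hypothetical stand-in for the part of JobBatch that injects jobs."""

    def __init__(self, submit, max_bolus):
        self.submit = submit        # callable that submits one job by name
        self.max_bolus = max_bolus  # instance attribute, adjustable at runtime

    def feed_batch(self, pending, delta=0):
        """Submit jobs from the front of `pending`; return the names submitted."""
        # Assumption: on a refill, inject no more than the number just finished.
        limit = min(delta, self.max_bolus) if delta else self.max_bolus
        submitted = []
        for name in pending[:limit]:
            try:
                self.submit(name)
            except OSError:
                # Assumed policy: halve the injection rate (never below 1)
                # and stop feeding for this round.
                self.max_bolus = max(1, self.max_bolus // 2)
                break
            submitted.append(name)
        return submitted
```

Under these assumptions, a max_bolus of 1 submits one job per call, which gives the sequential behaviour mentioned above.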

is_complete()

Return True if all jobs in the batch are completed, either with or without error; False if not.

is_correct()

Return True if all jobs in the batch finished without error
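The relationship between is_complete() and is_correct() can be illustrated with per-state counts. This is a sketch only: the `counts` dictionary is a hypothetical stand-in for the values that the count methods would return, and the state names are the ones used elsewhere in this document.

```python
def is_complete(counts):
    """True when no job is still pending, queued, or running."""
    unfinished = ("PENDING", "QUEUED", "RUNNING")
    return all(counts.get(state, 0) == 0 for state in unfinished)


def is_correct(counts):
    """True when the batch is complete and nothing ended in ERROR."""
    return is_complete(counts) and counts.get("ERROR", 0) == 0


if __name__ == "__main__":
    counts = {"DONE": 3, "ERROR": 1}
    print(is_complete(counts), is_correct(counts))
```

A batch can therefore be complete without being correct, but not the reverse.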

joblist(q)

Return the list of jobs currently in the queue (directory) named q. The isdir() check allows us to count all states, including some that may not exist yet, such as QUEUED.
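A minimal sketch of the directory-per-state lookup that joblist(q) and count(q) describe, assuming a task directory with one sub-directory per state. The free-function form and the `taskdir` argument are illustrative; the real methods take only q.

```python
import os
import tempfile


def joblist(taskdir, q):
    """List the entries in state directory q, or [] if q does not exist yet."""
    path = os.path.join(taskdir, q)
    # The isdir() guard lets callers ask about states (such as QUEUED)
    # whose directory has not been created.
    if not os.path.isdir(path):
        return []
    return sorted(os.listdir(path))


def count(taskdir, q):
    """Number of jobs currently in state directory q."""
    return len(joblist(taskdir, q))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as taskdir:
        os.makedirs(os.path.join(taskdir, "PENDING"))
        for name in ("a.sh", "b.sh"):
            open(os.path.join(taskdir, "PENDING", name), "w").close()
        print(count(taskdir, "PENDING"), count(taskdir, "QUEUED"))
```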

logfile(name)

Standard path to the log file.

pend_job(command, rename=None)

Add a script to the queue, along with a name for the job.

pendingcount()

Return the number of jobs with the status of “PENDING”

pendingjobs()

Return a list of jobs with the status of “PENDING”

queuedjobs()

Return a list of jobs with the status of “QUEUED”

record_job_change(jobid, todir, fromdir=None)

Move jobid from a directory representing one state to another.
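Because each state is a directory, a state change amounts to a rename. The sketch below illustrates that idea with a hypothetical helper, `move_job`, whose name and arguments are assumptions; it does not reproduce the logging or the fromdir=None default of record_job_change().

```python
import os
import tempfile


def move_job(taskdir, jobid, fromdir, todir):
    """Move the file for jobid from one state directory to another."""
    src = os.path.join(taskdir, fromdir, jobid)
    dst_dir = os.path.join(taskdir, todir)
    # Create the target state directory on first use.
    os.makedirs(dst_dir, exist_ok=True)
    dst = os.path.join(dst_dir, jobid)
    os.rename(src, dst)
    return dst


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as taskdir:
        os.makedirs(os.path.join(taskdir, "PENDING"))
        open(os.path.join(taskdir, "PENDING", "a.sh"), "w").close()
        print(move_job(taskdir, "a.sh", "PENDING", "RUNNING"))
```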

record_job_completion(jobid, fromdir=None)

Change state and log the timestamp of successful completion of the job aliased jobid

record_job_error(jobid, fromdir=None)

Change state and log the timestamp of failure of the job aliased jobid

record_job_queued(jobid)

Change state and log the timestamp of aliased jobid getting queued (not in use)

record_job_start(pending, jobid)

Change state of the job aliased jobid to qm.STATUS_RUNNING

repend_errors()

For a batch that has run, find jobs that have moved to ERROR and rename them back to PENDING without their jobid suffixes

repend_runners()

For a batch that has run, find jobs that have moved to RUNNING and are no longer running; move them to DONE if done, otherwise move them back to PENDING without their jobid suffixes.
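A sketch of the re-pending step for failed jobs. The format of the jobid suffix is not documented here, so this example assumes a trailing ".<digits>" component; `strip_jobid` is a hypothetical helper, and the `taskdir` argument is added to keep the example self-contained.

```python
import os
import tempfile


def strip_jobid(name):
    """Drop a trailing '.<digits>' suffix (assumed jobid format), if present."""
    base, sep, suffix = name.rpartition(".")
    return base if sep and suffix.isdigit() else name


def repend_errors(taskdir):
    """Move every file in ERROR back to PENDING under its suffix-free name."""
    error_dir = os.path.join(taskdir, "ERROR")
    pending_dir = os.path.join(taskdir, "PENDING")
    os.makedirs(pending_dir, exist_ok=True)
    moved = []
    if os.path.isdir(error_dir):
        for name in sorted(os.listdir(error_dir)):
            target = strip_jobid(name)
            os.rename(os.path.join(error_dir, name),
                      os.path.join(pending_dir, target))
            moved.append(target)
    return moved


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as taskdir:
        os.makedirs(os.path.join(taskdir, "ERROR"))
        open(os.path.join(taskdir, "ERROR", "a.sh.123"), "w").close()
        print(repend_errors(taskdir))
```

repend_runners() would additionally need to ask the queue manager whether each job is still running or already done before choosing the target directory; that check is left out of this sketch.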

reset_jobcounter()

Set the jobcounter to the number of jobs in the pending queue.

runningcount()

Return the number of jobs with the status of “RUNNING”

runningjobs()

Return a list of jobs with the status of “RUNNING”

submit_job(name)

Make sure the command exists and, if so, have the queue manager start the job. Return the job ID of the job, or raise a SyQADAControl.SyQADAWarning (from manager.submit()).
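The check-then-delegate pattern of submit_job() might look roughly like this. Everything here is a stand-in: `JobWarning` substitutes for SyQADAControl.SyQADAWarning, `manager_submit` substitutes for manager.submit(), and raising when the script is missing is an assumption, since the documentation does not say what happens in that case.

```python
import os
import tempfile


class JobWarning(Exception):
    """Stand-in for SyQADAControl.SyQADAWarning (not the real class)."""


def submit_job(pending_dir, name, manager_submit):
    """Check that the job script exists, then hand it to the queue manager."""
    path = os.path.join(pending_dir, name)
    if not os.path.isfile(path):
        # Assumed behaviour for a missing command.
        raise JobWarning(f"no such job script: {path}")
    # The manager returns the job ID; any warning it raises propagates.
    return manager_submit(path)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as pending_dir:
        open(os.path.join(pending_dir, "a.sh"), "w").close()
        print(submit_job(pending_dir, "a.sh", lambda path: "42"))
```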