# Source code for shelljob.job

"""
	A basic monitor that provides a more convenient way to use process Groups.
"""
import multiprocessing
import time

from shelljob import proc

"""
	Manages a Group by collecting output and displaying feedback. This is an abstract
	class which provides the basic mechanism.
"""
class Monitor(object):
    """
    Runs a batch of commands through a proc.Group and reports on them.

    This is an abstract base class. `run` drives the processes, while
    `job_started`, `job_output`, `job_finished` and `gen_feedback` are no-op
    hooks intended to be overridden by subclasses.
    """

    def __init__(self, max_simul=multiprocessing.cpu_count(), feedback_timeout=5.0):
        """
        @param max_simul: the maximum number of processes which can be running
            at the same time
        @param feedback_timeout: generate progress feedback at this interval
            (compared against time.time() differences, i.e. seconds)
        """
        self.group = proc.Group()
        self.max_simul = max_simul
        # maps the handle returned by Group.run to its Job
        self.jobs = {}
        self.feedback_timeout = feedback_timeout
        self.last_feedback = time.time()
        # next id handed out by convert_cmds to a job without one
        self.job_count = 0

    def convert_cmds(self, in_cmds):
        """
        Converts a variable format input command list to a list of Jobs.

        Items which are not already a Job are wrapped in one. Any job whose
        id is None is assigned the next incrementing id (the Job is modified
        in place).

        @param in_cmds: iterable of commands and/or Job objects
        @return: list of Job objects, in input order
        """
        out_cmds = []
        for cmd in in_cmds:
            if not isinstance(cmd, Job):
                cmd = Job(cmd)
            if cmd.id is None:
                cmd.id = self.job_count
                self.job_count += 1
            out_cmds.append(cmd)
        return out_cmds

    def run(self, cmds, shell=False):
        """
        Run a series of commands and wait for their completion.

        @param cmds: a list of cmds or Job's for Group.run. This will be run
            in parallel obeying the 'max_simul' option provided to the
            constructor. Using Job's directly allows you to associate
            additional data for use with each job -- helpful for custom
            monitors.
        @param shell: forwarded to Group.run as its 'shell' keyword argument
        """
        pending = self.convert_cmds(cmds)
        # Cursor into `pending`; advancing an index avoids copying the
        # remaining list each time a job is started.
        next_index = 0

        while True:
            # ensure max_simul are running
            run_count = self._check_finished()

            if run_count < self.max_simul and next_index < len(pending):
                # space to create a new job
                job = pending[next_index]
                job.handle = self.group.run(job.cmd, shell=shell)
                job.status = Job.STATUS_RUNNING
                next_index += 1
                self.jobs[job.handle] = job
                self.job_started(job)
                # this allows a quick spawning of max_simul on first call
                continue

            for handle, line in self.group.readlines():
                self.job_output(self.jobs[handle], line)

            self._check_feedback()

            # run_count was sampled before reading output; zero means nothing
            # was running and nothing was left to start
            if run_count == 0:
                break

        # always emit one final feedback once everything has completed
        self.gen_feedback()

    def _check_finished(self):
        """
        Process all finished items.

        A job is only treated as finished once it has an exit code and its
        handle reports `group_output_done`, so that no output is lost.

        @return: count of still running jobs
        """
        count_run = 0
        count_done = 0
        for handle, code in self.group.get_exit_codes():
            # it must be done and no output left before we consider it done
            if code is None or not handle.group_output_done:
                count_run += 1
                continue

            job = self.jobs[handle]
            job.status = Job.STATUS_FINISHED
            job.exit_code = code
            count_done += 1
            self.job_finished(job)

        if count_done > 0:
            self.group.clear_finished()

        return count_run

    def _check_feedback(self):
        """
        Call gen_feedback if more than feedback_timeout has elapsed since
        the last time it was triggered from here.
        """
        elapsed = time.time() - self.last_feedback
        if elapsed > self.feedback_timeout:
            self.gen_feedback()
            self.last_feedback = time.time()

    def job_finished(self, job):
        """
        (Virtual) Called when a job has completed.
        """
        pass

    def job_started(self, job):
        """
        (Virtual) Called just after a job is started.
        """
        pass

    def job_output(self, job, line):
        """
        (Virtual) Called for each line of output from a job.
        """
        pass

    def gen_feedback(self):
        """
        (Virtual) Called whenever the Monitor thinks feedback should be
        generated (in addition to the other events). Generally this is called
        for each feedback_timeout period specified in the constructor, and
        once more when `run` completes.
        """
        pass

    def get_jobs(self):
        """
        Get the jobs which have been started by this monitor.

        @return: the values of the internal handle-to-Job mapping
        """
        return self.jobs.values()
class Job(object):
    """
    Encapsulates information about a job.
    """

    # Lifecycle states stored in `status`
    STATUS_NONE = 0
    STATUS_RUNNING = 1
    STATUS_FINISHED = 2

    def __init__(self, cmd):
        """
        @param cmd: command executed by the job
        """
        # the Popen object, set by monitor when the job starts
        self.handle = None
        # An identifier, if set to None then monitor will assign one (incrementing)
        self.id = None
        # command executed by the job
        self.cmd = cmd
        # Current status of the job
        self.status = Job.STATUS_NONE
        # Value of the exit code from the process, or None if not yet finished
        self.exit_code = None
        # An identifying name
        self.name = None

    def get_ref_name(self):
        """
        A reference name suitable for using in identifiers and files.

        @return: the job's `id`, unchanged (not converted to a string), when
            no name is set; otherwise the name with '/', '\\' and spaces
            replaced by '_' and every character that is not alphanumeric,
            '_' or '.' removed
        """
        if not self.name:
            return self.id

        # generate simple safe filename
        base = self.name.replace('/', '_').replace('\\', '_').replace(' ', '_')
        keep = ('_', '.')
        return "".join(c for c in base if c.isalnum() or c in keep)
class FileMonitor(Monitor):
    """
    A monitor which writes output to log files. Simple textual feedback will
    also be reported to the console.
    """

    def __init__(self, file_pattern='/tmp/job_{}.log', meta=True, **kwargs):
        """
        @param file_pattern: will be formatted with the job's reference name
            (see get_log_name) to produce filenames. These files are
            overwritten when a job starts and record the output of the job.
        @param meta: if True then meta information about the job will also be
            recorded to the logfile
        @param kwargs: the remaining arguments are passed to the Monitor
            constructor
        """
        super(FileMonitor, self).__init__(**kwargs)
        self.file_pattern = file_pattern
        self.meta = meta

    def get_log_name(self, job):
        """
        (Virtual) Get the name of the log file to use for a job.

        @return: file_pattern formatted with job.get_ref_name()
        """
        return self.file_pattern.format(job.get_ref_name())

    def job_finished(self, job):
        """
        Called when a job has completed. Records the exit code (if meta is
        enabled) and closes the job's log file.
        """
        try:
            if self.meta:
                job.log_file.write("Exit Code: {}\n".format(job.exit_code).encode())
        finally:
            # close even if the final write fails so the handle is not leaked
            job.log_file.close()

    def job_started(self, job):
        """
        Called just after a job is started. Opens (truncating) the job's log
        file and, if meta is enabled, writes a header line with the job id
        and command.
        """
        job.log_name = self.get_log_name(job)
        # binary mode: job_output writes each line as received, and the meta
        # lines are explicitly encoded before writing
        job.log_file = open(job.log_name, 'wb')
        # tracks whether output arrived since the last gen_feedback call
        job.got_output = False
        if self.meta:
            job.log_file.write("Job #{}: {}\n".format(job.id, job.cmd).encode())

    def job_output(self, job, line):
        """
        Called for each line of output from a job. Appends the line to the
        job's log file and marks the job as having produced output.
        """
        job.log_file.write(line)
        job.got_output = True

    def gen_feedback(self):
        """
        Called whenever the Monitor thinks feedback should be generated (in
        addition to the other events). Generally this is called for each
        feedback_timeout period specified in the constructor.

        Prints one summary line counting jobs which succeeded (exit code 0),
        failed (non-zero exit code), produced output since the previous
        feedback, or were idle.
        """
        good = 0
        bad = 0
        output = 0
        stall = 0
        for job in self.get_jobs():
            if job.exit_code is not None:
                if job.exit_code == 0:
                    good += 1
                else:
                    bad += 1
            elif job.got_output:
                # reset so the next feedback only counts fresh output
                job.got_output = False
                output += 1
            else:
                stall += 1

        print("Done: {} Failed: {} Reading: {} Idle: {}".format(good, bad, output, stall))