#!/usr/bin/python2.3 # # $Id: labrun.py,v 1.9 2004/04/01 23:57:04 dberger Exp $ # # A simple tool to run a set of commands across a set of hosts with # some maxmimum level of parallelism. # # inputs: (r) a file containing a list of hosts (one host per line) # it is assumed that you have ssh keyauth setup to these hosts # (i.e. that you don't need a password to login and launch jobs) # (r) a file containing a list of commands (one command per line) # (o) a host setup script which will be run exactly once per host # (lazily - hosts are only setup immediately prior to the # first job being dispatched to them). The script is given # the host to be prepped as a command line argument. # if this script fails (exits w/ non zero status) the host # is removed from the list of available hosts. The script # should insure that the target machine is, in fact, suitable # for running the intended jobs - including that it has # sufficient disk space, has the required binaries, etc. # # outputs: on termination: a list of any failed jobs and a list of any # unfinished/pending jobs # # import commands import getopt import os import random import signal import sys import thread import threading import time import traceback MAX_THREADS=5 MAX_HOST_RETRIES=5 MAX_JOB_RETRIES=2 DEADLOCK_TIMEOUT=360.0 availableHosts = [] prepdHosts = [] activeHosts = [] hostRetries = {} waitingJobs = [] failedJobs = [] jobRetries = {} activeJobs = [] activeThreads = [] threadLock = thread.allocate_lock() threadEvent = threading.Event() inShutdown = 0 def prephost(host, setup_cmd): try: cmd = setup_cmd + " " + host print '\tprepping host ' + host retval = 0 try: retval, output = commands.getstatusoutput(cmd) #time.sleep(random.randrange(0,2)) except: retval = 1 if retval == 0: # the caller holds the lock prepdHosts.append(host) return 0 else: print "\tprep of host " + host + " failed w/ output: " + output return 1 except: print "exception caught in prepHost:" traceback.print_exc(file=sys.stdout) return 1 def threadCleanup(host, command): try: l = len(activeThreads) if not (l > 0): print "assertion failed: l > 0" activeThreads.remove(thread.get_ident()) print str(len(activeThreads)) + " active threads remain" if not (l > len(activeThreads)): print "assertion failed: (l > len(activeThreads))" l = len(activeJobs) if not (l >= 0): print "assertion failed: (l >= 0)" if not (activeJobs.count(command) > 0): print "assertion failed: (activeJobs.count(command) > 0)" activeJobs.remove(command) if not (l == len(activeJobs)+1): print "assertion failed: (l == len(activeJobs)+1)" activeHosts.remove(host) if hostRetries[host] > 0: availableHosts.append(host) else: print "\tremoving host " + host + " from available list due to repeated job failures" except: print "exception caught in threadCleanup" traceback.print_exc(file=sys.stdout) def waitToSpawn(): global inShutdown global MAX_HOST_RETRIES if inShutdown: return 0 while (1): #print "waittospawn: acquire" threadLock.acquire() #print "waittospawn: acquired" active = len(activeThreads) threadLock.release() #print "waittospawn: released" if (active >= MAX_THREADS): threadEvent.wait() threadEvent.clear() else: return 1 def runjob(host, command, setup_cmd): threadEvent.set() try: #print "runjob: acquire" threadLock.acquire() #print "runjob: acquired" if prepdHosts.count(host) == 0 and setup_cmd != "": threadLock.release() #print "runjob: released" if prephost(host, setup_cmd) != 0: print "\tunable to prep host " + host + ": job requeued" waitingJobs.append(command) #print "runjob: acquire (2)" threadLock.acquire() #print "runjob: acquired (2)" threadCleanup(host, command) # threadCleanup put the host back on the available list - so # remove it again availableHosts.remove(host) threadLock.release() #print "runjob: release (2)" threadEvent.set() #print "runjob: set" return else: threadLock.release() #print "runjob: released:" if inShutdown: #print "runjob: acquire (shutdown)" threadLock.acquire() #print "runjob: acquired (shutdown)" waitingJobs.append(command) threadCleanup(host, command) threadLock.release() #print "runjob: release (shutdown)" threadEvent.set() #print "runjob: set (shutdown)" return cmd = "ssh -x " + host + ' "' + command + '"' print '\tstarting job on host ' + host retval = 0 try: retval, output = commands.getstatusoutput(cmd) #time.sleep(random.randrange(0,2)) except: print "running command threw an exception" retval = 1 #print "runjob: acquire (2)" threadLock.acquire() #print "runjob: acquired (2)" try: if retval != 0: # it didn't complete - let's try it again jobRetries[command] -= 1 hostRetries[host] -= 1 if jobRetries[command] > 0: print "\tjob running on host " + host + " failed: job requeued" waitingJobs.append(command) else: print "\tjob " + command + " failed too many times, abandoning" failedJobs.append(command) else: print "\tjob on host " + host + " completed." hostRetries[host] = min(hostRetries[host] + 1, MAX_HOST_RETRIES); finally: threadCleanup(host, command) threadLock.release() #print "runjob: release (2)" threadEvent.set() #print "runjob: set (2)" finally: #print "runjob: acquire (3)" threadLock.acquire() #print "runjob: acquire (3)" waiting = len(waitingJobs) available = len(availableHosts) print ("\t" + str(waiting) + " jobs waiting for service on " + str(available) + " host(s)") print ("\t" + str(len(activeJobs)) + " (" + str(threading.activeCount()-1) + ") jobs currently running.") if waiting > 0 and available > 0 and not inShutdown: launch_job(setup_cmd) threadLock.release() #print "runjob: release (3)" thread.exit() def usage(): print sys.argv[0] + ": [-t max_threads] [-r max_host_retries] [-j max_job_retries] [-d deadlock_timeout] -h host_list -s setup_script -c command_list" def shutdown(force): global inShutdown # if we're shutting down naturally, let any jobs being prep'd # complete normally. inShutdown = force while threading.activeCount() > 1: #print "shutdown: acquire" threadLock.acquire() #print "shutdown: acquired" active = len(activeThreads) threadLock.release() #print "shutdown: released" try: print "waiting for " + str(active) + " job(s) to finish..." if len(activeThreads) > 0: threadEvent.wait() threadEvent.clear() else: break except KeyboardInterrupt: if inShutdown == 1: print "I'm already shutting down - be patient..." else: inShutdown = 1 print "all threads have terminated... " #print "shutdown: acquire (2)" threadLock.acquire() #print "shutdown: acquired (2)" if len(waitingJobs) > 0: print "\twriting remaining jobs to pending-jobs.cmds" f = open("pending-jobs.cmds", 'w') for job in waitingJobs: f.write(str(job) + "\n") print ".", f.close() print "" if len(failedJobs) > 0 or len(activeJobs) > 0: print "\twriting failed jobs to failed-jobs.cmds" f = open("failed-jobs.cmds", 'w') if len(failedJobs) > 0: for job in failedJobs: f.write(str(job) + "\n") print ".", if len(activeJobs) > 0: for job in failedJobs: f.write(str(job) + "\n") print ".", f.close() threadLock.release() #print "shutdown: release (2)" sys.exit(0) def launch_job(setup_cmd): try: host = availableHosts.pop() command = waitingJobs.pop() except: return activeHosts.append(host) activeJobs.append(command) activeThreads.append(thread.start_new_thread(runjob, (host, command, setup_cmd))) def main(): global MAX_JOB_RETRIES global MAX_HOST_RETRIES global MAX_THREADS global DEADLOCK_TIMEOUT try: (optlist, args) = getopt.getopt(sys.argv[1:], "?h:s:c:t:r:j:d:", ["usage", "host_list=", "setup_script=", "command_list=", "max_threads=", "max_host_retries=", "max_job_retries=", "deadlock_timeout="]) except getopt.GetoptError: usage() sys.exit(1) setup_cmd = "" for o,a in optlist: if o == "?": usage() sys.exit(1) if o in ("-h", "--host_list"): availableHosts[:] = open(a).read().strip().split("\n") for host in availableHosts: hostRetries[host] = MAX_HOST_RETRIES random.shuffle(availableHosts) if o in ("-s", "--setup_script"): setup_cmd = a if o in ("-c", "--command_list"): waitingJobs[:] = open(a).read().strip().split("\n") for job in waitingJobs: jobRetries[job] = MAX_JOB_RETRIES if o in ("-t", "--max_threads"): MAX_THREADS = int(a) if o in ("-r", "--max_host_retries"): MAX_HOST_RETRIES = int(a) if len(hostRetries) > 0: for host in availableHosts: hostRetries[host] = MAX_HOST_RETRIES if o in ("-j", "--max_job_retries"): MAX_JOB_RETRIES = int(a) if len(jobRetries) > 0: for job in waitingJobs: jobRetries[job] = MAX_JOB_RETRIES if o in ("-d", "--deadlock_timeout"): DEADLOCK_TIMEOUT = float(a) if (len(waitingJobs) == 0 or len(availableHosts) == 0): usage() sys.exit(1) threadLock.acquire() wait = len(waitingJobs) active = len(activeJobs) hosts = len(availableHosts) threadLock.release() while wait > 0 or active > 0: #print "main: acquire" threadLock.acquire() #print "main: acquired" wait = len(waitingJobs) active = len(activeJobs) hosts = len(availableHosts) threadLock.release() #print "main: released" try: if hosts > 0 and wait > 0: #print "main: wait to spawn" waitToSpawn() #print "main: acquire (2)" threadLock.acquire() #print "main: acquired (2)" try: launch_job(setup_cmd) #print str(len(activeThreads)) + " threads running" #print "main: release (2)" threadLock.release() #print "main: wait" threadEvent.wait(DEADLOCK_TIMEOUT) if threadEvent.isSet(): threadEvent.clear() continue else: print "no available hosts left - aborting to avoid potential deadlock" break #print "main: acquire (3)" threadLock.acquire() #print "main: acquired (3)" #print "main: clear" threadEvent.clear() hosts = len(availableHosts) #print "main: release (3)" threadLock.release() if hosts == 0: print "sleeping carefully..." threadEvent.wait(DEADLOCK_TMEOUT) if threadEvent.isSet(): threadEvent.clear() continue else: print "no available hosts left - aborting to avoid potential deadlock" break #print "main: done" except: threadLock.release() print "exception caught in main loop" traceback.print_exc(file=sys.stdout) print "shutting down gracefully - this may take a while..." shutdown(1) sys.exit(-1) else: if wait == 0: break try: #print "main: else: wait" threadEvent.wait(DEADLOCK_TIMEOUT) if threadEvent.isSet(): #print "main: else: clear" threadEvent.clear() else: print "no available hosts left - aborting to avoid potential deadlock" break except: print "exception caught in main loop (2)" traceback.print_exc(file=sys.stdout) print "shutting down gracefully - this may take a while..." shutdown(1) sys.exit(-1) #print "main: acquire (3)" threadLock.acquire() #print "main: acquired (3)" if not (len(activeJobs) == len(activeThreads)): print "assertion failed: (len(activeJobs) == len(activeThreads))" #print "main: release (3)" threadLock.release() except KeyboardInterrupt: print "shutting down gracefully - this may take a while..." shutdown(1) sys.exit(1) print "terminating main loop..." shutdown(0) if __name__ == "__main__": main()