[Saga-devel] saga-projects SVN commit 849: /applications/Lazarus/
yye00 at cct.lsu.edu
Fri Jan 16 10:45:00 CST 2009
User: yye00
Date: 2009/01/16 10:45 AM
Added:
/applications/Lazarus/
Calibrate.py, FileTest.py, yye00.py, yye01.py, yye03.py, yye04.py
Log:
Adding the Lazarus scripts to make sure they are not purged by the system administrators
File Changes:
Directory: /applications/Lazarus/
=================================
File [added]: Calibrate.py
Delta lines: +68 -0
===================================================================
--- applications/Lazarus/Calibrate.py 2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/Calibrate.py 2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,68 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import string
+import math
+import traceback
+from string import *
+from re import *
+
+# begin the checking module
+def Calibrate():
+ # This function runs a calibration test of the commands we will use to check
+ # the output from each stage
+
+ # directory where the calibration file we want to check lives:
+ CALIBRATE_FILE_DIR = "/work/yye00/ICAC/Calibrate/"
+ # the calibration filename
+ CALIBRATE_FILENAME = "Calibrate"
+ # the calibration filename extension, important for hdf5 versus F5
+ CALIBRATE_EXTENSION = ".h5"
+ # The find command test
+ FIND_CMD = 'find ' + CALIBRATE_FILE_DIR + '. -name "*.h5" -print'
+
+ # Flags that specify the error
+ EXIST_FLAG = 0
+ FIND_FLAG = 0
+ SIZE_FLAG = 0
+ SANITY_FLAG = 0
+
+ # get the full filename
+ filename = CALIBRATE_FILE_DIR + CALIBRATE_FILENAME + CALIBRATE_EXTENSION
+ # Check that the file exists using os module
+ try:
+ if(os.path.exists(filename)):
+ EXIST_FLAG = 1
+ except:
+ traceback.print_exc(file=sys.stdout)
+
+ # Check that we can search for the file using python
+ try:
+ cmd_out = os.popen(FIND_CMD).readlines()
+ file_out = cmd_out[0]
+ file_out=file_out.replace('/./','/')
+ file_out=file_out.rstrip("\n")
+ if(file_out==filename):
+ FIND_FLAG = 1
+ except:
+ traceback.print_exc(file=sys.stdout)
+
+ # Check that the file is not zero sized
+ try:
+ fsize= os.path.getsize(filename)
+ if(fsize>0):
+ SIZE_FLAG = 1
+ except:
+ traceback.print_exc(file=sys.stdout)
+
+ # Check that we passed all three tests
+ try:
+ if(EXIST_FLAG*FIND_FLAG*SIZE_FLAG==1):
+ SANITY_FLAG = 1
+ return SANITY_FLAG
+ else:
+ return -1
+ except:
+ traceback.print_exc(file=sys.stdout)
+
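
Calibrate() returns 1 only when all three checks (existence, find, and non-zero size) pass, and -1 otherwise, so callers can use it as a simple sanity gate before a stage. A minimal usage sketch, assuming Calibrate.py is importable and the calibration file configured above is in place:

import Calibrate

# Returns 1 when the existence, find, and size checks all pass, -1 otherwise.
if Calibrate.Calibrate() == 1:
    print "Calibration passed"
else:
    print "Calibration failed, proceed with caution"
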
File [added]: FileTest.py
Delta lines: +62 -0
===================================================================
--- applications/Lazarus/FileTest.py 2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/FileTest.py 2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import string
+import math
+import traceback
+from string import *
+import re
+
+def filterdups(iterable):
+ result = []
+ for item in iterable:
+ if item not in result:
+ result.append(item)
+ return result
+
+def list_difference(list1, list2):
+ diff_list = []
+ for item in list1:
+ if not item in list2:
+ diff_list.append(item)
+ return diff_list
+
+# begin the checking module
+def FileTest(TEST_FILE_DIR, TEST_DIR_SUFFIX, TEST_FILE_EXTENSION, NUMBER_OF_JOBS_PER_STAGE):
+ # The list of all the simulations we have files for
+ ALLSIMS_LIST=[]
+ # The list of simulations that need to be re-run:
+ RERUN_LIST=[]
+
+ # The find command test
+ FIND_CMD = 'find ' + TEST_FILE_DIR + '/* -name "*.' + TEST_FILE_EXTENSION + '" -print'
+
+ # Check that we can search for the file using python
+ try:
+ cmd_out = os.popen(FIND_CMD).readlines()
+ # Get the list of all the Simulations
+ for file_out in cmd_out:
+ ftemp = re.search(TEST_DIR_SUFFIX + "([0-9]*)", file_out).group()
+ ALLSIMS_LIST.append(atoi( re.sub(TEST_DIR_SUFFIX,'',ftemp)))
+ ALLSIMS_LIST=sorted(ALLSIMS_LIST)
+ ALLSIMS_LIST=filterdups(ALLSIMS_LIST)
+
+ # Check that the files are not zeroes
+ for file_out in cmd_out:
+ fsize = os.path.getsize(file_out.rstrip("\n"))
+ if(fsize==0):
+ ftemp = re.search(TEST_DIR_SUFFIX + "([0-9]*)", file_out).group()
+ RERUN_LIST.append(atoi( re.sub(TEST_DIR_SUFFIX,'',ftemp) ) )
+
+ # Check that the number of simulations is the number of jobs
+ if(len(ALLSIMS_LIST) !=NUMBER_OF_JOBS_PER_STAGE):
+ original = range(0, NUMBER_OF_JOBS_PER_STAGE)
+ difference = list_difference(original, ALLSIMS_LIST)
+ print original
+ print difference
+ RERUN_LIST=RERUN_LIST+(difference)
+ return RERUN_LIST
+ except:
+ traceback.print_exc(file=sys.stdout)
+
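
FileTest() returns the list of simulation indices whose output files are missing or zero-sized, which is what the resubmission logic in yye04.py consumes. A short usage sketch, assuming the directory layout configured in yye03.py/yye04.py (the WORK_DIR and SIMULATION_DIR_PREFIX values are taken from those scripts):

import FileTest

# Check 2 jobs under /work/yye00/ICAC/Simulations/Simulation_id_NNNNN/ for *.h5 output.
RERUN_LIST = FileTest.FileTest("/work/yye00/ICAC/Simulations/", "Simulation_id_", "h5", 2)
if len(RERUN_LIST) == 0:
    print "All simulations produced non-empty output"
else:
    print "Simulations to re-run: " + str(RERUN_LIST)
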
File [added]: yye00.py
Delta lines: +99 -0
===================================================================
--- applications/Lazarus/yye00.py 2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/yye00.py 2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,99 @@
+#!/usr/bin/env python
+
+"""Module many_job.
+
+This Module is used to launch a set of bigjobs.
+
+"""
+
+import sys
+import getopt
+import saga
+import time
+import pdb
+import os
+import traceback
+import advert_job
+import logging
+import many_job
+
+NUMBER_JOBS=256
+
+def has_finished(state):
+ state = state.lower()
+ if state=="done" or state=="failed" or state=="canceled":
+ return True
+ else:
+ return False
+
+""" Test Job Submission via ManyJob abstraction """
+if __name__ == "__main__":
+ try:
+ print "ManyJob load test with " + str(NUMBER_JOBS) + " jobs."
+ starttime=time.time()
+
+ # submit via mj abstraction
+ resource_list = ( {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"},
+ {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"})
+
+
+
+ #resource_list = []
+ #resource_list.append({"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "16", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "$(HOME)/src/REMDgManager/bigjob/advert_launcher.sh"})
+ print "Create manyjob service "
+ mjs = many_job.many_job_service(resource_list, None)
+
+ jobs = []
+ job_start_times = {}
+ job_states = {}
+ cwd = os.getcwd()
+ for i in range(0, NUMBER_JOBS):
+ # create job description
+ jd = saga.job.description()
+ jd.executable = "/bin/date"
+ jd.number_of_processes = "1"
+ jd.spmd_variation = "single"
+ jd.arguments = [""]
+ jd.working_directory = "/work/yye00"
+ jd.output = "/work/yye00/output/stdout-" + str(i) + ".txt"
+ jd.error = "/work/yye00/output/stderr-" + str(i) + ".txt"
+ subjob = mjs.create_job(jd)
+ subjob.run()
+ print "Submitted sub-job " + "%d"%i + "."
+ jobs.append(subjob)
+ job_start_times[subjob]=time.time()
+ job_states[subjob] = subjob.get_state()
+ print "************************ All Jobs submitted ************************"
+ while 1:
+ finish_counter=0
+ result_map = {}
+ for i in range(0, NUMBER_JOBS):
+ old_state = job_states[jobs[i]]
+ state = jobs[i].get_state()
+ if result_map.has_key(state) == False:
+ result_map[state]=0
+ result_map[state] = result_map[state]+1
+ #print "counter: " + str(i) + " job: " + str(jobs[i]) + " state: " + state
+ if old_state != state:
+ print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state
+ if old_state != state and has_finished(state)==True:
+ print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time()-job_start_times[jobs[i]]) + " s."
+ if has_finished(state)==True:
+ finish_counter = finish_counter + 1
+ job_states[jobs[i]]=state
+
+ print "Current states: " + str(result_map)
+ time.sleep(5)
+ if finish_counter == NUMBER_JOBS:
+ break
+
+ mjs.cancel()
+ runtime = time.time()-starttime
+ print "Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime/NUMBER_JOBS)
+ except:
+ traceback.print_exc(file=sys.stdout)
+ try:
+ mjs.cancel()
+ except:
+ pass
+
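
The wait loop in this script is repeated almost verbatim in yye03.py and yye04.py below. One way to avoid that duplication would be a helper that polls a list of sub-jobs until all of them reach a terminal state; this is only a sketch (wait_for_jobs is a hypothetical helper, not part of the many_job API), reusing the has_finished() check defined above:

import time

def wait_for_jobs(jobs, job_states, job_start_times, poll_interval=5):
    # Poll every sub-job until all of them report done/failed/canceled.
    while 1:
        finish_counter = 0
        result_map = {}
        for job in jobs:
            old_state = job_states[job]
            state = job.get_state()
            result_map[state] = result_map.get(state, 0) + 1
            if old_state != state and has_finished(state):
                print "Job: " + str(job) + " Runtime: " + str(time.time() - job_start_times[job]) + " s."
            if has_finished(state):
                finish_counter = finish_counter + 1
            job_states[job] = state
        print "Current states: " + str(result_map)
        if finish_counter == len(jobs):
            return
        time.sleep(poll_interval)
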
File [added]: yye01.py
Delta lines: +58 -0
===================================================================
--- applications/Lazarus/yye01.py 2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/yye01.py 2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,58 @@
+""" Example application demonstrating job submission via bigjob
+ advert_job implementation of BigJob is used
+"""
+
+import saga
+import os
+import advert_job
+import time
+
+advert_host = "fortytwo.cct.lsu.edu"
+
+""" Test Job Submission via Advert """
+if __name__ == "__main__":
+
+ # Parameter for BigJob
+ re_agent = os.getcwd() + "/advert_launcher.sh" # path to agent
+ nodes = 64 # number nodes for agent
+ lrms_url = "gram://qb1.loni.org/jobmanager-pbs" # resource url
+ project = "loni_jha_big" #allocation
+ queue = "workq" # queue (PBS)
+ workingdirectory="/tmp" # working directory
+ userproxy = None # userproxy (not supported yet due to context issue w/ SAGA)
+
+ # start Glide-In job (Replica-Agent)
+ print "Start Glide-In at: " + lrms_url
+ advert_glidin_job = advert_job.advert_glidin_job(advert_host)
+ advert_glidin_job.start_glidin_job(lrms_url,
+ re_agent,
+ nodes,
+ queue,
+ project,
+ workingdirectory,
+ userproxy)
+ print "BigJob URL: " + advert_glidin_job.glidin_url
+
+ # submit sub-job through big-job
+ jd = saga.job.description()
+ jd.executable = "/home/luckow/src/REMDgManager/bigjob/main"
+ jd.number_of_processes = "2"
+ jd.spmd_variation = "mpi"
+ jd.arguments = [""]
+ # !!Adjust!!
+ jd.working_directory = "/home/luckow"
+ jd.output = "output.txt"
+ jd.error = "error.txt"
+ sub_job = advert_job.advert_job(advert_host)
+ sub_job.submit_job(advert_glidin_job.glidin_url, jd)
+
+ # busy wait for completion
+ while 1:
+ state = str(sub_job.get_state())
+ print "state: " + state
+ if(state=="Failed" or state=="Done"):
+ break
+ time.sleep(10)
+
+ #Cleanup
+ advert_glidin_job.cancel()
File [added]: yye03.py
Delta lines: +135 -0
===================================================================
--- applications/Lazarus/yye03.py 2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/yye03.py 2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,135 @@
+#!/usr/bin/env python
+
+"""Module many_job.
+
+This Module is used to launch a set of bigjobs.
+
+"""
+
+import sys
+import getopt
+import saga
+import time
+import pdb
+import os
+import traceback
+import advert_job
+import logging
+import many_job
+
+"""
+These are parameters that you want to change for the jobs
+"""
+NUMBER_STAGES = 2
+NUMBER_JOBS_PER_STAGE = 2
+WORK_DIR = "/work/yye00/ICAC/Simulations/"
+SIMULATION_DIR_SUFFIX = "Simulation_id_"
+SIMULATION_EXE = "/work/yye00/ICAC/cactus_SAGASandTank"
+SIMULATION_PAR_FILE_SUFFIX = "/work/yye00/ICAC/ParFiles/ParFile_id_"
+ENKF_EXE = "/bin/date"
+CHECK_OUTPUT_CMD = "find ./* -iname *.h5 | xargs h5ls |grep \"BLACKOILBASE::Pw\""
+
+"""
+Now create the work directories by iterating over the range and creating sub-directories
+This works fine for LOCAL SYSTEMS ONLY and needs to be revisited for other systems
+"""
+for i in range(0, NUMBER_JOBS_PER_STAGE):
+ simdirname = WORK_DIR + SIMULATION_DIR_SUFFIX + "%05d" %i
+ if not os.path.isdir(simdirname+"/"):
+ os.mkdir(simdirname+"/")
+"""
+These are the arrays of per-job parameters; you only need to set the sizes individually if they differ between jobs
+"""
+#from numarray import *
+#JOB_SIZES=ones(NUMBER_JOBS_PER_STAGE)
+#JOB_SIZES[1:NUMBER_JOBS_PER_STAGE] = 4
+JOB_SIZES=[]
+for i in range(0, NUMBER_JOBS_PER_STAGE):
+ JOB_SIZES.append(4)
+
+def has_finished(state):
+ state = state.lower()
+ if state=="done" or state=="failed" or state=="canceled":
+ return True
+ else:
+ return False
+
+""" This has been modified with a loop for all stages """
+if __name__ == "__main__":
+ # Store the start time and run time for all stages
+ starttime=[]
+ runtime=[]
+ try:
+ for stage in range(0, NUMBER_STAGES):
+ print "Launching " + str(NUMBER_JOBS_PER_STAGE) + " jobs for Stage number " + str(stage)
+ starttime.append(time.time())
+
+ # submit via mj abstraction
+ resource_list = ( {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"},
+ {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"})
+ print "Create manyjob service "
+ mjs = many_job.many_job_service(resource_list, None)
+
+ jobs = []
+ job_start_times = {}
+ job_states = {}
+ cwd = os.getcwd()
+ for i in range(0, NUMBER_JOBS_PER_STAGE):
+ # create job description
+ jd = saga.job.description()
+ jd.executable = SIMULATION_EXE
+ jd.number_of_processes = str(JOB_SIZES[i])
+ jd.spmd_variation = "mpi"
+ # toggle between these two statements depending on the parfile directory
+ #jd.arguments = [SIMULATION_PAR_FILE_SUFFIX+"%05d" %i +".par"]
+ jd.arguments=[SIMULATION_PAR_FILE_SUFFIX+"All.par"]
+ jd.working_directory = WORK_DIR + SIMULATION_DIR_SUFFIX + "%05d" %i + "/"
+ jd.output = jd.working_directory + "stdout.txt"
+ jd.error = jd.working_directory + "stderr.txt"
+ subjob = mjs.create_job(jd)
+ subjob.run()
+ print "Submitted sub-job " + "%05d"%i + "."
+ jobs.append(subjob)
+ job_start_times[subjob]=time.time()
+ job_states[subjob] = subjob.get_state()
+ print "************************ All Jobs submitted ************************"
+ print "And now we wait for each stage to finish"
+
+ while 1:
+ finish_counter=0
+ result_map = {}
+ for i in range(0, NUMBER_JOBS_PER_STAGE):
+ old_state = job_states[jobs[i]]
+ state = jobs[i].get_state()
+ if result_map.has_key(state) == False:
+ result_map[state]=0
+ result_map[state] = result_map[state]+1
+ #print "counter: " + str(i) + " job: " + str(jobs[i]) + " state: " + state
+ if old_state != state:
+ print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state
+ if old_state != state and has_finished(state)==True:
+ print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time()-job_start_times[jobs[i]]) + " s."
+ if has_finished(state)==True:
+ finish_counter = finish_counter + 1
+ job_states[jobs[i]]=state
+
+ print "Current states: " + str(result_map)
+ time.sleep(5)
+ if finish_counter == NUMBER_JOBS_PER_STAGE:
+ break
+ runtime.append( time.time()-starttime[stage])
+ print "Runtime for Stage "+ str(stage)+ " is: " + str(runtime[stage]) + " s; Runtime per Job: " + str(runtime[stage]/NUMBER_JOBS_PER_STAGE)
+ # Now that all the executables for this stage have finished, we need to check the output (this is done in yye04.py)
+
+
+
+ print "finished with Stage number: " + str(stage)
+ mjs.cancel()
+
+ except:
+ traceback.print_exc(file=sys.stdout)
+ try:
+ mjs.cancel()
+ except:
+ pass
+
File [added]: yye04.py
Delta lines: +199 -0
===================================================================
--- applications/Lazarus/yye04.py 2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/yye04.py 2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+
+"""Module many_job.
+
+This Module is used to launch a set of bigjobs.
+
+"""
+
+import sys
+import getopt
+import saga
+import time
+import pdb
+import os
+import traceback
+import advert_job
+import logging
+import many_job
+import Calibrate
+import FileTest
+
+"""
+These are parameters that you want to change for the jobs
+"""
+NUMBER_STAGES = 2
+NUMBER_JOBS_PER_STAGE = 2
+WORK_DIR = "/work/yye00/ICAC/Simulations/"
+SIMULATION_DIR_PREFIX = "Simulation_id_"
+SIMULATION_EXE = "/work/yye00/ICAC/cactus_SAGASandTank"
+SIMULATION_PAR_FILE_PREFIX = "/work/yye00/ICAC/ParFiles/ParFile_id_"
+ENKF_EXE = "/bin/date"
+OUTPUT_FILE_EXTENSION = "h5"
+
+"""
+Now create the work directories by iterating over the range and creating sub-directories
+This works fine for LOCAL SYSTEMS ONLY and needs to be revisited for other systems
+"""
+for i in range(0, NUMBER_JOBS_PER_STAGE):
+ simdirname = WORK_DIR + SIMULATION_DIR_PREFIX + "%05d" %i
+ if not os.path.isdir(simdirname+"/"):
+ os.mkdir(simdirname+"/")
+"""
+These are the arrays of per-job parameters; you only need to set the sizes individually if they differ between jobs
+"""
+JOB_SIZES=[]
+for i in range(0, NUMBER_JOBS_PER_STAGE):
+ JOB_SIZES.append(4)
+
+def has_finished(state):
+ state = state.lower()
+ if state=="done" or state=="failed" or state=="canceled":
+ return True
+ else:
+ return False
+
+""" This has been modified with a loop for all stages """
+if __name__ == "__main__":
+ # Store the start time and run time for all stages
+ starttime=[]
+ runtime=[]
+ try:
+ for stage in range(0, NUMBER_STAGES):
+ print "Launching " + str(NUMBER_JOBS_PER_STAGE) + " jobs for Stage number " + str(stage)
+ starttime.append(time.time())
+
+ # submit via mj abstraction
+ resource_list = ( {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"},
+ {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"})
+ print "Create manyjob service "
+ mjs = many_job.many_job_service(resource_list, None)
+
+ jobs = []
+ job_start_times = {}
+ job_states = {}
+ cwd = os.getcwd()
+ for i in range(0, NUMBER_JOBS_PER_STAGE):
+ # create job description
+ jd = saga.job.description()
+ jd.executable = SIMULATION_EXE
+ jd.number_of_processes = str(JOB_SIZES[i])
+ jd.spmd_variation = "mpi"
+ # toggle between these two statements depending on the parfile directory
+ #jd.arguments = [SIMULATION_PAR_FILE_PREFIX+"%05d" %i +".par"]
+ jd.arguments=[SIMULATION_PAR_FILE_PREFIX+"All.par"]
+ jd.working_directory = WORK_DIR + SIMULATION_DIR_PREFIX + "%05d" %i + "/"
+ jd.output = jd.working_directory + "stdout.txt"
+ jd.error = jd.working_directory + "stderr.txt"
+ subjob = mjs.create_job(jd)
+ subjob.run()
+ print "Submitted sub-job " + "%05d"%i + "."
+ jobs.append(subjob)
+ job_start_times[subjob]=time.time()
+ job_states[subjob] = subjob.get_state()
+ print "************************ All Jobs submitted ************************"
+ print "And now we wait for each stage to finish"
+
+ while 1:
+ finish_counter=0
+ result_map = {}
+ for i in range(0, NUMBER_JOBS_PER_STAGE):
+ old_state = job_states[jobs[i]]
+ state = jobs[i].get_state()
+ if result_map.has_key(state) == False:
+ result_map[state]=0
+ result_map[state] = result_map[state]+1
+ #print "counter: " + str(i) + " job: " + str(jobs[i]) + " state: " + state
+ if old_state != state:
+ print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state
+ if old_state != state and has_finished(state)==True:
+ print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time()-job_start_times[jobs[i]]) + " s."
+ if has_finished(state)==True:
+ finish_counter = finish_counter + 1
+ job_states[jobs[i]]=state
+
+ print "Current states: " + str(result_map)
+ time.sleep(5)
+ if finish_counter == NUMBER_JOBS_PER_STAGE:
+ break
+ runtime.append( time.time()-starttime[stage])
+ print "#################################################"
+ print "Runtime for Stage "+ str(stage)+ " is: " + str(runtime[stage]) + " s; Runtime per Job: " + str(runtime[stage]/NUMBER_JOBS_PER_STAGE)
+ print "finished with Stage number: " + str(stage)
+ print "#################################################"
+ print "Performing checking"
+ print "Running Calibrate.py"
+ if(Calibrate.Calibrate() == 1):
+ print "Calibration successful"
+ else:
+ print "Calibration failed, proceed with caution"
+
+ print "#################################################"
+ print "Checking the actual output files"
+ RERUN_LIST= FileTest.FileTest(WORK_DIR, SIMULATION_DIR_PREFIX, OUTPUT_FILE_EXTENSION, NUMBER_JOBS_PER_STAGE)
+ if len(RERUN_LIST)!=0:
+ print "THE FOLLOWING JOBS FAILED: " + str(RERUN_LIST)
+ print "Attempting to re-submit"
+ rerun_jobs = []
+ rerun_job_start_times = {}
+ rerun_job_states = {}
+ cwd = os.getcwd()
+ for i in RERUN_LIST:
+ # create job description
+ jd = saga.job.description()
+ jd.executable = SIMULATION_EXE
+ jd.number_of_processes = str(JOB_SIZES[i])
+ jd.spmd_variation = "mpi"
+ # toggle between these two statements depending on the parfile directory
+ #jd.arguments = [SIMULATION_PAR_FILE_PREFIX+"%05d" %i +".par"]
+ jd.arguments=[SIMULATION_PAR_FILE_PREFIX+"All.par"]
+ jd.working_directory = WORK_DIR + SIMULATION_DIR_PREFIX + "%05d" %i + "/"
+ jd.output = jd.working_directory + "stdout.txt"
+ jd.error = jd.working_directory + "stderr.txt"
+ subjob = mjs.create_job(jd)
+ subjob.run()
+ print "Submitted sub-job " + "%05d"%i + "."
+ rerun_jobs.append(subjob)
+ rerun_job_start_times[subjob]=time.time()
+ rerun_job_states[subjob] = subjob.get_state()
+ print "************************ All rerun_jobs submitted ************************"
+ print "And now we wait for the reruns to finish"
+
+ while 1:
+ rerun_finish_counter=0
+ rerun_result_map = {}
+ for i in range(0, len(RERUN_LIST)):
+ old_state = rerun_job_states[rerun_jobs[i]]
+ state = rerun_jobs[i].get_state()
+ if rerun_result_map.has_key(state) == False:
+ rerun_result_map[state]=0
+ rerun_result_map[state] = rerun_result_map[state]+1
+ #print "counter: " + str(i) + " job: " + str(rerun_jobs[i]) + " state: " + state
+ if old_state != state:
+ print "Job " + str(rerun_jobs[i]) + " changed from: " + old_state + " to " + state
+ if old_state != state and has_finished(state)==True:
+ print "Job: " + str(rerun_jobs[i]) + " Runtime: " + str(time.time()-rerun_job_start_times[rerun_jobs[i]]) + " s."
+ if has_finished(state)==True:
+ rerun_finish_counter = rerun_finish_counter + 1
+ rerun_job_states[rerun_jobs[i]]=state
+
+ print "Current states: " + str(rerun_result_map)
+ time.sleep(5)
+ if rerun_finish_counter == len(RERUN_LIST):
+ break
+ print "Finished the resubmit, now we double check the files"
+ RERUN_LIST2= FileTest.FileTest(WORK_DIR, SIMULATION_DIR_PREFIX, OUTPUT_FILE_EXTENSION, NUMBER_JOBS_PER_STAGE)
+ if(len(RERUN_LIST2)!=0):
+ print "We have catastrophic failure that we cannot recover from, aborting"
+ sys.exit()
+ else:
+ print "Success: all jobs finished properly"
+
+ mjs.cancel()
+
+ except:
+ traceback.print_exc(file=sys.stdout)
+ try:
+ mjs.cancel()
+ except:
+ pass