[Saga-devel] saga-projects SVN commit 849: /applications/Lazarus/

yye00 at cct.lsu.edu
Fri Jan 16 10:45:00 CST 2009


User: yye00
Date: 2009/01/16 10:45 AM

Added:
 /applications/Lazarus/
  Calibrate.py, FileTest.py, yye00.py, yye01.py, yye03.py, yye04.py

Log:
 Adding Lazarus scripts to make sure they are not purged by sys admins

File Changes:

Directory: /applications/Lazarus/
=================================

File [added]: Calibrate.py
Delta lines: +68 -0
===================================================================
--- applications/Lazarus/Calibrate.py	2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/Calibrate.py	2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,68 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import string
+import math
+import traceback
+from string import * 
+from re import *
+
+# begin the checking module
+def Calibrate():
+    # This script performs a calibrated test of the commands we will use in checking
+    # the output from each stage
+    
+    # directory where the calibration file we want to check lives:
+    CALIBRATE_FILE_DIR = "/work/yye00/ICAC/Calibrate/"
+    # the calibration filename
+    CALIBRATE_FILENAME = "Calibrate"
+    # the calibration filename extension, important for hdf5 versus F5
+    CALIBRATE_EXTENSION = ".h5"
+    # The find command test
+    FIND_CMD = 'find ' + CALIBRATE_FILE_DIR + '. -name "*.h5" -print'
+    
+    # Flags that specify the error
+    EXIST_FLAG = 0
+    FIND_FLAG = 0
+    SIZE_FLAG = 0
+    SANITY_FLAG = 0
+    
+    # get the full filename
+    filename = CALIBRATE_FILE_DIR + CALIBRATE_FILENAME + CALIBRATE_EXTENSION
+    # Check that the file exists using os module
+    try:        
+        if(os.path.exists(filename)):
+            EXIST_FLAG = 1
+    except:
+        traceback.print_exc(file=sys.stdout)
+     
+    # Check that we can search for the file using Python
+    try:
+        cmd_out = os.popen(FIND_CMD).readlines()
+        file_out = cmd_out[0]
+        file_out=file_out.replace('/./','/')
+        file_out=file_out.rstrip("\n")
+        if(file_out==filename):
+            FIND_FLAG = 1
+    except:
+        traceback.print_exc(file=sys.stdout)
+    
+    # Check that the file is not zero sized
+    try:
+        fsize= os.path.getsize(filename)
+        if(fsize>0):
+            SIZE_FLAG = 1
+    except:
+        traceback.print_exc(file=sys.stdout)
+        
+    # Check that we passed all three tests
+    try:
+        if(EXIST_FLAG*FIND_FLAG*SIZE_FLAG==1):
+            SANITY_FLAG = 1
+            return SANITY_FLAG 
+        else:
+            return -1 
+    except:
+        traceback.print_exc(file=sys.stdout)            
+
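
Note: Calibrate() takes no arguments and returns 1 when all three checks (file exists, file is found by find, file is non-empty) pass, or -1 otherwise. A minimal usage sketch, mirroring the call made in yye04.py below:

    import Calibrate

    # 1 means the calibration file exists, is found by find, and is non-empty; -1 otherwise
    if Calibrate.Calibrate() == 1:
        print "Calibration successful"
    else:
        print "Calibration failed, proceed with caution"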

File [added]: FileTest.py
Delta lines: +62 -0
===================================================================
--- applications/Lazarus/FileTest.py	2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/FileTest.py	2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import string
+import math
+import traceback
+from string import * 
+import re
+
+def filterdups(iterable):
+    result = []
+    for item in iterable:
+        if item not in result:
+            result.append(item)
+    return result
+
+def list_difference(list1, list2):
+    diff_list = []
+    for item in list1:
+        if not item in list2:
+            diff_list.append(item)
+    return diff_list
+
+# begin the checking module
+def FileTest(TEST_FILE_DIR,  TEST_DIR_SUFFIX,  TEST_FILE_EXTENSION,  NUMBER_OF_JOBS_PER_STAGE):
+    # The list of all the simulations we have files for
+    ALLSIMS_LIST=[]
+    # The list of simulations that need to be re-run:
+    RERUN_LIST=[]
+
+    # The find command test
+    FIND_CMD = 'find ' + TEST_FILE_DIR + '/* -name "*.' + TEST_FILE_EXTENSION + '" -print'
+     
+    # Check that we can search for the file using Python
+    try:
+        cmd_out = os.popen(FIND_CMD).readlines()
+        # Get the list of all the Simulations
+        for file_out in cmd_out:
+            ftemp = re.search(TEST_DIR_SUFFIX + "([0-9]*)", file_out).group()
+            ALLSIMS_LIST.append(atoi( re.sub(TEST_DIR_SUFFIX,'',ftemp)))
+        ALLSIMS_LIST=sorted(ALLSIMS_LIST)
+        ALLSIMS_LIST=filterdups(ALLSIMS_LIST)
+        
+        # Check that the files are not zeroes
+        for file_out in cmd_out:
+            fsize = os.path.getsize(file_out.rstrip("\n"))
+            if(fsize==0):
+                ftemp = re.search(TEST_DIR_SUFFIX + "([0-9]*)", file_out).group()
+                RERUN_LIST.append(atoi( re.sub(TEST_DIR_SUFFIX,'',ftemp) ) )
+
+        # Check that the number of simulations is the number of jobs
+        if(len(ALLSIMS_LIST) !=NUMBER_OF_JOBS_PER_STAGE):
+            original = range(0, NUMBER_OF_JOBS_PER_STAGE)
+            difference = list_difference(original, ALLSIMS_LIST)
+            print original
+            print difference
+            RERUN_LIST=RERUN_LIST+(difference)
+        return RERUN_LIST
+    except:
+        traceback.print_exc(file=sys.stdout)
+    
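
Note: FileTest() returns the list of simulation indices whose output files are missing or zero-sized and therefore need to be re-run. A minimal usage sketch, with the directory, suffix, and extension values taken from yye04.py below:

    import FileTest

    WORK_DIR = "/work/yye00/ICAC/Simulations/"
    # indices of simulations whose .h5 output is absent or empty
    RERUN_LIST = FileTest.FileTest(WORK_DIR, "Simulation_id_", "h5", 2)
    if len(RERUN_LIST) != 0:
        print "Jobs to re-run: " + str(RERUN_LIST)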

File [added]: yye00.py
Delta lines: +99 -0
===================================================================
--- applications/Lazarus/yye00.py	2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/yye00.py	2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,99 @@
+#!/usr/bin/env python
+
+"""Module many_job.
+
+This Module is used to launch a set of bigjobs.
+
+"""
+
+import sys
+import getopt
+import saga
+import time
+import pdb
+import os
+import traceback
+import advert_job
+import logging
+import many_job
+
+NUMBER_JOBS=256
+
+def has_finished(state):
+        state = state.lower()
+        if state=="done" or state=="failed" or state=="canceled":
+            return True
+        else:
+            return False
+
+""" Test Job Submission via ManyJob abstraction """
+if __name__ == "__main__":
+    try:
+        print "ManyJob load test with " + str(NUMBER_JOBS) + " jobs."
+        starttime=time.time()
+
+        # submit via mj abstraction
+        resource_list =  ( {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"},
+                           {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"})
+
+
+
+        #resource_list = []
+        #resource_list.append({"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "16", "allocation" : "loni_cybtertools", "queue" : "workq", "re_agent": "$(HOME)/src/REMDgManager/bigjob/advert_launcher.sh"})
+        print "Create manyjob service "
+        mjs = many_job.many_job_service(resource_list, None)
+        
+        jobs = []
+        job_start_times = {}
+        job_states = {}
+        cwd = os.getcwd()
+        for i in range(0, NUMBER_JOBS):
+            # create job description
+            jd = saga.job.description()
+            jd.executable = "/bin/date"
+            jd.number_of_processes = "1"
+            jd.spmd_variation = "single"
+            jd.arguments = [""]
+            jd.working_directory = "/work/yye00"
+            jd.output =  "/work/yye00/output/stdout-" + str(i) + ".txt"
+            jd.error = "/work/yye00/output/stderr-" + str(i) + ".txt"
+            subjob = mjs.create_job(jd)
+            subjob.run()
+            print "Submitted sub-job " + "%d"%i + "."
+            jobs.append(subjob)
+            job_start_times[subjob]=time.time()
+            job_states[subjob] = subjob.get_state()
+        print "************************ All Jobs submitted ************************"
+        while 1: 
+            finish_counter=0
+            result_map = {}
+            for i in range(0, NUMBER_JOBS):
+                old_state = job_states[jobs[i]]
+                state = jobs[i].get_state()
+                if result_map.has_key(state) == False:
+                    result_map[state]=0
+                result_map[state] = result_map[state]+1
+                #print "counter: " + str(i) + " job: " + str(jobs[i]) + " state: " + state
+                if old_state != state:
+                    print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state
+                if old_state != state and has_finished(state)==True:
+                     print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time()-job_start_times[jobs[i]]) + " s."
+                if has_finished(state)==True:
+                     finish_counter = finish_counter + 1
+                job_states[jobs[i]]=state
+
+            print "Current states: " + str(result_map) 
+            time.sleep(5)
+            if finish_counter == NUMBER_JOBS:
+                break
+
+        mjs.cancel()
+        runtime = time.time()-starttime
+        print "Runtime: " + str(runtime) + " s; Runtime per Job: " + str(runtime/NUMBER_JOBS)
+    except:
+        traceback.print_exc(file=sys.stdout)
+        try:
+            mjs.cancel()
+        except:
+            pass
+
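
Note: the monitoring loop above (repeated in yye03.py and yye04.py below) tallies the state of every sub-job, prints a histogram of states, and exits once every job reports done, failed, or canceled. A minimal stand-alone sketch of that polling pattern, with wait_for_jobs as a hypothetical helper name:

    import time

    def has_finished(state):
        return state.lower() in ("done", "failed", "canceled")

    def wait_for_jobs(jobs, poll_interval=5):
        # poll every sub-job until all of them reach a terminal state
        while True:
            result_map = {}
            finish_counter = 0
            for job in jobs:
                state = job.get_state()
                result_map[state] = result_map.get(state, 0) + 1
                if has_finished(state):
                    finish_counter = finish_counter + 1
            print "Current states: " + str(result_map)
            if finish_counter == len(jobs):
                return
            time.sleep(poll_interval)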

File [added]: yye01.py
Delta lines: +58 -0
===================================================================
--- applications/Lazarus/yye01.py	2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/yye01.py	2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,58 @@
+""" Example application demonstrating job submission via bigjob 
+    advert_job implementation of BigJob is used
+"""
+
+import saga
+import os
+import advert_job
+import time
+
+advert_host = "fortytwo.cct.lsu.edu"
+
+""" Test Job Submission via Advert """
+if __name__ == "__main__":
+
+    # Parameter for BigJob
+    re_agent = os.getcwd() + "/advert_launcher.sh" # path to agent
+    nodes = 64 # number nodes for agent
+    lrms_url = "gram://qb1.loni.org/jobmanager-pbs" # resource url
+    project = "loni_jha_big" #allocation
+    queue = "workq" # queue (PBS)
+    workingdirectory="/tmp"  # working directory
+    userproxy = None # userproxy (not supported yet due to context issue w/ SAGA)
+
+    # start Glide-In job (Replica-Agent)
+    print "Start Glide-In at: " + lrms_url
+    advert_glidin_job = advert_job.advert_glidin_job(advert_host)
+    advert_glidin_job.start_glidin_job(lrms_url,
+                                        re_agent,
+                                        nodes,
+                                        queue,
+                                        project,
+                                        workingdirectory, 
+                                        userproxy)
+    print "BigJob URL: " + advert_glidin_job.glidin_url
+
+    # submit sub-job through big-job
+    jd = saga.job.description()
+    jd.executable = "/home/luckow/src/REMDgManager/bigjob/main"
+    jd.number_of_processes = "2"
+    jd.spmd_variation = "mpi"
+    jd.arguments = [""]
+    # !!Adjust!!
+    jd.working_directory = "/home/luckow"
+    jd.output = "output.txt"
+    jd.error = "error.txt"
+    adv_job = advert_job.advert_job(advert_host)
+    adv_job.submit_job(advert_glidin_job.glidin_url, jd)
+    
+    # busy wait for completion
+    while 1:
+        state = str(adv_job.get_state())
+        print "state: " + state
+        if(state=="Failed" or state=="Done"):
+            break
+        time.sleep(10)
+
+    #Cleanup
+    advert_glidin_job.cancel()

File [added]: yye03.py
Delta lines: +135 -0
===================================================================
--- applications/Lazarus/yye03.py	2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/yye03.py	2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,135 @@
+#!/usr/bin/env python
+
+"""Module many_job.
+
+This Module is used to launch a set of bigjobs.
+
+"""
+
+import sys
+import getopt
+import saga
+import time
+import pdb
+import os
+import traceback
+import advert_job
+import logging
+import many_job
+
+"""
+These are parameters that you want to change for the jobs
+"""
+NUMBER_STAGES              = 2
+NUMBER_JOBS_PER_STAGE      = 2
+WORK_DIR                   = "/work/yye00/ICAC/Simulations/"
+SIMULATION_DIR_SUFFIX      = "Simulation_id_"
+SIMULATION_EXE             = "/work/yye00/ICAC/cactus_SAGASandTank"
+SIMULATION_PAR_FILE_SUFFIX = "/work/yye00/ICAC/ParFiles/ParFile_id_"
+ENKF_EXE                   = "/bin/date"
+CHECK_OUTPUT_CMD           = "find ./* -iname *.h5 | xargs h5ls |grep \"BLACKOILBASE::Pw\""
+
+"""
+Now create the work directories by iterating over the range and creating sub-directories
+This works fine for LOCAL SYSTEMS ONLY and needs to be revisited for other systems
+"""
+for i in range(0, NUMBER_JOBS_PER_STAGE):
+    simdirname = WORK_DIR + SIMULATION_DIR_SUFFIX + "%05d" %i
+    if not os.path.isdir(simdirname+"/"):
+        os.mkdir(simdirname+"/")
+"""
+These are the arrays of parameters for the various jobs; you only need to set the sizes individually if they differ between jobs
+"""
+#from numarray import *
+#JOB_SIZES=ones(NUMBER_JOBS_PER_STAGE)
+#JOB_SIZES[1:NUMBER_JOBS_PER_STAGE] = 4
+JOB_SIZES=[]
+for i in range(0, NUMBER_JOBS_PER_STAGE):
+  JOB_SIZES.append(4)
+
+def has_finished(state):
+        state = state.lower()
+        if state=="done" or state=="failed" or state=="canceled":
+            return True
+        else:
+            return False
+
+""" This has been modified with a loop for all stages """
+if __name__ == "__main__":
+    # Store the start time and run time for all stages
+    starttime=[]
+    runtime=[]
+    try:
+        for stage in range(0,  NUMBER_STAGES):
+            print "Launching " + str(NUMBER_JOBS_PER_STAGE) + " jobs for Stage number " + str(stage)
+            starttime.append(time.time())
+
+            # submit via mj abstraction
+            resource_list =  ( {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"},
+                           {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"})
+            print "Create manyjob service "
+            mjs = many_job.many_job_service(resource_list, None)
+        
+            jobs = []
+            job_start_times = {}
+            job_states = {}
+            cwd = os.getcwd()
+            for i in range(0, NUMBER_JOBS_PER_STAGE):
+                # create job description
+                jd = saga.job.description()
+                jd.executable = SIMULATION_EXE
+                jd.number_of_processes = str(JOB_SIZES[i])
+                jd.spmd_variation = "mpi"
+                # toggle between these two statements depending on the parfile directory
+                #jd.arguments = [SIMULATION_PAR_FILE_SUFFIX+"%05d" %i +".par"]
+                jd.arguments=[SIMULATION_PAR_FILE_SUFFIX+"All.par"]
+                jd.working_directory = WORK_DIR + SIMULATION_DIR_SUFFIX + "%05d" %i + "/"
+                jd.output =  jd.working_directory + "stdout.txt"
+                jd.error = jd.working_directory + "stderr.txt"
+                subjob = mjs.create_job(jd)
+                subjob.run()
+                print "Submitted sub-job " + "%05d"%i + "."
+                jobs.append(subjob)
+                job_start_times[subjob]=time.time()
+                job_states[subjob] = subjob.get_state()
+            print "************************ All Jobs submitted ************************"
+            print "And now we wait for each stage to finish"
+            
+            while 1: 
+                finish_counter=0
+                result_map = {}
+                for i in range(0, NUMBER_JOBS_PER_STAGE):
+                    old_state = job_states[jobs[i]]
+                    state = jobs[i].get_state()
+                    if result_map.has_key(state) == False:
+                        result_map[state]=0
+                    result_map[state] = result_map[state]+1
+                    #print "counter: " + str(i) + " job: " + str(jobs[i]) + " state: " + state
+                    if old_state != state:
+                        print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state
+                    if old_state != state and has_finished(state)==True:
+                         print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time()-job_start_times[jobs[i]]) + " s."
+                    if has_finished(state)==True:
+                         finish_counter = finish_counter + 1
+                    job_states[jobs[i]]=state
+    
+                print "Current states: " + str(result_map) 
+                time.sleep(5)
+                if finish_counter == NUMBER_JOBS_PER_STAGE:
+                    break
+            runtime.append( time.time()-starttime[stage])
+            print "Runtime for Stage "+ str(stage)+ " is: " + str(runtime[stage]) + " s; Runtime per Job: " + str(runtime[stage]/NUMBER_JOBS_PER_STAGE)
+            # Now that we have finished running all the executables, we need to do the checking
+            
+
+            
+            print "finished with Stage number: " + str(stage)
+        mjs.cancel()
+
+    except:
+        traceback.print_exc(file=sys.stdout)
+        try:
+            mjs.cancel()
+        except:
+            pass
+

File [added]: yye04.py
Delta lines: +199 -0
===================================================================
--- applications/Lazarus/yye04.py	2009-01-13 18:35:11 UTC (rev 848)
+++ applications/Lazarus/yye04.py	2009-01-16 16:44:59 UTC (rev 849)
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+
+"""Module many_job.
+
+This Module is used to launch a set of bigjobs.
+
+"""
+
+import sys
+import getopt
+import saga
+import time
+import pdb
+import os
+import traceback
+import advert_job
+import logging
+import many_job
+import Calibrate
+import FileTest
+
+"""
+These are parameters that you want to change for the jobs
+"""
+NUMBER_STAGES              = 2
+NUMBER_JOBS_PER_STAGE      = 2
+WORK_DIR                   = "/work/yye00/ICAC/Simulations/"
+SIMULATION_DIR_PREFIX      = "Simulation_id_"
+SIMULATION_EXE             = "/work/yye00/ICAC/cactus_SAGASandTank"
+SIMULATION_PAR_FILE_PREFIX = "/work/yye00/ICAC/ParFiles/ParFile_id_"
+ENKF_EXE                   = "/bin/date"
+OUTPUT_FILE_EXTENSION      = "h5"
+
+"""
+Now create the work directories by iterating over the range and creating sub-directories
+This works fine for LOCAL SYSTEMS ONLY and needs to be revisited for other systems
+"""
+for i in range(0, NUMBER_JOBS_PER_STAGE):
+    simdirname = WORK_DIR + SIMULATION_DIR_PREFIX + "%05d" %i
+    if not os.path.isdir(simdirname+"/"):
+        os.mkdir(simdirname+"/")
+"""
+These are the arrays of parameters for the various jobs; you only need to set the sizes individually if they differ between jobs
+"""
+JOB_SIZES=[]
+for i in range(0, NUMBER_JOBS_PER_STAGE):
+  JOB_SIZES.append(4)
+
+def has_finished(state):
+        state = state.lower()
+        if state=="done" or state=="failed" or state=="canceled":
+            return True
+        else:
+            return False
+
+""" This has been modified with a loop for all stages """
+if __name__ == "__main__":
+    # Store the start time and run time for all stages
+    starttime=[]
+    runtime=[]
+    try:
+        for stage in range(0,  NUMBER_STAGES):
+            print "Launching " + str(NUMBER_JOBS_PER_STAGE) + " jobs for Stage number " + str(stage)
+            starttime.append(time.time())
+
+            # submit via mj abstraction
+            resource_list =  ( {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"},
+                           {"gram_url" : "gram://qb1.loni.org/jobmanager-pbs", "number_cores" : "8", "allocation" : "loni_cybertools", "queue" : "workq", "re_agent": "/home/yye00/ICAC/bigjob/advert_launcher.sh"})
+            print "Create manyjob service "
+            mjs = many_job.many_job_service(resource_list, None)
+        
+            jobs = []
+            job_start_times = {}
+            job_states = {}
+            cwd = os.getcwd()
+            for i in range(0, NUMBER_JOBS_PER_STAGE):
+                # create job description
+                jd = saga.job.description()
+                jd.executable = SIMULATION_EXE
+                jd.number_of_processes = str(JOB_SIZES[i])
+                jd.spmd_variation = "mpi"
+                # toggle between these two statements depending on the parfile directory
+                #jd.arguments = [SIMULATION_PAR_FILE_PREFIX+"%05d" %i +".par"]
+                jd.arguments=[SIMULATION_PAR_FILE_PREFIX+"All.par"]
+                jd.working_directory = WORK_DIR + SIMULATION_DIR_PREFIX + "%05d" %i + "/"
+                jd.output =  jd.working_directory + "stdout.txt"
+                jd.error = jd.working_directory + "stderr.txt"
+                subjob = mjs.create_job(jd)
+                subjob.run()
+                print "Submitted sub-job " + "%05d"%i + "."
+                jobs.append(subjob)
+                job_start_times[subjob]=time.time()
+                job_states[subjob] = subjob.get_state()
+            print "************************ All Jobs submitted ************************"
+            print "And now we wait for each stage to finish"
+            
+            while 1: 
+                finish_counter=0
+                result_map = {}
+                for i in range(0, NUMBER_JOBS_PER_STAGE):
+                    old_state = job_states[jobs[i]]
+                    state = jobs[i].get_state()
+                    if result_map.has_key(state) == False:
+                        result_map[state]=0
+                    result_map[state] = result_map[state]+1
+                    #print "counter: " + str(i) + " job: " + str(jobs[i]) + " state: " + state
+                    if old_state != state:
+                        print "Job " + str(jobs[i]) + " changed from: " + old_state + " to " + state
+                    if old_state != state and has_finished(state)==True:
+                         print "Job: " + str(jobs[i]) + " Runtime: " + str(time.time()-job_start_times[jobs[i]]) + " s."
+                    if has_finished(state)==True:
+                         finish_counter = finish_counter + 1
+                    job_states[jobs[i]]=state
+    
+                print "Current states: " + str(result_map) 
+                time.sleep(5)
+                if finish_counter == NUMBER_JOBS_PER_STAGE:
+                    break
+            runtime.append( time.time()-starttime[stage])
+            print "#################################################"
+            print "Runtime for Stage "+ str(stage)+ " is: " + str(runtime[stage]) + " s; Runtime per Job: " + str(runtime[stage]/NUMBER_JOBS_PER_STAGE)
+            print "finished with Stage number: " + str(stage)
+            print "#################################################"
+            print "Performing checking"
+            print "Running Calibrate.py"
+            if(Calibrate.Calibrate() == 1):
+                print "Calibration successful"
+            else:
+                print "Calibration failed, proceed with caution"
+            
+            print "#################################################"
+            print "Checking the actual output files"
+            RERUN_LIST= FileTest.FileTest(WORK_DIR, SIMULATION_DIR_PREFIX, OUTPUT_FILE_EXTENSION, NUMBER_JOBS_PER_STAGE)
+            if len(RERUN_LIST)!=0:
+                print "THE FOLLOWING JOBS FAILED: " + str(RERUN_LIST)
+                print "Attempting to re-submit"
+                rerun_jobs = []
+                rerun_job_start_times = {}
+                rerun_job_states = {}
+                cwd = os.getcwd()
+                for i in RERUN_LIST:
+                    # create job description
+                    jd = saga.job.description()
+                    jd.executable = SIMULATION_EXE
+                    jd.number_of_processes = str(JOB_SIZES[i])
+                    jd.spmd_variation = "mpi"
+                    # toggle between these two statements depending on the parfile directory
+                    #jd.arguments = [SIMULATION_PAR_FILE_PREFIX+"%05d" %i +".par"]
+                    jd.arguments=[SIMULATION_PAR_FILE_PREFIX+"All.par"]
+                    jd.working_directory = WORK_DIR + SIMULATION_DIR_PREFIX + "%05d" %i + "/"
+                    jd.output =  jd.working_directory + "stdout.txt"
+                    jd.error = jd.working_directory + "stderr.txt"
+                    subjob = mjs.create_job(jd)
+                    subjob.run()
+                    print "Submitted sub-job " + "%05d"%i + "."
+                    rerun_jobs.append(subjob)
+                    rerun_job_start_times[subjob]=time.time()
+                    rerun_job_states[subjob] = subjob.get_state()
+                print "************************ All rerun_jobs submitted ************************"
+                print "And now we wait for the reruns to finish"
+                
+                while 1: 
+                    rerun_finish_counter=0
+                    rerun_result_map = {}
+                    for i in range(0, len(RERUN_LIST)):
+                        old_state = rerun_job_states[rerun_jobs[i]]
+                        state = rerun_jobs[i].get_state()
+                        if rerun_result_map.has_key(state) == False:
+                            rerun_result_map[state]=0
+                        rerun_result_map[state] = rerun_result_map[state]+1
+                        #print "counter: " + str(i) + " job: " + str(rerun_jobs[i]) + " state: " + state
+                        if old_state != state:
+                            print "Job " + str(rerun_jobs[i]) + " changed from: " + old_state + " to " + state
+                        if old_state != state and has_finished(state)==True:
+                             print "Job: " + str(rerun_jobs[i]) + " Runtime: " + str(time.time()-rerun_job_start_times[rerun_jobs[i]]) + " s."
+                        if has_finished(state)==True:
+                             rerun_finish_counter = rerun_finish_counter + 1
+                        rerun_job_states[rerun_jobs[i]]=state
+        
+                    print "Current states: " + str(rerun_result_map)
+                    time.sleep(5)
+                    if rerun_finish_counter == len(RERUN_LIST):
+                        break
+                print "Finished the resubmit, now we double check the files"
+                RERUN_LIST2= FileTest.FileTest(WORK_DIR, SIMULATION_DIR_PREFIX, OUTPUT_FILE_EXTENSION, NUMBER_JOBS_PER_STAGE)
+                if(len(RERUN_LIST2)!=0):
+                    print "We have catastrophic failure that we cannot recover from, aborting"
+                    sys.exit()
+            else:
+                print "Success: all jobs finished properly"
+            
+        mjs.cancel()
+
+    except:
+        traceback.print_exc(file=sys.stdout)
+        try:
+            mjs.cancel()
+        except:
+            pass


