Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


Code Block --help


If the server is running locally, typical usage might be as follows (this loads the definition into the server on port 4141):

Code Block my_test.def --port 4141


Code Block
#!/usr/bin/env python2.7
# Name        :
# Author      : Avi
# Revision    : $Revision: #10 $
# Copyright 2009-2020 ECMWF.
# This software is licensed under the terms of the Apache Licence version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

# =============================================================================
# Code for testing *any* definition
#   Since any ad hoc definition will reference local directories in the
#   ECF_ variables, we need to remove them and inject our own.
#   This script is re-runnable, and hence will delete suites in the server
#   matching those in the input definition. Hence it is best to use this 
#   script with a *test* server to avoid accidentally deleting existing suites 
#   of the same name.
# =============================================================================
import ecflow
import os       # for getenv
import sys
import shutil   # used to remove directory tree
import argparse # for argument parsing     

def deleteget_variablesroot_affectingsource_job_generationdir(node): 
    """delete customer related ECF variables, these will point to directories
  cwd = os.getcwd()
    #print "get_root_source_dir from: " + cwd
     that don't exist. Its ok we will regenerate our own local ones"""
    var = node.find_variable("ECF_HOME"while (1):
        # Get to directory that has ecflow
        head, tail = os.path.split(cwd)
    if not var.empty() :      #print "   head:" + head
#print "   tail:" + tail
       var =if nodetail.find_variable("ECF_FILESecflow") != -1 :
      if  not var.empty() :  
# bjam, already at varthe = node.find_variable("ECF_INCLUDE")source directory
    if not var.empty() :  
   if os.path.exists(cwd    node.delete_variable("ECF_INCLUDE") + "/Jamroot.jam"): 
     var = node.find_variable("ECF_JOB_CMD")
    if not var.emptyprint()" :  
Found Jamroot.jam in "     node.delete_variable("ECF_JOB_CMD")+ cwd)
    var = node.find_variable("ECF_KILL_CMD")
    if not var.empty() : return cwd
    var = node.find_variable("ECF_STATUS_CMD")if tail != "Pyext" and tail != "migrate":
    if not var.empty() :  
   # in cmake, we  node.delete_variable("ECF_STATUS_CMD")    
    var = node.find_variable("ECF_OUT")may be in the build directory, hence we need to determine source directory
    if not var.empty() :  
   file = cwd +  node.delete_variable("ECF_OUT")"/CTestTestfile.cmake"
def traverse_container(node_container):
    """Recursively traverse definition node hierarchy and delete #print "   searching for " + file
       the  variables that affect jobif generation.os.path.exists(file):
    for node in node_container.nodes:
 # determine path by looking into  delete_variables_affecting_job_generation(node)this file:
        if  not isinstance(node, ecflow.Task)     with open(file) as fp:
if __name__ == "__main__"for line in fp:
    default_port = "3141"
    if "ECF_PORT" in os.environ:
       ## default_port = os.environ["ECF_PORT"]

Source directory: /tmp/ma0/workspace/ecflow/Acore
     default_host = "localhost"
    if "ECF_HOST" in os.environ:
        default_host  =if osline.environ["ECF_HOST"]

find("Source directory"):
    DESC = """Will allow any definition to be loaded and  played on the server
          tokens  This is done by:
= line.split()
               o Remove existing ECF_ variables that affect job generation. 
    if len(tokens) == 4:
       i.e. variables that refer to customer specific directories are removed
            o Allows ECF_HOME to specified,#print defaults" to cwd + /CUSTOMER/ECF_HOMEreturning root_source_dir:", tokens[3]
            o Generates the scripts(.ecf files) automatically based on the definition.
           return tokens[3]
  i.e. if a task has events,meters,labels then the client request for these are
  raise RuntimeError("ERROR could not find Source directory    in CTestTestfile.cmake")
     automatically injected in the generated .ecf script fileselse:
            o Will clear out existing data both on disk and on the server to allow raise RuntimeError("ERROR could not find file CTestTestfile.cmake in " + cwd)
              multiple re-runs of
 this script. ** If this is an issuecwd please= usehead
    return  cwd

def delete_variables_affecting_job_generation(node): 
  a test server **
        """delete customer related ECF variables, these will point to directories
    o All suites arethat put into a suspended statedon't exist. ThisIts allowsok thewe GUIwill toregenerate resumeour them
own local ones"""
    var = node.find_variable("ECF_HOME")
    oif The server is restarted and suites are begunnot var.empty() :  
        node.delete_variable("ECF_HOME")    This
 programs assumes that ecflowvar module= is accessible.node.find_variable("ECF_FILES")
    if not var.empty() :  
    PARSERvar = argparsenode.ArgumentParser(description=DESC,  find_variable("ECF_INCLUDE")
    if not var.empty() :  
    var     = node.find_variable("ECF_JOB_CMD")
    if   formatter_class=argparse.RawDescriptionHelpFormatter)
    PARSER.add_argument('defs_file',not var.empty() :  
    var = node.find_variable("ECF_KILL_CMD")
    if  help="The definition file")not var.empty() :  
    PARSER.add_argument('--host', default=default_host,    node.delete_variable("ECF_KILL_CMD")    
    var = node.find_variable("ECF_STATUS_CMD")
    if not var.empty() :  
         help="The name of the host machine, defaults to ECF_HOST otherwise 'localhost'")
    PARSER.add_argument('--port', default=default_port,   node.delete_variable("ECF_STATUS_CMD")    
    var = node.find_variable("ECF_OUT")
    if not var.empty() :  
   help="The port on the host, defaults to ECF_PORT otherwise uses 3141")
def traverse_container(node_container):
    PARSER.add_argument('--path', default="/",   
   """Recursively traverse definition node hierarchy and delete
       the variables that affect job generation.
     help="replace only thedelete_variables_affecting_job_generation(node_container)
    for node path in the suite")
     PARSER.add_argument('--ecf_home', default=os.getcwd() + "/CUSTOMER/ECF_HOME", delete_variables_affecting_job_generation(node)
        if not isinstance(node, ecflow.Task):
             help="Directory to be used for generated scripts(ECF_HOME), defaults to ./CUSTOMER/ECF_HOME")
    PARSER.add_argument('--verbose', nargs='?', default=False, const=True, type=bool,traverse_container(node)  
if __name__ == "__main__":
    DESC = """Will allow any definition to be loaded and played on the server
            This is done by:
            o Remove existing ECF_ variables that affect job generation. 
              i.e variables that refer to customer specific directories are removed
            o Allows ECF_HOME to specified, defaults to ./CUSTOMER/ECF_HOME
            o Generates the scripts(.ecf files) automatically based on the definition.
              i.e if a task has events,meters,labels then the client request for these are
              automatically injected in the generated .ecf script files
            o Will clear out existing data both on disk and on the server to allow 
              multiple re-runs of this script. ** If this is an issue please use
              a test server **
            o All suites are put into a suspended state. This allows the GUI to resume them
            o The server is restarted and suites are begun
            This programs assumes that ecflow module is accessible

                python Pyext/samples/ --port=3141 --verbose=True ANode/parser/test/data/good_defs/trigger/late.def
    print("Running ecflow version " + ecflow.Client().version()  + " debug build(" + str(ecflow.debug_build()) +")")
    if 'PYTHONPATH' in os.environ:
        print("PYTHONPATH: " + str(os.environ['PYTHONPATH'].split(os.pathsep)))
    print("sys.path:   " + str(sys.path))
    default_port = "3141"
    if "ECF_PORT" in os.environ:
         default_port = os.environ["ECF_PORT"]
    default_host = "localhost"
    if "ECF_HOST" in os.environ:
        default_host  = os.environ["ECF_HOST"]

    PARSER = argparse.ArgumentParser(description=DESC,  
                        help="The definition file")
    PARSER.add_argument('--host', default=default_host,   
                        help="The name of the host machine, defaults to 'localhost'")
    PARSER.add_argument('--port', default=default_port,   
                        help="The port on the host, defaults to 3141")
    PARSER.add_argument('--path', default="/",   
                        help="replace only the node path in the suite")
    PARSER.add_argument('--ecf_home', default=os.getcwd() + "/CUSTOMER/ECF_HOME",
                        help="Directory to be used for generated scripts(ECF_HOME), defaults to ./CUSTOMER/ECF_HOME")
    PARSER.add_argument('--verbose', nargs='?', default=False, const=True, type=bool,
                        help="Show verbose output")
    ARGS = PARSER.parse_args()
    ARGS.defs_file = os.path.expandvars(ARGS.defs_file) # expand references to any environment variables
    print(ARGS  )  
    # If running on local work space, use /Pyext/test/data/CUSTOMER/ECF_HOME as ecf_home
    using_workspace = False;
    ecflow_source_dir = ""
        ecflow_source_dir = get_root_source_dir();
        ARGS.ecf_home = ecflow_source_dir + "/Pyext/test/data/CUSTOMER/ECF_HOME"
        using_workspace = True
        if ARGS.verbose:
     help="Show verbose output")
    ARGS = PARSER.parse_args()
    ARGS.defs_file = os.path.expandvars(ARGS.defs_file) # expand references to any environment variables
    print ARGS print("Workspace is defined ecflow_source_dir: ",ecflow_source_dir)
    # If running on localpass
 work space, use /Pyext/test/data/CUSTOMER/ECF_HOME as ecf_home
    if ARGS.verbose:
        print ("Using ECF_HOME=" + ARGS.ecf_home)
        if ARGS.verbose: 
            print ("\nloading the definition from the input arguments(" + ARGS.defs_file + ")\n"
        DEFS = ecflow.Defs(ARGS.defs_file)
    except RuntimeError, as ex:
        print ("   ecflow.Defs(" + ARGS.defs_file + ") failed:\n" + str(ex))
    if ARGS.verbose: 
        print ("remove test data associated with the DEFS, so we start fresh, Allows rerun")
    for suite in DEFS.suites:
        dir_to_remove = ARGS.ecf_home + suite.get_abs_node_path()
        if ARGS.verbose: 
            print ("   Deleting directory: " + dir_to_remove + "\n")
        shutil.rmtree(dir_to_remove, True)  
    if ARGS.verbose: 
        print ("remove remote reference to ECF_HOME and ECF_INCLUDE, since we inject or own\n")
    for suite in DEFS.suites:
    ifDEFS.add_variable("ECF_HOME", ARGS.verbose: ecf_home)
    print "add variables required for script generation, for all suites\n"
    DEFS.add_variable("ECF_HOME", ARGS.ecf_homeif using_workspace: 
        path_to_ecflow_client = ecflow.File.find_client()
    DEFS.add_variable("SLEEP", "10")  # not strictly required since default is 1 second
        if os.path.exists( path_to_ecflow_client ):
            DEFS.add_variable("ECF_INCLUDECLIENT_EXE_PATH", ARGS.ecf_home + "/includes")

path_to_ecflow_client )
            if ARGS.verbose: print("Adding ECF_CLIENT_EXE_PATH:",path_to_ecflow_client)

    DEFS.add_variable("SLEEP", "10")  # printnot "Placestrictly allrequired suitessince intodefault suspendedis state, so they can be started by the GUI\n"  
1 second
    DEFS.add_variable("ECF_INCLUDE", ARGS.ecf_home + "/includes")

    for suite in DEFS.suites:
    if ARGS.verbose: 
        print (DEFS)

    if ARGS.verbose: 
        print ("Generating script files(.ecf) from the definition")

    if ARGS.verbose: 
        print ("\nchecking script file generation, pre-processing & variable substitution\n")
    JOB_CTRLmsg = ecflow.JobCreationCtrl()
    assert len(JOB_CTRL.get_error_msg()) == 0, JOB_CTRL.get_error_msg()
    # ===========================================================================
    CL = ecflow.Client(, ARGS.port)
        if ARGS.verbose: 
            print ("check server " + + ":" + ARGS.port + " is running") 

        if ARGS.verbose: 
            print ("Server is already running. re-start the server")

        if ARGS.verbose: 
            print ("Remove suites associated with this DEFS, allows rerun *******************************************")
        for suite in DEFS.suites:
                CL.delete(suite.get_abs_node_path(), True)
            except RuntimeError, as ex:
                pass # For first run this will fail, hence ignore
        if ARGS.verbose: 
            print ("Load the definition into " + + ":" + ARGS.port)
        if ARGS.path == "/":
            CL.replace(ARGS.path, DEFS)

        if ARGS.verbose: 
            print ("Begin all suites. They should be suspended.")
        print ("Loaded suites:")
        for suite in DEFS.suites:
            print ("   " +
        print ("into server " + \
                             + ":" + ARGS.port + ", please view the playable suites in the GUI")
    except RuntimeError, as ex:
        print ("Error: " + str(ex))