ecFlow's documentation is now on readthedocs!

An ecFlow Python Client can be used to solve a Fault Tolerance request, for example, when three out of five families or tasks are enough to carry on submitting new jobs.

ecflow3of5.py
#!/usr/bin/env python
""" ./ecflow3of5.py 63 /e_41r2/main/12/prod """
import ecflow
import os
import sys
import time
host = os.getenv("ECF_HOST", "localhost")
port = os.getenv("ECF_PORT", 31415)
client = ecflow.Client(host, port)
wait = False; wait = True
interval = 30
outof5 = int(sys.argv[1])
node_path = sys.argv[2]
def stop(msg, num): print msg; sys.exit(num)
while 1:
    tot = 0
    count = 0
    done = True
    client.sync_local()
    node = client.get_defs().find_abs_node(node_path)
    if node is None: stop("node not found!!!", 1)
    for item in node.nodes:
        count += 1
        status = "%s" % item.get_state()
        # print item.get_abs_node_path(), status, outof5, tot    
        if status == "complete":
            tot += 1
            if tot >= outof5: stop("# OK", 0)
        elif status == "aborted": 
            pass
        else: done = False
    if count < outof5: stop("# Impossible: %d < %d" % (count, outof5), 1)
    if done: stop("# KO %d" % tot, 1)
    print "# still possible",
    if wait: 
        print "...", tot, outof5, count
        time.sleep(interval); 
    else: stop("", -1)

Such client can be used as an ecFlow task wrapper, with few more lines.

tasked client
#!/usr/bin/env python
""" an example for a simple python client to help ecFlow scheduling
 + unit test
 + task loading in the suite
$manual 

  example
$end
$comment
  a comment in a comment
$end
"""
import ecflow
import getopt
import os
import sys
import time
import unittest

sys.path.append("/home/ma/emos/def/o/def") # ecf can be imported ...

MICRO = "$$" # keep ecFlow pleased with micro character balance

def usage():
    print sys.argv[0], """ -n <num> -p <path_to_node>
      -h # help
      -i <number> # sleep interval for sleep mode
      -t # unit test
      -r # load the task definition in a suite, associated with -p option 
      -w # activate sleep mode
    """
    for num, val in enumerate(sys.argv):
        print num, val
    sys.exit(2)

def stop(msg, num, child=None): 
    print msg; 
    if child: 
        child.report(msg)
        if num == 0: child.report("stop")
    sys.stdout.flush()
    sys.stderr.flush()
    sys.exit(num)

class Child(object):
    """
    /var/tmp/map/work/p4/metapps/suites/o/admin/ecflow3of5_prod.py -i 30 -w -p /test3outof5/fam -n 3

    """
    def __init__(self):
        import signal
        env = {
            "ECF_NODE": "$ECF_NODE$", # check can be on a server, child on another...
            "ECF_PASS": "$ECF_PASS$",
            "ECF_NAME": "$ECF_NAME$",
            "ECF_PORT": "$ECF_PORT$",
            "ECF_TRYNO": "$ECF_TRYNO$",             }
        self.client = None
        if MICRO[0] in env["ECF_PORT"]: 
            print "# cli mode"
            return
        print "#MSG: will communicate with server..."
        print "#MSG: kill: ssh %s kill -15 %d" % (os.getenv("HOST","localhost"),
                                             os.getpid())
        self.client = ecflow.Client()
        self.client.set_host_port(env["ECF_NODE"], int(env["ECF_PORT"]))
        self.client.set_child_pid(os.getpid())
        self.client.set_child_path(env["ECF_NAME"])
        self.client.set_child_password(env["ECF_PASS"])
        self.client.set_child_try_no(int(env["ECF_TRYNO"]))
        self.client.child_init()
        self.client.set_child_timeout(20)

        for sig in (signal.SIGINT,
                        signal.SIGHUP,
                        signal.SIGQUIT,
                        signal.SIGILL,
                        signal.SIGTRAP,
                        signal.SIGIOT,
                        signal.SIGBUS,
                        signal.SIGFPE,
                        signal.SIGUSR1,
                        signal.SIGUSR2,
                        signal.SIGPIPE,
                        signal.SIGTERM,
                        signal.SIGXCPU,
                        signal.SIGPWR):
            signal.signal(sig, self.signal_handler)

    def signal_handler(self, signum, frame):
        """ catch signal """
        print 'Aborting: Signal handler called with signal ', signum
        if self.client:
            self.client.child_abort("Signal handler called with signal " + str(signum))
            sys.exit(0)

 def __exit__(self, exc_type, exc_value, traceback):
        if self.client: 
            self.client.child_abort()
            sys.exit(0)

 def report(self, msg, meter=None):
        """ communicate with ecFlow server """
        if not self.client:
            return
        elif meter:
            self.client.child_meter(msg, meter)
        elif msg == "stop":
            self.client.child_complete()
            self.client = None
        else: self.client.child_label("info", msg)


def check3of5(outof5, node_path, wait=True, interval=30):
  host = os.getenv("ECF_HOST", "localhost")
  port = os.getenv("ECF_PORT", 31415)
  client = ecflow.Client(host, port)
  client.ch_register(False, [ str(node_path.split('/')[1]) ])
  child = Child() # choice? might be global variable ???
  # raise BaseException()
  while 1:
    tot = 0
    count = 0
    done = True
    client.sync_local()
    node = client.get_defs().find_abs_node(node_path)
    if node is None: stop("node not found!!!", 1, child)

    for item in node.nodes:
        count += 1
        status = "%s" % item.get_state()
        # print item.get_abs_node_path(), status, outof5, tot    
        if status == "complete":
            tot += 1
            if tot >= outof5: stop("# OK", 0, child)

        elif status == "aborted": 
            pass
        else: done = False

    if count < outof5: stop("# Impossible %d %d" % (count, outof5), 1, child)

    if done: stop("# KO %d %d %d" % (tot, outof5, count), 1, child)

    print "# still possible",
    if wait: 
        print "...", tot, outof5, count
        child.report("... %d %d %d" % (tot, outof5, count))
        time.sleep(interval); 
    else: stop("", -1, child)

def task_check():
    from ecf import Task, Label, Variables
    pwd = os.getcwd()
    name = sys.argv[0]
    if '/' in name: name = name.split('/')[-1]
    if '.' in name: name = name.split('.')[0]
    return Task(name).add(
        Label("info", "this task uses a python-script directly, no job..."),
        Variables(ECF_MICRO = MICRO[0],
                  ECF_FILES= pwd,
                  ECF_HOME= pwd,
                  ECF_EXTN= ".py",
                  ECF_PASS= "FREE",
                  # ECF_JOB_CMD= pwd + "/%s -i $INTERVAL$ -w" % name
                  # ECF_JOB_CMD= "python $ECF_JOB$ -i $INTERVAL$ -w"
                  ECF_JOB_CMD= "python $ECF_JOB$ -w"
                  + " -n 3 -p /$SUITE$/fam"
                  + " > $ECF_JOBOUT$ 2>&1",
                  INTERVAL=30,
              ),
    )

class Test3outof5(unittest.TestCase):
    """ a test case """
    def test_1(self, test_ok=1):
        """ a test """
        import ecf
        from ecf import Task, Label, Variables, Suite, Defs, Family, Trigger

        defs = Defs()
        sname = "test3outof5"
        cargo = task_check()

        defs.add_suite(
            Suite(sname).add(
                # ecf.Defstatus("suspended"),
                Family("fam").add(
                    ecf.Defstatus("suspended"),
                    [ Task(name) for name in ("a", "b", "c", "d", "e") ]),
                
                cargo.add(Trigger("fam != queued")), # trigger as string

                Family("user").add(
                    # trigger with python == and obj:
                    Trigger(cargo.name() == ecf.COMPLETE),
                    Task("dummy").add(
                        ecf.Complete("1==1"))
                )
            )
        )

        client = replace("/%s" % sname, defs)
        client.begin_suite(sname)
        client.force_state("/%s/fam/a" % sname, ecflow.State.complete)
        client.force_state("/%s/fam/b" % sname, ecflow.State.aborted)
        client.force_state("/%s/fam/c" % sname, ecflow.State.complete)

        if test_ok: client.force_state("/%s/fam/d" % sname, ecflow.State.active)
        else: client.force_state("/%s/fam/d" % sname, ecflow.State.aborted)

        if test_ok: client.force_state("/%s/fam/e" % sname, ecflow.State.complete)
        else: client.force_state("/%s/fam/e" % sname, ecflow.State.aborted)

    # def test_2(self):         self.test_1(0)

def replace(path, defs=None):
    import ecf
    host = os.getenv("ECF_HOST", "localhost")
    port = os.getenv("ECF_PORT", 31415)
    client = ecf.Client(host, port)
    # ecf.Client(host, port).replace(path, defs, 1, 1)
    client.replace(path, defs, 1, 1)
    return client

if __name__ == '__main__':
    try: 
        OPTS, ARGS = getopt.getopt(
            sys.argv[1:], "hi:n:p:rtw", 
            ["help", "interval", "number", "path", "replace", "test", "wait"])
    except getopt.GetoptError as err:
        print "#what?", usage()

    INTERVAL = 30
    WAIT = False
    PATH = None
    NUM = None     
    REPLACE = False
    for o, a in OPTS:
        if o in ("-h", "--help"): 
            usage()
        elif o in ("-i", "--interval"): 
            INTERVAL = int(a)
        elif o in ("-n", "--number"): 
            NUM = int(a)
        elif o in ("-p", "--path"): 
            PATH = a        
        elif o in ("-r", "--replace"): 
            REPLACE = True 
        elif o in ("-t", "--test"): 
            unittest.main(argv=[sys.argv[0]])
            sys.exit(0)
        elif o in ("-w", "--wait"): 
            WAIT = True

    if NUM and PATH:
        check3of5(NUM, PATH, WAIT, INTERVAL)

    elif PATH and REPLACE:
        replace(PATH)

    else: usage()