ecFlow's documentation is now on readthedocs!

Standalone client

In this example we want to monitor a particular task.
If this task is aborted for any reason, we ask the
server for the job output. This could be mailed to the user.
import ecflow
import time
import sys


def monitor_critical_task(ci, path_to_task):
    """Check one task on the server and fetch its job output if it aborted.

    ci           -- a connected ecflow.Client
    path_to_task -- absolute node path of the task to watch

    The process exits (status 0) when the server holds no suites, when the
    task does not exist, or once the output of an aborted task has been
    retrieved.  Otherwise the function simply returns so that the caller can
    poll again later.

    NOTE(review): the original listing was collapsed onto a single line; the
    nesting below (everything under the news_local() test) is reconstructed
    and should be confirmed against the upstream example.
    """
    # Query the server for any changes
    if ci.news_local():

        # get the incremental changes, and merge with defs stored on the Client
        ci.sync_local()

        # check to see if definition exists in the server
        defs = ci.get_defs()
        # Guard against a missing definition as well as an empty one, so that
        # len() is never applied to None.
        if defs is None or len(defs) == 0:
            sys.exit(0)  # return server has no suites

        # find the task we are interested in
        critical_task = defs.find_abs_node(path_to_task)
        if critical_task is None:
            # No such task
            sys.exit(0)  # return

        # Check to see if task was aborted, if it was email me the job output
        if critical_task.get_state() == ecflow.State.aborted:

            # Get the job output
            the_aborted_task_output = ci.get_file(path_to_task, 'jobout')
            # email(the_aborted_task_output)
            sys.exit(0)


try:
    # Create the client, connecting to the explicitly given host and port
    ci = ecflow.Client("localhost", "4143")

    # Continually monitor the suite
    while 1:

        monitor_critical_task(ci, "/suite/critical_node")

        # Sleep for 5 minutes.
        # To avoid overloading server ensure sleep is > 60 seconds
        time.sleep(300)

except RuntimeError as e:
    print(str(e))

Display ecFlow server content as a file system

Another example uses an ecFlow Python client to expose the server content as a mounted file system.

Set up the environment
virtualenv venv
. ./venv/bin/activate
pip install fusepy

The following script shows how task wrappers, manual pages, jobs, job output and even node attributes can be presented as files.

ecflow_fuse.py
#!/usr/bin/env python
import os
import sys
import time

sys.path.append("/usr/local/apps/ecflow/current/lib/python2.7/site-packages/ecflow")
from errno import ENOENT
from stat import S_IFDIR, S_IFREG, S_IFBLK
from sys import argv, exit
import ecflow
from ecflow import Client

from fuse import FUSE, Operations, LoggingMixIn, FuseOSError, fuse_get_context

# Size (in bytes) reported by getattr for every path that contains a dot,
# i.e. every pseudo-file; the real content length is not known up front.
FILESIZE = 10000

# Refresh policy read by FuseEcflow.refresh():
#   "always" -- sync with the server on every call
#   "time"   -- (or any other value) sync only once the refresh delay elapsed
UPDATE = "always"

# Extensions for which read() answers "-empty-" without asking the server.
ignore = ("bdmv", "inf", )

# File extension -> file type requested from the server with get_file().
# "nop" (no extension) and "att" (attributes) are served from the node tree
# instead, hence None.
exts = {"ecf": "script",
        "sms": "script",
        "man": "manual",
        "out": "jobout",
        "nop": None,
        "att": None,
        "job": "job"}

# Numeric status code -> status name (not referenced by the code shown here).
statuses = {
    -1: "unknown",
    0: "unknown",
    1: "suspended",
    2: "complete",
    3: "queued",
    4: "submitted",
    5: "active",
    6: "aborted",
    7: "shutdown",
    8: "halted",
    9: "unknown",
}

# Three-letter state extensions produced by list_dir(); reading such a file
# yields "-empty-".  "act" is included because active nodes are listed as
# "<name>.act".
state3 = ("unk", "que", "sub", "act", "com", "abo", "sus", "hal", "shu", )

def list_dir(node):
    """Return the pseudo-file names generated for every child of ``node``.

    Each child contributes ``<name>.<sss>`` (a dot followed by the first
    three characters of its formatted state) and ``<name>.att``.  Children
    that are ecflow tasks additionally get ``.job``, ``.ecf``, ``.out`` and
    ``.man`` entries.
    """
    task_suffixes = (".job", ".ecf", ".out", ".man")
    entries = []
    for child in node.nodes:
        base = child.name()
        # Keep only the dot and the first three characters of the state.
        state_suffix = (".%03s" % child.get_state())[:4]
        entries.append(base + state_suffix)
        entries.append(base + ".att")
        if isinstance(child, ecflow.Task):
            entries += [base + suffix for suffix in task_suffixes]
    return entries

def _format_expression(keyword, expression):
    """Render every part of a trigger/complete expression, one line per part.

    Each line is ``<keyword> [-a |-o ] <expression>`` followed by a newline,
    exactly as a single part was rendered before; an expression without
    parts yields an empty string.
    """
    lines = []
    for part_expr in expression.parts:
        trig = keyword + " "
        if part_expr.and_expr():
            trig = trig + "-a "
        if part_expr.or_expr():
            trig = trig + "-o "
        lines.append("%s" % trig + " %s" % part_expr.get_expression() + "\n")
    return "".join(lines)


def list_att(node):
    """Return a pretty-printed text dump of the attributes of ``node``.

    The attributes are collected into a dict (clock, status, defstatus,
    autocancel, repeat, late, complete, trigger, edit, meter, event, label,
    limit, inlimits, time, today, date, day, cron), printed to stdout as a
    trace, and returned formatted with ``pprint.pformat``.
    """
    import pprint

    attr = dict()

    # The clock is read from the node itself; nodes that do not provide
    # get_clock() are simply skipped.
    get_clock = getattr(node, "get_clock", None)
    if get_clock is not None:
        clock = get_clock()
        if clock:
            attr['clock'] = "%s" % clock

    attr['status'] = "%s" % node.get_state()

    # Only report the default status when it differs from "queued".
    defstatus = node.get_defstatus()
    if defstatus != ecflow.DState.queued:
        attr['defstatus'] = "%s" % defstatus

    autocancel = node.get_autocancel()
    if autocancel:
        attr['autocancel'] = "%s" % autocancel

    repeat = node.get_repeat()
    if not repeat.empty():
        attr['repeat'] = "%s # value:%s" % (repeat, repeat.value())

    late = node.get_late()
    if late:
        attr['late'] = "%s" % late

    # Keep every part of a multi-part expression instead of only the last.
    complete_expr = node.get_complete()
    if complete_expr:
        text = _format_expression("complete", complete_expr)
        if text:
            attr['complete'] = text

    trigger_expr = node.get_trigger()
    if trigger_expr:
        text = _format_expression("trigger", trigger_expr)
        if text:
            attr['trigger'] = text

    attr['edit'] = [(item.name(), item.value()) for item in node.variables]

    addit(node.meters, attr, 'meter')
    addit(node.events, attr, 'event')
    addit(node.labels, attr, 'label')
    addit(node.limits, attr, 'limit')
    addit(node.inlimits, attr, 'inlimits')
    addit(node.times, attr, 'time')
    addit(node.todays, attr, 'today')
    addit(node.dates, attr, 'date')
    addit(node.days, attr, "day")
    addit(node.crons, attr, "cron")

    # Trace output on stdout, then the text handed back to the caller.
    pprint.PrettyPrinter(indent=4).pprint(attr)

    return pprint.pformat(attr)

def addit(array, cont, name):
    """Store the string form of each item of ``array`` in ``cont[name]``.

    The entry is always written, so an empty ``array`` results in an empty
    list under ``name``.  Nothing is returned.
    """
    cont[name] = ["%s" % entry for entry in array]

class FuseEcflow(LoggingMixIn, Operations):
    '''
    A simple Ecflow python client example

    Exposes the node tree held by an ecFlow client as a file system:
    getattr, read and readdir are implemented, every modifying operation
    raises FuseOSError(ENOENT).
    '''

    # Seconds that must elapse between two syncs when UPDATE is not "always".
    REFRESH_INTERVAL = 5 * 60

    def __init__(self, host="localhost", port=31415, path='.'):
        '''Connect to the server at host/port and load its definition.'''
        self.client = Client(host, port)
        self.client.sync_local()
        # Time of the last sync, in seconds since the epoch.
        self.update = time.time()
        self.defs = self.client.get_defs()
        self.root = path
        # str() so that an integer port (the default) can be concatenated.
        print("#MSG: connected to %s " % host + str(port))

    @staticmethod
    def _split_ext(path):
        '''Split "<node path>.<ext>" on the last dot.

        Returns (path, "nop") when the path contains no dot.
        '''
        if "." in path:
            base, ext = path.rsplit(".", 1)
            return base, ext
        return path, "nop"

    def chmod(self, path, mode):
        raise FuseOSError(ENOENT)

    def chown(self, path, uid, gid):
        raise FuseOSError(ENOENT)

    def create(self, path, mode):
        raise FuseOSError(ENOENT)

    def destroy(self, path):
        pass

    def getattr(self, path, fh=None):
        '''Return the stat dict for path.

        Ownership and base values are copied from the current directory.
        The root is a directory (mode 0555); any path containing a dot is a
        read-only regular file of FILESIZE bytes; everything else is
        reported as a directory (mode 0444).  All timestamps are "now".
        '''
        cur = os.lstat(".")
        st = dict((key, getattr(cur, key)) for key in (
            'st_atime', 'st_gid',
            'st_mode', 'st_mtime', 'st_size', 'st_uid'))
        if path == '/':
            st['st_mode'] = (S_IFDIR | 0o555)
            st['st_nlink'] = 2

        elif '.' in path:
            st['st_mode'] = (S_IFREG | 0o444)
            st['st_size'] = FILESIZE

        else:
            # NOTE(review): no search (x) bit on directories, unlike the
            # root; kept as in the original.
            st['st_mode'] = (S_IFDIR | 0o444)
            st['st_size'] = 1

        st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time.time()
        return st

    def mkdir(self, path, mode):
        raise FuseOSError(ENOENT)

    def refresh(self):
        '''Sync with the server, always or once REFRESH_INTERVAL elapsed.'''
        now = time.time()
        if UPDATE == "always" or now - self.update > self.REFRESH_INTERVAL:
            self.client.sync_local()
            self.update = now

    def _node_text(self, path, ext):
        '''Text served for a plain node path ("nop") or its ".att" file.'''
        defs = self.client.get_defs()
        node = defs.find_abs_node(str(path))
        if node is not None:
            print("%s %s" % (node.name(), node.get_abs_node_path()))

        if path == '/':
            return "".join("%s " % s.name() for s in defs.suites)
        if node is None:
            return "-empty-"
        if ext == "att":
            return list_att(node)
        return "".join("%s\n" % s.name() for s in node.nodes)

    def read(self, path, size, offset, fh):
        '''Return at most size bytes of the content of path, from offset.

        State and ignored extensions, unknown extensions and unknown nodes
        read as "-empty-".  Script, manual, job and output files are fetched
        from the server; ".att" files hold the node attributes; a plain
        node path lists its children.
        '''
        path, ext = self._split_ext(path)

        self.refresh()

        print("read %s %s %s %s %s" % (path, size, offset, fh, ext))
        if ext in state3 or ext in ignore:
            data = "-empty-"
        elif ext not in exts:
            print("what? %s" % ext)
            data = "-empty-"
        elif ext not in ("nop", "att"):
            data = "%s" % self.client.get_file(str(path), exts[ext])
            # getattr advertises FILESIZE bytes, so longer content cannot be
            # read completely: make the cut visible.
            if len(data) > FILESIZE:
                data = ("#TRUNCATED\n" + data[:FILESIZE - 30] +
                        "\n#TRUNCATED\n")
        else:
            data = self._node_text(path, ext)

        # Honour the requested window instead of always returning the
        # beginning of the content.
        return data[offset:offset + size]

    def readdir(self, path='/', fh=None):
        '''Return the directory entries for path.

        The root lists the suites; a node lists its children followed by
        the pseudo-files built by list_dir().
        '''
        path, ext = self._split_ext(path)
        if ext not in exts:
            print("readdir: what? %s" % ext)
            return ["/"]
        print("readdir %s %s %s %s" % (path, fh, path, ext))

        self.refresh()

        if ext != "nop":
            return [".", "..", "ok", path.replace("/", "_"), ext]

        defs = self.client.get_defs()
        if path == '/':
            return ['.', '..'] + ["%s" % s.name() for s in defs.suites]

        node = defs.find_abs_node(str(path))
        if node is None:
            return ["-empty-"]

        res = ['.', '..'] + ["%s" % n.name() for n in node.nodes]
        res += list_dir(node)
        return res

    def readlink(self, path):
        raise FuseOSError(ENOENT)

    def rename(self, old, new):
        raise FuseOSError(ENOENT)

    def rmdir(self, path):
        raise FuseOSError(ENOENT)

    def symlink(self, target, source):
        raise FuseOSError(ENOENT)

    def truncate(self, path, length, fh=None):
        raise FuseOSError(ENOENT)

    def unlink(self, path):
        raise FuseOSError(ENOENT)

    def utimens(self, path, times=None):
        raise FuseOSError(ENOENT)

    def write(self, path, data, offset, fh):
        raise FuseOSError(ENOENT)

if __name__ == '__main__':
    # Exactly three arguments are expected after the script name.
    if len(argv) != 4:
        print('usage: %s <host> <port> <mountpoint>' % argv[0])
        exit(1)

    host, port, mountpoint = argv[1:4]
    # Mount in the foreground, single-threaded.
    fuse = FUSE(FuseEcflow(host, port), mountpoint,
                foreground=True, nothreads=True)

The client is then started as follows:

mount server content as a file system
mkdir -p mnt; python ./ecflow_fuse.py $ECF_HOST $ECF_PORT mnt;

In this example, each node's status is encoded as a three-character extension (.com for complete, .que for queued, .abo for aborted, ...). Note that the suspended status is not displayed.

Standard command-line utilities (Midnight Commander, find, ls, cat, ...) can then be used:

use case
# server eod3 was mounted
ls eod3/emc_41r2/thu
# 01  01.att  01.que  hind  hind.att  hind.que  verify  verify.att  verify.com
LS_COLORS+="*.abo=41:*.com=43:*.sus=01;45:*.que=44:*.sub=01;46:*.unk=47:*.act=01;42:"
ls eod3/emc_41r2/thu

cat eod3/emc_41r2/main/18bc/sv/getini.ecf
cat eod3/emc_41r2/main/18bc/sv/getini.man
cat eod3/emc_41r2/main/18bc/sv/getini.out
kdirstat eod3/emc_41r2/main/
find eod3 -name "*.abo"
mc eod3