ecFlow's documentation is now on readthedocs!

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 21 Next »

Standalone client

In this example we want to monitor a particular task.
If this task is aborted for any reason, we ask the
server for the job output. This could be mailed to the user.
#!/usr/bin/env python2.7
import ecflow
import time

def monitor_critical_task(ci, path_to_task):
    """Poll the server once and fetch the job output of an aborted task.

    ci           -- client object used to query the server
    path_to_task -- absolute node path of the task to watch

    Returns None when the server reports no news, or when the task exists
    but is not aborted.  Ends the script with exit(0) when the server holds
    no definition, when the task cannot be found, or after the job output
    of an aborted task has been retrieved.
    """
    # Query the server for any changes; nothing to do when there are none
    if not ci.news_local():
        return

    # Get the incremental changes, and merge with defs stored on the client
    ci.sync_local()

    # Check to see if a definition exists in the server.
    # Identity test: '== None' would call the object's own __eq__.
    defs = ci.get_defs()
    if defs is None:
        exit(0)

    # Find the task we are interested in
    critical_task = defs.find_abs_node(path_to_task)
    if critical_task is None:
        # No such task
        exit(0)

    # Check to see if task was aborted, if it was email me the job output
    if critical_task.get_state() == ecflow.State.aborted:
        # Get the job output
        the_aborted_task_output = ci.get_file(path_to_task, 'jobout')
        # email(the_aborted_task_output)
        exit(0)
                
try:
    # Create the client for an explicitly given host and port.
    ci = ecflow.Client("localhost", "4143")

    # Continually monitor the suite. The loop has no exit condition of its
    # own: monitor_critical_task() ends the script through exit(0).
    while True:

        monitor_critical_task(ci, "/suite/critical_node")

        # Sleep for 5 minutes.
        # To avoid overloading server ensure sleep is > 60 seconds
        time.sleep(300)

except RuntimeError as e:
    # 'except ... as' and print(...) are valid on Python 2.6+ and Python 3.
    print(str(e))

Display ecFlow server content as a file system

Another example uses an ecFlow Python client to expose the server content as a mounted file system.

Set up the environment
virtualenv venv
. ./venv/bin/activate
pip install fusepy

The following script shows that task wrappers, manual pages, jobs, job outputs and even node attributes can be exposed as files.

ecflow fuse client example
#!/usr/bin/env python
import os
import sys
import time

sys.path.append("/usr/local/apps/ecflow/current/lib/python2.7/site-packages/ecflow")
from errno import ENOENT
from stat import S_IFDIR, S_IFREG, S_IFBLK
from sys import argv, exit
import ecflow
from ecflow import Client

from fuse import FUSE, Operations, LoggingMixIn, FuseOSError, fuse_get_context

# Fixed size (in bytes) reported as st_size for every regular file.
FILESIZE = 10000

# Refresh policy checked by FuseEcflow.refresh(): "always" syncs with the
# server on every call; any other value (e.g. "time") syncs only when the
# elapsed-time test in refresh() passes.
UPDATE = "always"

# Extensions for which read() returns the "-empty-" placeholder.
ignore = ("bdmv", "inf", )

# File extension -> file type requested through Client.get_file().
# None marks extensions that read() answers without a file request.
exts = {
    "ecf": "script",
    "sms": "script",
    "man": "manual",
    "out": "jobout",
    "nop": None,
    "att": None,
    "job": "job",
}

# Numeric status code -> status name.
# NOTE(review): not referenced anywhere in the visible code.
statuses = {
    -1: "unknown",
    0: "unknown",
    1: "suspended",
    2: "complete",
    3: "queued",
    4: "submitted",
    5: "active",
    6: "aborted",
    7: "shutdown",
    8: "halted",
    9: "unknown",
}

# Three-letter state extensions; read() returns the "-empty-" placeholder
# for paths carrying one of them.
state3 = ("unk", "que", "sub", "com", "abo", "sus", "hal", "shu", )

def list_dir(node):
    """Return the pseudo-file names generated for the children of *node*.

    Every child yields "<name>.<first three characters of its state>" and
    "<name>.att"; children that are ecflow.Task instances additionally
    yield "<name>.job", "<name>.ecf", "<name>.out" and "<name>.man".
    """
    entries = []
    for child in node.nodes:
        base = child.name()
        # "." plus the state text, cut down to the dot and three characters
        suffix = (".%03s" % child.get_state())[:4]
        entries.append(base + suffix)
        entries.append(base + ".att")
        if isinstance(child, ecflow.Task):
            entries += [base + tail
                        for tail in (".job", ".ecf", ".out", ".man")]
    return entries

def _format_expression(keyword, expr):
    """Render each part of a trigger/complete expression on its own line.

    Returns None when *expr* is empty or has no parts, so the caller can
    leave the attribute out.
    """
    if not expr:
        return None
    lines = []
    for part_expr in expr.parts:
        text = keyword + " "
        if part_expr.and_expr():
            text = text + "-a "
        if part_expr.or_expr():
            text = text + "-o "
        lines.append(text + " %s" % part_expr.get_expression() + "\n")
    return "".join(lines) or None


def list_att(node):
    """Return a pretty-printed text dump of the attributes of *node*.

    The attributes are collected into a dict, which is printed to stdout
    with an indent of 4 and returned as the text from pprint.pformat().
    """
    import pprint

    attr = dict()

    # The original looked up an undefined name ('suite') and silently
    # swallowed the NameError, so the clock was never reported.  Ask the
    # node itself when it offers get_clock().
    # NOTE(review): presumably only suites provide get_clock() - confirm.
    get_clock = getattr(node, "get_clock", None)
    if get_clock is not None:
        clock = get_clock()
        if clock:
            attr['clock'] = "%s" % clock

    attr['status'] = "%s" % node.get_state()

    # Only report the default status when it is not 'queued'
    defstatus = node.get_defstatus()
    if defstatus != ecflow.DState.queued:
        attr['defstatus'] = "%s" % defstatus

    autocancel = node.get_autocancel()
    if autocancel:
        attr['autocancel'] = "%s" % autocancel

    repeat = node.get_repeat()
    if not repeat.empty():
        attr['repeat'] = "%s # value:%s" % (repeat, repeat.value())

    late = node.get_late()
    if late:
        attr['late'] = "%s" % late

    # All parts of an expression are kept; previously each part overwrote
    # the one before it, so only the last part survived.
    complete_text = _format_expression("complete", node.get_complete())
    if complete_text is not None:
        attr['complete'] = complete_text

    trigger_text = _format_expression("trigger", node.get_trigger())
    if trigger_text is not None:
        attr['trigger'] = trigger_text

    attr['edit'] = [(item.name(), item.value()) for item in node.variables]

    addit(node.meters, attr, 'meter')
    addit(node.events, attr, 'event')
    addit(node.labels, attr, 'label')
    addit(node.limits, attr, 'limit')
    addit(node.inlimits, attr, 'inlimits')
    addit(node.times, attr, 'time')
    addit(node.todays, attr, 'today')
    addit(node.dates, attr, 'date')
    addit(node.days, attr, "day")
    addit(node.crons, attr, "cron")

    # Trace on stdout, like the other print calls in this script
    pp = pprint.PrettyPrinter(indent=4)
    pp.pprint(attr)

    return pprint.pformat(attr)

def addit(array, cont, name):
    """Store the string form of every item of *array* under cont[name].

    The entry is always written, even when *array* is empty (the list is
    then empty too).  Returns None; *cont* is modified in place.
    """
    cont[name] = ["%s" % element for element in array]

class FuseEcflow(LoggingMixIn, Operations):
    """
    A simple Ecflow python client example.

    Presents the content of an ecFlow server through FUSE operations.
    Only getattr, read and readdir return data; every modifying
    operation raises FuseOSError(ENOENT).
    """

    def __init__(self, host="localhost", port=31415, path='.'):
        """Connect to the server at host:port and load its definition."""
        self.client = Client(host, port)
        self.client.sync_local()
        # Time of the last sync in epoch seconds; see refresh()
        self.update = time.time()
        self.defs = self.client.get_defs()
        self.root = path
        # Format both values: 'str + port' raised TypeError for the int
        # default port.
        print("#MSG: connected to %s %s" % (host, port))

    def chmod(self, path, mode):
        raise FuseOSError(ENOENT)

    def chown(self, path, uid, gid):
        raise FuseOSError(ENOENT)

    def create(self, path, mode):
        raise FuseOSError(ENOENT)

    def destroy(self, path):
        pass

    def getattr(self, path, fh=None):
        """Return the stat dict for *path*.

        '/' is a directory, any path containing a '.' is a regular file
        of FILESIZE bytes, and everything else is a directory.  The
        remaining fields start from os.lstat(".") and all three time
        stamps are set to the current time.
        """
        cur = os.lstat(".")
        st = dict((key, getattr(cur, key)) for key in (
            'st_atime', 'st_gid',
            'st_mode', 'st_mtime', 'st_size', 'st_uid'))
        if path == '/':
            st['st_mode'] = (S_IFDIR | 0o555)
            st['st_nlink'] = 2
        elif '.' in path:
            st['st_mode'] = (S_IFREG | 0o444)
            st['st_size'] = FILESIZE
        else:
            st['st_mode'] = (S_IFDIR | 0o444)
            st['st_size'] = 1
        st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time.time()
        return st

    def mkdir(self, path, mode):
        raise FuseOSError(ENOENT)

    def refresh(self):
        """Sync with the server when the refresh policy asks for it.

        With UPDATE == "always" every call syncs; otherwise a sync happens
        once more than five minutes have passed since the last one.
        Elapsed time is measured in epoch seconds: subtracting HHMM
        integers never refreshed again after midnight.
        """
        now = time.time()
        if UPDATE == "always" or now - self.update > 5 * 60:
            self.client.sync_local()
            self.update = now

    def read(self, path, size, offset, fh):
        """Return the content of the pseudo-file *path*.

        The extension selects the content: a script/manual/job/output file
        fetched from the server, the attribute dump (".att"), or the names
        of the child nodes.  Unknown, ignored and state extensions give
        the "-empty-" placeholder.
        NOTE(review): *offset* is not used; every call returns the start
        of the content.
        """
        ext = "nop"
        if "." in path:
            # Split on the last dot only: a plain split(".") raised
            # ValueError for paths holding more than one dot.
            path, ext = path.rsplit(".", 1)

        self.refresh()

        print("read %s %s %s %s %s" % (path, size, offset, fh, ext))
        if ext in state3 or ext in ignore:
            return "-empty-"
        if ext not in exts:
            print("what? %s" % ext)
            return "-empty-"
        if ext not in ("nop", "att"):
            res = "%s" % self.client.get_file(str(path), exts[ext])
            if len(res) > size:
                # Leave room for the two marker lines
                return "#TRUNCATED\n" + res[:size-30] + "\n#TRUNCATED\n"
            return res

        node = self.client.get_defs().find_abs_node(str(path))
        if node:
            print("%s %s" % (node.name(), node.get_abs_node_path()))

        if path == '/':
            return "".join("%s " % s.name() for s in self.defs.suites)
        if node is None:
            return "-empty-"
        if ext == "att":
            return list_att(node)
        return "".join("%s\n" % s.name() for s in node.nodes)

    def readdir(self, path='/', fh=None):
        """Return the directory entries for *path*.

        '/' lists the suites; a node lists its children followed by the
        pseudo-files produced by list_dir().
        """
        ext = "nop"
        if "." in path:
            # Last dot only, see read()
            path, ext = path.rsplit(".", 1)
        if ext not in exts:
            print("readdir: what? %s" % ext)
            return ["/"]
        print("readdir %s %s %s %s" % (path, fh, path, ext))

        self.refresh()

        if ext != "nop":
            return [".", "..", "ok", path.replace("/", "_"), ext]

        node = self.client.get_defs().find_abs_node(str(path))

        if path == '/':
            node = self.client.get_defs()
            return ['.', '..'] + ["%s" % s.name() for s in node.suites]
        if node is None:
            return ["-empty-"]

        res = ['.', '..'] + ["%s" % n.name() for n in node.nodes]
        res += list_dir(node)
        return res

    def readlink(self, path):
        raise FuseOSError(ENOENT)

    def rename(self, old, new):
        raise FuseOSError(ENOENT)

    def rmdir(self, path):
        raise FuseOSError(ENOENT)

    def symlink(self, target, source):
        raise FuseOSError(ENOENT)

    def truncate(self, path, length, fh=None):
        raise FuseOSError(ENOENT)

    def unlink(self, path):
        raise FuseOSError(ENOENT)

    def utimens(self, path, times=None):
        raise FuseOSError(ENOENT)

    def write(self, path, data, offset, fh):
        raise FuseOSError(ENOENT)

if __name__ == '__main__':
    # Exactly three arguments are expected: host, port and mount point
    if len(argv) == 4:
        # Mount in the foreground, single-threaded
        fuse = FUSE(FuseEcflow(argv[1], argv[2]),
                    argv[3],
                    foreground=True,
                    nothreads=True)
    else:
        print('usage: %s <host> <port> <mountpoint>' % argv[0])
        exit(1)

The client is then started as follows:

mount server content as a file system
mkdir -p mnt; python ./py_fuse_ecflow.py $ECF_NODE $ECF_PORT mnt;

In this example, each node's status is encoded as a three-character extension (.com for complete, .que for queued, .abo for aborted, ...). Note that the suspended status is not displayed.

Standard command-line utilities (find, ls, cat, ...) can then be used:

use case
# server eod3 was mounted
ls eod3/emc_41r2/thu
# 01  01.att  01.que  hind  hind.att  hind.que  verify  verify.att  verify.com
cat eod3/emc_41r2/main/18bc/sv/getini.ecf
cat eod3/emc_41r2/main/18bc/sv/getini.man
cat eod3/emc_41r2/main/18bc/sv/getini.out
kdirstat eod3/emc_41r2/main/
find eod3 -name "*.abo" 
  • No labels