Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Standalone client

In this example we want to monitor a particular task.
If this task is aborted for any reason, we ask the
server for the job output. This could be mailed to the user.
#!/usr/bin/env python2.7
import ecflow
import time
import sys def monitor_critical_task(ci, path_to_task): # Query the server for any changes if ci.news_local(): # get the incremental changes, and merge with defs stored on the Client ci.sync_local() # check to see if definition exists in the server defs = ci.get_defs() if len(defs) == None0 : sys.exit(0) # return server has no suites # find the task we are interested in critical_task = defs.find_abs_node(path_to_task) if critical_task ==is None: # No such task sys.exit(0) # return # Check to see if task was aborted, if it was email me the job output if critical_task.get_state() == ecflow.State.aborted: # Get the job output the_aborted_task_output = ci.get_file(path_to_task,'jobout') # email(the_aborted_task_output) sys.exit(0) try: # Create the client. This will read the default environment variables ci = ecflow.Client("localhost", "4143") # Continually monitor the suite while 1: monitor_critical_task(ci, "/suite/critical_node") # Sleep for 5 minutes. # To avoid overloading server ensure sleep is > 60 seconds time.sleep(300) except RuntimeError, e: print str(e)

...

Code Block
languagepy
themeEclipse
titlepyecflow_fuse_ecflow.py
collapsetrue
#!/usr/bin/env python
import os
import sys
import time

sys.path.append("/usr/local/apps/ecflow/current/lib/python2.7/site-packages/ecflow")
from errno import ENOENT
from stat import S_IFDIR, S_IFREG, S_IFBLK
from sys import argv, exit
import ecflow
from ecflow import Client

from fuse import FUSE, Operations, LoggingMixIn, FuseOSError, fuse_get_context

FILESIZE = 10000
UPDATE = "time"
UPDATE = "always"

ignore = ("bdmv", "inf", )

exts = { "ecf": "script",
         "sms": "script",
         "man": "manual",
         "out": "jobout",
         "nop": None,
         "att": None,
         "job": "job" }

statuses = {
            -1: "unknown",
             0: "unknown",
             1: "suspended",
             2: "complete",
             3: "queued",
             4: "submitted",
             5: "active",
             6: "aborted",
             7: "shutdown",
             8: "halted",
             9: "unknown",             }

state3 = ("unk", "que", "sub", "com", "abo", "sus", "hal", "shu", )

def list_dir(node):
    res = []
    for item in node.nodes:
        name = item.name()
        state = ".%03s" % item.get_state()
        res.extend([ name + state[:4]])
        res.extend([ name + ".att"])
        if isinstance(item, ecflow.Task):
            res.extend([ name + ".job",
                         name + ".ecf",
                         name + ".out",
                         name + ".man",])
    return res

def list_att(node):
        attr = dict()

        try:
            clock = suite.get_clock()
            attr = dict()
            if clock: 
                attr['clock']= "%s" % clock
        except: pass

        attr['status'] = "%s" % node.get_state()

        defstatus = node.get_defstatus()
        if defstatus != ecflow.DState.queued: 
            attr['defstatus'] = "%s" % defstatus
            
        autocancel = node.get_autocancel()
        if autocancel: attr['autocancel']= "%s" % autocancel
        
        repeat = node.get_repeat()
        if not repeat.empty(): attr['repeat']= "%s # value:%s" % (repeat, 
                                                                  repeat.value())
    
        late = node.get_late()
        if late: attr['late']= "%s" % late

        complete_expr = node.get_complete()
        if complete_expr:
            for part_expr in complete_expr.parts:
                trig = "complete "
                if part_expr.and_expr(): trig = trig + "-a "
                if part_expr.or_expr():  trig = trig + "-o "
                attr['complete']= "%s" % trig + " %s" % \
                           part_expr.get_expression() + "\n"
        trigger_expr = node.get_trigger()
        if trigger_expr:
            for part_expr in trigger_expr.parts:
                trig = "trigger "
                if part_expr.and_expr(): trig = trig + "-a "
                if part_expr.or_expr():  trig = trig + "-o "
                attr['trigger'] = "%s" % trig + " %s" % \
                           part_expr.get_expression() + "\n"
            
        attr['edit'] = [ (item.name(), item.value()) for item in node.variables ]

        addit(node.meters, attr, 'meter')
        addit(node.events, attr, 'event')
        addit(node.labels, attr, 'label')
        addit(node.limits, attr, 'limit')
        addit(node.inlimits, attr, 'inlimits')
        addit(node.times, attr, 'time')
        addit(node.todays, attr, 'today')
        addit(node.dates, attr, 'date')
        addit(node.days, attr, "day")
        addit(node.crons, attr, "cron")

        import pprint
        pp = pprint.PrettyPrinter(indent=4)
        pp.pprint(attr)

        return pprint.pformat(attr)

def addit(array, cont, name):
        rc = []
        for item in array:
            rc.append("%s" % item)
        if rc is not None: cont[name] = rc

class FuseEcflow(LoggingMixIn, Operations):
    '''
    A simple Ecflow python client example
    '''

    def __init__(self, host="localhost", port=31415, path='.'):
        self.client = Client(host, port)
        self.client.sync_local()
        self.update = int(time.strftime("%H%M"))
        self.defs = self.client.get_defs()
        self.root = path
        print "#MSG: connected to %s " % host + port
        if 0: 
            for s in self.defs.suites: print s.name(),
            print

    def chmod(self, path, mode):
        raise FuseOSError(ENOENT)

    def chown(self, path, uid, gid):
        raise FuseOSError(ENOENT)

    def create(self, path, mode):
        raise FuseOSError(ENOENT)

    def destroy(self, path): pass

    def getattr(self, path, fh=None):
        uid, gid, pid = fuse_get_context()

        cur = os.lstat(".")
        st = dict((key, getattr(cur, key)) for key in (
            'st_atime', 'st_gid',
            'st_mode', 'st_mtime', 'st_size', 'st_uid'))
        if path == '/':
            st['st_mode'] = (S_IFDIR | 0555)
            st['st_nlink']=2

        elif '.' in path: 
            st['st_mode'] = (S_IFREG | 0444)
            st['st_size'] = FILESIZE 

        elif 1:
            st['st_mode'] = (S_IFDIR | 0444)
            st['st_size'] = 1

        else:
            raise FuseOSError(ENOENT)
        st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time.time()
        return st

    def mkdir(self, path, mode):
        raise FuseOSError(ENOENT)

    def refresh(self):
        curr = int(time.strftime("%H%M"))
        if curr - self.update > 5 or UPDATE == "always":
            self.client.sync_local()
            self.update = curr

    def read(self, path, size, offset, fh):    
        ext = "nop"
        if "." in path: 
            path, ext = path.split(".")

        self.refresh()

        print "read", path, size, offset, fh, ext
        if ext in state3: return "-empty-"
        elif ext in ignore: return "-empty-"
        elif not ext in exts.keys(): print "what?", ext; return "-empty-"
        elif ext not in ("nop", "att"):
            res = "%s" % self.client.get_file(str(path), exts[ext])
            if len(res) > size: 
                return "#TRUNCATED\n" + res[:size-30] + "\n#TRUNCATED\n"
            return res
        else: node = self.client.get_defs().find_abs_node(str(path))
        res = ""

        if node: print node.name(), node.get_abs_node_path()
        if path == '/':
            for s in self.defs.suites: res += "%s " % s.name()
            return res

        elif node is None: return "-empty-"

        elif ext == "att": return list_att(node)

        elif 1:
            for s in node.nodes: res += "%s\n" % s.name()
            return res

        else: raise FuseOSError(ENOENT)

        f = open(path)
        f.seek(offset, 0)
        buf = f.read(size)
        f.close()
        return buf

    def readdir(self, path='/', fh=None):
        ext = "nop"
        if "." in path: 
            path, ext = path.split(".")
        if not ext in exts.keys(): print "readdir: what?", ext; return [ "/" ]
        print "readdir", path, fh, path, ext

        self.refresh()

        if ext != "nop": 
            return [ ".", "..", "ok", path.replace("/", "_"), ext ]
        else: node = self.client.get_defs().find_abs_node(str(path))
        res = []

        if path == '/':
            node = self.client.get_defs()
            return ['.', '..'] + [ "%s" % s.name() for s in node.suites ] 

        elif node is None: return ["-empty-" ]

        else: 
            res = ['.', '..'] + ["%s" % n.name() for n in node.nodes ] 
            res += list_dir(node)
            return res

    def readlink(self, path):
        raise FuseOSError(ENOENT)

    def rename(self, old, new):
        raise FuseOSError(ENOENT)   

    def rmdir(self, path):
        raise FuseOSError(ENOENT)   

    def symlink(self, target, source):
        raise FuseOSError(ENOENT)   

    def truncate(self, path, length, fh=None):
        raise FuseOSError(ENOENT)   

    def unlink(self, path):
        raise FuseOSError(ENOENT)   

    def utimens(self, path, times=None):
        raise FuseOSError(ENOENT)   

    def write(self, path, data, offset, fh):
        raise FuseOSError(ENOENT)   

if __name__ == '__main__':
    if len(argv) != 4:
        print('usage: %s <host> <port> <mountpoint>' % argv[0])
        exit(1)

    fuse = FUSE(FuseEcflow(argv[1], argv[2]), argv[3], 
                foreground=True, nothreads=True)

...

Code Block
languagebash
themeEclipse
titlemount server content as a file system
mkdir -p mnt; python ./pyecflow_fuse_ecflow.py $ECF_NODEHOST $ECF_PORT mnt;

In this example, each node status is encoded as a three characters extension (.com for complete, .que for queued, .abo for aborted ...) and you will appreciate that suspended status is not displayed.

...

Code Block
languagebash
titleuse case
# server eod3 was mounted
ls eod3/emc_41r2/thu
# 01  01.att  01.que  hind  hind.att  hind.que  verify  verify.att  verify.com
LS_COLORS+="*.abo=41:*.com=43:*.sus=01;45:*.que=44:*.sub=01;46:*.unk=47:*.act=01;42:"
ls eod3/emc_41r2/thu

cat eod3/emc_41r2/main/18bc/sv/getini.ecf
cat eod3/emc_41r2/main/18bc/sv/getini.man
cat eod3/emc_41r2/main/18bc/sv/getini.out
kdirstat eod3/emc_41r2/main/
find eod3 -name "*.abo"
mc eod3


Center

Image Added