Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: py
theme: Eclipse
title: meter sweeper
collapse: true
#!/usr/bin/env python

"""
python docstring: demonstrate python cli + ecFlow task wrapper + task loader

$manual
DESCRIPTION:

sweeper:

find out min/max among modeleps_nemo tasks

OPERATORS: please, set complete if problematic

ANALYST: example as pure python task - job
$end

$comment
  comments can be added ...
$end

"""
import time
import os.path
import getopt
import sys

PATH = "/usr/local/apps/ecflow/4.0.9/lib/python2.7/site-packages/ecflow"
sys.path.insert(0, PATH)
import ecflow as ec

# ecFlow micro character, written twice in this file; only its first
# character (MICRO[0]) is used, as the ECF_MICRO variable in create_task().
MICRO = "$$" # double dollar to please ecFlow micro character balance
# Text that must appear in a task name for process() to look at its meters;
# Sweep falls back to it when no task is given.
TARGET_TASK = "modeleps_nemo"             # use option -t to overwrite
# Name of the meter whose values process() folds into the min/max pair.
TARGET_METER = "step"                     # would this need an option???
# Node paths swept by default (Sweep with no path); loader() also uses
# them as the extern list and to build the "==active" trigger expression.
MC_STEPS_LIST = ("/mc/main/12/legA/fc",   # use option -p for single path
                 "/mc/main/12/legB/fc",
                 "/mc/main/18bc/legA/fc",
                 "/mc/main/00/legA/fc",
                 "/mc/main/00/legB/fc",
                 "/mc/main/06bc/legA/fc",
                 "/mofc/thu/01/legC/fc",
                 "/mofc/thu/hind/14/back",
                 "/mofc/mon/01/legC/fc",
                 "/mofc/mon/hind/14/back",)

def focus_below(node):
    """ tell whether the walk should concentrate on this node:
    true when its name is "main" or one of the known cycle, leg or
    forecast family names ; please update for external use...
    """
    known = ("00", "12", "18bc", "06bc",
             "legA", "legB", "pf", "cf", "fc")
    label = node.name()
    if label == "main":
        return True
    return label in known

def process(item, path=None, low=999, high=-1, task=None):
    """ track nodes to follow and update min/max

    item: what to inspect; one of
          - a string, handed to ec.Defs() to build a definition,
          - an ec.Client, synchronised first, then its definition is used,
          - an ec.Defs,
          - an ec.Family or ec.Task (recursive calls end up here),
          - anything else: its type is printed and low/high are unchanged.
    path: node path (or sequence of node paths); only its suite component
          is used, to select which suites of a definition are walked.
          None means every suite is walked.
    low, high: running minimum / maximum, passed down and returned.
    task: text that must be part of a task name for its meters to count;
          None falls back to TARGET_TASK.

    returns the (low, high) pair updated with every TARGET_METER value
    found on matching tasks.
    """
    if task is None:
        # the membership test below needs a string
        task = TARGET_TASK

    if isinstance(item, str):
        defs = ec.Defs(item)
    elif isinstance(item, ec.Client):
        item.sync_local()
        defs = item.get_defs()
    elif isinstance(item, ec.Defs):
        defs = item
    else:
        defs = None

    if defs:
        # suite names selected by path; None means "no filtering"
        if path is None:
            wanted = None
        else:
            paths = (path,) if isinstance(path, str) else path
            wanted = set(str(one.split('/')[1]) for one in paths)

        for suite in defs.suites:
            if wanted is not None and suite.name() not in wanted:
                continue
            status = "%s" % suite.get_state()
            if suite.is_suspended() or status == "unknown":
                continue
            for child in suite.nodes:
                low, high = process(child, path, low, high, task)

    elif isinstance(item, ec.Family):
        for node in item.nodes:
            if focus_below(node):
                # NOTE(review): the walk returns with the first focused
                # child, later siblings are not visited - confirm this is
                # the intent (kept as in the original).
                return process(node, path, low, high, task)
            low, high = process(node, path, low, high, task)

    elif isinstance(item, ec.Task):
        # ignore cal-val # ecmwf specific
        if "/cv/" in item.get_abs_node_path() or task not in item.name():
            return low, high

        try:
            for att in item.meters:
                if att.name() != TARGET_METER:
                    continue
                value = att.value()
                # two independent tests: a single value may move both
                # bounds (the very first one always does)
                if value > high:
                    high = value
                if value < low:
                    low = value

        except Exception as excpt:
            # best effort: report and keep what was collected so far
            print("#! problem with line: %s" % excpt)

    else:
        print(type(item))

    return low, high

def create_task():
    """ build the ecFlow "sweeper" task: job variables, an empty "info"
    label, a defstatus complete, and two meters "min" and "max" ;
    the caller attaches it to a family...
    """
    sys.path.append("/home/ma/emos/o/def")
    import ecf
    top = 360  # upper bound of both meters, also given as last argument

    sweeper = ecf.Task("sweeper")
    attributes = (
        ecf.Variables(
            ECF_FILES=os.getenv("ECF_FDI", None),
            ECF_EXTN=".py",
            ECF_JOB_CMD="ssh -x $WSHOST$ $ECF_JOB$ > $ECF_JOBOUT$ 2>&1 &",
            ECF_MICRO=MICRO[0],
        ),
        ecf.Label("info", ""),
        ecf.Defcomplete(),
        ecf.Meter("min", -1, top, top),
        ecf.Meter("max", -1, top, top),
    )
    return sweeper.add(*attributes)

def loader(test=1, host=os.getenv("ECF_TEST_HOST", None)):
    """ build a one-family suite holding the sweeper task and replace it
    on the server ;

    test: true  -> server from ECF_TEST_HOST/ECF_TEST_PORT, node /admine/steps
          false -> server from ECF_OPER_HOST/ECF_OPER_PORT, node /admin/steps
    host: value given to the task variable WSHOST
    """
    sys.path.append("/home/ma/emos/o/def")
    import ecf

    # server coordinates and destination node depend on the mode
    if test:
        server_host = os.getenv("ECF_TEST_HOST", None)
        server_port = os.getenv("ECF_TEST_PORT", None)
        target = "/admine/steps"
    else:
        server_host = os.getenv("ECF_OPER_HOST", None)
        server_port = os.getenv("ECF_OPER_PORT", None)
        target = "/admin/steps"
    defs = ecf.Defs()

    # "/suite/family" -> the two names
    pieces = target.split('/')
    suite_name = str(pieces[1])
    family_name = str(pieces[2])

    # objects are created in the same order as a single nested expression
    top = ecf.Suite(suite_name)
    externs = ecf.Extern(MC_STEPS_LIST, defs)
    holder = ecf.Family(family_name)
    sweeper = create_task().add(
        ecf.Cron("04:30 23:59 03:00"),
        # active as soon as any of the watched nodes is active
        ecf.Trigger("==active or ".join(MC_STEPS_LIST) + "==active"),
        ecf.Variables(WSHOST=host,
                      QUEUE="test"),
    )
    suite = top.add(externs, holder.add(sweeper))

    defs.add_suite(suite)

    print(" ".join(["#MSG: replacing", str(target),
                    str(server_host), str(server_port)]))
    client = ecf.Client(server_host, server_port)
    client.replace(target, defs, 1, 1)


class Sweep(object):
    """ find min/max meter value below a node

    Two ecFlow clients are used:
    - clt: reads the definition from the server (always created),
    - cl2: child-command client used to report label/meter/complete/abort;
           only created when both ECF_NODE and ECF_NAME are known, i.e.
           when the script runs as a job or with those variables exported.
    """

    def __init__(self, host=None, port=None, path=None, task=None):
        """ host, port: server to query; default to ECF_NODE / ECF_PORT
                        from the environment, else to the ecFlow
                        placeholders (meaningful once processed into a job)
            path: one node path (string) or a sequence of node paths;
                  defaults to MC_STEPS_LIST
            task: text a task name must contain; defaults to TARGET_TASK
        """
        import signal
        if host is None:
            host = os.getenv("ECF_NODE", "$ECF_NODE$")
        if port is None:
            port = int(os.getenv("ECF_PORT", "$ECF_PORT$"))
        self.clt = ec.Client(host, port)
        self.cl2 = None
        if path is None:
            self.path = MC_STEPS_LIST
        elif isinstance(path, str):
            # a single path must be wrapped: update() iterates over
            # self.path and would otherwise walk it character by character
            self.path = (path,)
        else:
            self.path = path
        if task is None:
            self.task = TARGET_TASK
        else:
            self.task = task

        # shall make sense when processed into job by ecflow
        # name remains when not processed...
        ecfv = {"ECF_NODE": "$ECF_NODE$",
                "ECF_PASS": "$ECF_PASS$",
                "ECF_NAME": "$ECF_NAME$",
                "ECF_PORT": "$ECF_PORT$",
                "ECF_TRYNO": "$ECF_TRYNO$",        }

        # a value still containing its own name was not substituted:
        # fall back to the environment (None when not exported)
        for key, val in ecfv.items():
            if key in val:
                ecfv[key] = os.getenv(key, None)
        if ecfv["ECF_NODE"] and ecfv["ECF_NAME"]:
            print("#MSG will communicated with server...")
            print("#kill: ssh %s kill -15 %d" % (ecfv["ECF_NODE"],
                                                 os.getpid()))
            self.cl2 = ec.Client()
            self.cl2.set_host_port(ecfv["ECF_NODE"], ecfv["ECF_PORT"])
            self.cl2.set_child_pid(os.getpid())
            self.cl2.set_child_path(ecfv["ECF_NAME"])
            self.cl2.set_child_password(ecfv["ECF_PASS"])
            self.cl2.set_child_try_no(int(ecfv["ECF_TRYNO"]))
            self.cl2.child_init()
            self.cl2.set_child_timeout(20)

            # report an abort to the server when the job is signalled
            for sig in (signal.SIGINT,
                        signal.SIGHUP,
                        signal.SIGQUIT,
                        signal.SIGILL,
                        signal.SIGTRAP,
                        signal.SIGIOT,
                        signal.SIGBUS,
                        signal.SIGFPE,
                        signal.SIGUSR1,
                        signal.SIGUSR2,
                        signal.SIGPIPE,
                        signal.SIGTERM,
                        signal.SIGXCPU,
                        signal.SIGPWR):
                signal.signal(sig, self.signal_handler)

    def signal_handler(self, signum, frame):
        """ catch signal: tell the server this task is aborted """
        print("Aborting: Signal handler called with signal  %s" % signum)
        self.cl2.child_abort("Signal handler called with signal " + str(signum))

    def report(self, msg, meter=None):
        """ communicate with ecFlow server

        does nothing without a child client (cl2); otherwise
        - meter given (0 included): msg is the meter name, meter its value,
        - msg == "stop": the task is set complete and the script exits,
        - anything else: msg is written to the "info" label.
        """
        if not self.cl2:
            return
        if meter is not None:
            # "is not None": a meter value of 0 is a valid update
            self.cl2.child_meter(msg, meter)
        elif msg == "stop":
            self.cl2.child_complete()
            sys.exit(0)
        else:
            self.cl2.child_label("info", msg)

    def update(self):
        """ refresh status tree asking ecFlow server

        for each path in turn: poll the server, compute the min/max meter
        below the node, report them, and move on to the next path when the
        node is complete, missing, or idle with min == max.
        """
        retry_delay = 30  # seconds to wait after a failed server request
        num = 20
        leg = "legA"
        for path in self.path:
            first = 1
            low = 0
            high = 0
            # register the suite of this path with the client handle
            self.clt.ch_register(False, [str(path.split('/')[1])])
            if "/back" in path:
                path += "/%02d/%s" % (num, leg)

            while 1:
                clt = self.clt
                status = ""
                try:
                    if clt.news_local() or first: # has the server changed
                        clt.sync_local()
                        first = 0
                        defs = clt.get_defs()
                        node = defs.find_abs_node(path)
                        if node:
                            status = "%s" % node.get_state()
                        low, high = process(node, self.path, 999, -1, self.task)
                        sleep = 30
                    else:
                        sleep = 90
                    msg = "... %ds %s %s %d %d" % (sleep, path, status,
                                                   low, high)
                    print(msg)
                    if type(low) != type(high):
                        print("%s %s" % (type(low), type(high)))

                    if status == "complete":
                        # next "/back" path looks at the following leg
                        if leg == "legB":
                            num -= 1
                            leg = "legA"
                        else:
                            leg = "legB"
                        break

                    elif low == high and status in (
                            "queued", "aborted", "complete") or node is None:
                        if node:
                            msg = "# %s %s" % (node.get_abs_node_path(), status)
                            print(msg)
                            self.report(msg)
                        break
                    else:
                        self.report(msg)
                        self.report("min", low)
                        self.report("max", high)
                    sys.stdout.flush()
                    time.sleep(sleep)
                except RuntimeError as exp:
                    print(str(exp))
                    sys.stdout.flush()
                    # wait before retrying, do not hammer a server that
                    # is not answering
                    time.sleep(retry_delay)

def usage():
    """ help: print the command line options on standard output """
    print("""client
  -o: operational node is to be replaced, default is test node,
      provide this option BEFORE -r
  -r: replace task node
  -p: path to look below, by default internal list MC_STEPS_LIST will be used
  -t: task name to look for, by default TARGET_TASK will be used
  -e: run the sweep at once and exit, provide -p BEFORE -e
  -h: this help

ECF_NODE=localhost ECF_PORT=31415 ./client.py -p /mc/main/18bc/legA/fc -e

""")

if __name__ == '__main__':
    # Command line entry point. Options are handled in the order given:
    # -o, -p and -t only record a setting, while -h, -r and -e act at once
    # and exit, so settings must come BEFORE them.
    try:
        OPTS, ARGS = getopt.getopt(
            sys.argv[1:], "hep:rot:",
            # "path=" and "task=": the trailing "=" lets the long options
            # take a value, like their short forms "p:" and "t:"
            ["help", "ens", "path=", "replace", "oper", "task="])
    except getopt.GetoptError as err:
        print("# what? %s" % err)
        usage()
        sys.exit(2)

    TEST = 1
    # NOTE: from here on PATH is the node path to sweep; it no longer holds
    # the site-packages location used for the ecflow import above.
    PATH = None
    TASK = None
    for o, a in OPTS:
        if o in ("-h", "--help", ):
            usage()
            sys.exit(0)
        elif o in ("-e", "--ens", ):
            Sweep(path=PATH, task=TASK).update()
            sys.exit(0)
        elif o in ("-r", "--replace", ):
            loader(TEST)
            sys.exit(0)
        elif o in ("-p", "--path", ):
            PATH = a
        elif o in ("-o", "--oper", ):
            TEST = 0
        elif o in ("-t", "--task", ):
            TASK = a
    # default action: sweep, then tell the server the task is complete
    CLT = Sweep(path=PATH, task=TASK)
    CLT.update()
    CLT.report("stop")

The sweeper is used in operations to start product generation earlier.

Center

Image Added