- Created by Axel Bonet, last modified on Apr 13, 2020
Thanks to Python's syntax, and provided there is no need for ecFlow include files to merge code, a single Python script can be used on the command line and also, unchanged, as an ecFlow task wrapper; it can even load its own task into the expected server/suite, for operations as well as for testing.
Example: the meter sweeper (source below)
#!/usr/bin/env python
"""
python docstring: demonstrate python cli + ecFlow task wrapper + task loader
$manual
DESCRIPTION:
sweeper:
find out min/max among modeleps_nemo tasks
OPERATORS: please, set complete if problematic
ANALYST: example as pure python task - job
$end
$comment
comments can be added ...
$end
"""
import time
import os.path
import getopt
import sys
PATH = "/usr/local/apps/ecflow/4.0.9/lib/python2.7/site-packages/ecflow"
sys.path.insert(0, PATH)
import ecflow as ec
# Micro character written twice: this keeps the ecFlow pre-processor's
# micro-character count balanced; MICRO[0] is the character itself.
MICRO = "$$"
# Substring a task name must contain to be swept (overridden by option -t).
TARGET_TASK = "modeleps_nemo"
# Name of the meter read on every matching task (no option for it yet).
TARGET_METER = "step"
# Node paths swept one after the other when option -p is not given; the
# loader also uses them for its externs and its trigger.
MC_STEPS_LIST = (
    # mc suite
    "/mc/main/12/legA/fc",
    "/mc/main/12/legB/fc",
    "/mc/main/18bc/legA/fc",
    "/mc/main/00/legA/fc",
    "/mc/main/00/legB/fc",
    "/mc/main/06bc/legA/fc",
    # mofc suite
    "/mofc/thu/01/legC/fc",
    "/mofc/thu/hind/14/back",
    "/mofc/mon/01/legC/fc",
    "/mofc/mon/hind/14/back",
)
def focus_below(node):
    """Tell whether the sweep should focus on (descend into) *node*.

    True when the node name is one of the hard-coded cycle, leg or family
    names below, or is exactly "main".  Adapt the set before reusing this
    helper outside its current suites.
    """
    expected_names = frozenset((
        "00", "12", "18bc", "06bc",   # cycles
        "legA", "legB",               # legs
        "pf", "cf", "fc",             # families
        "main",
    ))
    return node.name() in expected_names
def process(item, path=None, low=999, high=-1, task=None):
    """Walk an ecFlow tree and fold TARGET_METER values into (low, high).

    item -- where to start: a definition file name (str, loaded with
            ec.Defs), an ec.Client (synced, then its defs are used), an
            ec.Defs, an ec.Family or an ec.Task; anything else only gets
            its type printed.
    path -- absolute node path; only its suite component (second '/'
            field) is used, and only when item resolves to a Defs, to pick
            the suite to descend into.
    low, high -- running minimum / maximum carried through the recursion;
            the defaults 999 / -1 are starting values that any real meter
            value replaces.
    task -- substring a task name must contain for its meters to be read;
            None reads every task.

    Returns the updated (low, high) pair.  Suspended or "unknown" suites
    and every task whose path contains "/cv/" are skipped.
    """
    if isinstance(item, str):
        defs = ec.Defs(item)
    elif isinstance(item, ec.Client):
        item.sync_local()
        defs = item.get_defs()
    elif isinstance(item, ec.Defs):
        defs = item
    else:
        defs = None

    if defs is not None:
        for suite in defs.suites:
            if suite.name() != str(path.split('/')[1]):
                continue
            if suite.is_suspended() or str(suite.get_state()) == "unknown":
                continue
            for child in suite.nodes:
                low, high = process(child, path, low, high, task)
    elif isinstance(item, ec.Family):
        for child in item.nodes:
            if focus_below(child):
                # focus: descend into the first expected child only; the
                # remaining siblings of this family are not visited
                return process(child, path, low, high, task)
            low, high = process(child, path, low, high, task)
    elif isinstance(item, ec.Task):
        # ignore cal-val tasks (ECMWF specific)
        if "/cv/" in item.get_abs_node_path():
            return low, high
        if task is not None and task not in item.name():
            return low, high
        try:
            for att in item.meters:
                if att.name() != TARGET_METER:
                    continue
                value = att.value()
                # two independent tests: with the 999 / -1 starting values
                # the first meter seen must move BOTH bounds (an elif here
                # left low untouched for it)
                if value > high:
                    high = value
                if value < low:
                    low = value
        except Exception as excpt:
            # best effort: one unreadable meter must not stop the sweep
            print("#! problem with line: %s" % excpt)
    else:
        print(type(item))
    return low, high
def create_task():
    """Build and return the "sweeper" ecf.Task definition.

    The task runs this .py script over ssh on WSHOST, uses the single micro
    character MICRO[0], carries an empty "info" label, a defcomplete, and
    "min"/"max" meters ranging from -1 to 360.
    """
    sys.path.append("/home/ma/emos/o/def")
    import ecf
    last_step = 360
    sweeper = ecf.Task("sweeper")
    # NOTE(review): ECF_FILES is read from the ECF_FDI environment
    # variable; confirm this name is intended.
    job_settings = ecf.Variables(
        ECF_FILES=os.getenv("ECF_FDI", None),
        ECF_EXTN=".py",
        ECF_JOB_CMD="ssh -x $WSHOST$ $ECF_JOB$ > $ECF_JOBOUT$ 2>&1 &",
        ECF_MICRO=MICRO[0],
    )
    attributes = [job_settings, ecf.Label("info", ""), ecf.Defcomplete()]
    attributes.extend(ecf.Meter(label, -1, last_step, last_step)
                      for label in ("min", "max"))
    return sweeper.add(*attributes)
def loader(test=1, host=os.getenv("ECF_TEST_HOST", None)):
    """Replace the sweeper task node on the test or the operational server.

    test -- truthy: server from ECF_TEST_HOST / ECF_TEST_PORT, node
            /admine/steps; falsy: ECF_OPER_HOST / ECF_OPER_PORT, node
            /admin/steps.
    host -- value of the task's WSHOST variable; the default is the
            ECF_TEST_HOST value seen when the module was imported.

    The task gets a cron, a trigger waiting for any MC_STEPS_LIST node to be
    active, and QUEUE set to "test".
    """
    sys.path.append("/home/ma/emos/o/def")
    import ecf
    if not test:
        env = dict(host=os.getenv("ECF_OPER_HOST", None),
                   port=os.getenv("ECF_OPER_PORT", None),
                   path="/admin/steps")
    else:
        # NOTE(review): test node is /admine (not /admin); confirm intended
        env = dict(host=os.getenv("ECF_TEST_HOST", None),
                   port=os.getenv("ECF_TEST_PORT", None),
                   path="/admine/steps")
    defs = ecf.Defs()
    node_path = env["path"]
    fields = node_path.split('/')
    suite_name, family_name = str(fields[1]), str(fields[2])
    any_active = "==active or ".join(MC_STEPS_LIST) + "==active"
    suite = ecf.Suite(suite_name)
    sweeper = create_task().add(
        ecf.Cron("04:30 23:59 03:00"),
        ecf.Trigger(any_active),
        ecf.Variables(WSHOST=host, QUEUE="test"),
    )
    suite.add(ecf.Extern(MC_STEPS_LIST, defs),
              ecf.Family(family_name).add(sweeper))
    defs.add_suite(suite)
    print("#MSG: replacing %s %s %s" % (node_path, env["host"], env["port"]))
    ecf.Client(env["host"], env["port"]).replace(node_path, defs, 1, 1)
class Sweep(object):
    """Find the min/max meter value below each swept node and report it.

    self.clt polls the server for the node tree.  self.cl2 is the child
    client of this task; it is only created when ECF_NODE and ECF_NAME
    resolve (through ecFlow job substitution or the environment), and it
    carries the label, meter, complete and abort messages.
    """

    # Names of the signals that abort the task.  They are looked up with
    # getattr so that a platform lacking one of them (e.g. SIGPWR) does not
    # make __init__ fail.
    SIGNAL_NAMES = ("SIGINT", "SIGHUP", "SIGQUIT", "SIGILL", "SIGTRAP",
                    "SIGIOT", "SIGBUS", "SIGFPE", "SIGUSR1", "SIGUSR2",
                    "SIGPIPE", "SIGTERM", "SIGXCPU", "SIGPWR")

    # Seconds to wait before polling again after a RuntimeError, so that an
    # unreachable server is not hammered in a tight loop.
    RETRY_DELAY = 90

    def __init__(self, host=None, port=None, path=None, task=None):
        """Connect to the server and, when running as a job, to the task.

        path -- one node path (str) or a sequence of paths; default is
                MC_STEPS_LIST.
        task -- task name filter; default is TARGET_TASK.
        """
        import signal
        if host is None:
            host = os.getenv("ECF_NODE", "$ECF_NODE$")
        if port is None:
            port = int(os.getenv("ECF_PORT", "$ECF_PORT$"))
        self.clt = ec.Client(host, port)
        self.cl2 = None
        if path is None:
            self.path = MC_STEPS_LIST
        elif isinstance(path, str):
            # a single path must become a one-item tuple: "(path)" is just
            # the string, and update() would then iterate its characters
            self.path = (path,)
        else:
            self.path = path
        if task is None:
            self.task = TARGET_TASK
        else:
            self.task = task
        # shall make sense when processed into job by ecflow;
        # the placeholder text remains when not processed, and the value is
        # then read from the environment instead
        ecfv = {"ECF_NODE": "$ECF_NODE$",
                "ECF_PASS": "$ECF_PASS$",
                "ECF_NAME": "$ECF_NAME$",
                "ECF_PORT": "$ECF_PORT$",
                "ECF_TRYNO": "$ECF_TRYNO$", }
        for key, val in list(ecfv.items()):
            if key in val:
                ecfv[key] = os.getenv(key, None)
        if ecfv["ECF_NODE"] and ecfv["ECF_NAME"]:
            print("#MSG will communicated with server...")
            print("#kill: ssh %s kill -15 %d" % (ecfv["ECF_NODE"], os.getpid()))
            self.cl2 = ec.Client()
            self.cl2.set_host_port(ecfv["ECF_NODE"], ecfv["ECF_PORT"])
            self.cl2.set_child_pid(os.getpid())
            self.cl2.set_child_path(ecfv["ECF_NAME"])
            self.cl2.set_child_password(ecfv["ECF_PASS"])
            self.cl2.set_child_try_no(int(ecfv["ECF_TRYNO"]))
            self.cl2.child_init()
            self.cl2.set_child_timeout(20)
            for name in self.SIGNAL_NAMES:
                sig = getattr(signal, name, None)
                if sig is not None:
                    signal.signal(sig, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Abort the task on the server, then exit with status 1.

        The handler replaces the default action of the trapped signals, so
        without the explicit exit a kill -15 would leave the sweep loop
        running and still messaging the server about an aborted task.
        """
        print("Aborting: Signal handler called with signal  %s" % signum)
        if self.cl2 is not None:
            self.cl2.child_abort("Signal handler called with signal " + str(signum))
        sys.exit(1)

    def report(self, msg, meter=None):
        """Communicate with the ecFlow server through the child client.

        meter given (0 included) -- set meter *msg* to that value;
        msg == "stop"            -- complete the task and exit(0);
        otherwise                -- set the "info" label to *msg*.
        Does nothing when there is no child client (command-line use).
        """
        if not self.cl2:
            return
        if meter is not None:
            # "is not None": a meter value of 0 must still be sent as a
            # meter, not fall through to the label branch
            self.cl2.child_meter(msg, meter)
        elif msg == "stop":
            self.cl2.child_complete()
            sys.exit(0)
        else:
            self.cl2.child_label("info", msg)

    def update(self):
        """Refresh the status tree from the server for each path in turn.

        A path is followed until its node is complete, is missing, or is
        queued/aborted with min == max over the matching tasks; meanwhile
        the info label and the min/max meters are refreshed every 30 s.
        A path containing "/back" gets a two-digit counter (starting at 20)
        and a leg appended; each completion moves legA -> legB, then
        legB -> legA of the next lower counter.
        """
        num = 20
        leg = "legA"
        for path in self.path:
            first = True
            low = 0
            high = 0
            self.clt.ch_register(False, [str(path.split('/')[1])])
            if "/back" in path:
                path += "/%02d/%s" % (num, leg)
            while True:
                clt = self.clt
                status = ""
                try:
                    if clt.news_local() or first:  # has the server changed
                        clt.sync_local()
                        first = False
                    defs = clt.get_defs()
                    node = defs.find_abs_node(path) if defs else None
                    if node:
                        status = str(node.get_state())
                        low, high = process(node, path, 999, -1, self.task)
                        sleep = 30
                    else:
                        sleep = 90
                    msg = "... %ds %s %s %d %d" % (sleep, path, status,
                                                   low, high)
                    print(msg)
                    if type(low) != type(high):
                        print("%s %s" % (type(low), type(high)))
                    if status == "complete":
                        if leg == "legB":
                            num -= 1
                            leg = "legA"
                        else:
                            leg = "legB"
                        break
                    if node is None or (low == high and
                                        status in ("queued", "aborted")):
                        if node:
                            msg = "# %s %s" % (node.get_abs_node_path(), status)
                        print(msg)
                        self.report(msg)
                        break
                    self.report(msg)
                    self.report("min", low)
                    self.report("max", high)
                    sys.stdout.flush()
                    time.sleep(sleep)
                except RuntimeError as exp:
                    print(str(exp))
                    sys.stdout.flush()
                    time.sleep(self.RETRY_DELAY)
def usage():
    """Print the command-line help on stdout (returns None)."""
    print("""client
-o: operational node is to be replaced, default is test node,
    provide this option BEFORE -r
-r: replace task node
-p: path to look below, by default internal list MC_STEPS_LIST will be used
-t: task name filter, by default TARGET_TASK will be used
-e: sweep now, provide -p and -t BEFORE -e
-h: this help
ECF_NODE=localhost ECF_PORT=31415 ./client.py --path /mc/main/18bc/legA/fc -e
""")
if __name__ == '__main__':
    try:
        # long options taking a value need a trailing "=": without it
        # "--path X" yields an empty value and X ends option parsing
        OPTS, ARGS = getopt.getopt(
            sys.argv[1:], "hep:rot:",
            ["help", "ens", "path=", "replace", "oper", "task="])
    except getopt.GetoptError as err:
        print("# what? %s" % err)
        usage()
        sys.exit(2)
    TEST = 1
    PATH = None
    TASK = None
    # options are handled in command-line order: -e and -r act at once, so
    # -p, -t and -o must come before them
    for o, a in OPTS:
        if o in ("-h", "--help", ):
            usage()
            sys.exit(0)
        elif o in ("-e", "--ens", ):
            Sweep(path=PATH, task=TASK).update()
            sys.exit(0)
        elif o in ("-r", "--replace", ):
            loader(TEST)
            sys.exit(0)
        elif o in ("-p", "--path", ):
            PATH = a
        elif o in ("-o", "--oper", ):
            TEST = 0
        elif o in ("-t", "--task", ):
            TASK = a
    # default action (also the ecFlow job path): sweep, then complete
    CLT = Sweep(path=PATH, task=TASK)
    CLT.update()
    CLT.report("stop")
The sweeper is used in operations to start product generation earlier.
