ecFlow's documentation is now on readthedocs!

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 15 Next »

The environment is set up by calling

module load ecflow

When the module command is not available on a platform, "use" is the original way to set the PATH and PYTHONPATH variables

use ecflow

It is possible to set up a specific version with

module unload ecflow; module load ecflow/4.0.2

Server can be started with

ecflow_start.sh

Client command can be called to get the self-contained documentation

ecflow_client --help

and the graphical interface is started with

ecflowview; 
# The command below adds localhost to the ecflowview->Servers list,
# unless the servers file already mentions it.
# The entry is: name, host name, port (1500 + numeric group id).
grep localhost $HOME/.ecflowrc/servers || \
  echo "localhost $(uname -n) $((1500 + $(id -g)))" \
  >> $HOME/.ecflowrc/servers

The server administration directory is $HOME/ecflow_server/; it will contain the server log file and the checkpoint file (a binary snapshot of the server content). It is defined as the variable ECF_HOME on the top node.

Once the server and its GUI are started, click on Servers->localhost to connect the first time.

 

The next step is to load a suite into the server. The following Python script can be used to define the suite, to expand it into a file, and to load it into the server, as shown in the ecflowview snapshot below.

suite definition and loading
#!/usr/bin/env python
# -*- coding= UTF-8 -*-
""" course 2014 
sample suite to spawn jobs on main available cpu facilities
at  ECMWF 
"""

from __future__ import with_statement
import sys, os, pwd, getopt
sys.path.append('/home/ma/emos/def/o/def')
import inc_emos as ic
from inc_emos import Family, Task, Variables, Meter, Event, \
    Label, Limit, If, Complete, Time, Trigger, Client
from ecf import get_username, Clock, Defstatus, Inlimit, Repeat, get_uid, Defs

# import ecf; ecf.USE_TRIGGER = 0 # DEBUG MODE: not to load triggers

def create_task():
    """Create the example task wrappers and their include files.

    Each file is written through create_wrapper(), i.e. into the
    module-level ``wdir`` directory.  Files produced:

    * test.sms        -- ksh task, '%' used as micro character
    * python.sms      -- pure python task, '$' used as micro character
    * python_header.h -- head include for python.sms
    * python_endt.h   -- tail include for python.sms
    * perl.sms        -- pure perl task, '^' used as micro character
    * perl_header.h   -- head include for perl.sms
    * perl_endt.h     -- tail include for perl.sms

    Takes no argument and returns None.
    """
    ### ksh task: sets a label and an event, then steps a meter from 0 to 12
    content = """#!/bin/ksh
%manual 
manual - this task is automatically created by cray.py
%end
%include <qsub.h>
if [[ $ARCH = hp* ]]; then export PATH=/usr/local/bin:$PATH; fi
if [[ $HOST = lxop* ]]; then export PATH=/usr/local/apps/ecflow/current:/usr/local/apps/sms/bin:$PATH; fi
%include <trap.h>
SLEEP=%SLEEP:0%
echo OK

case %ECF_PORT:0% in
0) base=900000; LOGPORT=%LOGPORT:0%;;
*) base=1000; LOGPORT=%LOGPORT:0%;;
esac
base=1000

case $ARCH in
cray)xlabel info $(printenv | grep -E '(SUBMIT_|EC_)') ;;
*) xlabel info OK
esac
  
printenv | sort
xevent 1
step=0
while (( $step <= 12 )); do
  xmeter step $step
  ((step = step + 1))
  sleep $SLEEP
done
%include <endt.h>
"""
    create_wrapper("test.sms", content)

    ### PYTHON TASK
    # '$' is the micro character here, matching ECF_MICRO set in call_python()
    content = """$include <python_header.h>
# header files are located in the same directory as wrapper (quotes)
for step in range(0,101):
    print step
    xmeter("step", step)
else:
    print 'the loop is over'
xevent("1")
xlabel("info", "news from pure python world")

$include <python_endt.h>
"""
    create_wrapper("python.sms", content)

    # Head include for the python task.  The generated code installs signal
    # handlers and an atexit hook, time-stamps stdout, and defines the
    # xinit/xabort/xcomplete/xmeter/xevent/xlabel helpers: they call
    # ecflow_client when ECF_PORT > 0 and the sms* commands otherwise.
    # NOTE(review): the "\\\n" sequences below are written to the file as a
    # backslash followed by a real line break, not as the two characters
    # backslash-n -- confirm this is what the generated code expects.
    content = """#!/usr/bin/env python
import os
import sys
import signal 

ECF_PORT=$ECF_PORT:0$
XECF="/usr/local/apps/ecflow/current/bin/ecflow_client ";
# --port $ECF_PORT:0$ --host $ECF_NODE:0$ ";
def SigHandler(signum, frame):
   print "caught signal " + signum
   xabort()
   sys.exit(0);
   return

print ECF_PORT
# set -eux ?
# trap 0 ?
# time stamp per executed line

import atexit
early_exit = True
@atexit.register
def goodbye():
  if early_exit: 
    print "too early"
    xabort()
  else: 
    xcomplete()

# TIME_STAMP
# http://shop.oreilly.com/product/9780596007973.do # recipie p436
import syslog, time

class FunctionFileLikeWrapper():
    def __init__(self, func): self.func = func
    def write(self, msg): self.func(msg)
    def flush(self): pass

class TimeStamper(object):
    msg_format = "[%y%m%d %H:%M:%S]", time.gmtime, "%s: %s"
    msg_format = "+ %H:%M:%S", time.gmtime, "%s %s"

    def __call__(self, msg):
        tfmt, tfun, gfmt = self.msg_format
        return "%s %s\\\n" % (time.strftime(tfmt, tfun()), msg)

class TeeFileLikeWrapper():
    def __init__(self, *files): self.files = files
    def write(self, msg): 
        for f in self.files: f.write(timestamp(msg.strip()))

class FlushingWrapper:
    def __init__(self, *files): self.files = files
    def write(self, msg):
        for f in self.files:
            f.write(timestamp(msg))
            # f.write(timestamp(msg.strip()))
            f.flush()

def logto(*files):
    # sys.stdout = TeeFileLikeWrapper(*files)
    sys.stdout = FlushingWrapper(*files)

syslogger = syslog.syslog
syslogfile = FunctionFileLikeWrapper(syslogger)
timestamp = TimeStamper()
logto(sys.stdout, syslogfile, open("log.tmp", "w"))
# end time stamp

if ECF_PORT > 0:
  os.environ['ECF_PORT'] = "$ECF_PORT:0$"
  os.environ['ECF_NAME'] = "$ECF_NAME:0$"
  os.environ['ECF_NODE'] = "$ECF_NODE:0$"
  os.environ['ECF_PASS'] = "$ECF_PASS:0$"

  def xinit():
    os.system(XECF + " --init " + str(os.getpid()))  
    print "init"
  def xabort():
    os.system(XECF + " --abort")  
  def xcomplete():
    os.system(XECF + " --complete")  
  def xmeter(name, step):
    os.system(XECF + " --meter " + name + " " + str(step))  
  def xevent(name):
    os.system(XECF + " --event " + name)  
  def xlabel(name, msg):
    os.system(XECF + " --label " + name + " '%s'" % msg)  
else:
  os.environ['SMS_PROG'] = "$SMS_PROG:0$"
  os.environ['SMSNAME'] = "$SMSNAME:0$"
  os.environ['SMSNODE'] = "$SMSNODE:0$"
  os.environ['SMSPASS'] = "$SMSPASS:0$"

  def xinit():
    os.system('smsinit ' + str(os.getpid()))
  def xabort():
    os.system('smsabort')
  def xcomplete():
    os.system('smscomplete')
  def xmeter(name, step):
    os.system("smsmeter " + name + " " + str(step))
  def xevent(name):
    os.system("smsevent " + name)
  def xlabel(name, msg):
    os.system("smslabel " + name + " '%s'" % msg)

signal.signal (signal.SIGHUP,  SigHandler)
signal.signal (signal.SIGINT,  SigHandler)
signal.signal (signal.SIGQUIT, SigHandler)
signal.signal (signal.SIGILL,  SigHandler)
signal.signal (signal.SIGTRAP, SigHandler)
signal.signal (signal.SIGIOT,  SigHandler)
signal.signal (signal.SIGBUS,  SigHandler)
signal.signal (signal.SIGFPE,  SigHandler)

"""
    create_wrapper("python_header.h", content)

    # Tail include for the python task: clearing early_exit makes the atexit
    # hook defined in python_header.h call xcomplete() instead of xabort().
    content = """# os.system('smscomplete')
early_exit = False
# xcomplete() # managed with atexit
# os.system('/usr/local/apps/sms/bin/ecflow/bin/ecf_client --complete')
"""
    create_wrapper("python_endt.h", content)

    ### PERL TASK
    # '^' is the micro character here, matching ECF_MICRO set in call_perl()
    create_wrapper("perl.sms", """#!/usr/bin/perl -w
^include <perl_header.h>
# header files are located in the same directory as wrapper (quotes)
print "Pure perl SMS task";
for ( my $step=1; $step <= 100 ; $step++ ) {
   print "this is the number $step\\\n";
   xmeter("step", $step);
}
xevent("1");
            
xlabel("info", "news from pure perl world");      
^include <perl_endt.h>
""")
    # create_wrapper("perl.sms", content)

    # Head include for the perl task.  It ends by opening a single-quoted
    # "eval '" which perl_endt.h closes, so the task body that sits between
    # the two includes ends up inside that eval.
    source = """use strict;

my $xmeter = "smsmeter"; my $arg_m = "";
my $xlabel = "smslabel"; my $arg_l = "";
my $xevent = "smsevent"; my $arg_e = "";
my $xcomplete = "smscomplete"; my $arg_c = "";
my $xabort = "smsabort";

if (^ECF_PORT:0^ != 0) {
$ENV{'ECF_PORT'}  = "^ECF_PORT:0^" ;  # ecFlow port number
$ENV{'ECF_NODE'}  = "^ECF_NODE:0^"  ; # ecFlow host
$ENV{'ECF_NAME'}  = "^ECF_NAME:0^"  ; # task path into the suite
$ENV{'ECF_PASS'}  = "^ECF_PASS:0^"  ; # password for the job
$ENV{'ECF_TRYNO'} = "^ECF_TRYNO:0^" ; # job occurence number
my $client = "/usr/local/apps/ecflow/current/bin/ecflow_client";
$xmeter = $client; $arg_m = "--meter";
$xlabel = $client; $arg_l = "--label";
$xevent = $client; $arg_e = "--event";
$xcomplete = $client; $arg_c = "--complete";
$xabort = $client; 

system($client, "--init", "$$");
} else {
  $ENV{'SMS_PROG'} = "^SMS_PROG:0^" ; # SMS Program Number
  $ENV{'SMSNODE'}  = "^SMSNODE:0^"  ; # SMS host
  $ENV{'SMSNAME'}  = "^SMSNAME:0^"  ; # task path into the suite
  $ENV{'SMSPASS'}  = "^SMSPASS:0^"  ; # password for the job occurence
  $ENV{'SMSTRYNO'} = "^SMSTRYNO:0^" ; # job occurence number
}

sub xmeter($$){ my ($name, $step) = @_; 
  system($xmeter, $arg_m, $name, $step); }
sub xevent($){ my ($name) = @_; 
  system($xevent, $arg_e, $name); }
sub xlabel($$){ my ($name, $msg) = @_; 
  system($xlabel, $arg_l, $name, $msg); }
sub xabort(){ system($xabort); }
sub xcomplete(){ system($xcomplete, $arg_c, "$$"); }

print "start";
eval '
"""
    create_wrapper("perl_header.h", source)

    # Tail include for the perl task: closes the eval opened by the header,
    # calls xabort() when the eval left an error in $@, xcomplete() otherwise.
    source = """';
if ($@){
    print "caught signal: $@\n";
    xabort();
    exit;
  }
print "the job is now complete\n";
xcomplete();
exit;"""
    create_wrapper("perl_endt.h", source)

def create_wrapper(name, content):
    """Write *content* into the file *name* of the wrapper directory.

    name    -- file name, appended to the module-level ``wdir`` directory
    content -- text to store; a trailing newline is added after it

    A "#MSG:" line naming the target file is written to standard output
    before the file is created.  ``wdir`` must already be assigned (this is
    done in the ``__main__`` section) and the directory must exist, otherwise
    open() raises.
    """
    target = "%s/%s" % (wdir, name)
    # sys.stdout.write instead of the print statement: same output, and valid
    # syntax for both Python 2 and Python 3
    sys.stdout.write("#MSG: creating file %s\n" % target)
    # the with-block closes the handle even when the write fails
    with open(target, 'w') as wrapper:
        wrapper.write("%s\n" % (content,))
##############################################


def dummy():
    """Return a newly built task named "test"."""
    placeholder = Task("test")
    return placeholder

def limits():
    """Build the "limits" family.

    The family is given a "complete" default status, five limits of size 10
    ("lim", "test", "hpc", "cca", "c2a") and an inlimit whose path is
    ic.psel() followed by "/limits:lim".  The value of Family.add() is
    returned.
    """
    family = Family("limits")
    attributes = [Defstatus("complete")]
    for limit_name in ("lim", "test", "hpc", "cca", "c2a"):
        attributes.append(Limit(limit_name, 10))
    attributes.append(Inlimit(ic.psel() + "/limits:lim"))
    return family.add(*attributes)

# NOTE(review): HCRAY is not referenced in the visible part of this file.
HCRAY = "cct"
# Command templates built on ic.SUBM; the %NAME% items are left as-is here.
# SUBMIT, KILL, STATUS and CHECK are used in Course.setup() as the values of
# ECF_JOB_CMD, ECF_KILL_CMD, ECF_STATUS_CMD and ECF_CHECK_CMD respectively.
SUBMIT = ic.SUBM + " %USER% %SCHOST% %ECF_JOB% %ECF_JOBOUT% submit"
# same as SUBMIT with %ECF_RID% inserted right after %SCHOST%
GSUB = SUBMIT.replace("%SCHOST%", "%SCHOST% %ECF_RID%")
# the kill and status variants only differ from GSUB by the final keyword
KILL = GSUB.replace(" submit", " kill")
STATUS = GSUB.replace(" submit", " status")
# the check command is the very same string as the status command
CHECK = STATUS

def unit(name, schost, queue, rdir, account="", leaf=True):
    """Build one per-host family of the course suite.

    name    -- family name
    schost  -- value of the SCHOST variable
    queue   -- value of the QUEUE variable
    rdir    -- directory used for both ECF_OUT and LOGDIR
    account -- value of the ACCOUNT variable; the default is the empty string
    leaf    -- condition handed to If() together with the "test" task

    Returns the value of Family.add().  Both If() arguments are ordinary call
    arguments, so the ACCOUNT variables and the "test" task are always built;
    what If() does with them is decided by inc_emos.
    """
    family = Family(name)
    account_given = account != ""
    account_vars = If(account_given, Variables(ACCOUNT=account))
    host_vars = Variables(QUEUE=queue,
                          SCHOST=schost,
                          ECF_OUT=rdir,
                          LOGDIR=rdir)
    probe = Task("test").add(
        Event(1),
        Meter("step", -1, 120, 100),
        Label("info", "nop"))
    return family.add(account_vars, host_vars, If(leaf, probe))


def call_python():
    """Build the "python" task: the pure python job example.

    The task gets "$" as ECF_MICRO, the module-level ``wdir`` as ECF_INCLUDE,
    a job command that runs the job file in the background with its output
    redirected to the job output file, plus event "1", an "info" label and a
    "step" meter.  Returns the value of Task.add().
    """
    task = Task("python")
    settings = Variables(
        ECF_MICRO="$",
        ECF_INCLUDE=wdir,
        SCHOST="localhost",
        ECF_JOB_CMD="$ECF_JOB$ > $ECF_JOBOUT$ 2>&1 &")
    attributes = [
        settings,
        Event("1"),
        Label("info", "micro is $"),
        Meter("step", -1, 100),
    ]
    return task.add(*attributes)

def call_perl():
    """Build the "perl" task: the pure perl job example.

    The task gets a "todo" label, "^" as ECF_MICRO, the module-level ``wdir``
    as ECF_INCLUDE, a job command that runs the job file in the background
    with its output redirected to the job output file, plus event "1", an
    "info" label and a "step" meter.  Returns the value of Task.add().
    """
    task = Task("perl")
    todo = Label("todo", "time-stamp PS4, set eux, exit 0 1")
    settings = Variables(
        ECF_MICRO="^",
        ECF_INCLUDE=wdir,
        SCHOST="localhost",
        ECF_JOB_CMD="^ECF_JOB^ > ^ECF_JOBOUT^ 2>&1 &")
    attributes = [
        todo,
        settings,
        Event("1"),
        Label("info", "micro is ^"),
        Meter("step", -1, 100),
    ]
    return task.add(*attributes)

##########################################################################

class Course(ic.SeedOD):
    """Example suite definition for the course.

    The methods read the module-level names ``user``, ``udir``, ``wdir``,
    ``jdir`` and ``idir``, which are assigned in the ``__main__`` section, so
    the class is only usable once those are set.
    """

    def __init__(self):
        super(Course, self).__init__()

    def setup(self, node):
        """Attach the course content to *node*.

        node -- the node to populate

        Adds the extern "/o/main:YMD" to ``self.defs``, then adds to *node*:
        its attributes and variables, the limits family, the python and perl
        example tasks, a "main" family (date repeat, "00" and "loop"
        families) and a "submit" family holding the families returned by
        submits().  Returns None.
        """
        account = "UNSET"
        self.defs.add_extern("/o/main:YMD")

        node.add(
            Clock("real"),
            Defstatus("suspended"),
            Event("1"),
            Meter("step", -1, 100),
            Label("info", "click edit to update manually"),
            limits(),

            Variables(ECF_JOB_CMD=SUBMIT,
                      ECF_KILL_CMD=KILL,
                      ECF_STATUS_CMD=STATUS,
                      ECF_CHECK_CMD=CHECK,
                      ECF_EXTN=".sms",
                      ACCOUNT=account,
                      SCHOST="localhost",
                      ECF_INCLUDE=idir,
                      ECF_HOME=jdir,
                      ECF_FILES=wdir,
                      QUEUE="ns", USER=user, LOGDIR="/tmp",
                      TOPATH=udir + "/logs",
                      SLEEP=1,     # x120
                      ECF_TRIES=1,),

            call_python(),

            call_perl(),
            )

        # Reminder for remote submission: have you set up ssh login, created
        # the remote directory, considered the need for a logserver?

        node.family("main").add(
            Repeat(name="YMD", kind="date",
                   start=20010101, end=20991231, step=1),
            Family("00").add(
                Task("REPLACE_ME").add(Defstatus("complete"))
                ),
            Family("loop").add(
                Label("info", "prevent direct repeat increment"),
                Trigger("./00==complete"),
                # completes on event "1" of the node itself
                Complete("/%s:1" % node.name()),
                Time("12:00")))

        node.family("submit").add(
            Variables(USER=user,
                      ACCOUNT=account),
            self.submits(),
            )

    def submits(self):
        """Return the list of per-host families placed under "submit".

        One unit() family each for "cca", "localhost", "lxab" and "ecgb", in
        that order.
        """
        ct1logs = "/sc1/sb/%s/logs" % user
        local_logs = "/tmp/%s/logs" % user
        home_logs = udir + "/logs"
        return [
            unit("cca", "cca", "ns", ct1logs),
            unit("localhost", "localhost", "ns", local_logs),
            unit("lxab", "lxab", "serial", home_logs),
            unit("ecgb", "ecgb", "ns", home_logs),
        ]

def usage():
    """Write the one-line command synopsis to standard output.

    The line is the script name (sys.argv[0]) followed by the option summary
    and a newline.  Returns None.
    """
    # sys.stdout.write instead of the print statement: same output, and valid
    # syntax for both Python 2 and Python 3
    sys.stdout.write(
        sys.argv[0]
        + " -h -u [user] -n [node] -s [suite-name]  -e: ecflow"
        + "\n")

if __name__ == "__main__":
    # Script entry point: parse the options, create the working directories
    # and the task wrappers, then expand the suite into a file and load it
    # into the server.
    SUITES = {"course": Course,
              }
    # The last command-line word is the default host; -n/--node overrides it.
    HOST = sys.argv[-1]
    ECFLOW = True
    SUITE = None
    user = get_username()
    # Long options that take a value need a trailing "=" (see getopt).
    # -p selects the path, as before; the port is only reachable as --port.
    opts, args = getopt.getopt(
        sys.argv[1:], "hp:u:es:n:",
        ["help", "port=", "user=", "ecflow", "suite=", "node=", "path="])

    output = None
    verbose = False
    for o, a in opts:
        if o in ("-n", "--node"):
            HOST = a
        elif o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-s", "--suite"):
            SUITE = a
        elif o in ("-p", "--path"):
            path = a
        elif o in ("-e", "--ecflow"):
            ecflow = True
        elif o in ("-u", "--user"):
            user = a
        elif o == "--port":
            port = a
        else:
            sys.stdout.write("#ERR: what? %s %s\n" % (o, a))
            # raise rather than assert: an assert disappears under python -O
            raise AssertionError("unhandled option")

    ########### SETTINGS ################ WRAPPERS
    udir = pwd.getpwnam(user).pw_dir
    wdir = udir + "/ecflow_server/course"
    jdir = "/tmp/%s/ecflow" % user
    ddef = udir + "/ecflow_server"
    fdef = ddef + "/course.def"
    idir = "$HOME/ecflow_server/course/include"

    for directory in (wdir, jdir):
        try:
            os.makedirs(directory)
        except OSError:
            # An existing directory is the expected case.  Any other failure
            # is reported but stays non-fatal, as it was before.
            if not os.path.isdir(directory):
                sys.stderr.write(
                    "#WAR: cannot create directory %s\n" % directory)
    create_task()

    ########### SETTINGS ################ LOAD
    # NOTE(review): the server host and the node path to replace are read
    # positionally (sys.argv[3] and sys.argv[2]), independently of the options
    # parsed above -- confirm the intended command line before changing this.
    client = Client(sys.argv[3], 1000 + int(get_uid()))
    top = Course().suite()
    # keep an expanded copy of the suite next to the script: <suite-name>.exp
    with open("%s.exp" % top.name(), 'w') as out:
        out.write("%s\n" % (top,))
    defs = Defs()
    defs.add_suite(top)
    client.replace(sys.argv[2], defs)
    sys.exit(0)

example suite

Clicking on each task, we can check that the task wrapper script is present (ECF_FILES defined properly), then edit it, preprocess it (ECF_INCLUDE defined properly, no micro character on its own), and submit it, as a task or as an alias.

Consider Options->CloseOnApply/Submit when multiple aliases must be sent from the same task in a short time.

submit

 

When the task does not reach the active status, check that the ECF_OUT directory exists on the remote host, check that the expected rsh or ssh connection no longer requests a password, or query the queueing system, as the directives (user account, queue) may not be valid yet.

 

When the submit family is working for the expected remote host(s), it is time to fill the main family with relevant tasks. Enjoy!

  • No labels