Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Section
bordertrue
Column

The environment is set up by calling

Code Block
module load ecflow

When the module command is not available on a platform, the use command is the original way to set the PATH and PYTHONPATH variables

Code Block
use ecflow

A specific version can be set up with

Code Block
module unload ecflow; module load ecflow/4.0.2

The server can be started with

Code Block
ecflow_start.sh

The client command can be called to display its self-contained documentation

Code Block
ecflow_client --help

and the graphical interface is started with

Code Block
ecflowview; 
# The command below adds localhost to the ecflowview->Servers list,
# unless an entry for it is already present in the servers file.
# NOTE(review): the port is derived from the group id (id -g) here;
# confirm it matches the port the server was actually started on.
grep localhost $HOME/.ecflowrc/servers || \
  echo "localhost $(uname -n) $(( 1500 + $(id -g)))" \
  >> $HOME/.ecflowrc/servers
Column

The server administration directory is $HOME/ecflow_server/. It will contain the server log file and the check point file (a binary snapshot of the server content). It is defined as the variable ECF_HOME on the top node.

 

The next step is to load a suite into the server. The following python script can be used to define the suite, to expand it into a file, and to load it into the server, as shown in the ecflowview snapshot below

Code Block
themeEmacs
languagepy
titlesuite definition and loading
collapsetrue
#!/usr/bin/env python
# -*- coding= UTF-8 -*-
""" course 2014 
sample suite to spawn jobs on main available cpu facilities
at  ECMWF 
"""

from __future__ import with_statement
import sys, os, pwd, getopt
sys.path.append('/home/ma/emos/def/o/def')
import inc_emos as ic
from inc_emos import Family, Task, Variables, Meter, Event, \
    Label, Limit, If, Complete, Time, Trigger, Client
from ecf import get_username, Clock, Defstatus, Inlimit, Repeat, get_uid, Defs

# import ecf; ecf.USE_TRIGGER = 0 # DEBUG MODE: not to load triggers

def create_task():
    """Create the example task wrappers and their include files.

    Every file is written through create_wrapper(), i.e. into the
    module-level ``wdir`` directory:

    * test.sms         -- ksh task, micro character is ``%``
    * python.sms       -- pure python task, micro character is ``$``
    * python_header.h  -- header included by python.sms
    * python_endt.h    -- trailer included by python.sms
    * perl.sms         -- pure perl task, micro character is ``^``
    * perl_header.h    -- header included by perl.sms
    * perl_endt.h      -- trailer included by perl.sms

    The string literals below are the content of the generated files;
    they are not executed by this script.
    """
    ### task
    content = """#!/bin/ksh
%manual 
manual - this task is automatically created by cray.py
%end
%include <qsub.h>
if [[ $ARCH = hp* ]]; then export PATH=/usr/local/bin:$PATH; fi
if [[ $HOST = lxop* ]]; then export PATH=/usr/local/apps/ecflow/current:/usr/local/apps/sms/bin:$PATH; fi
%include <trap.h>
SLEEP=%SLEEP:0%
echo OK

case %ECF_PORT:0% in
0) base=900000; LOGPORT=%LOGPORT:0%;;
*) base=1000; LOGPORT=%LOGPORT:0%;;
esac
base=1000

case $ARCH in
cray)xlabel info $(printenv | grep -E '(SUBMIT_|EC_)') ;;
*) xlabel info OK
esac
  
printenv | sort
xevent 1
step=0
while (( $step <= 12 )); do
  xmeter step $step
  ((step = step + 1))
  sleep $SLEEP
done
%include <endt.h>
"""
    create_wrapper("test.sms", content)

    ### PYTHON TASK
    content = """$include <python_header.h>
# header files are located in the same directory as wrapper (quotes)
for step in range(0,101):
    print step
    xmeter("step", step)
else:
    print 'the loop is over'
xevent("1")
xlabel("info", "news from pure python world")

$include <python_endt.h>
"""
    create_wrapper("python.sms", content)

    # Header of the python task.  In the generated SigHandler the signal
    # number is an integer, so it is converted with str() before being
    # concatenated to the message (str + int raises TypeError).
    content = """#!/usr/bin/env python
import os
import sys
import signal 

ECF_PORT=$ECF_PORT:0$
XECF="/usr/local/apps/ecflow/current/bin/ecflow_client ";
# --port $ECF_PORT:0$ --host $ECF_NODE:0$ ";
def SigHandler(signum, frame):
   print "caught signal " + str(signum)
   xabort()
   sys.exit(0);
   return

print ECF_PORT
# set -eux ?
# trap 0 ?
# time stamp per executed line

import atexit
early_exit = True
@atexit.register
def goodbye():
  if early_exit: 
    print "too early"
    xabort()
  else: 
    xcomplete()

# TIME_STAMP
# http://shop.oreilly.com/product/9780596007973.do # recipie p436
import syslog, time

class FunctionFileLikeWrapper():
    def __init__(self, func): self.func = func
    def write(self, msg): self.func(msg)
    def flush(self): pass

class TimeStamper(object):
    msg_format = "[%y%m%d %H:%M:%S]", time.gmtime, "%s: %s"
    msg_format = "+ %H:%M:%S", time.gmtime, "%s %s"

    def __call__(self, msg):
        tfmt, tfun, gfmt = self.msg_format
        return "%s %s\\\n" % (time.strftime(tfmt, tfun()), msg)

class TeeFileLikeWrapper():
    def __init__(self, *files): self.files = files
    def write(self, msg): 
        for f in self.files: f.write(timestamp(msg.strip()))

class FlushingWrapper:
    def __init__(self, *files): self.files = files
    def write(self, msg):
        for f in self.files:
            f.write(timestamp(msg))
            # f.write(timestamp(msg.strip()))
            f.flush()

def logto(*files):
    # sys.stdout = TeeFileLikeWrapper(*files)
    sys.stdout = FlushingWrapper(*files)

syslogger = syslog.syslog
syslogfile = FunctionFileLikeWrapper(syslogger)
timestamp = TimeStamper()
logto(sys.stdout, syslogfile, open("log.tmp", "w"))
# end time stamp

if ECF_PORT > 0:
  os.environ['ECF_PORT'] = "$ECF_PORT:0$"
  os.environ['ECF_NAME'] = "$ECF_NAME:0$"
  os.environ['ECF_NODE'] = "$ECF_NODE:0$"
  os.environ['ECF_PASS'] = "$ECF_PASS:0$"

  def xinit():
    os.system(XECF + " --init " + str(os.getpid()))  
    print "init"
  def xabort():
    os.system(XECF + " --abort")  
  def xcomplete():
    os.system(XECF + " --complete")  
  def xmeter(name, step):
    os.system(XECF + " --meter " + name + " " + str(step))  
  def xevent(name):
    os.system(XECF + " --event " + name)  
  def xlabel(name, msg):
    os.system(XECF + " --label " + name + " '%s'" % msg)  
else:
  os.environ['SMS_PROG'] = "$SMS_PROG:0$"
  os.environ['SMSNAME'] = "$SMSNAME:0$"
  os.environ['SMSNODE'] = "$SMSNODE:0$"
  os.environ['SMSPASS'] = "$SMSPASS:0$"

  def xinit():
    os.system('smsinit ' + str(os.getpid()))
  def xabort():
    os.system('smsabort')
  def xcomplete():
    os.system('smscomplete')
  def xmeter(name, step):
    os.system("smsmeter " + name + " " + str(step))
  def xevent(name):
    os.system("smsevent " + name)
  def xlabel(name, msg):
    os.system("smslabel " + name + " '%s'" % msg)

signal.signal (signal.SIGHUP,  SigHandler)
signal.signal (signal.SIGINT,  SigHandler)
signal.signal (signal.SIGQUIT, SigHandler)
signal.signal (signal.SIGILL,  SigHandler)
signal.signal (signal.SIGTRAP, SigHandler)
signal.signal (signal.SIGIOT,  SigHandler)
signal.signal (signal.SIGBUS,  SigHandler)
signal.signal (signal.SIGFPE,  SigHandler)

"""
    create_wrapper("python_header.h", content)

    # Trailer of the python task: clearing early_exit makes the atexit
    # hook of the header report completion instead of an abort.
    content = """# os.system('smscomplete')
early_exit = False
# xcomplete() # managed with atexit
# os.system('/usr/local/apps/sms/bin/ecflow/bin/ecf_client --complete')
"""
    create_wrapper("python_endt.h", content)

    ### PERL TASK
    create_wrapper("perl.sms", """#!/usr/bin/perl -w
^include <perl_header.h>
# header files are located in the same directory as wrapper (quotes)
print "Pure perl SMS task";
for ( my $step=1; $step <= 100 ; $step++ ) {
   print "this is the number $step\\\n";
   xmeter("step", $step);
}
xevent("1");
            
xlabel("info", "news from pure perl world");      
^include <perl_endt.h>
""")
    # create_wrapper("perl.sms", content)

    # Header of the perl task.  Under ecFlow every child command is the
    # same client binary, so xabort needs its own "--abort" argument just
    # like the python header passes one; the SMS call is left unchanged.
    source = """use strict;

my $xmeter = "smsmeter"; my $arg_m = "";
my $xlabel = "smslabel"; my $arg_l = "";
my $xevent = "smsevent"; my $arg_e = "";
my $xcomplete = "smscomplete"; my $arg_c = "";
my $xabort = "smsabort"; my $arg_a = "";

if (^ECF_PORT:0^ != 0) {
$ENV{'ECF_PORT'}  = "^ECF_PORT:0^" ;  # ecFlow port number
$ENV{'ECF_NODE'}  = "^ECF_NODE:0^"  ; # ecFlow host
$ENV{'ECF_NAME'}  = "^ECF_NAME:0^"  ; # task path into the suite
$ENV{'ECF_PASS'}  = "^ECF_PASS:0^"  ; # password for the job
$ENV{'ECF_TRYNO'} = "^ECF_TRYNO:0^" ; # job occurence number
my $client = "/usr/local/apps/ecflow/current/bin/ecflow_client";
$xmeter = $client; $arg_m = "--meter";
$xlabel = $client; $arg_l = "--label";
$xevent = $client; $arg_e = "--event";
$xcomplete = $client; $arg_c = "--complete";
$xabort = $client; $arg_a = "--abort";

system($client, "--init", "$$");
} else {
  $ENV{'SMS_PROG'} = "^SMS_PROG:0^" ; # SMS Program Number
  $ENV{'SMSNODE'}  = "^SMSNODE:0^"  ; # SMS host
  $ENV{'SMSNAME'}  = "^SMSNAME:0^"  ; # task path into the suite
  $ENV{'SMSPASS'}  = "^SMSPASS:0^"  ; # password for the job occurence
  $ENV{'SMSTRYNO'} = "^SMSTRYNO:0^" ; # job occurence number
}

sub xmeter($$){ my ($name, $step) = @_; 
  system($xmeter, $arg_m, $name, $step); }
sub xevent($){ my ($name) = @_; 
  system($xevent, $arg_e, $name); }
sub xlabel($$){ my ($name, $msg) = @_; 
  system($xlabel, $arg_l, $name, $msg); }
sub xabort(){
  if ($arg_a) { system($xabort, $arg_a); } else { system($xabort); } }
sub xcomplete(){ system($xcomplete, $arg_c, "$$"); }

print "start";
eval '
"""
    create_wrapper("perl_header.h", source)

    # Trailer of the perl task: closes the eval opened by the header and
    # reports abort or completion.
    source = """';
if ($@){
    print "caught signal: $@\n";
    xabort();
    exit;
  }
print "the job is now complete\n";
xcomplete();
exit;"""
    create_wrapper("perl_endt.h", source)

def create_wrapper(name, content):
    """Write *content* into the file *name* of the wrapper directory.

    name    -- file name, relative to the module-level ``wdir`` directory
    content -- text of the wrapper or include file; one trailing newline
               is appended, exactly as ``print >> file`` did

    ``wdir`` must be set at module level before this function is called.
    A message naming the created file is written to standard output.
    """
    target = wdir + "/%s" % name
    sys.stdout.write("#MSG: creating file %s\n" % target)
    # the with-block closes the file even when the write itself fails
    with open(target, 'w') as wrapper:
        wrapper.write(content + "\n")
##############################################


def dummy():
    """Return a task named ``test`` without any attribute."""
    task = Task("test")
    return task

def limits():
    """Return the ``limits`` family.

    The family is complete by default and carries the limits ``lim``,
    ``test``, ``hpc``, ``cca`` and ``c2a`` (10 each), plus an inlimit on
    ``<ic.psel()>/limits:lim``.
    """
    family = Family("limits")
    return family.add(
        Defstatus("complete"),
        *[Limit(label, 10)
          for label in ("lim", "test", "hpc", "cca", "c2a")]
        + [Inlimit(ic.psel() + "/limits:lim")])

# Host name constant; not referenced anywhere else in this script.
HCRAY = "cct"
# Job submission command line: the ic.SUBM prefix followed by user, host,
# job file, job output file and the "submit" keyword.
SUBMIT = ic.SUBM + " %USER% %SCHOST% %ECF_JOB% %ECF_JOBOUT% submit"
# Same command line with %ECF_RID% inserted right after the host.
GSUB = SUBMIT.replace("%SCHOST%", "%SCHOST% %ECF_RID%")
# Kill and status variants: only the trailing keyword differs from GSUB.
KILL = GSUB.replace(" submit", " kill")
STATUS = GSUB.replace(" submit", " status")
# The check command is the very same string as the status command.
CHECK = STATUS

def unit(name, schost, queue, rdir, account="", leaf=True):
    """Return one leaf family of the course suite tree.

    name    -- family name
    schost  -- value of the SCHOST variable
    queue   -- value of the QUEUE variable
    rdir    -- directory used for both ECF_OUT and LOGDIR
    account -- value of the ACCOUNT variable, offered to If() together
               with the test ``account != ""``
    leaf    -- condition handed to If() along with the ``test`` task
               (event 1, meter ``step``, label ``info``)

    NOTE(review): If() comes from inc_emos; presumably it keeps its second
    argument only when the first one is true -- confirm there.
    """
    family = Family(name)
    has_account = account != ""
    return family.add(
        If(has_account, Variables(ACCOUNT=account)),
        Variables(QUEUE=queue, SCHOST=schost, ECF_OUT=rdir, LOGDIR=rdir),
        If(leaf,
           Task("test").add(Event(1),
                            Meter("step", -1, 120, 100),
                            Label("info", "nop"))))


def call_python():
    """Return the ``python`` task, an example of a pure python job.

    The task sets ECF_MICRO to ``$``, reads its include files from the
    module-level ``wdir`` directory and uses a job command that starts
    the job file directly, in the background.
    """
    task = Task("python")
    settings = Variables(
        ECF_MICRO="$",
        ECF_INCLUDE=wdir,
        SCHOST="localhost",
        ECF_JOB_CMD="$ECF_JOB$ > $ECF_JOBOUT$ 2>&1 &")
    # a Defstatus("complete") attribute was left disabled in the original
    return task.add(settings,
                    Event("1"),
                    Label("info", "micro is $"),
                    Meter("step", -1, 100))

def call_perl():
    """Return the ``perl`` task, an example of a pure perl job.

    The task sets ECF_MICRO to ``^``, reads its include files from the
    module-level ``wdir`` directory and uses a job command that starts
    the job file directly, in the background.
    """
    task = Task("perl")
    todo = Label("todo", "time-stamp PS4, set eux, exit 0 1")
    settings = Variables(
        ECF_MICRO="^",
        ECF_INCLUDE=wdir,
        SCHOST="localhost",
        ECF_JOB_CMD="^ECF_JOB^ > ^ECF_JOBOUT^ 2>&1 &")
    return task.add(todo,
                    settings,
                    Event("1"),
                    Label("info", "micro is ^"),
                    Meter("step", -1, 100))

##########################################################################

class Course(ic.SeedOD):
    """Example class for a suite definition.

    Relies on the module-level names ``user``, ``udir``, ``wdir``,
    ``idir`` and ``jdir`` being set before setup() runs.
    """

    def __init__(self):
        super(Course, self).__init__()

    def setup(self, node):
        """Populate *node* with the attributes and families of the suite.

        Adds the top attributes and variables, the ``python`` and ``perl``
        example tasks, the ``main`` family driven by the YMD repeat, and
        the ``submit`` family holding one unit() per host.
        """
        global user, host, SUITE
        account = "UNSET"
        self.defs.add_extern("/o/main:YMD")

        # attributes are created in the order in which they are added
        top_attributes = [
            Clock("real"),
            Defstatus("suspended"),
            Event("1"),
            Meter("step", -1, 100),
            Label("info", "click edit to update manually"),
            limits(),
            Variables(ECF_JOB_CMD=SUBMIT,
                      ECF_KILL_CMD=KILL,
                      ECF_STATUS_CMD=STATUS,
                      ECF_CHECK_CMD=CHECK,
                      ECF_EXTN=".sms",
                      ACCOUNT="UNSET",
                      SCHOST="localhost",
                      ECF_INCLUDE=idir,
                      ECF_HOME=jdir,
                      ECF_FILES=wdir,
                      QUEUE="ns",
                      USER=user,
                      LOGDIR="/tmp",
                      TOPATH=udir + "/logs",
                      SLEEP=1,     # x120
                      ECF_TRIES=1),
            call_python(),
            call_perl(),
        ]
        node.add(*top_attributes)

        # reminder text, built but not attached to any node
        msg = ("have you? setup ssh login, created remote directory"
               + ", considered the need for a logserver")

        main = node.family("main")
        main.add(
            Repeat(name="YMD", kind="date",
                   start=20010101, end=20991231, step=1),
            Family("00").add(
                Task("REPLACE_ME").add(Defstatus("complete"))),
            Family("loop").add(
                Label("info", "prevent direct repeat increment"),
                Trigger("./00==complete"),
                Complete("/%s:1" % node.name()),
                Time("12:00")))

        submit = node.family("submit")
        submit.add(Variables(USER=user, ACCOUNT=account),
                   self.submits())

    def submits(self):
        """Return the list of unit() families, one per submission host."""
        # global HOST, user
        home_logs = udir + "/logs"
        families = [unit("cca", "cca", "ns", "/sc1/sb/%s/logs" % user)]
        families.extend((
            unit("localhost", "localhost", "ns", "/tmp/%s/logs" % user),
            unit("lxab", "lxab", "serial", home_logs),
            unit("ecgb", "ecgb", "ns", home_logs),
        ))
        return families

def usage():
    print sys.argv[0] + ''' -h -u [user] -n [node] -s [suite-name] \
 -e: ecflow'''

if __name__ == "__main__":
    # Script entry point: parse the options, create the task wrappers and
    # include files, then expand the suite into <suite>.exp and load it
    # into the server.
    #
    # The load step reads raw positional words: sys.argv[2] is the path
    # handed to client.replace() and sys.argv[3] is the server host, e.g.
    #     <script> -e <path> <host>
    #
    # The names assigned here are module-level names; the functions and
    # the Course class above read user, udir, wdir, idir and jdir.
    SUITES = {"course": Course,
              }
    # the last command-line word is the default host (-n/--node overrides)
    HOST = sys.argv[-1]
    ECFLOW  = True
    SUITE   = None
    user = get_username()
    try:
        # long options that take a value need a trailing "="; -p is the
        # short form of --path only, --port has no short form
        opts, args = getopt.getopt(
            sys.argv[1:], "hp:u:es:n:",
            ["help", "port=", "user=", "ecflow", "suite=", "node=", "path="])
    except getopt.GetoptError:
        sys.stderr.write("#ERR: %s\n" % sys.exc_info()[1])
        usage()
        sys.exit(2)

    output = None
    verbose = False
    for o, a in opts:
        if o in ("-n", "--node"):
            HOST = a
        elif o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-s", "--suite"):
            SUITE = a
        elif o in ("-p", "--path"):
            path = a
        elif o in ("-e", "--ecflow"):
            ecflow = True
        elif o in ("-u", "--user"):
            user = a
        elif o == "--port":
            port = a
        else:
            # getopt only returns declared options, so this is a bug trap
            raise AssertionError("unhandled option %s %s" % (o, a))

    ########### SETTINGS ################ WRAPPERS
    udir = pwd.getpwnam(user).pw_dir
    wdir = udir + "/ecflow_server/course"
    jdir = "/tmp/%s/ecflow" % user
    ddef = udir + "/ecflow_server"
    fdef = ddef + "/course.def"
    idir = "$HOME/ecflow_server/course/include"

    for directory in (wdir, jdir):
        try:
            os.makedirs(directory)
        except OSError:
            # an already existing directory is fine, anything else is not
            if not os.path.isdir(directory):
                raise
    create_task()

    ########### SETTINGS ################ LOAD
    if len(sys.argv) < 4:
        # the wrappers are created above; only the load needs both words
        sys.stderr.write("#ERR: expecting a path and a host as second "
                         "and third command-line words\n")
        usage()
        sys.exit(1)
    client = Client(sys.argv[3], 1000 + int(get_uid()))
    top = Course().suite()
    # keep an expanded copy of the suite next to the script
    with open("%s.exp" % top.name(), 'w') as out:
        out.write(str(top) + "\n")
    defs = Defs()
    defs.add_suite(top)
    client.replace(sys.argv[2], defs)
    sys.exit(0)

example suiteImage Added