Sometimes waiting for the completion of a task is not good enough.
If a task is producing several results, another task may start as
soon as the first results are ready.
For that, ecFlow introduces the concept of events.
An event is a message that a task will report to ecflow_server while it is running.

Events have names and a task can set several of them.
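
As a rough illustration of the idea, the toy model below treats each event as a named flag on a task that starts clear and is set while the task runs. The names ToyTask and trigger_is_true are hypothetical and are not part of the ecFlow API; this is a sketch of the concept, not of how ecflow_server works internally.

```python
# Toy model only: ToyTask and trigger_is_true are illustrative names,
# not part of the ecFlow Python API.
class ToyTask:
    def __init__(self, name, events=()):
        self.name = name
        # Every declared event starts out clear (False).
        self.events = {event: False for event in events}

    def set_event(self, event):
        """Mimic a running job reporting an event to the server."""
        if event not in self.events:
            raise KeyError(f"task {self.name} has no event {event!r}")
        self.events[event] = True


def trigger_is_true(task, event):
    """Evaluate a trigger of the form 'task:event'."""
    return task.events[event]


t2 = ToyTask("t2", events=("a", "b"))
print(trigger_is_true(t2, "a"))                            # False
t2.set_event("a")                                          # t2 is still running
print(trigger_is_true(t2, "a"), trigger_is_true(t2, "b"))  # True False
```

In this model a task waiting on t2:a could start as soon as the first flag is set, while a task waiting on t2:b keeps waiting.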


Ecf Script

We will create new tasks (t3, t4) that will be triggered by the events emitted in task t2.
Create the ecf script for t3 and t4 by copying t1.
To notify ecflow_server, the task (t2 in the example below)
must call ecflow_client --event, which is one of the child commands.


Amend $HOME/course/test/f1/t2.ecf
%include <head.h>
echo "I will now sleep for %SLEEP% seconds"
sleep %SLEEP%
ecflow_client --event=a       # Set the first event
sleep %SLEEP%                 # Sleep a bit more
ecflow_client --event=b       # Set the second event
sleep %SLEEP%                 # A last nap...
%include <tail.h>

Text

# Definition of the suite test.
suite test
   edit ECF_INCLUDE "$HOME/course"  # replace '$HOME' with the path to your home directory
   edit ECF_HOME    "$HOME/course"
   family f1
     edit SLEEP 20
     task t1
     task t2
         trigger t1 eq complete
         event a
         event b
     task t3
         trigger t2:a
     task t4
         trigger t2:b
   endfamily
endsuite
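
A trigger such as t2:a only makes sense if t2 declares an event named a. The sketch below is a simplified, standalone check of that relationship on the definition text. It assumes a flat layout where triggers refer to sibling tasks by plain name (no paths), and the helper names declared_events and undeclared_event_references are hypothetical. It is not a substitute for ecFlow's own checking.

```python
import re

# A trimmed copy of the definition above (the ECF_* variables are omitted).
DEFINITION = """\
suite test
   family f1
     edit SLEEP 20
     task t1
     task t2
         trigger t1 eq complete
         event a
         event b
     task t3
         trigger t2:a
     task t4
         trigger t2:b
   endfamily
endsuite
"""


def declared_events(text):
    """Return {task name: set of event names} for a flat definition."""
    events, current = {}, None
    for line in text.splitlines():
        words = line.split()
        if len(words) >= 2 and words[0] == "task":
            current = words[1]
            events[current] = set()
        elif len(words) >= 2 and words[0] == "event" and current is not None:
            events[current].add(words[1])
    return events


def undeclared_event_references(text):
    """List (task, event) pairs used in triggers but never declared."""
    events = declared_events(text)
    missing = []
    for line in text.splitlines():
        words = line.split()
        if words and words[0] == "trigger":
            # Only 'task:event' references are examined here.
            for task, event in re.findall(r"(\w+):(\w+)", line):
                if event not in events.get(task, set()):
                    missing.append((task, event))
    return missing


print(undeclared_event_references(DEFINITION))  # []
```

Removing the line "event b" from the text would make the function report the pair ("t2", "b").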

Python

$HOME/course/test.py
import os
from ecflow import Defs,Suite,Family,Task,Edit,Trigger,Event

def create_family_f1():
    return Family("f1",
                Edit(SLEEP=20),
                Task("t1"),
                Task("t2",
                    Trigger("t1 == complete"),
                    Event("a"),
                    Event("b")),
                Task("t3",
                    Trigger("t2:a")),
                Task("t4",
                    Trigger("t2:b")))
             
print("Creating suite definition")
home = os.path.join(os.getenv("HOME"), "course")
defs = Defs( 
        Suite("test",
            Edit(ECF_INCLUDE=home,ECF_HOME=home),
            create_family_f1()))
print(defs) 

print("Checking job creation: .ecf -> .job0")  
print(defs.check_job_creation())

print("Saving definition to file 'test.def'")
defs.save_as_defs("test.def")

What to do

  1. Update test.def or test.py
  2. Edit t2.ecf to call ecflow_client --event.
  3. Copy t1.ecf to t3.ecf and t4.ecf
  4. Replace the suite
    python:  python3 test.py; python3 client.py
    text:       ecflow_client --suspend=/test ; ecflow_client --replace=/test test.def
  5. Observe the tasks in ecflow_ui
  6. See the triggers by selecting t2/t3 and clicking on the trigger tab
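
Step 3 above can also be done from Python. The following is a minimal sketch using only the standard library. The helper name copy_script is hypothetical, and the demonstration runs in a scratch directory with a placeholder script body rather than in $HOME/course/test/f1.

```python
import shutil
import tempfile
from pathlib import Path


def copy_script(family_dir, source="t1", targets=("t3", "t4")):
    """Copy <source>.ecf to <target>.ecf for each target; return the new paths."""
    family_dir = Path(family_dir)
    created = []
    for name in targets:
        destination = family_dir / f"{name}.ecf"
        shutil.copyfile(family_dir / f"{source}.ecf", destination)
        created.append(destination)
    return created


# Demonstration in a scratch directory so nothing in $HOME is touched.
with tempfile.TemporaryDirectory() as scratch:
    f1 = Path(scratch) / "course" / "test" / "f1"
    f1.mkdir(parents=True)
    # Placeholder content standing in for the real t1.ecf.
    (f1 / "t1.ecf").write_text("%include <head.h>\necho hello\n%include <tail.h>\n")
    print([path.name for path in copy_script(f1)])  # ['t3.ecf', 't4.ecf']
```

For the tutorial itself, the call would be copy_script(Path.home() / "course" / "test" / "f1"), assuming t1.ecf already exists there.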
      

1 Comment

  1. Unknown User (ma0)

    Alternative styles

    import os
    import ecflow 
    
    def create_family_f1():
        f1 = ecflow.Family("f1")
        f1.add_variable("SLEEP", 20)
        f1.add_task("t1") 
        
        t2 = f1.add_task("t2")  
        t2.add_trigger("t1 eq complete") 
        t2.add_event("a")
        t2.add_event("b")
        
        f1.add_task("t3").add_trigger("t2:a")  
        f1.add_task("t4").add_trigger("t2:b")  
        return f1
    
    print("Creating suite definition")
    defs = ecflow.Defs()
    suite = defs.add_suite("test")
    suite.add_variable("ECF_INCLUDE", os.path.join(os.getenv("HOME"), "course"))
    suite.add_variable("ECF_HOME",    os.path.join(os.getenv("HOME"), "course"))
    
    suite.add_family( create_family_f1() )
    print(defs)
    
    print("Checking job creation: .ecf -> .job0")   
    print(defs.check_job_creation())
    
    print("Checking trigger expressions")
    print(defs.check())
    
    print("Saving definition to file 'test.def'")
    defs.save_as_defs("test.def")
    
    print("replace suite /test in server")
    defs.test.replace_on_server()

    import os
    from ecflow import *
    
    home = os.path.join(os.getenv("HOME"), "course")
    def create_family_f1():
       f1 = Family("f1") + [ Edit(SLEEP=20),
               Task("t1"),
               Task("t2") + Trigger(["t1"]) + Event("a") + Event("b"),
               Task("t3") + Trigger("t2:a"),
               Task("t4") + Trigger("t2:b") ]
       return f1
            
    print("Creating suite definition")
    defs = Defs() + Suite("test")
    defs.test += [ Edit(ECF_INCLUDE=home,ECF_HOME=home), create_family_f1() ]
    print(defs)
    
    print("Checking job creation: .ecf -> .job0")
    print(defs.check_job_creation())
    print("Saving definition to file 'test.def'")
    defs.save_as_defs("test.def")
    
    print("replace suite /test in server")
    defs.test.replace_on_server()