One possible solution:
import os
import ecflow
# Build the "operation_suite" definition with the imperative ecflow API.
# The suite carries a RepeatDay(1) and holds one family per cycle ("00", "12").
defs = ecflow.Defs()
suite = defs.add_suite("operation_suite")
suite.add_repeat(ecflow.RepeatDay(1))

# ECF_HOME and ECF_INCLUDE share the same directory; ECF_FILES lives below it.
course_dir = os.getenv("HOME") + "/course"
suite.add_variable("ECF_HOME", course_dir)
suite.add_variable("ECF_INCLUDE", course_dir)
suite.add_variable("ECF_FILES", course_dir + "/oper")

# Defines the triggers for the first cycle
# ("1" is a sentinel meaning "no trigger": the first cycle is not held back).
cycle_triggers = "1"
for cycle in ("00", "12"):
    # The "12" cycle uses a last step of 240, the "00" cycle a last step of 24.
    if cycle == "12":
        last_step = 240
    else:
        last_step = 24

    fcycle_fam = suite.add_family(cycle)
    fcycle_fam.add_variable("CYCLE", cycle)
    fcycle_fam.add_variable("LAST_STEP", last_step)
    if cycle_triggers != "1":
        fcycle_fam.add_trigger(cycle_triggers)

    # analysis: get_observations -> run_analysis -> post_processing
    analysis_fam = fcycle_fam.add_family("analysis")
    analysis_fam.add_task("get_observations")
    analysis_fam.add_task("run_analysis").add_trigger("get_observations == complete")
    analysis_fam.add_task("post_processing").add_trigger("run_analysis == complete")

    # forecast: starts once the whole analysis family is complete.
    forecast_fam = fcycle_fam.add_family("forecast")
    forecast_fam.add_trigger("analysis == complete")
    forecast_fam.add_task("get_input_data")
    run_forecast_task = forecast_fam.add_task("run_forecast")
    run_forecast_task.add_trigger("get_input_data == complete")
    # The "step" meter runs from 0 to last_step; the archive step families
    # below trigger on its value.
    run_forecast_task.add_meter("step", 0, last_step, last_step)

    # archive/analysis: saves the analysis once run_analysis has completed.
    archive_fam = fcycle_fam.add_family("archive")
    fam_analysis = archive_fam.add_family("analysis")
    fam_analysis.add_variable("TYPE", "analysis")
    fam_analysis.add_variable("STEP", "0")
    fam_analysis.add_trigger("../analysis/run_analysis == complete")
    fam_analysis.add_task("save")

    # One "step_<n>" family per 6-step increment, each triggered when the
    # run_forecast "step" meter reaches that value.
    for i in range(6, last_step + 1, 6):
        step_fam = fam_analysis.add_family("step_" + str(i))
        step_fam.add_variable("TYPE", "forecast")
        step_fam.add_variable("STEP", i)
        step_fam.add_trigger("../../forecast/run_forecast:step ge " + str(i))
        step_fam.add_task("save")

    # Defines the triggers for the next cycle
    cycle_triggers = "./" + str(cycle) + " == complete"
1 Comment
Unknown User (ma0)
Alternative style
import os
from ecflow import *

# ECF_HOME and ECF_INCLUDE share this directory; ECF_FILES lives below it.
home = os.getenv("HOME") + "/course"

# Last step per cycle family name.
last_step = {
    "12": 240,
    "00": 24,
}


def cycle_trigger(cycle):
    """Return the trigger that holds back *cycle*, or None if it has none.

    Only the "12" cycle gets a trigger (on the "00" family being complete);
    every other cycle returns None.
    """
    if cycle == "12":
        return Trigger("./00 == complete")
    return None


# Build the "operation_suite" definition declaratively: one family per cycle,
# each holding the analysis, forecast and archive families.
defs = Defs().add(
    Suite("operation_suite").add(
        RepeatDay(1),
        Edit(ECF_HOME=home),
        Edit(ECF_INCLUDE=home),
        Edit(ECF_FILES=home + "/oper"),
        [
            Family(cycle).add(
                Edit(CYCLE=cycle),
                Edit(LAST_STEP=last_step[cycle]),
                cycle_trigger(cycle),
                Family("analysis").add(
                    Task("get_observations"),
                    Task("run_analysis").add(Trigger(["get_observations"])),
                    Task("post_processing").add(Trigger(["run_analysis"])),
                ),
                Family("forecast").add(
                    Trigger(["analysis"]),
                    Task("get_input_data"),
                    Task("run_forecast").add(
                        Trigger(["get_input_data"]),
                        Meter("step", 0, last_step[cycle]),
                    ),
                ),
                Family("archive").add(
                    Family("analysis").add(
                        Edit(TYPE="analysis"),
                        Edit(STEP=0),
                        Trigger("../analysis/run_analysis == complete"),
                        Task("save"),
                        # One "step_<n>" family per 6-step increment, each
                        # triggered on the run_forecast "step" meter.
                        [
                            Family("step_{}".format(i)).add(
                                Edit(TYPE="forecast"),
                                Edit(STEP=i),
                                Trigger("../../forecast/run_forecast:step ge {}".format(i)),
                                Task("save"),
                            )
                            for i in range(6, last_step[cycle] + 1, 6)
                        ],
                    )
                ),
            )
            for cycle in ("00", "12")
        ],
    )
)

# print(...) with parentheses works on both Python 2 and Python 3.
print(defs)