ecFlow's documentation is now on readthedocs!

From one simple task which both produces and consumes "steps", to the extreme parallel version with one task per step to process.

Alternatives
beg =  0
fin = 48
by  =  3
    
def not_consumer(): return Variables(CONSUME= "no")

def not_producer(): return Variables(PRODUCE= "no")

def ev():return (Event("p"), Event("c"))

def call_task(name, start, stop, inc):
    meter = None
    if start != stop:
        meter = Meter( "step", -1, int(stop))

    return Task(name).add(
        ev(),
        Variables(BEG= start,
                  FIN= stop,
                  BY= inc),
        meter)

def call_consumer(SELECTION):
    # defstatus suspended # avoid early start
    def consume1(leap = 1, leap_nb = 3):
        return [ Family("%d" % leap).add(
            call_task( "consume", "'%STEP%'", "'%STEP%'", by * leap_nb).add(
                Repeat("STEP", beg + by * (leap - 1), fin, by * leap_nb, kind="integer"),
                # same trigger as consume0
                # Trigger("(%s and (consume:STEP le %s1/produce:STEP)) or " % (lead, prod) + 
                #         "(not %s and (consume:STEP le %s0/produce:step))" % (lead, prod))
                Trigger("(consume:STEP le %s1/produce:STEP)" % prod),
            )) for leap in xrange(1, leap_nb) ]

    def consume2(beg, fin):
        return [ Family("%03d" % idx).add(
            call_task( "consume", idx, idx, by).add(
                Variables(STEP= idx),
                # Same trigger as consume0
                # Trigger("(%s and (consume:STEP le %s1/produce:STEP)) or " % (lead, prod) +
            #         "(not %s and (consume:STEP le %s0/produce:step))" % (lead, prod))))
                Trigger("consume:STEP le %s1/produce:STEP" % prod),
            )) for idx in xrange(beg, fin+1, by )]

    lead = ic.psel() + "/consumer/admin/leader:1"
    prod = ic.psel() + "/consumer/produce"
    if 0: return Family("consumer").add(Defcomplete()) # FIXME
    return Family("consumer").add(
        Defcomplete(),

        Variables(SLEEP= 10,
                  PRODUCE= "no",
                  CONSUME= "no"),

             Family("limit").add(
                Defcomplete(),
                Limit("consume", 7),),

             Family("admin").add(
                # set manually with Xcdp or alter the event 1 so
                # that producer 1 becomes leader
                # default is producer0 leads
                Task("leader").add(
                    Event("1"),
                    Defcomplete())),
             # text this task is dummy task not designed to run

             # default : task does both :
                 Variables(PRODUCE= "yes",
                           CONSUME= "yes"),
             # this task will do both, ie serial

             call_task( "produce", beg, fin, by).add(
            Label("info", "do both at once"),),
             # this will loop inside the task, reporting
             # its processed step as a meter indication

             Family("produce0").add(
                not_consumer(),
                call_task( "produce", beg, fin, by )),
             # here, choice is to push a new task for each step

             Family("produce1").add(
                not_consumer(),
                call_task( "produce", '%STEP%', "%STEP%", by ).add(
                Repeat("STEP", beg, fin, by , kind="integer"), )),
             # PRB edit FIN '$((%STEP% + %BY%))'

             Family("consume").add(
                not_producer(),
                Inlimit("limit:consume"),
                Variables(CALL_WAITER= 1,
                          # $step will be interpreted in the job!
                          TRIGGER= "../produce:step -gt consume:$step or ../produce eq complete"),
                call_task( "consume", beg, fin, by ).add(
                )),
             
             Family("consume0or1").add(
                not_producer(),
                Inlimit("limit:consume"),
                call_task( "consume", "%STEP%", "%STEP%", by, ),
                Repeat( "STEP", beg, fin, by, kind="integer"),
                Trigger("(%s and (consume0or1:STEP le %s1/produce:STEP)) or "
                        % (lead, prod) +
                        "(not %s and (consume0or1:STEP le %s0/produce:step))"
                        % (lead, prod))),

             Family("consume1").add(
                not_producer(),
                Inlimit("limit:consume"),
                consume1()),

             Family("consume2").add(
    # loop is ``exploded''
    # consume limit may be changed manually with Xcdp
    # to reduce or increase the load
                Inlimit("limit:consume"),
                not_producer(),
                consume2(beg, fin)))