- Created by Axel Bonet, last modified on Apr 20, 2020
From one simple task which both produces and consumes "steps", to the extreme parallel version with one task per step to process.

Alternatives Expand source
beg = 0
fin = 48
by = 3
def not_consumer(): return Variables(CONSUME= "no")
def not_producer(): return Variables(PRODUCE= "no")
def ev():return (Event("p"), Event("c"))
def call_task(name, start, stop, inc):
meter = None
if start != stop:
meter = Meter( "step", -1, int(stop))
return Task(name).add(
ev(),
Variables(BEG= start,
FIN= stop,
BY= inc),
meter)
def call_consumer(SELECTION):
# defstatus suspended # avoid early start
def consume1(leap = 1, leap_nb = 3):
return [ Family("%d" % leap).add(
call_task( "consume", "'%STEP%'", "'%STEP%'", by * leap_nb).add(
Repeat("STEP", beg + by * (leap - 1), fin, by * leap_nb, kind="integer"),
# same trigger as consume0
# Trigger("(%s and (consume:STEP le %s1/produce:STEP)) or " % (lead, prod) +
# "(not %s and (consume:STEP le %s0/produce:step))" % (lead, prod))
Trigger("(consume:STEP le %s1/produce:STEP)" % prod),
)) for leap in xrange(1, leap_nb) ]
def consume2(beg, fin):
return [ Family("%03d" % idx).add(
call_task( "consume", idx, idx, by).add(
Variables(STEP= idx),
# Same trigger as consume0
# Trigger("(%s and (consume:STEP le %s1/produce:STEP)) or " % (lead, prod) +
# "(not %s and (consume:STEP le %s0/produce:step))" % (lead, prod))))
Trigger("consume:STEP le %s1/produce:STEP" % prod),
)) for idx in xrange(beg, fin+1, by )]
lead = ic.psel() + "/consumer/admin/leader:1"
prod = ic.psel() + "/consumer/produce"
if 0: return Family("consumer").add(Defcomplete()) # FIXME
return Family("consumer").add(
Defcomplete(),
Variables(SLEEP= 10,
PRODUCE= "no",
CONSUME= "no"),
Family("limit").add(
Defcomplete(),
Limit("consume", 7),),
Family("admin").add(
# set manually with Xcdp or alter the event 1 so
# that producer 1 becomes leader
# default is producer0 leads
Task("leader").add(
Event("1"),
Defcomplete())),
# text this task is dummy task not designed to run
# default : task does both :
Variables(PRODUCE= "yes",
CONSUME= "yes"),
# this task will do both, ie serial
call_task( "produce", beg, fin, by).add(
Label("info", "do both at once"),),
# this will loop inside the task, reporting
# its processed step as a meter indication
Family("produce0").add(
not_consumer(),
call_task( "produce", beg, fin, by )),
# here, choice is to push a new task for each step
Family("produce1").add(
not_consumer(),
call_task( "produce", '%STEP%', "%STEP%", by ).add(
Repeat("STEP", beg, fin, by , kind="integer"), )),
# PRB edit FIN '$((%STEP% + %BY%))'
Family("consume").add(
not_producer(),
Inlimit("limit:consume"),
Variables(CALL_WAITER= 1,
# $step will be interpreted in the job!
TRIGGER= "../produce:step -gt consume:$step or ../produce eq complete"),
call_task( "consume", beg, fin, by ).add(
)),
Family("consume0or1").add(
not_producer(),
Inlimit("limit:consume"),
call_task( "consume", "%STEP%", "%STEP%", by, ),
Repeat( "STEP", beg, fin, by, kind="integer"),
Trigger("(%s and (consume0or1:STEP le %s1/produce:STEP)) or "
% (lead, prod) +
"(not %s and (consume0or1:STEP le %s0/produce:step))"
% (lead, prod))),
Family("consume1").add(
not_producer(),
Inlimit("limit:consume"),
consume1()),
Family("consume2").add(
# loop is ``exploded''
# consume limit may be changed manually with Xcdp
# to reduce or increase the load
Inlimit("limit:consume"),
not_producer(),
consume2(beg, fin)))