[PEAK] Anybody using reactors?

Phillip J. Eby pje at telecommunity.com
Wed Jan 14 15:47:55 EST 2004


At 10:13 PM 1/14/04 +0200, alexander smishlajev wrote:
>Phillip J. Eby wrote, at 14.01.2004 20:42:
>
>>>>would work, then?
>>>
>>>it seems so, at least at the first glance.
>>Good to know; I won't worry about such cancellations in peak.events, 
>>then, unless somebody else presents a different use case for 
>>cancelling/rescheduling.
>
>i found one slightly different case, although it is implemented without 
>removeLater (we didn't have such a beast at that time).  when a running task 
>is suspended by another task, it schedules an operator warning to be issued 
>after some time; when the task is resumed, the warning is disabled 
>(the warning procedure still runs, but does not produce the message).
>
>it seems to me that this case also may be handled with
>
>   yield events.AnyOf(appEvents.resume, scheduler.timeout())
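
Right.  As a sketch (assuming AnyOf fires '(source,event)' pairs; 
'appEvents.resume', the 30-second timeout, and 'warnOperator' are just 
placeholders standing in for your actual objects):

def suspensionWatcher(scheduler, appEvents):
    # run once per suspension: warn unless the task resumes in time
    yield events.AnyOf(appEvents.resume, scheduler.timeout(30))
    source, event = events.resume()
    if source is not appEvents.resume:
        warnOperator()   # placeholder for the warning routine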
>
>>So far, it seems to me the strength of peak.events is in managing events 
>>that happen in sequence, which is *very* hard to do clearly in a 
>>reactor-driven style.
>
>i wouldn't say it was that hard.  here's the reactor callback of one of 
>the most complicated components:

I said it was hard to do *clearly*, and I've found it hard to do correctly 
as well.  For example, peak.tools.supervisor has many state management bits 
that are not at all clear, and it had many non-obvious bugs in its first 
versions.  By contrast, between yesterday and today I did two different 
versions of an "all processes are busy" reporting thread that I expect 
would've taken all week to get correct in the reactor-driven style.
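
A flavor of what that looks like, in sketch form (not the actual code; 
'allBusy' stands in for an events.Condition that other code sets true when 
every process is busy, and I'm assuming conditions can be read back by 
calling them):

def busyReporter(scheduler, allBusy, log):
    while True:
        yield allBusy; events.resume()             # wait until all busy
        yield scheduler.sleep(5); events.resume()  # hold for 5 seconds
        if allBusy():                              # still all busy?
            log.warning("all processes are busy")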


>(you may wonder why it is done by getWork/doWork, and not by doRead. 
>that's because pipes between application components are not select()able 
>on the windows platform.)

If they are pipes used solely *within* the process, you may find it more 
useful to use an events.Distributor and call 'send()' on it, like this:

from peak.api import events   # import path assumed for PEAK circa 2004

def producer(scheduler, pipe):
    # send an ever-increasing counter, one value per scheduler "tick"
    i = 0
    while True:
        pipe.send(i)
        i += 1
        yield scheduler.sleep(); events.resume()

def consumer(pipe):
    # suspend until the distributor fires, then print what arrived
    while True:
        yield pipe; data = events.resume()
        print data

scheduler = events.Scheduler()   # assumed constructor for the scheduler
pipe = events.Distributor()
c = events.Thread(consumer(pipe))
p = events.Thread(producer(scheduler, pipe))

Notice that we start the consumer before the producer, so that no data will 
be lost.  The only downside to this mechanism is that there is no buffering: 
if nobody is listening when a send() occurs, the data is dropped on the 
floor.  Of course, one could always implement those features atop the 
current microkernel; I just backed off from putting a Queue type in the 
microkernel itself.  It raised too many policy questions about what I 
really wanted in a Queue, garbage collection issues, etc., and was 
distracting from getting the more fundamental features in place.
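
For example, a bare-bones buffered pipe could be layered on like this 
(purely illustrative; 'SimpleQueue' is not part of peak.events, and it 
assumes an events.Semaphore with put()/take() as the wakeup mechanism):

class SimpleQueue:
    def __init__(self):
        self.buffer = []
        self.available = events.Semaphore(0)   # counts buffered items

    def put(self, item):
        self.buffer.append(item)
        self.available.put()                   # wake a waiting task

def bufferedConsumer(queue):
    while True:
        yield queue.available; events.resume() # fires when count > 0
        queue.available.take()                 # claim one buffered item
        print queue.buffer.pop(0)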




