[PEAK] subscribable/conditional lists

Phillip J. Eby pje at telecommunity.com
Tue Apr 20 19:49:36 EDT 2004


At 04:22 PM 4/20/04 -0700, John Landahl wrote:
>peak.events could use support for subscribable/conditional lists, which
>could be used as an input queue for a Task-based worker loop.  Something
>like the following:
>
>class Work(binding.Component):
>     queue = binding.Make(ConditionalList)
>
>     def worker(self):
>         while True:
>             yield self.queue; queue = events.resume()
>             while queue:
>                 doWork(queue.pop(0))
>     worker = binding.Make(events.taskFactory(worker), uponAssembly=True)
>
>     def addWork(self, data):
>         self.queue.append(data)
>
>The worker should suspend until there's work for it in the queue, at
>which point it consumes the data in the queue and waits for more work.
>
>I've come up with a basic implementation (attached); does seem like a
>decent approach, or is there a better way to do this?

Dunno.  This is what semaphores are ordinarily used for, though.  That is, 
you bump the semaphore up when there's an item in the queue, and down to 
remove an item.  The advantage of this approach is that it's safe for there 
to be multiple tasks pulling items from the queue.

Of course, to really do queueing safely, you need to ensure that the queue 
can't grow unboundedly, so you actually need to also have a condition that 
indicates when the queue isn't full.  That way, a task writing to the queue 
can block until there's room to add something.

Anyway, the reason I said "dunno" is because I don't know what your 
requirements are.  I would guess, however, that all the extra list stuff 
you implemented probably isn't very useful for most applications, and from 
what I can tell, some of its implementation is broken, too.  I'd probably 
just redo your example as something like:

class Work(binding.Component):

     queue = binding.Make(list)
     itemsQueued = binding.Make(events.Semaphore)

     def worker(self):
         while True:
             yield self.itemsQueued; events.resume()
             item = self.queue.pop(0)
             self.itemsQueued.take()   # mine! nobody else can have it!
             doWork(item)

     worker = binding.Make(events.taskFactory(worker), 
uponAssembly=True)

     def addWork(self,data):
         self.queue.append(data)   # put it in the queue, then
         self.itemsQueued.put()    # let waiting tasks know it's there

Which is just as simple as your version, but more explicit about where task 
switching can or can't take place.




More information about the PEAK mailing list