from UserList import UserList from peak.api import events class ObservableList(UserList, events.Observable): singleFire = False def __init__(self, *args): UserList.__init__(self, *args) events.Observable.__init__(self) def __add__(self, other): UserList.__add__(self, other) self._fire(self) def __setitem__(self, i, item): UserList.__setitem__(self, i, item) self._fire(self) def __setslice__(self, i, j, other): UserList.__setslice__(self, i, j, other) self._fire(self) def __delitem__(self, i): UserList.__delitem__(self, i) self._fire(self) def __call__(self): return self.data def append(self, item): UserList.append(self, item) self._fire(self) def extend(self, other): UserList.extend(self, other) self._fire(self) def insert(self, i, item): UserList.insert(self, i, item) self._fire(self) def pop(self, i=-1): UserList.pop(self, i) self._fire(self) def remove(self, item): UserList.remove(self, item) self._fire(self) class ConditionalList(events.AbstractConditional): __slots__ = ('list') singleFire = False def __init__(self, list=None): if list is not None: self.list = list else: self.list = ObservableList() super(ConditionalList,self).__init__() events.subscribe(self.list, self._listFired) self.cmpval = self.__class__, self.list def __call__(self): return self.list() def derive(self,func): return self.list.derive(func) def _listFired(self, src, evt): if evt: self._fire(evt) def __add__(self, other): self.list.add(other) def __setitem__(self, i, item): self.list.__setitem__(i, item) def __setslice__(self, i, j, other): self.list.__setslice__(i, j, other) def __delitem__(self, i): self.list.__delitem__(i) def append(self, item): self.list.append(item) def extend(self, other): self.list.extend(other) def insert(self, i, item): self.list.insert(i, item) def pop(self, i=-1): self.list.pop(i) def remove(self, item): self.list.remove(item) def test(): from peak.api import config, binding, running, commands import time class Test(binding.Component): scheduler = binding.Obtain(events.IScheduler) q = binding.Make(ConditionalList) def reader(self): while True: print 'reader: waiting for queue' yield self.q; q = events.resume() print 'reader: queue changed: ', q while q: q.pop(0) reader = binding.Make(events.taskFactory(reader), uponAssembly=True) def writer(self): while True: print 'writer: writing to queue' self.q.append(time.time()) yield self.scheduler.sleep(3); events.resume() writer = binding.Make(events.taskFactory(writer), uponAssembly=True) root = config.makeRoot() ed = commands.EventDriven(root) ed.components = [Test(ed)] ed.run() if __name__ == '__main__': test()