[PEAK] Reactor-driven microthreads

Phillip J. Eby pje at telecommunity.com
Wed Dec 31 00:22:11 EST 2003


It seems like I've been dealing in various kinds of reactive systems a lot 
lately using PEAK.  Although they were easier to write than they would've 
been without PEAK, I think they're not quite "easy enough" yet.

Twisted makes reactivity the center of the world, which to me is the tail 
wagging the dog.  Most application code -- and programmer thought -- is 
based on linear and iterative patterns.  It would be nice to be able to 
leverage one's linear-thinking skills to write reactive code.  So nice, in 
fact, that it would be worth a little runtime overhead to be able to write 
code faster and with fewer defects.

Regular threads are not the answer, though, as preemptive multitasking is 
prone to far subtler bugs.  So, a co-operative multitasking mechanism is 
needed.

Python generator functions provide an ideal basis for such a 
mechanism.  Indeed, Twisted has its 'flow' package to do something similar 
to what I have in mind.  However, even 'flow' has some "twisted" 
assumptions that don't really suit PEAK.  Even in 'flow', the reactive 
paradigm still seems quite central, and generators are used to make the 
existing Twisted framework more usable in certain areas.

But, a lot of the things the Twisted framework offers, like protocols and 
connectors and the like, are primarily necessary because Twisted is 
event-driven in the first place!  If it were based on co-operative 
multitasking, much of that complexity would go away.  Consider the relative 
simplicity of:

     def talkToAServer(self,address):

         sock = self.lookupComponent(address, adaptTo=net.IClientAsyncSocket)
         readline = sock.readline

         try:
             yield sock.untilConnected(timeout=self.connTimeout)
             yield sock.send("HELO world\n")
             yield sock.untilLineRead(timeout=self.lineTimeout)
             data = readline()
             # do something with the data

         except sock.TimeoutError:
             yield mthread.Error()
             sock.close()
             return

         sock.close()

Compared to most client code I've seen for Twisted, this is pretty darn simple.

Anyway, the basic idea is that you write generators that yield the results 
of calling methods that figuratively "wait" for something to happen.  To 
use the above code, something might call:

     mthread.Thread(self.talkToAServer(address)).run()

The Thread wrapper handles iterating over the generator it's passed, and 
manages scheduling.  In essence a Thread calls 
'reactor.callLater(0,thread.iterate)' to reschedule itself if code has 
co-operatively yielded, e.g. via 'yield None'.  If an 'IThreadScheduler' is 
yielded, though, the Thread passes itself to the 'IThreadScheduler' 
instance to request scheduling.  A scheduler can do things like add a 
reader to the reactor, schedule a delayed call to resume the thread, or 
"push" a new generator on top of the Thread's "stack", so that the Thread 
then runs the other generator.  In this way, lines like 'yield 
sock.send("HELO world\n")' may effectively transfer control to another 
generator.  That is, 'sock.send()' could be another generator loop, 
something like:

     def send(self, data, timeout=None):
         sentBytes = self._realsocket.send(data)
         while sentBytes<len(data):
             yield self.untilWritable(timeout)
             sentBytes += self._realsocket.send(data[sentBytes:])

'untilWritable' would return an 'IThreadScheduler' that adds a writer to 
the reactor that re-enables the Thread that's executing the generator.

Under this scheme, the current process supervisor would be able to ditch a 
lot of state driven code with something like:

     processCount = binding.Make(mthread.Value)
     desiredProcesses = binding.Make(mthread.Value)

     def _ensureProcessesRunning(self):
         while True:
             if self.processCount()<self.desiredProcesses():
                 # Start a process as soon as possible
                 self.reactor.callLater(0, self._doStart)
                 # But don't do any more scheduling until start interval passes
                 yield mthread.sleep(self.startInterval)
             else:
                 # We have enough processes, so reset desired = minimum
                 self.desiredProcesses.set(self.minChildren)
                 # And sleep until something relevant changes
                 yield self.processCount.changed() | 
self.desiredProcesses.changed()

And in the process supervisor's startup process, it would do:

     self.processCount.set(0)
     self.desiredProcesses.set(self.minProcesses)
     Thread(self._ensureProcessesRunning()).run()

Finally, in methods that added or removed child processes, we would update 
the process count variable, and in methods that wanted more children 
running, we would bump up 'desiredProcesses' (but no higher than maxChildren).

The simple loop above took much less time to write than the code that does 
this now in the supervisor tool, and it's easier to tell if it's 
correct.  To implement it would require only the Thread and Watchable 
classes, and a Union class (to implement the '|' operator over 
IThreadSchedulers).

My main concern about this concept are that it is likely to do an awful lot 
of object creation and function calls, compared to the event-driven 
approach.  OTOH, for threads like the last one, those objects are created 
only when something relevant happens, so maybe it doesn't really do 
anything more than the event-driven approach would.  And, if operations 
like Unions, Values, and the rest are called a lot, then it might be worth 
porting them to Pyrex.  For objects like sockets and file handles, we could 
avoid creating new objects for each read or write by having "reader" and 
"writer" IThreadScheduler objects that live on the wrapper, as long as we 
were willing to have timeouts be a socket-level configuration rather than a 
per-read or per-write setting.

Similarly, Value objects could have their 'changed' attribute be an 
IThreadScheduler, so you'd yield the 'changed' object.  Hm.  So that loop 
above would become:

     def _ensureProcessesRunning(self):

         processCount = self.processCount
         desiredProcs = self.desiredProcesses
         until_something_changed = processCount.changed | desiredProcs.changed

         while True:
             if processCount() < desiredProcs():
                 # Start a process as soon as possible
                 self.reactor.callLater(0, self._doStart)
                 # But don't do any more scheduling until start interval passes
                 yield mthread.sleep(self.startInterval)
             else:
                 # We have enough processes, so reset desired = minimum
                 self.desiredProcesses.set(self.minChildren)
                 # And sleep until something relevant changes
                 yield until_something_changed

And now the only objects being created are for the callLater() and sleep() 
invocations.

Exceptions are an interesting problem.  A scheduler like 'sleep()' can 
throw an exception in the generator where it's called.  But an error in the 
scheduler itself is akin to an uncaught exception in a "real" thread: it 
may force termination of the thread.  Alas, I don't know of a way to push 
such exceptions back into the generator in the general case.  For things 
like read operations on a socket, you have to call something when execution 
resumes, so of course there you can have an error.

It may be that instead of doing:

     Thread(self._ensureProcessesRunning()).run()

one does something like:

     Thread().run(self._ensureProcessesRunning, *args, **kw)

And the Thread() object passes itself to the generator.  Then the generator 
itself would be able to do things like 'thread.errors()' to force any 
errors to be thrown in the current execution.  Ugh.  It's too bad there's 
no way to "throw" errors "into" a generator, without having to call 
something from inside the generator that holds the exception in order to 
reraise it.  Bleah.

I guess we'll have to do some thinking about how exceptions are handled in 
Threads for this to work, and consider what errors *need* to be seen by 
Thread-generators, versus what errors should terminate a Thread, or be 
simply caught-and-logged by a handler.  This is one of those unfortunate 
coding areas where errors simply *must* be explicitly silenced.  On the 
plus side, this applies almost exclusively to IThreadSchedulers themselves, 
not to threaded generators.

Well, it's getting awfully late, so I think I'll wait for another day to 
sort out all the details of exception handling.  But it sounds like the 
only *really* open issue here is how schedulers' errors can be passed back 
into the generators in a relatively transparent fashion.  Probably, that 
means that we'd end up with statements like:

     yield thread(until_something_changed)

or:

     yield until_something_changed(thread)

as either of these would let the thread object invoke the scheduler (or 
vice versa) while still "in" the generator's execution.  So any errors 
would be thrown in the original context.  Then, only errors occurring when 
the thread is "resumed" would have to be checked for inside the generator.

Hm.  This seems like the first really good use case I've seen for having a 
macro facility in Python, since it would allow us to spell the 
yield+errorcheck combination with less boilerplate.  Ah well.




More information about the PEAK mailing list