[PEAK] Reactor-driven microthreads

Phillip J. Eby pje at telecommunity.com
Wed Dec 31 09:38:05 EST 2003


At Wed, 31 Dec 2003 00:22:11 -0500, I wrote:
>Well, it's getting awfully late, so I think I'll wait for another day to 
>sort out all the details of exception handling.  But it sounds like the 
>only *really* open issue here is how schedulers' errors can be passed back 
>into the generators in a relatively transparent fashion.  Probably, that 
>means that we'd end up with statements like:
>
>     yield thread(until_something_changed)
>
>or:
>
>     yield until_something_changed(thread)
>
>as either of these would let the thread object invoke the scheduler (or 
>vice versa) while still "in" the generator's execution.  So any errors 
>would be thrown in the original context.  Then, only errors occurring when 
>the thread is "resumed" would have to be checked for inside the generator.

Eureka!!!  As I was going to sleep, I figured out how to pass errors *into* 
a generator.  See below:

#=====
from __future__ import generators
import sys

no_exc = None, None, None

import traceback

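def resume():
     # If an exception is still recorded in sys.exc_info() (left there by
     # the 'except' clause in Thread.step), re-raise it here, i.e. inside
     # the generator that was just resumed.
     if sys.exc_info()<>no_exc:
         try:
             t,v,tb = sys.exc_info()
             raise t,v,tb
         finally:
             # Drop the local references so the traceback isn't kept alive
             t=v=tb=None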


class Thread(object):

     def __init__(self):
         self.stack = []
         self.call = self.stack.append

     def run(self,iterator):
         self.call(iterator)
         self.step()

     def step(self):
         stack = self.stack
         while stack:
             try:
                 for scheduler in stack[-1]:
                     if scheduler is not None:
                         self.call(scheduler)
                         break
                     else:
                         # not scheduled, stop
                         return
                 else:
                     stack.pop()
             except:
                 stack.pop()
                 if not stack:
                     raise

     def finish(self):
         while self.stack:
             self.step()

def gen1():
     try:
         print "calling gen2"
         yield gen2(); resume()
     except:
         print "caught",sys.exc_info()
         traceback.print_exception(*sys.exc_info())
     try:
         print "calling gen2"
         yield gen2(); resume()
     except:
         print "caught",sys.exc_info()
         traceback.print_exception(*sys.exc_info())
     try:
         print "calling gen4"
         yield gen4(); resume()
     except:
         print "caught",sys.exc_info()
         traceback.print_exception(*sys.exc_info())
     print "back from gen4"
     try:
         print "calling gen4"
         yield gen4(); resume()
     except:
         print "caught",sys.exc_info()
         traceback.print_exception(*sys.exc_info())
     print "back from gen4"

def gen2():
     yield gen3()

def gen3():
     raise "foo"
     yield None; resume()

def gen4():
     yield None; resume()

print; print
t = Thread()
t.run(gen1())

# Simulate being re-run by a scheduler until done...
t.finish()

#=====

As you can see, by simply calling 'resume()' after each 'yield', errors 
thrown outside the generator can be passed back "in" to it.  The tracebacks 
shown for these errors look something like:

Traceback (most recent call last):
   File "C:\cygwin\home\pje\PEAK\src\threadexc.py", line 64, in gen1
     yield gen2(); resume()
   File "C:\cygwin\home\pje\PEAK\src\threadexc.py", line 36, in step
     for scheduler in stack[-1]:
   File "C:\cygwin\home\pje\PEAK\src\threadexc.py", line 87, in gen3
     raise "foo"
foo

That is, the traceback makes it look as though the generator where the 
error occurred was called directly from the generator that caught the 
error.  Unfortunately, this leaves out how you got from one to the 
other.  On the bright side, it makes the traceback shorter and easier to 
read.  :)  And, it's really just amazing that you can do this at all.
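
For anyone trying this on Python 3: the listing above won't run there 
unchanged (print statements, string exceptions, '<>' and 'raise t,v,tb' are 
gone, and sys.exc_info() is cleared once an 'except' block exits).  Below is 
a rough sketch of the same trick under those constraints.  It is not the 
code from this post: the 'pending' attribute and the 'resume()' method on 
Thread are assumptions made for illustration, storing the error explicitly 
instead of relying on sys.exc_info().

```python
class Thread:
    """Python 3 variant of the Thread above, with an explicit pending error."""

    def __init__(self):
        self.stack = []
        self.call = self.stack.append
        self.pending = None  # hypothetical attribute: error awaiting re-raise

    def run(self, iterator):
        self.call(iterator)
        self.step()

    def resume(self):
        # Re-raise a stored error inside whichever generator calls us.
        exc, self.pending = self.pending, None
        if exc is not None:
            raise exc

    def step(self):
        stack = self.stack
        while stack:
            try:
                for scheduler in stack[-1]:
                    if scheduler is not None:
                        self.call(scheduler)
                        break
                    else:
                        return  # not scheduled, stop
                else:
                    stack.pop()  # generator finished
            except Exception as exc:
                stack.pop()
                if not stack:
                    raise
                self.pending = exc  # hand the error to the caller's resume()

    def finish(self):
        while self.stack:
            self.step()


log = []
t = Thread()

def gen3():
    raise ValueError("foo")
    yield None  # unreachable; only makes this function a generator

def gen2():
    yield gen3()  # no resume() here, so the error passes through to gen1

def gen1():
    try:
        yield gen2(); t.resume()
    except ValueError as exc:
        log.append("caught %s" % exc)
    yield None; t.resume()
    log.append("resumed cleanly")

t.run(gen1())
t.finish()
print(log)
```

As in the original, gen2 never calls resume(), so the error raised in gen3 
skips over it and surfaces at the 'yield' in gen1.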

The cool thing is that sys.exc_info() is thread-specific, which is what 
makes this behave correctly.  And there are only a few lines that need to be 
changed to enable the full range of scheduling options from my last e-mail, 
something like:

                 for scheduler in stack[-1]:
                     if scheduler is not None:
                         if not adapt(scheduler,IThreadScheduler).schedule(self):
                             # Scheduler isn't calling us back, so keep going
                             break
                     # done this iteration
                     return

Then, the implementation of IThreadScheduler.schedule(thread) for a 
generator will be 'thread.call(theGenerator); return False'.  For 'sleep', 
it'd be something like 'reactor.callLater(seconds,thread.step); return 
True'.  Etc.
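
To make the two schedule() cases concrete, here is a self-contained sketch 
in Python 3.  Everything in it beyond the loop shown above is an assumption 
for illustration: 'FakeReactor' is a toy stand-in for a real reactor (it 
only imitates a callLater(seconds, func) call), 'Sleep' is a hypothetical 
scheduler object, and the 'schedule()' helper uses a plain isinstance check 
where the real thing would use adapt(scheduler, IThreadScheduler).  Error 
passing is left out to keep it short.

```python
import heapq
import types


class FakeReactor:
    """Toy reactor: runs callLater() callbacks in simulated time order."""

    def __init__(self):
        self.now = 0.0
        self._queue = []
        self._count = 0  # tie-breaker so callbacks are never compared

    def callLater(self, seconds, func):
        self._count += 1
        heapq.heappush(self._queue, (self.now + seconds, self._count, func))

    def run(self):
        while self._queue:
            self.now, _, func = heapq.heappop(self._queue)
            func()


reactor = FakeReactor()


class Sleep:
    """Hypothetical scheduler: resume the thread after a delay."""

    def __init__(self, seconds):
        self.seconds = seconds

    def schedule(self, thread):
        reactor.callLater(self.seconds, thread.step)
        return True  # the reactor will call the thread back


def schedule(scheduler, thread):
    """Stand-in for adapt(scheduler, IThreadScheduler).schedule(thread)."""
    if isinstance(scheduler, types.GeneratorType):
        thread.call(scheduler)
        return False  # nobody calls back; keep stepping right away
    return scheduler.schedule(thread)


class Thread:
    def __init__(self):
        self.stack = []
        self.call = self.stack.append

    def step(self):
        stack = self.stack
        while stack:
            for scheduler in stack[-1]:
                if scheduler is not None:
                    if not schedule(scheduler, self):
                        break  # run the generator that was just pushed
                return  # done this iteration
            else:
                stack.pop()  # generator finished


log = []

def helper():
    yield Sleep(2)
    log.append(("helper", reactor.now))

def worker():
    log.append(("start", reactor.now))
    yield Sleep(5)
    log.append(("woke", reactor.now))
    yield helper()
    log.append(("done", reactor.now))

t = Thread()
t.call(worker())
t.step()
reactor.run()
print(log)
```

The return value of schedule() is what keeps the loop simple: False means 
"keep running now", True means "somebody else will call step() later".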

Microthreads, here we come!



