[PEAK] trellis activity does not save/restore Contextual context between task switches

Andrew Svetlov andrew.svetlov at gmail.com
Sat Mar 21 17:35:41 EDT 2009


As far as I can tell, trellis activity tasks do not switch context
inside the task working loop. As a result, if my tasks use something
like context.new() in their own code, I cannot run two of them
together.

I made a patch for TaskCell._stepper to support this feature. The main
change is that STACK now contains not only the current generators for a
task but also the last context.State for every generator, and the states
are updated when needed.
--------------------------------------------------

from peak.events import trellis
from peak.events.activity import *
import sys
from peak import context


def _stepper(self, func):
    """Build the step function that drives the iterator returned by `func`.

    Intended as a replacement for ``TaskCell._stepper`` (see ``apply()``).
    Each ``STACK`` entry is a two-item list ``[iterator, state]``: a running
    iterator/generator and the ``context.State`` object recorded for it.

    func -- called once with no arguments; its result becomes the first
            iterator on the stack.

    Binds ``self._result`` and ``self._error`` to the lists used to carry
    the last produced value and pending ``sys.exc_info()`` triples between
    iterations, and returns the ``_step`` closure.
    """
    # VALUE is cleared before every append below, so it holds at most one item.
    VALUE = self._result = []
    # ERROR collects sys.exc_info() triples waiting to be thrown into a frame.
    ERROR = self._error  = []
    # Presumably State.child() creates a new state derived from the current
    # one -- confirm against peak.context.
    STACK = [[func(), context.State.child()]]
    CALL = STACK.append
    RETURN = STACK.pop
    ctrl = trellis.ctrl
    def _step():
        # Activate the state recorded for the innermost frame.  The result of
        # swap() is kept and swapped back at the end of this call
        # (presumably it is the previously active state -- confirm).
        # NOTE(review): STACK[-1] raises IndexError if _step is ever invoked
        # after the stack has emptied -- confirm whether that can happen.
        # NOTE(review): no swap happens inside the loop; frames pushed or
        # popped below keep running under whatever state is active.
        ctx = STACK[-1][1]
        outer = ctx.swap()
        while STACK:
            try:
                it = STACK[-1][0]
                if VALUE and hasattr(it, 'send'):
                    # Resume the generator with the value produced last.
                    rv = it.send(VALUE[0])
                elif ERROR and hasattr(it, 'throw'):
                    # Deliver the most recent pending exception to this frame.
                    rv = it.throw(*ERROR.pop())
                else:
                    rv = it.next()
            except:
                # Any exception ends the current frame.  StopIteration is
                # discarded; anything else stays in ERROR so the next loop
                # iteration can throw it into the frame below.
                del VALUE[:]
                ERROR.append(sys.exc_info())
                if ERROR[-1][0] is StopIteration:
                    ERROR.pop() # not really an error
                RETURN()
            else:
                del VALUE[:]
                if rv is Pause:
                    # Record the result of State.get() (presumably the state
                    # active right now) on this frame, then stop stepping.
                    STACK[-1][1] = context.State.get()
                    break
                elif hasattr(rv, 'next'):
                    # An iterator was yielded: push it as a nested frame.
                    CALL([rv, context.State.get()]); continue
                elif isinstance(rv, Return):
                    rv = rv.value
                VALUE.append(rv)
                # A value from the outermost frame stays in VALUE and ends
                # this step; otherwise pop so the value goes to the caller.
                if len(STACK)==1: break
                RETURN()
        if STACK and not ERROR and not ctrl.reads:
            ctrl.current_listener.dirty() # re-run if still running
        # NOTE(review): not wrapped in try/finally, so an exception raised by
        # the dirty() call above would skip this swap back.
        outer.swap()
        # `resume` is not defined in this snippet; presumably it comes from
        # the star-import of peak.events.activity -- confirm.
        return resume()

    return _step

def apply():
    """Install the context-aware ``_stepper`` as ``TaskCell._stepper``."""
    setattr(TaskCell, '_stepper', _stepper)

----------------------------------
The unit tests below cover this situation. I may not have checked all
possible cases, but the tests reflect my current understanding of the problem.

---------------------------------
from __future__ import with_statement
'''
Created on Mar 20, 2009

@author: asvetlov
'''
from peak.events import trellis, activity
from peak import context
import unittest
# The tests below rely on the patched task stepper, so it is installed at
# import time (presumably this is the module holding the _stepper/apply
# code shown earlier in this message -- confirm).
from fds_main.utilities.trellis import monkeypatch #monkeypatch required
monkeypatch.apply()

@context.setting
def val(value=1):
    """Integer setting read by the tests; the argument defaults to 1."""
    converted = int(value)
    return converted


class Workflow(trellis.Component):
    """Test fixture with three tasks that record ``val()`` around pauses.

    Each task appends the current ``val()`` to its own ``retN`` list before,
    inside and (for ``do3``) after ``context.new()`` blocks, yielding
    ``activity.Pause`` in between, and sets its ``doneN`` flag on exit.
    """
    # Completion flags, set in each task's ``finally`` clause.
    done1 = trellis.attr(False)
    done2 = trellis.attr(False)
    done3 = trellis.attr(False)

    # Per-task logs of the val() readings.
    ret1 = trellis.make(list)
    ret2 = trellis.make(list)
    ret3 = trellis.make(list)

    @activity.task
    def do1(self):
        # `val <<= 2` is an augmented assignment, so without `global` Python
        # would treat `val` as a local name in this function.
        global val
        self.ret1.append(val())
        try:
            with context.new():
                val <<= 2
                self.ret1.append(val())
                # Pause inside the block; the reading after it is recorded
                # while still inside context.new().
                yield activity.Pause
                self.ret1.append(val())
        finally:
            self.done1 = True

    @activity.task
    def do2(self):
        # Same shape as do1, but uses 3 and logs to ret2.
        global val
        self.ret2.append(val())
        try:
            with context.new():
                val <<= 3
                self.ret2.append(val())
                yield activity.Pause
                self.ret2.append(val())
        finally:
            self.done2 = True

    def nested(self):
        # Plain generator (no @activity.task); do3 yields it as a subtask.
        # It logs to ret3 before and after its own context.new() block and
        # finally yields 100, which do3 receives as the result.
        global val
        self.ret3.append(val())
        yield activity.Pause
        self.ret3.append(val())
        with context.new():
             val <<= 5
             yield activity.Pause
             self.ret3.append(val())
        yield activity.Pause
        self.ret3.append(val())
        yield 100

    @activity.task
    def do3(self):
        global val
        self.ret3.append(val())
        try:
            with context.new():
                val <<= 4
                self.ret3.append(val())
                yield activity.Pause
                self.ret3.append(val())
                # Run nested() as a subtask; `a` is the value it yields last.
                a = yield self.nested()
                assert a==100
                self.ret3.append(val())
                yield activity.Pause
                self.ret3.append(val())
            # Outside the context.new() block again: pause once more and
            # record the final reading.
            yield activity.Pause
            self.ret3.append(val())
        finally:
            self.done3 = True

class TestParallelTasks(unittest.TestCase):
    # Each Workflow task must see its own setting values across Pause
    # points, even while the other tasks are running.

    def test_default_context(self):
        # With no context.new() block active, the setting has its default.
        self.assertEquals(1, val())

    def test_parallel(self):
        flow = Workflow()
        # Keep flushing the event loop until do1 and do2 have both finished.
        while not (flow.done1 and flow.done2):
            activity.EventLoop.flush()

        self.assertEquals([1, 2, 2], flow.ret1)
        self.assertEquals([1, 3, 3], flow.ret2)

    def test_capture_nondefault_context_and_restores_after_end_task(self):
        global val
        with context.new():
            val <<= 10

            self.assertEquals(10, val())
            flow = Workflow()
            # The tasks are created and run while the value 10 is in effect.
            while not (flow.done1 and flow.done2):
                activity.EventLoop.flush()

        self.assertEquals([10, 2, 2], flow.ret1)
        self.assertEquals([10, 3, 3], flow.ret2)
        # After leaving the block the default must be visible again.
        self.assertEquals(1, val())

    def test_check_switch_context_in_subtasks(self):
        global val
        with context.new():
            val <<= 10

            self.assertEquals(10, val())
            flow = Workflow()
            while not (flow.done1 and flow.done2 and flow.done3):
                # Before every flush the caller must still see its own 10.
                self.assertEquals(10, val())
                activity.EventLoop.flush()

        self.assertEquals([10, 2, 2], flow.ret1)
        self.assertEquals([10, 3, 3], flow.ret2)
        self.assertEquals([10, 4, 4, 4, 4, 5, 4, 4, 4, 10], flow.ret3)


# Run the test cases when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
-------------------------------------
Please review, and if this patch is clean, put it into the current trellis trunk.
If not, please point out where I'm wrong.


More information about the PEAK mailing list