"""PEAK storage example: persist ``Object`` elements in a SQLite table.

Defines a small table-backed data manager (``TableDM``), a concrete
subclass for the ``objects`` table (``ObjectDM``), and a command
(``App``) that creates the table if needed and then runs a few
create/update/read-back transactions, printing what it sees.
"""

import os.path

from peak.api import binding, storage, model, PropertyName
from peak.running.commands import AbstractCommand

# Address of the database connection looked up through PEAK bindings.
DATABASE = 'sqlite:test.db'

# DDL executed by App._run when the database file does not exist yet.
createDB = '''
create table objects (
    object_id integer primary key,
    name varchar(50),
    description varchar(200)
);
'''


class Object(model.Element):
    """Model element with two string attributes, both defaulting to ''."""

    class name(model.Attribute):
        referencedType = model.String
        defaultValue = ''

    class description(model.Attribute):
        referencedType = model.String
        defaultValue = ''


class TableDM(storage.EntityDM):
    """Data manager that maps objects onto rows of a single SQL table.

    Subclasses supply ``table``, ``keyField`` and ``fieldMap``; this class
    builds the select/update/insert statements from them.  The ``_load``,
    ``_save`` and ``_new`` methods are presumably the hooks that
    ``storage.EntityDM`` calls -- confirm against the PEAK documentation.
    """

    db = binding.bindTo(DATABASE)
    table = binding.requireBinding('Name of database table')
    fieldMap = binding.requireBinding('A list to map field names to property names')
    keyField = binding.requireBinding('The field name of the primary key for this table')

    def _load(self, oid, ob):
        """Fetch the row whose key equals ``oid`` and return its state dict."""
        print('=== _load')
        # NOTE(review): unary ``~`` on the query result presumably yields
        # the single matching row -- confirm against PEAK's cursor API.
        row = ~self.db('select * from %s where %s=%%s' % (self.table, self.keyField), (oid,))
        return self.stateFromRow(row)

    def stateFromRow(self, row):
        """Return a dict of property name -> value read from ``row``.

        Columns that are missing on ``row`` map to ``None``.
        """
        print('=== stateFromRow')
        state = {}
        for (prop, field) in self.fieldMap.items():
            state[prop] = getattr(row, field, None)
        return state

    def _items(self, ob):
        """Return parallel lists ``(fields, values, placeholders)`` for ``ob``.

        ``fields`` holds the column names from ``fieldMap``, ``values`` the
        matching attribute values of ``ob`` (``None`` when the attribute is
        absent), and ``placeholders`` one ``'%s'`` per column.
        """
        print('=== _items')
        # Snapshot the mapping once so fields and values stay aligned.
        pairs = list(self.fieldMap.items())
        fields = [field for (prop, field) in pairs]
        values = [getattr(ob, prop, None) for (prop, field) in pairs]
        placeholders = ['%s'] * len(pairs)
        return fields, values, placeholders

    def _save(self, ob):
        """Write every mapped attribute of ``ob`` back to its existing row."""
        print('=== _save')
        fields, values, placeholders = self._items(ob)
        assignments = ','.join(['%s = %%s' % field for field in fields])
        # The key placeholder is ``%s`` like every other parameter: the
        # DB-API format paramstyle only defines ``%s``, and ``_load`` already
        # uses it for the same key column (the original used ``%d`` here).
        sql = 'update %s set %s where %s = %%s' % (self.table, assignments, self.keyField)
        self.db(sql, values + [ob._p_oid])

    def _new(self, ob):
        """Insert ``ob`` as a new row and return the key assigned to it.

        The key is the current maximum of ``keyField`` plus one (1 for an
        empty table); it is stored on ``ob`` as both ``_p_oid`` and ``id``.
        """
        print('=== _new')
        ct, = ~self.db('select max(%s) from %s' % (self.keyField, self.table))
        # max() over an empty table comes back falsy, so start from 0.
        ct = int(ct or 0) + 1
        ob._p_oid = ob.id = ct
        fields, values, placeholders = self._items(ob)
        sql = 'insert into %s (%s) values (%s)' % (
            self.table,
            ','.join(fields),
            ','.join(placeholders)
        )
        self.db(sql, values)
        return ct


class ObjectDM(TableDM):
    """``TableDM`` bound to the ``objects`` table and the ``Object`` class."""

    defaultClass = Object
    table = 'objects'
    keyField = 'object_id'
    # property name on the object -> column name in the table
    fieldMap = {
        'id': 'object_id',
        'name': 'name',
        'description': 'description'
    }


class App(AbstractCommand):
    """Command that exercises ``ObjectDM`` with a few transactions."""

    db = binding.Obtain(DATABASE, offerAs = [PropertyName('db')])
    objects = binding.Make(ObjectDM)

    def _run(self):
        """Create the table if needed, then create, update and dump an object."""
        # First run: the database file is absent, so create the schema.
        if not os.path.exists('test.db'):
            storage.beginTransaction(self)
            self.db(createDB)
            storage.commitTransaction(self)

        # Create a new object and give it its first values.
        storage.beginTransaction(self)
        o = self.objects.newItem()
        oid = self.objects.oidFor(o)
        print('Initial value: "%s" (should be blank)' % o.name)
        o.name = 'update1'
        o.description = 'description'
        storage.commitTransaction(self)

        # Read it back in a fresh transaction and update it again.
        storage.beginTransaction(self)
        o = self.objects[oid]
        print('After first update: "%s" (should be "update1")' % o.name)
        o.name = 'update2'
        storage.commitTransaction(self)

        # Second read-back; the label now matches the value being checked.
        storage.beginTransaction(self)
        o = self.objects[oid]
        print('After second update: "%s" (should be "update2")' % o.name)
        o.name = 'update3'
        storage.commitTransaction(self)

        # Dump the raw table contents.
        storage.beginTransaction(self)
        for r in self.db('select * from objects'):
            print(r)
        storage.commitTransaction(self)