"""High-level Python wrappers around the ``_edbm`` extension module.

The module keeps a process-wide configuration (see ``setroot``) and
registries of the databases, tables and queues opened through it, and
wraps the low-level handles so that results come back as ``DbItem`` /
``ResultSet`` objects, optionally run through an encoder.
"""

import sys
import _edbm
from _edbm import VIRTUAL, MULTIPLE, REFERENCED, REFERENCEDA, LOOKUP, TEXTUAL
from _edbm import DTYPE_LARGESTRING
from _edbm import CURSOR_RETURNS_SET, CURSOR_RETURNS_DBITEMS
from _edbm import CURSOR_RETURNS_COUNT
import re

version = "1.3 [lib %s]" % _edbm.version

# Module-wide ConfigParser instance; stays None until setroot() is called.
config = None


def setroot(path, name=""):
    """Load the configuration found under *path* into the global ``config``.

    ``defaults.config`` is read first, followed by ``<name>.config`` when
    *name* is given and ``edbm.config`` otherwise.  If the configuration
    has a ``path`` option in its ``python`` section, its colon-separated
    entries are appended to ``sys.path`` (entries already present are
    skipped).
    """
    import ConfigParser
    global config
    config = ConfigParser.ConfigParser({'app_root': path})
    if name:
        config.read(['%s/defaults.config' % path,
                     '%s/%s.config' % (path, name)])
    else:
        config.read(['%s/defaults.config' % path,
                     '%s/edbm.config' % path])
    try:
        pypath = config.get('python', 'path')
    except ConfigParser.Error:
        # The [python] section / path option is optional.
        return
    for entry in pypath.split(':'):
        if entry not in sys.path:
            sys.path.append(entry)


def access(ip, path):
    """Forward to ``_edbm.access(ip, path)`` and return its result."""
    return _edbm.access(ip, path)


# Splits an extended query into (text-before-bracket, bracket-content) pairs.
filterparser = re.compile(r"([^\[]*)\[(.+?)\]", re.DOTALL).findall


class DbItem:
    """A record whose attributes are filled in lazily by its factory.

    Attributes that are not set on the instance are computed on first
    access through ``factory.build`` and then cached on the instance.
    """

    def __init__(self, factory):
        self.factory = factory

    def __getattr__(self, name):
        # Only called when normal lookup fails.  Refuse 'factory' (it would
        # recurse forever if the instance has none yet) and dunder protocol
        # names, which must fail with AttributeError rather than be sent to
        # the schema.
        if name == 'factory' or (name[:2] == '__' and name[-2:] == '__'):
            raise AttributeError(name)
        value = self.factory.build(self, name)
        setattr(self, name, value)
        return value

    def __nonzero__(self):
        # An item is always true, whatever its content.
        return 1

    def __getitem__(self, name):
        """Dictionary-style access: ``item['x']`` is ``item.x``."""
        return getattr(self, name)

    def count(self, name):
        """Return ``factory.count(self, name)``."""
        return self.factory.count(self, name)

    def getschema(self):
        """Return the schema held by the factory."""
        return self.factory.schema

    def select(self, name, filter):
        """Return ``factory.select(self, name, filter)``."""
        return self.factory.select(self, name, filter)

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        """One ``"<field> <value>"`` line per schema field."""
        names = self.factory.schema['fields']
        lines = ["%s %s" % (name, str(getattr(self, name)))
                 for name in names]
        return "\n".join(lines)


class ResultSet:
    """Wrapper around a low-level result object.

    Indexing runs the raw record through the encoder (if any) and then
    through the factory (if any).  Attributes not defined here are looked
    up on the wrapped result.
    """

    def __init__(self, factory, result):
        self.factory = factory
        self.result = result
        self.encoder = None
        self.subresults = {}

    def _derive(self, result):
        # Build a sibling set that shares this set's factory and encoder,
        # so derived sets decode records the same way their source does.
        derived = ResultSet(self.factory, result)
        derived.setencoder(self.encoder)
        return derived

    def __getattr__(self, name):
        return getattr(self.result, name)

    def __len__(self):
        return len(self.result)

    def __nonzero__(self):
        return self.total != 0

    def __getitem__(self, index):
        data = self.result[index]
        if self.encoder:
            data = self.encoder(data)
        if self.factory:
            return self.factory(data)
        return data

    def __getslice__(self, low, high):
        return self._derive(self.result[low:high])

    def __and__(self, other):
        return self._derive(self.result.inter(other.result))

    def __or__(self, other):
        return self._derive(self.result.union(other.result))

    # XXX this is not the usual xor semantics
    def __xor__(self, other):
        return self._derive(self.result.diff(other.result))

    def __contains__(self, item):
        return item in self.result

    def ids(self):
        """Return ``result.ids()`` of the wrapped result."""
        return self.result.ids()

    def setencoder(self, encoder):
        self.encoder = encoder

    def set_subresult(self, name, subr):
        """Append *subr* to the list of sub-results stored under *name*."""
        self.subresults.setdefault(name, []).append(subr)

    def findowners(self, other, attr):
        """Wrap ``result.findowners(other.result, attr)`` in a ResultSet."""
        return self._derive(self.result.findowners(other.result, attr))

    def find(self, q):
        """Combine this set with ``db.findall(q)`` using ``&``.

        ``db`` is the ``db`` attribute of the wrapped result.
        """
        db = self.result.db
        return self._derive(self.result & db.findall(q))


class Factory:
    """Default item factory: turns raw records into ``DbItem`` objects
    and resolves their lazily computed attributes from the schema."""

    def __init__(self, db):
        self.schema = db.schema
        self.dbname = db.name

    def __call__(self, data):
        """Build a ``DbItem`` from the raw record *data*.

        Called upon item initialization; this factory expects the lower
        level to return lists.  Fields whose value is None are left unset
        so that they are computed on first access.
        """
        item = DbItem(self)
        fields = self.schema['fields']
        for name, value in zip(fields, data):
            if value is not None:
                setattr(item, name, value)
        # by convention "system fields" such as the item's
        # internal id or blob reference follow the actual data
        # there may currently be only one blob per item
        n = len(fields)
        item._id = data[n]
        try:
            item._blob_ref = data[n + 1]
        except IndexError:
            pass
        return item

    def count(self, item, name):
        """Count the foreign records referenced by attribute *name*.

        Returns 0 when the local key is empty, otherwise
        ``count_key(foreignkey, value)`` on the foreign database.
        """
        info = self.schema['attrinfo'][name]
        value = getattr(item, info.local)
        if not value:
            return 0
        return getdb(info.dbname).count_key(info.foreign, value)

    def _lookup(self, table, key):
        # Table lookup that yields None instead of propagating a failure.
        try:
            return table[key]
        except Exception:
            return None

    def _resolve(self, foreigndb, foreignkey, key, multiple):
        # For a MULTIPLE attribute return the whole result set, otherwise
        # its first item (None when the set is empty).
        result = foreigndb.find_by_key(foreignkey, key)
        if multiple:
            return result
        if result.total:
            return result[0]
        return None

    def build(self, item, name):
        """Compute the value of the lazily evaluated attribute *name*.

        The attribute's flags select the strategy, tested in this order:
        LOOKUP (table lookup), REFERENCEDA (dotted attribute path),
        VIRTUAL (``make_<name>`` method of the factory, if defined),
        non-REFERENCED (blob for large strings, else ""), and finally a
        reference into another database.
        """
        info = self.schema['attrinfo'][name]

        if info.flags & LOOKUP:
            value = getattr(item, info.local)
            if not value:
                return None
            table = gettb(info.tbname)
            if isinstance(value, list):
                return [self._lookup(table, key) for key in value]
            return self._lookup(table, value)

        if info.flags & REFERENCEDA:
            # XXX multiple not implemented
            obj = item
            for part in info.source.split("."):
                obj = getattr(obj, part)
            return obj

        if info.flags & VIRTUAL:
            value = None
            maker_name = 'make_' + name
            if hasattr(self, maker_name):
                maker = getattr(self, maker_name)
                value = maker(item, name, value)
            return value

##        if info.flags & OWNED:
##            dbname = info.dbname
##            db = getdb(dbname)
##            i = getattr(item, info.local)
##            if i:
##                return db[i - 1]
##            return None

        if not info.flags & REFERENCED:
            if info.dtype == DTYPE_LARGESTRING:
                blob_ref = item._blob_ref
                if blob_ref:
                    (offset, leng) = blob_ref
                    return getdb(self.dbname).getblob(offset, leng)
            return ""

        # must be a referenced item--should maybe check
        multiple = info.flags & MULTIPLE
        value = getattr(item, info.local)
        if not value:
            return None
        foreigndb = getdb(info.dbname)
        if isinstance(value, list):
            return [self._resolve(foreigndb, info.foreign, key, multiple)
                    for key in value]
        return self._resolve(foreigndb, info.foreign, value, multiple)

    def select(self, item, name, filter):
        """Search the records referenced by *name*, narrowed by *filter*.

        Raises TypeError when *name* is not a REFERENCED attribute and
        returns None when the local key is empty.  The query text is
        ``"<foreignkey> = <value> & <filter>"`` and is passed to ``find``
        with 1 and 100 as the ``first`` / ``last`` arguments.
        """
        # The attribute table lives in the schema; Factory has no
        # 'attrinfo' attribute of its own.
        info = self.schema['attrinfo'][name]
        if not info.flags & REFERENCED:
            raise TypeError("attribute is not a referenced item")
        value = getattr(item, info.local)
        if not value:
            return None
        db = getdb(info.dbname)
        # NOTE(review): the query is assembled by string interpolation;
        # confirm that *filter* never comes from untrusted input.
        return db.find("%s = %s & %s" % (info.foreign, value, filter), 1, 100)


def nullfactory(item):
    """Factory that returns *item* unchanged."""
    return item


def schema(path):
    """Return ``_edbm.schema(path)``."""
    return _edbm.schema(path)


class DataBase:
    """Wrapper around a low-level ``_edbm`` database handle.

    Records are passed through the encoder and the factory when they are
    set; searches return ``ResultSet`` objects.  Attributes not defined
    here are looked up on the wrapped handle.
    """

    def __init__(self, db, dbname):
        self._db = db
        self.name = dbname
        self.encoder = None
        self._factory = None

    def _wrap(self, item):
        # Encode first, then build.  Explicit tests rather than the
        # ``a and f(x) or x`` idiom, which silently discards a falsy
        # result of the encoder or factory.
        if self.encoder:
            item = self.encoder(item)
        if self._factory:
            item = self._factory(item)
        return item

    def _resultset(self, result):
        # Wrap a low-level result, propagating factory and encoder.
        ret = ResultSet(self._factory, result)
        if self.encoder:
            ret.setencoder(self.encoder)
        return ret

    def setfactory(self, factory):
        self._factory = factory

    def setencoder(self, encoder):
        self.encoder = encoder

    def __len__(self):
        return len(self._db)

    def __getattr__(self, name):
        return getattr(self._db, name)

    def __getitem__(self, index):
        return self._wrap(self._db[index])

    def get_by_pkey(self, key):
        """Return the wrapped record from the handle's ``get_by_pkey``."""
        return self._wrap(self._db.get_by_pkey(key))

    def get_by_key(self, keyname, keyvalue):
        """Return the wrapped record from the handle's ``get_by_key``."""
        return self._wrap(self._db.get_by_key(keyname, keyvalue))

    get = get_by_pkey

    def find_by_key(self, keyname, keyvalue):
        """Return the handle's ``find_by_key`` result as a ResultSet."""
        return self._resultset(self._db.find_by_key(keyname, keyvalue))

    def count_key(self, keyname, keyvalue):
        return self._db.count_key(keyname, keyvalue)

    def find(self, query, first, last, flag=0):
        """Return the handle's ``find`` result as a ResultSet."""
        return self._resultset(self._db.find(query, first, last, flag))

    def findall2(self, query, flag=0):
        """Run an extended query with ``[attr:subquery]`` filters.

        The text before the first bracket is the main query; every
        bracket names an attribute and a sub-query that is run on the
        attribute's foreign database and joined back with ``findowners``.
        Non-empty sub-results are attached to the returned set through
        ``set_subresult``.  Raises ValueError when a join is requested on
        a virtual or multiple attribute.
        """
        # partial implementation of the extended find syntax
        # waiting for a C-level implementation
        # XXX there is NO error checking
        original_query = query
        filters = filterparser(query)
        if filters:
            query = filters[0][0].strip()
            subqueries = [x[1].split(":", 1) for x in filters]
        else:
            subqueries = []
        subresults = []
        r = None
        for attr, subquery in subqueries:
            info = self.schema['attrinfo'][attr]
            localinfo = self.schema['attrinfo'][info.local]
            if localinfo.flags & VIRTUAL:
                raise ValueError("Cannot join on a virtual attr")
            foreigndb = getdb(info.dbname)
            foreigninfo = foreigndb.schema['attrinfo'][info.foreign]
            if foreigninfo.flags & MULTIPLE:
                raise ValueError("Cannot join on a multiple key")
            if foreigninfo.flags & VIRTUAL:
                raise ValueError("Cannot join on a virtual attr")
            r1 = foreigndb._db.findall(subquery)
            if r is not None:
                # Narrow the owners found so far; stop once none are left.
                if r.total:
                    r = r.findowners(r1, attr)
                else:
                    break
            else:
                # First filter: combine the main query with the join.
                r = self._db.findowners(query, r1, attr, flag)
                if not r.total:
                    break
            if r1.total:
                subresult = ResultSet(foreigndb._factory, r1)
                # Decode sub-results like any other set of that database.
                foreign_encoder = getattr(foreigndb, 'encoder', None)
                if foreign_encoder:
                    subresult.setencoder(foreign_encoder)
                subresult.query = subquery
                subresults.append((attr, subresult))
        if r is None:
            # No filters at all: plain search.
            r = self._db.findall(query, flag)
        ret = self._resultset(r)
        for (attr, subresult) in subresults:
            ret.set_subresult(attr, subresult)
        ret.query = original_query
        return ret

    def findall(self, query, flag=0):
        """Return the handle's ``findall`` result as a ResultSet."""
        return self._resultset(self._db.findall(query, flag))

    def find_in_set(self, query, set):
        """Run ``find_in_set`` against the raw result wrapped by *set*."""
        return self._resultset(self._db.find_in_set(query, set.result))

    def cursor(self, name, option=CURSOR_RETURNS_COUNT):
        """Return a ``Cursor`` over the handle's cursor called *name*."""
        c = self._db.cursor(name)
        return Cursor(c, self._factory, option)

    def close(self):
        self._db.close()

    def count(self, query):
        return self._db.count(query)

    # obsolete
    def join(self, set1, set2, attr):
        """Wrap the handle's ``join``; *set1* may be None."""
        if set1 is not None:
            left = set1.result
        else:
            left = None
        return self._resultset(self._db.join(left, set2.result, attr))

    def getblob(self, offset, leng):
        """Return the blob at (*offset*, *leng*), encoded through the
        encoder's ``encodestring`` when an encoder is set."""
        blob = self._db.get_blob(offset, leng)
        if self.encoder:
            return self.encoder.encodestring(blob)
        return blob

    def set_blob_retrieve(self):
        self._db.set_blob_retrieve()

    def returns_list(self):
        self._db.returns_list()

    def returns_dict(self):
        self._db.returns_dict()

    def returns_inst(self, klass):
        self._db.returns_inst(klass)

    def result(self, ids):
        """Return the handle's ``result(ids)`` as a ResultSet."""
        return self._resultset(self._db.result(ids))


# XXX CURSOR_RETURNS_DBITEMS is intended to be used only for unique
# indexes -- but this is not enforced by the code
# XXX add encoder
class Cursor:
    """Wrapper around a low-level index cursor.

    Every navigation method returns a ``(key, value)`` pair whose value
    depends on the option: a ``ResultSet`` for CURSOR_RETURNS_SET, a
    single item for CURSOR_RETURNS_DBITEMS, and the raw value otherwise.
    """

    def __init__(self, c, factory, option):
        self.c = c
        self.factory = factory
        self.set_option(option)

    def set_option(self, option):
        # DBITEMS is emulated here: the low-level cursor is asked for a
        # set and _wrap() keeps only its first element.
        if option == CURSOR_RETURNS_DBITEMS:
            self.c.set_option(CURSOR_RETURNS_SET)
        else:
            self.c.set_option(option)
        self.option = option

    def _wrap(self, key, value):
        # Shape the raw cursor value according to the current option.
        if self.option == CURSOR_RETURNS_SET:
            return (key, ResultSet(self.factory, value))
        if self.option == CURSOR_RETURNS_DBITEMS:
            item = value[0]
            if self.factory:
                item = self.factory(item)
            return (key, item)
        return (key, value)

    def first(self):
        (key, value) = self.c.first()
        return self._wrap(key, value)

    def last(self):
        (key, value) = self.c.last()
        return self._wrap(key, value)

    def next(self):
        (key, value) = self.c.next()
        return self._wrap(key, value)

    def previous(self):
        (key, value) = self.c.previous()
        return self._wrap(key, value)

    def has_key(self, key):
        return self.c.has_key(key)

    def set_location(self, key):
        (key, value) = self.c.set_location(key)
        return self._wrap(key, value)

    def look(self, key, limit=20):
        pass  # to do


class Table:
    """Key/value table wrapper with an optional encoder."""

    def __init__(self, tb):
        self._tb = tb
        self.encoder = None

    def setencoder(self, encoder):
        self.encoder = encoder

    def __getitem__(self, key):
        item = self._tb.get(key)
        if self.encoder:
            item = self.encoder(item)
        return item

    def close(self):
        self._tb.close()


class ItemTable:
    """Table wrapper whose values go through an encoder and a factory."""

    def __init__(self, tb):
        self._tb = tb
        self.encoder = None
        # Start without a factory so lookups work before setfactory().
        self._factory = None

    def __getattr__(self, name):
        # Delegate unknown attributes to the low-level table, as DataBase
        # does for its handle; opentb() reads ``tb.schema`` this way.
        # NOTE(review): assumes the low-level table exposes ``schema`` --
        # confirm against ``_edbm.tbopen``.
        if name == '_tb':
            raise AttributeError(name)
        return getattr(self._tb, name)

    def setfactory(self, factory):
        self._factory = factory

    def setencoder(self, encoder):
        self.encoder = encoder

    def __getitem__(self, key):
        item = self._tb.get(key)
        if self.encoder:
            item = self.encoder(item)
        if self._factory:
            item = self._factory(item)
        return item

    def close(self):
        # closetb() calls close() on whatever table type is registered.
        self._tb.close()


extencoding = ""


# XXX we currently only support tex2iso, tex2utf8
def set_external_encoding(enc="iso-8859-1"):
    """Set the encoding used for databases and tables opened later."""
    global extencoding
    extencoding = enc


def getencoder(schema, encoding="iso-8859-1"):
    """Return an encoder for *schema*, or None for an unsupported
    *encoding* (only "iso-8859-1" and "utf-8" are handled)."""
    import encoders
    enc = None
    if encoding == "iso-8859-1":
        enc = encoders.Tex2ISOEncoder(schema)
    elif encoding == "utf-8":
        enc = encoders.Tex2UTF8Encoder(schema)
    return enc or None


# Registries of the objects opened through this module, keyed by name.
databases = {}
queues = {}
tables = {}


def registertable(tbname, tb):
    tables[tbname] = tb


def registerdb(dbname, db):
    databases[dbname] = db


def registerqueue(name, db):
    queues[name] = db


def opentb(tbname):
    """Return the table registered as *tbname*, opening it if needed.

    A section without a ``schema`` option is opened through ``btree`` as
    a plain ``Table``.  Otherwise an ``ItemTable`` is created; its
    factory comes from the module named by the ``factory`` option
    (default: *tbname*), falling back to ``Factory`` when that module
    cannot be imported.
    """
    tb = tables.get(tbname, None)
    if tb is None:
        path = config.get(tbname, 'dbpath')
        try:
            schema = config.get(tbname, 'schema')
        except Exception:
            import btree
            _tb = btree.open(path)
            tb = Table(_tb)
        else:
            _tb = _edbm.tbopen(path, schema)
            tb = ItemTable(_tb)
            try:
                factory = config.get(tbname, 'factory')
            except Exception:
                factory = tbname
            try:
                module = __import__(factory)
                f = module.get_factory(tb, tbname)
            except ImportError:
                f = Factory(tb)
            tb.setfactory(f)
            if extencoding:
                enc = getencoder(tb.schema, extencoding)
                if enc:
                    tb.setencoder(enc)
        registertable(tbname, tb)
    return tb


def gettb(tbname):
    """Return the registered table *tbname*, opening it on first use."""
    tb = tables.get(tbname, None)
    if tb is None:
        tb = opentb(tbname)
    return tb


def closetb(tbname):
    """Close the table *tbname* and drop it from the registry."""
    tb = tables[tbname]
    tb.close()
    # Forget the closed table so that gettb() reopens it, as closedb()
    # and closequeue() do for their registries.
    del tables[tbname]


def opendb(dbname, use_default_factory=1):
    """Return the database registered as *dbname*, opening it if needed.

    When the section has a ``dbmodule`` option, that module's
    ``get_database`` builds the wrapper.  Otherwise a ``DataBase`` is
    created and, if *use_default_factory* is true, given a factory: the
    one from the module named by the ``factory`` option, or ``Factory``
    when the option is missing, is 'default', or cannot be imported.
    """
    db = databases.get(dbname, None)
    if db is None:
        schema = config.get(dbname, 'schema')
        files = config.get(dbname, 'dbpath').split(":")
        if len(files) > 1:
            _db = _edbm.db(files, schema)
        else:
            _db = _edbm.db(files[0], schema)
        try:
            modname = config.get(dbname, 'dbmodule')
        except Exception:
            db = DataBase(_db, dbname)
            registerdb(dbname, db)
            if use_default_factory:
                try:
                    factory = config.get(dbname, 'factory')
                except Exception:
                    factory = 'default'
                if factory == 'default':
                    f = Factory(db)
                else:
                    try:
                        module = __import__(factory)
                        f = module.get_factory(db, dbname)
                    except ImportError:
                        f = Factory(db)
                db.setfactory(f)
        else:
            module = __import__(modname)
            db = module.get_database(_db, dbname)
            registerdb(dbname, db)
        if extencoding:
            enc = getencoder(db.schema, extencoding)
            if enc:
                db.setencoder(enc)
    return db


def getdb(dbmodelname):
    """Return the registered database, opening it on first use."""
    db = databases.get(dbmodelname, None)
    if db is None:
        db = opendb(dbmodelname)
    return db


def _getdb(name):
    # Callback handed to the extension module: resolves a database name
    # to its low-level handle.
    db = getdb(name)
    return db._db


_edbm.setgetdb(_getdb)


def closedb(dbname):
    """Close the database *dbname* and drop it from the registry."""
    db = databases[dbname]
    db.close()
    del databases[dbname]


def closequeue(dbname):
    """Close the queue database *dbname* and drop it from the registry."""
    db = queues[dbname]
    db.close()
    del queues[dbname]


def getqueue(dbname):
    """Open the queue database configured for *dbname* and register it.

    The factory comes from the module called *dbname*, falling back to
    ``Factory`` when that module cannot be imported.  A new database is
    opened on every call; the registry is not consulted first.
    """
    schema = config.get(dbname, 'schema')
    queue = config.get(dbname, 'queuepath')
    _db = _edbm.db(queue, schema)
    db = DataBase(_db, dbname)
    registerqueue(dbname, db)
    try:
        module = __import__(dbname)
        f = module.get_factory(db, dbname)
    except ImportError:
        f = Factory(db)
    db.setfactory(f)
    return db


class IndexLoader:
    """Feeds every record of a database to a low-level loader."""

    def __init__(self, db, ld, dbname):
        self._db = db
        self._ld = ld
        self.name = dbname
        self.factory = None

    def setfactory(self, f):
        self.factory = f

    def run(self):
        """Feed all records (through the factory, if set), then call
        the loader's ``endload``."""
        db = self._db
        ld = self._ld
        f = self.factory
        for item in db:
            if f:
                item = f(item)
            ld.feed(item)
        ld.endload()


def idxloader(dbname, rules=None, tmpdir="/tmp", factory=None):
    """Return an ``IndexLoader`` that loads only the indexes of *dbname*.

    *rules* defaults to the section's ``rules`` option.
    """
    import edbmload
    schema = config.get(dbname, 'schema')
    if rules is None:
        rules = config.get(dbname, 'rules')
    path = config.get(dbname, 'dbpath')
    flags = edbmload.LOAD_INDEXES
    info = {
        'schema': schema,
        'rules': rules,
        'flags': flags,
        'tmpdir': tmpdir,
    }
    _ld = edbmload.loader(path, info)
    db = getdb(dbname)
    ld = IndexLoader(db, _ld, dbname)
    if factory:
        ld.setfactory(factory)
    return ld


# XXX C level loaders should have a schema attribute
class Loader:
    """Wrapper around a low-level loader with an optional factory."""

    def __init__(self, ld, schema, dbname):
        self._ld = ld
        self.schema = schema
        self.name = dbname
        self._factory = None

    def setfactory(self, factory):
        self._factory = factory

    def feed(self, dict):
        """Feed one record, passing it through the factory when set."""
        if self._factory:
            item = self._factory(dict)
            self._ld.feed(item)
        else:
            self._ld.feed(dict)

    def endload(self):
        self._ld.endload()


def loader(dbname, rules=None, tmpdir="/tmp", load_data=1, load_indexes=1):
    """Return a loader for *dbname*.

    *load_data* and *load_indexes* select the ``edbmload`` flags.  When
    indexes are loaded and the section has a ``dbmodule`` option, that
    module's ``get_loader`` builds the loader.  Without that option a
    ``Loader`` is created and given the loading factory of the module
    named by the ``factory`` option (default: *dbname*), if that module
    can be imported and defines ``get_loading_factory``.  When indexes
    are not loaded a plain ``Loader`` is returned.
    """
    import edbmload
    schemapath = config.get(dbname, 'schema')
    path = config.get(dbname, 'dbpath')
    flags = 0
    if load_data:
        flags = flags | edbmload.LOAD_DATA
    if load_indexes:
        flags = flags | edbmload.LOAD_INDEXES
    if rules is None:
        rules = config.get(dbname, 'rules')
    info = {
        'schema': schemapath,
        'rules': rules,
        'flags': flags,
        'tmpdir': tmpdir,
    }
    _ld = edbmload.loader(path, info)
    schema = _edbm.schema(schemapath)
    if flags & edbmload.LOAD_INDEXES:
        try:
            modname = config.get(dbname, 'dbmodule')
        except Exception:
            ld = Loader(_ld, schema, dbname)
            try:
                factory = config.get(dbname, 'factory')
            except Exception:
                factory = dbname
            try:
                module = __import__(factory)
                f = module.get_loading_factory(ld, dbname)
            except ImportError:
                pass
            except AttributeError:
                pass
            else:
                ld.setfactory(f)
        else:
            module = __import__(modname)
            ld = module.get_loader(_ld, dbname)
    else:
        ld = Loader(_ld, schema, dbname)
    return ld


class Queue:
    """Thin wrapper that forwards to a low-level update queue."""

    def __init__(self, q):
        self._q = q

    def replace(self, item):
        return self._q.replace(item)

    def append(self, item):
        return self._q.append(item)

    def put(self, item):
        return self._q.put(item)

    def close(self):
        self._q.close()


def queue(dbname, rules=None, tmpdir="/tmp"):
    """Return a ``Queue`` for *dbname*.

    The queue is created by ``edbmload.queue`` with the LOAD_INDEXES,
    LOAD_DATA and CHECK_UNIQUE flags; *rules* defaults to the section's
    ``rules`` option.
    """
    import edbmload
    schemapath = config.get(dbname, 'schema')
    if rules is None:
        rules = config.get(dbname, 'rules')
    dbpath = config.get(dbname, 'dbpath')
    qpath = config.get(dbname, 'queue')
    info = {
        'schema': schemapath,
        'rules': rules,
        'flags': (edbmload.LOAD_INDEXES | edbmload.LOAD_DATA
                  | edbmload.CHECK_UNIQUE),
        'tmpdir': tmpdir,
    }
    _q = edbmload.queue(dbpath, qpath, info)
    return Queue(_q)