##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Module docstring, assigned explicitly.
__doc__='''Generic Database adapter'''
# Extract the bare revision number ("1.116") from the RCS/CVS keyword string:
# [11:-2] drops the leading "$Revision: " and the trailing " $".
__version__='$Revision: 1.116 $'[11:-2]
import OFS.SimpleItem, Aqueduct, RDB, re
import DocumentTemplate, marshal, md5, base64, Acquisition, os
from Aqueduct import decodestring, parse
from Aqueduct import custom_default_report, default_input_form
from Globals import DTMLFile, MessageDialog
from cStringIO import StringIO
import sys, Globals, OFS.SimpleItem, AccessControl.Role
from string import atoi, find, join, split, rstrip
import DocumentTemplate, sqlvar, sqltest, sqlgroup
from DocumentTemplate.html_quote import html_quote
from time import time
from zlib import compress, decompress
from DateTime.DateTime import DateTime
md5new=md5.new
import ExtensionClass
import DocumentTemplate.DT_Util
from cPickle import dumps, loads
from Results import Results
from App.Extensions import getBrain
from Globals import InitializeClass
from AccessControl import ClassSecurityInfo
from AccessControl import getSecurityManager
from AccessControl.Permissions import change_database_methods
from AccessControl.Permissions import use_database_methods
from AccessControl.Permissions import view_management_screens
from AccessControl.DTML import RestrictedDTML
from webdav.Resource import Resource
from webdav.Lockable import ResourceLockedError
from zExceptions import BadRequest
from BTrees.OOBTree import OOBucket as Bucket
class DatabaseError(BadRequest):
    """Base class for problems with external relational database connections."""
class nvSQL(DocumentTemplate.HTML):
    # Non-validating SQL Template for use by SQLFiles.
    #
    # The command table starts as a private copy of the stock DTML HTML
    # commands (so the base class table is never mutated) and is then
    # extended with the SQL-specific tags.
    #
    # The copy is made with update() rather than a class-body "for" loop:
    # a loop in a class body leaves its loop variables behind as class
    # attributes, which would pollute the template's namespace.
    commands = {}
    commands.update(DocumentTemplate.HTML.commands)
    commands.update({
        'sqlvar': sqlvar.SQLVar,
        'sqltest': sqltest.SQLTest,
        'sqlgroup': sqlgroup.SQLGroup,
    })

    # No proxy roles are granted to the template.
    _proxy_roles = ()
class SQL(RestrictedDTML, ExtensionClass.Base, nvSQL):
# Validating SQL template for Zope SQL Methods.
# Mixes RestrictedDTML into nvSQL (which supplies the sqlvar, sqltest and
# sqlgroup commands) and adds nothing of its own.  DA uses this class as
# its default 'template_class'.
pass
class DA(
Aqueduct.BaseQuery,Acquisition.Implicit,
Globals.Persistent,
AccessControl.Role.RoleManager,
OFS.SimpleItem.Item,
Resource
):
'Database Adapter'
# Security declarations for this class are collected here; the module calls
# InitializeClass(DA) after the class body.
security = ClassSecurityInfo()
# The object as a whole is guarded by use_database_methods, and that
# permission defaults to the Anonymous and Manager roles.
security.declareObjectProtected(use_database_methods)
security.setPermissionDefault(use_database_methods,
('Anonymous', 'Manager'))
# Result columns remembered from the most recent test-mode call
# (written in __call__ when test__ is true, read by
# _searchable_result_columns).
_col=None
# Row limit passed to the connection's query() call.
max_rows_=1000
# Result caching is only used by __call__ when both cache_time_ and
# max_cache_ are greater than zero; see _cached_result for how they are used.
cache_time_=0
max_cache_=100
# File and class name handed to getBrain() to obtain the record base class.
class_name_=class_file_=''
# Optional ZClass meta-class selected in manage_advanced.
_zclass=None
# When true (and the method has exactly one argument) __getitem__ runs the
# query directly with the traversed key as that argument.
allow_simple_one_argument_traversal=None
# Class used to compile the SQL template source (see manage_edit and PUT).
template_class=SQL
# Optional name of a method that __call__ invokes to obtain the connection
# id instead of using self.connection_id.
connection_hook=None
# Management tabs: the three ZSQL-specific tabs followed by the tabs
# contributed by RoleManager and Item.
manage_options=(
(
{'label':'Edit', 'action':'manage_main',
'help':('ZSQLMethods','Z-SQL-Method_Edit.stx')},
{'label':'Test', 'action':'manage_testForm',
'help':('ZSQLMethods','Z-SQL-Method_Test.stx')},
{'label':'Advanced', 'action':'manage_advancedForm',
'help':('ZSQLMethods','Z-SQL-Method_Advanced.stx')},
)
+AccessControl.Role.RoleManager.manage_options
+OFS.SimpleItem.Item.manage_options
)
def __init__(self, id, title, connection_id, arguments, template):
    # The id is stored as a plain string; every other property is set by
    # manage_edit(), so construction and later edits share one code path.
    edit_arguments = (title, connection_id, arguments, template)
    self.id = str(id)
    self.manage_edit(*edit_arguments)
# Form behind the "Advanced" management tab, loaded from dtml/advanced and
# restricted to the view_management_screens permission.
security.declareProtected(view_management_screens, 'manage_advancedForm')
manage_advancedForm=DTMLFile('dtml/advanced', globals())
# NOTE(review): the declaration below names 'test_url', but the method that
# follows is called 'test_url_' (trailing underscore), so the declaration
# does not refer to this method as written.  Confirm which name is intended
# before changing either one, since it affects the permission on the method.
security.declarePublic('test_url')
def test_url_(self):
'Method for testing server connection information'
# Fixed reply; callers only need to see that the object answered.
return 'PING'
# Maps the label of each textarea-resize submit button to a
# (row delta, column delta) pair.  manage_edit() checks SUBMIT against these
# keys and _er() applies the deltas to the dtpref_rows / dtpref_cols values.
_size_changes={
'Bigger': (5,5),
'Smaller': (-5,-5),
'Narrower': (0,-5),
'Wider': (0,5),
'Taller': (5,0),
'Shorter': (-5,0),
}
def _er(self,title,connection_id,arguments,template,
        SUBMIT,dtpref_cols,dtpref_rows,REQUEST):
    # Handle one of the textarea resize buttons: work out the new editor
    # size, remember it in cookies and in the request, then re-render the
    # edit form with the values the user had typed (nothing is saved).
    row_delta, col_delta = self._size_changes[SUBMIT]

    # Never go below a single row.
    new_rows = str(max(1, int(dtpref_rows) + row_delta))

    width = str(dtpref_cols)
    if width.endswith('%'):
        # Percentage widths are clamped to the 25%..100% range.
        percent = int(width[:-1]) + col_delta
        new_cols = str(min(100, max(25, percent))) + '%'
    else:
        # Absolute widths are at least 35 columns.
        new_cols = str(max(35, int(width) + col_delta))

    # Preference cookies expire 365 days from now.
    expiry = (DateTime("GMT") + 365).rfc822()
    set_cookie = REQUEST["RESPONSE"].setCookie
    for cookie_name, cookie_value in (("dtpref_rows", new_rows),
                                      ("dtpref_cols", new_cols)):
        set_cookie(cookie_name, cookie_value, path='/', expires=expiry)

    # Make the new size visible to the form rendered for this same request.
    REQUEST.other.update({"dtpref_cols": new_cols, "dtpref_rows": new_rows})

    return self.manage_main(
        self, REQUEST,
        title=title,
        arguments_src=arguments,
        connection_id=connection_id,
        src=template)
security.declareProtected(change_database_methods, 'manage_edit')
def manage_edit(self,title,connection_id,arguments,template,
                SUBMIT='Change', dtpref_cols='100%', dtpref_rows='20',
                REQUEST=None):
    """Change database method properties

    'connection_id' is the id of a database connection that resides in
    the current folder or in a folder above it.  The database should
    understand SQL.

    'arguments' is a string containing an arguments specification, as
    would be given in the SQL method creation form.

    'template' is a string containing the source for the SQL Template.

    When 'SUBMIT' is one of the editor resize button labels, nothing is
    saved; the edit form is redisplayed at the new size instead.
    """
    # The resize buttons post to this method too; hand them off untouched.
    if SUBMIT in self._size_changes:
        return self._er(title,connection_id,arguments,template,
                        SUBMIT,dtpref_cols,dtpref_rows,REQUEST)

    if self.wl_isLocked():
        raise ResourceLockedError('SQL Method is locked via WebDAV')

    self.title = str(title)
    self.connection_id = str(connection_id)

    # Keep both the raw argument text and its parsed form.
    arguments_text = str(arguments)
    self.arguments_src = arguments_text
    self._arg = parse(arguments_text)

    # Keep the raw template text and compile it.
    template_text = str(template)
    self.src = template_text
    compiled = self.template_class(template_text)
    self.template = compiled
    compiled.cook()

    # Cached results belong to the old query text; start with empty caches.
    self._v_cache = {}, Bucket()

    if not REQUEST:
        return ''
    if SUBMIT == 'Change and Test':
        return self.manage_testForm(REQUEST)
    return self.manage_main(
        self, REQUEST,
        manage_tabs_message='ZSQL Method content changed')
security.declareProtected(change_database_methods, 'manage_advanced')
def manage_advanced(self, max_rows, max_cache, cache_time,
                    class_name, class_file, direct=None,
                    REQUEST=None, zclass='', connection_hook=None):
    """Change advanced properties

    The arguments are:

    max_rows -- The maximum number of rows to be returned from a query.

    max_cache -- The maximum number of results to cache.

    cache_time -- The maximum amount of time to use a cached result.

    class_name -- The name of a class that provides additional
      attributes for result record objects. This class will be a
      base class of the result record class.

    class_file -- The name of the file containing the class
      definition.

      The class file normally resides in the 'Extensions'
      directory, however, the file name may have a prefix of
      'product.', indicating that it should be found in a product
      directory.

      For example, if the class file is: 'ACMEWidgets.foo', then an
      attempt will first be made to use the file
      'lib/python/Products/ACMEWidgets/Extensions/foo.py'. If this
      fails, then the file 'Extensions/ACMEWidgets.foo.py' will be
      used.

    direct -- Stored as 'allow_simple_one_argument_traversal'; when
      true, a method with exactly one argument can be queried by
      traversing to the argument value.

    zclass -- A 'product/id' string naming a registered ZClass.  When
      a matching registry entry is found its meta class is stored;
      an empty value leaves the current setting untouched.

    connection_hook -- The name of a method that is called to obtain
      the connection id when the query runs.

    The three numeric settings may be given as integers, other numbers
    or numeric strings; anything 'int()' cannot convert raises
    ValueError or TypeError.  When REQUEST is given the advanced form
    is returned with a confirmation message, otherwise None.
    """
    # Form submissions deliver these as strings.  int() accepts strings as
    # well as any numeric type, whereas the previous exact-type test plus
    # string.atoi() raised TypeError for longs and floats.
    max_rows = int(max_rows)
    max_cache = int(max_cache)
    cache_time = int(cache_time)
    class_name = str(class_name)
    class_file = str(class_file)

    self.max_rows_ = max_rows
    self.max_cache_, self.cache_time_ = max_cache, cache_time
    # Cache limits may have changed, so start over with empty caches.
    self._v_cache = {}, Bucket()

    self.class_name_, self.class_file_ = class_name, class_file
    # Refresh the volatile record base class for the new file/class pair.
    self._v_brain = getBrain(self.class_file_, self.class_name_, 1)

    self.allow_simple_one_argument_traversal = direct
    self.connection_hook = connection_hook

    if zclass:
        # Find the registered ZClass whose "product/id" matches.
        for d in self.aq_acquire('_getProductRegistryData')('zclasses'):
            if ("%s/%s" % (d.get('product'), d.get('id'))) == zclass:
                self._zclass = d['meta_class']
                break

    if REQUEST is not None:
        m = "ZSQL Method advanced settings have been set"
        return self.manage_advancedForm(self, REQUEST, manage_tabs_message=m)
## return self.manage_editedDialog(REQUEST)
#def getFindContent(self):
# """Return content for use by the Find machinery."""
# return '%s\n%s' % (self.arguments_src, self.src)
security.declareProtected(view_management_screens, 'PrincipiaSearchSource')
def PrincipiaSearchSource(self):
    """Return the argument specification and SQL source for the Find machinery."""
    searchable = (self.arguments_src, self.src)
    return '%s\n%s' % searchable
# WebDAV / FTP support
# Content type attribute for this object; the source-export methods below
# send the same 'text/plain' value in their Content-Type header.
default_content_type = 'text/plain'
security.declareProtected(view_management_screens, 'document_src')
def document_src(self, REQUEST=None, RESPONSE=None):
    """Return the raw source: the argument specification, then the SQL text.

    When a RESPONSE is supplied its Content-Type is set to text/plain.
    """
    source = '%s\n%s' % (self.arguments_src, self.src)
    if RESPONSE is None:
        return source
    RESPONSE.setHeader('Content-Type', 'text/plain')
    return source
def manage_FTPget(self):
    """Return the raw source for an FTP download, served as text/plain."""
    response = self.REQUEST.RESPONSE
    response.setHeader('Content-Type', 'text/plain')
    parts = (self.arguments_src, self.src)
    return '%s\n%s' % parts
def get_size(self):
    # The size is the length of the exported source text.
    source = self.document_src()
    return len(source)
security.declareProtected(change_database_methods, 'PUT')
def PUT(self, REQUEST, RESPONSE):
"""Handle put requests"""
# Run the WebDAV request set-up and If-header handling inherited from the
# Resource base class before touching any state.
self.dav__init(REQUEST, RESPONSE)
self.dav__simpleifhandler(REQUEST, RESPONSE, refresh=1)
body = REQUEST.get('BODY', '')
# Split an arguments section off the front of the body; whatever follows
# the match becomes the template source.
# NOTE(review): with re.S the greedy '(.*)' runs up to the LAST newline in
# the body, so nearly the whole upload is captured as the arguments and only
# text after the final newline is left as the template.  The pattern looks
# as if literal delimiters around the group are missing -- confirm against
# the format produced by document_src()/manage_FTPget() before relying on it.
m = re.match('\s*(.*)\s*\n', body, re.I | re.S)
if m:
self.arguments_src = m.group(1)
self._arg=parse(self.arguments_src)
body = body[m.end():]
template = body
self.src = template
# Compile the new template source.
self.template=t=self.template_class(template)
t.cook()
# Cached results belong to the old query text; start with empty caches.
self._v_cache={}, Bucket()
# 204: success with no response body.
RESPONSE.setStatus(204)
return RESPONSE
security.declareProtected(change_database_methods, 'manage_testForm')
def manage_testForm(self, REQUEST):
    "Render the input form used to supply test arguments for this method."
    # Build the form source from the parsed arguments; it posts to manage_test.
    form_source = default_input_form(
        self.title_or_id(), self._arg, 'manage_test', '')
    form = DocumentTemplate.HTML(form_source)
    return form(self, REQUEST, HTTP_REFERER='')
security.declareProtected(change_database_methods, 'manage_test')
def manage_test(self, REQUEST):
"""Test an SQL method."""
# Try to render the query template first so that the rendered
# source will be available for the error message in case some
# error occurs...
try: src=self(REQUEST, src__=1)
except: src="Could not render the query template!"
result=()
# Exception type, value and traceback captured if the query fails.
t=v=tb=None
try:
try:
# test__=1 makes __call__ return both the rendered query and the results.
src, result=self(REQUEST, test__=1)
# Multiple statements are separated by '\0' in the rendered source (see
# 'sql_delimiter' in __call__); show them separated by a dashed rule.
# NOTE(review): string.find() returns -1 when the delimiter is absent, which
# is truthy, and 0 (falsy) when it is the first character.  The branch
# therefore also runs when there is no delimiter (a harmless no-op) and is
# skipped only when the source starts with '\0' -- confirm whether
# '>= 0' was intended.
if find(src,'\0'):
src=join(split(src,'\0'),'\n'+'-'*60+'\n')
if result._searchable_result_columns():
r=custom_default_report(self.id, result)
else:
r='This statement returned no results.'
except:
# Keep the failure so it can be shown in the report and re-raised below.
t, v, tb = sys.exc_info()
r='Error, %s: %s' % (t, v)
# Build the report page from the result text (or error text) and the
# HTML-quoted SQL source.
# NOTE(review): the string literal below spans physical lines inside single
# quotes and appears to have lost its markup; it is left exactly as found.
report=DocumentTemplate.HTML(
'\n'
'\n'
'\n
\n%s\n\n'
'
SQL used:
\n\n%s\n
\n
\n'
''
% (r,html_quote(src)))
# Render the report with the results available under this method's id.
report=apply(report,(self,REQUEST),{self.id:result})
if tb is not None:
# The query failed: raise the standard error page, passing the report along.
self.raise_standardErrorMessage(
None, REQUEST, t, v, tb, None, report)
return report
# Drop the traceback reference so the frame is not kept alive.
finally: tb=None
security.declareProtected(view_management_screens, 'index_html')
def index_html(self, REQUEST):
    """Redirect to this method's test form."""
    test_form_url = "%s/manage_testForm" % REQUEST['URL1']
    REQUEST.RESPONSE.redirect(test_form_url)
def _searchable_arguments(self):
    # The parsed argument specification built by manage_edit() / PUT().
    parsed_arguments = self._arg
    return parsed_arguments
def _searchable_result_columns(self):
    # Columns remembered from the last test-mode call; None until then.
    remembered_columns = self._col
    return remembered_columns
def _cached_result(self, DB__, query, max_rows, conn_id):
    # Return the result of 'query', from the cache when a fresh entry
    # exists, otherwise by running DB__.query(query, max_rows) and caching
    # what it returns.
    #
    # Two volatile structures are kept in self._v_cache:
    #   cache  -- dict: (query, max_rows, conn_id) -> (timestamp, result)
    #   tcache -- Bucket: timestamp -> cache key, used to find old entries
    #
    # Stale entries are never returned and the cache is trimmed whenever
    # it reaches self.max_cache_ entries.
    #
    # NB: Correct cache behavior is predicated on Bucket.keys()
    #     returning a sequence ordered from smallest number
    #     (ie: the oldest cache entry) to largest number
    #     (ie: the newest cache entry). Please be careful if you
    #     change the class instantiated below!

    # get hold of a cache
    caches = getattr(self, '_v_cache', None)
    if caches is None:
        caches = self._v_cache = {}, Bucket()
    cache, tcache = caches

    # the key for caching
    cache_key = query, max_rows, conn_id

    # the maximum number of result sets to cache
    max_cache = self.max_cache_
    # the current time
    now = time()
    # the oldest time which is not stale
    t = now - self.cache_time_

    # if the cache is too big, we purge entries from it
    if len(cache) >= max_cache:
        keys = tcache.keys()
        # We also hoover out any stale entries, as we're
        # already doing cache minimisation.
        # 'keys' is ordered, so we purge the oldest results
        # until the cache is small enough and there are no
        # stale entries in it
        while keys and (len(keys) >= max_cache or keys[0] < t):
            key = keys[0]
            q = tcache[key]
            del tcache[key]
            if q in cache:
                del cache[q]
            del keys[0]

        # Two different queries cached within the same clock tick share
        # one timestamp, and only the later one stays tracked in tcache.
        # The untracked entry can never be reached by the loop above, so
        # if the cache is still too big, drop every entry that tcache no
        # longer points at.  This keeps the cache bounded.
        if len(cache) >= max_cache:
            for q in list(cache.keys()):
                if tcache.get(cache[q][0]) != q:
                    del cache[q]

    # okay, now see if we have a cached result
    if cache_key in cache:
        k, r = cache[cache_key]
        # the result may still be stale, as we only hoover out
        # stale results above if the cache gets too large.
        if k > t:
            # yay! a cached result returned!
            return r
        # delete the stale cache entry
        del cache[cache_key]
        # Only remove the timestamp entry if it still belongs to this
        # query.  After a timestamp collision it may already be gone
        # (an unconditional delete would raise KeyError) or may belong
        # to a different query that must stay tracked.
        if tcache.get(k) == cache_key:
            del tcache[k]

    # call the pure query
    result = DB__.query(query, max_rows)

    # If another query was cached at exactly this timestamp, the
    # assignment below replaces its tcache entry; that older result stays
    # in 'cache' untracked until it is requested again or the purge above
    # removes it.
    tcache[now] = cache_key
    cache[cache_key] = now, result

    return result
security.declareProtected(use_database_methods, '__call__')
def __call__(self, REQUEST=None, __ick__=None, src__=0, test__=0, **kw):
"""Call the database method
The arguments to the method should be passed via keyword
arguments, or in a single mapping object. If no arguments are
given, and if the method was invoked through the Web, then the
method will try to acquire and use the Web REQUEST object as
the argument mapping.
The returned value is a sequence of record objects.
If 'src__' is true, only the rendered query source is returned and
no query is run. If 'test__' is true, a (query, result) tuple is
returned and the result columns are remembered on the method.
"""
# Frame-local consulted by traceback formatting; it must stay a local of
# this function under exactly this name.
__traceback_supplement__ = (SQLMethodTracebackSupplement, self)
# Work out the argument mapping: explicit mapping, then keyword
# arguments, then an acquired REQUEST, then an empty mapping.
if REQUEST is None:
if kw: REQUEST=kw
else:
if hasattr(self, 'REQUEST'): REQUEST=self.REQUEST
else: REQUEST={}
# connection hook
c = self.connection_id
# for backwards compatability
hk = self.connection_hook
# go get the connection hook and call it
if hk: c = getattr(self, hk)()
# Look the connection object up by id as an attribute of this method.
try: dbc=getattr(self, c)
except AttributeError:
raise AttributeError, (
"The database connection %s cannot be found." % (
c))
# Calling the connection object yields the database handle; any failure
# here is reported as DatabaseError.
try: DB__=dbc()
except: raise DatabaseError, (
'%s is not connected to a database' % self.id)
# 'p' is the object the template is rendered against and the parent given
# to result records: the acquisition parent, or its parent when this
# method is being used as a ZClass-defined instance method.
if hasattr(self, 'aq_parent'):
p=self.aq_parent
if self._isBeingAccessedAsZClassDefinedInstanceMethod():
p=p.aq_parent
else: p=None
argdata=self._argdata(REQUEST)
# Extra names made available to the template: the statement delimiter and
# the connection's quoting function.
argdata['sql_delimiter']='\0'
argdata['sql_quote__']=dbc.sql_quote__
# Render the template with this method pushed on the security context;
# the context is always popped again, even on error.
security=getSecurityManager()
security.addContext(self)
try:
try: query=apply(self.template, (p,), argdata)
except TypeError, msg:
# A TypeError mentioning 'client' is reported as a clearer NameError
# about using 'client' as an argument name; other TypeErrors propagate.
msg = str(msg)
if find(msg,'client') >= 0:
raise NameError("'client' may not be used as an " +
"argument name in this context")
else: raise
finally: security.removeContext(self)
# Source-only mode: return the rendered SQL without running it.
if src__: return query
# Use the result cache only when both cache settings are positive.
if self.cache_time_ > 0 and self.max_cache_ > 0:
result=self._cached_result(DB__, query, self.max_rows_, c)
else:
result=DB__.query(query, self.max_rows_)
# Record base class, looked up once and kept in a volatile attribute.
if hasattr(self, '_v_brain'): brain=self._v_brain
else:
brain=self._v_brain=getBrain(self.class_file_, self.class_name_)
zc=self._zclass
if zc is not None: zc=zc._zclass_
# A string result is wrapped in a file object and handed to RDB.File;
# anything else is wrapped in Results.
if type(result) is type(''):
f=StringIO()
f.write(result)
f.seek(0)
result=RDB.File(f,brain,p, zc)
else:
result=Results(result, brain, p, zc)
columns=result._searchable_result_columns()
# Only test-mode calls update the remembered columns, and only on change.
if test__ and columns != self._col: self._col=columns
# If run in test mode, return both the query and results so
# that the template doesn't have to be rendered twice!
if test__: return query, result
return result
def da_has_single_argument(self):
    # True when the method declares exactly one argument.
    argument_count = len(self._arg)
    return argument_count == 1
def __getitem__(self, key):
    # Traversal support.  Two modes:
    #  * direct traversal (enabled, and exactly one argument): run the
    #    query with 'key' as that argument and return the single record;
    #    zero or several records raise KeyError.
    #  * otherwise 'key' must name an argument, and a Traverse helper is
    #    returned to collect that argument's value from the next URL step.
    arg_spec = self._arg
    direct = self.allow_simple_one_argument_traversal and len(arg_spec) == 1

    if not direct:
        self._arg[key]  # raise KeyError if not an arg
        return Traverse(self, {}, key)

    only_argument = arg_spec.keys()[0]
    records = self({only_argument: key})
    if not records or len(records) > 1:
        raise KeyError(key)
    return records[0]
def connectionIsValid(self):
    # Valid means: the connection id resolves to an attribute of this
    # method, and that object offers a 'connected' attribute.
    if not hasattr(self, self.connection_id):
        return False
    connection = getattr(self, self.connection_id)
    return hasattr(connection, 'connected')
def connected(self):
    # Ask the connection object itself whether it is connected.
    connection = getattr(self, self.connection_id)
    is_connected = getattr(connection, 'connected')
    return is_connected()
security.declareProtected(change_database_methods,
'manage_product_zclass_info')
def manage_product_zclass_info(self):
    # Build the list of ZClass choices for the advanced form: one dict per
    # registered ZClass (a copy of its registry entry without 'meta_class'),
    # with 'selected' set to 'selected' for the currently chosen class and
    # to '' for all others.
    current = self._zclass
    # Compare against the unwrapped object when an acquisition wrapper is present.
    current = getattr(current, 'aq_self', current)

    registry_data = self.aq_acquire('_getProductRegistryData')
    choices = []
    for entry in registry_data('zclasses'):
        meta_class = entry['meta_class']
        if hasattr(meta_class._zclass_, '_p_deactivate'):
            # Eek, persistent
            continue

        choice = {}
        choice.update(entry)
        del choice['meta_class']
        if meta_class is current:
            choice['selected'] = 'selected'
        else:
            choice['selected'] = ''
        choices.append(choice)
    return choices
# Apply the security declarations collected in DA's ClassSecurityInfo.
InitializeClass(DA)
# The built-in list type; Traverse uses it for an exact type check.
ListType=type([])
class Traverse(ExtensionClass.Base):
"""Helper class for 'traversing' searches during URL traversal

Instances collect argument values from successive URL path steps.
Once enough values are present the database method is called and
traversal continues on the resulting record.
"""
_da=None
def __init__(self, da, args, name=None):
# _r: the record found so far (None until a query has run)
# _da: the database method being traversed
# _args: argument values collected so far (shared, mutated in place)
# _name: the argument name waiting for a value, if any
self._r=None
self._da=da
self._args=args
self._name=name
def __bobo_traverse__(self, REQUEST, key):
name=self._name
# Read _da straight from the instance dict rather than through attribute
# lookup (this class defines __getattr__).
da=self.__dict__['_da']
args=self._args
if name:
# 'key' is the value for the pending argument 'name'.  A repeated
# argument accumulates its values in a list.
if args.has_key(name):
v=args[name]
if type(v) is not ListType: v=[v]
v.append(key)
key=v
args[name]=key
# Still missing arguments: keep collecting with a fresh helper.
if len(args) < len(da._arg):
return self.__class__(da, args)
key=self # "consume" key
# 'key' names an argument: return a helper that waits for its value.
elif da._arg.has_key(key): return self.__class__(da, args, key)
# Run the query with the arguments collected so far.
results=da(args)
if results:
if len(results) > 1:
# Several records: treat 'key' as an integer index into the results;
# anything that goes wrong is reported as KeyError.
try: return results[atoi(key)].__of__(da)
except: raise KeyError, key
else: raise KeyError, key
r=results[0]
# if hasattr(da, 'aq_parent'): r=r.__of__(da.aq_parent)
self._r=r
# The key was used up as an argument value: the record is the answer.
if key is self: return r
# Otherwise resolve 'key' on the record: its own traversal hook first
# (failures are ignored), then attribute access, then item access.
if hasattr(r,'__bobo_traverse__'):
try: return r.__bobo_traverse__(REQUEST, key)
except: pass
try: return getattr(r,key)
except AttributeError, v:
# Fall through to item access only when the error message is exactly
# the missing key; any other AttributeError is re-raised.
if str(v) != key: raise AttributeError, v
return r[key]
def __getattr__(self, name):
# Attributes not found on the helper are looked up on the record found so
# far, then on the database method.
r=self.__dict__['_r']
if hasattr(r, name): return getattr(r,name)
return getattr(self.__dict__['_da'], name)
class SQLMethodTracebackSupplement:
# Traceback supplement for SQL methods; DA.__call__ registers it through
# its __traceback_supplement__ local together with the method instance.
#__implements__ = ITracebackSupplement
def __init__(self, sql):
# Keep a reference to the SQL method the traceback frame belongs to.
self.object = sql