############################################################################## # # Copyright (c) 2002 Zope Corporation and Contributors. All Rights Reserved. # # This software is subject to the provisions of the Zope Public License, # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # FOR A PARTICULAR PURPOSE. # ############################################################################## """WebDAV support - lock item. $Id: LockItem.py 40218 2005-11-18 14:39:19Z andreasjung $ """ import time from AccessControl import ClassSecurityInfo from AccessControl.Owned import ownerInfo from Globals import Persistent from zope.interface import implements from common import generateLockToken from interfaces import ILockItem from WriteLockInterface import LockItemInterface MAXTIMEOUT = (2L**32)-1 # Maximum timeout time DEFAULTTIMEOUT = 12 * 60L # Default timeout def validateTimeout(timeout): # Timeout *should* be in the form "Seconds-XXX" or "Infinite" errors = [] try: t =str(timeout).split('-')[-1] if t.lower() == 'infinite': timeout = DEFAULTTIMEOUT # Default to 1800 secods for infinite else: # requests timeout = long(t) except ValueError: errors.append("Bad timeout value") if timeout > MAXTIMEOUT: errors.append("Timeout request is greater than %s" % MAXTIMEOUT) return timeout, errors class LockItem(Persistent): implements(ILockItem) __implements__ = (LockItemInterface,) # Use the Zope 2.3 declarative security to manage access security = ClassSecurityInfo() security.declarePublic('getOwner', 'getLockToken', 'getDepth', 'getTimeout', 'getTimeoutString', 'getModifiedTime', 'isValid', 'getLockScope', 'getLockType') security.declareProtected('Change Lock Information', 'setTimeout', 'refresh') security.declareProtected('Access contents information', 'getCreator', 'getCreatorPath') def __init__(self, creator, owner='', depth=0, timeout='Infinite', locktype='write', lockscope='exclusive', token=None): errors = [] # First check the values and raise value errors if outside of contract if not getattr(creator, 'getUserName', None): errors.append("Creator not a user object") if str(depth).lower() not in ('0', 'infinity'): errors.append("Depth must be 0 or infinity") if locktype.lower() != 'write': errors.append("Lock type '%s' not supported" % locktype) if lockscope.lower() != 'exclusive': errors.append("Lock scope '%s' not supported" % lockscope) timeout, e = validateTimeout(timeout) errors = errors + e # Finally, if there were errors, report them ALL to on high if errors: raise ValueError, errors # AccessControl.Owned.ownerInfo returns the id of the creator # and the path to the UserFolder they're defined in self._creator = ownerInfo(creator) self._owner = owner self._depth = depth self._timeout = timeout self._locktype = locktype self._lockscope = lockscope self._modifiedtime = time.time() if token is None: self._token = generateLockToken() else: self._token = token def getCreator(self): return self._creator def getCreatorPath(self): db, name = self._creator path = '/'.join(db) return "/%s/%s" % (path, name) def getOwner(self): return self._owner def getLockToken(self): return self._token def getDepth(self): return self._depth def getTimeout(self): return self._timeout def getTimeoutString(self): t = str(self._timeout) if t[-1] == 'L': t = t[:-1] # lob off Long signifier return "Second-%s" % t def setTimeout(self, newtimeout): timeout, errors = validateTimeout(newtimeout) if errors: raise ValueError, errors else: self._timeout = timeout self._modifiedtime = time.time() # reset modified def getModifiedTime(self): return self._modifiedtime def refresh(self): self._modifiedtime = time.time() def isValid(self): now = time.time() modified = self._modifiedtime timeout = self._timeout return (modified + timeout) > now def getLockType(self): return self._locktype def getLockScope(self): return self._lockscope def asLockDiscoveryProperty(self, ns='d',fake=0): if fake: token = 'this-is-a-faked-no-permission-token' else: token = self._token s = (' <%(ns)s:activelock>\n' ' <%(ns)s:locktype><%(ns)s:%(locktype)s/>\n' ' <%(ns)s:lockscope><%(ns)s:%(lockscope)s/>\n' ' <%(ns)s:depth>%(depth)s\n' ' <%(ns)s:owner>%(owner)s\n' ' <%(ns)s:timeout>%(timeout)s\n' ' <%(ns)s:locktoken>\n' ' <%(ns)s:href>opaquelocktoken:%(locktoken)s\n' ' \n' ' \n' ) % { 'ns': ns, 'locktype': self._locktype, 'lockscope': self._lockscope, 'depth': self._depth, 'owner': self._owner, 'timeout': self.getTimeoutString(), 'locktoken': token, } return s def asXML(self): s = """ %s """ % self.asLockDiscoveryProperty(ns="d") return s