Merge branch 'master' of https://code.grnet.gr/git/pithos
[pithos] / pithos / backends / modular_alchemy.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import os
35 import time
36 import sqlite3
37 import logging
38 import hashlib
39 import binascii
40
41 from base import NotAllowedError, BaseBackend
42 from lib_alchemy.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER
43 from lib_alchemy.permissions import Permissions, READ, WRITE
44 from lib_alchemy.policy import Policy
45 from lib_alchemy.hashfiler import Mapper, Blocker
46 from sqlalchemy import create_engine
47
48 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
49
50 inf = float('inf')
51
52
53 logger = logging.getLogger(__name__)
54
55 def backend_method(func=None, autocommit=1):
56     if func is None:
57         def fn(func):
58             return backend_method(func, autocommit)
59         return fn
60
61     if not autocommit:
62         return func
63     def fn(self, *args, **kw):
64         self.con.execute('begin deferred')
65         try:
66             ret = func(self, *args, **kw)
67             self.con.commit()
68             return ret
69         except:
70             self.con.rollback()
71             raise
72     return fn
73
74
75 class ModularBackend(BaseBackend):
76     """A modular backend.
77     
78     Uses modules for SQL functions and storage.
79     """
80     
81     def __init__(self, db):
82         self.hash_algorithm = 'sha256'
83         self.block_size = 4 * 1024 * 1024 # 4MB
84         
85         self.default_policy = {'quota': 0, 'versioning': 'auto'}
86         
87         basepath = os.path.split(db)[0]
88         if basepath and not os.path.exists(basepath):
89             os.makedirs(basepath)
90         if not os.path.isdir(basepath):
91             raise RuntimeError("Cannot open database at '%s'" % (db,))
92         
93         dbuser = 'pithos'
94         dbpass = 'archipelagos'
95         dbhost = '62.217.112.56'
96         dbname = 'pithosdb'
97         connection_str = 'mysql://%s:%s@%s/%s' %(dbuser, dbpass, dbhost, dbname)
98         engine = create_engine(connection_str, echo=True)
99         
100         params = {'blocksize': self.block_size,
101                   'blockpath': basepath + '/blocks',
102                   'hashtype': self.hash_algorithm}
103         self.blocker = Blocker(**params)
104         
105         params = {'mappath': basepath + '/maps',
106                   'namelen': self.blocker.hashlen}
107         self.mapper = Mapper(**params)
108         
109         params = {'connection': engine.connect(),
110                   'engine': engine}
111         self.permissions = Permissions(**params)
112         self.policy = Policy(**params)
113         self.node = Node(**params)
114         
115         self.con.commit()
116     
117     @backend_method
118     def list_accounts(self, user, marker=None, limit=10000):
119         """Return a list of accounts the user can access."""
120         
121         logger.debug("list_accounts: %s %s", user, marker, limit)
122         allowed = self._allowed_accounts(user)
123         start, limit = self._list_limits(allowed, marker, limit)
124         return allowed[start:start + limit]
125     
126     @backend_method
127     def get_account_meta(self, user, account, until=None):
128         """Return a dictionary with the account metadata."""
129         
130         logger.debug("get_account_meta: %s %s", account, until)
131         path, node = self._lookup_account(account, user == account)
132         if user != account:
133             if until or node is None or account not in self._allowed_accounts(user):
134                 raise NotAllowedError
135         try:
136             props = self._get_properties(node, until)
137             mtime = props[MTIME]
138         except NameError:
139             props = None
140             mtime = until
141         count, bytes, tstamp = self._get_statistics(node, until)
142         tstamp = max(tstamp, mtime)
143         if until is None:
144             modified = tstamp
145         else:
146             modified = self._get_statistics(node)[2] # Overall last modification.
147             modified = max(modified, mtime)
148         
149         if user != account:
150             meta = {'name': account}
151         else:
152             meta = {}
153             if props is not None:
154                 meta.update(dict(self.node.attribute_get(props[SERIAL])))
155             if until is not None:
156                 meta.update({'until_timestamp': tstamp})
157             meta.update({'name': account, 'count': count, 'bytes': bytes})
158         meta.update({'modified': modified})
159         return meta
160     
161     @backend_method
162     def update_account_meta(self, user, account, meta, replace=False):
163         """Update the metadata associated with the account."""
164         
165         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
166         if user != account:
167             raise NotAllowedError
168         path, node = self._lookup_account(account, True)
169         self._put_metadata(user, node, meta, replace, False)
170     
171     @backend_method
172     def get_account_groups(self, user, account):
173         """Return a dictionary with the user groups defined for this account."""
174         
175         logger.debug("get_account_groups: %s", account)
176         if user != account:
177             if account not in self._allowed_accounts(user):
178                 raise NotAllowedError
179             return {}
180         self._lookup_account(account, True)
181         return self.permissions.group_dict(account)
182     
183     @backend_method
184     def update_account_groups(self, user, account, groups, replace=False):
185         """Update the groups associated with the account."""
186         
187         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
188         if user != account:
189             raise NotAllowedError
190         self._lookup_account(account, True)
191         self._check_groups(groups)
192         if replace:
193             self.permissions.group_destroy(account)
194         for k, v in groups.iteritems():
195             if not replace: # If not already deleted.
196                 self.permissions.group_delete(account, k)
197             if v:
198                 self.permissions.group_addmany(account, k, v)
199     
200     @backend_method
201     def put_account(self, user, account):
202         """Create a new account with the given name."""
203         
204         logger.debug("put_account: %s", account)
205         if user != account:
206             raise NotAllowedError
207         node = self.node.node_lookup(account)
208         if node is not None:
209             raise NameError('Account already exists')
210         self._put_path(user, ROOTNODE, account)
211     
212     @backend_method
213     def delete_account(self, user, account):
214         """Delete the account with the given name."""
215         
216         logger.debug("delete_account: %s", account)
217         if user != account:
218             raise NotAllowedError
219         node = self.node.node_lookup(account)
220         if node is None:
221             return
222         if not self.node.node_remove(node):
223             raise IndexError('Account is not empty')
224         self.permissions.group_destroy(account)
225     
226     @backend_method
227     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
228         """Return a list of containers existing under an account."""
229         
230         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
231         if user != account:
232             if until or account not in self._allowed_accounts(user):
233                 raise NotAllowedError
234             allowed = self._allowed_containers(user, account)
235             start, limit = self._list_limits(allowed, marker, limit)
236             return allowed[start:start + limit]
237         if shared:
238             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
239             start, limit = self._list_limits(allowed, marker, limit)
240             return allowed[start:start + limit]
241         node = self.node.node_lookup(account)
242         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
243     
244     @backend_method
245     def get_container_meta(self, user, account, container, until=None):
246         """Return a dictionary with the container metadata."""
247         
248         logger.debug("get_container_meta: %s %s %s", account, container, until)
249         if user != account:
250             if until or container not in self._allowed_containers(user, account):
251                 raise NotAllowedError
252         path, node = self._lookup_container(account, container)
253         props = self._get_properties(node, until)
254         mtime = props[MTIME]
255         count, bytes, tstamp = self._get_statistics(node, until)
256         tstamp = max(tstamp, mtime)
257         if until is None:
258             modified = tstamp
259         else:
260             modified = self._get_statistics(node)[2] # Overall last modification.
261             modified = max(modified, mtime)
262         
263         if user != account:
264             meta = {'name': container}
265         else:
266             meta = dict(self.node.attribute_get(props[SERIAL]))
267             if until is not None:
268                 meta.update({'until_timestamp': tstamp})
269             meta.update({'name': container, 'count': count, 'bytes': bytes})
270         meta.update({'modified': modified})
271         return meta
272     
273     @backend_method
274     def update_container_meta(self, user, account, container, meta, replace=False):
275         """Update the metadata associated with the container."""
276         
277         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
278         if user != account:
279             raise NotAllowedError
280         path, node = self._lookup_container(account, container)
281         self._put_metadata(user, node, meta, replace, False)
282     
283     @backend_method
284     def get_container_policy(self, user, account, container):
285         """Return a dictionary with the container policy."""
286         
287         logger.debug("get_container_policy: %s %s", account, container)
288         if user != account:
289             if container not in self._allowed_containers(user, account):
290                 raise NotAllowedError
291             return {}
292         path = self._lookup_container(account, container)[0]
293         return self.policy.policy_get(path)
294     
295     @backend_method
296     def update_container_policy(self, user, account, container, policy, replace=False):
297         """Update the policy associated with the account."""
298         
299         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
300         if user != account:
301             raise NotAllowedError
302         path = self._lookup_container(account, container)[0]
303         self._check_policy(policy)
304         if replace:
305             for k, v in self.default_policy.iteritems():
306                 if k not in policy:
307                     policy[k] = v
308         self.policy.policy_set(path, policy)
309     
310     @backend_method
311     def put_container(self, user, account, container, policy=None):
312         """Create a new container with the given name."""
313         
314         logger.debug("put_container: %s %s %s", account, container, policy)
315         if user != account:
316             raise NotAllowedError
317         try:
318             path, node = self._lookup_container(account, container)
319         except NameError:
320             pass
321         else:
322             raise NameError('Container already exists')
323         if policy:
324             self._check_policy(policy)
325         path = '/'.join((account, container))
326         self._put_path(user, self._lookup_account(account, True)[1], path)
327         for k, v in self.default_policy.iteritems():
328             if k not in policy:
329                 policy[k] = v
330         self.policy.policy_set(path, policy)
331     
332     @backend_method
333     def delete_container(self, user, account, container, until=None):
334         """Delete/purge the container with the given name."""
335         
336         logger.debug("delete_container: %s %s %s", account, container, until)
337         if user != account:
338             raise NotAllowedError
339         path, node = self._lookup_container(account, container)
340         
341         if until is not None:
342             versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
343             for v in versions:
344                 self.mapper.map_remv(v)
345             self.node.node_purge_children(node, until, CLUSTER_DELETED)
346             return
347         
348         if self._get_statistics(node)[0] > 0:
349             raise IndexError('Container is not empty')
350         versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
351         for v in versions:
352             self.mapper.map_remv(v)
353         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
354         self.node.node_remove(node)
355         self.policy.policy_unset(path)
356     
357     @backend_method
358     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
359         """Return a list of objects existing under a container."""
360         
361         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
362         allowed = []
363         if user != account:
364             if until:
365                 raise NotAllowedError
366             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
367             if not allowed:
368                 raise NotAllowedError
369         else:
370             if shared:
371                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
372         path, node = self._lookup_container(account, container)
373         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
374     
375     @backend_method
376     def list_object_meta(self, user, account, container, until=None):
377         """Return a list with all the container's object meta keys."""
378         
379         logger.debug("list_object_meta: %s %s %s", account, container, until)
380         allowed = []
381         if user != account:
382             if until:
383                 raise NotAllowedError
384             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
385             if not allowed:
386                 raise NotAllowedError
387         path, node = self._lookup_container(account, container)
388         before = until if until is not None else inf
389         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
390     
391     @backend_method
392     def get_object_meta(self, user, account, container, name, version=None):
393         """Return a dictionary with the object metadata."""
394         
395         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
396         self._can_read(user, account, container, name)
397         path, node = self._lookup_object(account, container, name)
398         props = self._get_version(node, version)
399         if version is None:
400             modified = props[MTIME]
401         else:
402             modified = self._get_version(node)[MTIME] # Overall last modification.
403         
404         meta = dict(self.node.attribute_get(props[SERIAL]))
405         meta.update({'name': name, 'bytes': props[SIZE]})
406         meta.update({'version': props[SERIAL], 'version_timestamp': props[MTIME]})
407         meta.update({'modified': modified, 'modified_by': props[MUSER]})
408         return meta
409     
410     @backend_method
411     def update_object_meta(self, user, account, container, name, meta, replace=False):
412         """Update the metadata associated with the object."""
413         
414         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
415         self._can_write(user, account, container, name)
416         path, node = self._lookup_object(account, container, name)
417         self._put_metadata(user, node, meta, replace)
418     
419     @backend_method
420     def get_object_permissions(self, user, account, container, name):
421         """Return the path from which this object gets its permissions from,\
422         along with a dictionary containing the permissions."""
423         
424         logger.debug("get_object_permissions: %s %s %s", account, container, name)
425         self._can_read(user, account, container, name)
426         path = self._lookup_object(account, container, name)[0]
427         return self.permissions.access_inherit(path)
428     
429     @backend_method
430     def update_object_permissions(self, user, account, container, name, permissions):
431         """Update the permissions associated with the object."""
432         
433         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
434         if user != account:
435             raise NotAllowedError
436         path = self._lookup_object(account, container, name)[0]
437         self._check_permissions(path, permissions)
438         self.permissions.access_set(path, permissions)
439     
440     @backend_method
441     def get_object_public(self, user, account, container, name):
442         """Return the public URL of the object if applicable."""
443         
444         logger.debug("get_object_public: %s %s %s", account, container, name)
445         self._can_read(user, account, container, name)
446         path = self._lookup_object(account, container, name)[0]
447         if self.permissions.public_check(path):
448             return '/public/' + path
449         return None
450     
451     @backend_method
452     def update_object_public(self, user, account, container, name, public):
453         """Update the public status of the object."""
454         
455         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
456         self._can_write(user, account, container, name)
457         path = self._lookup_object(account, container, name)[0]
458         if not public:
459             self.permissions.public_unset(path)
460         else:
461             self.permissions.public_set(path)
462     
463     @backend_method
464     def get_object_hashmap(self, user, account, container, name, version=None):
465         """Return the object's size and a list with partial hashes."""
466         
467         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
468         self._can_read(user, account, container, name)
469         path, node = self._lookup_object(account, container, name)
470         props = self._get_version(node, version)
471         hashmap = self.mapper.map_retr(props[SERIAL])
472         return props[SIZE], [binascii.hexlify(x) for x in hashmap]
473     
474     @backend_method
475     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
476         """Create/update an object with the specified size and partial hashes."""
477         
478         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
479         if permissions is not None and user != account:
480             raise NotAllowedError
481         self._can_write(user, account, container, name)
482         missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
483         if missing:
484             ie = IndexError()
485             ie.data = missing
486             raise ie
487         if permissions is not None:
488             self._check_permissions(path, permissions)
489         path, node = self._put_object_node(account, container, name)
490         src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
491         self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
492         if not replace_meta and src_version_id is not None:
493             self.node.attribute_copy(src_version_id, dest_version_id)
494         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
495         if permissions is not None:
496             self.permissions.access_set(path, permissions)
497     
498     @backend_method
499     def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
500         """Copy an object's data and metadata."""
501         
502         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
503         if permissions is not None and user != account:
504             raise NotAllowedError
505         self._can_read(user, account, src_container, src_name)
506         self._can_write(user, account, dest_container, dest_name)
507         src_path, src_node = self._lookup_object(account, src_container, src_name)
508         if permissions is not None:
509             self._check_permissions(dest_path, permissions)
510         dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
511         src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
512         if src_version_id is not None:
513             self._copy_data(src_version_id, dest_version_id)
514         if not replace_meta and src_version_id is not None:
515             self.node.attribute_copy(src_version_id, dest_version_id)
516         self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
517         if permissions is not None:
518             self.permissions.access_set(dest_path, permissions)
519     
520     @backend_method
521     def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
522         """Move an object's data and metadata."""
523         
524         logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
525         self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
526         self.delete_object(user, account, src_container, src_name)
527     
528     @backend_method
529     def delete_object(self, user, account, container, name, until=None):
530         """Delete/purge an object."""
531         
532         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
533         if user != account:
534             raise NotAllowedError
535         
536         if until is not None:
537             path = '/'.join((account, container, name))
538             node = self.node.node_lookup(path)
539             if node is None:
540                 return
541             versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
542             versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
543             for v in versions:
544                 self.mapper.map_remv(v)
545             self.node.node_purge_children(node, until, CLUSTER_DELETED)
546             try:
547                 props = self._get_version(node)
548             except NameError:
549                 pass
550             else:
551                 self.permissions.access_clear(path)
552             return
553         
554         path, node = self._lookup_object(account, container, name)
555         self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
556         self.permissions.access_clear(path)
557     
558     @backend_method
559     def list_versions(self, user, account, container, name):
560         """Return a list of all (version, version_timestamp) tuples for an object."""
561         
562         logger.debug("list_versions: %s %s %s", account, container, name)
563         self._can_read(user, account, container, name)
564         return self.node.node_get_versions(node, ['serial', 'mtime'])
565     
566     @backend_method(autocommit=0)
567     def get_block(self, hash):
568         """Return a block's data."""
569         
570         logger.debug("get_block: %s", hash)
571         blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
572         if not blocks:
573             raise NameError('Block does not exist')
574         return blocks[0]
575     
576     @backend_method(autocommit=0)
577     def put_block(self, data):
578         """Create a block and return the hash."""
579         
580         logger.debug("put_block: %s", len(data))
581         hashes, absent = self.blocker.block_stor((data,))
582         return binascii.hexlify(hashes[0])
583     
584     @backend_method(autocommit=0)
585     def update_block(self, hash, data, offset=0):
586         """Update a known block and return the hash."""
587         
588         logger.debug("update_block: %s %s %s", hash, len(data), offset)
589         if offset == 0 and len(data) == self.block_size:
590             return self.put_block(data)
591         h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
592         return binascii.hexlify(h)
593     
594     def _check_policy(self, policy):
595         for k in policy.keys():
596             if policy[k] == '':
597                 policy[k] = self.default_policy.get(k)
598         for k, v in policy.iteritems():
599             if k == 'quota':
600                 q = int(v) # May raise ValueError.
601                 if q < 0:
602                     raise ValueError
603             elif k == 'versioning':
604                 if v not in ['auto', 'manual', 'none']:
605                     raise ValueError
606             else:
607                 raise ValueError
608     
609     def _sql_until(self, parent, until=None):
610         """Return the sql to get the latest versions until the timestamp given."""
611         
612         if until is None:
613             until = time.time()
614         sql = ("select v.serial, n.path, v.mtime, v.size "
615                "from versions v, nodes n "
616                "where v.serial = (select max(serial) "
617                                  "from versions "
618                                  "where node = v.node and mtime < %s) "
619                "and v.cluster != %s "
620                "and v.node = n.node "
621                "and v.node in (select node "
622                               "from nodes "
623                               "where parent = %s)")
624         return sql % (until, CLUSTER_DELETED, parent)
625     
626     def _list_limits(self, listing, marker, limit):
627         start = 0
628         if marker:
629             try:
630                 start = listing.index(marker) + 1
631             except ValueError:
632                 pass
633         if not limit or limit > 10000:
634             limit = 10000
635         return start, limit
636     
637     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
638         cont_prefix = path + '/'
639         if keys and len(keys) > 0:
640 #             sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
641 #                         m.version_id = o.version_id and m.key in (%s)'''
642 #             sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
643 #             param = (cont_prefix + prefix + '%',) + tuple(keys)
644 #             if allowed:
645 #                 sql += ' and (' + ' or '.join(('o.name like ?',) * len(allowed)) + ')'
646 #                 param += tuple([x + '%' for x in allowed])
647 #             sql += ' order by o.name'
648             return []
649         else:
650             sql = 'select path, serial from (%s) where path like ?'
651             sql = sql % self._sql_until(parent, until)
652             param = (cont_prefix + prefix + '%',)
653             if allowed:
654                 sql += ' and (' + ' or '.join(('name like ?',) * len(allowed)) + ')'
655                 param += tuple([x + '%' for x in allowed])
656             sql += ' order by path'
657         c = self.con.execute(sql, param)
658         objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
659         if delimiter:
660             pseudo_objects = []
661             for x in objects:
662                 pseudo_name = x[0]
663                 i = pseudo_name.find(delimiter, len(prefix))
664                 if not virtual:
665                     # If the delimiter is not found, or the name ends
666                     # with the delimiter's first occurence.
667                     if i == -1 or len(pseudo_name) == i + len(delimiter):
668                         pseudo_objects.append(x)
669                 else:
670                     # If the delimiter is found, keep up to (and including) the delimiter.
671                     if i != -1:
672                         pseudo_name = pseudo_name[:i + len(delimiter)]
673                     if pseudo_name not in [y[0] for y in pseudo_objects]:
674                         if pseudo_name == x[0]:
675                             pseudo_objects.append(x)
676                         else:
677                             pseudo_objects.append((pseudo_name, None))
678             objects = pseudo_objects
679         
680         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
681         return objects[start:start + limit]
682     
683     # Path functions.
684     
685     def _put_object_node(self, account, container, name):
686         path, parent = self._lookup_container(account, container)
687         path = '/'.join((path, name))
688         node = self.node.node_lookup(path)
689         if node is None:
690             node = self.node.node_create(parent, path)
691         return path, node
692     
693     def _put_path(self, user, parent, path):
694         node = self.node.node_create(parent, path)
695         self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
696         return node
697     
698     def _lookup_account(self, account, create=True):
699         node = self.node.node_lookup(account)
700         if node is None and create:
701             node = self._put_path(account, ROOTNODE, account) # User is account.
702         return account, node
703     
704     def _lookup_container(self, account, container):
705         path = '/'.join((account, container))
706         node = self.node.node_lookup(path)
707         if node is None:
708             raise NameError('Container does not exist')
709         return path, node
710     
711     def _lookup_object(self, account, container, name):
712         path = '/'.join((account, container, name))
713         node = self.node.node_lookup(path)
714         if node is None:
715             raise NameError('Object does not exist')
716         return path, node
717     
718     def _get_properties(self, node, until=None):
719         """Return properties until the timestamp given."""
720         
721         before = until if until is not None else inf
722         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
723         if props is None and until is not None:
724             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
725         if props is None:
726             raise NameError('Path does not exist')
727         return props
728     
729     def _get_statistics(self, node, until=None):
730         """Return count, sum of size and latest timestamp of everything under node."""
731         
732         if until is None:
733             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
734         else:
735             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
736         if stats is None:
737             stats = (0, 0, 0)
738         return stats
739     
740     def _get_version(self, node, version=None):
741         if version is None:
742             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
743             if props is None:
744                 raise NameError('Object does not exist')
745         else:
746             props = self.node.version_get_properties(version)
747             if props is None or props[CLUSTER] == CLUSTER_DELETED:
748                 raise IndexError('Version does not exist')
749         return props
750     
751     def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
752         
753         # Get source serial and size.
754         if src_version is not None:
755             src_props = self._get_version(src_node, src_version)
756             src_version_id = src_props[SERIAL]
757             size = src_props[SIZE]
758         else:
759             # Latest or create from scratch.
760             try:
761                 src_props = self._get_version(src_node)
762                 src_version_id = src_props[SERIAL]
763                 size = src_props[SIZE]
764             except NameError:
765                 src_version_id = None
766                 size = 0
767         if dest_size is not None:
768             size = dest_size
769         
770         # Move the latest version at destination to CLUSTER_HISTORY and create new.
771         if src_node == dest_node and src_version is None and src_version_id is not None:
772             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
773         else:
774             dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
775             if dest_props is not None:
776                 self.node.version_recluster(dest_props[SERIAL], CLUSTER_HISTORY)
777         dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
778         
779         return src_version_id, dest_version_id
780     
781     def _copy_data(self, src_version, dest_version):
782         hashmap = self.mapper.map_retr(src_version)
783         self.mapper.map_stor(dest_version, hashmap)
784     
785     def _get_metadata(self, version):
786         if version is None:
787             return {}
788         return dict(self.node.attribute_get(version))
789     
790     def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
791         """Create a new version and store metadata."""
792         
793         src_version_id, dest_version_id = self._copy_version(user, node, None, node)
794         if not replace:
795             if src_version_id is not None:
796                 self.node.attribute_copy(src_version_id, dest_version_id)
797             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
798             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
799         else:
800             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
801         if copy_data and src_version_id is not None:
802             self._copy_data(src_version_id, dest_version_id)
803     
804     # Access control functions.
805     
806     def _check_groups(self, groups):
807         # raise ValueError('Bad characters in groups')
808         pass
809     
810     def _check_permissions(self, path, permissions):
811         # raise ValueError('Bad characters in permissions')
812         
813         # Check for existing permissions.
814         paths = self.permissions.access_list(path)
815         if paths:
816             ae = AttributeError()
817             ae.data = paths
818             raise ae
819     
820     def _can_read(self, user, account, container, name):
821         if user == account:
822             return True
823         path = '/'.join((account, container, name))
824         if not self.permissions.access_check(path, READ, user) and not self.permissions.access_check(path, WRITE, user):
825             raise NotAllowedError
826     
827     def _can_write(self, user, account, container, name):
828         if user == account:
829             return True
830         path = '/'.join((account, container, name))
831         if not self.permissions.access_check(path, WRITE, user):
832             raise NotAllowedError
833     
834     def _allowed_accounts(self, user):
835         allow = set()
836         for path in self.permissions.access_list_paths(user):
837             allow.add(path.split('/', 1)[0])
838         return sorted(allow)
839     
840     def _allowed_containers(self, user, account):
841         allow = set()
842         for path in self.permissions.access_list_paths(user, account):
843             allow.add(path.split('/', 2)[1])
844         return sorted(allow)