fix: initialize per-request quotaholder seriallist
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from commissioning.clients.quotaholder import QuotaholderHTTP
43
44 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45     AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46
47 # Stripped-down version of the HashMap class found in tools.
48
49
50 class HashMap(list):
51
52     def __init__(self, blocksize, blockhash):
53         super(HashMap, self).__init__()
54         self.blocksize = blocksize
55         self.blockhash = blockhash
56
57     def _hash_raw(self, v):
58         h = hashlib.new(self.blockhash)
59         h.update(v)
60         return h.digest()
61
62     def hash(self):
63         if len(self) == 0:
64             return self._hash_raw('')
65         if len(self) == 1:
66             return self.__getitem__(0)
67
68         h = list(self)
69         s = 2
70         while s < len(h):
71             s = s * 2
72         h += [('\x00' * len(h[0]))] * (s - len(h))
73         while len(h) > 1:
74             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75         return h[0]
76
77 # Default modules and settings.
78 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81 DEFAULT_BLOCK_PATH = 'data/'
82 DEFAULT_BLOCK_UMASK = 0o022
83 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84 #DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85 #DEFAULT_QUEUE_EXCHANGE = 'pithos'
86
87 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88 QUEUE_CLIENT_ID = 'pithos'
89 QUEUE_INSTANCE_ID = '1'
90
91 (CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92
93 inf = float('inf')
94
95 ULTIMATE_ANSWER = 42
96
97
98 logger = logging.getLogger(__name__)
99
100
101 def backend_method(func=None, autocommit=1):
102     if func is None:
103         def fn(func):
104             return backend_method(func, autocommit)
105         return fn
106
107     if not autocommit:
108         return func
109
110     def fn(self, *args, **kw):
111         self.wrapper.execute()
112         serials = []
113         self.serials = serials
114         self.messages = []
115         try:
116             ret = func(self, *args, **kw)
117             for m in self.messages:
118                 self.queue.send(*m)
119             if serials:
120                 self.quotaholder.accept_commission(
121                             context     =   {},
122                             clientkey   =   'pithos',
123                             serials     =   serials)
124             self.wrapper.commit()
125             return ret
126         except:
127             self.quotaholder.reject_commission(
128                         context     =   {},
129                         clientkey   =   'pithos',
130                         serials     =   serials)
131             self.wrapper.rollback()
132             raise
133     return fn
134
135
136 class ModularBackend(BaseBackend):
137     """A modular backend.
138
139     Uses modules for SQL functions and storage.
140     """
141
142     def __init__(self, db_module=None, db_connection=None,
143                  block_module=None, block_path=None, block_umask=None,
144                  queue_module=None, queue_hosts=None,
145                  queue_exchange=None, quotaholder_url=None):
146         db_module = db_module or DEFAULT_DB_MODULE
147         db_connection = db_connection or DEFAULT_DB_CONNECTION
148         block_module = block_module or DEFAULT_BLOCK_MODULE
149         block_path = block_path or DEFAULT_BLOCK_PATH
150         block_umask = block_umask or DEFAULT_BLOCK_UMASK
151         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
152         #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
153         #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
154                 
155         self.hash_algorithm = 'sha256'
156         self.block_size = 4 * 1024 * 1024  # 4MB
157
158         self.default_policy = {'quota': DEFAULT_QUOTA,
159                                'versioning': DEFAULT_VERSIONING}
160
161         def load_module(m):
162             __import__(m)
163             return sys.modules[m]
164
165         self.db_module = load_module(db_module)
166         self.wrapper = self.db_module.DBWrapper(db_connection)
167         params = {'wrapper': self.wrapper}
168         self.permissions = self.db_module.Permissions(**params)
169         self.config = self.db_module.Config(**params)
170         self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
171         for x in ['READ', 'WRITE']:
172             setattr(self, x, getattr(self.db_module, x))
173         self.node = self.db_module.Node(**params)
174         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
175             setattr(self, x, getattr(self.db_module, x))
176
177         self.block_module = load_module(block_module)
178         params = {'path': block_path,
179                   'block_size': self.block_size,
180                   'hash_algorithm': self.hash_algorithm,
181                   'umask': block_umask}
182         self.store = self.block_module.Store(**params)
183
184         if queue_module and queue_hosts:
185             self.queue_module = load_module(queue_module)
186             params = {'hosts': queue_hosts,
187                           'exchange': queue_exchange,
188                       'client_id': QUEUE_CLIENT_ID}
189             self.queue = self.queue_module.Queue(**params)
190         else:
191             class NoQueue:
192                 def send(self, *args):
193                     pass
194
195                 def close(self):
196                     pass
197
198             self.queue = NoQueue()
199
200         self.quotaholder_url = quotaholder_url
201         self.quotaholder = QuotaholderHTTP(quotaholder_url)
202         self.serials = []
203
204     def close(self):
205         self.wrapper.close()
206         self.queue.close()
207
208     @backend_method
209     def list_accounts(self, user, marker=None, limit=10000):
210         """Return a list of accounts the user can access."""
211
212         logger.debug("list_accounts: %s %s %s", user, marker, limit)
213         allowed = self._allowed_accounts(user)
214         start, limit = self._list_limits(allowed, marker, limit)
215         return allowed[start:start + limit]
216
217     @backend_method
218     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
219         """Return a dictionary with the account metadata for the domain."""
220
221         logger.debug(
222             "get_account_meta: %s %s %s %s", user, account, domain, until)
223         path, node = self._lookup_account(account, user == account)
224         if user != account:
225             if until or node is None or account not in self._allowed_accounts(user):
226                 raise NotAllowedError
227         try:
228             props = self._get_properties(node, until)
229             mtime = props[self.MTIME]
230         except NameError:
231             props = None
232             mtime = until
233         count, bytes, tstamp = self._get_statistics(node, until)
234         tstamp = max(tstamp, mtime)
235         if until is None:
236             modified = tstamp
237         else:
238             modified = self._get_statistics(
239                 node)[2]  # Overall last modification.
240             modified = max(modified, mtime)
241
242         if user != account:
243             meta = {'name': account}
244         else:
245             meta = {}
246             if props is not None and include_user_defined:
247                 meta.update(
248                     dict(self.node.attribute_get(props[self.SERIAL], domain)))
249             if until is not None:
250                 meta.update({'until_timestamp': tstamp})
251             meta.update({'name': account, 'count': count, 'bytes': bytes})
252         meta.update({'modified': modified})
253         return meta
254
255     @backend_method
256     def update_account_meta(self, user, account, domain, meta, replace=False):
257         """Update the metadata associated with the account for the domain."""
258
259         logger.debug("update_account_meta: %s %s %s %s %s", user,
260                      account, domain, meta, replace)
261         if user != account:
262             raise NotAllowedError
263         path, node = self._lookup_account(account, True)
264         self._put_metadata(user, node, domain, meta, replace)
265
266     @backend_method
267     def get_account_groups(self, user, account):
268         """Return a dictionary with the user groups defined for this account."""
269
270         logger.debug("get_account_groups: %s %s", user, account)
271         if user != account:
272             if account not in self._allowed_accounts(user):
273                 raise NotAllowedError
274             return {}
275         self._lookup_account(account, True)
276         return self.permissions.group_dict(account)
277
278     @backend_method
279     def update_account_groups(self, user, account, groups, replace=False):
280         """Update the groups associated with the account."""
281
282         logger.debug("update_account_groups: %s %s %s %s", user,
283                      account, groups, replace)
284         if user != account:
285             raise NotAllowedError
286         self._lookup_account(account, True)
287         self._check_groups(groups)
288         if replace:
289             self.permissions.group_destroy(account)
290         for k, v in groups.iteritems():
291             if not replace:  # If not already deleted.
292                 self.permissions.group_delete(account, k)
293             if v:
294                 self.permissions.group_addmany(account, k, v)
295
296     @backend_method
297     def get_account_policy(self, user, account):
298         """Return a dictionary with the account policy."""
299
300         logger.debug("get_account_policy: %s %s", user, account)
301         if user != account:
302             if account not in self._allowed_accounts(user):
303                 raise NotAllowedError
304             return {}
305         path, node = self._lookup_account(account, True)
306         return self._get_policy(node)
307
308     @backend_method
309     def update_account_policy(self, user, account, policy, replace=False):
310         """Update the policy associated with the account."""
311
312         logger.debug("update_account_policy: %s %s %s %s", user,
313                      account, policy, replace)
314         if user != account:
315             raise NotAllowedError
316         path, node = self._lookup_account(account, True)
317         self._check_policy(policy)
318         self._put_policy(node, policy, replace)
319
320     @backend_method
321     def put_account(self, user, account, policy={}):
322         """Create a new account with the given name."""
323
324         logger.debug("put_account: %s %s %s", user, account, policy)
325         if user != account:
326             raise NotAllowedError
327         node = self.node.node_lookup(account)
328         if node is not None:
329             raise AccountExists('Account already exists')
330         if policy:
331             self._check_policy(policy)
332         node = self._put_path(user, self.ROOTNODE, account)
333         self._put_policy(node, policy, True)
334
335     @backend_method
336     def delete_account(self, user, account):
337         """Delete the account with the given name."""
338
339         logger.debug("delete_account: %s %s", user, account)
340         if user != account:
341             raise NotAllowedError
342         node = self.node.node_lookup(account)
343         if node is None:
344             return
345         if not self.node.node_remove(node):
346             raise AccountNotEmpty('Account is not empty')
347         self.permissions.group_destroy(account)
348
349     @backend_method
350     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
351         """Return a list of containers existing under an account."""
352
353         logger.debug("list_containers: %s %s %s %s %s %s %s", user,
354                      account, marker, limit, shared, until, public)
355         if user != account:
356             if until or account not in self._allowed_accounts(user):
357                 raise NotAllowedError
358             allowed = self._allowed_containers(user, account)
359             start, limit = self._list_limits(allowed, marker, limit)
360             return allowed[start:start + limit]
361         if shared or public:
362             allowed = set()
363             if shared:
364                 allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
365             if public:
366                 allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
367             allowed = sorted(allowed)
368             start, limit = self._list_limits(allowed, marker, limit)
369             return allowed[start:start + limit]
370         node = self.node.node_lookup(account)
371         containers = [x[0] for x in self._list_object_properties(
372             node, account, '', '/', marker, limit, False, None, [], until)]
373         start, limit = self._list_limits(
374             [x[0] for x in containers], marker, limit)
375         return containers[start:start + limit]
376
377     @backend_method
378     def list_container_meta(self, user, account, container, domain, until=None):
379         """Return a list with all the container's object meta keys for the domain."""
380
381         logger.debug("list_container_meta: %s %s %s %s %s", user,
382                      account, container, domain, until)
383         allowed = []
384         if user != account:
385             if until:
386                 raise NotAllowedError
387             allowed = self.permissions.access_list_paths(
388                 user, '/'.join((account, container)))
389             if not allowed:
390                 raise NotAllowedError
391         path, node = self._lookup_container(account, container)
392         before = until if until is not None else inf
393         allowed = self._get_formatted_paths(allowed)
394         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
395
396     @backend_method
397     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
398         """Return a dictionary with the container metadata for the domain."""
399
400         logger.debug("get_container_meta: %s %s %s %s %s", user,
401                      account, container, domain, until)
402         if user != account:
403             if until or container not in self._allowed_containers(user, account):
404                 raise NotAllowedError
405         path, node = self._lookup_container(account, container)
406         props = self._get_properties(node, until)
407         mtime = props[self.MTIME]
408         count, bytes, tstamp = self._get_statistics(node, until)
409         tstamp = max(tstamp, mtime)
410         if until is None:
411             modified = tstamp
412         else:
413             modified = self._get_statistics(
414                 node)[2]  # Overall last modification.
415             modified = max(modified, mtime)
416
417         if user != account:
418             meta = {'name': container}
419         else:
420             meta = {}
421             if include_user_defined:
422                 meta.update(
423                     dict(self.node.attribute_get(props[self.SERIAL], domain)))
424             if until is not None:
425                 meta.update({'until_timestamp': tstamp})
426             meta.update({'name': container, 'count': count, 'bytes': bytes})
427         meta.update({'modified': modified})
428         return meta
429
430     @backend_method
431     def update_container_meta(self, user, account, container, domain, meta, replace=False):
432         """Update the metadata associated with the container for the domain."""
433
434         logger.debug("update_container_meta: %s %s %s %s %s %s",
435                      user, account, container, domain, meta, replace)
436         if user != account:
437             raise NotAllowedError
438         path, node = self._lookup_container(account, container)
439         src_version_id, dest_version_id = self._put_metadata(
440             user, node, domain, meta, replace)
441         if src_version_id is not None:
442             versioning = self._get_policy(node)['versioning']
443             if versioning != 'auto':
444                 self.node.version_remove(src_version_id)
445
446     @backend_method
447     def get_container_policy(self, user, account, container):
448         """Return a dictionary with the container policy."""
449
450         logger.debug(
451             "get_container_policy: %s %s %s", user, account, container)
452         if user != account:
453             if container not in self._allowed_containers(user, account):
454                 raise NotAllowedError
455             return {}
456         path, node = self._lookup_container(account, container)
457         return self._get_policy(node)
458
459     @backend_method
460     def update_container_policy(self, user, account, container, policy, replace=False):
461         """Update the policy associated with the container."""
462
463         logger.debug("update_container_policy: %s %s %s %s %s",
464                      user, account, container, policy, replace)
465         if user != account:
466             raise NotAllowedError
467         path, node = self._lookup_container(account, container)
468         self._check_policy(policy)
469         self._put_policy(node, policy, replace)
470
471     @backend_method
472     def put_container(self, user, account, container, policy={}):
473         """Create a new container with the given name."""
474
475         logger.debug(
476             "put_container: %s %s %s %s", user, account, container, policy)
477         if user != account:
478             raise NotAllowedError
479         try:
480             path, node = self._lookup_container(account, container)
481         except NameError:
482             pass
483         else:
484             raise ContainerExists('Container already exists')
485         if policy:
486             self._check_policy(policy)
487         path = '/'.join((account, container))
488         node = self._put_path(
489             user, self._lookup_account(account, True)[1], path)
490         self._put_policy(node, policy, True)
491
492     @backend_method
493     def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
494         """Delete/purge the container with the given name."""
495
496         logger.debug("delete_container: %s %s %s %s %s %s", user,
497                      account, container, until, prefix, delimiter)
498         if user != account:
499             raise NotAllowedError
500         path, node = self._lookup_container(account, container)
501
502         if until is not None:
503             hashes, size, serials = self.node.node_purge_children(
504                 node, until, CLUSTER_HISTORY)
505             for h in hashes:
506                 self.store.map_delete(h)
507             self.node.node_purge_children(node, until, CLUSTER_DELETED)
508             self._report_size_change(user, account, -size,
509                                      {'action':'container purge', 'path': path,
510                                       'versions': serials})
511             return
512
513         if not delimiter:
514             if self._get_statistics(node)[0] > 0:
515                 raise ContainerNotEmpty('Container is not empty')
516             hashes, size, serials = self.node.node_purge_children(
517                 node, inf, CLUSTER_HISTORY)
518             for h in hashes:
519                 self.store.map_delete(h)
520             self.node.node_purge_children(node, inf, CLUSTER_DELETED)
521             self.node.node_remove(node)
522             self._report_size_change(user, account, -size,
523                                      {'action': 'container delete',
524                                       'path': path,
525                                       'versions': serials})
526         else:
527                 # remove only contents
528             src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
529             paths = []
530             for t in src_names:
531                 path = '/'.join((account, container, t[0]))
532                 node = t[2]
533                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
534                 del_size = self._apply_versioning(
535                     account, container, src_version_id)
536                 if del_size:
537                     self._report_size_change(user, account, -del_size,
538                                              {'action': 'object delete',
539                                               'path': path, 'versions': [dest_version_id]})
540                 self._report_object_change(
541                     user, account, path, details={'action': 'object delete'})
542                 paths.append(path)
543             self.permissions.access_clear_bulk(paths)
544
545     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
546         if user != account and until:
547             raise NotAllowedError
548         if shared and public:
549             # get shared first
550             shared = self._list_object_permissions(
551                 user, account, container, prefix, shared=True, public=False)
552             objects = set()
553             if shared:
554                 path, node = self._lookup_container(account, container)
555                 shared = self._get_formatted_paths(shared)
556                 objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
557
558             # get public
559             objects |= set(self._list_public_object_properties(
560                 user, account, container, prefix, all_props))
561             objects = list(objects)
562
563             objects.sort(key=lambda x: x[0])
564             start, limit = self._list_limits(
565                 [x[0] for x in objects], marker, limit)
566             return objects[start:start + limit]
567         elif public:
568             objects = self._list_public_object_properties(
569                 user, account, container, prefix, all_props)
570             start, limit = self._list_limits(
571                 [x[0] for x in objects], marker, limit)
572             return objects[start:start + limit]
573
574         allowed = self._list_object_permissions(
575             user, account, container, prefix, shared, public)
576         if shared and not allowed:
577             return []
578         path, node = self._lookup_container(account, container)
579         allowed = self._get_formatted_paths(allowed)
580         objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
581         start, limit = self._list_limits(
582             [x[0] for x in objects], marker, limit)
583         return objects[start:start + limit]
584
585     def _list_public_object_properties(self, user, account, container, prefix, all_props):
586         public = self._list_object_permissions(
587             user, account, container, prefix, shared=False, public=True)
588         paths, nodes = self._lookup_objects(public)
589         path = '/'.join((account, container))
590         cont_prefix = path + '/'
591         paths = [x[len(cont_prefix):] for x in paths]
592         props = self.node.version_lookup_bulk(nodes, all_props=all_props)
593         objects = [(path,) + props for path, props in zip(paths, props)]
594         return objects
595
596     def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
597         objects = []
598         while True:
599             marker = objects[-1] if objects else None
600             limit = 10000
601             l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
602             objects.extend(l)
603             if not l or len(l) < limit:
604                 break
605         return objects
606
607     def _list_object_permissions(self, user, account, container, prefix, shared, public):
608         allowed = []
609         path = '/'.join((account, container, prefix)).rstrip('/')
610         if user != account:
611             allowed = self.permissions.access_list_paths(user, path)
612             if not allowed:
613                 raise NotAllowedError
614         else:
615             allowed = set()
616             if shared:
617                 allowed.update(self.permissions.access_list_shared(path))
618             if public:
619                 allowed.update(
620                     [x[0] for x in self.permissions.public_list(path)])
621             allowed = sorted(allowed)
622             if not allowed:
623                 return []
624         return allowed
625
626     @backend_method
627     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
628         """Return a list of object (name, version_id) tuples existing under a container."""
629
630         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
631         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
632
633     @backend_method
634     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
635         """Return a list of object metadata dicts existing under a container."""
636
637         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
638         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
639         objects = []
640         for p in props:
641             if len(p) == 2:
642                 objects.append({'subdir': p[0]})
643             else:
644                 objects.append({'name': p[0],
645                                 'bytes': p[self.SIZE + 1],
646                                 'type': p[self.TYPE + 1],
647                                 'hash': p[self.HASH + 1],
648                                 'version': p[self.SERIAL + 1],
649                                 'version_timestamp': p[self.MTIME + 1],
650                                 'modified': p[self.MTIME + 1] if until is None else None,
651                                 'modified_by': p[self.MUSER + 1],
652                                 'uuid': p[self.UUID + 1],
653                                 'checksum': p[self.CHECKSUM + 1]})
654         return objects
655
656     @backend_method
657     def list_object_permissions(self, user, account, container, prefix=''):
658         """Return a list of paths that enforce permissions under a container."""
659
660         logger.debug("list_object_permissions: %s %s %s %s", user,
661                      account, container, prefix)
662         return self._list_object_permissions(user, account, container, prefix, True, False)
663
664     @backend_method
665     def list_object_public(self, user, account, container, prefix=''):
666         """Return a dict mapping paths to public ids for objects that are public under a container."""
667
668         logger.debug("list_object_public: %s %s %s %s", user,
669                      account, container, prefix)
670         public = {}
671         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
672             public[path] = p + ULTIMATE_ANSWER
673         return public
674
675     @backend_method
676     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
677         """Return a dictionary with the object metadata for the domain."""
678
679         logger.debug("get_object_meta: %s %s %s %s %s %s", user,
680                      account, container, name, domain, version)
681         self._can_read(user, account, container, name)
682         path, node = self._lookup_object(account, container, name)
683         props = self._get_version(node, version)
684         if version is None:
685             modified = props[self.MTIME]
686         else:
687             try:
688                 modified = self._get_version(
689                     node)[self.MTIME]  # Overall last modification.
690             except NameError:  # Object may be deleted.
691                 del_props = self.node.version_lookup(
692                     node, inf, CLUSTER_DELETED)
693                 if del_props is None:
694                     raise ItemNotExists('Object does not exist')
695                 modified = del_props[self.MTIME]
696
697         meta = {}
698         if include_user_defined:
699             meta.update(
700                 dict(self.node.attribute_get(props[self.SERIAL], domain)))
701         meta.update({'name': name,
702                      'bytes': props[self.SIZE],
703                      'type': props[self.TYPE],
704                      'hash': props[self.HASH],
705                      'version': props[self.SERIAL],
706                      'version_timestamp': props[self.MTIME],
707                      'modified': modified,
708                      'modified_by': props[self.MUSER],
709                      'uuid': props[self.UUID],
710                      'checksum': props[self.CHECKSUM]})
711         return meta
712
713     @backend_method
714     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
715         """Update the metadata associated with the object for the domain and return the new version."""
716
717         logger.debug("update_object_meta: %s %s %s %s %s %s %s",
718                      user, account, container, name, domain, meta, replace)
719         self._can_write(user, account, container, name)
720         path, node = self._lookup_object(account, container, name)
721         src_version_id, dest_version_id = self._put_metadata(
722             user, node, domain, meta, replace)
723         self._apply_versioning(account, container, src_version_id)
724         return dest_version_id
725
726     @backend_method
727     def get_object_permissions(self, user, account, container, name):
728         """Return the action allowed on the object, the path
729         from which the object gets its permissions from,
730         along with a dictionary containing the permissions."""
731
732         logger.debug("get_object_permissions: %s %s %s %s", user,
733                      account, container, name)
734         allowed = 'write'
735         permissions_path = self._get_permissions_path(account, container, name)
736         if user != account:
737             if self.permissions.access_check(permissions_path, self.WRITE, user):
738                 allowed = 'write'
739             elif self.permissions.access_check(permissions_path, self.READ, user):
740                 allowed = 'read'
741             else:
742                 raise NotAllowedError
743         self._lookup_object(account, container, name)
744         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
745
746     @backend_method
747     def update_object_permissions(self, user, account, container, name, permissions):
748         """Update the permissions associated with the object."""
749
750         logger.debug("update_object_permissions: %s %s %s %s %s",
751                      user, account, container, name, permissions)
752         if user != account:
753             raise NotAllowedError
754         path = self._lookup_object(account, container, name)[0]
755         self._check_permissions(path, permissions)
756         self.permissions.access_set(path, permissions)
757         self._report_sharing_change(user, account, path, {'members':
758                                     self.permissions.access_members(path)})
759
760     @backend_method
761     def get_object_public(self, user, account, container, name):
762         """Return the public id of the object if applicable."""
763
764         logger.debug(
765             "get_object_public: %s %s %s %s", user, account, container, name)
766         self._can_read(user, account, container, name)
767         path = self._lookup_object(account, container, name)[0]
768         p = self.permissions.public_get(path)
769         if p is not None:
770             p += ULTIMATE_ANSWER
771         return p
772
773     @backend_method
774     def update_object_public(self, user, account, container, name, public):
775         """Update the public status of the object."""
776
777         logger.debug("update_object_public: %s %s %s %s %s", user,
778                      account, container, name, public)
779         self._can_write(user, account, container, name)
780         path = self._lookup_object(account, container, name)[0]
781         if not public:
782             self.permissions.public_unset(path)
783         else:
784             self.permissions.public_set(path)
785
786     @backend_method
787     def get_object_hashmap(self, user, account, container, name, version=None):
788         """Return the object's size and a list with partial hashes."""
789
790         logger.debug("get_object_hashmap: %s %s %s %s %s", user,
791                      account, container, name, version)
792         self._can_read(user, account, container, name)
793         path, node = self._lookup_object(account, container, name)
794         props = self._get_version(node, version)
795         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
796         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
797
798     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
799         if permissions is not None and user != account:
800             raise NotAllowedError
801         self._can_write(user, account, container, name)
802         if permissions is not None:
803             path = '/'.join((account, container, name))
804             self._check_permissions(path, permissions)
805
806         account_path, account_node = self._lookup_account(account, True)
807         container_path, container_node = self._lookup_container(
808             account, container)
809         path, node = self._put_object_node(
810             container_path, container_node, name)
811         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
812
813         # Handle meta.
814         if src_version_id is None:
815             src_version_id = pre_version_id
816         self._put_metadata_duplicate(
817             src_version_id, dest_version_id, domain, meta, replace_meta)
818
819         # Check quota.
820         del_size = self._apply_versioning(account, container, pre_version_id)
821         size_delta = size - del_size
822         if size_delta > 0:
823             account_quota = long(self._get_policy(account_node)['quota'])
824             container_quota = long(self._get_policy(container_node)['quota'])
825             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
826                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
827                 # This must be executed in a transaction, so the version is never created if it fails.
828                 raise QuotaError
829         self._report_size_change(user, account, size_delta,
830                                  {'action': 'object update', 'path': path,
831                                   'versions': [dest_version_id]})
832
833         if permissions is not None:
834             self.permissions.access_set(path, permissions)
835             self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
836
837         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
838         return dest_version_id
839
840     @backend_method
841     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
842         """Create/update an object with the specified size and partial hashes."""
843
844         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
845                      account, container, name, size, type, hashmap, checksum)
846         if size == 0:  # No such thing as an empty hashmap.
847             hashmap = [self.put_block('')]
848         map = HashMap(self.block_size, self.hash_algorithm)
849         map.extend([binascii.unhexlify(x) for x in hashmap])
850         missing = self.store.block_search(map)
851         if missing:
852             ie = IndexError()
853             ie.data = [binascii.hexlify(x) for x in missing]
854             raise ie
855
856         hash = map.hash()
857         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
858         self.store.map_put(hash, map)
859         return dest_version_id
860
861     @backend_method
862     def update_object_checksum(self, user, account, container, name, version, checksum):
863         """Update an object's checksum."""
864
865         logger.debug("update_object_checksum: %s %s %s %s %s %s",
866                      user, account, container, name, version, checksum)
867         # Update objects with greater version and same hashmap and size (fix metadata updates).
868         self._can_write(user, account, container, name)
869         path, node = self._lookup_object(account, container, name)
870         props = self._get_version(node, version)
871         versions = self.node.node_get_versions(node)
872         for x in versions:
873             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
874                 self.node.version_put_property(
875                     x[self.SERIAL], 'checksum', checksum)
876
877     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
878         dest_version_ids = []
879         self._can_read(user, src_account, src_container, src_name)
880         path, node = self._lookup_object(src_account, src_container, src_name)
881         # TODO: Will do another fetch of the properties in duplicate version...
882         props = self._get_version(
883             node, src_version)  # Check to see if source exists.
884         src_version_id = props[self.SERIAL]
885         hash = props[self.HASH]
886         size = props[self.SIZE]
887         is_copy = not is_move and (src_account, src_container, src_name) != (
888             dest_account, dest_container, dest_name)  # New uuid.
889         dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
890         if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
891             self._delete_object(user, src_account, src_container, src_name)
892
893         if delimiter:
894             prefix = src_name + \
895                 delimiter if not src_name.endswith(delimiter) else src_name
896             src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
897             src_names.sort(key=lambda x: x[2])  # order by nodes
898             paths = [elem[0] for elem in src_names]
899             nodes = [elem[2] for elem in src_names]
900             # TODO: Will do another fetch of the properties in duplicate version...
901             props = self._get_versions(nodes)  # Check to see if source exists.
902
903             for prop, path, node in zip(props, paths, nodes):
904                 src_version_id = prop[self.SERIAL]
905                 hash = prop[self.HASH]
906                 vtype = prop[self.TYPE]
907                 size = prop[self.SIZE]
908                 dest_prefix = dest_name + delimiter if not dest_name.endswith(
909                     delimiter) else dest_name
910                 vdest_name = path.replace(prefix, dest_prefix, 1)
911                 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
912                 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
913                     self._delete_object(user, src_account, src_container, path)
914         return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
915
916     @backend_method
917     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
918         """Copy an object's data and metadata."""
919
920         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
921         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
922         return dest_version_id
923
924     @backend_method
925     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
926         """Move an object's data and metadata."""
927
928         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
929         if user != src_account:
930             raise NotAllowedError
931         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
932         return dest_version_id
933
934     def _delete_object(self, user, account, container, name, until=None, delimiter=None):
935         if user != account:
936             raise NotAllowedError
937
938         if until is not None:
939             path = '/'.join((account, container, name))
940             node = self.node.node_lookup(path)
941             if node is None:
942                 return
943             hashes = []
944             size = 0
945             serials = []
946             h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
947             hashes += h
948             size += s
949             serials += v
950             h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
951             hashes += h
952             size += s
953             serials += v
954             for h in hashes:
955                 self.store.map_delete(h)
956             self.node.node_purge(node, until, CLUSTER_DELETED)
957             try:
958                 props = self._get_version(node)
959             except NameError:
960                 self.permissions.access_clear(path)
961             self._report_size_change(user, account, -size,
962                                     {'action': 'object purge', 'path': path,
963                                      'versions': serials})
964             return
965
966         path, node = self._lookup_object(account, container, name)
967         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
968         del_size = self._apply_versioning(account, container, src_version_id)
969         if del_size:
970             self._report_size_change(user, account, -del_size,
971                                      {'action': 'object delete', 'path': path,
972                                       'versions': [dest_version_id]})
973         self._report_object_change(
974             user, account, path, details={'action': 'object delete'})
975         self.permissions.access_clear(path)
976
977         if delimiter:
978             prefix = name + delimiter if not name.endswith(delimiter) else name
979             src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
980             paths = []
981             for t in src_names:
982                 path = '/'.join((account, container, t[0]))
983                 node = t[2]
984                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
985                 del_size = self._apply_versioning(
986                     account, container, src_version_id)
987                 if del_size:
988                     self._report_size_change(user, account, -del_size,
989                                              {'action': 'object delete',
990                                               'path': path,
991                                               'versions': [dest_version_id]})
992                 self._report_object_change(
993                     user, account, path, details={'action': 'object delete'})
994                 paths.append(path)
995             self.permissions.access_clear_bulk(paths)
996
997     @backend_method
998     def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
999         """Delete/purge an object."""
1000
1001         logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1002                      account, container, name, until, prefix, delimiter)
1003         self._delete_object(user, account, container, name, until, delimiter)
1004
1005     @backend_method
1006     def list_versions(self, user, account, container, name):
1007         """Return a list of all (version, version_timestamp) tuples for an object."""
1008
1009         logger.debug(
1010             "list_versions: %s %s %s %s", user, account, container, name)
1011         self._can_read(user, account, container, name)
1012         path, node = self._lookup_object(account, container, name)
1013         versions = self.node.node_get_versions(node)
1014         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1015
1016     @backend_method
1017     def get_uuid(self, user, uuid):
1018         """Return the (account, container, name) for the UUID given."""
1019
1020         logger.debug("get_uuid: %s %s", user, uuid)
1021         info = self.node.latest_uuid(uuid)
1022         if info is None:
1023             raise NameError
1024         path, serial = info
1025         account, container, name = path.split('/', 2)
1026         self._can_read(user, account, container, name)
1027         return (account, container, name)
1028
1029     @backend_method
1030     def get_public(self, user, public):
1031         """Return the (account, container, name) for the public id given."""
1032
1033         logger.debug("get_public: %s %s", user, public)
1034         if public is None or public < ULTIMATE_ANSWER:
1035             raise NameError
1036         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1037         if path is None:
1038             raise NameError
1039         account, container, name = path.split('/', 2)
1040         self._can_read(user, account, container, name)
1041         return (account, container, name)
1042
1043     @backend_method(autocommit=0)
1044     def get_block(self, hash):
1045         """Return a block's data."""
1046
1047         logger.debug("get_block: %s", hash)
1048         block = self.store.block_get(binascii.unhexlify(hash))
1049         if not block:
1050             raise ItemNotExists('Block does not exist')
1051         return block
1052
1053     @backend_method(autocommit=0)
1054     def put_block(self, data):
1055         """Store a block and return the hash."""
1056
1057         logger.debug("put_block: %s", len(data))
1058         return binascii.hexlify(self.store.block_put(data))
1059
1060     @backend_method(autocommit=0)
1061     def update_block(self, hash, data, offset=0):
1062         """Update a known block and return the hash."""
1063
1064         logger.debug("update_block: %s %s %s", hash, len(data), offset)
1065         if offset == 0 and len(data) == self.block_size:
1066             return self.put_block(data)
1067         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1068         return binascii.hexlify(h)
1069
1070     # Path functions.
1071
1072     def _generate_uuid(self):
1073         return str(uuidlib.uuid4())
1074
1075     def _put_object_node(self, path, parent, name):
1076         path = '/'.join((path, name))
1077         node = self.node.node_lookup(path)
1078         if node is None:
1079             node = self.node.node_create(parent, path)
1080         return path, node
1081
1082     def _put_path(self, user, parent, path):
1083         node = self.node.node_create(parent, path)
1084         self.node.version_create(node, None, 0, '', None, user,
1085                                  self._generate_uuid(), '', CLUSTER_NORMAL)
1086         return node
1087
1088     def _lookup_account(self, account, create=True):
1089         node = self.node.node_lookup(account)
1090         if node is None and create:
1091             node = self._put_path(
1092                 account, self.ROOTNODE, account)  # User is account.
1093         return account, node
1094
1095     def _lookup_container(self, account, container):
1096         path = '/'.join((account, container))
1097         node = self.node.node_lookup(path)
1098         if node is None:
1099             raise ItemNotExists('Container does not exist')
1100         return path, node
1101
1102     def _lookup_object(self, account, container, name):
1103         path = '/'.join((account, container, name))
1104         node = self.node.node_lookup(path)
1105         if node is None:
1106             raise ItemNotExists('Object does not exist')
1107         return path, node
1108
1109     def _lookup_objects(self, paths):
1110         nodes = self.node.node_lookup_bulk(paths)
1111         return paths, nodes
1112
1113     def _get_properties(self, node, until=None):
1114         """Return properties until the timestamp given."""
1115
1116         before = until if until is not None else inf
1117         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1118         if props is None and until is not None:
1119             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1120         if props is None:
1121             raise ItemNotExists('Path does not exist')
1122         return props
1123
1124     def _get_statistics(self, node, until=None):
1125         """Return count, sum of size and latest timestamp of everything under node."""
1126
1127         if until is None:
1128             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1129         else:
1130             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1131         if stats is None:
1132             stats = (0, 0, 0)
1133         return stats
1134
1135     def _get_version(self, node, version=None):
1136         if version is None:
1137             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1138             if props is None:
1139                 raise ItemNotExists('Object does not exist')
1140         else:
1141             try:
1142                 version = int(version)
1143             except ValueError:
1144                 raise VersionNotExists('Version does not exist')
1145             props = self.node.version_get_properties(version)
1146             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1147                 raise VersionNotExists('Version does not exist')
1148         return props
1149
1150     def _get_versions(self, nodes):
1151         return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1152
1153     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1154         """Create a new version of the node."""
1155
1156         props = self.node.version_lookup(
1157             node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1158         if props is not None:
1159             src_version_id = props[self.SERIAL]
1160             src_hash = props[self.HASH]
1161             src_size = props[self.SIZE]
1162             src_type = props[self.TYPE]
1163             src_checksum = props[self.CHECKSUM]
1164         else:
1165             src_version_id = None
1166             src_hash = None
1167             src_size = 0
1168             src_type = ''
1169             src_checksum = ''
1170         if size is None:  # Set metadata.
1171             hash = src_hash  # This way hash can be set to None (account or container).
1172             size = src_size
1173         if type is None:
1174             type = src_type
1175         if checksum is None:
1176             checksum = src_checksum
1177         uuid = self._generate_uuid(
1178         ) if (is_copy or src_version_id is None) else props[self.UUID]
1179
1180         if src_node is None:
1181             pre_version_id = src_version_id
1182         else:
1183             pre_version_id = None
1184             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1185             if props is not None:
1186                 pre_version_id = props[self.SERIAL]
1187         if pre_version_id is not None:
1188             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1189
1190         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1191         return pre_version_id, dest_version_id
1192
1193     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1194         if src_version_id is not None:
1195             self.node.attribute_copy(src_version_id, dest_version_id)
1196         if not replace:
1197             self.node.attribute_del(dest_version_id, domain, (
1198                 k for k, v in meta.iteritems() if v == ''))
1199             self.node.attribute_set(dest_version_id, domain, (
1200                 (k, v) for k, v in meta.iteritems() if v != ''))
1201         else:
1202             self.node.attribute_del(dest_version_id, domain)
1203             self.node.attribute_set(dest_version_id, domain, ((
1204                 k, v) for k, v in meta.iteritems()))
1205
1206     def _put_metadata(self, user, node, domain, meta, replace=False):
1207         """Create a new version and store metadata."""
1208
1209         src_version_id, dest_version_id = self._put_version_duplicate(
1210             user, node)
1211         self._put_metadata_duplicate(
1212             src_version_id, dest_version_id, domain, meta, replace)
1213         return src_version_id, dest_version_id
1214
1215     def _list_limits(self, listing, marker, limit):
1216         start = 0
1217         if marker:
1218             try:
1219                 start = listing.index(marker) + 1
1220             except ValueError:
1221                 pass
1222         if not limit or limit > 10000:
1223             limit = 10000
1224         return start, limit
1225
1226     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1227         cont_prefix = path + '/'
1228         prefix = cont_prefix + prefix
1229         start = cont_prefix + marker if marker else None
1230         before = until if until is not None else inf
1231         filterq = keys if domain else []
1232         sizeq = size_range
1233
1234         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1235         objects.extend([(p, None) for p in prefixes] if virtual else [])
1236         objects.sort(key=lambda x: x[0])
1237         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1238         return objects
1239
1240     # Reporting functions.
1241
1242     def _report_size_change(self, user, account, size, details={}):
1243         account_node = self._lookup_account(account, True)[1]
1244         total = self._get_statistics(account_node)[1]
1245         details.update({'user': user, 'total': total})
1246         logger.debug(
1247             "_report_size_change: %s %s %s %s", user, account, size, details)
1248         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1249                               account, QUEUE_INSTANCE_ID, 'diskspace',
1250                               float(size), details))
1251
1252         serial = self.quotaholder.issue_commission(
1253                 context     =   {},
1254                 target      =   account,
1255                 key         =   '1',
1256                 clientkey   =   'pithos',
1257                 ownerkey    =   '',
1258                 provisions  =   (('pithos+', 'pithos+ : diskspace', size),)
1259         )
1260         self.serials.append(serial)
1261
1262     def _report_object_change(self, user, account, path, details={}):
1263         details.update({'user': user})
1264         logger.debug("_report_object_change: %s %s %s %s", user,
1265                      account, path, details)
1266         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1267                                                   account, QUEUE_INSTANCE_ID, 'object', path, details))
1268
1269     def _report_sharing_change(self, user, account, path, details={}):
1270         logger.debug("_report_permissions_change: %s %s %s %s",
1271                      user, account, path, details)
1272         details.update({'user': user})
1273         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1274                                                   account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1275
1276     # Policy functions.
1277
1278     def _check_policy(self, policy):
1279         for k in policy.keys():
1280             if policy[k] == '':
1281                 policy[k] = self.default_policy.get(k)
1282         for k, v in policy.iteritems():
1283             if k == 'quota':
1284                 q = int(v)  # May raise ValueError.
1285                 if q < 0:
1286                     raise ValueError
1287             elif k == 'versioning':
1288                 if v not in ['auto', 'none']:
1289                     raise ValueError
1290             else:
1291                 raise ValueError
1292
1293     def _put_policy(self, node, policy, replace):
1294         if replace:
1295             for k, v in self.default_policy.iteritems():
1296                 if k not in policy:
1297                     policy[k] = v
1298         self.node.policy_set(node, policy)
1299
1300     def _get_policy(self, node):
1301         policy = self.default_policy.copy()
1302         policy.update(self.node.policy_get(node))
1303         return policy
1304
1305     def _apply_versioning(self, account, container, version_id):
1306         """Delete the provided version if such is the policy.
1307            Return size of object removed.
1308         """
1309
1310         if version_id is None:
1311             return 0
1312         path, node = self._lookup_container(account, container)
1313         versioning = self._get_policy(node)['versioning']
1314         if versioning != 'auto':
1315             hash, size = self.node.version_remove(version_id)
1316             self.store.map_delete(hash)
1317             return size
1318         return 0
1319
1320     # Access control functions.
1321
1322     def _check_groups(self, groups):
1323         # raise ValueError('Bad characters in groups')
1324         pass
1325
1326     def _check_permissions(self, path, permissions):
1327         # raise ValueError('Bad characters in permissions')
1328         pass
1329
1330     def _get_formatted_paths(self, paths):
1331         formatted = []
1332         for p in paths:
1333             node = self.node.node_lookup(p)
1334             if node is not None:
1335                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1336             if props is not None:
1337                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1338                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1339                 formatted.append((p, self.MATCH_EXACT))
1340         return formatted
1341
1342     def _get_permissions_path(self, account, container, name):
1343         path = '/'.join((account, container, name))
1344         permission_paths = self.permissions.access_inherit(path)
1345         permission_paths.sort()
1346         permission_paths.reverse()
1347         for p in permission_paths:
1348             if p == path:
1349                 return p
1350             else:
1351                 if p.count('/') < 2:
1352                     continue
1353                 node = self.node.node_lookup(p)
1354                 if node is not None:
1355                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1356                 if props is not None:
1357                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1358                         return p
1359         return None
1360
1361     def _can_read(self, user, account, container, name):
1362         if user == account:
1363             return True
1364         path = '/'.join((account, container, name))
1365         if self.permissions.public_get(path) is not None:
1366             return True
1367         path = self._get_permissions_path(account, container, name)
1368         if not path:
1369             raise NotAllowedError
1370         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1371             raise NotAllowedError
1372
1373     def _can_write(self, user, account, container, name):
1374         if user == account:
1375             return True
1376         path = '/'.join((account, container, name))
1377         path = self._get_permissions_path(account, container, name)
1378         if not path:
1379             raise NotAllowedError
1380         if not self.permissions.access_check(path, self.WRITE, user):
1381             raise NotAllowedError
1382
1383     def _allowed_accounts(self, user):
1384         allow = set()
1385         for path in self.permissions.access_list_paths(user):
1386             allow.add(path.split('/', 1)[0])
1387         return sorted(allow)
1388
1389     def _allowed_containers(self, user, account):
1390         allow = set()
1391         for path in self.permissions.access_list_paths(user, account):
1392             allow.add(path.split('/', 2)[1])
1393         return sorted(allow)