Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ 0f9d752c

History | View | Annotate | Download (36.6 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import hashlib
39
import binascii
40

    
41
from base import NotAllowedError, BaseBackend
42
from lib.permissions import Permissions, READ, WRITE
43
from lib.policy import Policy
44
from lib.hashfiler import Mapper, Blocker
45

    
46

    
47
logger = logging.getLogger(__name__)
48

    
49
def backend_method(func=None, autocommit=1):
50
    if func is None:
51
        def fn(func):
52
            return backend_method(func, autocommit)
53
        return fn
54

    
55
    if not autocommit:
56
        return func
57
    def fn(self, *args, **kw):
58
        self.con.execute('begin deferred')
59
        try:
60
            ret = func(self, *args, **kw)
61
            self.con.commit()
62
            return ret
63
        except:
64
            self.con.rollback()
65
            raise
66
    return fn
67

    
68

    
69
class ModularBackend(BaseBackend):
70
    """A modular backend.
71
    
72
    Uses SQLite for storage.
73
    """
74
    
75
    # TODO: Create account if not present in all functions.
76
    
77
    def __init__(self, db):
78
        self.hash_algorithm = 'sha256'
79
        self.block_size = 4 * 1024 * 1024 # 4MB
80
        
81
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
82
        
83
        basepath = os.path.split(db)[0]
84
        if basepath and not os.path.exists(basepath):
85
            os.makedirs(basepath)
86
        if not os.path.isdir(basepath):
87
            raise RuntimeError("Cannot open database at '%s'" % (db,))
88
        
89
        self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
90
        
91
        sql = '''pragma foreign_keys = on'''
92
        self.con.execute(sql)
93
        
94
        sql = '''create table if not exists versions (
95
                    version_id integer primary key,
96
                    name text,
97
                    user text,
98
                    tstamp integer not null,
99
                    size integer default 0,
100
                    hide integer default 0)'''
101
        self.con.execute(sql)
102
        sql = '''create table if not exists metadata (
103
                    version_id integer,
104
                    key text,
105
                    value text,
106
                    primary key (version_id, key)
107
                    foreign key (version_id) references versions(version_id)
108
                    on delete cascade)'''
109
        self.con.execute(sql)
110
        
111
        self.con.commit()
112
        
113
        params = {'blocksize': self.block_size,
114
                  'blockpath': basepath + '/blocks',
115
                  'hashtype': self.hash_algorithm}
116
        self.blocker = Blocker(**params)
117
        
118
        params = {'mappath': basepath + '/maps',
119
                  'namelen': self.blocker.hashlen}
120
        self.mapper = Mapper(**params)
121
        
122
        params = {'connection': self.con,
123
                  'cursor': self.con.cursor()}
124
        self.permissions = Permissions(**params)
125
        self.policy = Policy(**params)
126
    
127
    @backend_method
128
    def list_accounts(self, user, marker=None, limit=10000):
129
        """Return a list of accounts the user can access."""
130
        
131
        allowed = self._allowed_accounts(user)
132
        start, limit = self._list_limits(allowed, marker, limit)
133
        return allowed[start:start + limit]
134
    
135
    @backend_method
136
    def get_account_meta(self, user, account, until=None):
137
        """Return a dictionary with the account metadata."""
138
        
139
        logger.debug("get_account_meta: %s %s", account, until)
140
        if user != account:
141
            if until or account not in self._allowed_accounts(user):
142
                raise NotAllowedError
143
        else:
144
            self._create_account(user, account)
145
        try:
146
            version_id, mtime = self._get_accountinfo(account, until)
147
        except NameError:
148
            # Account does not exist before until.
149
            version_id = None
150
            mtime = until
151
        count, bytes, tstamp = self._get_pathstats(account, until)
152
        if mtime > tstamp:
153
            tstamp = mtime
154
        if until is None:
155
            modified = tstamp
156
        else:
157
            modified = self._get_pathstats(account)[2] # Overall last modification
158
            if mtime > modified:
159
                modified = mtime
160
        
161
        # Proper count.
162
        sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
163
        sql = sql % self._sql_until(until)
164
        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
165
        row = c.fetchone()
166
        count = row[0]
167
        
168
        if user != account:
169
            meta = {'name': account}
170
        else:
171
            meta = self._get_metadata(account, version_id)
172
            meta.update({'name': account, 'count': count, 'bytes': bytes})
173
            if until is not None:
174
                meta.update({'until_timestamp': tstamp})
175
        meta.update({'modified': modified})
176
        return meta
177
    
178
    @backend_method
179
    def update_account_meta(self, user, account, meta, replace=False):
180
        """Update the metadata associated with the account."""
181
        
182
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
183
        if user != account:
184
            raise NotAllowedError
185
        self._put_metadata(user, account, meta, replace, False)
186
    
187
    @backend_method
188
    def get_account_groups(self, user, account):
189
        """Return a dictionary with the user groups defined for this account."""
190
        
191
        logger.debug("get_account_groups: %s", account)
192
        if user != account:
193
            if account not in self._allowed_accounts(user):
194
                raise NotAllowedError
195
            return {}
196
        self._create_account(user, account)
197
        return self.permissions.group_dict(account)
198
    
199
    @backend_method
200
    def update_account_groups(self, user, account, groups, replace=False):
201
        """Update the groups associated with the account."""
202
        
203
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
204
        if user != account:
205
            raise NotAllowedError
206
        self._create_account(user, account)
207
        self._check_groups(groups)
208
        if replace:
209
            self.permissions.group_destroy(account)
210
        for k, v in groups.iteritems():
211
            if not replace: # If not already deleted.
212
                self.permissions.group_delete(account, k)
213
            if v:
214
                self.permissions.group_addmany(account, k, v)
215
    
216
    @backend_method
217
    def put_account(self, user, account):
218
        """Create a new account with the given name."""
219
        
220
        logger.debug("put_account: %s", account)
221
        if user != account:
222
            raise NotAllowedError
223
        try:
224
            version_id, mtime = self._get_accountinfo(account)
225
        except NameError:
226
            pass
227
        else:
228
            raise NameError('Account already exists')
229
        self._put_version(account, user)
230
    
231
    @backend_method
232
    def delete_account(self, user, account):
233
        """Delete the account with the given name."""
234
        
235
        logger.debug("delete_account: %s", account)
236
        if user != account:
237
            raise NotAllowedError
238
        count = self._get_pathstats(account)[0]
239
        if count > 0:
240
            raise IndexError('Account is not empty')
241
        sql = 'delete from versions where name = ?'
242
        self.con.execute(sql, (account,))
243
        self.permissions.group_destroy(account)
244
    
245
    @backend_method
246
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
247
        """Return a list of containers existing under an account."""
248
        
249
        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
250
        if user != account:
251
            if until or account not in self._allowed_accounts(user):
252
                raise NotAllowedError
253
            allowed = self._allowed_containers(user, account)
254
            start, limit = self._list_limits(allowed, marker, limit)
255
            return allowed[start:start + limit]
256
        else:
257
            if shared:
258
                allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
259
                start, limit = self._list_limits(allowed, marker, limit)
260
                return allowed[start:start + limit]
261
        return [x[0] for x in self._list_objects(account, '', '/', marker, limit, False, [], until)]
262
    
263
    @backend_method
264
    def get_container_meta(self, user, account, container, until=None):
265
        """Return a dictionary with the container metadata."""
266
        
267
        logger.debug("get_container_meta: %s %s %s", account, container, until)
268
        if user != account:
269
            if until or container not in self._allowed_containers(user, account):
270
                raise NotAllowedError
271
        path, version_id, mtime = self._get_containerinfo(account, container, until)
272
        count, bytes, tstamp = self._get_pathstats(path, until)
273
        if mtime > tstamp:
274
            tstamp = mtime
275
        if until is None:
276
            modified = tstamp
277
        else:
278
            modified = self._get_pathstats(path)[2] # Overall last modification
279
            if mtime > modified:
280
                modified = mtime
281
        
282
        if user != account:
283
            meta = {'name': container, 'modified': modified}
284
        else:
285
            meta = self._get_metadata(path, version_id)
286
            meta.update({'name': container, 'count': count, 'bytes': bytes, 'modified': modified})
287
            if until is not None:
288
                meta.update({'until_timestamp': tstamp})
289
        return meta
290
    
291
    @backend_method
292
    def update_container_meta(self, user, account, container, meta, replace=False):
293
        """Update the metadata associated with the container."""
294
        
295
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
296
        if user != account:
297
            raise NotAllowedError
298
        path, version_id, mtime = self._get_containerinfo(account, container)
299
        self._put_metadata(user, path, meta, replace, False)
300
    
301
    @backend_method
302
    def get_container_policy(self, user, account, container):
303
        """Return a dictionary with the container policy."""
304
        
305
        logger.debug("get_container_policy: %s %s", account, container)
306
        if user != account:
307
            if container not in self._allowed_containers(user, account):
308
                raise NotAllowedError
309
            return {}
310
        path = self._get_containerinfo(account, container)[0]
311
        return self._get_policy(path)
312
    
313
    @backend_method
314
    def update_container_policy(self, user, account, container, policy, replace=False):
315
        """Update the policy associated with the account."""
316
        
317
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
318
        if user != account:
319
            raise NotAllowedError
320
        path = self._get_containerinfo(account, container)[0]
321
        self._check_policy(policy)
322
        if replace:
323
            for k, v in self.default_policy.iteritems():
324
                if k not in policy:
325
                    policy[k] = v
326
        self.policy.policy_set(path, policy)
327
    
328
    @backend_method
329
    def put_container(self, user, account, container, policy=None):
330
        """Create a new container with the given name."""
331
        
332
        logger.debug("put_container: %s %s %s", account, container, policy)
333
        if user != account:
334
            raise NotAllowedError
335
        try:
336
            path, version_id, mtime = self._get_containerinfo(account, container)
337
        except NameError:
338
            pass
339
        else:
340
            raise NameError('Container already exists')
341
        if policy:
342
            self._check_policy(policy)
343
        path = '/'.join((account, container))
344
        version_id = self._put_version(path, user)[0]
345
        for k, v in self.default_policy.iteritems():
346
            if k not in policy:
347
                policy[k] = v
348
        self.policy.policy_set(path, policy)
349
    
350
    @backend_method
351
    def delete_container(self, user, account, container, until=None):
352
        """Delete/purge the container with the given name."""
353
        
354
        logger.debug("delete_container: %s %s %s", account, container, until)
355
        if user != account:
356
            raise NotAllowedError
357
        path, version_id, mtime = self._get_containerinfo(account, container)
358
        
359
        if until is not None:
360
            sql = '''select version_id from versions where name like ? and tstamp <= ?
361
                        and version_id not in (select version_id from (%s))'''
362
            sql = sql % self._sql_until() # Do not delete current versions.
363
            c = self.con.execute(sql, (path + '/%', until))
364
            for v in [x[0] for x in c.fetchall()]:
365
                self._del_version(v)
366
            return
367
        
368
        count = self._get_pathstats(path)[0]
369
        if count > 0:
370
            raise IndexError('Container is not empty')
371
        sql = 'delete from versions where name = ? or name like ?' # May contain hidden items.
372
        self.con.execute(sql, (path, path + '/%',))
373
        self.policy.policy_unset(path)
374
        self._copy_version(user, account, account, True, False) # New account version (for timestamp update).
375
    
376
    @backend_method
377
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
378
        """Return a list of objects existing under a container."""
379
        
380
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
381
        allowed = []
382
        if user != account:
383
            if until:
384
                raise NotAllowedError
385
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
386
            if not allowed:
387
                raise NotAllowedError
388
        else:
389
            if shared:
390
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
391
        path, version_id, mtime = self._get_containerinfo(account, container, until)
392
        return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
393
    
394
    @backend_method
395
    def list_object_meta(self, user, account, container, until=None):
396
        """Return a list with all the container's object meta keys."""
397
        
398
        logger.debug("list_object_meta: %s %s %s", account, container, until)
399
        allowed = []
400
        if user != account:
401
            if until:
402
                raise NotAllowedError
403
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
404
            if not allowed:
405
                raise NotAllowedError
406
        path, version_id, mtime = self._get_containerinfo(account, container, until)
407
        sql = '''select distinct m.key from (%s) o, metadata m
408
                    where m.version_id = o.version_id and o.name like ?'''
409
        sql = sql % self._sql_until(until)
410
        param = (path + '/%',)
411
        if allowed:
412
            for x in allowed:
413
                sql += ' and o.name like ?'
414
                param += (x,)
415
        c = self.con.execute(sql, param)
416
        return [x[0] for x in c.fetchall()]
417
    
418
    @backend_method
419
    def get_object_meta(self, user, account, container, name, version=None):
420
        """Return a dictionary with the object metadata."""
421
        
422
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
423
        self._can_read(user, account, container, name)
424
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
425
        if version is None:
426
            modified = mtime
427
        else:
428
            modified = self._get_version(path, version)[2] # Overall last modification
429
        
430
        meta = self._get_metadata(path, version_id)
431
        meta.update({'name': name, 'bytes': size})
432
        meta.update({'version': version_id, 'version_timestamp': mtime})
433
        meta.update({'modified': modified, 'modified_by': muser})
434
        return meta
435
    
436
    @backend_method
437
    def update_object_meta(self, user, account, container, name, meta, replace=False):
438
        """Update the metadata associated with the object."""
439
        
440
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
441
        self._can_write(user, account, container, name)
442
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name)
443
        self._put_metadata(user, path, meta, replace)
444
    
445
    @backend_method
446
    def get_object_permissions(self, user, account, container, name):
447
        """Return the path from which this object gets its permissions from,\
448
        along with a dictionary containing the permissions."""
449
        
450
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
451
        self._can_read(user, account, container, name)
452
        path = self._get_objectinfo(account, container, name)[0]
453
        return self.permissions.access_inherit(path)
454
    
455
    @backend_method
456
    def update_object_permissions(self, user, account, container, name, permissions):
457
        """Update the permissions associated with the object."""
458
        
459
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
460
        if user != account:
461
            raise NotAllowedError
462
        path = self._get_objectinfo(account, container, name)[0]
463
        self._check_permissions(path, permissions)
464
        self.permissions.access_set(path, permissions)
465
    
466
    @backend_method
467
    def get_object_public(self, user, account, container, name):
468
        """Return the public URL of the object if applicable."""
469
        
470
        logger.debug("get_object_public: %s %s %s", account, container, name)
471
        self._can_read(user, account, container, name)
472
        path = self._get_objectinfo(account, container, name)[0]
473
        if self.permissions.public_check(path):
474
            return '/public/' + path
475
        return None
476
    
477
    @backend_method
478
    def update_object_public(self, user, account, container, name, public):
479
        """Update the public status of the object."""
480
        
481
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
482
        self._can_write(user, account, container, name)
483
        path = self._get_objectinfo(account, container, name)[0]
484
        if not public:
485
            self.permissions.public_unset(path)
486
        else:
487
            self.permissions.public_set(path)
488
    
489
    @backend_method
490
    def get_object_hashmap(self, user, account, container, name, version=None):
491
        """Return the object's size and a list with partial hashes."""
492
        
493
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
494
        self._can_read(user, account, container, name)
495
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
496
        hashmap = self.mapper.map_retr(version_id)
497
        return size, [binascii.hexlify(x) for x in hashmap]
498
    
499
    @backend_method
500
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
501
        """Create/update an object with the specified size and partial hashes."""
502
        
503
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
504
        if permissions is not None and user != account:
505
            raise NotAllowedError
506
        self._can_write(user, account, container, name)
507
        missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
508
        if missing:
509
            ie = IndexError()
510
            ie.data = missing
511
            raise ie
512
        path = self._get_containerinfo(account, container)[0]
513
        path = '/'.join((path, name))
514
        if permissions is not None:
515
            self._check_permissions(path, permissions)
516
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False)
517
        sql = 'update versions set size = ? where version_id = ?'
518
        self.con.execute(sql, (size, dest_version_id))
519
        self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
520
        for k, v in meta.iteritems():
521
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
522
            self.con.execute(sql, (dest_version_id, k, v))
523
        if permissions is not None:
524
            self.permissions.access_set(path, permissions)
525
    
526
    @backend_method
527
    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
528
        """Copy an object's data and metadata."""
529
        
530
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
531
        if permissions is not None and user != account:
532
            raise NotAllowedError
533
        self._can_read(user, account, src_container, src_name)
534
        self._can_write(user, account, dest_container, dest_name)
535
        self._get_containerinfo(account, src_container)
536
        if src_version is None:
537
            src_path = self._get_objectinfo(account, src_container, src_name)[0]
538
        else:
539
            src_path = '/'.join((account, src_container, src_name))
540
        dest_path = self._get_containerinfo(account, dest_container)[0]
541
        dest_path = '/'.join((dest_path, dest_name))
542
        if permissions is not None:
543
            self._check_permissions(dest_path, permissions)
544
        src_version_id, dest_version_id = self._copy_version(user, src_path, dest_path, not replace_meta, True, src_version)
545
        for k, v in dest_meta.iteritems():
546
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
547
            self.con.execute(sql, (dest_version_id, k, v))
548
        if permissions is not None:
549
            self.permissions.access_set(dest_path, permissions)
550
    
551
    @backend_method
552
    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
553
        """Move an object's data and metadata."""
554
        
555
        logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
556
        self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
557
        self.delete_object(user, account, src_container, src_name)
558
    
559
    @backend_method
560
    def delete_object(self, user, account, container, name, until=None):
561
        """Delete/purge an object."""
562
        
563
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
564
        if user != account:
565
            raise NotAllowedError
566
        
567
        if until is not None:
568
            path = '/'.join((account, container, name))
569
            sql = '''select version_id from versions where name = ? and tstamp <= ?'''
570
            c = self.con.execute(sql, (path, until))
571
            for v in [x[0] in c.fetchall()]:
572
                self._del_version(v)
573
            try:
574
                version_id = self._get_version(path)[0]
575
            except NameError:
576
                pass
577
            else:
578
                self.permissions.access_clear(path)
579
            return
580
        
581
        path = self._get_objectinfo(account, container, name)[0]
582
        self._put_version(path, user, 0, 1)
583
        self.permissions.access_clear(path)
584
    
585
    @backend_method
586
    def list_versions(self, user, account, container, name):
587
        """Return a list of all (version, version_timestamp) tuples for an object."""
588
        
589
        logger.debug("list_versions: %s %s %s", account, container, name)
590
        self._can_read(user, account, container, name)
591
        # This will even show deleted versions.
592
        path = '/'.join((account, container, name))
593
        sql = '''select distinct version_id, tstamp from versions where name = ? and hide = 0'''
594
        c = self.con.execute(sql, (path,))
595
        return [(int(x[0]), int(x[1])) for x in c.fetchall()]
596
    
597
    @backend_method(autocommit=0)
598
    def get_block(self, hash):
599
        """Return a block's data."""
600
        
601
        logger.debug("get_block: %s", hash)
602
        blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
603
        if not blocks:
604
            raise NameError('Block does not exist')
605
        return blocks[0]
606
    
607
    @backend_method(autocommit=0)
608
    def put_block(self, data):
609
        """Create a block and return the hash."""
610
        
611
        logger.debug("put_block: %s", len(data))
612
        hashes, absent = self.blocker.block_stor((data,))
613
        return binascii.hexlify(hashes[0])
614
    
615
    @backend_method(autocommit=0)
616
    def update_block(self, hash, data, offset=0):
617
        """Update a known block and return the hash."""
618
        
619
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
620
        if offset == 0 and len(data) == self.block_size:
621
            return self.put_block(data)
622
        h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
623
        return binascii.hexlify(h)
624
    
625
    def _sql_until(self, until=None):
626
        """Return the sql to get the latest versions until the timestamp given."""
627
        if until is None:
628
            until = int(time.time())
629
        sql = '''select version_id, name, tstamp, size from versions v
630
                    where version_id = (select max(version_id) from versions
631
                                        where v.name = name and tstamp <= %s)
632
                    and hide = 0'''
633
        return sql % (until,)
634
    
635
    def _get_pathstats(self, path, until=None):
636
        """Return count and sum of size of everything under path and latest timestamp."""
637
        
638
        sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?'
639
        sql = sql % self._sql_until(until)
640
        c = self.con.execute(sql, (path + '/%',))
641
        row = c.fetchone()
642
        tstamp = row[2] if row[2] is not None else 0
643
        return int(row[0]), int(row[1]), int(tstamp)
644
    
645
    def _get_version(self, path, version=None):
646
        if version is None:
647
            sql = '''select version_id, user, tstamp, size, hide from versions where name = ?
648
                        order by version_id desc limit 1'''
649
            c = self.con.execute(sql, (path,))
650
            row = c.fetchone()
651
            if not row or int(row[4]):
652
                raise NameError('Object does not exist')
653
        else:
654
            # The database (sqlite) will not complain if the version is not an integer.
655
            sql = '''select version_id, user, tstamp, size from versions where name = ?
656
                        and version_id = ?'''
657
            c = self.con.execute(sql, (path, version))
658
            row = c.fetchone()
659
            if not row:
660
                raise IndexError('Version does not exist')
661
        return str(row[0]), str(row[1]), int(row[2]), int(row[3])
662
    
663
    def _put_version(self, path, user, size=0, hide=0):
664
        tstamp = int(time.time())
665
        sql = 'insert into versions (name, user, tstamp, size, hide) values (?, ?, ?, ?, ?)'
666
        id = self.con.execute(sql, (path, user, tstamp, size, hide)).lastrowid
667
        return str(id), tstamp
668
    
669
    def _copy_version(self, user, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
670
        if src_version is not None:
671
            src_version_id, muser, mtime, size = self._get_version(src_path, src_version)
672
        else:
673
            # Latest or create from scratch.
674
            try:
675
                src_version_id, muser, mtime, size = self._get_version(src_path)
676
            except NameError:
677
                src_version_id = None
678
                size = 0
679
        if not copy_data:
680
            size = 0
681
        dest_version_id = self._put_version(dest_path, user, size)[0]
682
        if copy_meta and src_version_id is not None:
683
            sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
684
            sql = sql % dest_version_id
685
            self.con.execute(sql, (src_version_id,))
686
        if copy_data and src_version_id is not None:
687
            # TODO: Copy properly.
688
            hashmap = self.mapper.map_retr(src_version_id)
689
            self.mapper.map_stor(dest_version_id, hashmap)
690
        return src_version_id, dest_version_id
691
    
692
    def _get_versioninfo(self, account, container, name, until=None):
693
        """Return path, latest version, associated timestamp and size until the timestamp given."""
694
        
695
        p = (account, container, name)
696
        try:
697
            p = p[:p.index(None)]
698
        except ValueError:
699
            pass
700
        path = '/'.join(p)
701
        sql = '''select version_id, tstamp, size from (%s) where name = ?'''
702
        sql = sql % self._sql_until(until)
703
        c = self.con.execute(sql, (path,))
704
        row = c.fetchone()
705
        if row is None:
706
            raise NameError('Path does not exist')
707
        return path, str(row[0]), int(row[1]), int(row[2])
708
    
709
    def _get_accountinfo(self, account, until=None):
710
        try:
711
            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
712
            return version_id, mtime
713
        except:
714
            raise NameError('Account does not exist')
715
    
716
    def _get_containerinfo(self, account, container, until=None):
717
        try:
718
            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
719
            return path, version_id, mtime
720
        except:
721
            raise NameError('Container does not exist')
722
    
723
    def _get_objectinfo(self, account, container, name, version=None):
724
        path = '/'.join((account, container, name))
725
        version_id, muser, mtime, size = self._get_version(path, version)
726
        return path, version_id, muser, mtime, size
727
    
728
    def _create_account(self, user, account):
729
        try:
730
            self._get_accountinfo(account)
731
        except NameError:
732
            self._put_version(account, user)
733
    
734
    def _get_metadata(self, path, version):
735
        sql = 'select key, value from metadata where version_id = ?'
736
        c = self.con.execute(sql, (version,))
737
        return dict(c.fetchall())
738
    
739
    def _put_metadata(self, user, path, meta, replace=False, copy_data=True):
740
        """Create a new version and store metadata."""
741
        
742
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, copy_data)
743
        for k, v in meta.iteritems():
744
            if not replace and v == '':
745
                sql = 'delete from metadata where version_id = ? and key = ?'
746
                self.con.execute(sql, (dest_version_id, k))
747
            else:
748
                sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
749
                self.con.execute(sql, (dest_version_id, k, v))
750
    
751
    def _check_policy(self, policy):
752
        for k in policy.keys():
753
            if policy[k] == '':
754
                policy[k] = self.default_policy.get(k)
755
        for k, v in policy.iteritems():
756
            if k == 'quota':
757
                q = int(v) # May raise ValueError.
758
                if q < 0:
759
                    raise ValueError
760
            elif k == 'versioning':
761
                if v not in ['auto', 'manual', 'none']:
762
                    raise ValueError
763
            else:
764
                raise ValueError
765
    
766
    def _get_policy(self, path):
767
        return self.policy.policy_get(path)
768
    
769
    def _list_limits(self, listing, marker, limit):
770
        start = 0
771
        if marker:
772
            try:
773
                start = listing.index(marker) + 1
774
            except ValueError:
775
                pass
776
        if not limit or limit > 10000:
777
            limit = 10000
778
        return start, limit
779
    
780
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
781
        cont_prefix = path + '/'
782
        if keys and len(keys) > 0:
783
            sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
784
                        m.version_id = o.version_id and m.key in (%s)'''
785
            sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
786
            param = (cont_prefix + prefix + '%',) + tuple(keys)
787
            if allowed:
788
                sql += ' and (' + ' or '.join(('o.name like ?',) * len(allowed)) + ')'
789
                param += tuple([x + '%' for x in allowed])
790
            sql += ' order by o.name'
791
        else:
792
            sql = 'select name, version_id from (%s) where name like ?'
793
            sql = sql % self._sql_until(until)
794
            param = (cont_prefix + prefix + '%',)
795
            if allowed:
796
                sql += ' and (' + ' or '.join(('name like ?',) * len(allowed)) + ')'
797
                param += tuple([x + '%' for x in allowed])
798
            sql += ' order by name'
799
        c = self.con.execute(sql, param)
800
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
801
        if delimiter:
802
            pseudo_objects = []
803
            for x in objects:
804
                pseudo_name = x[0]
805
                i = pseudo_name.find(delimiter, len(prefix))
806
                if not virtual:
807
                    # If the delimiter is not found, or the name ends
808
                    # with the delimiter's first occurence.
809
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
810
                        pseudo_objects.append(x)
811
                else:
812
                    # If the delimiter is found, keep up to (and including) the delimiter.
813
                    if i != -1:
814
                        pseudo_name = pseudo_name[:i + len(delimiter)]
815
                    if pseudo_name not in [y[0] for y in pseudo_objects]:
816
                        if pseudo_name == x[0]:
817
                            pseudo_objects.append(x)
818
                        else:
819
                            pseudo_objects.append((pseudo_name, None))
820
            objects = pseudo_objects
821
        
822
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
823
        return objects[start:start + limit]
824
    
825
    def _del_version(self, version):
826
        self.mapper.map_remv(version)
827
        sql = 'delete from versions where version_id = ?'
828
        self.con.execute(sql, (version,))
829
    
830
    # Access control functions.
831
    
832
    def _check_groups(self, groups):
833
        # raise ValueError('Bad characters in groups')
834
        pass
835
    
836
    def _check_permissions(self, path, permissions):
837
        # raise ValueError('Bad characters in permissions')
838
        
839
        # Check for existing permissions.
840
        paths = self.permissions.access_list(path)
841
        if paths:
842
            ae = AttributeError()
843
            ae.data = paths
844
            raise ae
845
    
846
    def _can_read(self, user, account, container, name):
847
        if user == account:
848
            return True
849
        path = '/'.join((account, container, name))
850
        if not self.permissions.access_check(path, READ, user) and not self.permissions.access_check(path, WRITE, user):
851
            raise NotAllowedError
852
    
853
    def _can_write(self, user, account, container, name):
854
        if user == account:
855
            return True
856
        path = '/'.join((account, container, name))
857
        if not self.permissions.access_check(path, WRITE, user):
858
            raise NotAllowedError
859
    
860
    def _allowed_accounts(self, user):
861
        allow = set()
862
        for path in self.permissions.access_list_paths(user):
863
            allow.add(path.split('/', 1)[0])
864
        return sorted(allow)
865
    
866
    def _allowed_containers(self, user, account):
867
        allow = set()
868
        for path in self.permissions.access_list_paths(user, account):
869
            allow.add(path.split('/', 2)[1])
870
        return sorted(allow)