Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / simple.py @ 833baad6

History | View | Annotate | Download (41.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import hashlib
39
import binascii
40

    
41
from base import NotAllowedError, BaseBackend
42
from pithos.lib.hashfiler import Mapper, Blocker
43

    
44

    
45
logger = logging.getLogger(__name__)
46

    
47
def backend_method(func=None, autocommit=1):
48
    if func is None:
49
        def fn(func):
50
            return backend_method(func, autocommit)
51
        return fn
52

    
53
    if not autocommit:
54
        return func
55
    def fn(self, *args, **kw):
56
        self.con.execute('begin deferred')
57
        try:
58
            ret = func(self, *args, **kw)
59
            self.con.commit()
60
            return ret
61
        except:
62
            self.con.rollback()
63
            raise
64
    return fn
65

    
66

    
67
class SimpleBackend(BaseBackend):
68
    """A simple backend.
69
    
70
    Uses SQLite for storage.
71
    """
72
    
73
    # TODO: Create account if not present in all functions.
74
    
75
    def __init__(self, db):
76
        self.hash_algorithm = 'sha256'
77
        self.block_size = 4 * 1024 * 1024 # 4MB
78
        
79
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
80
        
81
        basepath = os.path.split(db)[0]
82
        if basepath and not os.path.exists(basepath):
83
            os.makedirs(basepath)
84
        if not os.path.isdir(basepath):
85
            raise RuntimeError("Cannot open database at '%s'" % (db,))
86
        
87
        self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
88
        
89
        sql = '''pragma foreign_keys = on'''
90
        self.con.execute(sql)
91
        
92
        sql = '''create table if not exists versions (
93
                    version_id integer primary key,
94
                    name text,
95
                    user text,
96
                    tstamp integer not null,
97
                    size integer default 0,
98
                    hide integer default 0)'''
99
        self.con.execute(sql)
100
        sql = '''create table if not exists metadata (
101
                    version_id integer,
102
                    key text,
103
                    value text,
104
                    primary key (version_id, key)
105
                    foreign key (version_id) references versions(version_id)
106
                    on delete cascade)'''
107
        self.con.execute(sql)
108
        sql = '''create table if not exists policy (
109
                    name text, key text, value text, primary key (name, key))'''
110
        self.con.execute(sql)
111
        
112
        # Access control tables.
113
        sql = '''create table if not exists groups (
114
                    account text, gname text, user text)'''
115
        self.con.execute(sql)
116
        sql = '''create table if not exists permissions (
117
                    name text, op text, user text)'''
118
        self.con.execute(sql)
119
        sql = '''create table if not exists public (
120
                    name text, primary key (name))'''
121
        self.con.execute(sql)
122
        
123
        self.con.commit()
124
        
125
        params = {'blocksize': self.block_size,
126
                  'blockpath': basepath + '/blocks',
127
                  'hashtype': self.hash_algorithm}
128
        self.blocker = Blocker(**params)
129
        
130
        params = {'mappath': basepath + '/maps',
131
                  'namelen': self.blocker.hashlen}
132
        self.mapper = Mapper(**params)
133
    
134
    @backend_method
135
    def list_accounts(self, user, marker=None, limit=10000):
136
        """Return a list of accounts the user can access."""
137
        
138
        allowed = self._allowed_accounts(user)
139
        start, limit = self._list_limits(allowed, marker, limit)
140
        return allowed[start:start + limit]
141
    
142
    @backend_method
143
    def get_account_meta(self, user, account, until=None):
144
        """Return a dictionary with the account metadata."""
145
        
146
        logger.debug("get_account_meta: %s %s", account, until)
147
        if user != account:
148
            if until or account not in self._allowed_accounts(user):
149
                raise NotAllowedError
150
        else:
151
            self._create_account(user, account)
152
        try:
153
            version_id, mtime = self._get_accountinfo(account, until)
154
        except NameError:
155
            # Account does not exist before until.
156
            version_id = None
157
            mtime = until
158
        count, bytes, tstamp = self._get_pathstats(account, until)
159
        if mtime > tstamp:
160
            tstamp = mtime
161
        if until is None:
162
            modified = tstamp
163
        else:
164
            modified = self._get_pathstats(account)[2] # Overall last modification
165
            if mtime > modified:
166
                modified = mtime
167
        
168
        # Proper count.
169
        sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
170
        sql = sql % self._sql_until(until)
171
        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
172
        row = c.fetchone()
173
        count = row[0]
174
        
175
        if user != account:
176
            meta = {'name': account}
177
        else:
178
            meta = self._get_metadata(account, version_id)
179
            meta.update({'name': account, 'count': count, 'bytes': bytes})
180
            if until is not None:
181
                meta.update({'until_timestamp': tstamp})
182
        meta.update({'modified': modified})
183
        return meta
184
    
185
    @backend_method
186
    def update_account_meta(self, user, account, meta, replace=False):
187
        """Update the metadata associated with the account."""
188
        
189
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
190
        if user != account:
191
            raise NotAllowedError
192
        self._put_metadata(user, account, meta, replace, False)
193
    
194
    @backend_method
195
    def get_account_groups(self, user, account):
196
        """Return a dictionary with the user groups defined for this account."""
197
        
198
        logger.debug("get_account_groups: %s", account)
199
        if user != account:
200
            if account not in self._allowed_accounts(user):
201
                raise NotAllowedError
202
            return {}
203
        self._create_account(user, account)
204
        return self._get_groups(account)
205
    
206
    @backend_method
207
    def update_account_groups(self, user, account, groups, replace=False):
208
        """Update the groups associated with the account."""
209
        
210
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
211
        if user != account:
212
            raise NotAllowedError
213
        self._create_account(user, account)
214
        self._check_groups(groups)
215
        self._put_groups(account, groups, replace)
216
    
217
    @backend_method
218
    def put_account(self, user, account):
219
        """Create a new account with the given name."""
220
        
221
        logger.debug("put_account: %s", account)
222
        if user != account:
223
            raise NotAllowedError
224
        try:
225
            version_id, mtime = self._get_accountinfo(account)
226
        except NameError:
227
            pass
228
        else:
229
            raise NameError('Account already exists')
230
        self._put_version(account, user)
231
    
232
    @backend_method
233
    def delete_account(self, user, account):
234
        """Delete the account with the given name."""
235
        
236
        logger.debug("delete_account: %s", account)
237
        if user != account:
238
            raise NotAllowedError
239
        count = self._get_pathstats(account)[0]
240
        if count > 0:
241
            raise IndexError('Account is not empty')
242
        sql = 'delete from versions where name = ?'
243
        self.con.execute(sql, (account,))
244
        self._del_groups(account)
245
    
246
    @backend_method
247
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
248
        """Return a list of containers existing under an account."""
249
        
250
        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
251
        if user != account:
252
            if until or account not in self._allowed_accounts(user):
253
                raise NotAllowedError
254
            allowed = self._allowed_containers(user, account)
255
            start, limit = self._list_limits(allowed, marker, limit)
256
            return allowed[start:start + limit]
257
        else:
258
            if shared:
259
                allowed = [x.split('/', 2)[1] for x in self._shared_paths(account)]
260
                start, limit = self._list_limits(allowed, marker, limit)
261
                return allowed[start:start + limit]
262
        return [x[0] for x in self._list_objects(account, '', '/', marker, limit, False, [], until)]
263
    
264
    @backend_method
265
    def get_container_meta(self, user, account, container, until=None):
266
        """Return a dictionary with the container metadata."""
267
        
268
        logger.debug("get_container_meta: %s %s %s", account, container, until)
269
        if user != account:
270
            if until or container not in self._allowed_containers(user, account):
271
                raise NotAllowedError
272
        path, version_id, mtime = self._get_containerinfo(account, container, until)
273
        count, bytes, tstamp = self._get_pathstats(path, until)
274
        if mtime > tstamp:
275
            tstamp = mtime
276
        if until is None:
277
            modified = tstamp
278
        else:
279
            modified = self._get_pathstats(path)[2] # Overall last modification
280
            if mtime > modified:
281
                modified = mtime
282
        
283
        if user != account:
284
            meta = {'name': container, 'modified': modified}
285
        else:
286
            meta = self._get_metadata(path, version_id)
287
            meta.update({'name': container, 'count': count, 'bytes': bytes, 'modified': modified})
288
            if until is not None:
289
                meta.update({'until_timestamp': tstamp})
290
        return meta
291
    
292
    @backend_method
293
    def update_container_meta(self, user, account, container, meta, replace=False):
294
        """Update the metadata associated with the container."""
295
        
296
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
297
        if user != account:
298
            raise NotAllowedError
299
        path, version_id, mtime = self._get_containerinfo(account, container)
300
        self._put_metadata(user, path, meta, replace, False)
301
    
302
    @backend_method
303
    def get_container_policy(self, user, account, container):
304
        """Return a dictionary with the container policy."""
305
        
306
        logger.debug("get_container_policy: %s %s", account, container)
307
        if user != account:
308
            if container not in self._allowed_containers(user, account):
309
                raise NotAllowedError
310
            return {}
311
        path = self._get_containerinfo(account, container)[0]
312
        return self._get_policy(path)
313
    
314
    @backend_method
315
    def update_container_policy(self, user, account, container, policy, replace=False):
316
        """Update the policy associated with the account."""
317
        
318
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
319
        if user != account:
320
            raise NotAllowedError
321
        path = self._get_containerinfo(account, container)[0]
322
        self._check_policy(policy)
323
        if replace:
324
            for k, v in self.default_policy.iteritems():
325
                if k not in policy:
326
                    policy[k] = v
327
        for k, v in policy.iteritems():
328
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
329
            self.con.execute(sql, (path, k, v))
330
    
331
    @backend_method
332
    def put_container(self, user, account, container, policy=None):
333
        """Create a new container with the given name."""
334
        
335
        logger.debug("put_container: %s %s %s", account, container, policy)
336
        if user != account:
337
            raise NotAllowedError
338
        try:
339
            path, version_id, mtime = self._get_containerinfo(account, container)
340
        except NameError:
341
            pass
342
        else:
343
            raise NameError('Container already exists')
344
        if policy:
345
            self._check_policy(policy)
346
        path = '/'.join((account, container))
347
        version_id = self._put_version(path, user)[0]
348
        for k, v in self.default_policy.iteritems():
349
            if k not in policy:
350
                policy[k] = v
351
        for k, v in policy.iteritems():
352
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
353
            self.con.execute(sql, (path, k, v))
354
    
355
    @backend_method
356
    def delete_container(self, user, account, container, until=None):
357
        """Delete/purge the container with the given name."""
358
        
359
        logger.debug("delete_container: %s %s %s", account, container, until)
360
        if user != account:
361
            raise NotAllowedError
362
        path, version_id, mtime = self._get_containerinfo(account, container)
363
        
364
        if until is not None:
365
            sql = '''select version_id from versions where name like ? and tstamp <= ?
366
                        and version_id not in (select version_id from (%s))'''
367
            sql = sql % self._sql_until() # Do not delete current versions.
368
            c = self.con.execute(sql, (path + '/%', until))
369
            for v in [x[0] for x in c.fetchall()]:
370
                self._del_version(v)
371
            return
372
        
373
        count = self._get_pathstats(path)[0]
374
        if count > 0:
375
            raise IndexError('Container is not empty')
376
        sql = 'delete from versions where name = ? or name like ?' # May contain hidden items.
377
        self.con.execute(sql, (path, path + '/%',))
378
        sql = 'delete from policy where name = ?'
379
        self.con.execute(sql, (path,))
380
        self._copy_version(user, account, account, True, False) # New account version (for timestamp update).
381
    
382
    @backend_method
383
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
384
        """Return a list of objects existing under a container."""
385
        
386
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
387
        allowed = []
388
        if user != account:
389
            if until:
390
                raise NotAllowedError
391
            allowed = self._allowed_paths(user, '/'.join((account, container)))
392
            if not allowed:
393
                raise NotAllowedError
394
        else:
395
            if shared:
396
                allowed = self._shared_paths('/'.join((account, container)))
397
        path, version_id, mtime = self._get_containerinfo(account, container, until)
398
        return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
399
    
400
    @backend_method
401
    def list_object_meta(self, user, account, container, until=None):
402
        """Return a list with all the container's object meta keys."""
403
        
404
        logger.debug("list_object_meta: %s %s %s", account, container, until)
405
        allowed = []
406
        if user != account:
407
            if until:
408
                raise NotAllowedError
409
            allowed = self._allowed_paths(user, '/'.join((account, container)))
410
            if not allowed:
411
                raise NotAllowedError
412
        path, version_id, mtime = self._get_containerinfo(account, container, until)
413
        sql = '''select distinct m.key from (%s) o, metadata m
414
                    where m.version_id = o.version_id and o.name like ?'''
415
        sql = sql % self._sql_until(until)
416
        param = (path + '/%',)
417
        if allowed:
418
            for x in allowed:
419
                sql += ' and o.name like ?'
420
                param += (x,)
421
        c = self.con.execute(sql, param)
422
        return [x[0] for x in c.fetchall()]
423
    
424
    @backend_method
425
    def get_object_meta(self, user, account, container, name, version=None):
426
        """Return a dictionary with the object metadata."""
427
        
428
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
429
        self._can_read(user, account, container, name)
430
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
431
        if version is None:
432
            modified = mtime
433
        else:
434
            modified = self._get_version(path, version)[2] # Overall last modification
435
        
436
        meta = self._get_metadata(path, version_id)
437
        meta.update({'name': name, 'bytes': size})
438
        meta.update({'version': version_id, 'version_timestamp': mtime})
439
        meta.update({'modified': modified, 'modified_by': muser})
440
        return meta
441
    
442
    @backend_method
443
    def update_object_meta(self, user, account, container, name, meta, replace=False):
444
        """Update the metadata associated with the object."""
445
        
446
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
447
        self._can_write(user, account, container, name)
448
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name)
449
        self._put_metadata(user, path, meta, replace)
450
    
451
    @backend_method
452
    def get_object_permissions(self, user, account, container, name):
453
        """Return the path from which this object gets its permissions from,\
454
        along with a dictionary containing the permissions."""
455
        
456
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
457
        self._can_read(user, account, container, name)
458
        path = self._get_objectinfo(account, container, name)[0]
459
        return self._get_permissions(path)
460
    
461
    @backend_method
462
    def update_object_permissions(self, user, account, container, name, permissions):
463
        """Update the permissions associated with the object."""
464
        
465
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
466
        if user != account:
467
            raise NotAllowedError
468
        path = self._get_objectinfo(account, container, name)[0]
469
        r, w = self._check_permissions(path, permissions)
470
        self._put_permissions(path, r, w)
471
    
472
    @backend_method
473
    def get_object_public(self, user, account, container, name):
474
        """Return the public URL of the object if applicable."""
475
        
476
        logger.debug("get_object_public: %s %s %s", account, container, name)
477
        self._can_read(user, account, container, name)
478
        path = self._get_objectinfo(account, container, name)[0]
479
        if self._get_public(path):
480
            return '/public/' + path
481
        return None
482
    
483
    @backend_method
484
    def update_object_public(self, user, account, container, name, public):
485
        """Update the public status of the object."""
486
        
487
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
488
        self._can_write(user, account, container, name)
489
        path = self._get_objectinfo(account, container, name)[0]
490
        self._put_public(path, public)
491
    
492
    @backend_method
493
    def get_object_hashmap(self, user, account, container, name, version=None):
494
        """Return the object's size and a list with partial hashes."""
495
        
496
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
497
        self._can_read(user, account, container, name)
498
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
499
        hashmap = self.mapper.map_retr(version_id)
500
        return size, [binascii.hexlify(x) for x in hashmap]
501
    
502
    @backend_method
503
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
504
        """Create/update an object with the specified size and partial hashes."""
505
        
506
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
507
        if permissions is not None and user != account:
508
            raise NotAllowedError
509
        self._can_write(user, account, container, name)
510
        missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
511
        if missing:
512
            ie = IndexError()
513
            ie.data = missing
514
            raise ie
515
        path = self._get_containerinfo(account, container)[0]
516
        path = '/'.join((path, name))
517
        if permissions is not None:
518
            r, w = self._check_permissions(path, permissions)
519
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False)
520
        sql = 'update versions set size = ? where version_id = ?'
521
        self.con.execute(sql, (size, dest_version_id))
522
        self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
523
        for k, v in meta.iteritems():
524
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
525
            self.con.execute(sql, (dest_version_id, k, v))
526
        if permissions is not None:
527
            self._put_permissions(path, r, w)
528
    
529
    @backend_method
530
    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
531
        """Copy an object's data and metadata."""
532
        
533
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
534
        if permissions is not None and user != account:
535
            raise NotAllowedError
536
        self._can_read(user, account, src_container, src_name)
537
        self._can_write(user, account, dest_container, dest_name)
538
        self._get_containerinfo(account, src_container)
539
        if src_version is None:
540
            src_path = self._get_objectinfo(account, src_container, src_name)[0]
541
        else:
542
            src_path = '/'.join((account, src_container, src_name))
543
        dest_path = self._get_containerinfo(account, dest_container)[0]
544
        dest_path = '/'.join((dest_path, dest_name))
545
        if permissions is not None:
546
            r, w = self._check_permissions(dest_path, permissions)
547
        src_version_id, dest_version_id = self._copy_version(user, src_path, dest_path, not replace_meta, True, src_version)
548
        for k, v in dest_meta.iteritems():
549
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
550
            self.con.execute(sql, (dest_version_id, k, v))
551
        if permissions is not None:
552
            self._put_permissions(dest_path, r, w)
553
    
554
    @backend_method
555
    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
556
        """Move an object's data and metadata."""
557
        
558
        logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
559
        self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
560
        self.delete_object(user, account, src_container, src_name)
561
    
562
    @backend_method
563
    def delete_object(self, user, account, container, name, until=None):
564
        """Delete/purge an object."""
565
        
566
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
567
        if user != account:
568
            raise NotAllowedError
569
        
570
        if until is not None:
571
            path = '/'.join((account, container, name))
572
            sql = '''select version_id from versions where name = ? and tstamp <= ?'''
573
            c = self.con.execute(sql, (path, until))
574
            for v in [x[0] in c.fetchall()]:
575
                self._del_version(v)
576
            try:
577
                version_id = self._get_version(path)[0]
578
            except NameError:
579
                pass
580
            else:
581
                self._del_sharing(path)
582
            return
583
        
584
        path = self._get_objectinfo(account, container, name)[0]
585
        self._put_version(path, user, 0, 1)
586
        self._del_sharing(path)
587
    
588
    @backend_method
589
    def list_versions(self, user, account, container, name):
590
        """Return a list of all (version, version_timestamp) tuples for an object."""
591
        
592
        logger.debug("list_versions: %s %s %s", account, container, name)
593
        self._can_read(user, account, container, name)
594
        # This will even show deleted versions.
595
        path = '/'.join((account, container, name))
596
        sql = '''select distinct version_id, tstamp from versions where name = ? and hide = 0'''
597
        c = self.con.execute(sql, (path,))
598
        return [(int(x[0]), int(x[1])) for x in c.fetchall()]
599
    
600
    @backend_method(autocommit=0)
601
    def get_block(self, hash):
602
        """Return a block's data."""
603
        
604
        logger.debug("get_block: %s", hash)
605
        blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
606
        if not blocks:
607
            raise NameError('Block does not exist')
608
        return blocks[0]
609
    
610
    @backend_method(autocommit=0)
611
    def put_block(self, data):
612
        """Create a block and return the hash."""
613
        
614
        logger.debug("put_block: %s", len(data))
615
        hashes, absent = self.blocker.block_stor((data,))
616
        return binascii.hexlify(hashes[0])
617
    
618
    @backend_method(autocommit=0)
619
    def update_block(self, hash, data, offset=0):
620
        """Update a known block and return the hash."""
621
        
622
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
623
        if offset == 0 and len(data) == self.block_size:
624
            return self.put_block(data)
625
        h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
626
        return binascii.hexlify(h)
627
    
628
    def _sql_until(self, until=None):
629
        """Return the sql to get the latest versions until the timestamp given."""
630
        if until is None:
631
            until = int(time.time())
632
        sql = '''select version_id, name, tstamp, size from versions v
633
                    where version_id = (select max(version_id) from versions
634
                                        where v.name = name and tstamp <= %s)
635
                    and hide = 0'''
636
        return sql % (until,)
637
    
638
    def _get_pathstats(self, path, until=None):
639
        """Return count and sum of size of everything under path and latest timestamp."""
640
        
641
        sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?'
642
        sql = sql % self._sql_until(until)
643
        c = self.con.execute(sql, (path + '/%',))
644
        row = c.fetchone()
645
        tstamp = row[2] if row[2] is not None else 0
646
        return int(row[0]), int(row[1]), int(tstamp)
647
    
648
    def _get_version(self, path, version=None):
649
        if version is None:
650
            sql = '''select version_id, user, tstamp, size, hide from versions where name = ?
651
                        order by version_id desc limit 1'''
652
            c = self.con.execute(sql, (path,))
653
            row = c.fetchone()
654
            if not row or int(row[4]):
655
                raise NameError('Object does not exist')
656
        else:
657
            # The database (sqlite) will not complain if the version is not an integer.
658
            sql = '''select version_id, user, tstamp, size from versions where name = ?
659
                        and version_id = ?'''
660
            c = self.con.execute(sql, (path, version))
661
            row = c.fetchone()
662
            if not row:
663
                raise IndexError('Version does not exist')
664
        return str(row[0]), str(row[1]), int(row[2]), int(row[3])
665
    
666
    def _put_version(self, path, user, size=0, hide=0):
667
        tstamp = int(time.time())
668
        sql = 'insert into versions (name, user, tstamp, size, hide) values (?, ?, ?, ?, ?)'
669
        id = self.con.execute(sql, (path, user, tstamp, size, hide)).lastrowid
670
        return str(id), tstamp
671
    
672
    def _copy_version(self, user, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
673
        if src_version is not None:
674
            src_version_id, muser, mtime, size = self._get_version(src_path, src_version)
675
        else:
676
            # Latest or create from scratch.
677
            try:
678
                src_version_id, muser, mtime, size = self._get_version(src_path)
679
            except NameError:
680
                src_version_id = None
681
                size = 0
682
        if not copy_data:
683
            size = 0
684
        dest_version_id = self._put_version(dest_path, user, size)[0]
685
        if copy_meta and src_version_id is not None:
686
            sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
687
            sql = sql % dest_version_id
688
            self.con.execute(sql, (src_version_id,))
689
        if copy_data and src_version_id is not None:
690
            # TODO: Copy properly.
691
            hashmap = self.mapper.map_retr(src_version_id)
692
            self.mapper.map_stor(dest_version_id, hashmap)
693
        return src_version_id, dest_version_id
694
    
695
    def _get_versioninfo(self, account, container, name, until=None):
696
        """Return path, latest version, associated timestamp and size until the timestamp given."""
697
        
698
        p = (account, container, name)
699
        try:
700
            p = p[:p.index(None)]
701
        except ValueError:
702
            pass
703
        path = '/'.join(p)
704
        sql = '''select version_id, tstamp, size from (%s) where name = ?'''
705
        sql = sql % self._sql_until(until)
706
        c = self.con.execute(sql, (path,))
707
        row = c.fetchone()
708
        if row is None:
709
            raise NameError('Path does not exist')
710
        return path, str(row[0]), int(row[1]), int(row[2])
711
    
712
    def _get_accountinfo(self, account, until=None):
713
        try:
714
            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
715
            return version_id, mtime
716
        except:
717
            raise NameError('Account does not exist')
718
    
719
    def _get_containerinfo(self, account, container, until=None):
720
        try:
721
            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
722
            return path, version_id, mtime
723
        except:
724
            raise NameError('Container does not exist')
725
    
726
    def _get_objectinfo(self, account, container, name, version=None):
727
        path = '/'.join((account, container, name))
728
        version_id, muser, mtime, size = self._get_version(path, version)
729
        return path, version_id, muser, mtime, size
730
    
731
    def _create_account(self, user, account):
732
        try:
733
            self._get_accountinfo(account)
734
        except NameError:
735
            self._put_version(account, user)
736
    
737
    def _get_metadata(self, path, version):
738
        sql = 'select key, value from metadata where version_id = ?'
739
        c = self.con.execute(sql, (version,))
740
        return dict(c.fetchall())
741
    
742
    def _put_metadata(self, user, path, meta, replace=False, copy_data=True):
743
        """Create a new version and store metadata."""
744
        
745
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, copy_data)
746
        for k, v in meta.iteritems():
747
            if not replace and v == '':
748
                sql = 'delete from metadata where version_id = ? and key = ?'
749
                self.con.execute(sql, (dest_version_id, k))
750
            else:
751
                sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
752
                self.con.execute(sql, (dest_version_id, k, v))
753
    
754
    def _check_policy(self, policy):
755
        for k in policy.keys():
756
            if policy[k] == '':
757
                policy[k] = self.default_policy.get(k)
758
        for k, v in policy.iteritems():
759
            if k == 'quota':
760
                q = int(v) # May raise ValueError.
761
                if q < 0:
762
                    raise ValueError
763
            elif k == 'versioning':
764
                if v not in ['auto', 'manual', 'none']:
765
                    raise ValueError
766
            else:
767
                raise ValueError
768
    
769
    def _get_policy(self, path):
770
        sql = 'select key, value from policy where name = ?'
771
        c = self.con.execute(sql, (path,))
772
        return dict(c.fetchall())
773
    
774
    def _list_limits(self, listing, marker, limit):
775
        start = 0
776
        if marker:
777
            try:
778
                start = listing.index(marker) + 1
779
            except ValueError:
780
                pass
781
        if not limit or limit > 10000:
782
            limit = 10000
783
        return start, limit
784
    
785
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
786
        cont_prefix = path + '/'
787
        if keys and len(keys) > 0:
788
            sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
789
                        m.version_id = o.version_id and m.key in (%s)'''
790
            sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
791
            param = (cont_prefix + prefix + '%',) + tuple(keys)
792
            if allowed:
793
                sql += ' and (' + ' or '.join(('o.name like ?',) * len(allowed)) + ')'
794
                param += tuple([x + '%' for x in allowed])
795
            sql += ' order by o.name'
796
        else:
797
            sql = 'select name, version_id from (%s) where name like ?'
798
            sql = sql % self._sql_until(until)
799
            param = (cont_prefix + prefix + '%',)
800
            if allowed:
801
                sql += ' and (' + ' or '.join(('name like ?',) * len(allowed)) + ')'
802
                param += tuple([x + '%' for x in allowed])
803
            sql += ' order by name'
804
        c = self.con.execute(sql, param)
805
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
806
        if delimiter:
807
            pseudo_objects = []
808
            for x in objects:
809
                pseudo_name = x[0]
810
                i = pseudo_name.find(delimiter, len(prefix))
811
                if not virtual:
812
                    # If the delimiter is not found, or the name ends
813
                    # with the delimiter's first occurence.
814
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
815
                        pseudo_objects.append(x)
816
                else:
817
                    # If the delimiter is found, keep up to (and including) the delimiter.
818
                    if i != -1:
819
                        pseudo_name = pseudo_name[:i + len(delimiter)]
820
                    if pseudo_name not in [y[0] for y in pseudo_objects]:
821
                        if pseudo_name == x[0]:
822
                            pseudo_objects.append(x)
823
                        else:
824
                            pseudo_objects.append((pseudo_name, None))
825
            objects = pseudo_objects
826
        
827
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
828
        return objects[start:start + limit]
829
    
830
    def _del_version(self, version):
831
        self.mapper.map_remv(version)
832
        sql = 'delete from versions where version_id = ?'
833
        self.con.execute(sql, (version,))
834
    
835
    # Access control functions.
836
    
837
    def _check_groups(self, groups):
838
        # Example follows.
839
        # for k, v in groups.iteritems():
840
        #     if True in [False or ',' in x for x in v]:
841
        #         raise ValueError('Bad characters in groups')
842
        pass
843
    
844
    def _get_groups(self, account):
845
        sql = 'select gname, user from groups where account = ?'
846
        c = self.con.execute(sql, (account,))
847
        groups = {}
848
        for row in c.fetchall():
849
            if row[0] not in groups:
850
                groups[row[0]] = []
851
            groups[row[0]].append(row[1])
852
        return groups
853
    
854
    def _put_groups(self, account, groups, replace=False):
855
        if replace:
856
            self._del_groups(account)
857
        for k, v in groups.iteritems():
858
            sql = 'delete from groups where account = ? and gname = ?'
859
            self.con.execute(sql, (account, k))
860
            if v:
861
                sql = 'insert into groups (account, gname, user) values (?, ?, ?)'
862
                self.con.executemany(sql, [(account, k, x) for x in v])
863
    
864
    def _del_groups(self, account):
865
        sql = 'delete from groups where account = ?'
866
        self.con.execute(sql, (account,))
867
    
868
    def _check_permissions(self, path, permissions):
869
        # Check for existing permissions.
870
        sql = '''select name from permissions
871
                    where name != ? and (name like ? or ? like name || ?)'''
872
        c = self.con.execute(sql, (path, path + '%', path, '%'))
873
        row = c.fetchone()
874
        if row:
875
            ae = AttributeError()
876
            ae.data = row[0]
877
            raise ae
878
        
879
        # Format given permissions.
880
        if len(permissions) == 0:
881
            return [], []
882
        r = permissions.get('read', [])
883
        w = permissions.get('write', [])
884
        # Examples follow.
885
        # if True in [False or ',' in x for x in r]:
886
        #     raise ValueError('Bad characters in read permissions')
887
        # if True in [False or ',' in x for x in w]:
888
        #     raise ValueError('Bad characters in write permissions')
889
        return r, w
890
    
891
    def _get_permissions(self, path):
892
        # Check for permissions at path or above.
893
        sql = 'select name, op, user from permissions where ? like name || ?'
894
        c = self.con.execute(sql, (path, '%'))
895
        name = path
896
        perms = {} # Return nothing, if nothing is set.
897
        for row in c.fetchall():
898
            name = row[0]
899
            if row[1] not in perms:
900
                perms[row[1]] = []
901
            perms[row[1]].append(row[2])
902
        return name, perms
903
    
904
    def _put_permissions(self, path, r, w):
905
        sql = 'delete from permissions where name = ?'
906
        self.con.execute(sql, (path,))
907
        sql = 'insert into permissions (name, op, user) values (?, ?, ?)'
908
        if r:
909
            self.con.executemany(sql, [(path, 'read', x) for x in r])
910
        if w:
911
            self.con.executemany(sql, [(path, 'write', x) for x in w])
912
    
913
    def _get_public(self, path):
914
        sql = 'select name from public where name = ?'
915
        c = self.con.execute(sql, (path,))
916
        row = c.fetchone()
917
        if not row:
918
            return False
919
        return True
920
    
921
    def _put_public(self, path, public):
922
        if not public:
923
            sql = 'delete from public where name = ?'
924
        else:
925
            sql = 'insert or replace into public (name) values (?)'
926
        self.con.execute(sql, (path,))
927
    
928
    def _del_sharing(self, path):
929
        sql = 'delete from permissions where name = ?'
930
        self.con.execute(sql, (path,))
931
        sql = 'delete from public where name = ?'
932
        self.con.execute(sql, (path,))
933
    
934
    def _is_allowed(self, user, account, container, name, op='read'):
935
        if user == account:
936
            return True
937
        path = '/'.join((account, container, name))
938
        if op == 'read' and self._get_public(path):
939
            return True
940
        perm_path, perms = self._get_permissions(path)
941
        
942
        # Expand groups.
943
        for x in ('read', 'write'):
944
            g_perms = set()
945
            for y in perms.get(x, []):
946
                if ':' in y:
947
                    g_account, g_name = y.split(':', 1)
948
                    groups = self._get_groups(g_account)
949
                    if g_name in groups:
950
                        g_perms.update(groups[g_name])
951
                else:
952
                    g_perms.add(y)
953
            perms[x] = g_perms
954
        
955
        if op == 'read' and ('*' in perms['read'] or user in perms['read']):
956
            return True
957
        if '*' in perms['write'] or user in perms['write']:
958
            return True
959
        return False
960
    
961
    def _can_read(self, user, account, container, name):
962
        if not self._is_allowed(user, account, container, name, 'read'):
963
            raise NotAllowedError
964
    
965
    def _can_write(self, user, account, container, name):
966
        if not self._is_allowed(user, account, container, name, 'write'):
967
            raise NotAllowedError
968
    
969
    def _allowed_paths(self, user, prefix=None):
970
        sql = '''select distinct name from permissions where (user = ?
971
                    or user in (select account || ':' || gname from groups where user = ?))'''
972
        param = (user, user)
973
        if prefix:
974
            sql += ' and name like ?'
975
            param += (prefix + '/%',)
976
        c = self.con.execute(sql, param)
977
        return [x[0] for x in c.fetchall()]
978
    
979
    def _allowed_accounts(self, user):
980
        allow = set()
981
        for path in self._allowed_paths(user):
982
            allow.add(path.split('/', 1)[0])
983
        return sorted(allow)
984
    
985
    def _allowed_containers(self, user, account):
986
        allow = set()
987
        for path in self._allowed_paths(user, account):
988
            allow.add(path.split('/', 2)[1])
989
        return sorted(allow)
990
    
991
    def _shared_paths(self, prefix):
992
        sql = 'select distinct name from permissions where name like ?'
993
        c = self.con.execute(sql, (prefix + '/%',))
994
        return [x[0] for x in c.fetchall()]