Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / simple.py @ d065f612

History | View | Annotate | Download (41.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import hashlib
39
import binascii
40

    
41
from base import NotAllowedError, BaseBackend
42
from pithos.lib.hashfiler import Mapper, Blocker
43

    
44

    
45
logger = logging.getLogger(__name__)
46

    
47
def backend_method(func=None, autocommit=1):
48
    if func is None:
49
        def fn(func):
50
            return backend_method(func, autocommit)
51
        return fn
52

    
53
    if not autocommit:
54
        return func
55
    def fn(self, *args, **kw):
56
        self.con.execute('begin deferred')
57
        try:
58
            ret = func(self, *args, **kw)
59
            self.con.commit()
60
            return ret
61
        except:
62
            self.con.rollback()
63
            raise
64
    return fn
65

    
66

    
67
class SimpleBackend(BaseBackend):
68
    """A simple backend.
69
    
70
    Uses SQLite for storage.
71
    """
72
    
73
    # TODO: Create account if not present in all functions.
74
    
75
    def __init__(self, db):
76
        self.hash_algorithm = 'sha256'
77
        self.block_size = 4 * 1024 * 1024 # 4MB
78
        
79
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
80
        
81
        basepath = os.path.split(db)[0]
82
        if basepath and not os.path.exists(basepath):
83
            os.makedirs(basepath)
84
        if not os.path.isdir(basepath):
85
            raise RuntimeError("Cannot open database at '%s'" % (db,))
86
        
87
        self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
88
        
89
        sql = '''pragma foreign_keys = on'''
90
        self.con.execute(sql)
91
        
92
        sql = '''create table if not exists versions (
93
                    version_id integer primary key,
94
                    name text,
95
                    user text,
96
                    tstamp integer not null,
97
                    size integer default 0,
98
                    hide integer default 0)'''
99
        self.con.execute(sql)
100
        sql = '''create table if not exists metadata (
101
                    version_id integer,
102
                    key text,
103
                    value text,
104
                    primary key (version_id, key)
105
                    foreign key (version_id) references versions(version_id)
106
                    on delete cascade)'''
107
        self.con.execute(sql)
108
        sql = '''create table if not exists policy (
109
                    name text, key text, value text, primary key (name, key))'''
110
        self.con.execute(sql)
111
        
112
        # Access control tables.
113
        sql = '''create table if not exists groups (
114
                    account text, gname text, user text)'''
115
        self.con.execute(sql)
116
        sql = '''create table if not exists permissions (
117
                    name text, op text, user text)'''
118
        self.con.execute(sql)
119
        sql = '''create table if not exists public (
120
                    name text, primary key (name))'''
121
        self.con.execute(sql)
122
        
123
        self.con.commit()
124
        
125
        params = {'blocksize': self.block_size,
126
                  'blockpath': basepath + '/blocks',
127
                  'hashtype': self.hash_algorithm}
128
        self.blocker = Blocker(**params)
129
        
130
        params = {'mappath': basepath + '/maps',
131
                  'namelen': self.blocker.hashlen}
132
        self.mapper = Mapper(**params)
133
    
134
    @backend_method
135
    def list_accounts(self, user, marker=None, limit=10000):
136
        """Return a list of accounts the user can access."""
137
        
138
        allowed = self._allowed_accounts(user)
139
        start, limit = self._list_limits(allowed, marker, limit)
140
        return allowed[start:start + limit]
141
    
142
    @backend_method
143
    def get_account_meta(self, user, account, until=None):
144
        """Return a dictionary with the account metadata."""
145
        
146
        logger.debug("get_account_meta: %s %s", account, until)
147
        if user != account:
148
            if until or account not in self._allowed_accounts(user):
149
                raise NotAllowedError
150
        else:
151
            self._create_account(user, account)
152
        try:
153
            version_id, mtime = self._get_accountinfo(account, until)
154
        except NameError:
155
            # TODO: Make sure this doesn't happen.
156
            version_id = None
157
            mtime = 0
158
        count, bytes, tstamp = self._get_pathstats(account, until)
159
        if mtime > tstamp:
160
            tstamp = mtime
161
        if until is None:
162
            modified = tstamp
163
        else:
164
            modified = self._get_pathstats(account)[2] # Overall last modification
165
            if mtime > modified:
166
                modified = mtime
167
        
168
        # Proper count.
169
        sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
170
        sql = sql % self._sql_until(until)
171
        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
172
        row = c.fetchone()
173
        count = row[0]
174
        
175
        if user != account:
176
            meta = {'name': account}
177
        else:
178
            meta = self._get_metadata(account, version_id)
179
            meta.update({'name': account, 'count': count, 'bytes': bytes})
180
            if until is not None:
181
                meta.update({'until_timestamp': tstamp})
182
        meta.update({'modified': modified})
183
        return meta
184
    
185
    @backend_method
186
    def update_account_meta(self, user, account, meta, replace=False):
187
        """Update the metadata associated with the account."""
188
        
189
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
190
        if user != account:
191
            raise NotAllowedError
192
        self._put_metadata(user, account, meta, replace, False)
193
    
194
    @backend_method
195
    def get_account_groups(self, user, account):
196
        """Return a dictionary with the user groups defined for this account."""
197
        
198
        logger.debug("get_account_groups: %s", account)
199
        if user != account:
200
            if account not in self._allowed_accounts(user):
201
                raise NotAllowedError
202
            return {}
203
        self._create_account(user, account)
204
        return self._get_groups(account)
205
    
206
    @backend_method
207
    def update_account_groups(self, user, account, groups, replace=False):
208
        """Update the groups associated with the account."""
209
        
210
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
211
        if user != account:
212
            raise NotAllowedError
213
        self._create_account(user, account)
214
        self._check_groups(groups)
215
        self._put_groups(account, groups, replace)
216
    
217
    @backend_method
218
    def put_account(self, user, account):
219
        """Create a new account with the given name."""
220
        
221
        logger.debug("put_account: %s", account)
222
        if user != account:
223
            raise NotAllowedError
224
        try:
225
            version_id, mtime = self._get_accountinfo(account)
226
        except NameError:
227
            pass
228
        else:
229
            raise NameError('Account already exists')
230
        self._put_version(account, user)
231
    
232
    @backend_method
233
    def delete_account(self, user, account):
234
        """Delete the account with the given name."""
235
        
236
        logger.debug("delete_account: %s", account)
237
        if user != account:
238
            raise NotAllowedError
239
        count = self._get_pathstats(account)[0]
240
        if count > 0:
241
            raise IndexError('Account is not empty')
242
        sql = 'delete from versions where name = ?'
243
        self.con.execute(sql, (account,))
244
        self._del_groups(account)
245
    
246
    @backend_method
247
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
248
        """Return a list of containers existing under an account."""
249
        
250
        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
251
        if user != account:
252
            if until or account not in self._allowed_accounts(user):
253
                raise NotAllowedError
254
            allowed = self._allowed_containers(user, account)
255
            start, limit = self._list_limits(allowed, marker, limit)
256
            return allowed[start:start + limit]
257
        else:
258
            if shared:
259
                allowed = [x.split('/', 2)[1] for x in self._shared_paths(account)]
260
                start, limit = self._list_limits(allowed, marker, limit)
261
                return allowed[start:start + limit]
262
        return [x[0] for x in self._list_objects(account, '', '/', marker, limit, False, [], until)]
263
    
264
    @backend_method
265
    def get_container_meta(self, user, account, container, until=None):
266
        """Return a dictionary with the container metadata."""
267
        
268
        logger.debug("get_container_meta: %s %s %s", account, container, until)
269
        if user != account:
270
            if until or container not in self._allowed_containers(user, account):
271
                raise NotAllowedError
272
        path, version_id, mtime = self._get_containerinfo(account, container, until)
273
        count, bytes, tstamp = self._get_pathstats(path, until)
274
        if mtime > tstamp:
275
            tstamp = mtime
276
        if until is None:
277
            modified = tstamp
278
        else:
279
            modified = self._get_pathstats(path)[2] # Overall last modification
280
            if mtime > modified:
281
                modified = mtime
282
        
283
        if user != account:
284
            meta = {'name': container, 'modified': modified}
285
        else:
286
            meta = self._get_metadata(path, version_id)
287
            meta.update({'name': container, 'count': count, 'bytes': bytes, 'modified': modified})
288
            if until is not None:
289
                meta.update({'until_timestamp': tstamp})
290
        return meta
291
    
292
    @backend_method
293
    def update_container_meta(self, user, account, container, meta, replace=False):
294
        """Update the metadata associated with the container."""
295
        
296
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
297
        if user != account:
298
            raise NotAllowedError
299
        path, version_id, mtime = self._get_containerinfo(account, container)
300
        self._put_metadata(user, path, meta, replace, False)
301
    
302
    @backend_method
303
    def get_container_policy(self, user, account, container):
304
        """Return a dictionary with the container policy."""
305
        
306
        logger.debug("get_container_policy: %s %s", account, container)
307
        if user != account:
308
            if container not in self._allowed_containers(user, account):
309
                raise NotAllowedError
310
            return {}
311
        path = self._get_containerinfo(account, container)[0]
312
        return self._get_policy(path)
313
    
314
    @backend_method
315
    def update_container_policy(self, user, account, container, policy, replace=False):
316
        """Update the policy associated with the account."""
317
        
318
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
319
        if user != account:
320
            raise NotAllowedError
321
        path = self._get_containerinfo(account, container)[0]
322
        self._check_policy(policy)
323
        if replace:
324
            for k, v in self.default_policy.iteritems():
325
                if k not in policy:
326
                    policy[k] = v
327
        for k, v in policy.iteritems():
328
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
329
            self.con.execute(sql, (path, k, v))
330
    
331
    @backend_method
332
    def put_container(self, user, account, container, policy=None):
333
        """Create a new container with the given name."""
334
        
335
        logger.debug("put_container: %s %s %s", account, container, policy)
336
        if user != account:
337
            raise NotAllowedError
338
        try:
339
            path, version_id, mtime = self._get_containerinfo(account, container)
340
        except NameError:
341
            pass
342
        else:
343
            raise NameError('Container already exists')
344
        if policy:
345
            self._check_policy(policy)
346
        path = os.path.join(account, container)
347
        version_id = self._put_version(path, user)[0]
348
        for k, v in self.default_policy.iteritems():
349
            if k not in policy:
350
                policy[k] = v
351
        for k, v in policy.iteritems():
352
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
353
            self.con.execute(sql, (path, k, v))
354
    
355
    @backend_method
356
    def delete_container(self, user, account, container, until=None):
357
        """Delete/purge the container with the given name."""
358
        
359
        logger.debug("delete_container: %s %s %s", account, container, until)
360
        if user != account:
361
            raise NotAllowedError
362
        path, version_id, mtime = self._get_containerinfo(account, container)
363
        
364
        if until is not None:
365
            sql = '''select version_id from versions where name like ? and tstamp <= ?
366
                        and version_id not in (select version_id from (%s))'''
367
            sql = sql % self._sql_until() # Do not delete current versions.
368
            c = self.con.execute(sql, (path + '/%', until))
369
            for v in [x[0] for x in c.fetchall()]:
370
                self._del_version(v)
371
            return
372
        
373
        count = self._get_pathstats(path)[0]
374
        if count > 0:
375
            raise IndexError('Container is not empty')
376
        sql = 'delete from versions where name = ? or name like ?' # May contain hidden items.
377
        self.con.execute(sql, (path, path + '/%',))
378
        sql = 'delete from policy where name = ?'
379
        self.con.execute(sql, (path,))
380
        self._copy_version(user, account, account, True, False) # New account version (for timestamp update).
381
    
382
    @backend_method
383
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
384
        """Return a list of objects existing under a container."""
385
        
386
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
387
        allowed = []
388
        if user != account:
389
            if until:
390
                raise NotAllowedError
391
            allowed = self._allowed_paths(user, os.path.join(account, container))
392
            if not allowed:
393
                raise NotAllowedError
394
        else:
395
            if shared:
396
                allowed = self._shared_paths(os.path.join(account, container))
397
        path, version_id, mtime = self._get_containerinfo(account, container, until)
398
        return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
399
    
400
    @backend_method
401
    def list_object_meta(self, user, account, container, until=None):
402
        """Return a list with all the container's object meta keys."""
403
        
404
        logger.debug("list_object_meta: %s %s %s", account, container, until)
405
        allowed = []
406
        if user != account:
407
            if until:
408
                raise NotAllowedError
409
            allowed = self._allowed_paths(user, os.path.join(account, container))
410
            if not allowed:
411
                raise NotAllowedError
412
        path, version_id, mtime = self._get_containerinfo(account, container, until)
413
        sql = '''select distinct m.key from (%s) o, metadata m
414
                    where m.version_id = o.version_id and o.name like ?'''
415
        sql = sql % self._sql_until(until)
416
        param = (path + '/%',)
417
        if allowed:
418
            for x in allowed:
419
                sql += ' and o.name like ?'
420
                param += (x,)
421
        c = self.con.execute(sql, param)
422
        return [x[0] for x in c.fetchall()]
423
    
424
    @backend_method
425
    def get_object_meta(self, user, account, container, name, version=None):
426
        """Return a dictionary with the object metadata."""
427
        
428
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
429
        self._can_read(user, account, container, name)
430
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
431
        if version is None:
432
            modified = mtime
433
        else:
434
            modified = self._get_version(path, version)[2] # Overall last modification
435
        
436
        meta = self._get_metadata(path, version_id)
437
        meta.update({'name': name, 'bytes': size})
438
        meta.update({'version': version_id, 'version_timestamp': mtime})
439
        meta.update({'modified': modified, 'modified_by': muser})
440
        return meta
441
    
442
    @backend_method
443
    def update_object_meta(self, user, account, container, name, meta, replace=False):
444
        """Update the metadata associated with the object."""
445
        
446
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
447
        self._can_write(user, account, container, name)
448
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name)
449
        self._put_metadata(user, path, meta, replace)
450
    
451
    @backend_method
452
    def get_object_permissions(self, user, account, container, name):
453
        """Return the path from which this object gets its permissions from,\
454
        along with a dictionary containing the permissions."""
455
        
456
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
457
        self._can_read(user, account, container, name)
458
        path = self._get_objectinfo(account, container, name)[0]
459
        return self._get_permissions(path)
460
    
461
    @backend_method
462
    def update_object_permissions(self, user, account, container, name, permissions):
463
        """Update the permissions associated with the object."""
464
        
465
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
466
        if user != account:
467
            raise NotAllowedError
468
        path = self._get_objectinfo(account, container, name)[0]
469
        r, w = self._check_permissions(path, permissions)
470
        self._put_permissions(path, r, w)
471
    
472
    @backend_method
473
    def get_object_public(self, user, account, container, name):
474
        """Return the public URL of the object if applicable."""
475
        
476
        logger.debug("get_object_public: %s %s %s", account, container, name)
477
        self._can_read(user, account, container, name)
478
        path = self._get_objectinfo(account, container, name)[0]
479
        if self._get_public(path):
480
            return '/public/' + path
481
        return None
482
    
483
    @backend_method
484
    def update_object_public(self, user, account, container, name, public):
485
        """Update the public status of the object."""
486
        
487
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
488
        self._can_write(user, account, container, name)
489
        path = self._get_objectinfo(account, container, name)[0]
490
        self._put_public(path, public)
491
    
492
    @backend_method
493
    def get_object_hashmap(self, user, account, container, name, version=None):
494
        """Return the object's size and a list with partial hashes."""
495
        
496
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
497
        self._can_read(user, account, container, name)
498
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
499
        hashmap = self.mapper.map_retr(version_id)
500
        return size, [binascii.hexlify(x) for x in hashmap]
501
    
502
    @backend_method
503
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
504
        """Create/update an object with the specified size and partial hashes."""
505
        
506
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
507
        if permissions is not None and user != account:
508
            raise NotAllowedError
509
        self._can_write(user, account, container, name)
510
        missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
511
        if missing:
512
            ie = IndexError()
513
            ie.data = missing
514
            raise ie
515
        path = self._get_containerinfo(account, container)[0]
516
        path = os.path.join(path, name)
517
        if permissions is not None:
518
            r, w = self._check_permissions(path, permissions)
519
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False)
520
        sql = 'update versions set size = ? where version_id = ?'
521
        self.con.execute(sql, (size, dest_version_id))
522
        self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
523
        for k, v in meta.iteritems():
524
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
525
            self.con.execute(sql, (dest_version_id, k, v))
526
        if permissions is not None:
527
            self._put_permissions(path, r, w)
528
    
529
    @backend_method
530
    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
531
        """Copy an object's data and metadata."""
532
        
533
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
534
        if permissions is not None and user != account:
535
            raise NotAllowedError
536
        self._can_read(user, account, src_container, src_name)
537
        self._can_write(user, account, dest_container, dest_name)
538
        self._get_containerinfo(account, src_container)
539
        if src_version is None:
540
            src_path = self._get_objectinfo(account, src_container, src_name)[0]
541
        else:
542
            src_path = os.path.join(account, src_container, src_name)
543
        dest_path = self._get_containerinfo(account, dest_container)[0]
544
        dest_path = os.path.join(dest_path, dest_name)
545
        if permissions is not None:
546
            r, w = self._check_permissions(dest_path, permissions)
547
        src_version_id, dest_version_id = self._copy_version(user, src_path, dest_path, not replace_meta, True, src_version)
548
        for k, v in dest_meta.iteritems():
549
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
550
            self.con.execute(sql, (dest_version_id, k, v))
551
        if permissions is not None:
552
            self._put_permissions(dest_path, r, w)
553
    
554
    @backend_method
555
    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
556
        """Move an object's data and metadata."""
557
        
558
        logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
559
        self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
560
        self.delete_object(user, account, src_container, src_name)
561
    
562
    @backend_method
563
    def delete_object(self, user, account, container, name, until=None):
564
        """Delete/purge an object."""
565
        
566
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
567
        if user != account:
568
            raise NotAllowedError
569
        
570
        if until is not None:
571
            path = os.path.join(account, container, name)
572
            sql = '''select version_id from versions where name = ? and tstamp <= ?'''
573
            c = self.con.execute(sql, (path, until))
574
            for v in [x[0] in c.fetchall()]:
575
                self._del_version(v)
576
            try:
577
                version_id = self._get_version(path)[0]
578
            except NameError:
579
                pass
580
            else:
581
                self._del_sharing(path)
582
            return
583
        
584
        path = self._get_objectinfo(account, container, name)[0]
585
        self._put_version(path, user, 0, 1)
586
        self._del_sharing(path)
587
    
588
    @backend_method
589
    def list_versions(self, user, account, container, name):
590
        """Return a list of all (version, version_timestamp) tuples for an object."""
591
        
592
        logger.debug("list_versions: %s %s %s", account, container, name)
593
        self._can_read(user, account, container, name)
594
        # This will even show deleted versions.
595
        path = os.path.join(account, container, name)
596
        sql = '''select distinct version_id, tstamp from versions where name = ? and hide = 0'''
597
        c = self.con.execute(sql, (path,))
598
        return [(int(x[0]), int(x[1])) for x in c.fetchall()]
599
    
600
    @backend_method(autocommit=0)
601
    def get_block(self, hash):
602
        """Return a block's data."""
603
        
604
        logger.debug("get_block: %s", hash)
605
        blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
606
        if not blocks:
607
            raise NameError('Block does not exist')
608
        return blocks[0]
609
    
610
    @backend_method(autocommit=0)
611
    def put_block(self, data):
612
        """Create a block and return the hash."""
613
        
614
        logger.debug("put_block: %s", len(data))
615
        hashes, absent = self.blocker.block_stor((data,))
616
        return binascii.hexlify(hashes[0])
617
    
618
    @backend_method(autocommit=0)
619
    def update_block(self, hash, data, offset=0):
620
        """Update a known block and return the hash."""
621
        
622
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
623
        if offset == 0 and len(data) == self.block_size:
624
            return self.put_block(data)
625
        h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
626
        return binascii.hexlify(h)
627
    
628
    def _sql_until(self, until=None):
629
        """Return the sql to get the latest versions until the timestamp given."""
630
        if until is None:
631
            until = int(time.time())
632
        sql = '''select version_id, name, tstamp, size from versions v
633
                    where version_id = (select max(version_id) from versions
634
                                        where v.name = name and tstamp <= %s)
635
                    and hide = 0'''
636
        return sql % (until,)
637
    
638
    def _get_pathstats(self, path, until=None):
639
        """Return count and sum of size of everything under path and latest timestamp."""
640
        
641
        sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?'
642
        sql = sql % self._sql_until(until)
643
        c = self.con.execute(sql, (path + '/%',))
644
        row = c.fetchone()
645
        tstamp = row[2] if row[2] is not None else 0
646
        return int(row[0]), int(row[1]), int(tstamp)
647
    
648
    def _get_version(self, path, version=None):
649
        if version is None:
650
            sql = '''select version_id, user, tstamp, size, hide from versions where name = ?
651
                        order by version_id desc limit 1'''
652
            c = self.con.execute(sql, (path,))
653
            row = c.fetchone()
654
            if not row or int(row[4]):
655
                raise NameError('Object does not exist')
656
        else:
657
            # The database (sqlite) will not complain if the version is not an integer.
658
            sql = '''select version_id, user, tstamp, size from versions where name = ?
659
                        and version_id = ?'''
660
            c = self.con.execute(sql, (path, version))
661
            row = c.fetchone()
662
            if not row:
663
                raise IndexError('Version does not exist')
664
        return str(row[0]), str(row[1]), int(row[2]), int(row[3])
665
    
666
    def _put_version(self, path, user, size=0, hide=0):
667
        tstamp = int(time.time())
668
        sql = 'insert into versions (name, user, tstamp, size, hide) values (?, ?, ?, ?, ?)'
669
        id = self.con.execute(sql, (path, user, tstamp, size, hide)).lastrowid
670
        return str(id), tstamp
671
    
672
    def _copy_version(self, user, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
673
        if src_version is not None:
674
            src_version_id, muser, mtime, size = self._get_version(src_path, src_version)
675
        else:
676
            # Latest or create from scratch.
677
            try:
678
                src_version_id, muser, mtime, size = self._get_version(src_path)
679
            except NameError:
680
                src_version_id = None
681
                size = 0
682
        if not copy_data:
683
            size = 0
684
        dest_version_id = self._put_version(dest_path, user, size)[0]
685
        if copy_meta and src_version_id is not None:
686
            sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
687
            sql = sql % dest_version_id
688
            self.con.execute(sql, (src_version_id,))
689
        if copy_data and src_version_id is not None:
690
            # TODO: Copy properly.
691
            hashmap = self.mapper.map_retr(src_version_id)
692
            self.mapper.map_stor(dest_version_id, hashmap)
693
        return src_version_id, dest_version_id
694
    
695
    def _get_versioninfo(self, account, container, name, until=None):
696
        """Return path, latest version, associated timestamp and size until the timestamp given."""
697
        
698
        p = (account, container, name)
699
        try:
700
            p = p[:p.index(None)]
701
        except ValueError:
702
            pass
703
        path = os.path.join(*p)
704
        sql = '''select version_id, tstamp, size from (%s) where name = ?'''
705
        sql = sql % self._sql_until(until)
706
        c = self.con.execute(sql, (path,))
707
        row = c.fetchone()
708
        if row is None:
709
            raise NameError('Path does not exist')
710
        return path, str(row[0]), int(row[1]), int(row[2])
711
    
712
    def _get_accountinfo(self, account, until=None):
713
        try:
714
            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
715
            return version_id, mtime
716
        except:
717
            raise NameError('Account does not exist')
718
    
719
    def _get_containerinfo(self, account, container, until=None):
720
        try:
721
            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
722
            return path, version_id, mtime
723
        except:
724
            raise NameError('Container does not exist')
725
    
726
    def _get_objectinfo(self, account, container, name, version=None):
727
        path = os.path.join(account, container, name)
728
        version_id, muser, mtime, size = self._get_version(path, version)
729
        return path, version_id, muser, mtime, size
730
    
731
    def _create_account(self, user, account):
732
        try:
733
            self._get_accountinfo(account)
734
        except NameError:
735
            self._put_version(account, user)
736
    
737
    def _get_metadata(self, path, version):
738
        sql = 'select key, value from metadata where version_id = ?'
739
        c = self.con.execute(sql, (version,))
740
        return dict(c.fetchall())
741
    
742
    def _put_metadata(self, user, path, meta, replace=False, copy_data=True):
743
        """Create a new version and store metadata."""
744
        
745
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, copy_data)
746
        for k, v in meta.iteritems():
747
            if not replace and v == '':
748
                sql = 'delete from metadata where version_id = ? and key = ?'
749
                self.con.execute(sql, (dest_version_id, k))
750
            else:
751
                sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
752
                self.con.execute(sql, (dest_version_id, k, v))
753
    
754
    def _check_policy(self, policy):
755
        for k in policy.keys():
756
            if policy[k] == '':
757
                policy[k] = self.default_policy.get(k)
758
        for k, v in policy.iteritems():
759
            if k == 'quota':
760
                q = int(v) # May raise ValueError.
761
                if q < 0:
762
                    raise ValueError
763
            elif k == 'versioning':
764
                if v not in ['auto', 'manual', 'none']:
765
                    raise ValueError
766
            else:
767
                raise ValueError
768
    
769
    def _get_policy(self, path):
770
        sql = 'select key, value from policy where name = ?'
771
        c = self.con.execute(sql, (path,))
772
        return dict(c.fetchall())
773
    
774
    def _list_limits(self, listing, marker, limit):
775
        start = 0
776
        if marker:
777
            try:
778
                start = listing.index(marker) + 1
779
            except ValueError:
780
                pass
781
        if not limit or limit > 10000:
782
            limit = 10000
783
        return start, limit
784
    
785
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
786
        cont_prefix = path + '/'
787
        if keys and len(keys) > 0:
788
            sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
789
                        m.version_id = o.version_id and m.key in (%s)'''
790
            sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
791
            param = (cont_prefix + prefix + '%',) + tuple(keys)
792
            if allowed:
793
                for x in allowed:
794
                    sql += ' and o.name like ?'
795
                    param += (x,)
796
            sql += ' order by o.name'
797
        else:
798
            sql = 'select name, version_id from (%s) where name like ?'
799
            sql = sql % self._sql_until(until)
800
            param = (cont_prefix + prefix + '%',)
801
            if allowed:
802
                for x in allowed:
803
                    sql += ' and name like ?'
804
                    param += (x,)
805
            sql += ' order by name'
806
        c = self.con.execute(sql, param)
807
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
808
        if delimiter:
809
            pseudo_objects = []
810
            for x in objects:
811
                pseudo_name = x[0]
812
                i = pseudo_name.find(delimiter, len(prefix))
813
                if not virtual:
814
                    # If the delimiter is not found, or the name ends
815
                    # with the delimiter's first occurence.
816
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
817
                        pseudo_objects.append(x)
818
                else:
819
                    # If the delimiter is found, keep up to (and including) the delimiter.
820
                    if i != -1:
821
                        pseudo_name = pseudo_name[:i + len(delimiter)]
822
                    if pseudo_name not in [y[0] for y in pseudo_objects]:
823
                        if pseudo_name == x[0]:
824
                            pseudo_objects.append(x)
825
                        else:
826
                            pseudo_objects.append((pseudo_name, None))
827
            objects = pseudo_objects
828
        
829
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
830
        return objects[start:start + limit]
831
    
832
    def _del_version(self, version):
833
        self.mapper.map_remv(version)
834
        sql = 'delete from versions where version_id = ?'
835
        self.con.execute(sql, (version,))
836
    
837
    # Access control functions.
838
    
839
    def _check_groups(self, groups):
840
        # Example follows.
841
        # for k, v in groups.iteritems():
842
        #     if True in [False or ',' in x for x in v]:
843
        #         raise ValueError('Bad characters in groups')
844
        pass
845
    
846
    def _get_groups(self, account):
847
        sql = 'select gname, user from groups where account = ?'
848
        c = self.con.execute(sql, (account,))
849
        groups = {}
850
        for row in c.fetchall():
851
            if row[0] not in groups:
852
                groups[row[0]] = []
853
            groups[row[0]].append(row[1])
854
        return groups
855
    
856
    def _put_groups(self, account, groups, replace=False):
857
        if replace:
858
            self._del_groups(account)
859
        for k, v in groups.iteritems():
860
            sql = 'delete from groups where account = ? and gname = ?'
861
            self.con.execute(sql, (account, k))
862
            if v:
863
                sql = 'insert into groups (account, gname, user) values (?, ?, ?)'
864
                self.con.executemany(sql, [(account, k, x) for x in v])
865
    
866
    def _del_groups(self, account):
867
        sql = 'delete from groups where account = ?'
868
        self.con.execute(sql, (account,))
869
    
870
    def _check_permissions(self, path, permissions):
871
        # Check for existing permissions.
872
        sql = '''select name from permissions
873
                    where name != ? and (name like ? or ? like name || ?)'''
874
        c = self.con.execute(sql, (path, path + '%', path, '%'))
875
        row = c.fetchone()
876
        if row:
877
            ae = AttributeError()
878
            ae.data = row[0]
879
            raise ae
880
        
881
        # Format given permissions.
882
        if len(permissions) == 0:
883
            return [], []
884
        r = permissions.get('read', [])
885
        w = permissions.get('write', [])
886
        # Examples follow.
887
        # if True in [False or ',' in x for x in r]:
888
        #     raise ValueError('Bad characters in read permissions')
889
        # if True in [False or ',' in x for x in w]:
890
        #     raise ValueError('Bad characters in write permissions')
891
        return r, w
892
    
893
    def _get_permissions(self, path):
894
        # Check for permissions at path or above.
895
        sql = 'select name, op, user from permissions where ? like name || ?'
896
        c = self.con.execute(sql, (path, '%'))
897
        name = path
898
        perms = {} # Return nothing, if nothing is set.
899
        for row in c.fetchall():
900
            name = row[0]
901
            if row[1] not in perms:
902
                perms[row[1]] = []
903
            perms[row[1]].append(row[2])
904
        return name, perms
905
    
906
    def _put_permissions(self, path, r, w):
907
        sql = 'delete from permissions where name = ?'
908
        self.con.execute(sql, (path,))
909
        sql = 'insert into permissions (name, op, user) values (?, ?, ?)'
910
        if r:
911
            self.con.executemany(sql, [(path, 'read', x) for x in r])
912
        if w:
913
            self.con.executemany(sql, [(path, 'write', x) for x in w])
914
    
915
    def _get_public(self, path):
916
        sql = 'select name from public where name = ?'
917
        c = self.con.execute(sql, (path,))
918
        row = c.fetchone()
919
        if not row:
920
            return False
921
        return True
922
    
923
    def _put_public(self, path, public):
924
        if not public:
925
            sql = 'delete from public where name = ?'
926
        else:
927
            sql = 'insert or replace into public (name) values (?)'
928
        self.con.execute(sql, (path,))
929
    
930
    def _del_sharing(self, path):
931
        sql = 'delete from permissions where name = ?'
932
        self.con.execute(sql, (path,))
933
        sql = 'delete from public where name = ?'
934
        self.con.execute(sql, (path,))
935
    
936
    def _is_allowed(self, user, account, container, name, op='read'):
937
        if user == account:
938
            return True
939
        path = os.path.join(account, container, name)
940
        if op == 'read' and self._get_public(path):
941
            return True
942
        perm_path, perms = self._get_permissions(path)
943
        
944
        # Expand groups.
945
        for x in ('read', 'write'):
946
            g_perms = set()
947
            for y in perms.get(x, []):
948
                if ':' in y:
949
                    g_account, g_name = y.split(':', 1)
950
                    groups = self._get_groups(g_account)
951
                    if g_name in groups:
952
                        g_perms.update(groups[g_name])
953
                else:
954
                    g_perms.add(y)
955
            perms[x] = g_perms
956
        
957
        if op == 'read' and ('*' in perms['read'] or user in perms['read']):
958
            return True
959
        if '*' in perms['write'] or user in perms['write']:
960
            return True
961
        return False
962
    
963
    def _can_read(self, user, account, container, name):
964
        if not self._is_allowed(user, account, container, name, 'read'):
965
            raise NotAllowedError
966
    
967
    def _can_write(self, user, account, container, name):
968
        if not self._is_allowed(user, account, container, name, 'write'):
969
            raise NotAllowedError
970
    
971
    def _allowed_paths(self, user, prefix=None):
972
        sql = '''select distinct name from permissions where (user = ?
973
                    or user in (select account || ':' || gname from groups where user = ?))'''
974
        param = (user, user)
975
        if prefix:
976
            sql += ' and name like ?'
977
            param += (prefix + '/%',)
978
        c = self.con.execute(sql, param)
979
        return [x[0] for x in c.fetchall()]
980
    
981
    def _allowed_accounts(self, user):
982
        allow = set()
983
        for path in self._allowed_paths(user):
984
            allow.add(path.split('/', 1)[0])
985
        return sorted(allow)
986
    
987
    def _allowed_containers(self, user, account):
988
        allow = set()
989
        for path in self._allowed_paths(user, account):
990
            allow.add(path.split('/', 2)[1])
991
        return sorted(allow)
992
    
993
    def _shared_paths(self, prefix):
994
        sql = 'select distinct name from permissions where name like ?'
995
        c = self.con.execute(sql, (prefix + '/%',))
996
        return [x[0] for x in c.fetchall()]