Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / simple.py @ b18ef3ad

History | View | Annotate | Download (40.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import hashlib
39
import binascii
40

    
41
from base import NotAllowedError, BaseBackend
42
from pithos.lib.hashfiler import Mapper, Blocker
43

    
44

    
45
logger = logging.getLogger(__name__)
46

    
47
def backend_method(func=None, autocommit=1):
48
    if func is None:
49
        def fn(func):
50
            return backend_method(func, autocommit)
51
        return fn
52

    
53
    if not autocommit:
54
        return func
55
    def fn(self, *args, **kw):
56
        self.con.execute('begin deferred')
57
        try:
58
            ret = func(self, *args, **kw)
59
            self.con.commit()
60
            return ret
61
        except:
62
            self.con.rollback()
63
            raise
64
    return fn
65

    
66

    
67
class SimpleBackend(BaseBackend):
68
    """A simple backend.
69
    
70
    Uses SQLite for storage.
71
    """
72
    
73
    def __init__(self, db):
74
        self.hash_algorithm = 'sha256'
75
        self.block_size = 4 * 1024 * 1024 # 4MB
76
        
77
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
78
        
79
        basepath = os.path.split(db)[0]
80
        if basepath and not os.path.exists(basepath):
81
            os.makedirs(basepath)
82
        if not os.path.isdir(basepath):
83
            raise RuntimeError("Cannot open database at '%s'" % (db,))
84
        
85
        self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
86
        
87
        sql = '''pragma foreign_keys = on'''
88
        self.con.execute(sql)
89
        
90
        sql = '''create table if not exists versions (
91
                    version_id integer primary key,
92
                    name text,
93
                    user text,
94
                    tstamp integer not null,
95
                    size integer default 0,
96
                    hide integer default 0)'''
97
        self.con.execute(sql)
98
        sql = '''create table if not exists metadata (
99
                    version_id integer,
100
                    key text,
101
                    value text,
102
                    primary key (version_id, key)
103
                    foreign key (version_id) references versions(version_id)
104
                    on delete cascade)'''
105
        self.con.execute(sql)
106
        sql = '''create table if not exists policy (
107
                    name text, key text, value text, primary key (name, key))'''
108
        self.con.execute(sql)
109
        
110
        # Access control tables.
111
        sql = '''create table if not exists groups (
112
                    account text, gname text, user text)'''
113
        self.con.execute(sql)
114
        sql = '''create table if not exists permissions (
115
                    name text, op text, user text)'''
116
        self.con.execute(sql)
117
        sql = '''create table if not exists public (
118
                    name text, primary key (name))'''
119
        self.con.execute(sql)
120
        
121
        self.con.commit()
122
        
123
        params = {'blocksize': self.block_size,
124
                  'blockpath': basepath + '/blocks',
125
                  'hashtype': self.hash_algorithm}
126
        self.blocker = Blocker(**params)
127
        
128
        params = {'mappath': basepath + '/maps',
129
                  'namelen': self.blocker.hashlen}
130
        self.mapper = Mapper(**params)
131
    
132
    @backend_method
133
    def list_accounts(self, user, marker=None, limit=10000):
134
        """Return a list of accounts the user can access."""
135
        
136
        allowed = self._allowed_accounts(user)
137
        start, limit = self._list_limits(allowed, marker, limit)
138
        return allowed[start:start + limit]
139
    
140
    @backend_method
141
    def get_account_meta(self, user, account, until=None):
142
        """Return a dictionary with the account metadata."""
143
        
144
        logger.debug("get_account_meta: %s %s", account, until)
145
        if user != account:
146
            if until or account not in self._allowed_accounts(user):
147
                raise NotAllowedError
148
        try:
149
            version_id, mtime = self._get_accountinfo(account, until)
150
        except NameError:
151
            version_id = None
152
            mtime = 0
153
        count, bytes, tstamp = self._get_pathstats(account, until)
154
        if mtime > tstamp:
155
            tstamp = mtime
156
        if until is None:
157
            modified = tstamp
158
        else:
159
            modified = self._get_pathstats(account)[2] # Overall last modification
160
            if mtime > modified:
161
                modified = mtime
162
        
163
        # Proper count.
164
        sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
165
        sql = sql % self._sql_until(until)
166
        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
167
        row = c.fetchone()
168
        count = row[0]
169
        
170
        if user != account:
171
            meta = {'name': account}
172
        else:
173
            meta = self._get_metadata(account, version_id)
174
            meta.update({'name': account, 'count': count, 'bytes': bytes})
175
            if until is not None:
176
                meta.update({'until_timestamp': tstamp})
177
        if modified:
178
            meta.update({'modified': modified})
179
        return meta
180
    
181
    @backend_method
182
    def update_account_meta(self, user, account, meta, replace=False):
183
        """Update the metadata associated with the account."""
184
        
185
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
186
        if user != account:
187
            raise NotAllowedError
188
        self._put_metadata(user, account, meta, replace, False)
189
    
190
    @backend_method
191
    def get_account_groups(self, user, account):
192
        """Return a dictionary with the user groups defined for this account."""
193
        
194
        logger.debug("get_account_groups: %s", account)
195
        if user != account:
196
            if account not in self._allowed_accounts(user):
197
                raise NotAllowedError
198
            return {}
199
        return self._get_groups(account)
200
    
201
    @backend_method
202
    def update_account_groups(self, user, account, groups, replace=False):
203
        """Update the groups associated with the account."""
204
        
205
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
206
        if user != account:
207
            raise NotAllowedError
208
        self._check_groups(groups)
209
        self._put_groups(account, groups, replace)
210
    
211
    @backend_method
212
    def put_account(self, user, account):
213
        """Create a new account with the given name."""
214
        
215
        logger.debug("put_account: %s", account)
216
        if user != account:
217
            raise NotAllowedError
218
        try:
219
            version_id, mtime = self._get_accountinfo(account)
220
        except NameError:
221
            pass
222
        else:
223
            raise NameError('Account already exists')
224
        version_id = self._put_version(account, user)
225
    
226
    @backend_method
227
    def delete_account(self, user, account):
228
        """Delete the account with the given name."""
229
        
230
        logger.debug("delete_account: %s", account)
231
        if user != account:
232
            raise NotAllowedError
233
        count = self._get_pathstats(account)[0]
234
        if count > 0:
235
            raise IndexError('Account is not empty')
236
        sql = 'delete from versions where name = ?'
237
        self.con.execute(sql, (account,))
238
        self._del_groups(account)
239
    
240
    @backend_method
241
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
242
        """Return a list of containers existing under an account."""
243
        
244
        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
245
        if user != account:
246
            if until or account not in self._allowed_accounts(user):
247
                raise NotAllowedError
248
            allowed = self._allowed_containers(user, account)
249
            start, limit = self._list_limits(allowed, marker, limit)
250
            return allowed[start:start + limit]
251
        else:
252
            if shared:
253
                allowed = [x.split('/', 2)[1] for x in self._shared_paths(account)]
254
                start, limit = self._list_limits(allowed, marker, limit)
255
                return allowed[start:start + limit]
256
        return [x[0] for x in self._list_objects(account, '', '/', marker, limit, False, [], until)]
257
    
258
    @backend_method
259
    def get_container_meta(self, user, account, container, until=None):
260
        """Return a dictionary with the container metadata."""
261
        
262
        logger.debug("get_container_meta: %s %s %s", account, container, until)
263
        if user != account:
264
            if until or container not in self._allowed_containers(user, account):
265
                raise NotAllowedError
266
        path, version_id, mtime = self._get_containerinfo(account, container, until)
267
        count, bytes, tstamp = self._get_pathstats(path, until)
268
        if mtime > tstamp:
269
            tstamp = mtime
270
        if until is None:
271
            modified = tstamp
272
        else:
273
            modified = self._get_pathstats(path)[2] # Overall last modification
274
            if mtime > modified:
275
                modified = mtime
276
        
277
        if user != account:
278
            meta = {'name': container, 'modified': modified}
279
        else:
280
            meta = self._get_metadata(path, version_id)
281
            meta.update({'name': container, 'count': count, 'bytes': bytes, 'modified': modified})
282
            if until is not None:
283
                meta.update({'until_timestamp': tstamp})
284
        return meta
285
    
286
    @backend_method
287
    def update_container_meta(self, user, account, container, meta, replace=False):
288
        """Update the metadata associated with the container."""
289
        
290
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
291
        if user != account:
292
            raise NotAllowedError
293
        path, version_id, mtime = self._get_containerinfo(account, container)
294
        self._put_metadata(user, path, meta, replace, False)
295
    
296
    @backend_method
297
    def get_container_policy(self, user, account, container):
298
        """Return a dictionary with the container policy."""
299
        
300
        logger.debug("get_container_policy: %s %s", account, container)
301
        if user != account:
302
            if container not in self._allowed_containers(user, account):
303
                raise NotAllowedError
304
            return {}
305
        path = self._get_containerinfo(account, container)[0]
306
        return self._get_policy(path)
307
    
308
    @backend_method
309
    def update_container_policy(self, user, account, container, policy, replace=False):
310
        """Update the policy associated with the account."""
311
        
312
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
313
        if user != account:
314
            raise NotAllowedError
315
        path = self._get_containerinfo(account, container)[0]
316
        self._check_policy(policy)
317
        if replace:
318
            for k, v in self.default_policy.iteritems():
319
                if k not in policy:
320
                    policy[k] = v
321
        for k, v in policy.iteritems():
322
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
323
            self.con.execute(sql, (path, k, v))
324
    
325
    @backend_method
326
    def put_container(self, user, account, container, policy=None):
327
        """Create a new container with the given name."""
328
        
329
        logger.debug("put_container: %s %s %s", account, container, policy)
330
        if user != account:
331
            raise NotAllowedError
332
        try:
333
            path, version_id, mtime = self._get_containerinfo(account, container)
334
        except NameError:
335
            pass
336
        else:
337
            raise NameError('Container already exists')
338
        if policy:
339
            self._check_policy(policy)
340
        path = os.path.join(account, container)
341
        version_id = self._put_version(path, user)
342
        for k, v in self.default_policy.iteritems():
343
            if k not in policy:
344
                policy[k] = v
345
        for k, v in policy.iteritems():
346
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
347
            self.con.execute(sql, (path, k, v))
348
    
349
    @backend_method
350
    def delete_container(self, user, account, container, until=None):
351
        """Delete/purge the container with the given name."""
352
        
353
        logger.debug("delete_container: %s %s %s", account, container, until)
354
        if user != account:
355
            raise NotAllowedError
356
        path, version_id, mtime = self._get_containerinfo(account, container)
357
        
358
        if until is not None:
359
            sql = '''select version_id from versions where name like ? and tstamp <= ?
360
                        and version_id not in (select version_id from (%s))'''
361
            sql = sql % self._sql_until() # Do not delete current versions.
362
            c = self.con.execute(sql, (path + '/%', until))
363
            for v in [x[0] for x in c.fetchall()]:
364
                self._del_version(v)
365
            return
366
        
367
        count = self._get_pathstats(path)[0]
368
        if count > 0:
369
            raise IndexError('Container is not empty')
370
        sql = 'delete from versions where name = ? or name like ?' # May contain hidden items.
371
        self.con.execute(sql, (path, path + '/%',))
372
        sql = 'delete from policy where name = ?'
373
        self.con.execute(sql, (path,))
374
        self._copy_version(user, account, account, True, False) # New account version (for timestamp update).
375
    
376
    @backend_method
377
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
378
        """Return a list of objects existing under a container."""
379
        
380
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
381
        allowed = []
382
        if user != account:
383
            if until:
384
                raise NotAllowedError
385
            allowed = self._allowed_paths(user, os.path.join(account, container))
386
            if not allowed:
387
                raise NotAllowedError
388
        else:
389
            if shared:
390
                allowed = self._shared_paths(os.path.join(account, container))
391
        path, version_id, mtime = self._get_containerinfo(account, container, until)
392
        return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
393
    
394
    @backend_method
395
    def list_object_meta(self, user, account, container, until=None):
396
        """Return a list with all the container's object meta keys."""
397
        
398
        logger.debug("list_object_meta: %s %s %s", account, container, until)
399
        allowed = []
400
        if user != account:
401
            if until:
402
                raise NotAllowedError
403
            allowed = self._allowed_paths(user, os.path.join(account, container))
404
            if not allowed:
405
                raise NotAllowedError
406
        path, version_id, mtime = self._get_containerinfo(account, container, until)
407
        sql = '''select distinct m.key from (%s) o, metadata m
408
                    where m.version_id = o.version_id and o.name like ?'''
409
        sql = sql % self._sql_until(until)
410
        param = (path + '/%',)
411
        if allowed:
412
            for x in allowed:
413
                sql += ' and o.name like ?'
414
                param += (x,)
415
        c = self.con.execute(sql, param)
416
        return [x[0] for x in c.fetchall()]
417
    
418
    @backend_method
419
    def get_object_meta(self, user, account, container, name, version=None):
420
        """Return a dictionary with the object metadata."""
421
        
422
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
423
        self._can_read(user, account, container, name)
424
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
425
        if version is None:
426
            modified = mtime
427
        else:
428
            modified = self._get_version(path, version)[2] # Overall last modification
429
        
430
        meta = self._get_metadata(path, version_id)
431
        meta.update({'name': name, 'bytes': size})
432
        meta.update({'version': version_id, 'version_timestamp': mtime})
433
        meta.update({'modified': modified, 'modified_by': muser})
434
        return meta
435
    
436
    @backend_method
437
    def update_object_meta(self, user, account, container, name, meta, replace=False):
438
        """Update the metadata associated with the object."""
439
        
440
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
441
        self._can_write(user, account, container, name)
442
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name)
443
        self._put_metadata(user, path, meta, replace)
444
    
445
    @backend_method
446
    def get_object_permissions(self, user, account, container, name):
447
        """Return the path from which this object gets its permissions from,\
448
        along with a dictionary containing the permissions."""
449
        
450
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
451
        self._can_read(user, account, container, name)
452
        path = self._get_objectinfo(account, container, name)[0]
453
        return self._get_permissions(path)
454
    
455
    @backend_method
456
    def update_object_permissions(self, user, account, container, name, permissions):
457
        """Update the permissions associated with the object."""
458
        
459
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
460
        if user != account:
461
            raise NotAllowedError
462
        path = self._get_objectinfo(account, container, name)[0]
463
        r, w = self._check_permissions(path, permissions)
464
        self._put_permissions(path, r, w)
465
    
466
    @backend_method
467
    def get_object_public(self, user, account, container, name):
468
        """Return the public URL of the object if applicable."""
469
        
470
        logger.debug("get_object_public: %s %s %s", account, container, name)
471
        self._can_read(user, account, container, name)
472
        path = self._get_objectinfo(account, container, name)[0]
473
        if self._get_public(path):
474
            return '/public/' + path
475
        return None
476
    
477
    @backend_method
478
    def update_object_public(self, user, account, container, name, public):
479
        """Update the public status of the object."""
480
        
481
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
482
        self._can_write(user, account, container, name)
483
        path = self._get_objectinfo(account, container, name)[0]
484
        self._put_public(path, public)
485
    
486
    @backend_method
487
    def get_object_hashmap(self, user, account, container, name, version=None):
488
        """Return the object's size and a list with partial hashes."""
489
        
490
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
491
        self._can_read(user, account, container, name)
492
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
493
        hashmap = self.mapper.map_retr(version_id)
494
        return size, [binascii.hexlify(x) for x in hashmap]
495
    
496
    @backend_method
497
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
498
        """Create/update an object with the specified size and partial hashes."""
499
        
500
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
501
        if permissions is not None and user != account:
502
            raise NotAllowedError
503
        self._can_write(user, account, container, name)
504
        missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
505
        if missing:
506
            ie = IndexError()
507
            ie.data = missing
508
            raise ie
509
        path = self._get_containerinfo(account, container)[0]
510
        path = os.path.join(path, name)
511
        if permissions is not None:
512
            r, w = self._check_permissions(path, permissions)
513
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False)
514
        sql = 'update versions set size = ? where version_id = ?'
515
        self.con.execute(sql, (size, dest_version_id))
516
        self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
517
        for k, v in meta.iteritems():
518
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
519
            self.con.execute(sql, (dest_version_id, k, v))
520
        if permissions is not None:
521
            self._put_permissions(path, r, w)
522
    
523
    @backend_method
524
    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
525
        """Copy an object's data and metadata."""
526
        
527
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
528
        if permissions is not None and user != account:
529
            raise NotAllowedError
530
        self._can_read(user, account, src_container, src_name)
531
        self._can_write(user, account, dest_container, dest_name)
532
        self._get_containerinfo(account, src_container)
533
        if src_version is None:
534
            src_path = self._get_objectinfo(account, src_container, src_name)[0]
535
        else:
536
            src_path = os.path.join(account, src_container, src_name)
537
        dest_path = self._get_containerinfo(account, dest_container)[0]
538
        dest_path = os.path.join(dest_path, dest_name)
539
        if permissions is not None:
540
            r, w = self._check_permissions(dest_path, permissions)
541
        src_version_id, dest_version_id = self._copy_version(user, src_path, dest_path, not replace_meta, True, src_version)
542
        for k, v in dest_meta.iteritems():
543
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
544
            self.con.execute(sql, (dest_version_id, k, v))
545
        if permissions is not None:
546
            self._put_permissions(dest_path, r, w)
547
    
548
    @backend_method
549
    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
550
        """Move an object's data and metadata."""
551
        
552
        logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
553
        self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
554
        self.delete_object(user, account, src_container, src_name)
555
    
556
    @backend_method
557
    def delete_object(self, user, account, container, name, until=None):
558
        """Delete/purge an object."""
559
        
560
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
561
        if user != account:
562
            raise NotAllowedError
563
        
564
        if until is not None:
565
            path = os.path.join(account, container, name)
566
            sql = '''select version_id from versions where name = ? and tstamp <= ?'''
567
            c = self.con.execute(sql, (path, until))
568
            for v in [x[0] in c.fetchall()]:
569
                self._del_version(v)
570
            try:
571
                version_id = self._get_version(path)[0]
572
            except NameError:
573
                pass
574
            else:
575
                self._del_sharing(path)
576
            return
577
        
578
        path = self._get_objectinfo(account, container, name)[0]
579
        self._put_version(path, user, 0, 1)
580
        self._del_sharing(path)
581
    
582
    @backend_method
583
    def list_versions(self, user, account, container, name):
584
        """Return a list of all (version, version_timestamp) tuples for an object."""
585
        
586
        logger.debug("list_versions: %s %s %s", account, container, name)
587
        self._can_read(user, account, container, name)
588
        # This will even show deleted versions.
589
        path = os.path.join(account, container, name)
590
        sql = '''select distinct version_id, tstamp from versions where name = ? and hide = 0'''
591
        c = self.con.execute(sql, (path,))
592
        return [(int(x[0]), int(x[1])) for x in c.fetchall()]
593
    
594
    @backend_method(autocommit=0)
595
    def get_block(self, hash):
596
        """Return a block's data."""
597
        
598
        logger.debug("get_block: %s", hash)
599
        blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
600
        if not blocks:
601
            raise NameError('Block does not exist')
602
        return blocks[0]
603
    
604
    @backend_method(autocommit=0)
605
    def put_block(self, data):
606
        """Create a block and return the hash."""
607
        
608
        logger.debug("put_block: %s", len(data))
609
        hashes, absent = self.blocker.block_stor((data,))
610
        return binascii.hexlify(hashes[0])
611
    
612
    @backend_method(autocommit=0)
613
    def update_block(self, hash, data, offset=0):
614
        """Update a known block and return the hash."""
615
        
616
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
617
        if offset == 0 and len(data) == self.block_size:
618
            return self.put_block(data)
619
        h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
620
        return binascii.hexlify(h)
621
    
622
    def _sql_until(self, until=None):
623
        """Return the sql to get the latest versions until the timestamp given."""
624
        if until is None:
625
            until = int(time.time())
626
        sql = '''select version_id, name, tstamp, size from versions v
627
                    where version_id = (select max(version_id) from versions
628
                                        where v.name = name and tstamp <= %s)
629
                    and hide = 0'''
630
        return sql % (until,)
631
    
632
    def _get_pathstats(self, path, until=None):
633
        """Return count and sum of size of everything under path and latest timestamp."""
634
        
635
        sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?'
636
        sql = sql % self._sql_until(until)
637
        c = self.con.execute(sql, (path + '/%',))
638
        row = c.fetchone()
639
        tstamp = row[2] if row[2] is not None else 0
640
        return int(row[0]), int(row[1]), int(tstamp)
641
    
642
    def _get_version(self, path, version=None):
643
        if version is None:
644
            sql = '''select version_id, user, tstamp, size, hide from versions where name = ?
645
                        order by version_id desc limit 1'''
646
            c = self.con.execute(sql, (path,))
647
            row = c.fetchone()
648
            if not row or int(row[4]):
649
                raise NameError('Object does not exist')
650
        else:
651
            # The database (sqlite) will not complain if the version is not an integer.
652
            sql = '''select version_id, user, tstamp, size from versions where name = ?
653
                        and version_id = ?'''
654
            c = self.con.execute(sql, (path, version))
655
            row = c.fetchone()
656
            if not row:
657
                raise IndexError('Version does not exist')
658
        return str(row[0]), str(row[1]), int(row[2]), int(row[3])
659
    
660
    def _put_version(self, path, user, size=0, hide=0):
661
        tstamp = int(time.time())
662
        sql = 'insert into versions (name, user, tstamp, size, hide) values (?, ?, ?, ?, ?)'
663
        id = self.con.execute(sql, (path, user, tstamp, size, hide)).lastrowid
664
        return str(id)
665
    
666
    def _copy_version(self, user, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
667
        if src_version is not None:
668
            src_version_id, muser, mtime, size = self._get_version(src_path, src_version)
669
        else:
670
            # Latest or create from scratch.
671
            try:
672
                src_version_id, muser, mtime, size = self._get_version(src_path)
673
            except NameError:
674
                src_version_id = None
675
                size = 0
676
        if not copy_data:
677
            size = 0
678
        dest_version_id = self._put_version(dest_path, user, size)
679
        if copy_meta and src_version_id is not None:
680
            sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
681
            sql = sql % dest_version_id
682
            self.con.execute(sql, (src_version_id,))
683
        if copy_data and src_version_id is not None:
684
            # TODO: Copy properly.
685
            hashmap = self.mapper.map_retr(src_version_id)
686
            self.mapper.map_stor(dest_version_id, hashmap)
687
        return src_version_id, dest_version_id
688
    
689
    def _get_versioninfo(self, account, container, name, until=None):
690
        """Return path, latest version, associated timestamp and size until the timestamp given."""
691
        
692
        p = (account, container, name)
693
        try:
694
            p = p[:p.index(None)]
695
        except ValueError:
696
            pass
697
        path = os.path.join(*p)
698
        sql = '''select version_id, tstamp, size from (%s) where name = ?'''
699
        sql = sql % self._sql_until(until)
700
        c = self.con.execute(sql, (path,))
701
        row = c.fetchone()
702
        if row is None:
703
            raise NameError('Path does not exist')
704
        return path, str(row[0]), int(row[1]), int(row[2])
705
    
706
    def _get_accountinfo(self, account, until=None):
707
        try:
708
            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
709
            return version_id, mtime
710
        except:
711
            raise NameError('Account does not exist')
712
    
713
    def _get_containerinfo(self, account, container, until=None):
714
        try:
715
            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
716
            return path, version_id, mtime
717
        except:
718
            raise NameError('Container does not exist')
719
    
720
    def _get_objectinfo(self, account, container, name, version=None):
721
        path = os.path.join(account, container, name)
722
        version_id, muser, mtime, size = self._get_version(path, version)
723
        return path, version_id, muser, mtime, size
724
    
725
    def _get_metadata(self, path, version):
726
        sql = 'select key, value from metadata where version_id = ?'
727
        c = self.con.execute(sql, (version,))
728
        return dict(c.fetchall())
729
    
730
    def _put_metadata(self, user, path, meta, replace=False, copy_data=True):
731
        """Create a new version and store metadata."""
732
        
733
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, copy_data)
734
        for k, v in meta.iteritems():
735
            if not replace and v == '':
736
                sql = 'delete from metadata where version_id = ? and key = ?'
737
                self.con.execute(sql, (dest_version_id, k))
738
            else:
739
                sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
740
                self.con.execute(sql, (dest_version_id, k, v))
741
    
742
    def _check_policy(self, policy):
743
        for k in policy.keys():
744
            if policy[k] == '':
745
                policy[k] = self.default_policy.get(k)
746
        for k, v in policy.iteritems():
747
            if k == 'quota':
748
                q = int(v) # May raise ValueError.
749
                if q < 0:
750
                    raise ValueError
751
            elif k == 'versioning':
752
                if v not in ['auto', 'manual', 'none']:
753
                    raise ValueError
754
            else:
755
                raise ValueError
756
    
757
    def _get_policy(self, path):
758
        sql = 'select key, value from policy where name = ?'
759
        c = self.con.execute(sql, (path,))
760
        return dict(c.fetchall())
761
    
762
    
763
    def _list_limits(self, listing, marker, limit):
764
        start = 0
765
        if marker:
766
            try:
767
                start = listing.index(marker) + 1
768
            except ValueError:
769
                pass
770
        if not limit or limit > 10000:
771
            limit = 10000
772
        return start, limit
773
    
774
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
775
        cont_prefix = path + '/'
776
        if keys and len(keys) > 0:
777
            sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
778
                        m.version_id = o.version_id and m.key in (%s)'''
779
            sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
780
            param = (cont_prefix + prefix + '%',) + tuple(keys)
781
            if allowed:
782
                for x in allowed:
783
                    sql += ' and o.name like ?'
784
                    param += (x,)
785
            sql += ' order by o.name'
786
        else:
787
            sql = 'select name, version_id from (%s) where name like ?'
788
            sql = sql % self._sql_until(until)
789
            param = (cont_prefix + prefix + '%',)
790
            if allowed:
791
                for x in allowed:
792
                    sql += ' and name like ?'
793
                    param += (x,)
794
            sql += ' order by name'
795
        c = self.con.execute(sql, param)
796
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
797
        if delimiter:
798
            pseudo_objects = []
799
            for x in objects:
800
                pseudo_name = x[0]
801
                i = pseudo_name.find(delimiter, len(prefix))
802
                if not virtual:
803
                    # If the delimiter is not found, or the name ends
804
                    # with the delimiter's first occurence.
805
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
806
                        pseudo_objects.append(x)
807
                else:
808
                    # If the delimiter is found, keep up to (and including) the delimiter.
809
                    if i != -1:
810
                        pseudo_name = pseudo_name[:i + len(delimiter)]
811
                    if pseudo_name not in [y[0] for y in pseudo_objects]:
812
                        if pseudo_name == x[0]:
813
                            pseudo_objects.append(x)
814
                        else:
815
                            pseudo_objects.append((pseudo_name, None))
816
            objects = pseudo_objects
817
        
818
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
819
        return objects[start:start + limit]
820
    
821
    def _del_version(self, version):
822
        self.mapper.map_remv(version)
823
        sql = 'delete from versions where version_id = ?'
824
        self.con.execute(sql, (version,))
825
    
826
    # Access control functions.
827
    
828
    def _check_groups(self, groups):
829
        # Example follows.
830
        # for k, v in groups.iteritems():
831
        #     if True in [False or ',' in x for x in v]:
832
        #         raise ValueError('Bad characters in groups')
833
        pass
834
    
835
    def _get_groups(self, account):
836
        sql = 'select gname, user from groups where account = ?'
837
        c = self.con.execute(sql, (account,))
838
        groups = {}
839
        for row in c.fetchall():
840
            if row[0] not in groups:
841
                groups[row[0]] = []
842
            groups[row[0]].append(row[1])
843
        return groups
844
    
845
    def _put_groups(self, account, groups, replace=False):
846
        if replace:
847
            self._del_groups(account)
848
        for k, v in groups.iteritems():
849
            sql = 'delete from groups where account = ? and gname = ?'
850
            self.con.execute(sql, (account, k))
851
            if v:
852
                sql = 'insert into groups (account, gname, user) values (?, ?, ?)'
853
                self.con.executemany(sql, [(account, k, x) for x in v])
854
    
855
    def _del_groups(self, account):
856
        sql = 'delete from groups where account = ?'
857
        self.con.execute(sql, (account,))
858
    
859
    def _check_permissions(self, path, permissions):
860
        # Check for existing permissions.
861
        sql = '''select name from permissions
862
                    where name != ? and (name like ? or ? like name || ?)'''
863
        c = self.con.execute(sql, (path, path + '%', path, '%'))
864
        row = c.fetchone()
865
        if row:
866
            ae = AttributeError()
867
            ae.data = row[0]
868
            raise ae
869
        
870
        # Format given permissions.
871
        if len(permissions) == 0:
872
            return [], []
873
        r = permissions.get('read', [])
874
        w = permissions.get('write', [])
875
        # Examples follow.
876
        # if True in [False or ',' in x for x in r]:
877
        #     raise ValueError('Bad characters in read permissions')
878
        # if True in [False or ',' in x for x in w]:
879
        #     raise ValueError('Bad characters in write permissions')
880
        return r, w
881
    
882
    def _get_permissions(self, path):
883
        # Check for permissions at path or above.
884
        sql = 'select name, op, user from permissions where ? like name || ?'
885
        c = self.con.execute(sql, (path, '%'))
886
        name = path
887
        perms = {} # Return nothing, if nothing is set.
888
        for row in c.fetchall():
889
            name = row[0]
890
            if row[1] not in perms:
891
                perms[row[1]] = []
892
            perms[row[1]].append(row[2])
893
        return name, perms
894
    
895
    def _put_permissions(self, path, r, w):
896
        sql = 'delete from permissions where name = ?'
897
        self.con.execute(sql, (path,))
898
        sql = 'insert into permissions (name, op, user) values (?, ?, ?)'
899
        if r:
900
            self.con.executemany(sql, [(path, 'read', x) for x in r])
901
        if w:
902
            self.con.executemany(sql, [(path, 'write', x) for x in w])
903
    
904
    def _get_public(self, path):
905
        sql = 'select name from public where name = ?'
906
        c = self.con.execute(sql, (path,))
907
        row = c.fetchone()
908
        if not row:
909
            return False
910
        return True
911
    
912
    def _put_public(self, path, public):
913
        if not public:
914
            sql = 'delete from public where name = ?'
915
        else:
916
            sql = 'insert or replace into public (name) values (?)'
917
        self.con.execute(sql, (path,))
918
    
919
    def _del_sharing(self, path):
920
        sql = 'delete from permissions where name = ?'
921
        self.con.execute(sql, (path,))
922
        sql = 'delete from public where name = ?'
923
        self.con.execute(sql, (path,))
924
    
925
    def _is_allowed(self, user, account, container, name, op='read'):
926
        if user == account:
927
            return True
928
        path = os.path.join(account, container, name)
929
        if op == 'read' and self._get_public(path):
930
            return True
931
        perm_path, perms = self._get_permissions(path)
932
        
933
        # Expand groups.
934
        for x in ('read', 'write'):
935
            g_perms = set()
936
            for y in perms.get(x, []):
937
                if ':' in y:
938
                    g_account, g_name = y.split(':', 1)
939
                    groups = self._get_groups(g_account)
940
                    if g_name in groups:
941
                        g_perms.update(groups[g_name])
942
                else:
943
                    g_perms.add(y)
944
            perms[x] = g_perms
945
        
946
        if op == 'read' and ('*' in perms['read'] or user in perms['read']):
947
            return True
948
        if '*' in perms['write'] or user in perms['write']:
949
            return True
950
        return False
951
    
952
    def _can_read(self, user, account, container, name):
953
        if not self._is_allowed(user, account, container, name, 'read'):
954
            raise NotAllowedError
955
    
956
    def _can_write(self, user, account, container, name):
957
        if not self._is_allowed(user, account, container, name, 'write'):
958
            raise NotAllowedError
959
    
960
    def _allowed_paths(self, user, prefix=None):
961
        sql = '''select distinct name from permissions where (user = ?
962
                    or user in (select account || ':' || gname from groups where user = ?))'''
963
        param = (user, user)
964
        if prefix:
965
            sql += ' and name like ?'
966
            param += (prefix + '/%',)
967
        c = self.con.execute(sql, param)
968
        return [x[0] for x in c.fetchall()]
969
    
970
    def _allowed_accounts(self, user):
971
        allow = set()
972
        for path in self._allowed_paths(user):
973
            allow.add(path.split('/', 1)[0])
974
        return sorted(allow)
975
    
976
    def _allowed_containers(self, user, account):
977
        allow = set()
978
        for path in self._allowed_paths(user, account):
979
            allow.add(path.split('/', 2)[1])
980
        return sorted(allow)
981
    
982
    def _shared_paths(self, prefix):
983
        sql = 'select distinct name from permissions where name like ?'
984
        c = self.con.execute(sql, (prefix + '/%',))
985
        return [x[0] for x in c.fetchall()]