Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / simple.py @ 17629fea

History | View | Annotate | Download (37.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import hashlib
39
import binascii
40

    
41
from base import NotAllowedError, BaseBackend
42
from pithos.lib.hashfiler import Mapper, Blocker
43

    
44

    
45
logger = logging.getLogger(__name__)
46

    
47

    
48
class SimpleBackend(BaseBackend):
49
    """A simple backend.
50
    
51
    Uses SQLite for storage.
52
    """
53
    
54
    def __init__(self, db):
55
        self.hash_algorithm = 'sha256'
56
        self.block_size = 4 * 1024 * 1024 # 4MB
57
        
58
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
59
        
60
        basepath = os.path.split(db)[0]
61
        if basepath and not os.path.exists(basepath):
62
            os.makedirs(basepath)
63
        if not os.path.isdir(basepath):
64
            raise RuntimeError("Cannot open database at '%s'" % (db,))
65
        
66
        self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
67
        
68
        sql = '''pragma foreign_keys = on'''
69
        self.con.execute(sql)
70
        
71
        sql = '''create table if not exists versions (
72
                    version_id integer primary key,
73
                    name text,
74
                    user text,
75
                    tstamp integer not null,
76
                    size integer default 0,
77
                    hide integer default 0)'''
78
        self.con.execute(sql)
79
        sql = '''create table if not exists metadata (
80
                    version_id integer,
81
                    key text,
82
                    value text,
83
                    primary key (version_id, key)
84
                    foreign key (version_id) references versions(version_id)
85
                    on delete cascade)'''
86
        self.con.execute(sql)
87
        sql = '''create table if not exists policy (
88
                    name text, key text, value text, primary key (name, key))'''
89
        self.con.execute(sql)
90
        
91
        # Access control tables.
92
        sql = '''create table if not exists groups (
93
                    account text, gname text, user text)'''
94
        self.con.execute(sql)
95
        sql = '''create table if not exists permissions (
96
                    name text, op text, user text)'''
97
        self.con.execute(sql)
98
        sql = '''create table if not exists public (
99
                    name text, primary key (name))'''
100
        self.con.execute(sql)
101
        
102
        self.con.commit()
103
        
104
        params = {'blocksize': self.block_size,
105
                  'blockpath': basepath + '/blocks',
106
                  'hashtype': self.hash_algorithm}
107
        self.blocker = Blocker(**params)
108
        
109
        params = {'mappath': basepath + '/maps',
110
                  'namelen': self.blocker.hashlen}
111
        self.mapper = Mapper(**params)
112
    
113
    def get_account_meta(self, user, account, until=None):
114
        """Return a dictionary with the account metadata."""
115
        
116
        logger.debug("get_account_meta: %s %s", account, until)
117
        if user != account:
118
            raise NotAllowedError
119
        try:
120
            version_id, mtime = self._get_accountinfo(account, until)
121
        except NameError:
122
            version_id = None
123
            mtime = 0
124
        count, bytes, tstamp = self._get_pathstats(account, until)
125
        if mtime > tstamp:
126
            tstamp = mtime
127
        if until is None:
128
            modified = tstamp
129
        else:
130
            modified = self._get_pathstats(account)[2] # Overall last modification
131
            if mtime > modified:
132
                modified = mtime
133
        
134
        # Proper count.
135
        sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
136
        sql = sql % self._sql_until(until)
137
        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
138
        row = c.fetchone()
139
        count = row[0]
140
        
141
        meta = self._get_metadata(account, version_id)
142
        meta.update({'name': account, 'count': count, 'bytes': bytes})
143
        if modified:
144
            meta.update({'modified': modified})
145
        if until is not None:
146
            meta.update({'until_timestamp': tstamp})
147
        return meta
148
    
149
    def update_account_meta(self, user, account, meta, replace=False):
150
        """Update the metadata associated with the account."""
151
        
152
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
153
        if user != account:
154
            raise NotAllowedError
155
        self._put_metadata(user, account, meta, replace, False)
156
    
157
    def get_account_groups(self, user, account):
158
        """Return a dictionary with the user groups defined for this account."""
159
        
160
        logger.debug("get_account_groups: %s", account)
161
        if user != account:
162
            raise NotAllowedError
163
        return self._get_groups(account)
164
    
165
    def update_account_groups(self, user, account, groups, replace=False):
166
        """Update the groups associated with the account."""
167
        
168
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
169
        if user != account:
170
            raise NotAllowedError
171
        self._check_groups(groups)
172
        self._put_groups(account, groups, replace)
173
    
174
    def put_account(self, user, account):
175
        """Create a new account with the given name."""
176
        
177
        logger.debug("put_account: %s", account)
178
        if user != account:
179
            raise NotAllowedError
180
        try:
181
            version_id, mtime = self._get_accountinfo(account)
182
        except NameError:
183
            pass
184
        else:
185
            raise NameError('Account already exists')
186
        version_id = self._put_version(account, user)
187
        self.con.commit()
188
    
189
    def delete_account(self, user, account):
190
        """Delete the account with the given name."""
191
        
192
        logger.debug("delete_account: %s", account)
193
        if user != account:
194
            raise NotAllowedError
195
        count = self._get_pathstats(account)[0]
196
        if count > 0:
197
            raise IndexError('Account is not empty')
198
        sql = 'delete from versions where name = ?'
199
        self.con.execute(sql, (account,))
200
        self._del_groups(account)
201
        self.con.commit()
202
    
203
    def list_containers(self, user, account, marker=None, limit=10000, until=None):
204
        """Return a list of containers existing under an account."""
205
        
206
        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
207
        if user != account:
208
            if until:
209
                raise NotAllowedError
210
            containers = self._allowed_containers(user, account)
211
            start = 0
212
            if marker:
213
                try:
214
                    start = containers.index(marker) + 1
215
                except ValueError:
216
                    pass
217
            if not limit or limit > 10000:
218
                limit = 10000
219
            return containers[start:start + limit]
220
        return self._list_objects(account, '', '/', marker, limit, False, [], until)
221
    
222
    def get_container_meta(self, user, account, container, until=None):
223
        """Return a dictionary with the container metadata."""
224
        
225
        logger.debug("get_container_meta: %s %s %s", account, container, until)
226
        if user != account:
227
            raise NotAllowedError
228
        path, version_id, mtime = self._get_containerinfo(account, container, until)
229
        count, bytes, tstamp = self._get_pathstats(path, until)
230
        if mtime > tstamp:
231
            tstamp = mtime
232
        if until is None:
233
            modified = tstamp
234
        else:
235
            modified = self._get_pathstats(path)[2] # Overall last modification
236
            if mtime > modified:
237
                modified = mtime
238
        
239
        meta = self._get_metadata(path, version_id)
240
        meta.update({'name': container, 'count': count, 'bytes': bytes, 'modified': modified})
241
        if until is not None:
242
            meta.update({'until_timestamp': tstamp})
243
        return meta
244
    
245
    def update_container_meta(self, user, account, container, meta, replace=False):
246
        """Update the metadata associated with the container."""
247
        
248
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
249
        if user != account:
250
            raise NotAllowedError
251
        path, version_id, mtime = self._get_containerinfo(account, container)
252
        self._put_metadata(user, path, meta, replace, False)
253
    
254
    def get_container_policy(self, user, account, container):
255
        """Return a dictionary with the container policy."""
256
        
257
        logger.debug("get_container_policy: %s %s", account, container)
258
        if user != account:
259
            raise NotAllowedError
260
        path = self._get_containerinfo(account, container)[0]
261
        return self._get_policy(path)
262
    
263
    def update_container_policy(self, user, account, container, policy, replace=False):
264
        """Update the policy associated with the account."""
265
        
266
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
267
        if user != account:
268
            raise NotAllowedError
269
        path = self._get_containerinfo(account, container)[0]
270
        self._check_policy(policy)
271
        if replace:
272
            for k, v in self.default_policy.iteritems():
273
                if k not in policy:
274
                    policy[k] = v
275
        for k, v in policy.iteritems():
276
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
277
            self.con.execute(sql, (path, k, v))
278
        self.con.commit()
279
    
280
    def put_container(self, user, account, container, policy=None):
281
        """Create a new container with the given name."""
282
        
283
        logger.debug("put_container: %s %s %s", account, container, policy)
284
        if user != account:
285
            raise NotAllowedError
286
        try:
287
            path, version_id, mtime = self._get_containerinfo(account, container)
288
        except NameError:
289
            pass
290
        else:
291
            raise NameError('Container already exists')
292
        if policy:
293
            self._check_policy(policy)
294
        path = os.path.join(account, container)
295
        version_id = self._put_version(path, user)
296
        for k, v in self.default_policy.iteritems():
297
            if k not in policy:
298
                policy[k] = v
299
        for k, v in policy.iteritems():
300
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
301
            self.con.execute(sql, (path, k, v))
302
        self.con.commit()
303
    
304
    def delete_container(self, user, account, container, until=None):
305
        """Delete/purge the container with the given name."""
306
        
307
        logger.debug("delete_container: %s %s %s", account, container, until)
308
        if user != account:
309
            raise NotAllowedError
310
        path, version_id, mtime = self._get_containerinfo(account, container)
311
        
312
        if until is not None:
313
            sql = '''select version_id from versions where name like ? and tstamp <= ?
314
                        and version_id not in (select version_id from (%s))'''
315
            sql = sql % self._sql_until() # Do not delete current versions.
316
            c = self.con.execute(sql, (path + '/%', until))
317
            for v in [x[0] for x in c.fetchall()]:
318
                self._del_version(v)
319
            self.con.commit()
320
            return
321
        
322
        count = self._get_pathstats(path)[0]
323
        if count > 0:
324
            raise IndexError('Container is not empty')
325
        sql = 'delete from versions where name = ? or name like ?' # May contain hidden items.
326
        self.con.execute(sql, (path, path + '/%',))
327
        sql = 'delete from policy where name = ?'
328
        self.con.execute(sql, (path,))
329
        self._copy_version(user, account, account, True, False) # New account version (for timestamp update).
330
    
331
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
332
        """Return a list of objects existing under a container."""
333
        
334
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, until)
335
        if user != account:
336
            raise NotAllowedError
337
        path, version_id, mtime = self._get_containerinfo(account, container, until)
338
        return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until)
339
    
340
    def list_object_meta(self, user, account, container, until=None):
341
        """Return a list with all the container's object meta keys."""
342
        
343
        logger.debug("list_object_meta: %s %s %s", account, container, until)
344
        if user != account:
345
            raise NotAllowedError
346
        path, version_id, mtime = self._get_containerinfo(account, container, until)
347
        sql = '''select distinct m.key from (%s) o, metadata m
348
                    where m.version_id = o.version_id and o.name like ?'''
349
        sql = sql % self._sql_until(until)
350
        c = self.con.execute(sql, (path + '/%',))
351
        return [x[0] for x in c.fetchall()]
352
    
353
    def get_object_meta(self, user, account, container, name, version=None):
354
        """Return a dictionary with the object metadata."""
355
        
356
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
357
        self._can_read(user, account, container, name)
358
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
359
        if version is None:
360
            modified = mtime
361
        else:
362
            modified = self._get_version(path, version)[2] # Overall last modification
363
        
364
        meta = self._get_metadata(path, version_id)
365
        meta.update({'name': name, 'bytes': size})
366
        meta.update({'version': version_id, 'version_timestamp': mtime})
367
        meta.update({'modified': modified, 'modified_by': muser})
368
        return meta
369
    
370
    def update_object_meta(self, user, account, container, name, meta, replace=False):
371
        """Update the metadata associated with the object."""
372
        
373
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
374
        self._can_write(user, account, container, name)
375
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name)
376
        self._put_metadata(user, path, meta, replace)
377
    
378
    def get_object_permissions(self, user, account, container, name):
379
        """Return the path from which this object gets its permissions from,\
380
        along with a dictionary containing the permissions."""
381
        
382
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
383
        self._can_read(user, account, container, name)
384
        path = self._get_objectinfo(account, container, name)[0]
385
        return self._get_permissions(path)
386
    
387
    def update_object_permissions(self, user, account, container, name, permissions):
388
        """Update the permissions associated with the object."""
389
        
390
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
391
        if user != account:
392
            raise NotAllowedError
393
        path = self._get_objectinfo(account, container, name)[0]
394
        r, w = self._check_permissions(path, permissions)
395
        self._put_permissions(path, r, w)
396
    
397
    def get_object_public(self, user, account, container, name):
398
        """Return the public URL of the object if applicable."""
399
        
400
        logger.debug("get_object_public: %s %s %s", account, container, name)
401
        self._can_read(user, account, container, name)
402
        path = self._get_objectinfo(account, container, name)[0]
403
        if self._get_public(path):
404
            return '/public/' + path
405
        return None
406
    
407
    def update_object_public(self, user, account, container, name, public):
408
        """Update the public status of the object."""
409
        
410
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
411
        self._can_write(user, account, container, name)
412
        path = self._get_objectinfo(account, container, name)[0]
413
        self._put_public(path, public)
414
    
415
    def get_object_hashmap(self, user, account, container, name, version=None):
416
        """Return the object's size and a list with partial hashes."""
417
        
418
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
419
        self._can_read(user, account, container, name)
420
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
421
        hashmap = self.mapper.map_retr(version_id)
422
        return size, [binascii.hexlify(x) for x in hashmap]
423
    
424
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
425
        """Create/update an object with the specified size and partial hashes."""
426
        
427
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
428
        if permissions is not None and user != account:
429
            raise NotAllowedError
430
        self._can_write(user, account, container, name)
431
        missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
432
        if missing:
433
            ie = IndexError()
434
            ie.data = missing
435
            raise ie
436
        path = self._get_containerinfo(account, container)[0]
437
        path = os.path.join(path, name)
438
        if permissions is not None:
439
            r, w = self._check_permissions(path, permissions)
440
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False)
441
        sql = 'update versions set size = ? where version_id = ?'
442
        self.con.execute(sql, (size, dest_version_id))
443
        self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
444
        for k, v in meta.iteritems():
445
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
446
            self.con.execute(sql, (dest_version_id, k, v))
447
        if permissions is not None:
448
            self._put_permissions(path, r, w)
449
        self.con.commit()
450
    
451
    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
452
        """Copy an object's data and metadata."""
453
        
454
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
455
        if permissions is not None and user != account:
456
            raise NotAllowedError
457
        self._can_read(user, account, src_container, src_name)
458
        self._can_write(user, account, dest_container, dest_name)
459
        self._get_containerinfo(account, src_container)
460
        if src_version is None:
461
            src_path = self._get_objectinfo(account, src_container, src_name)[0]
462
        else:
463
            src_path = os.path.join(account, src_container, src_name)
464
        dest_path = self._get_containerinfo(account, dest_container)[0]
465
        dest_path = os.path.join(dest_path, dest_name)
466
        if permissions is not None:
467
            r, w = self._check_permissions(dest_path, permissions)
468
        src_version_id, dest_version_id = self._copy_version(user, src_path, dest_path, not replace_meta, True, src_version)
469
        for k, v in dest_meta.iteritems():
470
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
471
            self.con.execute(sql, (dest_version_id, k, v))
472
        if permissions is not None:
473
            self._put_permissions(dest_path, r, w)
474
        self.con.commit()
475
    
476
    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
477
        """Move an object's data and metadata."""
478
        
479
        logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
480
        self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
481
        self.delete_object(user, account, src_container, src_name)
482
    
483
    def delete_object(self, user, account, container, name, until=None):
484
        """Delete/purge an object."""
485
        
486
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
487
        if user != account:
488
            raise NotAllowedError
489
        
490
        if until is not None:
491
            path = os.path.join(account, container, name)
492
            sql = '''select version_id from versions where name = ? and tstamp <= ?'''
493
            c = self.con.execute(sql, (path, until))
494
            for v in [x[0] in c.fetchall()]:
495
                self._del_version(v)
496
            try:
497
                version_id = self._get_version(path)[0]
498
            except NameError:
499
                pass
500
            else:
501
                self._del_sharing(path)
502
            self.con.commit()
503
            return
504
        
505
        path = self._get_objectinfo(account, container, name)[0]
506
        self._put_version(path, user, 0, 1)
507
        self._del_sharing(path)
508
    
509
    def list_versions(self, user, account, container, name):
510
        """Return a list of all (version, version_timestamp) tuples for an object."""
511
        
512
        logger.debug("list_versions: %s %s %s", account, container, name)
513
        self._can_read(user, account, container, name)
514
        # This will even show deleted versions.
515
        path = os.path.join(account, container, name)
516
        sql = '''select distinct version_id, tstamp from versions where name = ? and hide = 0'''
517
        c = self.con.execute(sql, (path,))
518
        return [(int(x[0]), int(x[1])) for x in c.fetchall()]
519
    
520
    def get_block(self, hash):
521
        """Return a block's data."""
522
        
523
        logger.debug("get_block: %s", hash)
524
        blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
525
        if not blocks:
526
            raise NameError('Block does not exist')
527
        return blocks[0]
528
    
529
    def put_block(self, data):
530
        """Create a block and return the hash."""
531
        
532
        logger.debug("put_block: %s", len(data))
533
        hashes, absent = self.blocker.block_stor((data,))
534
        return binascii.hexlify(hashes[0])
535
    
536
    def update_block(self, hash, data, offset=0):
537
        """Update a known block and return the hash."""
538
        
539
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
540
        if offset == 0 and len(data) == self.block_size:
541
            return self.put_block(data)
542
        h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
543
        return binascii.hexlify(h)
544
    
545
    def _sql_until(self, until=None):
546
        """Return the sql to get the latest versions until the timestamp given."""
547
        if until is None:
548
            until = int(time.time())
549
        sql = '''select version_id, name, tstamp, size from versions v
550
                    where version_id = (select max(version_id) from versions
551
                                        where v.name = name and tstamp <= %s)
552
                    and hide = 0'''
553
        return sql % (until,)
554
    
555
    def _get_pathstats(self, path, until=None):
556
        """Return count and sum of size of everything under path and latest timestamp."""
557
        
558
        sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?'
559
        sql = sql % self._sql_until(until)
560
        c = self.con.execute(sql, (path + '/%',))
561
        row = c.fetchone()
562
        tstamp = row[2] if row[2] is not None else 0
563
        return int(row[0]), int(row[1]), int(tstamp)
564
    
565
    def _get_version(self, path, version=None):
566
        if version is None:
567
            sql = '''select version_id, user, tstamp, size, hide from versions where name = ?
568
                        order by version_id desc limit 1'''
569
            c = self.con.execute(sql, (path,))
570
            row = c.fetchone()
571
            if not row or int(row[4]):
572
                raise NameError('Object does not exist')
573
        else:
574
            # The database (sqlite) will not complain if the version is not an integer.
575
            sql = '''select version_id, user, tstamp, size from versions where name = ?
576
                        and version_id = ?'''
577
            c = self.con.execute(sql, (path, version))
578
            row = c.fetchone()
579
            if not row:
580
                raise IndexError('Version does not exist')
581
        return str(row[0]), str(row[1]), int(row[2]), int(row[3])
582
    
583
    def _put_version(self, path, user, size=0, hide=0):
584
        tstamp = int(time.time())
585
        sql = 'insert into versions (name, user, tstamp, size, hide) values (?, ?, ?, ?, ?)'
586
        id = self.con.execute(sql, (path, user, tstamp, size, hide)).lastrowid
587
        self.con.commit()
588
        return str(id)
589
    
590
    def _copy_version(self, user, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
591
        if src_version is not None:
592
            src_version_id, muser, mtime, size = self._get_version(src_path, src_version)
593
        else:
594
            # Latest or create from scratch.
595
            try:
596
                src_version_id, muser, mtime, size = self._get_version(src_path)
597
            except NameError:
598
                src_version_id = None
599
                size = 0
600
        if not copy_data:
601
            size = 0
602
        dest_version_id = self._put_version(dest_path, user, size)
603
        if copy_meta and src_version_id is not None:
604
            sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
605
            sql = sql % dest_version_id
606
            self.con.execute(sql, (src_version_id,))
607
        if copy_data and src_version_id is not None:
608
            # TODO: Copy properly.
609
            hashmap = self.mapper.map_retr(src_version_id)
610
            self.mapper.map_stor(dest_version_id, hashmap)
611
        self.con.commit()
612
        return src_version_id, dest_version_id
613
    
614
    def _get_versioninfo(self, account, container, name, until=None):
615
        """Return path, latest version, associated timestamp and size until the timestamp given."""
616
        
617
        p = (account, container, name)
618
        try:
619
            p = p[:p.index(None)]
620
        except ValueError:
621
            pass
622
        path = os.path.join(*p)
623
        sql = '''select version_id, tstamp, size from (%s) where name = ?'''
624
        sql = sql % self._sql_until(until)
625
        c = self.con.execute(sql, (path,))
626
        row = c.fetchone()
627
        if row is None:
628
            raise NameError('Path does not exist')
629
        return path, str(row[0]), int(row[1]), int(row[2])
630
    
631
    def _get_accountinfo(self, account, until=None):
632
        try:
633
            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
634
            return version_id, mtime
635
        except:
636
            raise NameError('Account does not exist')
637
    
638
    def _get_containerinfo(self, account, container, until=None):
639
        try:
640
            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
641
            return path, version_id, mtime
642
        except:
643
            raise NameError('Container does not exist')
644
    
645
    def _get_objectinfo(self, account, container, name, version=None):
646
        path = os.path.join(account, container, name)
647
        version_id, muser, mtime, size = self._get_version(path, version)
648
        return path, version_id, muser, mtime, size
649
    
650
    def _get_metadata(self, path, version):
651
        sql = 'select key, value from metadata where version_id = ?'
652
        c = self.con.execute(sql, (version,))
653
        return dict(c.fetchall())
654
    
655
    def _put_metadata(self, user, path, meta, replace=False, copy_data=True):
656
        """Create a new version and store metadata."""
657
        
658
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, copy_data)
659
        for k, v in meta.iteritems():
660
            if not replace and v == '':
661
                sql = 'delete from metadata where version_id = ? and key = ?'
662
                self.con.execute(sql, (dest_version_id, k))
663
            else:
664
                sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
665
                self.con.execute(sql, (dest_version_id, k, v))
666
        self.con.commit()
667
    
668
    def _check_policy(self, policy):
669
        for k in policy.keys():
670
            if policy[k] == '':
671
                policy[k] = self.default_policy.get(k)
672
        for k, v in policy.iteritems():
673
            if k == 'quota':
674
                q = int(v) # May raise ValueError.
675
                if q < 0:
676
                    raise ValueError
677
            elif k == 'versioning':
678
                if v not in ['auto', 'manual', 'none']:
679
                    raise ValueError
680
            else:
681
                raise ValueError
682
    
683
    def _get_policy(self, path):
684
        sql = 'select key, value from policy where name = ?'
685
        c = self.con.execute(sql, (path,))
686
        return dict(c.fetchall())
687
    
688
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
689
        cont_prefix = path + '/'
690
        if keys and len(keys) > 0:
691
            sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
692
                        m.version_id = o.version_id and m.key in (%s) order by o.name'''
693
            sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
694
            param = (cont_prefix + prefix + '%',) + tuple(keys)
695
        else:
696
            sql = 'select name, version_id from (%s) where name like ? order by name'
697
            sql = sql % self._sql_until(until)
698
            param = (cont_prefix + prefix + '%',)
699
        c = self.con.execute(sql, param)
700
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
701
        if delimiter:
702
            pseudo_objects = []
703
            for x in objects:
704
                pseudo_name = x[0]
705
                i = pseudo_name.find(delimiter, len(prefix))
706
                if not virtual:
707
                    # If the delimiter is not found, or the name ends
708
                    # with the delimiter's first occurence.
709
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
710
                        pseudo_objects.append(x)
711
                else:
712
                    # If the delimiter is found, keep up to (and including) the delimiter.
713
                    if i != -1:
714
                        pseudo_name = pseudo_name[:i + len(delimiter)]
715
                    if pseudo_name not in [y[0] for y in pseudo_objects]:
716
                        if pseudo_name == x[0]:
717
                            pseudo_objects.append(x)
718
                        else:
719
                            pseudo_objects.append((pseudo_name, None))
720
            objects = pseudo_objects
721
        
722
        start = 0
723
        if marker:
724
            try:
725
                start = [x[0] for x in objects].index(marker) + 1
726
            except ValueError:
727
                pass
728
        if not limit or limit > 10000:
729
            limit = 10000
730
        return objects[start:start + limit]
731
    
732
    def _del_version(self, version):
733
        self.mapper.map_remv(version)
734
        sql = 'delete from versions where version_id = ?'
735
        self.con.execute(sql, (version,))
736
    
737
    # Access control functions.
738
    
739
    def _check_groups(self, groups):
740
        # Example follows.
741
        # for k, v in groups.iteritems():
742
        #     if True in [False or ',' in x for x in v]:
743
        #         raise ValueError('Bad characters in groups')
744
        pass
745
    
746
    def _get_groups(self, account):
747
        sql = 'select gname, user from groups where account = ?'
748
        c = self.con.execute(sql, (account,))
749
        groups = {}
750
        for row in c.fetchall():
751
            if row[0] not in groups:
752
                groups[row[0]] = []
753
            groups[row[0]].append(row[1])
754
        return groups
755
    
756
    def _put_groups(self, account, groups, replace=False):
757
        if replace:
758
            self._del_groups(account)
759
        for k, v in groups.iteritems():
760
            sql = 'delete from groups where account = ? and gname = ?'
761
            self.con.execute(sql, (account, k))
762
            if v:
763
                sql = 'insert into groups (account, gname, user) values (?, ?, ?)'
764
                self.con.executemany(sql, [(account, k, x) for x in v])
765
        self.con.commit()
766
    
767
    def _del_groups(self, account):
768
        sql = 'delete from groups where account = ?'
769
        self.con.execute(sql, (account,))
770
    
771
    def _check_permissions(self, path, permissions):
772
        # Check for existing permissions.
773
        sql = '''select name from permissions
774
                    where name != ? and (name like ? or ? like name || ?)'''
775
        c = self.con.execute(sql, (path, path + '%', path, '%'))
776
        row = c.fetchone()
777
        if row:
778
            ae = AttributeError()
779
            ae.data = row[0]
780
            raise ae
781
        
782
        # Format given permissions.
783
        if len(permissions) == 0:
784
            return [], []
785
        r = permissions.get('read', [])
786
        w = permissions.get('write', [])
787
        # Examples follow.
788
        # if True in [False or ',' in x for x in r]:
789
        #     raise ValueError('Bad characters in read permissions')
790
        # if True in [False or ',' in x for x in w]:
791
        #     raise ValueError('Bad characters in write permissions')
792
        return r, w
793
    
794
    def _get_permissions(self, path):
795
        # Check for permissions at path or above.
796
        sql = 'select name, op, user from permissions where ? like name || ?'
797
        c = self.con.execute(sql, (path, '%'))
798
        name = path
799
        perms = {} # Return nothing, if nothing is set.
800
        for row in c.fetchall():
801
            name = row[0]
802
            if row[1] not in perms:
803
                perms[row[1]] = []
804
            perms[row[1]].append(row[2])
805
        return name, perms
806
    
807
    def _put_permissions(self, path, r, w):
808
        sql = 'delete from permissions where name = ?'
809
        self.con.execute(sql, (path,))
810
        sql = 'insert into permissions (name, op, user) values (?, ?, ?)'
811
        if r:
812
            self.con.executemany(sql, [(path, 'read', x) for x in r])
813
        if w:
814
            self.con.executemany(sql, [(path, 'write', x) for x in w])
815
        self.con.commit()
816
    
817
    def _get_public(self, path):
818
        sql = 'select name from public where name = ?'
819
        c = self.con.execute(sql, (path,))
820
        row = c.fetchone()
821
        if not row:
822
            return False
823
        return True
824
    
825
    def _put_public(self, path, public):
826
        if not public:
827
            sql = 'delete from public where name = ?'
828
        else:
829
            sql = 'insert or replace into public (name) values (?)'
830
        self.con.execute(sql, (path,))
831
        self.con.commit()
832
    
833
    def _del_sharing(self, path):
834
        sql = 'delete from permissions where name = ?'
835
        self.con.execute(sql, (path,))
836
        sql = 'delete from public where name = ?'
837
        self.con.execute(sql, (path,))
838
        self.con.commit()
839
    
840
    def _is_allowed(self, user, account, container, name, op='read'):
841
        if user == account:
842
            return True
843
        path = os.path.join(account, container, name)
844
        if op == 'read' and self._get_public(path):
845
            return True
846
        perm_path, perms = self._get_permissions(path)
847
        
848
        # Expand groups.
849
        for x in ('read', 'write'):
850
            g_perms = set()
851
            for y in perms.get(x, []):
852
                if ':' in y:
853
                    g_account, g_name = y.split(':', 1)
854
                    groups = self._get_groups(g_account)
855
                    if g_name in groups:
856
                        g_perms.update(groups[g_name])
857
                else:
858
                    g_perms.add(y)
859
            perms[x] = g_perms
860
        
861
        if op == 'read' and ('*' in perms['read'] or user in perms['read']):
862
            return True
863
        if '*' in perms['write'] or user in perms['write']:
864
            return True
865
        return False
866
    
867
    def _can_read(self, user, account, container, name):
868
        if not self._is_allowed(user, account, container, name, 'read'):
869
            raise NotAllowedError
870
    
871
    def _can_write(self, user, account, container, name):
872
        if not self._is_allowed(user, account, container, name, 'write'):
873
            raise NotAllowedError
874
    
875
    def _allowed_paths(self, user, prefix=None):
876
        sql = '''select distinct name from permissions where (user = ?
877
                    or user in (select account || ':' || gname from groups where user = ?))'''
878
        param = (user, user)
879
        if prefix:
880
            sql += ' and name like ?'
881
            param += (prefix + '/%',)
882
        c = self.con.execute(sql, param)
883
        return [x[0] for x in c.fetchall()]
884
    
885
    def _allowed_accounts(self, user):
886
        allow = set()
887
        for path in self._allowed_paths(user):
888
            allow.add(path.split('/', 1)[0])
889
        return sorted(allow)
890
    
891
    def _allowed_containers(self, user, account):
892
        allow = set()
893
        for path in self._allowed_paths(user, account):
894
            allow.add(path.split('/', 2)[1])
895
        return sorted(allow)