Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / simple.py @ 1993fea9

History | View | Annotate | Download (35.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import types
39
import hashlib
40
import shutil
41
import pickle
42

    
43
from base import NotAllowedError, BaseBackend
44

    
45

    
46
logger = logging.getLogger(__name__)
47

    
48

    
49
class SimpleBackend(BaseBackend):
50
    """A simple backend.
51
    
52
    Uses SQLite for storage.
53
    """
54
    
55
    # TODO: Automatic/manual clean-up after a time interval.
56
    
57
    def __init__(self, db):
58
        self.hash_algorithm = 'sha1'
59
        self.block_size = 128 * 1024 # 128KB
60
        
61
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
62
        
63
        basepath = os.path.split(db)[0]
64
        if basepath and not os.path.exists(basepath):
65
            os.makedirs(basepath)
66
        
67
        self.con = sqlite3.connect(db, check_same_thread=False)
68
        sql = '''create table if not exists versions (
69
                    version_id integer primary key,
70
                    name text,
71
                    user text,
72
                    tstamp datetime default current_timestamp,
73
                    size integer default 0,
74
                    hide integer default 0)'''
75
        self.con.execute(sql)
76
        sql = '''create table if not exists metadata (
77
                    version_id integer, key text, value text, primary key (version_id, key))'''
78
        self.con.execute(sql)
79
        sql = '''create table if not exists blocks (
80
                    block_id text, data blob, primary key (block_id))'''
81
        self.con.execute(sql)
82
        sql = '''create table if not exists hashmaps (
83
                    version_id integer, pos integer, block_id text, primary key (version_id, pos))'''
84
        self.con.execute(sql)
85
        sql = '''create table if not exists groups (
86
                    account text, name text, users text, primary key (account, name))'''
87
        self.con.execute(sql)
88
        sql = '''create table if not exists policy (
89
                    name text, key text, value text, primary key (name, key))'''
90
        self.con.execute(sql)
91
        sql = '''create table if not exists permissions (
92
                    name text, read text, write text, primary key (name))'''
93
        self.con.execute(sql)
94
        sql = '''create table if not exists public (
95
                    name text, primary key (name))'''
96
        self.con.execute(sql)
97
        self.con.commit()
98
    
99
    def get_account_meta(self, user, account, until=None):
100
        """Return a dictionary with the account metadata."""
101
        
102
        logger.debug("get_account_meta: %s %s", account, until)
103
        if user != account:
104
            raise NotAllowedError
105
        try:
106
            version_id, mtime = self._get_accountinfo(account, until)
107
        except NameError:
108
            version_id = None
109
            mtime = 0
110
        count, bytes, tstamp = self._get_pathstats(account, until)
111
        if mtime > tstamp:
112
            tstamp = mtime
113
        if until is None:
114
            modified = tstamp
115
        else:
116
            modified = self._get_pathstats(account)[2] # Overall last modification
117
            if mtime > modified:
118
                modified = mtime
119
        
120
        # Proper count.
121
        sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
122
        sql = sql % self._sql_until(until)
123
        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
124
        row = c.fetchone()
125
        count = row[0]
126
        
127
        meta = self._get_metadata(account, version_id)
128
        meta.update({'name': account, 'count': count, 'bytes': bytes})
129
        if modified:
130
            meta.update({'modified': modified})
131
        if until is not None:
132
            meta.update({'until_timestamp': tstamp})
133
        return meta
134
    
135
    def update_account_meta(self, user, account, meta, replace=False):
136
        """Update the metadata associated with the account."""
137
        
138
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
139
        if user != account:
140
            raise NotAllowedError
141
        self._put_metadata(user, account, meta, replace)
142
    
143
    def get_account_groups(self, user, account):
144
        """Return a dictionary with the user groups defined for this account."""
145
        
146
        logger.debug("get_account_groups: %s", account)
147
        if user != account:
148
            raise NotAllowedError
149
        return self._get_groups(account)
150
    
151
    def update_account_groups(self, user, account, groups, replace=False):
152
        """Update the groups associated with the account."""
153
        
154
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
155
        if user != account:
156
            raise NotAllowedError
157
        for k, v in groups.iteritems():
158
            if True in [False or ',' in x for x in v]:
159
                raise ValueError('Bad characters in groups')
160
        if replace:
161
            sql = 'delete from groups where account = ?'
162
            self.con.execute(sql, (account,))
163
        for k, v in groups.iteritems():
164
            if len(v) == 0:
165
                if not replace:
166
                    sql = 'delete from groups where account = ? and name = ?'
167
                    self.con.execute(sql, (account, k))
168
            else:
169
                sql = 'insert or replace into groups (account, name, users) values (?, ?, ?)'
170
                self.con.execute(sql, (account, k, ','.join(v)))
171
        self.con.commit()
172
    
173
    def delete_account(self, user, account):
174
        """Delete the account with the given name."""
175
        
176
        logger.debug("delete_account: %s", account)
177
        if user != account:
178
            raise NotAllowedError
179
        count, bytes, tstamp = self._get_pathstats(account)
180
        if count > 0:
181
            raise IndexError('Account is not empty')
182
        self._del_path(account) # Point of no return.
183
    
184
    def list_containers(self, user, account, marker=None, limit=10000, until=None):
185
        """Return a list of containers existing under an account."""
186
        
187
        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
188
        if user != account:
189
            raise NotAllowedError
190
        return self._list_objects(account, '', '/', marker, limit, False, [], until)
191
    
192
    def get_container_meta(self, user, account, container, until=None):
193
        """Return a dictionary with the container metadata."""
194
        
195
        logger.debug("get_container_meta: %s %s %s", account, container, until)
196
        if user != account:
197
            raise NotAllowedError
198
        path, version_id, mtime = self._get_containerinfo(account, container, until)
199
        count, bytes, tstamp = self._get_pathstats(path, until)
200
        if mtime > tstamp:
201
            tstamp = mtime
202
        if until is None:
203
            modified = tstamp
204
        else:
205
            modified = self._get_pathstats(path)[2] # Overall last modification
206
            if mtime > modified:
207
                modified = mtime
208
        
209
        meta = self._get_metadata(path, version_id)
210
        meta.update({'name': container, 'count': count, 'bytes': bytes, 'modified': modified})
211
        if until is not None:
212
            meta.update({'until_timestamp': tstamp})
213
        return meta
214
    
215
    def update_container_meta(self, user, account, container, meta, replace=False):
216
        """Update the metadata associated with the container."""
217
        
218
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
219
        if user != account:
220
            raise NotAllowedError
221
        path, version_id, mtime = self._get_containerinfo(account, container)
222
        self._put_metadata(user, path, meta, replace)
223
    
224
    def get_container_policy(self, user, account, container):
225
        """Return a dictionary with the container policy."""
226
        
227
        logger.debug("get_container_policy: %s %s", account, container)
228
        if user != account:
229
            raise NotAllowedError
230
        path = self._get_containerinfo(account, container)[0]
231
        return self._get_policy(path)
232
    
233
    def update_container_policy(self, user, account, container, policy, replace=False):
234
        """Update the policy associated with the account."""
235
        
236
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
237
        if user != account:
238
            raise NotAllowedError
239
        path = self._get_containerinfo(account, container)[0]
240
        self._check_policy(policy)
241
        if replace:
242
            for k, v in self.default_policy.iteritems():
243
                if k not in policy:
244
                    policy[k] = v
245
        for k, v in policy.iteritems():
246
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
247
            self.con.execute(sql, (path, k, v))
248
        self.con.commit()
249
    
250
    def put_container(self, user, account, container, policy=None):
251
        """Create a new container with the given name."""
252
        
253
        logger.debug("put_container: %s %s %s", account, container, policy)
254
        if user != account:
255
            raise NotAllowedError
256
        try:
257
            path, version_id, mtime = self._get_containerinfo(account, container)
258
        except NameError:
259
            pass
260
        else:
261
            raise NameError('Container already exists')
262
        if policy:
263
            self._check_policy(policy)
264
        path = os.path.join(account, container)
265
        version_id = self._put_version(path, user)
266
        for k, v in self.default_policy.iteritems():
267
            if k not in policy:
268
                policy[k] = v
269
        for k, v in policy.iteritems():
270
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
271
            self.con.execute(sql, (path, k, v))
272
        self.con.commit()
273
    
274
    def delete_container(self, user, account, container):
275
        """Delete the container with the given name."""
276
        
277
        logger.debug("delete_container: %s %s", account, container)
278
        if user != account:
279
            raise NotAllowedError
280
        path, version_id, mtime = self._get_containerinfo(account, container)
281
        count, bytes, tstamp = self._get_pathstats(path)
282
        if count > 0:
283
            raise IndexError('Container is not empty')
284
        self._del_path(path) # Point of no return.
285
        self._copy_version(user, account, account, True, True) # New account version.
286
    
287
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
288
        """Return a list of objects existing under a container."""
289
        
290
        logger.debug("list_objects: %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, until)
291
        if user != account:
292
            raise NotAllowedError
293
        path, version_id, mtime = self._get_containerinfo(account, container, until)
294
        return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until)
295
    
296
    def list_object_meta(self, user, account, container, until=None):
297
        """Return a list with all the container's object meta keys."""
298
        
299
        logger.debug("list_object_meta: %s %s %s", account, container, until)
300
        if user != account:
301
            raise NotAllowedError
302
        path, version_id, mtime = self._get_containerinfo(account, container, until)
303
        sql = '''select distinct m.key from (%s) o, metadata m
304
                    where m.version_id = o.version_id and o.name like ?'''
305
        sql = sql % self._sql_until(until)
306
        c = self.con.execute(sql, (path + '/%',))
307
        return [x[0] for x in c.fetchall()]
308
    
309
    def get_object_meta(self, user, account, container, name, version=None):
310
        """Return a dictionary with the object metadata."""
311
        
312
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
313
        self._can_read(user, account, container, name)
314
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
315
        if version is None:
316
            modified = mtime
317
        else:
318
            modified = self._get_version(path, version)[2] # Overall last modification
319
        
320
        meta = self._get_metadata(path, version_id)
321
        meta.update({'name': name, 'bytes': size})
322
        meta.update({'version': version_id, 'version_timestamp': mtime})
323
        meta.update({'modified': modified, 'modified_by': muser})
324
        return meta
325
    
326
    def update_object_meta(self, user, account, container, name, meta, replace=False):
327
        """Update the metadata associated with the object."""
328
        
329
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
330
        self._can_write(user, account, container, name)
331
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name)
332
        self._put_metadata(user, path, meta, replace)
333
    
334
    def get_object_permissions(self, user, account, container, name):
335
        """Return the path from which this object gets its permissions from,\
336
        along with a dictionary containing the permissions."""
337
        
338
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
339
        self._can_read(user, account, container, name)
340
        path = self._get_objectinfo(account, container, name)[0]
341
        return self._get_permissions(path)
342
    
343
    def update_object_permissions(self, user, account, container, name, permissions):
344
        """Update the permissions associated with the object."""
345
        
346
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
347
        if user != account:
348
            raise NotAllowedError
349
        path = self._get_objectinfo(account, container, name)[0]
350
        r, w = self._check_permissions(path, permissions)
351
        self._put_permissions(path, r, w)
352
    
353
    def get_object_public(self, user, account, container, name):
354
        """Return the public URL of the object if applicable."""
355
        
356
        logger.debug("get_object_public: %s %s %s", account, container, name)
357
        self._can_read(user, account, container, name)
358
        path = self._get_objectinfo(account, container, name)[0]
359
        if self._get_public(path):
360
            return '/public/' + path
361
        return None
362
    
363
    def update_object_public(self, user, account, container, name, public):
364
        """Update the public status of the object."""
365
        
366
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
367
        self._can_write(user, account, container, name)
368
        path = self._get_objectinfo(account, container, name)[0]
369
        self._put_public(path, public)
370
    
371
    def get_object_hashmap(self, user, account, container, name, version=None):
372
        """Return the object's size and a list with partial hashes."""
373
        
374
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
375
        self._can_read(user, account, container, name)
376
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
377
        sql = 'select block_id from hashmaps where version_id = ? order by pos asc'
378
        c = self.con.execute(sql, (version_id,))
379
        hashmap = [x[0] for x in c.fetchall()]
380
        return size, hashmap
381
    
382
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
383
        """Create/update an object with the specified size and partial hashes."""
384
        
385
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
386
        if permissions is not None and user != account:
387
            raise NotAllowedError
388
        self._can_write(user, account, container, name)
389
        missing = []
390
        for i in range(len(hashmap)):
391
            sql = 'select count(*) from blocks where block_id = ?'
392
            c = self.con.execute(sql, (hashmap[i],))
393
            if c.fetchone()[0] == 0:
394
                missing.append(hashmap[i])
395
        if missing:
396
            ie = IndexError()
397
            ie.data = missing
398
            raise ie
399
        path = self._get_containerinfo(account, container)[0]
400
        path = os.path.join(path, name)
401
        if permissions is not None:
402
            r, w = self._check_permissions(path, permissions)
403
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False)
404
        sql = 'update versions set size = ? where version_id = ?'
405
        self.con.execute(sql, (size, dest_version_id))
406
        # TODO: Check for block_id existence.
407
        for i in range(len(hashmap)):
408
            sql = 'insert or replace into hashmaps (version_id, pos, block_id) values (?, ?, ?)'
409
            self.con.execute(sql, (dest_version_id, i, hashmap[i]))
410
        for k, v in meta.iteritems():
411
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
412
            self.con.execute(sql, (dest_version_id, k, v))
413
        if permissions is not None:
414
            sql = 'insert or replace into permissions (name, read, write) values (?, ?, ?)'
415
            self.con.execute(sql, (path, r, w))
416
        self.con.commit()
417
    
418
    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
419
        """Copy an object's data and metadata."""
420
        
421
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
422
        if permissions is not None and user != account:
423
            raise NotAllowedError
424
        self._can_read(user, account, src_container, src_name)
425
        self._can_write(user, account, dest_container, dest_name)
426
        self._get_containerinfo(account, src_container)
427
        if src_version is None:
428
            src_path = self._get_objectinfo(account, src_container, src_name)[0]
429
        else:
430
            src_path = os.path.join(account, src_container, src_name)
431
        dest_path = self._get_containerinfo(account, dest_container)[0]
432
        dest_path = os.path.join(dest_path, dest_name)
433
        if permissions is not None:
434
            r, w = self._check_permissions(dest_path, permissions)
435
        src_version_id, dest_version_id = self._copy_version(user, src_path, dest_path, not replace_meta, True, src_version)
436
        for k, v in dest_meta.iteritems():
437
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
438
            self.con.execute(sql, (dest_version_id, k, v))
439
        if permissions is not None:
440
            sql = 'insert or replace into permissions (name, read, write) values (?, ?, ?)'
441
            self.con.execute(sql, (dest_path, r, w))
442
        self.con.commit()
443
    
444
    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
445
        """Move an object's data and metadata."""
446
        
447
        logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
448
        self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
449
        self.delete_object(user, account, src_container, src_name)
450
    
451
    def delete_object(self, user, account, container, name):
452
        """Delete an object."""
453
        
454
        logger.debug("delete_object: %s %s %s", account, container, name)
455
        if user != account:
456
            raise NotAllowedError
457
        path = self._get_objectinfo(account, container, name)[0]
458
        self._put_version(path, user, 0, 1)
459
        sql = 'delete from permissions where name = ?'
460
        self.con.execute(sql, (path,))
461
        sql = 'delete from public where name = ?'
462
        self.con.execute(sql, (path,))
463
        self.con.commit()
464
    
465
    def list_versions(self, user, account, container, name):
466
        """Return a list of all (version, version_timestamp) tuples for an object."""
467
        
468
        logger.debug("list_versions: %s %s %s", account, container, name)
469
        self._can_read(user, account, container, name)
470
        # This will even show deleted versions.
471
        path = os.path.join(account, container, name)
472
        sql = '''select distinct version_id, strftime('%s', tstamp) from versions where name = ? and hide = 0'''
473
        c = self.con.execute(sql, (path,))
474
        return [(int(x[0]), int(x[1])) for x in c.fetchall()]
475
    
476
    def get_block(self, hash):
477
        """Return a block's data."""
478
        
479
        logger.debug("get_block: %s", hash)
480
        c = self.con.execute('select data from blocks where block_id = ?', (hash,))
481
        row = c.fetchone()
482
        if row:
483
            return str(row[0])
484
        else:
485
            raise NameError('Block does not exist')
486
    
487
    def put_block(self, data):
488
        """Create a block and return the hash."""
489
        
490
        logger.debug("put_block: %s", len(data))
491
        h = hashlib.new(self.hash_algorithm)
492
        h.update(data.rstrip('\x00'))
493
        hash = h.hexdigest()
494
        sql = 'insert or ignore into blocks (block_id, data) values (?, ?)'
495
        self.con.execute(sql, (hash, buffer(data)))
496
        self.con.commit()
497
        return hash
498
    
499
    def update_block(self, hash, data, offset=0):
500
        """Update a known block and return the hash."""
501
        
502
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
503
        if offset == 0 and len(data) == self.block_size:
504
            return self.put_block(data)
505
        src_data = self.get_block(hash)
506
        bs = self.block_size
507
        if offset < 0 or offset > bs or offset + len(data) > bs:
508
            raise IndexError('Offset or data outside block limits')
509
        dest_data = src_data[:offset] + data + src_data[offset + len(data):]
510
        return self.put_block(dest_data)
511
    
512
    def _sql_until(self, until=None):
513
        """Return the sql to get the latest versions until the timestamp given."""
514
        if until is None:
515
            until = int(time.time())
516
        sql = '''select version_id, name, strftime('%s', tstamp) as tstamp, size from versions v
517
                    where version_id = (select max(version_id) from versions
518
                                        where v.name = name and tstamp <= datetime(%s, 'unixepoch'))
519
                    and hide = 0'''
520
        return sql % ('%s', until)
521
    
522
    def _get_pathstats(self, path, until=None):
523
        """Return count and sum of size of everything under path and latest timestamp."""
524
        
525
        sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?'
526
        sql = sql % self._sql_until(until)
527
        c = self.con.execute(sql, (path + '/%',))
528
        row = c.fetchone()
529
        tstamp = row[2] if row[2] is not None else 0
530
        return int(row[0]), int(row[1]), int(tstamp)
531
    
532
    def _get_version(self, path, version=None):
533
        if version is None:
534
            sql = '''select version_id, user, strftime('%s', tstamp), size, hide from versions where name = ?
535
                        order by version_id desc limit 1'''
536
            c = self.con.execute(sql, (path,))
537
            row = c.fetchone()
538
            if not row or int(row[4]):
539
                raise NameError('Object does not exist')
540
        else:
541
            # The database (sqlite) will not complain if the version is not an integer.
542
            sql = '''select version_id, user, strftime('%s', tstamp), size from versions where name = ?
543
                        and version_id = ?'''
544
            c = self.con.execute(sql, (path, version))
545
            row = c.fetchone()
546
            if not row:
547
                raise IndexError('Version does not exist')
548
        return str(row[0]), str(row[1]), int(row[2]), int(row[3])
549
    
550
    def _put_version(self, path, user, size=0, hide=0):
551
        sql = 'insert into versions (name, user, size, hide) values (?, ?, ?, ?)'
552
        id = self.con.execute(sql, (path, user, size, hide)).lastrowid
553
        self.con.commit()
554
        return str(id)
555
    
556
    def _copy_version(self, user, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
557
        if src_version is not None:
558
            src_version_id, muser, mtime, size = self._get_version(src_path, src_version)
559
        else:
560
            # Latest or create from scratch.
561
            try:
562
                src_version_id, muser, mtime, size = self._get_version(src_path)
563
            except NameError:
564
                src_version_id = None
565
                size = 0
566
        if not copy_data:
567
            size = 0
568
        dest_version_id = self._put_version(dest_path, user, size)
569
        if copy_meta and src_version_id is not None:
570
            sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
571
            sql = sql % dest_version_id
572
            self.con.execute(sql, (src_version_id,))
573
        if copy_data and src_version_id is not None:
574
            sql = 'insert into hashmaps select %s, pos, block_id from hashmaps where version_id = ?'
575
            sql = sql % dest_version_id
576
            self.con.execute(sql, (src_version_id,))
577
        self.con.commit()
578
        return src_version_id, dest_version_id
579
    
580
    def _get_versioninfo(self, account, container, name, until=None):
581
        """Return path, latest version, associated timestamp and size until the timestamp given."""
582
        
583
        p = (account, container, name)
584
        try:
585
            p = p[:p.index(None)]
586
        except ValueError:
587
            pass
588
        path = os.path.join(*p)
589
        sql = '''select version_id, tstamp, size from (%s) where name = ?'''
590
        sql = sql % self._sql_until(until)
591
        c = self.con.execute(sql, (path,))
592
        row = c.fetchone()
593
        if row is None:
594
            raise NameError('Path does not exist')
595
        return path, str(row[0]), int(row[1]), int(row[2])
596
    
597
    def _get_accountinfo(self, account, until=None):
598
        try:
599
            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
600
            return version_id, mtime
601
        except:
602
            raise NameError('Account does not exist')
603
    
604
    def _get_containerinfo(self, account, container, until=None):
605
        try:
606
            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
607
            return path, version_id, mtime
608
        except:
609
            raise NameError('Container does not exist')
610
    
611
    def _get_objectinfo(self, account, container, name, version=None):
612
        path = os.path.join(account, container, name)
613
        version_id, muser, mtime, size = self._get_version(path, version)
614
        return path, version_id, muser, mtime, size
615
    
616
    def _get_metadata(self, path, version):
617
        sql = 'select key, value from metadata where version_id = ?'
618
        c = self.con.execute(sql, (version,))
619
        return dict(c.fetchall())
620
    
621
    def _put_metadata(self, user, path, meta, replace=False):
622
        """Create a new version and store metadata."""
623
        
624
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, True)
625
        for k, v in meta.iteritems():
626
            if not replace and v == '':
627
                sql = 'delete from metadata where version_id = ? and key = ?'
628
                self.con.execute(sql, (dest_version_id, k))
629
            else:
630
                sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
631
                self.con.execute(sql, (dest_version_id, k, v))
632
        self.con.commit()
633
    
634
    def _get_groups(self, account):
635
        sql = 'select name, users from groups where account = ?'
636
        c = self.con.execute(sql, (account,))
637
        return dict([(x[0], x[1].split(',')) for x in c.fetchall()])
638
    
639
    def _check_policy(self, policy):
640
        for k in policy.keys():
641
            if policy[k] == '':
642
                policy[k] = self.default_policy.get(k)
643
        for k, v in policy.iteritems():
644
            if k == 'quota':
645
                q = int(v) # May raise ValueError.
646
                if q < 0:
647
                    raise ValueError
648
            elif k == 'versioning':
649
                if v not in ['auto', 'manual', 'none']:
650
                    raise ValueError
651
            else:
652
                raise ValueError
653
    
654
    def _get_policy(self, path):
655
        sql = 'select key, value from policy where name = ?'
656
        c = self.con.execute(sql, (path,))
657
        return dict(c.fetchall())
658
    
659
    def _is_allowed(self, user, account, container, name, op='read'):
660
        if user == account:
661
            return True
662
        path = os.path.join(account, container, name)
663
        if op == 'read' and self._get_public(path):
664
            return True
665
        perm_path, perms = self._get_permissions(path)
666
        
667
        # Expand groups.
668
        for x in ('read', 'write'):
669
            g_perms = []
670
            for y in perms.get(x, []):
671
                if ':' in y:
672
                    g_account, g_name = y.split(':', 1)
673
                    groups = self._get_groups(g_account)
674
                    if g_name in groups:
675
                        g_perms += groups[g_name]
676
                else:
677
                    g_perms.append(y)
678
            perms[x] = g_perms
679
        
680
        if op == 'read' and user in perms.get('read', []):
681
            return True
682
        if user in perms.get('write', []):
683
            return True
684
        return False
685
    
686
    def _can_read(self, user, account, container, name):
687
        if not self._is_allowed(user, account, container, name, 'read'):
688
            raise NotAllowedError
689
    
690
    def _can_write(self, user, account, container, name):
691
        if not self._is_allowed(user, account, container, name, 'write'):
692
            raise NotAllowedError
693
    
694
    def _check_permissions(self, path, permissions):
695
        # Check for existing permissions.
696
        sql = '''select name from permissions
697
                    where name != ? and (name like ? or ? like name || ?)'''
698
        c = self.con.execute(sql, (path, path + '%', path, '%'))
699
        row = c.fetchone()
700
        if row:
701
            ae = AttributeError()
702
            ae.data = row[0]
703
            raise ae
704
        
705
        # Format given permissions.
706
        if len(permissions) == 0:
707
            return '', ''
708
        r = permissions.get('read', [])
709
        w = permissions.get('write', [])
710
        if True in [False or ',' in x for x in r]:
711
            raise ValueError('Bad characters in read permissions')
712
        if True in [False or ',' in x for x in w]:
713
            raise ValueError('Bad characters in write permissions')
714
        return ','.join(r), ','.join(w)
715
    
716
    def _get_permissions(self, path):
717
        # Check for permissions at path or above.
718
        sql = 'select name, read, write from permissions where ? like name || ?'
719
        c = self.con.execute(sql, (path, '%'))
720
        row = c.fetchone()
721
        if not row:
722
            return path, {}
723
        
724
        name, r, w = row
725
        ret = {}
726
        if w != '':
727
            ret['write'] = w.split(',')
728
        if r != '':
729
            ret['read'] = r.split(',')
730
        return name, ret
731
    
732
    def _put_permissions(self, path, r, w):
733
        if r == '' and w == '':
734
            sql = 'delete from permissions where name = ?'
735
            self.con.execute(sql, (path,))
736
        else:
737
            sql = 'insert or replace into permissions (name, read, write) values (?, ?, ?)'
738
            self.con.execute(sql, (path, r, w))
739
        self.con.commit()
740
    
741
    def _get_public(self, path):
742
        sql = 'select name from public where name = ?'
743
        c = self.con.execute(sql, (path,))
744
        row = c.fetchone()
745
        if not row:
746
            return False
747
        return True
748
    
749
    def _put_public(self, path, public):
750
        if not public:
751
            sql = 'delete from public where name = ?'
752
        else:
753
            sql = 'insert or replace into public (name) values (?)'
754
        self.con.execute(sql, (path,))
755
        self.con.commit()
756
    
757
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
758
        cont_prefix = path + '/'
759
        if keys and len(keys) > 0:
760
            sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
761
                        m.version_id = o.version_id and m.key in (%s) order by o.name'''
762
            sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
763
            param = (cont_prefix + prefix + '%',) + tuple(keys)
764
        else:
765
            sql = 'select name, version_id from (%s) where name like ? order by name'
766
            sql = sql % self._sql_until(until)
767
            param = (cont_prefix + prefix + '%',)
768
        c = self.con.execute(sql, param)
769
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
770
        if delimiter:
771
            pseudo_objects = []
772
            for x in objects:
773
                pseudo_name = x[0]
774
                i = pseudo_name.find(delimiter, len(prefix))
775
                if not virtual:
776
                    # If the delimiter is not found, or the name ends
777
                    # with the delimiter's first occurence.
778
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
779
                        pseudo_objects.append(x)
780
                else:
781
                    # If the delimiter is found, keep up to (and including) the delimiter.
782
                    if i != -1:
783
                        pseudo_name = pseudo_name[:i + len(delimiter)]
784
                    if pseudo_name not in [y[0] for y in pseudo_objects]:
785
                        if pseudo_name == x[0]:
786
                            pseudo_objects.append(x)
787
                        else:
788
                            pseudo_objects.append((pseudo_name, None))
789
            objects = pseudo_objects
790
        
791
        start = 0
792
        if marker:
793
            try:
794
                start = [x[0] for x in objects].index(marker) + 1
795
            except ValueError:
796
                pass
797
        if not limit or limit > 10000:
798
            limit = 10000
799
        return objects[start:start + limit]
800
    
801
    def _del_path(self, path):
802
        sql = '''delete from hashmaps where version_id in
803
                    (select version_id from versions where name = ?)'''
804
        self.con.execute(sql, (path,))
805
        sql = '''delete from metadata where version_id in
806
                    (select version_id from versions where name = ?)'''
807
        self.con.execute(sql, (path,))
808
        sql = '''delete from versions where name = ?'''
809
        self.con.execute(sql, (path,))
810
        self.con.commit()