Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / simple.py @ f77b150f

History | View | Annotate | Download (39.8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import types
39
import hashlib
40
import shutil
41
import pickle
42

    
43
from base import NotAllowedError, BaseBackend
44

    
45

    
46
logger = logging.getLogger(__name__)
47

    
48

    
49
class SimpleBackend(BaseBackend):
50
    """A simple backend.
51
    
52
    Uses SQLite for storage.
53
    """
54
    
55
    # TODO: Automatic/manual clean-up after a time interval.
56
    
57
    def __init__(self, db):
58
        self.hash_algorithm = 'sha1'
59
        self.block_size = 128 * 1024 # 128KB
60
        
61
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
62
        
63
        basepath = os.path.split(db)[0]
64
        if basepath and not os.path.exists(basepath):
65
            os.makedirs(basepath)
66
        
67
        self.con = sqlite3.connect(db, check_same_thread=False)
68
        
69
        sql = '''pragma foreign_keys = on'''
70
        self.con.execute(sql)
71
        
72
        sql = '''create table if not exists versions (
73
                    version_id integer primary key,
74
                    name text,
75
                    user text,
76
                    tstamp integer not null,
77
                    size integer default 0,
78
                    trash integer default 0,
79
                    until integer default null)'''
80
        self.con.execute(sql)
81
        sql = '''create table if not exists metadata (
82
                    version_id integer,
83
                    key text,
84
                    value text,
85
                    primary key (version_id, key)
86
                    foreign key (version_id) references versions(version_id)
87
                    on delete cascade)'''
88
        self.con.execute(sql)
89
        sql = '''create table if not exists hashmaps (
90
                    version_id integer,
91
                    pos integer,
92
                    block_id text,
93
                    primary key (version_id, pos)
94
                    foreign key (version_id) references versions(version_id)
95
                    on delete cascade)'''
96
        self.con.execute(sql)
97
        sql = '''create table if not exists blocks (
98
                    block_id text, data blob, primary key (block_id))'''
99
        self.con.execute(sql)
100
        
101
        sql = '''create table if not exists policy (
102
                    name text, key text, value text, primary key (name, key))'''
103
        self.con.execute(sql)
104
        
105
        sql = '''create table if not exists groups (
106
                    account text, name text, users text, primary key (account, name))'''
107
        self.con.execute(sql)
108
        sql = '''create table if not exists permissions (
109
                    name text, read text, write text, primary key (name))'''
110
        self.con.execute(sql)
111
        sql = '''create table if not exists public (
112
                    name text, primary key (name))'''
113
        self.con.execute(sql)
114
        self.con.commit()
115
    
116
    def get_account_meta(self, user, account, until=None):
117
        """Return a dictionary with the account metadata."""
118
        
119
        logger.debug("get_account_meta: %s %s", account, until)
120
        if user != account:
121
            raise NotAllowedError
122
        try:
123
            version_id, mtime = self._get_accountinfo(account, until)
124
        except NameError:
125
            version_id = None
126
            mtime = 0
127
        count, bytes, tstamp = self._get_pathstats(account, until)
128
        if mtime > tstamp:
129
            tstamp = mtime
130
        if until is None:
131
            modified = tstamp
132
        else:
133
            modified = self._get_pathstats(account)[2] # Overall last modification
134
            if mtime > modified:
135
                modified = mtime
136
        
137
        # Proper count.
138
        sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
139
        sql = sql % self._sql_until(until)
140
        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
141
        row = c.fetchone()
142
        count = row[0]
143
        
144
        meta = self._get_metadata(account, version_id)
145
        meta.update({'name': account, 'count': count, 'bytes': bytes})
146
        if modified:
147
            meta.update({'modified': modified})
148
        if until is not None:
149
            meta.update({'until_timestamp': tstamp})
150
        return meta
151
    
152
    def update_account_meta(self, user, account, meta, replace=False):
153
        """Update the metadata associated with the account."""
154
        
155
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
156
        if user != account:
157
            raise NotAllowedError
158
        self._put_metadata(user, account, meta, replace)
159
    
160
    def get_account_groups(self, user, account):
161
        """Return a dictionary with the user groups defined for this account."""
162
        
163
        logger.debug("get_account_groups: %s", account)
164
        if user != account:
165
            raise NotAllowedError
166
        return self._get_groups(account)
167
    
168
    def update_account_groups(self, user, account, groups, replace=False):
169
        """Update the groups associated with the account."""
170
        
171
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
172
        if user != account:
173
            raise NotAllowedError
174
        for k, v in groups.iteritems():
175
            if True in [False or ',' in x for x in v]:
176
                raise ValueError('Bad characters in groups')
177
        if replace:
178
            sql = 'delete from groups where account = ?'
179
            self.con.execute(sql, (account,))
180
        for k, v in groups.iteritems():
181
            if len(v) == 0:
182
                if not replace:
183
                    sql = 'delete from groups where account = ? and name = ?'
184
                    self.con.execute(sql, (account, k))
185
            else:
186
                sql = 'insert or replace into groups (account, name, users) values (?, ?, ?)'
187
                self.con.execute(sql, (account, k, ','.join(v)))
188
        self.con.commit()
189
    
190
    def put_account(self, user, account):
191
        """Create a new account with the given name."""
192
        
193
        logger.debug("put_account: %s", account)
194
        if user != account:
195
            raise NotAllowedError
196
        try:
197
            version_id, mtime = self._get_accountinfo(account)
198
        except NameError:
199
            pass
200
        else:
201
            raise NameError('Account already exists')
202
        version_id = self._put_version(account, user)
203
        self.con.commit()
204
    
205
    def delete_account(self, user, account):
206
        """Delete the account with the given name."""
207
        
208
        logger.debug("delete_account: %s", account)
209
        if user != account:
210
            raise NotAllowedError
211
        if self._get_pathcount(account) > 0:
212
            raise IndexError('Account is not empty')
213
        sql = 'delete from versions where name = ?'
214
        self.con.execute(sql, (path,))
215
        sql = 'delete from groups where name = ?'
216
        self.con.execute(sql, (account,))
217
        self.con.commit()
218
    
219
    def list_containers(self, user, account, marker=None, limit=10000, until=None):
220
        """Return a list of containers existing under an account."""
221
        
222
        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
223
        if user != account:
224
            raise NotAllowedError
225
        return self._list_objects(account, '', '/', marker, limit, False, [], False, until)
226
    
227
    def get_container_meta(self, user, account, container, until=None):
228
        """Return a dictionary with the container metadata."""
229
        
230
        logger.debug("get_container_meta: %s %s %s", account, container, until)
231
        if user != account:
232
            raise NotAllowedError
233
        
234
        # TODO: Container meta for trash.
235
        path, version_id, mtime = self._get_containerinfo(account, container, until)
236
        count, bytes, tstamp = self._get_pathstats(path, until)
237
        if mtime > tstamp:
238
            tstamp = mtime
239
        if until is None:
240
            modified = tstamp
241
        else:
242
            modified = self._get_pathstats(path)[2] # Overall last modification
243
            if mtime > modified:
244
                modified = mtime
245
        
246
        meta = self._get_metadata(path, version_id)
247
        meta.update({'name': container, 'count': count, 'bytes': bytes, 'modified': modified})
248
        if until is not None:
249
            meta.update({'until_timestamp': tstamp})
250
        return meta
251
    
252
    def update_container_meta(self, user, account, container, meta, replace=False):
253
        """Update the metadata associated with the container."""
254
        
255
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
256
        if user != account:
257
            raise NotAllowedError
258
        path, version_id, mtime = self._get_containerinfo(account, container)
259
        self._put_metadata(user, path, meta, replace)
260
    
261
    def get_container_policy(self, user, account, container):
262
        """Return a dictionary with the container policy."""
263
        
264
        logger.debug("get_container_policy: %s %s", account, container)
265
        if user != account:
266
            raise NotAllowedError
267
        path = self._get_containerinfo(account, container)[0]
268
        return self._get_policy(path)
269
    
270
    def update_container_policy(self, user, account, container, policy, replace=False):
271
        """Update the policy associated with the account."""
272
        
273
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
274
        if user != account:
275
            raise NotAllowedError
276
        path = self._get_containerinfo(account, container)[0]
277
        self._check_policy(policy)
278
        if replace:
279
            for k, v in self.default_policy.iteritems():
280
                if k not in policy:
281
                    policy[k] = v
282
        for k, v in policy.iteritems():
283
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
284
            self.con.execute(sql, (path, k, v))
285
        self.con.commit()
286
    
287
    def put_container(self, user, account, container, policy=None):
288
        """Create a new container with the given name."""
289
        
290
        logger.debug("put_container: %s %s %s", account, container, policy)
291
        if user != account:
292
            raise NotAllowedError
293
        try:
294
            path, version_id, mtime = self._get_containerinfo(account, container)
295
        except NameError:
296
            pass
297
        else:
298
            raise NameError('Container already exists')
299
        if policy:
300
            self._check_policy(policy)
301
        path = os.path.join(account, container)
302
        version_id = self._put_version(path, user)
303
        for k, v in self.default_policy.iteritems():
304
            if k not in policy:
305
                policy[k] = v
306
        for k, v in policy.iteritems():
307
            sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
308
            self.con.execute(sql, (path, k, v))
309
        self.con.commit()
310
    
311
    def delete_container(self, user, account, container, until=None):
312
        """Delete the container with the given name."""
313
        
314
        logger.debug("delete_container: %s %s %s", account, container, until)
315
        if user != account:
316
            raise NotAllowedError
317
        path, version_id, mtime = self._get_containerinfo(account, container)
318
        
319
        if until is not None:
320
            sql = '''select version_id from versions where name like ? and tstamp <= ?'''
321
            c = self.con.execute(sql, (path + '/%', until))
322
            versions = [x[0] for x in c.fetchall()]
323
            for v in versions:
324
                sql = 'delete from hashmaps where version_id = ?'
325
                self.con.execute(sql, (v,))
326
                sql = 'delete from versions where version_id = ?'
327
                self.con.execute(sql, (v,))
328
            self.con.commit()
329
            return
330
        
331
        if self._get_pathcount(path) > 0:
332
            raise IndexError('Container is not empty')
333
        sql = 'delete from versions where name like ?' # May contain hidden trash items.
334
        self.con.execute(sql, (path + '/%',))
335
        sql = 'delete from policy where name = ?'
336
        self.con.execute(sql, (path,))
337
        self._copy_version(user, account, account, True, True) # New account version (for timestamp update).
338
    
339
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], trash=False, until=None):
340
        """Return a list of objects existing under a container."""
341
        
342
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, trash, until)
343
        if user != account:
344
            raise NotAllowedError
345
        path, version_id, mtime = self._get_containerinfo(account, container, until)
346
        return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, trash, until)
347
    
348
    def list_object_meta(self, user, account, container, trash=False, until=None):
349
        """Return a list with all the container's object meta keys."""
350
        
351
        logger.debug("list_object_meta: %s %s %s", account, container, until)
352
        if user != account:
353
            raise NotAllowedError
354
        path, version_id, mtime = self._get_containerinfo(account, container, until)
355
        sql = '''select distinct m.key from (%s) o, metadata m
356
                    where m.version_id = o.version_id and o.name like ?'''
357
        sql = sql % self._sql_until(until, trash)
358
        c = self.con.execute(sql, (path + '/%',))
359
        return [x[0] for x in c.fetchall()]
360
    
361
    def get_object_meta(self, user, account, container, name, version=None):
362
        """Return a dictionary with the object metadata."""
363
        
364
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
365
        self._can_read(user, account, container, name)
366
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
367
        if version is None:
368
            modified = mtime
369
        else:
370
            modified = self._get_version(path, version)[2] # Overall last modification
371
        
372
        meta = self._get_metadata(path, version_id)
373
        meta.update({'name': name, 'bytes': size})
374
        meta.update({'version': version_id, 'version_timestamp': mtime})
375
        meta.update({'modified': modified, 'modified_by': muser})
376
        return meta
377
    
378
    def update_object_meta(self, user, account, container, name, meta, replace=False):
379
        """Update the metadata associated with the object."""
380
        
381
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
382
        self._can_write(user, account, container, name)
383
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name)
384
        self._put_metadata(user, path, meta, replace)
385
    
386
    def get_object_permissions(self, user, account, container, name):
387
        """Return the path from which this object gets its permissions from,\
388
        along with a dictionary containing the permissions."""
389
        
390
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
391
        self._can_read(user, account, container, name)
392
        path = self._get_objectinfo(account, container, name)[0]
393
        return self._get_permissions(path)
394
    
395
    def update_object_permissions(self, user, account, container, name, permissions):
396
        """Update the permissions associated with the object."""
397
        
398
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
399
        if user != account:
400
            raise NotAllowedError
401
        path = self._get_objectinfo(account, container, name)[0]
402
        r, w = self._check_permissions(path, permissions)
403
        self._put_permissions(path, r, w)
404
    
405
    def get_object_public(self, user, account, container, name):
406
        """Return the public URL of the object if applicable."""
407
        
408
        logger.debug("get_object_public: %s %s %s", account, container, name)
409
        self._can_read(user, account, container, name)
410
        path = self._get_objectinfo(account, container, name)[0]
411
        if self._get_public(path):
412
            return '/public/' + path
413
        return None
414
    
415
    def update_object_public(self, user, account, container, name, public):
416
        """Update the public status of the object."""
417
        
418
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
419
        self._can_write(user, account, container, name)
420
        path = self._get_objectinfo(account, container, name)[0]
421
        self._put_public(path, public)
422
    
423
    def get_object_hashmap(self, user, account, container, name, version=None):
424
        """Return the object's size and a list with partial hashes."""
425
        
426
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
427
        self._can_read(user, account, container, name)
428
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
429
        sql = 'select block_id from hashmaps where version_id = ? order by pos asc'
430
        c = self.con.execute(sql, (version_id,))
431
        hashmap = [x[0] for x in c.fetchall()]
432
        return size, hashmap
433
    
434
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
435
        """Create/update an object with the specified size and partial hashes."""
436
        
437
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
438
        if permissions is not None and user != account:
439
            raise NotAllowedError
440
        self._can_write(user, account, container, name)
441
        missing = []
442
        for i in range(len(hashmap)):
443
            sql = 'select count(*) from blocks where block_id = ?'
444
            c = self.con.execute(sql, (hashmap[i],))
445
            if c.fetchone()[0] == 0:
446
                missing.append(hashmap[i])
447
        if missing:
448
            ie = IndexError()
449
            ie.data = missing
450
            raise ie
451
        path = self._get_containerinfo(account, container)[0]
452
        path = os.path.join(path, name)
453
        if permissions is not None:
454
            r, w = self._check_permissions(path, permissions)
455
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False)
456
        sql = 'update versions set size = ? where version_id = ?'
457
        self.con.execute(sql, (size, dest_version_id))
458
        # TODO: Check for block_id existence.
459
        for i in range(len(hashmap)):
460
            sql = 'insert or replace into hashmaps (version_id, pos, block_id) values (?, ?, ?)'
461
            self.con.execute(sql, (dest_version_id, i, hashmap[i]))
462
        for k, v in meta.iteritems():
463
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
464
            self.con.execute(sql, (dest_version_id, k, v))
465
        if permissions is not None:
466
            sql = 'insert or replace into permissions (name, read, write) values (?, ?, ?)'
467
            self.con.execute(sql, (path, r, w))
468
        self.con.commit()
469
    
470
    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
471
        """Copy an object's data and metadata."""
472
        
473
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
474
        if permissions is not None and user != account:
475
            raise NotAllowedError
476
        self._can_read(user, account, src_container, src_name)
477
        self._can_write(user, account, dest_container, dest_name)
478
        self._get_containerinfo(account, src_container)
479
        if src_version is None:
480
            src_path = self._get_objectinfo(account, src_container, src_name)[0]
481
        else:
482
            src_path = os.path.join(account, src_container, src_name)
483
        dest_path = self._get_containerinfo(account, dest_container)[0]
484
        dest_path = os.path.join(dest_path, dest_name)
485
        if permissions is not None:
486
            r, w = self._check_permissions(dest_path, permissions)
487
        src_version_id, dest_version_id = self._copy_version(user, src_path, dest_path, not replace_meta, True, src_version)
488
        for k, v in dest_meta.iteritems():
489
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
490
            self.con.execute(sql, (dest_version_id, k, v))
491
        if permissions is not None:
492
            sql = 'insert or replace into permissions (name, read, write) values (?, ?, ?)'
493
            self.con.execute(sql, (dest_path, r, w))
494
        self.con.commit()
495
    
496
    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
497
        """Move an object's data and metadata."""
498
        
499
        logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
500
        self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
501
        self.delete_object(user, account, src_container, src_name)
502
    
503
    def delete_object(self, user, account, container, name, until=None):
504
        """Delete an object."""
505
        
506
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
507
        if user != account:
508
            raise NotAllowedError
509
        if until is None:
510
            path = self._get_objectinfo(account, container, name)[0]
511
            sql = 'select version_id from versions where name = ?'
512
            c = self.con.execute(sql, (path,))
513
        else:
514
            path = os.path.join(account, container, name)
515
            sql = '''select version_id from versions where name = ? and tstamp <= ?'''
516
            c = self.con.execute(sql, (path, until))
517
        versions = [x[0] for x in c.fetchall()]
518
        for v in versions:
519
            sql = 'delete from hashmaps where version_id = ?'
520
            self.con.execute(sql, (v,))
521
            sql = 'delete from versions where version_id = ?'
522
            self.con.execute(sql, (v,))
523
        
524
        # If no more normal versions exist, delete permissions/public.
525
        sql = 'select version_id from versions where name = ? and trash = 0'
526
        row = self.con.execute(sql, (path,)).fetchone()
527
        if row is None:
528
            self._del_sharing(path)
529
        self.con.commit()
530
    
531
    def trash_object(self, user, account, container, name):
532
        """Trash an object."""
533
        
534
        logger.debug("trash_object: %s %s %s", account, container, name)
535
        if user != account:
536
            raise NotAllowedError
537
        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name)
538
        src_version_id, dest_version_id = self._copy_version(user, path, path, True, True, version_id)
539
        sql = 'update versions set trash = 1 where version_id = ?'
540
        self.con.execute(sql, (dest_version_id,))
541
        self._del_sharing(path)
542
        self.con.commit()
543
    
544
    def untrash_object(self, user, account, container, name, version):
545
        """Untrash an object."""
546
        
547
        logger.debug("untrash_object: %s %s %s %s", account, container, name, version)
548
        if user != account:
549
            raise NotAllowedError
550
        
551
        path = os.path.join(account, container, name)
552
        sql = '''select version_id from versions where name = ? and version_id = ? and trash = 1'''
553
        c = self.con.execute(sql, (path, version))
554
        row = c.fetchone()
555
        if not row or not int(row[1]):
556
            raise NameError('Object not in trash')
557
        sql = 'update versions set until = ? where version_id = ?'
558
        self.con.execute(sql, (int(time.time()), version))
559
        self.con.commit()
560
    
561
    def list_versions(self, user, account, container, name):
562
        """Return a list of all (version, version_timestamp) tuples for an object."""
563
        
564
        logger.debug("list_versions: %s %s %s", account, container, name)
565
        self._can_read(user, account, container, name)
566
        path = os.path.join(account, container, name)
567
        sql = '''select distinct version_id, tstamp from versions where name = ? and trash = 0'''
568
        c = self.con.execute(sql, (path,))
569
        return [(int(x[0]), int(x[1])) for x in c.fetchall()]
570
    
571
    def get_block(self, hash):
572
        """Return a block's data."""
573
        
574
        logger.debug("get_block: %s", hash)
575
        c = self.con.execute('select data from blocks where block_id = ?', (hash,))
576
        row = c.fetchone()
577
        if row:
578
            return str(row[0])
579
        else:
580
            raise NameError('Block does not exist')
581
    
582
    def put_block(self, data):
583
        """Create a block and return the hash."""
584
        
585
        logger.debug("put_block: %s", len(data))
586
        h = hashlib.new(self.hash_algorithm)
587
        h.update(data.rstrip('\x00'))
588
        hash = h.hexdigest()
589
        sql = 'insert or ignore into blocks (block_id, data) values (?, ?)'
590
        self.con.execute(sql, (hash, buffer(data)))
591
        self.con.commit()
592
        return hash
593
    
594
    def update_block(self, hash, data, offset=0):
595
        """Update a known block and return the hash."""
596
        
597
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
598
        if offset == 0 and len(data) == self.block_size:
599
            return self.put_block(data)
600
        src_data = self.get_block(hash)
601
        bs = self.block_size
602
        if offset < 0 or offset > bs or offset + len(data) > bs:
603
            raise IndexError('Offset or data outside block limits')
604
        dest_data = src_data[:offset] + data + src_data[offset + len(data):]
605
        return self.put_block(dest_data)
606
    
607
    def _sql_until(self, until=None, trash=False):
608
        """Return the sql to get the latest versions until the timestamp given."""
609
        
610
        if until is None:
611
            until = int(time.time())
612
        if not trash:
613
            sql = '''select version_id, name, tstamp, size from versions v
614
                        where version_id = (select max(version_id) from versions
615
                                            where v.name = name and tstamp <= ?)
616
                        and trash = 0'''
617
            return sql % (until,)
618
        else:
619
            sql = '''select version_id, name, tstamp, size from versions v
620
                        where trash = 1 and tstamp <= ? and (until is null or until > ?)'''
621
            return sql % (until, until)
622
    
623
    def _get_pathstats(self, path, until=None):
624
        """Return count, sum of size and latest timestamp of everything under path (latest versions/no trash)."""
625
        
626
        sql = 'select count(version_id), total(size) from (%s) where name like ?'
627
        sql = sql % self._sql_until(until)
628
        c = self.con.execute(sql, (path + '/%',))
629
        total_count, total_size = c.fetchone()
630
        sql = 'select max(tstamp) from versions where name like ? and tstamp <= ?' # Include trash actions.
631
        c = self.con.execute(sql, (path + '/%', until))
632
        row = c.fetchone()
633
        tstamp = row[0] if row[0] is not None else 0
634
        return int(total_count), int(total_size), int(tstamp)
635
    
636
    def _get_pathcount(self, path):
637
        """Return count of everything under path (including versions/trash)."""
638
        
639
        sql = 'select count(version_id) from versions where name like ? and until is null'
640
        c = self.con.execute(sql, (path + '/%',))
641
        row = c.fetchone()
642
        return int(row[0])
643
    
644
    def _get_version(self, path, version=None):
645
        if version is None:
646
            sql = '''select version_id, user, strftime('%s', tstamp), size, hide from versions where name = ?
647
                        order by version_id desc limit 1'''
648
            c = self.con.execute(sql, (path,))
649
            row = c.fetchone()
650
            if not row or int(row[4]):
651
                raise NameError('Object does not exist')
652
        else:
653
            # The database (sqlite) will not complain if the version is not an integer.
654
            sql = '''select version_id, user, strftime('%s', tstamp), size from versions where name = ?
655
                        and version_id = ?'''
656
            c = self.con.execute(sql, (path, version))
657
            row = c.fetchone()
658
            if not row:
659
                raise IndexError('Version does not exist')
660
        return str(row[0]), str(row[1]), int(row[2]), int(row[3])
661
    
662
    def _put_version(self, path, user, size=0):
663
        tstamp = int(time.time())
664
        sql = 'insert into versions (name, user, tstamp, size) values (?, ?, ?, ?)'
665
        id = self.con.execute(sql, (path, user, tstamp, size)).lastrowid
666
        self.con.commit()
667
        return str(id)
668
    
669
    def _copy_version(self, user, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
670
        if src_version is not None:
671
            src_version_id, muser, mtime, size = self._get_version(src_path, src_version)
672
        else:
673
            # Latest or create from scratch.
674
            try:
675
                src_version_id, muser, mtime, size = self._get_version(src_path)
676
            except NameError:
677
                src_version_id = None
678
                size = 0
679
        if not copy_data:
680
            size = 0
681
        dest_version_id = self._put_version(dest_path, user, size)
682
        if copy_meta and src_version_id is not None:
683
            sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
684
            sql = sql % dest_version_id
685
            self.con.execute(sql, (src_version_id,))
686
        if copy_data and src_version_id is not None:
687
            sql = 'insert into hashmaps select %s, pos, block_id from hashmaps where version_id = ?'
688
            sql = sql % dest_version_id
689
            self.con.execute(sql, (src_version_id,))
690
        self.con.commit()
691
        return src_version_id, dest_version_id
692
    
693
    def _get_versioninfo(self, account, container, name, until=None):
694
        """Return path, latest version, associated timestamp and size until the timestamp given."""
695
        
696
        p = (account, container, name)
697
        try:
698
            p = p[:p.index(None)]
699
        except ValueError:
700
            pass
701
        path = os.path.join(*p)
702
        sql = '''select version_id, tstamp, size from (%s) where name = ?'''
703
        sql = sql % self._sql_until(until)
704
        c = self.con.execute(sql, (path,))
705
        row = c.fetchone()
706
        if row is None:
707
            raise NameError('Path does not exist')
708
        return path, str(row[0]), int(row[1]), int(row[2])
709
    
710
    def _get_accountinfo(self, account, until=None):
711
        try:
712
            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
713
            return version_id, mtime
714
        except:
715
            raise NameError('Account does not exist')
716
    
717
    def _get_containerinfo(self, account, container, until=None):
718
        try:
719
            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
720
            return path, version_id, mtime
721
        except:
722
            raise NameError('Container does not exist')
723
    
724
    def _get_objectinfo(self, account, container, name, version=None):
725
        path = os.path.join(account, container, name)
726
        version_id, muser, mtime, size = self._get_version(path, version)
727
        return path, version_id, muser, mtime, size
728
    
729
    def _get_metadata(self, path, version):
730
        sql = 'select key, value from metadata where version_id = ?'
731
        c = self.con.execute(sql, (version,))
732
        return dict(c.fetchall())
733
    
734
    def _put_metadata(self, user, path, meta, replace=False):
735
        """Create a new version and store metadata."""
736
        
737
        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, True)
738
        for k, v in meta.iteritems():
739
            if not replace and v == '':
740
                sql = 'delete from metadata where version_id = ? and key = ?'
741
                self.con.execute(sql, (dest_version_id, k))
742
            else:
743
                sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
744
                self.con.execute(sql, (dest_version_id, k, v))
745
        self.con.commit()
746
    
747
    def _get_groups(self, account):
748
        sql = 'select name, users from groups where account = ?'
749
        c = self.con.execute(sql, (account,))
750
        return dict([(x[0], x[1].split(',')) for x in c.fetchall()])
751
    
752
    def _check_policy(self, policy):
753
        for k in policy.keys():
754
            if policy[k] == '':
755
                policy[k] = self.default_policy.get(k)
756
        for k, v in policy.iteritems():
757
            if k == 'quota':
758
                q = int(v) # May raise ValueError.
759
                if q < 0:
760
                    raise ValueError
761
            elif k == 'versioning':
762
                if v not in ['auto', 'manual', 'none']:
763
                    raise ValueError
764
            else:
765
                raise ValueError
766
    
767
    def _get_policy(self, path):
768
        sql = 'select key, value from policy where name = ?'
769
        c = self.con.execute(sql, (path,))
770
        return dict(c.fetchall())
771
    
772
    def _is_allowed(self, user, account, container, name, op='read'):
773
        if user == account:
774
            return True
775
        path = os.path.join(account, container, name)
776
        if op == 'read' and self._get_public(path):
777
            return True
778
        perm_path, perms = self._get_permissions(path)
779
        
780
        # Expand groups.
781
        for x in ('read', 'write'):
782
            g_perms = []
783
            for y in perms.get(x, []):
784
                groups = self._get_groups(account)
785
                if y in groups: #it's a group
786
                    for g_name in groups[y]:
787
                        g_perms.append(g_name)
788
                else: #it's a user
789
                    g_perms.append(y)
790
            perms[x] = g_perms
791
        
792
        if op == 'read' and user in perms.get('read', []):
793
            return True
794
        if user in perms.get('write', []):
795
            return True
796
        return False
797
    
798
    def _can_read(self, user, account, container, name):
799
        if not self._is_allowed(user, account, container, name, 'read'):
800
            raise NotAllowedError
801
    
802
    def _can_write(self, user, account, container, name):
803
        if not self._is_allowed(user, account, container, name, 'write'):
804
            raise NotAllowedError
805
    
806
    def _check_permissions(self, path, permissions):
807
        # Check for existing permissions.
808
        sql = '''select name from permissions
809
                    where name != ? and (name like ? or ? like name || ?)'''
810
        c = self.con.execute(sql, (path, path + '%', path, '%'))
811
        row = c.fetchone()
812
        if row:
813
            ae = AttributeError()
814
            ae.data = row[0]
815
            raise ae
816
        
817
        # Format given permissions.
818
        if len(permissions) == 0:
819
            return '', ''
820
        r = permissions.get('read', [])
821
        w = permissions.get('write', [])
822
        if True in [False or ',' in x for x in r]:
823
            raise ValueError('Bad characters in read permissions')
824
        if True in [False or ',' in x for x in w]:
825
            raise ValueError('Bad characters in write permissions')
826
        return ','.join(r), ','.join(w)
827
    
828
    def _get_permissions(self, path):
829
        # Check for permissions at path or above.
830
        sql = 'select name, read, write from permissions where ? like name || ?'
831
        c = self.con.execute(sql, (path, '%'))
832
        row = c.fetchone()
833
        if not row:
834
            return path, {}
835
        
836
        name, r, w = row
837
        ret = {}
838
        if w != '':
839
            ret['write'] = w.split(',')
840
        if r != '':
841
            ret['read'] = r.split(',')
842
        return name, ret
843
    
844
    def _put_permissions(self, path, r, w):
845
        if r == '' and w == '':
846
            sql = 'delete from permissions where name = ?'
847
            self.con.execute(sql, (path,))
848
        else:
849
            sql = 'insert or replace into permissions (name, read, write) values (?, ?, ?)'
850
            self.con.execute(sql, (path, r, w))
851
        self.con.commit()
852
    
853
    def _get_public(self, path):
854
        sql = 'select name from public where name = ?'
855
        c = self.con.execute(sql, (path,))
856
        row = c.fetchone()
857
        if not row:
858
            return False
859
        return True
860
    
861
    def _put_public(self, path, public):
862
        if not public:
863
            sql = 'delete from public where name = ?'
864
        else:
865
            sql = 'insert or replace into public (name) values (?)'
866
        self.con.execute(sql, (path,))
867
        self.con.commit()
868
    
869
    def _del_sharing(self, path):
870
        sql = 'delete from permissions where name = ?'
871
        self.con.execute(sql, (path,))
872
        sql = 'delete from public where name = ?'
873
        self.con.execute(sql, (path,))
874
        self.con.commit()
875
    
876
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], trash=False, until=None):
877
        cont_prefix = path + '/'
878
        
879
        if keys and len(keys) > 0:
880
            sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
881
                        m.version_id = o.version_id and m.key in (%s) order by o.name'''
882
            sql = sql % (self._sql_until(until, trash), ', '.join('?' * len(keys)))
883
            param = (cont_prefix + prefix + '%',) + tuple(keys)
884
        else:
885
            sql = 'select name, version_id from (%s) where name like ? order by name'
886
            sql = sql % (self._sql_until(until, trash),)
887
            param = (cont_prefix + prefix + '%',)
888
        c = self.con.execute(sql, param)
889
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
890
        if delimiter:
891
            pseudo_objects = []
892
            for x in objects:
893
                pseudo_name = x[0]
894
                i = pseudo_name.find(delimiter, len(prefix))
895
                if not virtual:
896
                    # If the delimiter is not found, or the name ends
897
                    # with the delimiter's first occurence.
898
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
899
                        pseudo_objects.append(x)
900
                else:
901
                    # If the delimiter is found, keep up to (and including) the delimiter.
902
                    if i != -1:
903
                        pseudo_name = pseudo_name[:i + len(delimiter)]
904
                    if pseudo_name not in [y[0] for y in pseudo_objects]:
905
                        if pseudo_name == x[0]:
906
                            pseudo_objects.append(x)
907
                        else:
908
                            pseudo_objects.append((pseudo_name, None))
909
            objects = pseudo_objects
910
        
911
        start = 0
912
        if marker:
913
            try:
914
                start = [x[0] for x in objects].index(marker) + 1
915
            except ValueError:
916
                pass
917
        if not limit or limit > 10000:
918
            limit = 10000
919
        return objects[start:start + limit]