Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / simple.py @ c9af0703

History | View | Annotate | Download (22.7 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import types
39
import hashlib
40
import shutil
41
import pickle
42

    
43
from base import BaseBackend
44

    
45

    
46
logger = logging.getLogger(__name__)
47

    
48

    
49
class SimpleBackend(BaseBackend):
50
    """A simple backend.
51
    
52
    Uses SQLite for storage.
53
    """
54
    
55
    # TODO: Automatic/manual clean-up after a time interval.
56
    
57
    def __init__(self, db):
        """Open the SQLite database at ``db`` and make sure the schema exists.

        A missing parent directory of ``db`` is created first. The four
        tables (versions, metadata, blocks, hashmaps) are created only if
        they are not already present.
        """
        self.hash_algorithm = 'sha1'
        self.block_size = 128 * 1024 # 128KB

        parent_dir = os.path.dirname(db)
        if parent_dir and not os.path.exists(parent_dir):
            os.makedirs(parent_dir)

        self.con = sqlite3.connect(db)
        schema = (
            '''create table if not exists versions (
                    version_id integer primary key,
                    name text,
                    tstamp datetime default current_timestamp,
                    size integer default 0,
                    hide integer default 0)''',
            '''create table if not exists metadata (
                    version_id integer, key text, value text, primary key (version_id, key))''',
            '''create table if not exists blocks (
                    block_id text, data blob, primary key (block_id))''',
            '''create table if not exists hashmaps (
                    version_id integer, pos integer, block_id text, primary key (version_id, pos))''',
        )
        for statement in schema:
            self.con.execute(statement)
        self.con.commit()
83
    
84
    def delete_account(self, user, account):
        """Remove an account, refusing while anything is stored under it.

        Raises IndexError if the account still has contents.
        """
        logger.debug("delete_account: %s", account)
        entries = self._get_pathstats(account)[0]
        if entries > 0:
            raise IndexError('Account is not empty')
        # Point of no return: every version row of the account path goes.
        self._del_path(account)
92
    
93
    def get_account_meta(self, user, account, until=None):
        """Build the metadata dictionary of an account.

        The stored key/value pairs are extended with 'name', 'count'
        (entries directly under the account), 'bytes', 'modified' (only
        when non-zero) and, if ``until`` is given, 'until_timestamp'.
        An account that has no version row of its own is not an error.
        """
        logger.debug("get_account_meta: %s %s", account, until)
        try:
            version_id, mtime = self._get_accountinfo(account, until)
        except NameError:
            # No account version recorded; use neutral values.
            version_id, mtime = None, 0

        _, total_bytes, tstamp = self._get_pathstats(account, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification, regardless of the time limit.
            modified = max(self._get_pathstats(account)[2], mtime)

        # Proper count: names one level below the account only.
        count_sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
        count_sql = count_sql % self._sql_until(until)
        cursor = self.con.execute(count_sql, (account + '/*', account + '/*/*'))
        count = cursor.fetchone()[0]

        meta = self._get_metadata(account, version_id)
        meta['name'] = account
        meta['count'] = count
        meta['bytes'] = total_bytes
        if modified:
            meta['modified'] = modified
        if until is not None:
            meta['until_timestamp'] = tstamp
        return meta
126
    
127
    def update_account_meta(self, user, account, meta, replace=False):
        """Store ``meta`` on the account.

        With ``replace`` false the new keys are merged over the existing
        metadata; with ``replace`` true the old metadata is not carried over.
        """
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
        # The account name is used directly as its path.
        self._put_metadata(account, meta, replace)
132
    
133
    def list_containers(self, user, account, marker=None, limit=10000, until=None):
        """List the containers of an account as (name, version_id) tuples."""
        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
        # Containers are the names directly below the account: no prefix,
        # '/' as delimiter, and no virtual (collapsed) entries.
        return self._list_objects(
            account,
            prefix='',
            delimiter='/',
            marker=marker,
            limit=limit,
            virtual=False,
            keys=[],
            until=until,
        )
138
    
139
    def put_container(self, user, account, container):
        """Create a container under the account.

        Raises NameError if the container already exists.
        """
        logger.debug("put_container: %s %s", account, container)
        try:
            self._get_containerinfo(account, container)
        except NameError:
            # Not present yet: record the first version of its path.
            self._put_version(os.path.join(account, container))
            return
        raise NameError('Container already exists')
150
    
151
    def delete_container(self, user, account, container):
        """Remove an empty container.

        Raises NameError if the container does not exist and IndexError
        if it still holds anything.
        """
        logger.debug("delete_container: %s %s", account, container)
        path = self._get_containerinfo(account, container)[0]
        if self._get_pathstats(path)[0] > 0:
            raise IndexError('Container is not empty')
        # Point of no return.
        self._del_path(path)
        # Record a new account version so the change shows on the account.
        self._copy_version(account, account, True, True)
161
    
162
    def get_container_meta(self, user, account, container, until=None):
        """Build the metadata dictionary of a container.

        The stored key/value pairs are extended with 'name', 'count',
        'bytes', 'modified' and, if ``until`` is given, 'until_timestamp'.
        Raises NameError if the container does not exist.
        """
        logger.debug("get_container_meta: %s %s %s", account, container, until)

        path, version_id, mtime = self._get_containerinfo(account, container, until)
        count, total_bytes, tstamp = self._get_pathstats(path, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification, regardless of the time limit.
            modified = max(self._get_pathstats(path)[2], mtime)

        meta = self._get_metadata(path, version_id)
        meta['name'] = container
        meta['count'] = count
        meta['bytes'] = total_bytes
        meta['modified'] = modified
        if until is not None:
            meta['until_timestamp'] = tstamp
        return meta
183
    
184
    def update_container_meta(self, user, account, container, meta, replace=False):
        """Store ``meta`` on the container.

        Raises NameError if the container does not exist.
        """
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
        container_path = self._get_containerinfo(account, container)[0]
        self._put_metadata(container_path, meta, replace)
190
    
191
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
        """List the objects of a container as (name, version_id) tuples.

        Raises NameError if the container does not exist (as of ``until``).
        """
        logger.debug("list_objects: %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, until)
        # Only the path is needed; the lookup also validates the container.
        container_path = self._get_containerinfo(account, container, until)[0]
        return self._list_objects(container_path, prefix, delimiter, marker, limit, virtual, keys, until)
197
    
198
    def list_object_meta(self, user, account, container, until=None):
        """Return a list with all the container's object meta keys.

        Only metadata attached to the latest visible version (as of
        ``until``) of names under the container path is considered.
        Raises NameError if the container does not exist.
        """
        logger.debug("list_object_meta: %s %s %s", account, container, until)
        path, version_id, mtime = self._get_containerinfo(account, container, until)
        # Escape LIKE wildcards so that '%' and '_' inside the path match
        # literally instead of pulling in objects of other containers.
        pattern = path.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_') + '/%'
        sql = ("select distinct m.key from (%s) o, metadata m "
               "where m.version_id = o.version_id and o.name like ? escape '\\'")
        sql = sql % self._sql_until(until)
        c = self.con.execute(sql, (pattern,))
        return [x[0] for x in c.fetchall()]
208
    
209
    def get_object_meta(self, user, account, container, name, version=None):
        """Build the metadata dictionary of an object (or one of its versions).

        The stored key/value pairs are extended with 'name', 'bytes',
        'version', 'version_timestamp' and 'modified'.
        """
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
        path, version_id, mtime, size = self._get_objectinfo(account, container, name, version)
        # For an explicit version, 'modified' is still the timestamp of
        # the latest version (overall last modification).
        modified = mtime if version is None else self._get_version(path)[1]

        meta = self._get_metadata(path, version_id)
        meta['name'] = name
        meta['bytes'] = size
        meta['version'] = version_id
        meta['version_timestamp'] = mtime
        meta['modified'] = modified
        return meta
222
    
223
    def update_object_meta(self, user, account, container, name, meta, replace=False):
        """Store ``meta`` on the object.

        Raises NameError if the object does not exist.
        """
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
        object_path = self._get_objectinfo(account, container, name)[0]
        self._put_metadata(object_path, meta, replace)
229
    
230
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return ``(size, hashmap)`` for an object.

        ``hashmap`` is the list of block ids of the selected version,
        ordered by position.
        """
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
        info = self._get_objectinfo(account, container, name, version)
        version_id, size = info[1], info[3]
        query = 'select block_id from hashmaps where version_id = ? order by pos asc'
        block_ids = []
        for row in self.con.execute(query, (version_id,)):
            block_ids.append(row[0])
        return size, block_ids
239
    
240
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False):
        """Create/update an object with the specified size and partial hashes.

        A new version of the object is recorded; its metadata starts as a
        copy of the previous version's unless ``replace_meta`` is true, and
        the keys in ``meta`` are then written on top. ``hashmap`` lists the
        block ids in order. Raises NameError if the container does not exist.
        """
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
        path = os.path.join(self._get_containerinfo(account, container)[0], name)
        # copy_data is False: the new version gets a fresh hashmap below.
        src_version_id, dest_version_id = self._copy_version(path, path, not replace_meta, False)
        self.con.execute('update versions set size = ? where version_id = ?',
                         (size, dest_version_id))
        # TODO: Check for block_id existence.
        self.con.executemany(
            'insert or replace into hashmaps (version_id, pos, block_id) values (?, ?, ?)',
            [(dest_version_id, pos, block_id) for pos, block_id in enumerate(hashmap)])
        # items() instead of iteritems() keeps this working on Python 2 and 3.
        self.con.executemany(
            'insert or replace into metadata (version_id, key, value) values (?, ?, ?)',
            [(dest_version_id, k, v) for k, v in meta.items()])
        self.con.commit()
257
    
258
    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, src_version=None):
        """Copy an object's data and metadata.

        A new version is recorded at the destination path. Its metadata is
        copied from the source unless ``replace_meta`` is true, and the keys
        in ``dest_meta`` are then written on top. With ``src_version`` set,
        that specific version is the source. Raises NameError if the source
        object or the destination container is missing.
        """
        logger.debug("copy_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, src_version)
        if src_version is None:
            # Also verifies that the source currently exists.
            src_path = self._get_objectinfo(account, src_container, src_name)[0]
        else:
            # The specific version is looked up later by _copy_version.
            src_path = os.path.join(account, src_container, src_name)
        dest_path = os.path.join(self._get_containerinfo(account, dest_container)[0], dest_name)
        src_version_id, dest_version_id = self._copy_version(src_path, dest_path, not replace_meta, True, src_version)
        # items() instead of iteritems() keeps this working on Python 2 and 3.
        self.con.executemany(
            'insert or replace into metadata (version_id, key, value) values (?, ?, ?)',
            [(dest_version_id, k, v) for k, v in dest_meta.items()])
        self.con.commit()
273
    
274
    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, src_version=None):
        """Move an object: copy it to the destination, then delete the source."""
        logger.debug("move_object: %s %s %s %s, %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, src_version)
        # NOTE(review): when source and destination are the same path, the
        # delete below hides the version the copy has just created.
        self.copy_object(
            user, account, src_container, src_name,
            dest_container, dest_name, dest_meta, replace_meta, src_version)
        self.delete_object(user, account, src_container, src_name)
280
    
281
    def delete_object(self, user, account, container, name):
        """Mark an object as deleted.

        Nothing is removed: a new zero-size version with the hide flag
        set is recorded. Raises NameError if the object does not exist.
        """
        logger.debug("delete_object: %s %s %s", account, container, name)
        object_path = self._get_objectinfo(account, container, name)[0]
        self._put_version(object_path, 0, 1)
287
    
288
    def list_versions(self, user, account, container, name):
        """List every (version, version_timestamp) pair recorded for an object."""
        logger.debug("list_versions: %s %s %s", account, container, name)
        object_path = os.path.join(account, container, name)
        # No filter on the hide flag, so deleted versions are listed too.
        query = '''select distinct version_id, strftime('%s', tstamp) from versions where name = ?'''
        versions = []
        for version_id, tstamp in self.con.execute(query, (object_path,)):
            versions.append((int(version_id), int(tstamp)))
        return versions
297
    
298
    def get_block(self, hash):
        """Return a block's data.

        Raises NameError if no block is stored under ``hash``.
        """
        logger.debug("get_block: %s", hash)
        row = self.con.execute('select data from blocks where block_id = ?', (hash,)).fetchone()
        if row is None:
            raise NameError('Block does not exist')
        # bytes() is the same as str() on Python 2; on Python 3 it returns
        # the raw data, where str() would return the "b'...'" repr.
        return bytes(row[0])
308
    
309
    def put_block(self, data):
        """Create a block and return the hash.

        The hash is computed over ``data`` with trailing NUL bytes
        stripped, while the data is stored as given. If a block with the
        same hash already exists, the stored data is left untouched.
        """
        logger.debug("put_block: %s", len(data))
        h = hashlib.new(self.hash_algorithm)
        h.update(data.rstrip(b'\x00'))
        digest = h.hexdigest()
        sql = 'insert or ignore into blocks (block_id, data) values (?, ?)'
        # sqlite3.Binary is the portable blob wrapper (buffer on Python 2,
        # memoryview on Python 3); the bare buffer builtin is Python 2 only.
        self.con.execute(sql, (digest, sqlite3.Binary(data)))
        self.con.commit()
        return digest
320
    
321
    def update_block(self, hash, data, offset=0):
        """Overwrite part of an existing block and return the new block's hash.

        Raises NameError if the block does not exist and IndexError if
        ``offset``/``data`` fall outside the block size.
        """
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        block_size = self.block_size
        if offset == 0 and len(data) == block_size:
            # A whole block is supplied; the old contents are not needed.
            return self.put_block(data)
        current = self.get_block(hash)
        end = offset + len(data)
        if not (0 <= offset <= block_size and end <= block_size):
            raise IndexError('Offset or data outside block limits')
        return self.put_block(current[:offset] + data + current[end:])
333
    
334
    def _sql_until(self, until=None):
        """Return the sql to get the latest versions until the timestamp given.

        The result is a select statement (meant to be embedded as a
        subquery) yielding version_id, name, tstamp (epoch seconds) and
        size for the newest version of every name at or before ``until``,
        leaving out names whose newest version is hidden. ``until``
        defaults to the current time.

        Raises ValueError/TypeError if ``until`` is not convertible to int.
        """
        if until is None:
            until = time.time()
        # The timestamp is interpolated into the SQL text (callers embed the
        # result in larger statements), so force it to a plain integer.
        until = int(until)
        sql = '''select version_id, name, strftime('%s', tstamp) as tstamp, size from versions v
                    where version_id = (select max(version_id) from versions
                                        where v.name = name and tstamp <= datetime(%s, 'unixepoch'))
                    and hide = 0'''
        # The first placeholder re-emits a literal %s for strftime.
        return sql % ('%s', until)
343
    
344
    def _get_pathstats(self, path, until=None):
        """Return count and sum of size of everything under path and latest timestamp.

        Only the latest visible versions (as of ``until``) of names that
        start with ``path + '/'`` are considered. The result is a tuple
        ``(count, bytes, tstamp)`` of ints; ``tstamp`` is 0 when nothing
        is found.
        """
        # Escape LIKE wildcards so that '%' and '_' inside the path match
        # literally; otherwise 'a_b' would also count entries of 'axb'.
        pattern = path.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_') + '/%'
        sql = "select count(version_id), total(size), max(tstamp) from (%s) where name like ? escape '\\'"
        sql = sql % self._sql_until(until)
        row = self.con.execute(sql, (pattern,)).fetchone()
        tstamp = row[2] if row[2] is not None else 0
        return int(row[0]), int(row[1]), int(tstamp)
353
    
354
    def _get_version(self, path, version=None):
        """Look up one version row of ``path``.

        Without ``version`` the newest row is used, and NameError is
        raised if there is none or it is hidden. With ``version`` that
        exact row is used (the hide flag is not checked), and IndexError
        is raised if it is missing.

        Returns ``(version_id as str, timestamp as int, size as int)``.
        """
        if version is not None:
            sql = '''select version_id, strftime('%s', tstamp), size from versions where name = ?
                        and version_id = ?'''
            found = self.con.execute(sql, (path, version)).fetchone()
            if not found:
                raise IndexError('Version does not exist')
        else:
            sql = '''select version_id, strftime('%s', tstamp), size, hide from versions where name = ?
                        order by version_id desc limit 1'''
            found = self.con.execute(sql, (path,)).fetchone()
            if not found or int(found[3]):
                raise NameError('Object does not exist')
        version_id, tstamp, size = found[:3]
        return str(version_id), int(tstamp), int(size)
370
    
371
    def _put_version(self, path, size=0, hide=0):
        """Insert a new version row for ``path``, commit, and return its id as str."""
        cursor = self.con.execute(
            'insert into versions (name, size, hide) values (?, ?, ?)',
            (path, size, hide))
        new_version_id = cursor.lastrowid
        self.con.commit()
        return str(new_version_id)
376
    
377
    def _copy_version(self, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
        """Record a new version of ``dest_path`` derived from ``src_path``.

        The source is ``src_version`` if given, otherwise the latest
        version of ``src_path``; if the latter does not exist, the new
        version is created from scratch. Metadata rows are copied when
        ``copy_meta`` is true; the size and hashmap rows are copied when
        ``copy_data`` is true.

        Returns ``(src_version_id, dest_version_id)``; the former is None
        when there was no source. Raises IndexError if ``src_version``
        is given but does not exist.
        """
        if src_version is not None:
            src_version_id, _, size = self._get_version(src_path, src_version)
        else:
            # Latest or create from scratch.
            try:
                src_version_id, _, size = self._get_version(src_path)
            except NameError:
                src_version_id = None
                size = 0
        if not copy_data:
            size = 0
        dest_version_id = self._put_version(dest_path, size)
        if src_version_id is not None:
            # Bind the ids as parameters instead of formatting them into
            # the statement text.
            if copy_meta:
                sql = 'insert into metadata select ?, key, value from metadata where version_id = ?'
                self.con.execute(sql, (dest_version_id, src_version_id))
            if copy_data:
                sql = 'insert into hashmaps select ?, pos, block_id from hashmaps where version_id = ?'
                self.con.execute(sql, (dest_version_id, src_version_id))
        self.con.commit()
        return src_version_id, dest_version_id
400
    
401
    def _get_versioninfo(self, account, container, name, until=None):
        """Return ``(path, version_id, timestamp, size)`` for the latest visible version.

        The path is built from the leading components up to (not
        including) the first None, so the same helper serves accounts,
        containers and objects. Raises NameError if the path has no
        visible version as of ``until``.
        """
        parts = []
        for component in (account, container, name):
            if component is None:
                break
            parts.append(component)
        path = os.path.join(*parts)

        query = '''select version_id, tstamp, size from (%s) where name = ?'''
        query = query % self._sql_until(until)
        found = self.con.execute(query, (path,)).fetchone()
        if found is None:
            raise NameError('Path does not exist')
        version_id, tstamp, size = found[0], found[1], found[2]
        return path, str(version_id), int(tstamp), int(size)
417
    
418
    def _get_accountinfo(self, account, until=None):
        """Return ``(version_id, mtime)`` of the account as of ``until``.

        Raises NameError('Account does not exist') when the account path
        has no visible version. Any other failure (for example a database
        error) propagates unchanged instead of being reported as a
        missing account.
        """
        try:
            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
        except NameError:
            raise NameError('Account does not exist')
        return version_id, mtime
424
    
425
    def _get_containerinfo(self, account, container, until=None):
        """Return ``(path, version_id, mtime)`` of the container as of ``until``.

        Raises NameError('Container does not exist') when the container
        path has no visible version. Any other failure (for example a
        database error) propagates unchanged instead of being reported as
        a missing container.
        """
        try:
            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
        except NameError:
            raise NameError('Container does not exist')
        return path, version_id, mtime
431
    
432
    def _get_objectinfo(self, account, container, name, version=None):
        """Return ``(path, version_id, mtime, size)`` for an object or one of its versions.

        Errors from the version lookup (NameError / IndexError) propagate.
        """
        object_path = os.path.join(account, container, name)
        info = self._get_version(object_path, version)
        return (object_path,) + tuple(info)
436
    
437
    def _get_metadata(self, path, version):
        """Return the metadata of a version as a ``{key: value}`` dictionary.

        Only ``version`` selects the rows; ``path`` is not used here.
        """
        meta = {}
        query = 'select key, value from metadata where version_id = ?'
        for key, value in self.con.execute(query, (version,)):
            meta[key] = value
        return meta
441
    
442
    def _put_metadata(self, path, meta, replace=False):
        """Create a new version and store metadata.

        The new version of ``path`` keeps the data of the previous one.
        Its metadata starts as a copy of the previous version's unless
        ``replace`` is true; the keys in ``meta`` are then written on top.
        """
        src_version_id, dest_version_id = self._copy_version(path, path, not replace, True)
        # items() instead of iteritems() keeps this working on Python 2 and 3.
        self.con.executemany(
            'insert or replace into metadata (version_id, key, value) values (?, ?, ?)',
            [(dest_version_id, k, v) for k, v in meta.items()])
        self.con.commit()
450
    
451
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
        """List the names under ``path`` as ``(name, version_id)`` tuples.

        Names are relative to ``path + '/'``, sorted, and restricted to
        those starting with ``prefix``. With ``keys``, only names having
        at least one of those metadata keys are returned.

        With a ``delimiter``:
          * ``virtual`` true: names are cut after the first delimiter
            found past the prefix; each resulting pseudo-name is reported
            once, with version_id None when it is not a real name.
          * ``virtual`` false: only names without the delimiter past the
            prefix, or ending exactly at its first occurrence, are kept.

        The listing starts after ``marker`` (if it is in the list) and
        has at most ``limit`` entries, capped at 10000.
        """
        cont_prefix = path + '/'
        # Escape LIKE wildcards so that '%' and '_' in the path or prefix
        # match literally. The backslash must be escaped first.
        pattern = cont_prefix + prefix
        for special in ('\\', '%', '_'):
            pattern = pattern.replace(special, '\\' + special)
        pattern += '%'
        if keys:
            sql = ("select distinct o.name, o.version_id from (%s) o, metadata m "
                   "where o.name like ? escape '\\' and m.version_id = o.version_id "
                   "and m.key in (%s) order by o.name")
            sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
            param = (pattern,) + tuple(keys)
        else:
            sql = "select name, version_id from (%s) where name like ? escape '\\' order by name"
            sql = sql % self._sql_until(until)
            param = (pattern,)
        c = self.con.execute(sql, param)
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
        if delimiter:
            pseudo_objects = []
            seen = set()  # Pseudo-names already emitted (virtual mode).
            for x in objects:
                pseudo_name = x[0]
                i = pseudo_name.find(delimiter, len(prefix))
                if not virtual:
                    # If the delimiter is not found, or the name ends
                    # with the delimiter's first occurrence.
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
                        pseudo_objects.append(x)
                    continue
                # If the delimiter is found, keep up to (and including) the delimiter.
                if i != -1:
                    pseudo_name = pseudo_name[:i + len(delimiter)]
                if pseudo_name in seen:
                    continue
                seen.add(pseudo_name)
                if pseudo_name == x[0]:
                    pseudo_objects.append(x)
                else:
                    pseudo_objects.append((pseudo_name, None))
            objects = pseudo_objects

        start = 0
        if marker:
            try:
                start = [x[0] for x in objects].index(marker) + 1
            except ValueError:
                # Unknown marker: list from the beginning.
                pass
        if not limit or limit > 10000:
            limit = 10000
        return objects[start:start + limit]
494
    
495
    def _del_path(self, path):
        """Delete every version row of ``path`` with its hashmap and metadata rows.

        Rows in the blocks table are left untouched.
        """
        # Dependent rows go first, while the versions rows that identify
        # them still exist.
        statements = (
            '''delete from hashmaps where version_id in
                    (select version_id from versions where name = ?)''',
            '''delete from metadata where version_id in
                    (select version_id from versions where name = ?)''',
            '''delete from versions where name = ?''',
        )
        for statement in statements:
            self.con.execute(statement, (path,))
        self.con.commit()