Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / simple.py @ 31a1c80d

History | View | Annotate | Download (22.2 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import types
39
import hashlib
40
import shutil
41
import pickle
42

    
43
from base import BaseBackend
44

    
45

    
46
logger = logging.getLogger(__name__)
47

    
48

    
49
class SimpleBackend(BaseBackend):
50
    """A simple backend.
51
    
52
    Uses SQLite for storage.
53
    """
54
    
55
    # TODO: Automatic/manual clean-up after a time interval.
56
    
57
    def __init__(self, db):
58
        self.hash_algorithm = 'sha1'
59
        self.block_size = 128 * 1024 # 128KB
60
        
61
        basepath = os.path.split(db)[0]
62
        if basepath and not os.path.exists(basepath):
63
            os.makedirs(basepath)
64
        
65
        self.con = sqlite3.connect(db)
66
        sql = '''create table if not exists versions (
67
                    version_id integer primary key,
68
                    name text,
69
                    tstamp datetime default current_timestamp,
70
                    size integer default 0,
71
                    hide integer default 0)'''
72
        self.con.execute(sql)
73
        sql = '''create table if not exists metadata (
74
                    version_id integer, key text, value text, primary key (version_id, key))'''
75
        self.con.execute(sql)
76
        sql = '''create table if not exists blocks (
77
                    block_id text, data blob, primary key (block_id))'''
78
        self.con.execute(sql)
79
        sql = '''create table if not exists hashmaps (
80
                    version_id integer, pos integer, block_id text, primary key (version_id, pos))'''
81
        self.con.execute(sql)
82
        self.con.commit()
83
    
84
    def delete_account(self, account):
        """Remove an account that no longer contains anything.

        Raises IndexError when something is still stored under the account.
        """

        logger.debug("delete_account: %s", account)
        stats = self._get_pathstats(account)
        if stats[0] > 0:
            raise IndexError('Account is not empty')
        # Point of no return.
        self._del_path(account)
92
    
93
    def get_account_meta(self, account, until=None):
        """Return a dictionary with the account metadata.

        The result holds the metadata stored for the account's version plus
        ``name``, ``count`` (entries directly under the account) and
        ``bytes`` (total size of everything under the account).  ``modified``
        is added when a non-zero modification time is known, and
        ``until_timestamp`` when ``until`` is given.

        An account without a version row of its own is not an error; it is
        reported with no stored metadata.
        """

        logger.debug("get_account_meta: %s %s", account, until)
        try:
            version_id, mtime = self._get_accountinfo(account, until)
        except NameError:
            # Unknown account: there is no version to read metadata from and
            # no account timestamp to compare against.  mtime must still be
            # bound, because it is compared below.
            version_id = None
            mtime = 0
        _, size, tstamp = self._get_pathstats(account, until)
        if mtime > tstamp:
            tstamp = mtime
        if until is None:
            modified = tstamp
        else:
            modified = self._get_pathstats(account)[2] # Overall last modification
            if mtime > modified:
                modified = mtime

        # Proper count: only names exactly one level below the account.
        sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
        sql = sql % self._sql_until(until)
        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
        count = c.fetchone()[0]

        meta = self._get_metadata(account, version_id)
        meta.update({'name': account, 'count': count, 'bytes': size})
        if modified:
            meta.update({'modified': modified})
        if until is not None:
            meta.update({'until_timestamp': tstamp})
        return meta
125
    
126
    def update_account_meta(self, account, meta, replace=False):
        """Store ``meta`` as metadata of the account.

        The work is delegated to ``_put_metadata``, which receives
        ``replace`` unchanged.
        """

        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
        self._put_metadata(account, meta, replace=replace)
131
    
132
    def list_containers(self, account, marker=None, limit=10000, until=None):
        """List the containers found under an account.

        This is ``_list_objects`` on the account path with an empty prefix,
        ``/`` as delimiter, no virtual entries and no metadata key filter.
        """

        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
        return self._list_objects(account, prefix='', delimiter='/', marker=marker,
                                  limit=limit, virtual=False, keys=[], until=until)
137
    
138
    def put_container(self, account, container):
        """Create a container under the account.

        Raises NameError when the container already exists.
        """

        logger.debug("put_container: %s %s", account, container)
        try:
            self._get_containerinfo(account, container)
        except NameError:
            # Not there yet: its first version is what creates it.
            self._put_version(os.path.join(account, container))
            return
        raise NameError('Container already exists')
149
    
150
    def delete_container(self, account, container):
        """Remove a container that no longer contains anything.

        Raises NameError when the container does not exist and IndexError
        when something is still stored under it.
        """

        logger.debug("delete_container: %s %s", account, container)
        container_path = self._get_containerinfo(account, container)[0]
        if self._get_pathstats(container_path)[0] > 0:
            raise IndexError('Container is not empty')
        # Point of no return.
        self._del_path(container_path)
        # New account version.
        self._copy_version(account, account, True, True)
160
    
161
    def get_container_meta(self, account, container, until=None):
        """Build the metadata dictionary of a container.

        On top of the stored metadata the result carries ``name``, ``count``
        and ``bytes`` (statistics of everything under the container) and
        ``modified``.  ``until_timestamp`` is added when ``until`` is given.
        Raises NameError when the container does not exist.
        """

        logger.debug("get_container_meta: %s %s %s", account, container, until)

        path, version_id, mtime = self._get_containerinfo(account, container, until)
        count, size, latest = self._get_pathstats(path, until)
        tstamp = max(latest, mtime)
        if until is not None:
            # Overall last modification, regardless of the requested cut-off.
            modified = max(self._get_pathstats(path)[2], mtime)
        else:
            modified = tstamp

        meta = self._get_metadata(path, version_id)
        meta['name'] = container
        meta['count'] = count
        meta['bytes'] = size
        meta['modified'] = modified
        if until is not None:
            meta['until_timestamp'] = tstamp
        return meta
182
    
183
    def update_container_meta(self, account, container, meta, replace=False):
        """Store ``meta`` as metadata of the container.

        Raises NameError when the container does not exist.
        """

        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
        container_path = self._get_containerinfo(account, container)[0]
        self._put_metadata(container_path, meta, replace=replace)
189
    
190
    def list_objects(self, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
        """List the objects stored in a container.

        The container must exist as of ``until`` (NameError otherwise); the
        remaining arguments are handed unchanged to ``_list_objects``.
        """

        logger.debug("list_objects: %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, until)
        container_path = self._get_containerinfo(account, container, until)[0]
        return self._list_objects(container_path, prefix=prefix, delimiter=delimiter, marker=marker,
                                  limit=limit, virtual=virtual, keys=keys, until=until)
196
    
197
    def list_object_meta(self, account, container, until=None):
        """Collect the distinct metadata keys used by a container's objects.

        Only the versions visible as of ``until`` are considered.  Raises
        NameError when the container does not exist.
        """

        logger.debug("list_object_meta: %s %s %s", account, container, until)
        container_path = self._get_containerinfo(account, container, until)[0]
        query = '''select distinct m.key from (%s) o, metadata m
                    where m.version_id = o.version_id and o.name like ?''' % self._sql_until(until)
        rows = self.con.execute(query, (container_path + '/%',)).fetchall()
        return [row[0] for row in rows]
207
    
208
    def get_object_meta(self, account, container, name, version=None):
        """Build the metadata dictionary of an object version.

        On top of the stored metadata the result carries ``name``, ``bytes``,
        ``version``, ``version_timestamp`` and ``modified``.  When a specific
        ``version`` is requested, ``modified`` is taken from the object's
        latest version instead of the requested one.
        """

        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
        path, version_id, mtime, size = self._get_objectinfo(account, container, name, version)
        # Overall last modification when an explicit version was asked for.
        modified = mtime if version is None else self._get_version(path)[1]

        meta = self._get_metadata(path, version_id)
        meta['name'] = name
        meta['bytes'] = size
        meta['version'] = version_id
        meta['version_timestamp'] = mtime
        meta['modified'] = modified
        return meta
221
    
222
    def update_object_meta(self, account, container, name, meta, replace=False):
        """Store ``meta`` as metadata of the object.

        The object is looked up first, so a missing object raises before
        anything is written.
        """

        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
        object_path = self._get_objectinfo(account, container, name)[0]
        self._put_metadata(object_path, meta, replace=replace)
228
    
229
    def get_object_hashmap(self, account, container, name, version=None):
        """Return ``(size, hashmap)`` for an object version.

        ``hashmap`` is the list of block ids of that version, ordered by
        position.
        """

        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
        info = self._get_objectinfo(account, container, name, version)
        version_id, size = info[1], info[3]
        cursor = self.con.execute('select block_id from hashmaps where version_id = ? order by pos asc', (version_id,))
        return size, [row[0] for row in cursor.fetchall()]
238
    
239
    def update_object_hashmap(self, account, container, name, size, hashmap):
        """Create or update an object from its size and list of block ids.

        A new version of the object is created, keeping the metadata of the
        previous version but none of its data, and ``hashmap`` is stored as
        the block list of that new version.  Raises NameError when the
        container does not exist.
        """

        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
        container_path = self._get_containerinfo(account, container)[0]
        object_path = os.path.join(container_path, name)
        dest_version_id = self._copy_version(object_path, object_path, True, False)[1]
        self.con.execute('update versions set size = ? where version_id = ?', (size, dest_version_id))
        # TODO: Check for block_id existence.
        insert_sql = 'insert or replace into hashmaps (version_id, pos, block_id) values (?, ?, ?)'
        for pos, block_id in enumerate(hashmap):
            self.con.execute(insert_sql, (dest_version_id, pos, block_id))
        self.con.commit()
253
    
254
    def copy_object(self, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, src_version=None):
        """Copy an object's data and metadata.

        A new version is created at the destination path from the source
        object's latest version, or from ``src_version`` when given.  The
        source metadata is carried over unless ``replace_meta`` is true, and
        the entries of ``dest_meta`` are then written on top of it.

        Raises NameError if the source object (when no ``src_version`` is
        given) or the destination container does not exist, and IndexError
        if ``src_version`` does not exist for the source path.
        """

        logger.debug("copy_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, src_version)
        if src_version is None:
            src_path = self._get_objectinfo(account, src_container, src_name)[0]
        else:
            # With an explicit version the path is built directly; the
            # version lookup in _copy_version validates it.
            src_path = os.path.join(account, src_container, src_name)
        dest_path = self._get_containerinfo(account, dest_container)[0]
        dest_path = os.path.join(dest_path, dest_name)
        src_version_id, dest_version_id = self._copy_version(src_path, dest_path, not replace_meta, True, src_version)
        sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
        # items() rather than iteritems(): same result on Python 2, and it
        # also exists on Python 3.
        for k, v in dest_meta.items():
            self.con.execute(sql, (dest_version_id, k, v))
        self.con.commit()
269
    
270
    def move_object(self, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, src_version=None):
        """Move an object: copy it to the destination, then delete the source."""

        logger.debug("move_object: %s %s %s %s, %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, src_version)
        self.copy_object(account, src_container, src_name, dest_container, dest_name,
                         dest_meta=dest_meta, replace_meta=replace_meta, src_version=src_version)
        self.delete_object(account, src_container, src_name)
276
    
277
    def delete_object(self, account, container, name):
        """Mark an object as deleted.

        Nothing is removed: a new zero-sized version flagged as hidden is
        recorded for the object's path.
        """

        logger.debug("delete_object: %s %s %s", account, container, name)
        object_path = self._get_objectinfo(account, container, name)[0]
        self._put_version(object_path, size=0, hide=1)
283
    
284
    def list_versions(self, account, container, name):
285
        """Return a list of version (version_id, version_modified) tuples for an object."""
286
        
287
        # This will even show deleted versions.
288
        path = os.path.join(account, container, name)
289
        sql = '''select distinct version_id, strftime('%s', tstamp) from versions where name = ?'''
290
        c = self.con.execute(sql, (path,))
291
        return [(str(x[0]), int(x[1])) for x in c.fetchall()]
292
    
293
    def get_block(self, hash):
        """Fetch the data stored for the block with id ``hash``.

        Raises NameError when there is no such block.
        """

        logger.debug("get_block: %s", hash)
        row = self.con.execute('select data from blocks where block_id = ?', (hash,)).fetchone()
        if not row:
            raise NameError('Block does not exist')
        return str(row[0])
303
    
304
    def put_block(self, data):
        """Store a block and return its hash.

        The hash is computed over ``data`` with trailing NUL bytes stripped,
        while the data is stored as given.  A block whose id is already
        present is left untouched.
        """

        logger.debug("put_block: %s", len(data))
        digest = hashlib.new(self.hash_algorithm, data.rstrip('\x00'))
        block_id = digest.hexdigest()
        self.con.execute('insert or ignore into blocks (block_id, data) values (?, ?)',
                         (block_id, buffer(data)))
        self.con.commit()
        return block_id
315
    
316
    def update_block(self, hash, data, offset=0):
        """Overwrite part of a known block and return the resulting hash.

        ``data`` replaces the bytes of block ``hash`` starting at ``offset``
        and the result is stored as a block of its own.  A full-size write at
        offset 0 is stored directly without reading the old block.

        Raises NameError when the block does not exist and IndexError when
        the write would fall outside the block size.
        """

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        block_size = self.block_size
        data_len = len(data)
        if offset == 0 and data_len == block_size:
            return self.put_block(data)
        # The existing block is read before the range check, so a missing
        # block is reported ahead of a bad offset.
        current = self.get_block(hash)
        end = offset + data_len
        if not (0 <= offset <= block_size and end <= block_size):
            raise IndexError('Offset or data outside block limits')
        return self.put_block(current[:offset] + data + current[end:])
328
    
329
    def _sql_until(self, until=None):
330
        """Return the sql to get the latest versions until the timestamp given."""
331
        if until is None:
332
            until = int(time.time())
333
        sql = '''select version_id, name, strftime('%s', tstamp) as tstamp, size from versions v
334
                    where version_id = (select max(version_id) from versions
335
                                        where v.name = name and tstamp <= datetime(%s, 'unixepoch'))
336
                    and hide = 0'''
337
        return sql % ('%s', until)
338
    
339
    def _get_pathstats(self, path, until=None):
340
        """Return count and sum of size of everything under path and latest timestamp."""
341
        
342
        sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?'
343
        sql = sql % self._sql_until(until)
344
        c = self.con.execute(sql, (path + '/%',))
345
        row = c.fetchone()
346
        tstamp = row[2] if row[2] is not None else 0
347
        return int(row[0]), int(row[1]), int(tstamp)
348
    
349
    def _get_version(self, path, version=None):
350
        if version is None:            
351
            sql = '''select version_id, strftime('%s', tstamp), size, hide from versions where name = ?
352
                        order by version_id desc limit 1'''
353
            c = self.con.execute(sql, (path,))
354
            row = c.fetchone()
355
            if not row or int(row[3]):
356
                raise NameError('Object does not exist')
357
        else:
358
            sql = '''select version_id, strftime('%s', tstamp), size from versions where name = ?
359
                        and version_id = ?'''
360
            c = self.con.execute(sql, (path, version))
361
            row = c.fetchone()
362
            if not row:
363
                raise IndexError('Version does not exist')
364
        return str(row[0]), int(row[1]), int(row[2])
365
    
366
    def _put_version(self, path, size=0, hide=0):
367
        sql = 'insert into versions (name, size, hide) values (?, ?, ?)'
368
        id = self.con.execute(sql, (path, size, hide)).lastrowid
369
        self.con.commit()
370
        return str(id)
371
    
372
    def _copy_version(self, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
373
        if src_version is not None:
374
            src_version_id, mtime, size = self._get_version(src_path, src_version)
375
        else:
376
            # Latest or create from scratch.
377
            try:
378
                src_version_id, mtime, size = self._get_version(src_path)
379
            except NameError:
380
                src_version_id = None
381
                size = 0
382
        if not copy_data:
383
            size = 0
384
        dest_version_id = self._put_version(dest_path, size)
385
        if copy_meta and src_version_id is not None:
386
            sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
387
            sql = sql % dest_version_id
388
            self.con.execute(sql, (src_version_id,))
389
        if copy_data and src_version_id is not None:
390
            sql = 'insert into hashmaps select %s, pos, block_id from hashmaps where version_id = ?'
391
            sql = sql % dest_version_id
392
            self.con.execute(sql, (src_version_id,))
393
        self.con.commit()
394
        return src_version_id, dest_version_id
395
    
396
    def _get_versioninfo(self, account, container, name, until=None):
397
        """Return path, latest version, associated timestamp and size until the timestamp given."""
398
        
399
        p = (account, container, name)
400
        try:
401
            p = p[:p.index(None)]
402
        except ValueError:
403
            pass
404
        path = os.path.join(*p)
405
        sql = '''select version_id, tstamp, size from (%s) where name = ?'''
406
        sql = sql % self._sql_until(until)
407
        c = self.con.execute(sql, (path,))
408
        row = c.fetchone()
409
        if row is None:
410
            raise NameError('Path does not exist')
411
        return path, str(row[0]), int(row[1]), int(row[2])
412
    
413
    def _get_accountinfo(self, account, until=None):
414
        try:
415
            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
416
            return version_id, mtime
417
        except:
418
            raise NameError('Account does not exist')
419
    
420
    def _get_containerinfo(self, account, container, until=None):
421
        try:
422
            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
423
            return path, version_id, mtime
424
        except:
425
            raise NameError('Container does not exist')
426
    
427
    def _get_objectinfo(self, account, container, name, version=None):
428
        path = os.path.join(account, container, name)
429
        version_id, mtime, size = self._get_version(path, version)
430
        return path, version_id, mtime, size
431
    
432
    def _get_metadata(self, path, version):
433
        sql = 'select key, value from metadata where version_id = ?'
434
        c = self.con.execute(sql, (version,))
435
        return dict(c.fetchall())
436
    
437
    def _put_metadata(self, path, meta, replace=False):
438
        """Create a new version and store metadata."""
439
        
440
        src_version_id, dest_version_id = self._copy_version(path, path, not replace, True)
441
        for k, v in meta.iteritems():
442
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
443
            self.con.execute(sql, (dest_version_id, k, v))
444
        self.con.commit()
445
    
446
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
447
        cont_prefix = path + '/'
448
        if keys and len(keys) > 0:
449
            sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
450
                        m.version_id = o.version_id and m.key in (%s) order by o.name'''
451
            sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
452
            param = (cont_prefix + prefix + '%',) + tuple(keys)
453
        else:
454
            sql = 'select name, version_id from (%s) where name like ? order by name'
455
            sql = sql % self._sql_until(until)
456
            param = (cont_prefix + prefix + '%',)
457
        c = self.con.execute(sql, param)
458
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
459
        if delimiter:
460
            pseudo_objects = []
461
            for x in objects:
462
                pseudo_name = x[0]
463
                i = pseudo_name.find(delimiter, len(prefix))
464
                if not virtual:
465
                    # If the delimiter is not found, or the name ends
466
                    # with the delimiter's first occurence.
467
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
468
                        pseudo_objects.append(x)
469
                else:
470
                    # If the delimiter is found, keep up to (and including) the delimiter.
471
                    if i != -1:
472
                        pseudo_name = pseudo_name[:i + len(delimiter)]
473
                    if pseudo_name not in [y[0] for y in pseudo_objects]:
474
                        pseudo_objects.append((pseudo_name, x[1]))
475
            objects = pseudo_objects
476
        
477
        start = 0
478
        if marker:
479
            try:
480
                start = [x[0] for x in objects].index(marker) + 1
481
            except ValueError:
482
                pass
483
        if not limit or limit > 10000:
484
            limit = 10000
485
        return objects[start:start + limit]
486
    
487
    def _del_path(self, path):
488
        sql = '''delete from hashmaps where version_id in
489
                    (select version_id from versions where name = ?)'''
490
        self.con.execute(sql, (path,))
491
        sql = '''delete from metadata where version_id in
492
                    (select version_id from versions where name = ?)'''
493
        self.con.execute(sql, (path,))
494
        sql = '''delete from versions where name = ?'''
495
        self.con.execute(sql, (path,))
496
        self.con.commit()