Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / simple.py @ 58a6c894

History | View | Annotate | Download (21.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import types
39
import hashlib
40
import shutil
41
import pickle
42

    
43
from base import BaseBackend
44

    
45

    
46
logger = logging.getLogger(__name__)
47

    
48

    
49
class SimpleBackend(BaseBackend):
50
    """A simple backend.
51
    
52
    Uses SQLite for storage.
53
    """
54
    
55
    # TODO: Automatic/manual clean-up after a time interval.
56
    
57
    def __init__(self, db):
58
        self.hash_algorithm = 'sha1'
59
        self.block_size = 128 * 1024 # 128KB
60
        
61
        basepath = os.path.split(db)[0]
62
        if basepath and not os.path.exists(basepath):
63
            os.makedirs(basepath)
64
        
65
        self.con = sqlite3.connect(db)
66
        sql = '''create table if not exists versions (
67
                    version_id integer primary key,
68
                    name text,
69
                    tstamp datetime default current_timestamp,
70
                    size integer default 0,
71
                    hide integer default 0)'''
72
        self.con.execute(sql)
73
        sql = '''create table if not exists metadata (
74
                    version_id integer, key text, value text, primary key (version_id, key))'''
75
        self.con.execute(sql)
76
        sql = '''create table if not exists blocks (
77
                    block_id text, data blob, primary key (block_id))'''
78
        self.con.execute(sql)
79
        sql = '''create table if not exists hashmaps (
80
                    version_id integer, pos integer, block_id text, primary key (version_id, pos))'''
81
        self.con.execute(sql)
82
        self.con.commit()
83
    
84
    def delete_account(self, account):
85
        """Delete the account with the given name."""
86
        
87
        logger.debug("delete_account: %s", account)
88
        count, bytes, tstamp = self._get_pathstats(account)
89
        if count > 0:
90
            raise IndexError('Account is not empty')
91
        self._del_path(account) # Point of no return.
92
    
93
    def get_account_meta(self, account, until=None):
94
        """Return a dictionary with the account metadata."""
95
        
96
        logger.debug("get_account_meta: %s %s", account, until)
97
        try:
98
            version_id, mtime = self._get_accountinfo(account, until)
99
        except NameError:
100
            version_id = None
101
        count, bytes, tstamp = self._get_pathstats(account, until)
102
        if until is None:
103
            modified = tstamp
104
        else:
105
            modified = self._get_pathstats(account)[2] # Overall last modification
106
        
107
        # Proper count.
108
        sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
109
        sql = sql % self._sql_until(until)
110
        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
111
        row = c.fetchone()
112
        count = row[0]
113
        
114
        meta = self._get_metadata(account, version_id)
115
        meta.update({'name': account, 'count': count, 'bytes': bytes})
116
        if modified:
117
            meta.update({'modified': modified})
118
        if until is not None:
119
            meta.update({'until_timestamp': tstamp})
120
        return meta
121
    
122
    def update_account_meta(self, account, meta, replace=False):
123
        """Update the metadata associated with the account."""
124
        
125
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
126
        self._put_metadata(account, meta, replace)
127
    
128
    def list_containers(self, account, marker=None, limit=10000, until=None):
129
        """Return a list of containers existing under an account."""
130
        
131
        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
132
        return self._list_objects(account, '', '/', marker, limit, False, [], until)
133
    
134
    def put_container(self, account, container):
135
        """Create a new container with the given name."""
136
        
137
        logger.debug("put_container: %s %s", account, container)
138
        try:
139
            path, version_id, mtime = self._get_containerinfo(account, container)
140
        except NameError:
141
            path = os.path.join(account, container)
142
            version_id = self._put_version(path)
143
        else:
144
            raise NameError('Container already exists')
145
    
146
    def delete_container(self, account, container):
147
        """Delete the container with the given name."""
148
        
149
        logger.debug("delete_container: %s %s", account, container)
150
        path, version_id, mtime = self._get_containerinfo(account, container)
151
        count, bytes, tstamp = self._get_pathstats(path)
152
        if count > 0:
153
            raise IndexError('Container is not empty')
154
        self._del_path(path) # Point of no return.
155
        self._copy_version(account, account, True, True) # New account version.
156
    
157
    def get_container_meta(self, account, container, until=None):
158
        """Return a dictionary with the container metadata."""
159
        
160
        logger.debug("get_container_meta: %s %s %s", account, container, until)
161
        
162
        path, version_id, mtime = self._get_containerinfo(account, container, until)
163
        count, bytes, tstamp = self._get_pathstats(path, until)
164
        if until is None:
165
            modified = tstamp
166
        else:
167
            modified = self._get_pathstats(account)[2] # Overall last modification
168
        
169
        meta = self._get_metadata(path, version_id)
170
        meta.update({'name': container, 'count': count, 'bytes': bytes, 'modified': modified})
171
        if until is not None:
172
            meta.update({'until_timestamp': tstamp})
173
        return meta
174
    
175
    def update_container_meta(self, account, container, meta, replace=False):
176
        """Update the metadata associated with the container."""
177
        
178
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
179
        path, version_id, mtime = self._get_containerinfo(account, container)
180
        self._put_metadata(path, meta, replace)
181
    
182
    def list_objects(self, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
183
        """Return a list of objects existing under a container."""
184
        
185
        logger.debug("list_objects: %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, until)
186
        path, version_id, mtime = self._get_containerinfo(account, container, until)
187
        return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until)
188
    
189
    def list_object_meta(self, account, container, until=None):
190
        """Return a list with all the container's object meta keys."""
191
        
192
        logger.debug("list_object_meta: %s %s %s", account, container, until)
193
        path, version_id, mtime = self._get_containerinfo(account, container, until)
194
        sql = '''select distinct m.key from (%s) o, metadata m
195
                    where m.version_id = o.version_id and o.name like ?'''
196
        sql = sql % self._sql_until(until)
197
        c = self.con.execute(sql, (path + '/%',))
198
        return [x[0] for x in c.fetchall()]
199
    
200
    def get_object_meta(self, account, container, name, version=None):
201
        """Return a dictionary with the object metadata."""
202
        
203
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
204
        path, version_id, mtime, size = self._get_objectinfo(account, container, name, version)
205
        if version is None:
206
            modified = mtime
207
        else:
208
            modified = self._get_version(path)[1] # Overall last modification
209
        
210
        meta = self._get_metadata(path, version_id)
211
        meta.update({'name': name, 'bytes': size, 'version': version_id, 'version_timestamp': mtime, 'modified': modified})
212
        return meta
213
    
214
    def update_object_meta(self, account, container, name, meta, replace=False):
215
        """Update the metadata associated with the object."""
216
        
217
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
218
        path, version_id, mtime, size = self._get_objectinfo(account, container, name)
219
        self._put_metadata(path, meta, replace)
220
    
221
    def get_object_hashmap(self, account, container, name, version=None):
222
        """Return the object's size and a list with partial hashes."""
223
        
224
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
225
        path, version_id, mtime, size = self._get_objectinfo(account, container, name, version)
226
        sql = 'select block_id from hashmaps where version_id = ? order by pos asc'
227
        c = self.con.execute(sql, (version_id,))
228
        hashmap = [x[0] for x in c.fetchall()]
229
        return size, hashmap
230
    
231
    def update_object_hashmap(self, account, container, name, size, hashmap):
232
        """Create/update an object with the specified size and partial hashes."""
233
        
234
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
235
        path = self._get_containerinfo(account, container)[0]
236
        path = os.path.join(path, name)
237
        src_version_id, dest_version_id = self._copy_version(path, path, True, False)
238
        sql = 'update versions set size = ? where version_id = ?'
239
        self.con.execute(sql, (size, dest_version_id))
240
        # TODO: Check for block_id existence.
241
        for i in range(len(hashmap)):
242
            sql = 'insert or replace into hashmaps (version_id, pos, block_id) values (?, ?, ?)'
243
            self.con.execute(sql, (dest_version_id, i, hashmap[i]))
244
        self.con.commit()
245
    
246
    def copy_object(self, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, src_version=None):
247
        """Copy an object's data and metadata."""
248
        
249
        logger.debug("copy_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, src_version)
250
        if src_version is None:
251
            src_path = self._get_objectinfo(account, src_container, src_name)[0]
252
        else:
253
            src_path = os.path.join(account, src_container, src_name)
254
        dest_path = self._get_containerinfo(account, dest_container)[0]
255
        dest_path = os.path.join(dest_path, dest_name)
256
        src_version_id, dest_version_id = self._copy_version(src_path, dest_path, not replace_meta, True, src_version)
257
        for k, v in dest_meta.iteritems():
258
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
259
            self.con.execute(sql, (dest_version_id, k, v))
260
        self.con.commit()
261
    
262
    def move_object(self, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, src_version=None):
263
        """Move an object's data and metadata."""
264
        
265
        logger.debug("move_object: %s %s %s %s, %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, src_version)
266
        self.copy_object(account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, src_version)
267
        self.delete_object(account, src_container, src_name)
268
    
269
    def delete_object(self, account, container, name):
270
        """Delete an object."""
271
        
272
        logger.debug("delete_object: %s %s %s", account, container, name)
273
        path, version_id, mtime, size = self._get_objectinfo(account, container, name)
274
        self._put_version(path, 0, 1)
275
    
276
    def list_versions(self, account, container, name):
277
        """Return a list of version (version_id, version_modified) tuples for an object."""
278
        
279
        # This will even show deleted versions.
280
        path = os.path.join(account, container, name)
281
        sql = '''select distinct version_id, strftime('%s', tstamp) from versions where name = ?'''
282
        c = self.con.execute(sql, (path,))
283
        return [(str(x[0]), int(x[1])) for x in c.fetchall()]
284
    
285
    def get_block(self, hash):
286
        """Return a block's data."""
287
        
288
        logger.debug("get_block: %s", hash)
289
        c = self.con.execute('select data from blocks where block_id = ?', (hash,))
290
        row = c.fetchone()
291
        if row:
292
            return str(row[0])
293
        else:
294
            raise NameError('Block does not exist')
295
    
296
    def put_block(self, data):
297
        """Create a block and return the hash."""
298
        
299
        logger.debug("put_block: %s", len(data))
300
        h = hashlib.new(self.hash_algorithm)
301
        h.update(data.rstrip('\x00'))
302
        hash = h.hexdigest()
303
        sql = 'insert or ignore into blocks (block_id, data) values (?, ?)'
304
        self.con.execute(sql, (hash, buffer(data)))
305
        self.con.commit()
306
        return hash
307
    
308
    def update_block(self, hash, data, offset=0):
309
        """Update a known block and return the hash."""
310
        
311
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
312
        if offset == 0 and len(data) == self.block_size:
313
            return self.put_block(data)
314
        src_data = self.get_block(hash)
315
        bs = self.block_size
316
        if offset < 0 or offset > bs or offset + len(data) > bs:
317
            raise IndexError('Offset or data outside block limits')
318
        dest_data = src_data[:offset] + data + src_data[offset + len(data):]
319
        return self.put_block(dest_data)
320
    
321
    def _sql_until(self, until=None):
322
        """Return the sql to get the latest versions until the timestamp given."""
323
        if until is None:
324
            until = int(time.time())
325
        sql = '''select version_id, name, strftime('%s', tstamp) as tstamp, size from versions v
326
                    where version_id = (select max(version_id) from versions
327
                                        where v.name = name and tstamp <= datetime(%s, 'unixepoch'))
328
                    and hide = 0'''
329
        return sql % ('%s', until)
330
    
331
    def _get_pathstats(self, path, until=None):
332
        """Return count and sum of size of everything under path and latest timestamp."""
333
        
334
        sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?'
335
        sql = sql % self._sql_until(until)
336
        c = self.con.execute(sql, (path + '/%',))
337
        row = c.fetchone()
338
        tstamp = row[2] if row[2] is not None else 0
339
        return int(row[0]), int(row[1]), int(tstamp)
340
    
341
    def _get_version(self, path, version=None):
342
        if version is None:            
343
            sql = '''select version_id, strftime('%s', tstamp), size, hide from versions where name = ?
344
                        order by version_id desc limit 1'''
345
            c = self.con.execute(sql, (path,))
346
            row = c.fetchone()
347
            if not row or int(row[3]):
348
                raise NameError('Object does not exist')
349
        else:
350
            sql = '''select version_id, strftime('%s', tstamp), size from versions where name = ?
351
                        and version_id = ?'''
352
            c = self.con.execute(sql, (path, version))
353
            row = c.fetchone()
354
            if not row:
355
                raise IndexError('Version does not exist')
356
        return str(row[0]), int(row[1]), int(row[2])
357
    
358
    def _put_version(self, path, size=0, hide=0):
359
        sql = 'insert into versions (name, size, hide) values (?, ?, ?)'
360
        id = self.con.execute(sql, (path, size, hide)).lastrowid
361
        self.con.commit()
362
        return str(id)
363
    
364
    def _copy_version(self, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
365
        if src_version is not None:
366
            src_version_id, mtime, size = self._get_version(src_path, src_version)
367
        else:
368
            # Latest or create from scratch.
369
            try:
370
                src_version_id, mtime, size = self._get_version(src_path)
371
            except NameError:
372
                src_version_id = None
373
                size = 0
374
        if not copy_data:
375
            size = 0
376
        dest_version_id = self._put_version(dest_path, size)
377
        if copy_meta and src_version_id is not None:
378
            sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
379
            sql = sql % dest_version_id
380
            self.con.execute(sql, (src_version_id,))
381
        if copy_data and src_version_id is not None:
382
            sql = 'insert into hashmaps select %s, pos, block_id from hashmaps where version_id = ?'
383
            sql = sql % dest_version_id
384
            self.con.execute(sql, (src_version_id,))
385
        self.con.commit()
386
        return src_version_id, dest_version_id
387
    
388
    def _get_versioninfo(self, account, container, name, until=None):
389
        """Return path, latest version, associated timestamp and size until the timestamp given."""
390
        
391
        p = (account, container, name)
392
        try:
393
            p = p[:p.index(None)]
394
        except ValueError:
395
            pass
396
        path = os.path.join(*p)
397
        sql = '''select version_id, tstamp, size from (%s) where name = ?'''
398
        sql = sql % self._sql_until(until)
399
        c = self.con.execute(sql, (path,))
400
        row = c.fetchone()
401
        if row is None:
402
            raise NameError('Path does not exist')
403
        return path, str(row[0]), int(row[1]), int(row[2])
404
    
405
    def _get_accountinfo(self, account, until=None):
406
        try:
407
            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
408
            return version_id, mtime
409
        except:
410
            raise NameError('Account does not exist')
411
    
412
    def _get_containerinfo(self, account, container, until=None):
413
        try:
414
            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
415
            return path, version_id, mtime
416
        except:
417
            raise NameError('Container does not exist')
418
    
419
    def _get_objectinfo(self, account, container, name, version=None):
420
        path = os.path.join(account, container, name)
421
        version_id, mtime, size = self._get_version(path, version)
422
        return path, version_id, mtime, size
423
    
424
    def _get_metadata(self, path, version):
425
        sql = 'select key, value from metadata where version_id = ?'
426
        c = self.con.execute(sql, (version,))
427
        return dict(c.fetchall())
428
    
429
    def _put_metadata(self, path, meta, replace=False):
430
        """Create a new version and store metadata."""
431
        
432
        src_version_id, dest_version_id = self._copy_version(path, path, not replace, True)
433
        for k, v in meta.iteritems():
434
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
435
            self.con.execute(sql, (dest_version_id, k, v))
436
        self.con.commit()
437
    
438
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
439
        cont_prefix = path + '/'
440
        if keys and len(keys) > 0:
441
            sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
442
                        m.version_id = o.version_id and m.key in (%s) order by o.name'''
443
            sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
444
            param = (cont_prefix + prefix + '%',) + tuple(keys)
445
        else:
446
            sql = 'select name, version_id from (%s) where name like ? order by name'
447
            sql = sql % self._sql_until(until)
448
            param = (cont_prefix + prefix + '%',)
449
        c = self.con.execute(sql, param)
450
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
451
        if delimiter:
452
            pseudo_objects = []
453
            for x in objects:
454
                pseudo_name = x[0]
455
                i = pseudo_name.find(delimiter, len(prefix))
456
                if not virtual:
457
                    # If the delimiter is not found, or the name ends
458
                    # with the delimiter's first occurence.
459
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
460
                        pseudo_objects.append(x)
461
                else:
462
                    # If the delimiter is found, keep up to (and including) the delimiter.
463
                    if i != -1:
464
                        pseudo_name = pseudo_name[:i + len(delimiter)]
465
                    if pseudo_name not in [y[0] for y in pseudo_objects]:
466
                        pseudo_objects.append((pseudo_name, x[1]))
467
            objects = pseudo_objects
468
        
469
        start = 0
470
        if marker:
471
            try:
472
                start = [x[0] for x in objects].index(marker) + 1
473
            except ValueError:
474
                pass
475
        if not limit or limit > 10000:
476
            limit = 10000
477
        return objects[start:start + limit]
478
    
479
    def _del_path(self, path):
480
        sql = '''delete from hashmaps where version_id in
481
                    (select version_id from versions where name = ?)'''
482
        self.con.execute(sql, (path,))
483
        sql = '''delete from metadata where version_id in
484
                    (select version_id from versions where name = ?)'''
485
        self.con.execute(sql, (path,))
486
        sql = '''delete from versions where name = ?'''
487
        self.con.execute(sql, (path,))
488
        self.con.commit()