Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / simple.py @ 6d817842

History | View | Annotate | Download (22.8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import types
39
import hashlib
40
import shutil
41
import pickle
42

    
43
from base import BaseBackend
44

    
45

    
46
logger = logging.getLogger(__name__)
47

    
48

    
49
class SimpleBackend(BaseBackend):
50
    """A simple backend.
51
    
52
    Uses SQLite for storage.
53
    """
54
    
55
    # TODO: Automatic/manual clean-up after a time interval.
56
    
57
    def __init__(self, db):
58
        self.hash_algorithm = 'sha1'
59
        self.block_size = 128 * 1024 # 128KB
60
        
61
        basepath = os.path.split(db)[0]
62
        if basepath and not os.path.exists(basepath):
63
            os.makedirs(basepath)
64
        
65
        self.con = sqlite3.connect(db, check_same_thread=False)
66
        sql = '''create table if not exists versions (
67
                    version_id integer primary key,
68
                    name text,
69
                    tstamp datetime default current_timestamp,
70
                    size integer default 0,
71
                    hide integer default 0)'''
72
        self.con.execute(sql)
73
        sql = '''create table if not exists metadata (
74
                    version_id integer, key text, value text, primary key (version_id, key))'''
75
        self.con.execute(sql)
76
        sql = '''create table if not exists blocks (
77
                    block_id text, data blob, primary key (block_id))'''
78
        self.con.execute(sql)
79
        sql = '''create table if not exists hashmaps (
80
                    version_id integer, pos integer, block_id text, primary key (version_id, pos))'''
81
        self.con.execute(sql)
82
        self.con.commit()
83
    
84
    def delete_account(self, user, account):
85
        """Delete the account with the given name."""
86
        
87
        logger.debug("delete_account: %s", account)
88
        count, bytes, tstamp = self._get_pathstats(account)
89
        if count > 0:
90
            raise IndexError('Account is not empty')
91
        self._del_path(account) # Point of no return.
92
    
93
    def get_account_meta(self, user, account, until=None):
94
        """Return a dictionary with the account metadata."""
95
        
96
        logger.debug("get_account_meta: %s %s", account, until)
97
        try:
98
            version_id, mtime = self._get_accountinfo(account, until)
99
        except NameError:
100
            version_id = None
101
            mtime = 0
102
        count, bytes, tstamp = self._get_pathstats(account, until)
103
        if mtime > tstamp:
104
            tstamp = mtime
105
        if until is None:
106
            modified = tstamp
107
        else:
108
            modified = self._get_pathstats(account)[2] # Overall last modification
109
            if mtime > modified:
110
                modified = mtime
111
        
112
        # Proper count.
113
        sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
114
        sql = sql % self._sql_until(until)
115
        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
116
        row = c.fetchone()
117
        count = row[0]
118
        
119
        meta = self._get_metadata(account, version_id)
120
        meta.update({'name': account, 'count': count, 'bytes': bytes})
121
        if modified:
122
            meta.update({'modified': modified})
123
        if until is not None:
124
            meta.update({'until_timestamp': tstamp})
125
        return meta
126
    
127
    def update_account_meta(self, user, account, meta, replace=False):
128
        """Update the metadata associated with the account."""
129
        
130
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
131
        self._put_metadata(account, meta, replace)
132
    
133
    def list_containers(self, user, account, marker=None, limit=10000, until=None):
134
        """Return a list of containers existing under an account."""
135
        
136
        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
137
        return self._list_objects(account, '', '/', marker, limit, False, [], until)
138
    
139
    def put_container(self, user, account, container):
140
        """Create a new container with the given name."""
141
        
142
        logger.debug("put_container: %s %s", account, container)
143
        try:
144
            path, version_id, mtime = self._get_containerinfo(account, container)
145
        except NameError:
146
            path = os.path.join(account, container)
147
            version_id = self._put_version(path)
148
        else:
149
            raise NameError('Container already exists')
150
    
151
    def delete_container(self, user, account, container):
152
        """Delete the container with the given name."""
153
        
154
        logger.debug("delete_container: %s %s", account, container)
155
        path, version_id, mtime = self._get_containerinfo(account, container)
156
        count, bytes, tstamp = self._get_pathstats(path)
157
        if count > 0:
158
            raise IndexError('Container is not empty')
159
        self._del_path(path) # Point of no return.
160
        self._copy_version(account, account, True, True) # New account version.
161
    
162
    def get_container_meta(self, user, account, container, until=None):
163
        """Return a dictionary with the container metadata."""
164
        
165
        logger.debug("get_container_meta: %s %s %s", account, container, until)
166
        
167
        path, version_id, mtime = self._get_containerinfo(account, container, until)
168
        count, bytes, tstamp = self._get_pathstats(path, until)
169
        if mtime > tstamp:
170
            tstamp = mtime
171
        if until is None:
172
            modified = tstamp
173
        else:
174
            modified = self._get_pathstats(path)[2] # Overall last modification
175
            if mtime > modified:
176
                modified = mtime
177
        
178
        meta = self._get_metadata(path, version_id)
179
        meta.update({'name': container, 'count': count, 'bytes': bytes, 'modified': modified})
180
        if until is not None:
181
            meta.update({'until_timestamp': tstamp})
182
        return meta
183
    
184
    def update_container_meta(self, user, account, container, meta, replace=False):
185
        """Update the metadata associated with the container."""
186
        
187
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
188
        path, version_id, mtime = self._get_containerinfo(account, container)
189
        self._put_metadata(path, meta, replace)
190
    
191
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
192
        """Return a list of objects existing under a container."""
193
        
194
        logger.debug("list_objects: %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, until)
195
        path, version_id, mtime = self._get_containerinfo(account, container, until)
196
        return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until)
197
    
198
    def list_object_meta(self, user, account, container, until=None):
199
        """Return a list with all the container's object meta keys."""
200
        
201
        logger.debug("list_object_meta: %s %s %s", account, container, until)
202
        path, version_id, mtime = self._get_containerinfo(account, container, until)
203
        sql = '''select distinct m.key from (%s) o, metadata m
204
                    where m.version_id = o.version_id and o.name like ?'''
205
        sql = sql % self._sql_until(until)
206
        c = self.con.execute(sql, (path + '/%',))
207
        return [x[0] for x in c.fetchall()]
208
    
209
    def get_object_meta(self, user, account, container, name, version=None):
210
        """Return a dictionary with the object metadata."""
211
        
212
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
213
        path, version_id, mtime, size = self._get_objectinfo(account, container, name, version)
214
        if version is None:
215
            modified = mtime
216
        else:
217
            modified = self._get_version(path)[1] # Overall last modification
218
        
219
        meta = self._get_metadata(path, version_id)
220
        meta.update({'name': name, 'bytes': size, 'version': version_id, 'version_timestamp': mtime, 'modified': modified})
221
        return meta
222
    
223
    def update_object_meta(self, user, account, container, name, meta, replace=False):
224
        """Update the metadata associated with the object."""
225
        
226
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
227
        path, version_id, mtime, size = self._get_objectinfo(account, container, name)
228
        self._put_metadata(path, meta, replace)
229
    
230
    def get_object_hashmap(self, user, account, container, name, version=None):
231
        """Return the object's size and a list with partial hashes."""
232
        
233
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
234
        path, version_id, mtime, size = self._get_objectinfo(account, container, name, version)
235
        sql = 'select block_id from hashmaps where version_id = ? order by pos asc'
236
        c = self.con.execute(sql, (version_id,))
237
        hashmap = [x[0] for x in c.fetchall()]
238
        return size, hashmap
239
    
240
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False):
241
        """Create/update an object with the specified size and partial hashes."""
242
        
243
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
244
        path = self._get_containerinfo(account, container)[0]
245
        path = os.path.join(path, name)
246
        src_version_id, dest_version_id = self._copy_version(path, path, not replace_meta, False)
247
        sql = 'update versions set size = ? where version_id = ?'
248
        self.con.execute(sql, (size, dest_version_id))
249
        # TODO: Check for block_id existence.
250
        for i in range(len(hashmap)):
251
            sql = 'insert or replace into hashmaps (version_id, pos, block_id) values (?, ?, ?)'
252
            self.con.execute(sql, (dest_version_id, i, hashmap[i]))
253
        for k, v in meta.iteritems():
254
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
255
            self.con.execute(sql, (dest_version_id, k, v))
256
        self.con.commit()
257
    
258
    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, src_version=None):
259
        """Copy an object's data and metadata."""
260
        
261
        logger.debug("copy_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, src_version)
262
        self._get_containerinfo(account, src_container)
263
        if src_version is None:
264
            src_path = self._get_objectinfo(account, src_container, src_name)[0]
265
        else:
266
            src_path = os.path.join(account, src_container, src_name)
267
        dest_path = self._get_containerinfo(account, dest_container)[0]
268
        dest_path = os.path.join(dest_path, dest_name)
269
        src_version_id, dest_version_id = self._copy_version(src_path, dest_path, not replace_meta, True, src_version)
270
        for k, v in dest_meta.iteritems():
271
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
272
            self.con.execute(sql, (dest_version_id, k, v))
273
        self.con.commit()
274
    
275
    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, src_version=None):
276
        """Move an object's data and metadata."""
277
        
278
        logger.debug("move_object: %s %s %s %s, %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, src_version)
279
        self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, src_version)
280
        self.delete_object(user, account, src_container, src_name)
281
    
282
    def delete_object(self, user, account, container, name):
283
        """Delete an object."""
284
        
285
        logger.debug("delete_object: %s %s %s", account, container, name)
286
        path, version_id, mtime, size = self._get_objectinfo(account, container, name)
287
        self._put_version(path, 0, 1)
288
    
289
    def list_versions(self, user, account, container, name):
290
        """Return a list of all (version, version_timestamp) tuples for an object."""
291
        
292
        logger.debug("list_versions: %s %s %s", account, container, name)
293
        # This will even show deleted versions.
294
        path = os.path.join(account, container, name)
295
        sql = '''select distinct version_id, strftime('%s', tstamp) from versions where name = ?'''
296
        c = self.con.execute(sql, (path,))
297
        return [(int(x[0]), int(x[1])) for x in c.fetchall()]
298
    
299
    def get_block(self, hash):
300
        """Return a block's data."""
301
        
302
        logger.debug("get_block: %s", hash)
303
        c = self.con.execute('select data from blocks where block_id = ?', (hash,))
304
        row = c.fetchone()
305
        if row:
306
            return str(row[0])
307
        else:
308
            raise NameError('Block does not exist')
309
    
310
    def put_block(self, data):
311
        """Create a block and return the hash."""
312
        
313
        logger.debug("put_block: %s", len(data))
314
        h = hashlib.new(self.hash_algorithm)
315
        h.update(data.rstrip('\x00'))
316
        hash = h.hexdigest()
317
        sql = 'insert or ignore into blocks (block_id, data) values (?, ?)'
318
        self.con.execute(sql, (hash, buffer(data)))
319
        self.con.commit()
320
        return hash
321
    
322
    def update_block(self, hash, data, offset=0):
323
        """Update a known block and return the hash."""
324
        
325
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
326
        if offset == 0 and len(data) == self.block_size:
327
            return self.put_block(data)
328
        src_data = self.get_block(hash)
329
        bs = self.block_size
330
        if offset < 0 or offset > bs or offset + len(data) > bs:
331
            raise IndexError('Offset or data outside block limits')
332
        dest_data = src_data[:offset] + data + src_data[offset + len(data):]
333
        return self.put_block(dest_data)
334
    
335
    def _sql_until(self, until=None):
336
        """Return the sql to get the latest versions until the timestamp given."""
337
        if until is None:
338
            until = int(time.time())
339
        sql = '''select version_id, name, strftime('%s', tstamp) as tstamp, size from versions v
340
                    where version_id = (select max(version_id) from versions
341
                                        where v.name = name and tstamp <= datetime(%s, 'unixepoch'))
342
                    and hide = 0'''
343
        return sql % ('%s', until)
344
    
345
    def _get_pathstats(self, path, until=None):
346
        """Return count and sum of size of everything under path and latest timestamp."""
347
        
348
        sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?'
349
        sql = sql % self._sql_until(until)
350
        c = self.con.execute(sql, (path + '/%',))
351
        row = c.fetchone()
352
        tstamp = row[2] if row[2] is not None else 0
353
        return int(row[0]), int(row[1]), int(tstamp)
354
    
355
    def _get_version(self, path, version=None):
356
        if version is None:            
357
            sql = '''select version_id, strftime('%s', tstamp), size, hide from versions where name = ?
358
                        order by version_id desc limit 1'''
359
            c = self.con.execute(sql, (path,))
360
            row = c.fetchone()
361
            if not row or int(row[3]):
362
                raise NameError('Object does not exist')
363
        else:
364
            sql = '''select version_id, strftime('%s', tstamp), size from versions where name = ?
365
                        and version_id = ?'''
366
            c = self.con.execute(sql, (path, version))
367
            row = c.fetchone()
368
            if not row:
369
                raise IndexError('Version does not exist')
370
        return str(row[0]), int(row[1]), int(row[2])
371
    
372
    def _put_version(self, path, size=0, hide=0):
373
        sql = 'insert into versions (name, size, hide) values (?, ?, ?)'
374
        id = self.con.execute(sql, (path, size, hide)).lastrowid
375
        self.con.commit()
376
        return str(id)
377
    
378
    def _copy_version(self, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
379
        if src_version is not None:
380
            src_version_id, mtime, size = self._get_version(src_path, src_version)
381
        else:
382
            # Latest or create from scratch.
383
            try:
384
                src_version_id, mtime, size = self._get_version(src_path)
385
            except NameError:
386
                src_version_id = None
387
                size = 0
388
        if not copy_data:
389
            size = 0
390
        dest_version_id = self._put_version(dest_path, size)
391
        if copy_meta and src_version_id is not None:
392
            sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
393
            sql = sql % dest_version_id
394
            self.con.execute(sql, (src_version_id,))
395
        if copy_data and src_version_id is not None:
396
            sql = 'insert into hashmaps select %s, pos, block_id from hashmaps where version_id = ?'
397
            sql = sql % dest_version_id
398
            self.con.execute(sql, (src_version_id,))
399
        self.con.commit()
400
        return src_version_id, dest_version_id
401
    
402
    def _get_versioninfo(self, account, container, name, until=None):
403
        """Return path, latest version, associated timestamp and size until the timestamp given."""
404
        
405
        p = (account, container, name)
406
        try:
407
            p = p[:p.index(None)]
408
        except ValueError:
409
            pass
410
        path = os.path.join(*p)
411
        sql = '''select version_id, tstamp, size from (%s) where name = ?'''
412
        sql = sql % self._sql_until(until)
413
        c = self.con.execute(sql, (path,))
414
        row = c.fetchone()
415
        if row is None:
416
            raise NameError('Path does not exist')
417
        return path, str(row[0]), int(row[1]), int(row[2])
418
    
419
    def _get_accountinfo(self, account, until=None):
420
        try:
421
            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
422
            return version_id, mtime
423
        except:
424
            raise NameError('Account does not exist')
425
    
426
    def _get_containerinfo(self, account, container, until=None):
427
        try:
428
            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
429
            return path, version_id, mtime
430
        except:
431
            raise NameError('Container does not exist')
432
    
433
    def _get_objectinfo(self, account, container, name, version=None):
434
        path = os.path.join(account, container, name)
435
        version_id, mtime, size = self._get_version(path, version)
436
        return path, version_id, mtime, size
437
    
438
    def _get_metadata(self, path, version):
439
        sql = 'select key, value from metadata where version_id = ?'
440
        c = self.con.execute(sql, (version,))
441
        return dict(c.fetchall())
442
    
443
    def _put_metadata(self, path, meta, replace=False):
444
        """Create a new version and store metadata."""
445
        
446
        src_version_id, dest_version_id = self._copy_version(path, path, not replace, True)
447
        for k, v in meta.iteritems():
448
            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
449
            self.con.execute(sql, (dest_version_id, k, v))
450
        self.con.commit()
451
    
452
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
453
        cont_prefix = path + '/'
454
        if keys and len(keys) > 0:
455
            sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
456
                        m.version_id = o.version_id and m.key in (%s) order by o.name'''
457
            sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
458
            param = (cont_prefix + prefix + '%',) + tuple(keys)
459
        else:
460
            sql = 'select name, version_id from (%s) where name like ? order by name'
461
            sql = sql % self._sql_until(until)
462
            param = (cont_prefix + prefix + '%',)
463
        c = self.con.execute(sql, param)
464
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
465
        if delimiter:
466
            pseudo_objects = []
467
            for x in objects:
468
                pseudo_name = x[0]
469
                i = pseudo_name.find(delimiter, len(prefix))
470
                if not virtual:
471
                    # If the delimiter is not found, or the name ends
472
                    # with the delimiter's first occurence.
473
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
474
                        pseudo_objects.append(x)
475
                else:
476
                    # If the delimiter is found, keep up to (and including) the delimiter.
477
                    if i != -1:
478
                        pseudo_name = pseudo_name[:i + len(delimiter)]
479
                    if pseudo_name not in [y[0] for y in pseudo_objects]:
480
                        if pseudo_name == x[0]:
481
                            pseudo_objects.append(x)
482
                        else:
483
                            pseudo_objects.append((pseudo_name, None))
484
            objects = pseudo_objects
485
        
486
        start = 0
487
        if marker:
488
            try:
489
                start = [x[0] for x in objects].index(marker) + 1
490
            except ValueError:
491
                pass
492
        if not limit or limit > 10000:
493
            limit = 10000
494
        return objects[start:start + limit]
495
    
496
    def _del_path(self, path):
497
        sql = '''delete from hashmaps where version_id in
498
                    (select version_id from versions where name = ?)'''
499
        self.con.execute(sql, (path,))
500
        sql = '''delete from metadata where version_id in
501
                    (select version_id from versions where name = ?)'''
502
        self.con.execute(sql, (path,))
503
        sql = '''delete from versions where name = ?'''
504
        self.con.execute(sql, (path,))
505
        self.con.commit()