Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / simple.py @ cfe6939d

History | View | Annotate | Download (19.5 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import types
39
import hashlib
40
import shutil
41
import pickle
42

    
43
from base import BaseBackend
44

    
45

    
46
logger = logging.getLogger(__name__)
47

    
48

    
49
class SimpleBackend(BaseBackend):
50
    """A simple backend.
51
    
52
    Uses SQLite for storage.
53
    """
54
    
55
    def __init__(self, db):
56
        self.hash_algorithm = 'sha1'
57
        self.block_size = 128 * 1024 # 128KB
58
        
59
        basepath = os.path.split(db)[0]
60
        if basepath and not os.path.exists(basepath):
61
            os.makedirs(basepath)
62
        
63
        self.con = sqlite3.connect(db)
64
        sql = '''create table if not exists objects (
65
                    name text, tstamp text, primary key (name))'''
66
        self.con.execute(sql)
67
        sql = '''create table if not exists metadata (
68
                    name text, key text, value text, primary key (name, key))'''
69
        self.con.execute(sql)
70
        sql = '''create table if not exists versions (
71
                    object_id int, version int, size int, primary key (object_id, version))'''
72
        self.con.execute(sql)
73
        sql = '''create table if not exists blocks (
74
                    block_id text, data blob, primary key (block_id))'''
75
        self.con.execute(sql)
76
        sql = '''create table if not exists hashmaps (
77
                    version_id int, pos int, block_id text, primary key (version_id, pos))'''
78
        self.con.execute(sql)
79
        self.con.commit()
80
    
81
    def get_account_meta(self, account):
82
        """Return a dictionary with the account metadata."""
83
        
84
        logger.debug("get_account_meta: %s", account)
85
        count, bytes = self._get_pathstats(account)
86
        
87
        # Proper count.
88
        sql = 'select count(name) from objects where name glob ? and not name glob ?'
89
        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
90
        row = c.fetchone()
91
        count = row[0]
92
        
93
        meta = self._get_metadata(account)
94
        meta.update({'name': account, 'count': count, 'bytes': bytes})
95
        return meta
96
    
97
    def update_account_meta(self, account, meta, replace=False):
98
        """Update the metadata associated with the account."""
99
        
100
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
101
        self._update_metadata(account, None, None, meta, replace)
102
    
103
    def put_container(self, account, name):
104
        """Create a new container with the given name."""
105
        
106
        logger.debug("put_container: %s %s", account, name)
107
        try:
108
            path, link, tstamp = self._get_containerinfo(account, name)
109
        except NameError:
110
            path = os.path.join(account, name)
111
            link = self._put_linkinfo(path)
112
        else:
113
            raise NameError('Container already exists')
114
        self._update_metadata(account, name, None, None)
115
    
116
    def delete_container(self, account, name):
117
        """Delete the container with the given name."""
118
        
119
        logger.debug("delete_container: %s %s", account, name)
120
        path, link, tstamp = self._get_containerinfo(account, name)
121
        count, bytes = self._get_pathstats(path)
122
        if count > 0:
123
            raise IndexError('Container is not empty')
124
        self._del_path(path)
125
        self._update_metadata(account, None, None, None)
126
    
127
    def get_container_meta(self, account, name):
128
        """Return a dictionary with the container metadata."""
129
        
130
        logger.debug("get_container_meta: %s %s", account, name)
131
        path, link, tstamp = self._get_containerinfo(account, name)
132
        count, bytes = self._get_pathstats(path)
133
        meta = self._get_metadata(path)
134
        meta.update({'name': name, 'count': count, 'bytes': bytes, 'created': tstamp})
135
        return meta
136
    
137
    def update_container_meta(self, account, name, meta, replace=False):
138
        """Update the metadata associated with the container."""
139
        
140
        logger.debug("update_container_meta: %s %s %s %s", account, name, meta, replace)
141
        path, link, tstamp = self._get_containerinfo(account, name)
142
        self._update_metadata(account, name, None, meta, replace)
143
    
144
    def list_containers(self, account, marker=None, limit=10000):
145
        """Return a list of containers existing under an account."""
146
        
147
        logger.debug("list_containers: %s %s %s", account, marker, limit)
148
        return self._list_objects(account, '', '/', marker, limit, False, [])
149
    
150
    def list_objects(self, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[]):
151
        """Return a list of objects existing under a container."""
152
        
153
        logger.debug("list_objects: %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit)
154
        path, link, tstamp = self._get_containerinfo(account, container)
155
        return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys)
156
    
157
    def list_object_meta(self, account, name):
158
        """Return a list with all the container's object meta keys."""
159
        
160
        logger.debug("list_object_meta: %s %s", account, name)
161
        path, link, tstamp = self._get_containerinfo(account, name)
162
        sql = 'select distinct key from metadata where name like ?'
163
        c = self.con.execute(sql, (path + '/%',))
164
        return [x[0] for x in c.fetchall()]
165
    
166
    def get_object_meta(self, account, container, name):
167
        """Return a dictionary with the object metadata."""
168
        
169
        logger.debug("get_object_meta: %s %s %s", account, container, name)
170
        path, link, tstamp = self._get_containerinfo(account, container)
171
        path, link, tstamp, version, size = self._get_objectinfo(account, container, name)
172
        meta = self._get_metadata(path)
173
        meta.update({'name': name, 'bytes': size, 'version': version, 'created': tstamp})
174
        return meta
175
    
176
    def update_object_meta(self, account, container, name, meta, replace=False):
177
        """Update the metadata associated with the object."""
178
        
179
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
180
        path, link, tstamp = self._get_containerinfo(account, container)
181
        path, link, tstamp, version, size = self._get_objectinfo(account, container, name)
182
        if 'versioned' in meta:
183
            if meta['versioned']:
184
                if version == 0:
185
                    sql = 'update versions set version = 1 where object_id = ?'
186
                    self.con.execute(sql, (link,))
187
                    self.con.commit()
188
            else:
189
                if version > 0:
190
                    self._del_uptoversion(link, version)
191
                    sql = 'update versions set version = 0 where object_id = ?'
192
                    self.con.execute(sql, (link,))
193
                    self.con.commit()
194
            del(meta['versioned'])
195
        self._update_metadata(account, container, name, meta, replace)
196
    
197
    def get_object_hashmap(self, account, container, name, version=None):
198
        """Return the object's size and a list with partial hashes."""
199
        
200
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
201
        path, link, tstamp = self._get_containerinfo(account, container)
202
        path, link, tstamp, version, size = self._get_objectinfo(account, container, name, version)
203
        
204
        sql = '''select block_id from hashmaps where version_id =
205
                    (select rowid from versions where object_id = ? and version = ?)
206
                    order by pos'''
207
        c = self.con.execute(sql, (link, version))
208
        hashmap = [x[0] for x in c.fetchall()]
209
        return size, hashmap
210
    
211
    def update_object_hashmap(self, account, container, name, size, hashmap):
212
        """Create/update an object with the specified size and partial hashes."""
213
        
214
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
215
        path, link, tstamp = self._get_containerinfo(account, container)
216
        try:
217
            path, link, tstamp, version, s = self._get_objectinfo(account, container, name)
218
        except NameError:
219
            version = 0
220
        
221
        if version == 0:
222
            path = os.path.join(account, container, name)
223
            
224
            self._del_path(path, delmeta=False)
225
            link = self._put_linkinfo(path)
226
        else:
227
            version += 1
228
        
229
        sql = 'insert or replace into versions (object_id, version, size) values (?, ?, ?)'
230
        version_id = self.con.execute(sql, (link, version, size)).lastrowid
231
        for i in range(len(hashmap)):
232
            sql = 'insert or replace into hashmaps (version_id, pos, block_id) values (?, ?, ?)'
233
            self.con.execute(sql, (version_id, i, hashmap[i]))
234
        self.con.commit()
235
    
236
    def copy_object(self, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False):
237
        """Copy an object's data and metadata."""
238
        
239
        logger.debug("copy_object: %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta)
240
        size, hashmap = self.get_object_hashmap(account, src_container, src_name)
241
        self.update_object_hashmap(account, dest_container, dest_name, size, hashmap)
242
        if not replace_meta:
243
            meta = self._get_metadata(os.path.join(account, src_container, src_name))
244
            meta.update(dest_meta)
245
        else:
246
            meta = dest_meta
247
        self._update_metadata(account, dest_container, dest_name, meta, replace_meta)
248
    
249
    def move_object(self, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False):
250
        """Move an object's data and metadata."""
251
        
252
        logger.debug("move_object: %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta)
253
        self.copy_object(account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta)
254
        self.delete_object(account, src_container, src_name)
255
    
256
    def delete_object(self, account, container, name):
257
        """Delete an object."""
258
        
259
        logger.debug("delete_object: %s %s %s", account, container, name)
260
        path, link, tstamp = self._get_containerinfo(account, container)
261
        path = os.path.join(account, container, name)
262
        link, tstamp = self._get_linkinfo(path)
263
        self._del_path(path)
264
        self._update_metadata(account, container, None, None)
265
    
266
    def get_block(self, hash):
267
        """Return a block's data."""
268
        
269
        logger.debug("get_block: %s", hash)
270
        c = self.con.execute('select data from blocks where block_id = ?', (hash,))
271
        row = c.fetchone()
272
        if row:
273
            return str(row[0])
274
        else:
275
            raise NameError('Block does not exist')
276
    
277
    def put_block(self, data):
278
        """Create a block and return the hash."""
279
        
280
        logger.debug("put_block: %s", len(data))
281
        h = hashlib.new(self.hash_algorithm)
282
        h.update(data.rstrip('\x00'))
283
        hash = h.hexdigest()
284
        sql = 'insert or ignore into blocks (block_id, data) values (?, ?)'
285
        self.con.execute(sql, (hash, buffer(data)))
286
        self.con.commit()
287
        return hash
288
    
289
    def update_block(self, hash, data, offset=0):
290
        """Update a known block and return the hash."""
291
        
292
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
293
        if offset == 0 and len(data) == self.block_size:
294
            return self.put_block(data)
295
        src_data = self.get_block(hash)
296
        bs = self.block_size
297
        if offset < 0 or offset > bs or offset + len(data) > bs:
298
            raise IndexError('Offset or data outside block limits')
299
        dest_data = src_data[:offset] + data + src_data[offset + len(data):]
300
        return self.put_block(dest_data)
301
    
302
    def _get_linkinfo(self, path):
303
        c = self.con.execute('select rowid, tstamp from objects where name = ?', (path,))
304
        row = c.fetchone()
305
        if row:
306
            return str(row[0]), str(row[1])
307
        else:
308
            raise NameError('Object does not exist')
309
    
310
    def _put_linkinfo(self, path):
311
        sql = 'insert into objects (name, tstamp) values (?, ?)'
312
        id = self.con.execute(sql, (path, int(time.time()))).lastrowid
313
        self.con.commit()
314
        return str(id)
315
    
316
    def _get_containerinfo(self, account, container):
317
        path = os.path.join(account, container)
318
        try:
319
            link, tstamp = self._get_linkinfo(path)
320
        except NameError:
321
            raise NameError('Container does not exist')
322
        return path, link, tstamp
323
    
324
    def _get_objectinfo(self, account, container, name, version=None):
325
        path = os.path.join(account, container, name)
326
        link, tstamp = self._get_linkinfo(path)
327
        if not version: # If zero or None.
328
            sql = '''select version, size from versions v,
329
                        (select object_id, max(version) as m from versions
330
                            where object_id = ? group by object_id) as g
331
                        where v.object_id = g.object_id and v.version = g.m'''
332
            c = self.con.execute(sql, (link,))
333
        else:
334
            sql = 'select version, size from versions where object_id = ? and version = ?'
335
            c = self.con.execute(sql, (link, version))
336
        row = c.fetchone()
337
        if not row:
338
            raise IndexError('Version does not exist')
339
        
340
        return path, link, tstamp, int(row[0]), int(row[1])
341
    
342
    def _get_pathstats(self, path):
343
        """Return count and sum of size of all objects under path."""
344
        
345
        sql = '''select count(o), total(size) from (
346
                    select v.object_id as o, v.size from versions v,
347
                        (select object_id, max(version) as m from versions where object_id in
348
                            (select rowid from objects where name like ?) group by object_id) as g
349
                        where v.object_id = g.object_id and v.version = g.m
350
                    union
351
                    select rowid as o, 0 as size from objects where name like ?
352
                        and rowid not in (select object_id from versions))'''
353
        c = self.con.execute(sql, (path + '/%', path + '/%'))
354
        row = c.fetchone()
355
        return int(row[0]), int(row[1])
356
    
357
    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[]):
358
        cont_prefix = path + '/'
359
        if keys and len(keys) > 0:
360
            sql = '''select o.name from objects o, metadata m where o.name like ? and
361
                        m.name = o.name and m.key in (%s) order by o.name'''
362
            sql = sql % ', '.join('?' * len(keys))
363
            param = (cont_prefix + prefix + '%',) + tuple(keys)
364
        else:
365
            sql = 'select name from objects where name like ? order by name'
366
            param = (cont_prefix + prefix + '%',)
367
        c = self.con.execute(sql, param)
368
        objects = [x[0][len(cont_prefix):] for x in c.fetchall()]
369
        if delimiter:
370
            pseudo_objects = []
371
            for x in objects:
372
                pseudo_name = x
373
                i = pseudo_name.find(delimiter, len(prefix))
374
                if not virtual:
375
                    # If the delimiter is not found, or the name ends
376
                    # with the delimiter's first occurence.
377
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
378
                        pseudo_objects.append(pseudo_name)
379
                else:
380
                    # If the delimiter is found, keep up to (and including) the delimiter.
381
                    if i != -1:
382
                        pseudo_name = pseudo_name[:i + len(delimiter)]
383
                    if pseudo_name not in pseudo_objects:
384
                        pseudo_objects.append(pseudo_name)
385
            objects = pseudo_objects
386
        
387
        start = 0
388
        if marker:
389
            try:
390
                start = objects.index(marker) + 1
391
            except ValueError:
392
                pass
393
        if not limit or limit > 10000:
394
            limit = 10000
395
        return objects[start:start + limit]
396
    
397
    def _get_metadata(self, path):
398
        sql = 'select key, value from metadata where name = ?'
399
        c = self.con.execute(sql, (path,))
400
        return dict(c.fetchall())
401
    
402
    def _put_metadata(self, path, meta, replace=False):
403
        if replace:
404
            sql = 'delete from metadata where name = ?'
405
            self.con.execute(sql, (path,))
406
        for k, v in meta.iteritems():
407
            sql = 'insert or replace into metadata (name, key, value) values (?, ?, ?)'
408
            self.con.execute(sql, (path, k, v))
409
        self.con.commit()
410
    
411
    def _update_metadata(self, account, container, name, meta, replace=False):
412
        """Recursively update metadata and set modification time."""
413
        
414
        modified = {'modified': int(time.time())}
415
        if not meta:
416
            meta = {}
417
        meta.update(modified)
418
        path = (account, container, name)
419
        for x in reversed(range(3)):
420
            if not path[x]:
421
                continue
422
            self._put_metadata(os.path.join(*path[:x+1]), meta, replace)
423
            break
424
        for y in reversed(range(x)):
425
            self._put_metadata(os.path.join(*path[:y+1]), modified)
426
    
427
    def _del_uptoversion(self, link, version):
428
        sql = '''delete from hashmaps where version_id
429
                    (select rowid from versions where object_id = ? and version < ?)'''
430
        self.con.execute(sql, (link, version))
431
        self.con.execute('delete from versions where object_id = ?', (link,))
432
        self.con.commit()
433
    
434
    def _del_path(self, path, delmeta=True):
435
        sql = '''delete from hashmaps where version_id in
436
                    (select rowid from versions where object_id in
437
                    (select rowid from objects where name = ?))'''
438
        self.con.execute(sql, (path,))
439
        sql = '''delete from versions where object_id in
440
                    (select rowid from objects where name = ?)'''
441
        self.con.execute(sql, (path,))
442
        self.con.execute('delete from objects where name = ?', (path,))
443
        if delmeta:
444
            self.con.execute('delete from metadata where name = ?', (path,))
445
        self.con.commit()