Revision a156c8b3 pithos/backends/simple.py
b/pithos/backends/simple.py | ||
---|---|---|
35 | 35 |
import time |
36 | 36 |
import sqlite3 |
37 | 37 |
import logging |
38 |
import types |
|
39 | 38 |
import hashlib |
40 |
import shutil |
|
41 |
import pickle |
|
39 |
import binascii |
|
42 | 40 |
|
43 | 41 |
from base import NotAllowedError, BaseBackend |
42 |
from pithos.lib.hashfiler import Mapper, Blocker |
|
44 | 43 |
|
45 | 44 |
|
46 | 45 |
logger = logging.getLogger(__name__) |
... | ... | |
52 | 51 |
Uses SQLite for storage. |
53 | 52 |
""" |
54 | 53 |
|
55 |
# TODO: Automatic/manual clean-up after a time interval. |
|
56 |
|
|
57 | 54 |
def __init__(self, db): |
58 |
self.hash_algorithm = 'sha1'
|
|
59 |
self.block_size = 128 * 1024 # 128KB
|
|
55 |
self.hash_algorithm = 'sha256'
|
|
56 |
self.block_size = 4 * 1024 * 1024 # 4MB
|
|
60 | 57 |
|
61 | 58 |
self.default_policy = {'quota': 0, 'versioning': 'auto'} |
62 | 59 |
|
63 | 60 |
basepath = os.path.split(db)[0] |
64 | 61 |
if basepath and not os.path.exists(basepath): |
65 | 62 |
os.makedirs(basepath) |
63 |
if not os.path.isdir(basepath): |
|
64 |
raise RuntimeError("Cannot open database at '%s'" % (db,)) |
|
66 | 65 |
|
67 |
self.con = sqlite3.connect(db, check_same_thread=False)
|
|
66 |
self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
|
|
68 | 67 |
|
69 | 68 |
sql = '''pragma foreign_keys = on''' |
70 | 69 |
self.con.execute(sql) |
... | ... | |
85 | 84 |
foreign key (version_id) references versions(version_id) |
86 | 85 |
on delete cascade)''' |
87 | 86 |
self.con.execute(sql) |
88 |
sql = '''create table if not exists hashmaps ( |
|
89 |
version_id integer, |
|
90 |
pos integer, |
|
91 |
block_id text, |
|
92 |
primary key (version_id, pos) |
|
93 |
foreign key (version_id) references versions(version_id) |
|
94 |
on delete cascade)''' |
|
95 |
self.con.execute(sql) |
|
96 |
sql = '''create table if not exists blocks ( |
|
97 |
block_id text, data blob, primary key (block_id))''' |
|
98 |
self.con.execute(sql) |
|
99 | 87 |
|
100 | 88 |
sql = '''create table if not exists policy ( |
101 | 89 |
name text, key text, value text, primary key (name, key))''' |
... | ... | |
111 | 99 |
name text, primary key (name))''' |
112 | 100 |
self.con.execute(sql) |
113 | 101 |
self.con.commit() |
102 |
|
|
103 |
params = {'blocksize': self.block_size, |
|
104 |
'blockpath': basepath + '/blocks', |
|
105 |
'hashtype': self.hash_algorithm} |
|
106 |
self.blocker = Blocker(**params) |
|
107 |
|
|
108 |
params = {'mappath': basepath + '/maps', |
|
109 |
'namelen': self.blocker.hashlen} |
|
110 |
self.mapper = Mapper(**params) |
|
114 | 111 |
|
115 | 112 |
def get_account_meta(self, user, account, until=None): |
116 | 113 |
"""Return a dictionary with the account metadata.""" |
... | ... | |
154 | 151 |
logger.debug("update_account_meta: %s %s %s", account, meta, replace) |
155 | 152 |
if user != account: |
156 | 153 |
raise NotAllowedError |
157 |
self._put_metadata(user, account, meta, replace) |
|
154 |
self._put_metadata(user, account, meta, replace, False)
|
|
158 | 155 |
|
159 | 156 |
def get_account_groups(self, user, account): |
160 | 157 |
"""Return a dictionary with the user groups defined for this account.""" |
... | ... | |
254 | 251 |
if user != account: |
255 | 252 |
raise NotAllowedError |
256 | 253 |
path, version_id, mtime = self._get_containerinfo(account, container) |
257 |
self._put_metadata(user, path, meta, replace) |
|
254 |
self._put_metadata(user, path, meta, replace, False)
|
|
258 | 255 |
|
259 | 256 |
def get_container_policy(self, user, account, container): |
260 | 257 |
"""Return a dictionary with the container policy.""" |
... | ... | |
329 | 326 |
self.con.execute(sql, (path, path + '/%',)) |
330 | 327 |
sql = 'delete from policy where name = ?' |
331 | 328 |
self.con.execute(sql, (path,)) |
332 |
self._copy_version(user, account, account, True, True) # New account version (for timestamp update).
|
|
329 |
self._copy_version(user, account, account, True, False) # New account version (for timestamp update).
|
|
333 | 330 |
|
334 | 331 |
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None): |
335 | 332 |
"""Return a list of objects existing under a container.""" |
... | ... | |
421 | 418 |
logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version) |
422 | 419 |
self._can_read(user, account, container, name) |
423 | 420 |
path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version) |
424 |
sql = 'select block_id from hashmaps where version_id = ? order by pos asc' |
|
425 |
c = self.con.execute(sql, (version_id,)) |
|
426 |
hashmap = [x[0] for x in c.fetchall()] |
|
427 |
return size, hashmap |
|
421 |
hashmap = self.mapper.map_retr(version_id) |
|
422 |
return size, [binascii.hexlify(x) for x in hashmap] |
|
428 | 423 |
|
429 | 424 |
def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None): |
430 | 425 |
"""Create/update an object with the specified size and partial hashes.""" |
... | ... | |
433 | 428 |
if permissions is not None and user != account: |
434 | 429 |
raise NotAllowedError |
435 | 430 |
self._can_write(user, account, container, name) |
436 |
missing = [] |
|
437 |
for i in range(len(hashmap)): |
|
438 |
sql = 'select count(*) from blocks where block_id = ?' |
|
439 |
c = self.con.execute(sql, (hashmap[i],)) |
|
440 |
if c.fetchone()[0] == 0: |
|
441 |
missing.append(hashmap[i]) |
|
431 |
missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap]) |
|
442 | 432 |
if missing: |
443 | 433 |
ie = IndexError() |
444 | 434 |
ie.data = missing |
... | ... | |
450 | 440 |
src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False) |
451 | 441 |
sql = 'update versions set size = ? where version_id = ?' |
452 | 442 |
self.con.execute(sql, (size, dest_version_id)) |
453 |
# TODO: Check for block_id existence. |
|
454 |
for i in range(len(hashmap)): |
|
455 |
sql = 'insert or replace into hashmaps (version_id, pos, block_id) values (?, ?, ?)' |
|
456 |
self.con.execute(sql, (dest_version_id, i, hashmap[i])) |
|
443 |
self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap]) |
|
457 | 444 |
for k, v in meta.iteritems(): |
458 | 445 |
sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)' |
459 | 446 |
self.con.execute(sql, (dest_version_id, k, v)) |
... | ... | |
536 | 523 |
"""Return a block's data.""" |
537 | 524 |
|
538 | 525 |
logger.debug("get_block: %s", hash) |
539 |
c = self.con.execute('select data from blocks where block_id = ?', (hash,)) |
|
540 |
row = c.fetchone() |
|
541 |
if row: |
|
542 |
return str(row[0]) |
|
543 |
else: |
|
526 |
blocks = self.blocker.block_retr((binascii.unhexlify(hash),)) |
|
527 |
if not blocks: |
|
544 | 528 |
raise NameError('Block does not exist') |
529 |
return blocks[0] |
|
545 | 530 |
|
546 | 531 |
def put_block(self, data): |
547 | 532 |
"""Create a block and return the hash.""" |
548 | 533 |
|
549 | 534 |
logger.debug("put_block: %s", len(data)) |
550 |
h = hashlib.new(self.hash_algorithm) |
|
551 |
h.update(data.rstrip('\x00')) |
|
552 |
hash = h.hexdigest() |
|
553 |
sql = 'insert or ignore into blocks (block_id, data) values (?, ?)' |
|
554 |
self.con.execute(sql, (hash, buffer(data))) |
|
555 |
self.con.commit() |
|
556 |
return hash |
|
535 |
hashes, absent = self.blocker.block_stor((data,)) |
|
536 |
return binascii.hexlify(hashes[0]) |
|
557 | 537 |
|
558 | 538 |
def update_block(self, hash, data, offset=0): |
559 | 539 |
"""Update a known block and return the hash.""" |
... | ... | |
561 | 541 |
logger.debug("update_block: %s %s %s", hash, len(data), offset) |
562 | 542 |
if offset == 0 and len(data) == self.block_size: |
563 | 543 |
return self.put_block(data) |
564 |
src_data = self.get_block(hash) |
|
565 |
bs = self.block_size |
|
566 |
if offset < 0 or offset > bs or offset + len(data) > bs: |
|
567 |
raise IndexError('Offset or data outside block limits') |
|
568 |
dest_data = src_data[:offset] + data + src_data[offset + len(data):] |
|
569 |
return self.put_block(dest_data) |
|
544 |
h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),)) |
|
545 |
return binascii.hexlify(h) |
|
570 | 546 |
|
571 | 547 |
def _sql_until(self, until=None): |
572 | 548 |
"""Return the sql to get the latest versions until the timestamp given.""" |
... | ... | |
631 | 607 |
sql = sql % dest_version_id |
632 | 608 |
self.con.execute(sql, (src_version_id,)) |
633 | 609 |
if copy_data and src_version_id is not None: |
634 |
sql = 'insert into hashmaps select %s, pos, block_id from hashmaps where version_id = ?'
|
|
635 |
sql = sql % dest_version_id
|
|
636 |
self.con.execute(sql, (src_version_id,))
|
|
610 |
# TODO: Copy properly.
|
|
611 |
hashmap = self.mapper.map_retr(src_version_id)
|
|
612 |
self.mapper.map_stor(dest_version_id, hashmap)
|
|
637 | 613 |
self.con.commit() |
638 | 614 |
return src_version_id, dest_version_id |
639 | 615 |
|
... | ... | |
678 | 654 |
c = self.con.execute(sql, (version,)) |
679 | 655 |
return dict(c.fetchall()) |
680 | 656 |
|
681 |
def _put_metadata(self, user, path, meta, replace=False): |
|
657 |
def _put_metadata(self, user, path, meta, replace=False, copy_data=True):
|
|
682 | 658 |
"""Create a new version and store metadata.""" |
683 | 659 |
|
684 |
src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, True)
|
|
660 |
src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, copy_data)
|
|
685 | 661 |
for k, v in meta.iteritems(): |
686 | 662 |
if not replace and v == '': |
687 | 663 |
sql = 'delete from metadata where version_id = ? and key = ?' |
... | ... | |
865 | 841 |
return objects[start:start + limit] |
866 | 842 |
|
867 | 843 |
def _del_version(self, version): |
868 |
sql = 'delete from hashmaps where version_id = ?' |
|
869 |
self.con.execute(sql, (version,)) |
|
844 |
self.mapper.map_remv(version) |
|
870 | 845 |
sql = 'delete from versions where version_id = ?' |
871 | 846 |
self.con.execute(sql, (version,)) |
Also available in: Unified diff