From 2db16f05a0b4b311e4461488ac5568a2bead8506 Mon Sep 17 00:00:00 2001
From: Sofia Papagiannaki
Date: Tue, 4 Oct 2011 13:57:21 +0300
Subject: [PATCH] Migration Tools: Progress III

Refs #1171
---
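A quick usage sketch (illustration only, not part of the commit: old_db and
the upload path are left empty in the script itself, so the values below are
hypothetical, and a Django settings module defining BACKEND is assumed to be
loaded):

    old_db = 'postgresql://gss@localhost/gss'  # assumed SQLAlchemy DSN of the old GSS database
    ot = ObjectMigration(old_db)
    ot.create_default_containers()  # default containers for every PithosUser
    ot.create_objects()             # current bodies go to 'pithos' ('trash' when deleted);
                                    # older versions are rebuilt via Node.version_create()
    # alternatively, bulk-load a local directory tree into a container:
    p = '/srv/import/linux'
    ot.upload_dir(p, p, 'chstath', 'linux')
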
 pithos/backends/modular.py |    1 +
 tools/lib/hashmap.py       |    9 +--
 tools/lib/migrate.py       |    5 ++
 tools/lib/transfer.py      |   12 ++--
 tools/migrate-data         |   14 +++--
 tools/migrate_db           |  145 +++++++++++++++++++++++++++++---------
 6 files changed, 124 insertions(+), 62 deletions(-)

diff --git a/pithos/backends/modular.py b/pithos/backends/modular.py
index 5ac22c25..c1c0f84 100644
--- a/pithos/backends/modular.py
+++ b/pithos/backends/modular.py
@@ -86,6 +86,7 @@ class ModularBackend(BaseBackend):
         __import__(mod)
         self.mod = sys.modules[mod]
 
+        self.db = db
         self.wrapper = self.mod.dbwrapper.DBWrapper(db)
 
         params = {'blocksize': self.block_size,
diff --git a/tools/lib/hashmap.py b/tools/lib/hashmap.py
index c198742..dae1492 100644
--- a/tools/lib/hashmap.py
+++ b/tools/lib/hashmap.py
@@ -70,7 +70,8 @@ class HashMap(list):
             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
         return h[0]
 
-    def load(self, f):
-        with open(f) as fp:
-            for block in file_read_iterator(fp, self.blocksize):
-                self.append(self._hash_block(block))
+    def load(self, fp):
+        self.size = 0
+        for block in file_read_iterator(fp, self.blocksize):
+            self.append(self._hash_block(block))
+            self.size += len(block)
diff --git a/tools/lib/migrate.py b/tools/lib/migrate.py
index 453f4a1..817499e 100644
--- a/tools/lib/migrate.py
+++ b/tools/lib/migrate.py
@@ -36,6 +36,8 @@
 from sqlalchemy import create_engine
 from sqlalchemy import Table, MetaData
 
+from django.conf import settings
+
 from pithos.backends.modular import ModularBackend
 
 class Migration(object):
@@ -44,6 +46,9 @@ class Migration(object):
         self.metadata = MetaData(self.engine)
         #self.engine.echo = True
         self.conn = self.engine.connect()
+
+        options = getattr(settings, 'BACKEND', None)[1]
+        self.backend = ModularBackend(*options)
 
     def execute(self):
         pass
\ No newline at end of file
diff --git a/tools/lib/transfer.py b/tools/lib/transfer.py
index 2d10b5d..8e91ab2 100644
--- a/tools/lib/transfer.py
+++ b/tools/lib/transfer.py
@@ -40,7 +40,7 @@
 from cStringIO import StringIO
 from client import Fault
 
-def upload(client, file, container, prefix, name=None):
+def upload(client, file, container, prefix, name=None, mimetype=None):
 
     meta = client.retrieve_container_metadata(container)
     blocksize = int(meta['x-container-block-size'])
@@ -48,18 +48,20 @@
     size = os.path.getsize(file)
 
     hashes = HashMap(blocksize, blockhash)
-    hashes.load(file)
+    hashes.load(open(file))
     map = {'bytes': size, 'hashes': [hexlify(x) for x in hashes]}
 
     objectname = name if name else os.path.split(file)[-1]
     object = prefix + objectname
+    kwargs = {'mimetype':mimetype} if mimetype else {}
+    v = None
     try:
-        client.create_object_by_hashmap(container, object, map)
+        v = client.create_object_by_hashmap(container, object, map, **kwargs)
     except Fault, fault:
         if fault.status != 409:
             raise
     else:
-        return
+        return v
 
     if type(fault.data) == types.StringType:
         missing = fault.data.split('\n')
@@ -76,7 +78,7 @@
             block = fp.read(blocksize)
             client.update_container_data(container, StringIO(block))
 
-    client.create_object_by_hashmap(container, object, map)
+    return client.create_object_by_hashmap(container, object, map, **kwargs)
 
 def download(client, container, object, file):
 
diff --git a/tools/migrate-data b/tools/migrate-data
index 7f93a8f..f4cef1e 100755
--- a/tools/migrate-data
+++ b/tools/migrate-data
@@ -46,14 +46,17 @@ from lib.hashmap import HashMap
 from lib.migrate import Migration
 
 class DataMigration(Migration):
-    def __init__(self, db):
-        Migration.__init__(self, db)
+    def __init__(self, pithosdb, db):
+        Migration.__init__(self, pithosdb)
         # XXX Need more columns for primary key - last modified timestamp...
+        engine = create_engine(db)
+        metadata = MetaData(engine)
+
         columns=[]
         columns.append(Column('path', String(2048), primary_key=True))
         columns.append(Column('hash', String(255)))
-        self.files = Table('files', self.metadata, *columns)
-        self.metadata.create_all(self.engine)
+        self.files = Table('files', metadata, *columns)
+        metadata.create_all(engine)
 
     def cache_put(self, path, hash):
         # Insert or replace.
@@ -108,7 +111,8 @@
         print status
 
 if __name__ == "__main__":
+    pithosdb = ''
     db = 'sqlite:///migrate.db'
-    dt = DataMigration(db)
+    dt = DataMigration(pithosdb, db)
     dt.execute()
 
diff --git a/tools/migrate_db b/tools/migrate_db
index 1e9a499..2b4a035 100755
--- a/tools/migrate_db
+++ b/tools/migrate_db
@@ -39,14 +39,17 @@ from sqlalchemy.sql import select
 from binascii import hexlify
 
 from pithos.backends.lib.hashfiler import Blocker
+from pithos.backends.lib.sqlalchemy import Node
 from pithos.aai.models import PithosUser
 
 from django.conf import settings
 
-from pithos.backends.modular import ModularBackend
+from pithos.backends.modular import CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED
+from pithos.backends.lib.sqlalchemy.node import Node
+from pithos.backends.lib.sqlalchemy.dbwrapper import DBWrapper
 
 from lib.transfer import upload
-from lib.hashmap import HashMap
+from lib.hashmap import HashMap, file_read_iterator
 from lib.client import Fault
 from lib.migrate import Migration
 
@@ -54,12 +57,15 @@ import json
 import os
 import sys
 import hashlib
+import mimetypes
 
-class ObjectMigration(DataMigration):
-    def __init__(self, db, path, block_size, hash_algorithm):
-        DataMigration.__init__(self, db, path, block_size, hash_algorithm)
-        self.wrapper = ClientWrapper()
-
+class ObjectMigration(Migration):
+    def __init__(self, old_db):
+        Migration.__init__(self, old_db)
+        self.wrapper = ClientWrapper(self.backend)
+        params = {'wrapper': DBWrapper(self.backend.db)}
+        self.node = Node(**params)
+
     def create_default_containers(self):
         users = PithosUser.objects.all()
         for u in users:
@@ -81,41 +87,88 @@
         else:
             return '%s/%s' %(self.get_path(parent_id), foldername)
 
+    def create_object(self, username, container, object, filepath, mimetype):
+        obj = ''
+        path = '/'.join(object.split('/')[:-1])
+        name = object.split('/')[-1]
+        #create directory markers
+        for f in path.split('/'):
+            obj = '%s/%s' %(obj, f) if obj else f
+            try:
+                self.wrapper.create_directory_marker('pithos', obj, username)
+            except NameError, e:
+                pass
+        self.wrapper.set_account(username)
+
+        prefix = '%s/' %path if path else ''
+        print '#', filepath, container, prefix, name, mimetype
+        return upload(self.wrapper, filepath, container, prefix, name, mimetype)
+
+    def create_history(self, user, header_id, node_id, deleted=False):
+        filebody = Table('filebody', self.metadata, autoload=True)
+        gss_user = Table('gss_user', self.metadata, autoload=True)
+        j = filebody.join(gss_user, filebody.c.modifiedby_id == gss_user.c.id)
+        s = select([filebody.c.filesize, gss_user.c.username], from_obj=j)
+        s = s.where(filebody.c.header_id == header_id)
+        s = s.order_by(filebody.c.version)
+        rp = self.conn.execute(s)
+        versions = rp.fetchall()
+        print '#', len(versions)
+        rp.close()
+        i = 0
+        for size, modified_by in versions:
+            cluster = CLUSTER_HISTORY if i < len(versions) - 1 else CLUSTER_NORMAL
+            cluster = cluster if not deleted else CLUSTER_DELETED
+            args = (node_id, size, None, modified_by, cluster)
+            self.node.version_create(*args)
+            i += 1
+
     def create_objects(self):
         fileheader = Table('fileheader', self.metadata, autoload=True)
         filebody = Table('filebody', self.metadata, autoload=True)
         folder = Table('folder', self.metadata, autoload=True)
         gss_user = Table('gss_user', self.metadata, autoload=True)
-        j = filebody.join(fileheader, filebody.c.header_id == fileheader.c.id)
+        j = filebody.join(fileheader, filebody.c.id == fileheader.c.currentbody_id)
         j = j.join(folder, fileheader.c.folder_id == folder.c.id)
         j = j.join(gss_user, fileheader.c.owner_id == gss_user.c.id)
-        s = select([gss_user.c.username, fileheader.c.folder_id, fileheader.c.name,
-                    filebody.c.storedfilepath], from_obj=j)
+        s = select([gss_user.c.username, fileheader.c.id, fileheader.c.folder_id,
+                    fileheader.c.name, fileheader.c.deleted, filebody.c.storedfilepath,
+                    filebody.c.mimetype], from_obj=j)
         rp = self.conn.execute(s)
         objects = rp.fetchall()
-        for username, folderid, filename, filepath in objects:
+        for username, headerid, folderid, filename, deleted, filepath, mimetype in objects:
             path = self.get_path(folderid)[1:]
-            obj = ''
-            #create directory markers
-            for f in path.split('/'):
-                obj = '%s/%s' %(obj, f) if obj else f
-                try:
-                    self.wrapper.create_directory_marker('pithos', obj, username)
-                except NameError, e:
-                    pass
-            self.wrapper.set_account(username)
-
-            print '#', username, path, filename
-            prefix = '%s/' %path if path else ''
-            upload(self.wrapper, filepath, 'pithos', prefix, filename)
+            container = 'pithos' if not deleted else 'trash'
+            object = '%s/%s' %(path, filename)
+            #filepath = '/Users/butters/Downloads/torvalds-linux-0f86267'
+            vserial = self.create_object(username, container, object, filepath, mimetype)
+            nodeid = self.node.version_get_properties(vserial, keys=('node',))[0]
+            self.create_history(username, headerid, nodeid, deleted)
+            self.node.version_remove(vserial)
+            #self.set_metadata()
+            #self.set_public()
+            #self.statistics()
+            #self.set_permissions()
+
+    def handle_deleted(self):
+        pass
+
+    def upload_dir(self, dir, prefix, user, container):
+        for f in os.listdir(dir):
+            fullpath = '%s/%s' %(dir, f)
+            if os.path.isfile(fullpath):
+                type = mimetypes.guess_type(fullpath)[0]
+                name = '/'.join(fullpath.split(prefix)[1:])
+                print '@', user, container, name, fullpath, type
+                self.create_object(user, container, name, fullpath, type)
+            else:
+                self.upload_dir(fullpath, prefix, user, container)
 
 class ClientWrapper(object):
     """Wraps client methods used by transfer.upload()
     to ModularBackend methods"""
-    def __init__(self):
-        options = getattr(settings, 'BACKEND', None)[1]
-        self.backend = ModularBackend(*options)
+    def __init__(self, backend):
+        self.backend = backend
         self.block_size = self.backend.block_size
         self.block_hash = self.backend.hash_algorithm
@@ -131,42 +184,38 @@ class ClientWrapper(object):
                 'hash': md5.hexdigest().lower()}
         self.backend.update_object_hashmap(account, account, container, object, 0, [], meta)
 
-    def create_object_by_hashmap(self, container, object, map):
+    def create_object_by_hashmap(self, container, object, map, mimetype=None):
         hashmap = HashMap(self.block_size, self.block_hash)
-        for hash in map['hashes']:
-            hashmap.append(hash)
+        for h in map['hashes']:
+            hashmap.append(h)
         meta = {'hash':hexlify(hashmap.hash())}
+        if mimetype:
+            meta['content-type'] = mimetype
         size = map['bytes']
         try:
             args = [self.account, self.account, container, object, size, map['hashes'], meta]
-            self.backend.update_object_hashmap(*args)
+            return self.backend.update_object_hashmap(*args)
         except IndexError, ie:
             fault = Fault(ie.data, 409)
             raise fault
 
-    def create_object(self, container, object, f):
-        hashmap = HashMap(self.block_size, self.block_hash)
-        hashmap.load(f)
-        map = [hexlify(x) for x in hashmap]
-        meta = {'hash':hashmap.hash()}
-        size = hashmap.size
-        self.backend.update_object_hashmap(self.account, self.account, container, object, size, hashmap, meta)
+    def update_container_data(self, container, f):
+        #just put the blocks
+        for block in file_read_iterator(f, self.block_size):
+            self.backend.put_block(block)
 
     def retrieve_container_metadata(self, container):
         return {'x-container-block-size':self.block_size,
                 'x-container-block-hash':self.block_hash}
-
+
 if __name__ == "__main__":
-    db = ''
+    old_db = ''
 
-    basepath = options = getattr(settings, 'PROJECT_PATH', None)
-    params = {'db':db,
-              'path':os.path.join(basepath, 'data/pithos/'),
-              'block_size':(4 * 1024 * 1024),
-              'hash_algorithm':'sha256'}
+    ot = ObjectMigration(old_db)
+    #ot.create_default_containers()
+    #ot.create_objects()
 
-    ot = ObjectMigration(**params)
-    ot.create_default_containers()
-    ot.create_objects()
+    p = ''
+    ot.upload_dir(p, p, 'chstath', 'linux')
\ No newline at end of file
-- 
1.7.10.4