Migration Tools: Progress IV
author	Sofia Papagiannaki <papagian@gmail.com>
Tue, 18 Oct 2011 17:48:29 +0000 (20:48 +0300)
committer	Sofia Papagiannaki <papagian@gmail.com>
Tue, 18 Oct 2011 17:48:29 +0000 (20:48 +0300)
Refs #1171

tools/lib/migrate.py
tools/migrate-data
tools/migrate-db [moved from tools/migrate_db with 50% similarity]
tools/migrate-users [moved from tools/migrate_users with 97% similarity]

tools/lib/migrate.py
index 817499e..de2146d 100644 (file)
@@ -34,7 +34,8 @@
 # or implied, of GRNET S.A.
 
 from sqlalchemy import create_engine
-from sqlalchemy import Table, MetaData
+from sqlalchemy import Table, Column, String, MetaData
+from sqlalchemy.sql import select
 
 from django.conf import settings
 
@@ -51,4 +52,35 @@ class Migration(object):
         self.backend = ModularBackend(*options)
     
     def execute(self):
-        pass
\ No newline at end of file
+        pass
+
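+# Path -> hashmap-hash cache kept in a 'files' table of its own, so files whose
+# contents have already been transferred can be skipped on subsequent runs.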
+class Cache():
+    def __init__(self, db):
+        self.engine = create_engine(db)
+        metadata = MetaData(self.engine)
+        
+        columns=[]
+        columns.append(Column('path', String(2048), primary_key=True))
+        columns.append(Column('hash', String(255)))
+        self.files = Table('files', metadata, *columns)
+        self.conn = self.engine.connect()
+        self.engine.echo = True
+        metadata.create_all(self.engine)
+    
+    def put(self, path, hash):
+        # Insert or replace.
+        s = self.files.delete().where(self.files.c.path==path)
+        r = self.conn.execute(s)
+        r.close()
+        s = self.files.insert()
+        r = self.conn.execute(s, {'path': path, 'hash': hash})
+        r.close()
+    
+    def get(self, path):
+        s = select([self.files.c.hash], self.files.c.path == path)
+        r = self.conn.execute(s)
+        l = r.fetchone()
+        r.close()
+        if not l:
+            return l
+        return l[0]
\ No newline at end of file
tools/migrate-data
index f4cef1e..6794ca3 100755 (executable)
 
 from binascii import hexlify
 
-from sqlalchemy import create_engine
-from sqlalchemy import Table, Column, String, MetaData
+from sqlalchemy import Table
 from sqlalchemy.sql import select
 
 from pithos import settings
 from pithos.backends.modular import ModularBackend
 
 from lib.hashmap import HashMap
-from lib.migrate import Migration
+from lib.migrate import Migration, Cache
 
+import os
+    
 class DataMigration(Migration):
     def __init__(self, pithosdb, db):
         Migration.__init__(self,  pithosdb)
-        # XXX Need more columns for primary key - last modified timestamp...
-        engine = create_engine(db)
-        metadata = MetaData(engine)
-        
-        columns=[]
-        columns.append(Column('path', String(2048), primary_key=True))
-        columns.append(Column('hash', String(255)))
-        self.files = Table('files', metadata, *columns)
-        metadata.create_all(engine)
-    
-    def cache_put(self, path, hash):
-        # Insert or replace.
-        s = self.files.delete().where(self.files.c.path==path)
-        r = self.conn.execute(s)
-        r.close()
-        s = self.files.insert()
-        r = self.conn.execute(s, {'path': path, 'hash': hash})
-        r.close()
+        self.cache = Cache(db)
     
-    def cache_get(self, path):
-        s = select([self.files.c.hash], self.files.c.path == path)
-        r = self.conn.execute(s)
-        l = r.fetchone()
-        r.close()
-        if not l:
-            return l
-        return l[0]
-    
-    def execute(self):
-        blocksize = self.backend.block_size
-        blockhash = self.backend.hash_algorithm
-        
+    def retrieve_files(self):
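+        # Yield stored file paths one row at a time (cursor iteration) instead
+        # of loading the whole result set with fetchall().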
         # Loop for all available files.
         filebody = Table('filebody', self.metadata, autoload=True)
         s = select([filebody.c.storedfilepath])
         rp = self.conn.execute(s)
-        paths = rp.fetchall()
+        path = rp.fetchone()
+        while path:
+            yield path
+            path = rp.fetchone()
         rp.close()
+    
+    def execute(self):
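+        # For each stored file, compute its Pithos hashmap; if the hash differs
+        # from the cached one, upload any missing blocks and update the cache.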
+        blocksize = self.backend.block_size
+        blockhash = self.backend.hash_algorithm
         
-        for path in paths:
+        for (path,) in self.retrieve_files():
             map = HashMap(blocksize, blockhash)
-            map.load(path)
+            try:
+                map.load(open(path))
+            except Exception, e:
+                print e
+                continue
             hash = hexlify(map.hash())
             
-            if hash != self.cache_get(path):
+            if hash != self.cache.get(path):
                 missing = self.backend.blocker.block_ping(map) # XXX Backend hack...
                 status = '[>] ' + path
                 if missing:
@@ -105,7 +88,7 @@ class DataMigration(Migration):
                             self.backend.put_block(block)
                 else:
                     status += ' - no blocks missing'
-                self.cache_put(path, hash)
+                self.cache.put(path, hash)
             else:
                 status = '[-] ' + path
             print status
similarity index 50%
rename from tools/migrate_db
rename to tools/migrate-db
index 606a4f8..bef3148 100755 (executable)
@@ -46,12 +46,12 @@ from django.conf import settings
 
 from pithos.backends.modular import CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED
 from pithos.backends.lib.sqlalchemy.node import Node
-from pithos.backends.lib.sqlalchemy.dbwrapper import DBWrapper
 
 from lib.transfer import upload
 from lib.hashmap import HashMap, file_read_iterator
 from lib.client import Fault
-from lib.migrate import Migration
+from lib.migrate import Migration, Cache
+from calendar import timegm
 
 import json
 import os
@@ -60,34 +60,11 @@ import hashlib
 import mimetypes
 
 class ObjectMigration(Migration):
-    def __init__(self, old_db):
+    def __init__(self, old_db, db, f):
         Migration.__init__(self, old_db)
-        self.wrapper = ClientWrapper(self.backend)
-        params = {'wrapper': DBWrapper(self.backend.db)}
-        self.node = Node(**params)
-        
-    def create_default_containers(self):
-        users = PithosUser.objects.all()
-        for u in users:
-            print '#', u.uniq
-            try:
-                self.wrapper.create_container('pithos', u.uniq)
-                self.wrapper.create_container('trash', u.uniq)
-            except NameError, e:
-                pass
+        self.cache = Cache(db)
     
-    def get_path(self, child_id):
-        folderTable = Table('folder', self.metadata, autoload=True)
-        s = select([folderTable.c.parent_id, folderTable.c.name])
-        s = s.where(folderTable.c.id == child_id)
-        rp = self.conn.execute(s)
-        parent_id, foldername = rp.fetchone()
-        if not parent_id:
-            return ''
-        else:
-            return '%s/%s' %(self.get_path(parent_id), foldername)
-    
-    def create_object(self, username, container, object, filepath, mimetype):
+    def create_node(self, username, container, object, filepath, mimetype):
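+        # Create a directory marker for every intermediate folder of the object
+        # path, then register the object itself as a node under <username>/<container>.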
         obj = ''
         path = '/'.join(object.split('/')[:-1])
         name =  object.split('/')[-1]
@@ -95,35 +72,78 @@ class ObjectMigration(Migration):
         for f in path.split('/'):
             obj = '%s/%s' %(obj, f) if obj else f
             try:
-                self.wrapper.create_directory_marker(container, obj, username)
+                md5 = hashlib.md5()
+                meta = {'Content-Type':'application/directory',
+                        'hash':  md5.hexdigest().lower()}
+                self.backend.update_object_hashmap(username, username, container, obj, 0, [], meta) 
             except NameError, e:
                 pass
-        self.wrapper.set_account(username)
-                
-        prefix = '%s/' %path if path else ''
-        print '#', filepath, container, prefix, name, mimetype
-        return upload(self.wrapper, filepath, container, prefix, name, mimetype)
+        
+        parent_path = '%s/%s' %(username, container)
+        parent_node = self.backend.node.node_lookup(parent_path)
+        path = '%s/%s' %(parent_path, object)
+        nodeid = self.backend.node.node_create(parent_node, path)
+        return nodeid
     
-    def create_history(self, user, header_id, node_id, deleted=False):
-        filebody = Table('filebody', self.metadata, autoload=True)
-        gss_user = Table('gss_user', self.metadata, autoload=True)
-        j = filebody.join(gss_user, filebody.c.modifiedby_id == gss_user.c.id)
-        s = select([filebody.c.filesize, gss_user.c.username], from_obj=j)
-        s = s.where(filebody.c.header_id == header_id)
-        s = s.order_by(filebody.c.version)
-        rp = self.conn.execute(s)
-        versions = rp.fetchall()
-        print '#', len(versions)
-        rp.close()
+    def create_history(self, header_id, node_id, deleted=False):
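+        # Replay every stored version of the file as a backend version: older
+        # versions go to CLUSTER_HISTORY, the latest one to CLUSTER_NORMAL (or
+        # CLUSTER_DELETED for deleted files). The original modification times are
+        # written out as UPDATE statements in fixdates.sql to be applied afterwards.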
         i = 0
-        for size, modyfied_by  in versions:
-            cluster = CLUSTER_HISTORY if i < len(versions) - 1 else CLUSTER_NORMAL
+        map = HashMap(self.backend.block_size, self.backend.hash_algorithm)
+        for t, rowcount  in self.retrieve_node_versions(header_id):
+            size, modyfied_by, filepath, mimetype, modificationdate = t
+            cluster = CLUSTER_HISTORY if i < rowcount - 1 else CLUSTER_NORMAL
             cluster = cluster if not deleted else CLUSTER_DELETED
-            args = (node_id, size, None, modyfied_by, cluster)
-            self.node.version_create(*args)
+            hash = self.cache.get(filepath)
+            if hash == None:
+                raise Exception("Missing hash") 
+            args = (node_id, hash, size, None, modyfied_by, cluster)
+            serial = self.backend.node.version_create(*args)[0]
+            meta = {'hash':hash,
+                    'content-type':mimetype}
+            self.backend.node.attribute_set(serial, ((k, v) for k, v in meta.iteritems()))
+            timestamp = timegm(modificationdate.timetuple())
+            microseconds = modificationdate.time().microsecond
+            f.write('update versions set mtime=\'%10d.%6d\' where serial=%s;' %(timestamp, microseconds, serial))
             i += 1
     
+    def create_metadata(self, header_id, node_id):
+        for t in self.retrieve_metadata(header_id):
+            pass
+    
     def create_objects(self):
+        for username, headerid, folderid, filename, deleted, filepath, mimetype in self.retrieve_current_nodes():
+            path = self.retrieve_path(folderid)[1:]
+            container = 'pithos' if not deleted else 'trash'
+            
+            #create container if it does not exist
+            try:
+                self.backend._lookup_container(username, container)
+            except NameError:
+                self.backend.put_container(username, username, container) 
+            
+            #create node
+            object = '%s/%s' %(path, filename)
+            nodeid = self.create_node(username, container, object, filepath, mimetype)
+            
+            #create node history
+            self.create_history(headerid, nodeid, deleted)
+            
+            self.create_metadata(headerid, nodeid)
+            #self.set_public()
+            #self.statistics()
+            #self.set_permissions()
+    
+    def retrieve_path(self, child_id):
+        folderTable = Table('folder', self.metadata, autoload=True)
+        s = select([folderTable.c.parent_id, folderTable.c.name])
+        s = s.where(folderTable.c.id == child_id)
+        rp = self.conn.execute(s)
+        parent_id, foldername = rp.fetchone()
+        if not parent_id:
+            return ''
+        else:
+            return '%s/%s' %(self.retrieve_path(parent_id), foldername)
+    
+    def retrieve_current_nodes(self):
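+        # Join gss_user/fileheader/filebody and yield the current file rows one
+        # at a time; note that limit(1) below restricts the run to a single row,
+        # presumably for testing.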
         fileheader = Table('fileheader', self.metadata, autoload=True)
         filebody = Table('filebody', self.metadata, autoload=True)
         folder = Table('folder', self.metadata, autoload=True)
@@ -134,88 +154,50 @@ class ObjectMigration(Migration):
         s = select([gss_user.c.username,  fileheader.c.id, fileheader.c.folder_id,
                     fileheader.c.name,  fileheader.c.deleted, filebody.c.storedfilepath,
                     filebody.c.mimetype], from_obj=j)
+        s = s.limit(1)
         rp = self.conn.execute(s)
-        objects = rp.fetchall()
-        for username, headerid, folderid, filename, deleted, filepath, mimetype in objects:
-            path = self.get_path(folderid)[1:]
-            container = 'pithos' if not deleted else 'trash'
-            object = '%s/%s' %(path, filename)
-            #filepath = '/Users/butters/Downloads/torvalds-linux-0f86267'
-            vserial = self.create_object(username, container, object, filepath, mimetype)
-            nodeid = self.node.version_get_properties(vserial, keys=('node',))[0]
-            self.create_history(username, headerid, nodeid, deleted)
-            self.node.version_remove(vserial)
-            #self.set_metadata()
-            #self.set_public()
-            #self.statistics()
-            #self.set_permissions()
-    
-    def handle_deleted(self):
-        pass
-    
-    def upload_dir(self, dir, prefix, user, container):
-        for f in os.listdir(dir):
-            fullpath = '%s/%s' %(dir, f)
-            if os.path.isfile(fullpath):
-                type = mimetypes.guess_type(fullpath)[0]
-                name = '/'.join(fullpath.split(prefix)[1:])
-                print '@', user, container, name, fullpath, type
-                self.create_object(user, container, name, fullpath, type)
-            else: self.upload_dir(fullpath, prefix, user, container)
-
-class ClientWrapper(object):
-    """Wraps client methods used by transfer.upload()
-    to ModularBackend methods"""
-    
-    def __init__(self, backend):
-        self.backend = backend
-        self.block_size = self.backend.block_size
-        self.block_hash = self.backend.hash_algorithm
-    
-    def set_account(self, account):
-        self.account = account
-    
-    def create_container(self, container, account=None, **meta):
-        self.backend.put_container(account, account, container, meta)
-    
-    def create_directory_marker(self, container, object, account=None):
-        md5 = hashlib.md5()
-        meta = {'Content-Type':'application/directory',
-                'hash':  md5.hexdigest().lower()}
-        self.backend.update_object_hashmap(account, account, container, object, 0, [], meta)   
+        object = rp.fetchone()
+        while object:
+            yield object
+            object = rp.fetchone()
+        rp.close()
     
-    def create_object_by_hashmap(self, container, object, map, mimetype=None):
-        hashmap = HashMap(self.block_size, self.block_hash)
-        for h in map['hashes']:
-            hashmap.append(h)
-        meta = {'hash':hexlify(hashmap.hash())}
-        if mimetype:
-            meta['content-type'] = mimetype
-        size = map['bytes']
-        try:
-            args = [self.account, self.account, container, object, size,  map['hashes'], meta]
-            return self.backend.update_object_hashmap(*args)
-        except IndexError, ie:
-            fault = Fault(ie.data, 409)
-            raise fault
+    def retrieve_node_versions(self, header_id):
+        filebody = Table('filebody', self.metadata, autoload=True)
+        gss_user = Table('gss_user', self.metadata, autoload=True)
+        j = filebody.join(gss_user, filebody.c.modifiedby_id == gss_user.c.id)
+        s = select([filebody.c.filesize, gss_user.c.username,
+                    filebody.c.storedfilepath, filebody.c.mimetype,
+                    filebody.c.modificationdate], from_obj=j)
+        s = s.where(filebody.c.header_id == header_id)
+        s = s.order_by(filebody.c.version)
+        rp = self.conn.execute(s)
+        version = rp.fetchone()
+        while version:
+            yield version, rp.rowcount
+            version = rp.fetchone()
+        rp.close()
     
-    def update_container_data(self, container, f):
-        #just put the blocks
-        for block in file_read_iterator(f, self.block_size):
-            self.backend.put_block(block)
+    def retrieve_metadata(self, header_id):
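+        # Yield the 'filetag' rows attached to this file header, one at a time.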
+        filetag = Table('filetag', self.metadata, autoload=True)
+        s = filetag.select(filetag.c.fileid == header_id)
+        rp = self.conn.execute(s)
+        tag = rp.fetchone()
+        while tag:
+            yield tag
+            tag = rp.fetchone()
+        rp.close()
     
-    def retrieve_container_metadata(self, container):
-        return {'x-container-block-size':self.block_size,
-                'x-container-block-hash':self.block_hash}
+    def handle_deleted(self):
+        pass
 
 if __name__ == "__main__":
     old_db = ''
+    db = ''
     
-    ot = ObjectMigration(old_db)
-    #ot.create_default_containers()
-    #ot.create_objects()
-    
-    p = ''
-    ot.upload_dir(p, p, 'chstath', 'linux')
+    f = open('fixdates.sql', 'w')
+    ot = ObjectMigration(old_db, db, f)
+    ot.create_objects()
+    f.close()
     
     
\ No newline at end of file
similarity index 97%
rename from tools/migrate_users
rename to tools/migrate-users
index 78091f0..005d4f5 100755 (executable)
@@ -65,6 +65,6 @@ class UserMigration(Migration):
             user.save(update_timestamps=False)
 
 if __name__ == "__main__":
-    db = 'postgresql://gss:m0ust@rda@62.217.112.56/pithos'
+    db = 'postgresql://gss@localhost/pithos'
     m = UserMigration(db)
     m.execute()
\ No newline at end of file