Revision f6c0005f tools/migrate-data

b/tools/migrate-data
35 35

  
36 36
from binascii import hexlify
37 37

  
38
from sqlalchemy import create_engine
39
from sqlalchemy import Table, Column, String, MetaData
38
from sqlalchemy import Table
40 39
from sqlalchemy.sql import select
41 40

  
42 41
from pithos import settings
43 42
from pithos.backends.modular import ModularBackend
44 43

  
45 44
from lib.hashmap import HashMap
46
from lib.migrate import Migration
45
from lib.migrate import Migration, Cache
47 46

  
47
import os
48
    
48 49
class DataMigration(Migration):
    """Copy file data into the backend block store, using a
    path -> content-hash cache to skip files that have not changed
    since the last run.
    """

    def __init__(self, pithosdb, db):
        """Initialise the base Migration with ``pithosdb`` and attach a
        path -> hash ``Cache`` backed by ``db``.

        The cache replaces the hand-rolled sqlalchemy ``files`` table
        (and its ``cache_put``/``cache_get`` helpers) that earlier
        revisions of this tool managed inline.
        """
        Migration.__init__(self, pithosdb)
        # execute() compares against cache.get(path) and records new
        # hashes with cache.put(path, hash) to make re-runs cheap.
        self.cache = Cache(db)
54
    def retrieve_files(self):
        """Yield one ``(storedfilepath,)`` row per file in the source db.

        Rows are fetched one at a time so the whole result set is never
        materialised in memory.  The result proxy is closed in a
        ``finally`` clause, so it is released even when the consumer
        abandons the generator early (GeneratorExit) or a fetch raises —
        the previous version leaked the cursor in both cases.
        """
        filebody = Table('filebody', self.metadata, autoload=True)
        s = select([filebody.c.storedfilepath])
        rp = self.conn.execute(s)
        try:
            path = rp.fetchone()
            while path:
                yield path
                path = rp.fetchone()
        finally:
            rp.close()
64
    
65
    def execute(self):
        # NOTE(review): this span is a diff rendering.  The bare number
        # lines are old/new diff line numbers, "......" marks elided diff
        # context, and changed statements appear twice (removed old line
        # first, added new line second).  They are not Python source.
66
        blocksize = self.backend.block_size
67
        blockhash = self.backend.hash_algorithm
89 68
        
90
        # Removed in this revision: iterated a prefetched 'paths' list.
        for path in paths:
69
        # New revision: stream rows lazily from retrieve_files().
        for (path,) in self.retrieve_files():
91 70
            # NOTE(review): 'map' (and 'hash' below) shadow Python builtins.
            map = HashMap(blocksize, blockhash)
92
            map.load(path)
71
            try:
72
                # NOTE(review): the file object from open(path) is never
                # explicitly closed — confirm whether HashMap.load closes it.
                map.load(open(path))
73
            # NOTE(review): "except Exception, e" and "print e" are
            # Python 2-only syntax; failures are printed and the file
            # is skipped (best-effort migration).
            except Exception, e:
74
                print e
75
                continue
93 76
            hash = hexlify(map.hash())
94 77
            
95
            # Skip upload when the cached hash for this path is unchanged
            # (old cache_get line above replaced by Cache.get below).
            if hash != self.cache_get(path):
78
            if hash != self.cache.get(path):
96 79
                missing = self.backend.blocker.block_ping(map) # XXX Backend hack...
97 80
                status = '[>] ' + path
98 81
                if missing:
......
105 88
                            self.backend.put_block(block)
106 89
                else:
107 90
                    status += ' - no blocks missing'
108
                self.cache_put(path, hash)
91
                # Record the new hash so the next run skips this path.
                self.cache.put(path, hash)
109 92
            else:
110 93
                status = '[-] ' + path
111 94
            print status

Also available in: Unified diff