Revision c0b10ca6 tools/pithos-sync

b/tools/pithos-sync
36 36
import os
37 37
import sqlite3
38 38
import sys
39
import shutil
40
import pickle
41 39

  
42
from lib import transfer
40
from os.path import exists, expanduser, isdir, isfile, join, split
41
from shutil import copyfile
42
from time import time
43

  
44
from lib.transfer import download, upload
43 45
from lib.client import Pithos_Client, Fault
44
from lib.hashmap import HashMap, merkle
46
from lib.hashmap import merkle
45 47
from lib.util import get_user, get_auth, get_server
46 48

  
47 49

  
48 50
DEFAULT_CONTAINER = 'pithos'
51
SETTINGS_DIR = expanduser('~/.pithos')
52
TRASH_DIR = '.pithos_trash'
49 53

  
50
def get_container():
51
    try:
52
        return os.environ['PITHOS_SYNC_CONTAINER']
53
    except KeyError:
54
        return DEFAULT_CONTAINER
55

  
54
SQL_CREATE_FILES_TABLE = '''CREATE TABLE IF NOT EXISTS files (
55
                                path TEXT PRIMARY KEY,
56
                                hash TEXT,
57
                                timestamp INTEGER)'''
56 58

  
57
def create_dir(path):
58
    if not os.path.exists(path):
59
        os.makedirs(path)
60
    if not os.path.isdir(path):
61
        raise RuntimeError("Cannot open '%s'" % (path,))
62 59

  
60
client = Pithos_Client(get_server(), get_auth(), get_user())
63 61

  
64
def copy_file(src, dst):
65
    print '***', 'COPYING', src, dst
66
    path = os.path.dirname(dst)
67
    create_dir(path)
68
    shutil.copyfile(src, dst)
69 62

  
63
def _makedirs(path):
64
    try:
65
        os.makedirs(path)
66
    except OSError:
67
        pass
70 68

  
71
client = None
72
conf = None
73
confdir = None
74
trash = None
75
lstate = None
76
cstate = None
77
rstate = None
78 69

  
70
class State(object):
71
    def __init__(self, syncdir, container):
72
        self.syncdir = syncdir
73
        self.container = container
74
        self.trashdir = join(syncdir, TRASH_DIR)
75
        self.deleted_dirs = set()
79 76

  
80
class Trash(object):
81
    def __init__(self):
82
        self.path = os.path.join(confdir, 'trash')
83
        create_dir(self.path)
77
        _makedirs(self.trashdir)
84 78
        
85
        dbpath = os.path.join(confdir, 'trash.db')
79
        dbpath = join(SETTINGS_DIR, 'sync.db')
86 80
        self.conn = sqlite3.connect(dbpath)
87
        sql = '''CREATE TABLE IF NOT EXISTS files (
88
                    path TEXT PRIMARY KEY, hash TEXT)'''
89
        self.conn.execute(sql)
81
        self.conn.execute(SQL_CREATE_FILES_TABLE)
90 82
        self.conn.commit()
91 83
    
92
    def put(self, fullpath, path, hash):
93
        copy_file(fullpath, os.path.join(self.path, path))
94
        os.remove(fullpath)
95
        sql = 'INSERT OR REPLACE INTO files VALUES (?, ?)'
96
        self.conn.execute(sql, (path, hash))
97
        self.conn.commit()
98
    
99
    def search(self, hash):
100
        sql = 'SELECT path FROM files WHERE hash = ?'
101
        ret = self.conn.execute(sql, (hash,)).fetchone()
102
        return ret[0] if ret else None
84
    def current_hash(self, path):
85
        """Return the hash of the file as it exists now in the filesystem"""
86
        
87
        fullpath = join(self.syncdir, path)
88
        if fullpath in self.deleted_dirs:
89
            return 'DEL'
90
        if not exists(fullpath):
91
            return 'DEL'
92
        if isdir(fullpath):
93
            return 'DIR'
94
        return merkle(fullpath)
103 95
    
104
    def empty(self):
105
        sql = 'DELETE FROM files'
106
        self.conn.execute(sql)
96
    def delete_inactive(self, timestamp):
97
        sql = 'DELETE FROM files WHERE timestamp != ?'
98
        self.conn.execute(sql, (timestamp,))
107 99
        self.conn.commit()
108
        shutil.rmtree(self.path)
109 100
    
110
    def fullpath(self, path):
111
        return os.path.join(self.path, path)
112

  
113

  
114
class LocalState(object):
115
    def __init__(self, path):
116
        self.path = path
101
    def download(self, path, hash):
102
        fullpath = join(self.syncdir, path)
103
        if hash == 'DEL':
104
            self.trash(path)
105
        elif hash == 'DIR':
106
            _makedirs(fullpath)
107
        else:
108
            self.trash(path)    # Trash any old version
109
            localpath = self.find_hash(hash)
110
            if localpath:
111
                copyfile(localpath, fullpath)
112
            else:
113
                print 'Downloading %s...' % path
114
                download(client, self.container, path, fullpath)
117 115
        
118
        dbpath = os.path.join(confdir, 'state.db')
119
        self.conn = sqlite3.connect(dbpath)
120
        sql = '''CREATE TABLE IF NOT EXISTS files (
121
                    path TEXT PRIMARY KEY, hash TEXT)'''
122
        self.conn.execute(sql)
123
        self.conn.commit()
116
        current = self.current_hash(path)
117
        assert current == hash, "Downloaded file does not match hash"
118
        self.save(path, hash)
124 119
    
125
    def get(self, path):
126
        sql = 'SELECT hash FROM files WHERE path = ?'
127
        ret = self.conn.execute(sql, (path,)).fetchone()
128
        return ret[0] if ret else 'DEL'
120
    def empty_trash(self):
121
        for filename in os.listdir(self.trashdir):
122
            path = join(self.trashdir, filename)
123
            os.remove(path)
129 124
    
130
    def put(self, path, hash):
131
        sql = 'INSERT OR REPLACE INTO files VALUES (?, ?)'
132
        self.conn.execute(sql, (path, hash))
133
        self.conn.commit()
134

  
135
    def search(self, hash):
125
    def find_hash(self, hash):
136 126
        sql = 'SELECT path FROM files WHERE hash = ?'
137 127
        ret = self.conn.execute(sql, (hash,)).fetchone()
138
        return ret[0] if ret else None
139
    
140
    def fullpath(self, path):
141
        return os.path.join(self.path, path)
142

  
143

  
144
class CurrentState(object):
145
    def __init__(self, path):
146
        self.path = path
147
    
148
    def get(self, path):
149
        fullpath = os.path.join(self.path, path)
150
        if os.path.exists(fullpath):
151
            if os.path.isdir(fullpath):
152
                return 'DIR'
153
            else:
154
                return merkle(fullpath, conf['blocksize'], conf['blockhash'])
155
        else:
156
            return 'DEL'
128
        if ret:
129
            return join(self.syncdir, ret[0])
130
        
131
        if hash in os.listdir(self.trashdir):
132
            return join(self.trashdir, hash)
133
        
134
        return None
157 135
    
158
    def fullpath(self, path):
159
        return os.path.join(self.path, path)
160

  
161

  
162
class RemoteState(object):
163
    def __init__(self, client):
164
        self.client = client
165
        self.container = get_container()
136
    def previous_hash(self, path):
137
        """Return the hash of the file according to the previous sync with
138
           the server. Return DEL if not such entry exists."""
139
        
140
        sql = 'SELECT hash FROM files WHERE path = ?'
141
        ret = self.conn.execute(sql, (path,)).fetchone()
142
        return ret[0] if ret else 'DEL'
166 143
    
167
    def get(self, path):
144
    def remote_hash(self, path):
145
        """Return the hash of the file according to the server"""
146
        
168 147
        try:
169
            meta = self.client.retrieve_object_metadata(self.container, path)
148
            meta = client.retrieve_object_metadata(self.container, path)
170 149
        except Fault:
171 150
            return 'DEL'
172 151
        if meta.get('content-type', None) == 'application/directory':
173 152
            return 'DIR'
174 153
        else:
175 154
            return meta['x-object-hash']
176

  
177

  
178
def update_local(path, S):
179
    # XXX If something is already here, put it in trash and delete it.
180
    # XXX If we have a directory already here, put all files in trash.
181
    fullpath = cstate.fullpath(path)
182
    if S == 'DEL':
183
        trash.put(fullpath, path, S)
184
    elif S == 'DIR':
185
        if os.path.exists(fullpath):
186
            trash.put(fullpath, path, S)
187
        # XXX Strip trailing slash (or escape).
188
        os.mkdir(fullpath)
189
    else:
190
        # First, search for local copy
191
        file = lstate.search(S)
192
        if file:
193
            copy_file(lstate.fullpath(file), fullpath)
155
    
156
    def remove_deleted_dirs(self):
157
        for path in sorted(self.deleted_dirs, key=len, reverse=True):
158
            os.rmdir(path)
159
            self.deleted_dirs.remove(path)
160
    
161
    def resolve_conflict(self, path, hash):
162
        """Resolve a sync conflict by renaming the local file and downloading
163
           the remote one."""
164
        
165
        fullpath = join(self.syncdir, path)
166
        resolved = fullpath + '.local'
167
        i = 0
168
        while exists(resolved):
169
            i += 1
170
            resolved = fullpath + '.local%d' % i
171
        
172
        os.rename(fullpath, resolved)
173
        self.download(path, hash)
174
    
175
    def rmdir(self, path):
176
        """Remove a dir or mark for deletion if non-empty
177
        
178
        If a dir is empty delete it and check if any of its parents should be
179
        deleted too. Else mark it for later deletion.
180
        """
181
        
182
        fullpath = join(self.syncdir, path)
183
        if not exists(fullpath):
184
            return
185
        
186
        if os.listdir(fullpath):
187
            # Directory not empty
188
            self.deleted_dirs.add(fullpath)
189
            return
190
        
191
        os.rmdir(fullpath)
192
        self.deleted_dirs.discard(fullpath)
193
        
194
        parent = dirname(fullpath)
195
        while parent in self.deleted_dirs:
196
            os.rmdir(parent)
197
            self.deleted_dirs.remove(parent)
198
            parent = dirname(parent)
199
    
200
    def save(self, path, hash):
201
        """Save the hash value of a file. This value will be later returned
202
           by `previous_hash`."""
203
        
204
        sql = 'INSERT OR REPLACE INTO files (path, hash) VALUES (?, ?)'
205
        self.conn.execute(sql, (path, hash))
206
        self.conn.commit()
207
    
208
    def touch(self, path, now):
209
        sql = 'UPDATE files SET timestamp = ? WHERE path = ?'
210
        self.conn.execute(sql, (now, path))
211
        self.conn.commit()
212
    
213
    def trash(self, path):
214
        """Move a file to trash or delete it if it's a directory"""
215
        
216
        fullpath = join(self.syncdir, path)
217
        if not exists(fullpath):
218
            return
219
        
220
        if isfile(fullpath):
221
            hash = merkle(fullpath)
222
            trashpath = join(self.trashdir, hash)
223
            os.rename(fullpath, trashpath)
194 224
        else:
195
            # Search for copy in trash
196
            file = trash.search(S)
197
            if file:
198
                # XXX Move from trash (not copy).
199
                copy_file(trash.fullpath(file), fullpath)
200
            else:
201
                # Download
202
                transfer.download(client, get_container(), path, fullpath)
203
        assert cstate.get(path) == S
204

  
205

  
206
def update_remote(path, S):
207
    fullpath = cstate.fullpath(path)
208
    if S == 'DEL':
209
        client.delete_object(get_container(), path)
210
    elif S == 'DIR':
211
        client.create_directory_marker(get_container(), path)
212
    else:
213
        prefix, name = os.path.split(path)
214
        if prefix:
215
            prefix += '/'
216
        transfer.upload(client, fullpath, get_container(), prefix, name)
217
        assert rstate.get(path) == S
218

  
219

  
220
def resolve_conflict(path):
221
    # XXX Check if this works with dirs.
222
    fullpath = cstate.fullpath(path)
223
    if os.path.exists(fullpath):
224
        os.rename(fullpath, fullpath + '.local')
225

  
225
            self.rmdir(path)
226
    
227
    def upload(self, path, hash):
228
        fullpath = join(self.syncdir, path)
229
        if hash == 'DEL':
230
            client.delete_object(self.container, path)
231
        elif hash == 'DIR':
232
            client.create_directory_marker(self.container, path)
233
        else:
234
            prefix, name = split(path)
235
            if prefix:
236
                prefix += '/'
237
            print 'Uploading %s...' % path
238
            upload(client, fullpath, self.container, prefix, name)
239
        
240
        remote = self.remote_hash(path)
241
        assert remote == hash, "Uploaded file does not match hash"
242
        self.save(path, hash)
226 243

  
227
def sync(path):
228
    L = lstate.get(path)
229
    C = cstate.get(path)
230
    R = rstate.get(path)
231 244

  
232
    if C == L:
233
        # No local changes
234
        if R != L:
235
            update_local(path, R)
236
            lstate.put(path, R)
237
        return
245
def sync(path, state):
246
    previous = state.previous_hash(path)
247
    current = state.current_hash(path)
248
    remote = state.remote_hash(path)
238 249
    
239
    if R == L:
240
        # No remote changes
241
        if C != L:
242
            update_remote(path, C)
243
            lstate.put(path, C)
244
        return
245
    
246
    # At this point both local and remote states have changes since last sync
247

  
248
    if C == R:
249
        # We were lucky, both had the same change
250
        lstate.put(path, R)
250
    if current == previous:
251
        # No local changes, download any remote changes
252
        if remote != previous:
253
            state.download(path, remote)
254
    elif remote == previous:
255
        # No remote changes, upload any local changes
256
        if current != previous:
257
            state.upload(path, current)
251 258
    else:
252
        # Conflict, try to resolve it
253
        resolve_conflict(path)
254
        update_local(path, R)
255
        lstate.put(path, R)
259
        # Both local and remote file have changes since last sync
260
        if current == remote:
261
            state.save(path, remote)    # Local and remote changes match
262
        else:
263
            state.resolve_conflict(path, remote)
256 264

  
257 265

  
258
def walk(dir):
266
def walk(dir, container):
267
    """Iterates on the files of the hierarchy created by merging the files
268
       in `dir` and the objects in `container`."""
269
    
259 270
    pending = ['']
260 271
    
261 272
    while pending:
262 273
        dirs = set()
263 274
        files = set()
264
        root = pending.pop(0)
275
        root = pending.pop(0)   # Depth First Traversal
276
        if root == TRASH_DIR:
277
            continue
265 278
        if root:
266 279
            yield root
267 280
        
268
        dirpath = os.path.join(dir, root)
269
        if os.path.exists(dirpath):
281
        dirpath = join(dir, root)
282
        if exists(dirpath):
270 283
            for filename in os.listdir(dirpath):
271
                path = os.path.join(root, filename)
272
                if os.path.isdir(os.path.join(dir, path)):
284
                path = join(root, filename)
285
                if isdir(join(dir, path)):
273 286
                    dirs.add(path)
274 287
                else:
275 288
                    files.add(path)
276 289
        
277
        for object in client.list_objects(get_container(), prefix=root,
278
                                            delimiter='/', format='json'):
279
            # XXX Check subdirs.
290
        for object in client.list_objects(container, format='json',
291
                prefix=root, delimiter='/'):
280 292
            if 'subdir' in object:
281 293
                continue
282
            name = str(object['name'])
294
            name = object['name']
283 295
            if object['content_type'] == 'application/directory':
284 296
                dirs.add(name)
285 297
            else:
......
291 303

  
292 304

  
293 305
def main():
294
    global client, conf, confdir, trash, lstate, cstate, rstate
295
    
296 306
    if len(sys.argv) != 2:
297 307
        print 'syntax: %s <dir>' % sys.argv[0]
298 308
        sys.exit(1)
299 309
    
300
    dir = sys.argv[1]
301
    client = Pithos_Client(get_server(), get_auth(), get_user())
310
    syncdir = sys.argv[1]
302 311
    
303
    container = get_container()
304
    try:
305
        meta = client.retrieve_container_metadata(container)
306
    except Fault:
307
        raise RuntimeError("Cannot open container '%s'" % (container,))
308
    
309
    conf = {'local': dir,
310
            'remote': container,
311
            'blocksize': int(meta['x-container-block-size']),
312
            'blockhash': meta['x-container-block-hash']}
313
    confdir = os.path.expanduser('~/.pithos-sync/')
312
    _makedirs(SETTINGS_DIR)
313
    container = os.environ.get('PITHOS_SYNC_CONTAINER', DEFAULT_CONTAINER)
314
    client.create_container(container)
314 315
    
315
    conffile = os.path.join(confdir, 'config')
316
    if os.path.isfile(conffile):
317
        try:
318
            if (conf != pickle.loads(open(conffile, 'rb').read())):
319
                raise ValueError
320
        except:
321
            shutil.rmtree(confdir)
322
    create_dir(confdir)
316
    state = State(syncdir, container)
323 317
    
324
    trash = Trash()
325
    lstate = LocalState(dir)
326
    cstate = CurrentState(dir)
327
    rstate = RemoteState(client)
328
    
329
    for path in walk(dir):
318
    now = int(time())
319
    for path in walk(syncdir, container):
330 320
        print 'Syncing', path
331
        sync(path)
332
    
333
    f = open(conffile, 'wb')
334
    f.write(pickle.dumps(conf))
335
    f.close()
321
        sync(path, state)
322
        state.touch(path, now)
336 323
    
337
    trash.empty()
324
    state.delete_inactive(now)
325
    state.empty_trash()
326
    state.remove_deleted_dirs()
338 327

  
339 328

  
340 329
if __name__ == '__main__':

Also available in: Unified diff