Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-tools / pithos / tools / pithos-sync @ 8c306eab

History | View | Annotate | Download (10.4 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
# 
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
# 
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
# 
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
# 
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
# 
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
import os
37
import sqlite3
38
import sys
39

    
40
from os.path import exists, expanduser, isdir, isfile, join, split
41
from shutil import copyfile
42
from time import time
43

    
44
from pithos.lib.transfer import download, upload
45
from pithos.lib.client import Pithos_Client, Fault
46
from pithos.lib.hashmap import merkle
47
from pithos.lib.util import get_user, get_auth, get_server
48

    
49

    
50
DEFAULT_CONTAINER = 'pithos'
51
SETTINGS_DIR = expanduser('~/.pithos')
52
TRASH_DIR = '.pithos_trash'
53

    
54
SQL_CREATE_FILES_TABLE = '''CREATE TABLE IF NOT EXISTS files (
55
                                path TEXT PRIMARY KEY,
56
                                hash TEXT,
57
                                timestamp INTEGER)'''
58

    
59

    
60
client = Pithos_Client(get_server(), get_auth(), get_user())
61

    
62

    
63
def _makedirs(path):
64
    try:
65
        os.makedirs(path)
66
    except OSError:
67
        pass
68

    
69

    
70
class State(object):
71
    def __init__(self, syncdir, container):
72
        self.syncdir = syncdir
73
        self.container = container
74
        self.trashdir = join(syncdir, TRASH_DIR)
75
        self.deleted_dirs = set()
76

    
77
        _makedirs(self.trashdir)
78
        
79
        dbpath = join(SETTINGS_DIR, 'sync.db')
80
        self.conn = sqlite3.connect(dbpath)
81
        self.conn.execute(SQL_CREATE_FILES_TABLE)
82
        self.conn.commit()
83
    
84
    def current_hash(self, path):
85
        """Return the hash of the file as it exists now in the filesystem"""
86
        
87
        fullpath = join(self.syncdir, path)
88
        if fullpath in self.deleted_dirs:
89
            return 'DEL'
90
        if not exists(fullpath):
91
            return 'DEL'
92
        if isdir(fullpath):
93
            return 'DIR'
94
        return merkle(fullpath)
95
    
96
    def delete_inactive(self, timestamp):
97
        sql = 'DELETE FROM files WHERE timestamp != ?'
98
        self.conn.execute(sql, (timestamp,))
99
        self.conn.commit()
100
    
101
    def download(self, path, hash):
102
        fullpath = join(self.syncdir, path)
103
        if hash == 'DEL':
104
            self.trash(path)
105
        elif hash == 'DIR':
106
            _makedirs(fullpath)
107
        else:
108
            self.trash(path)    # Trash any old version
109
            localpath = self.find_hash(hash)
110
            if localpath:
111
                copyfile(localpath, fullpath)
112
            else:
113
                print 'Downloading %s...' % path
114
                download(client, self.container, path, fullpath)
115
        
116
        current = self.current_hash(path)
117
        assert current == hash, "Downloaded file does not match hash"
118
        self.save(path, hash)
119
    
120
    def empty_trash(self):
121
        for filename in os.listdir(self.trashdir):
122
            path = join(self.trashdir, filename)
123
            os.remove(path)
124
    
125
    def find_hash(self, hash):
126
        sql = 'SELECT path FROM files WHERE hash = ?'
127
        ret = self.conn.execute(sql, (hash,)).fetchone()
128
        if ret:
129
            return join(self.syncdir, ret[0])
130
        
131
        if hash in os.listdir(self.trashdir):
132
            return join(self.trashdir, hash)
133
        
134
        return None
135
    
136
    def previous_hash(self, path):
137
        """Return the hash of the file according to the previous sync with
138
           the server. Return DEL if not such entry exists."""
139
        
140
        sql = 'SELECT hash FROM files WHERE path = ?'
141
        ret = self.conn.execute(sql, (path,)).fetchone()
142
        return ret[0] if ret else 'DEL'
143
    
144
    def remote_hash(self, path):
145
        """Return the hash of the file according to the server"""
146
        
147
        try:
148
            meta = client.retrieve_object_metadata(self.container, path)
149
        except Fault:
150
            return 'DEL'
151
        if meta.get('content-type', None) == 'application/directory':
152
            return 'DIR'
153
        else:
154
            return meta['x-object-hash']
155
    
156
    def remove_deleted_dirs(self):
157
        for path in sorted(self.deleted_dirs, key=len, reverse=True):
158
            os.rmdir(path)
159
            self.deleted_dirs.remove(path)
160
    
161
    def resolve_conflict(self, path, hash):
162
        """Resolve a sync conflict by renaming the local file and downloading
163
           the remote one."""
164
        
165
        fullpath = join(self.syncdir, path)
166
        resolved = fullpath + '.local'
167
        i = 0
168
        while exists(resolved):
169
            i += 1
170
            resolved = fullpath + '.local%d' % i
171
        
172
        os.rename(fullpath, resolved)
173
        self.download(path, hash)
174
    
175
    def rmdir(self, path):
176
        """Remove a dir or mark for deletion if non-empty
177
        
178
        If a dir is empty delete it and check if any of its parents should be
179
        deleted too. Else mark it for later deletion.
180
        """
181
        
182
        fullpath = join(self.syncdir, path)
183
        if not exists(fullpath):
184
            return
185
        
186
        if os.listdir(fullpath):
187
            # Directory not empty
188
            self.deleted_dirs.add(fullpath)
189
            return
190
        
191
        os.rmdir(fullpath)
192
        self.deleted_dirs.discard(fullpath)
193
        
194
        parent = dirname(fullpath)
195
        while parent in self.deleted_dirs:
196
            os.rmdir(parent)
197
            self.deleted_dirs.remove(parent)
198
            parent = dirname(parent)
199
    
200
    def save(self, path, hash):
201
        """Save the hash value of a file. This value will be later returned
202
           by `previous_hash`."""
203
        
204
        sql = 'INSERT OR REPLACE INTO files (path, hash) VALUES (?, ?)'
205
        self.conn.execute(sql, (path, hash))
206
        self.conn.commit()
207
    
208
    def touch(self, path, now):
209
        sql = 'UPDATE files SET timestamp = ? WHERE path = ?'
210
        self.conn.execute(sql, (now, path))
211
        self.conn.commit()
212
    
213
    def trash(self, path):
214
        """Move a file to trash or delete it if it's a directory"""
215
        
216
        fullpath = join(self.syncdir, path)
217
        if not exists(fullpath):
218
            return
219
        
220
        if isfile(fullpath):
221
            hash = merkle(fullpath)
222
            trashpath = join(self.trashdir, hash)
223
            os.rename(fullpath, trashpath)
224
        else:
225
            self.rmdir(path)
226
    
227
    def upload(self, path, hash):
228
        fullpath = join(self.syncdir, path)
229
        if hash == 'DEL':
230
            client.delete_object(self.container, path)
231
        elif hash == 'DIR':
232
            client.create_directory_marker(self.container, path)
233
        else:
234
            prefix, name = split(path)
235
            if prefix:
236
                prefix += '/'
237
            print 'Uploading %s...' % path
238
            upload(client, fullpath, self.container, prefix, name)
239
        
240
        remote = self.remote_hash(path)
241
        assert remote == hash, "Uploaded file does not match hash"
242
        self.save(path, hash)
243

    
244

    
245
def sync(path, state):
246
    previous = state.previous_hash(path)
247
    current = state.current_hash(path)
248
    remote = state.remote_hash(path)
249
    
250
    if current == previous:
251
        # No local changes, download any remote changes
252
        if remote != previous:
253
            state.download(path, remote)
254
    elif remote == previous:
255
        # No remote changes, upload any local changes
256
        if current != previous:
257
            state.upload(path, current)
258
    else:
259
        # Both local and remote file have changes since last sync
260
        if current == remote:
261
            state.save(path, remote)    # Local and remote changes match
262
        else:
263
            state.resolve_conflict(path, remote)
264

    
265

    
266
def walk(dir, container):
267
    """Iterates on the files of the hierarchy created by merging the files
268
       in `dir` and the objects in `container`."""
269
    
270
    pending = ['']
271
    
272
    while pending:
273
        dirs = set()
274
        files = set()
275
        root = pending.pop(0)   # Depth First Traversal
276
        if root == TRASH_DIR:
277
            continue
278
        if root:
279
            yield root
280
        
281
        dirpath = join(dir, root)
282
        if exists(dirpath):
283
            for filename in os.listdir(dirpath):
284
                path = join(root, filename)
285
                if isdir(join(dir, path)):
286
                    dirs.add(path)
287
                else:
288
                    files.add(path)
289
        
290
        for object in client.list_objects(container, format='json',
291
                prefix=root, delimiter='/'):
292
            if 'subdir' in object:
293
                continue
294
            name = object['name']
295
            if object['content_type'] == 'application/directory':
296
                dirs.add(name)
297
            else:
298
                files.add(name)
299
        
300
        pending += sorted(dirs)
301
        for path in files:
302
            yield path
303

    
304

    
305
def main():
306
    if len(sys.argv) != 2:
307
        print 'syntax: %s <dir>' % sys.argv[0]
308
        sys.exit(1)
309
    
310
    syncdir = sys.argv[1]
311
    
312
    _makedirs(SETTINGS_DIR)
313
    container = os.environ.get('PITHOS_SYNC_CONTAINER', DEFAULT_CONTAINER)
314
    client.create_container(container)
315
    
316
    state = State(syncdir, container)
317
    
318
    now = int(time())
319
    for path in walk(syncdir, container):
320
        print 'Syncing', path
321
        sync(path, state)
322
        state.touch(path, now)
323
    
324
    state.delete_inactive(now)
325
    state.empty_trash()
326
    state.remove_deleted_dirs()
327

    
328

    
329
if __name__ == '__main__':
330
    main()