root / snf-pithos-tools / pithos / tools / sync.py @ bff2c0e0
History | View | Annotate | Download (10.4 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
|
3 |
# Copyright 2011-2012 GRNET S.A. All rights reserved.
|
4 |
#
|
5 |
# Redistribution and use in source and binary forms, with or
|
6 |
# without modification, are permitted provided that the following
|
7 |
# conditions are met:
|
8 |
#
|
9 |
# 1. Redistributions of source code must retain the above
|
10 |
# copyright notice, this list of conditions and the following
|
11 |
# disclaimer.
|
12 |
#
|
13 |
# 2. Redistributions in binary form must reproduce the above
|
14 |
# copyright notice, this list of conditions and the following
|
15 |
# disclaimer in the documentation and/or other materials
|
16 |
# provided with the distribution.
|
17 |
#
|
18 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
19 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
20 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
21 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
22 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
23 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
24 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
25 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
26 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
27 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
28 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
29 |
# POSSIBILITY OF SUCH DAMAGE.
|
30 |
#
|
31 |
# The views and conclusions contained in the software and
|
32 |
# documentation are those of the authors and should not be
|
33 |
# interpreted as representing official policies, either expressed
|
34 |
# or implied, of GRNET S.A.
|
35 |
|
36 |
import os |
37 |
import sqlite3 |
38 |
import sys |
39 |
|
40 |
from os.path import exists, expanduser, isdir, isfile, join, split |
41 |
from shutil import copyfile |
42 |
from time import time |
43 |
|
44 |
from pithos.tools.lib.transfer import download, upload |
45 |
from pithos.tools.lib.client import Pithos_Client, Fault |
46 |
from pithos.tools.lib.hashmap import merkle |
47 |
from pithos.tools.lib.util import get_user, get_auth, get_url |
48 |
|
49 |
|
50 |
DEFAULT_CONTAINER = 'pithos'
|
51 |
SETTINGS_DIR = expanduser('~/.pithos')
|
52 |
TRASH_DIR = '.pithos_trash'
|
53 |
|
54 |
SQL_CREATE_FILES_TABLE = '''CREATE TABLE IF NOT EXISTS files (
|
55 |
path TEXT PRIMARY KEY,
|
56 |
hash TEXT,
|
57 |
timestamp INTEGER)'''
|
58 |
|
59 |
|
60 |
client = Pithos_Client(get_url(), get_auth(), get_user()) |
61 |
|
62 |
|
63 |
def _makedirs(path): |
64 |
try:
|
65 |
os.makedirs(path) |
66 |
except OSError: |
67 |
pass
|
68 |
|
69 |
|
70 |
class State(object): |
71 |
def __init__(self, syncdir, container): |
72 |
self.syncdir = syncdir
|
73 |
self.container = container
|
74 |
self.trashdir = join(syncdir, TRASH_DIR)
|
75 |
self.deleted_dirs = set() |
76 |
|
77 |
_makedirs(self.trashdir)
|
78 |
|
79 |
dbpath = join(SETTINGS_DIR, 'sync.db')
|
80 |
self.conn = sqlite3.connect(dbpath)
|
81 |
self.conn.execute(SQL_CREATE_FILES_TABLE)
|
82 |
self.conn.commit()
|
83 |
|
84 |
def current_hash(self, path): |
85 |
"""Return the hash of the file as it exists now in the filesystem"""
|
86 |
|
87 |
fullpath = join(self.syncdir, path)
|
88 |
if fullpath in self.deleted_dirs: |
89 |
return 'DEL' |
90 |
if not exists(fullpath): |
91 |
return 'DEL' |
92 |
if isdir(fullpath):
|
93 |
return 'DIR' |
94 |
return merkle(fullpath)
|
95 |
|
96 |
def delete_inactive(self, timestamp): |
97 |
sql = 'DELETE FROM files WHERE timestamp != ?'
|
98 |
self.conn.execute(sql, (timestamp,))
|
99 |
self.conn.commit()
|
100 |
|
101 |
def download(self, path, hash): |
102 |
fullpath = join(self.syncdir, path)
|
103 |
if hash == 'DEL': |
104 |
self.trash(path)
|
105 |
elif hash == 'DIR': |
106 |
_makedirs(fullpath) |
107 |
else:
|
108 |
self.trash(path) # Trash any old version |
109 |
localpath = self.find_hash(hash) |
110 |
if localpath:
|
111 |
copyfile(localpath, fullpath) |
112 |
else:
|
113 |
print 'Downloading %s...' % path |
114 |
download(client, self.container, path, fullpath)
|
115 |
|
116 |
current = self.current_hash(path)
|
117 |
assert current == hash, "Downloaded file does not match hash" |
118 |
self.save(path, hash) |
119 |
|
120 |
def empty_trash(self): |
121 |
for filename in os.listdir(self.trashdir): |
122 |
path = join(self.trashdir, filename)
|
123 |
os.remove(path) |
124 |
|
125 |
def find_hash(self, hash): |
126 |
sql = 'SELECT path FROM files WHERE hash = ?'
|
127 |
ret = self.conn.execute(sql, (hash,)).fetchone() |
128 |
if ret:
|
129 |
return join(self.syncdir, ret[0]) |
130 |
|
131 |
if hash in os.listdir(self.trashdir): |
132 |
return join(self.trashdir, hash) |
133 |
|
134 |
return None |
135 |
|
136 |
def previous_hash(self, path): |
137 |
"""Return the hash of the file according to the previous sync with
|
138 |
the server. Return DEL if not such entry exists."""
|
139 |
|
140 |
sql = 'SELECT hash FROM files WHERE path = ?'
|
141 |
ret = self.conn.execute(sql, (path,)).fetchone()
|
142 |
return ret[0] if ret else 'DEL' |
143 |
|
144 |
def remote_hash(self, path): |
145 |
"""Return the hash of the file according to the server"""
|
146 |
|
147 |
try:
|
148 |
meta = client.retrieve_object_metadata(self.container, path)
|
149 |
except Fault:
|
150 |
return 'DEL' |
151 |
if meta.get('content-type', '').split(';', 1)[0].strip() == 'application/directory': |
152 |
return 'DIR' |
153 |
else:
|
154 |
return meta['x-object-hash'] |
155 |
|
156 |
def remove_deleted_dirs(self): |
157 |
for path in sorted(self.deleted_dirs, key=len, reverse=True): |
158 |
os.rmdir(path) |
159 |
self.deleted_dirs.remove(path)
|
160 |
|
161 |
def resolve_conflict(self, path, hash): |
162 |
"""Resolve a sync conflict by renaming the local file and downloading
|
163 |
the remote one."""
|
164 |
|
165 |
fullpath = join(self.syncdir, path)
|
166 |
resolved = fullpath + '.local'
|
167 |
i = 0
|
168 |
while exists(resolved):
|
169 |
i += 1
|
170 |
resolved = fullpath + '.local%d' % i
|
171 |
|
172 |
os.rename(fullpath, resolved) |
173 |
self.download(path, hash) |
174 |
|
175 |
def rmdir(self, path): |
176 |
"""Remove a dir or mark for deletion if non-empty
|
177 |
|
178 |
If a dir is empty delete it and check if any of its parents should be
|
179 |
deleted too. Else mark it for later deletion.
|
180 |
"""
|
181 |
|
182 |
fullpath = join(self.syncdir, path)
|
183 |
if not exists(fullpath): |
184 |
return
|
185 |
|
186 |
if os.listdir(fullpath):
|
187 |
# Directory not empty
|
188 |
self.deleted_dirs.add(fullpath)
|
189 |
return
|
190 |
|
191 |
os.rmdir(fullpath) |
192 |
self.deleted_dirs.discard(fullpath)
|
193 |
|
194 |
parent = dirname(fullpath) |
195 |
while parent in self.deleted_dirs: |
196 |
os.rmdir(parent) |
197 |
self.deleted_dirs.remove(parent)
|
198 |
parent = dirname(parent) |
199 |
|
200 |
def save(self, path, hash): |
201 |
"""Save the hash value of a file. This value will be later returned
|
202 |
by `previous_hash`."""
|
203 |
|
204 |
sql = 'INSERT OR REPLACE INTO files (path, hash) VALUES (?, ?)'
|
205 |
self.conn.execute(sql, (path, hash)) |
206 |
self.conn.commit()
|
207 |
|
208 |
def touch(self, path, now): |
209 |
sql = 'UPDATE files SET timestamp = ? WHERE path = ?'
|
210 |
self.conn.execute(sql, (now, path))
|
211 |
self.conn.commit()
|
212 |
|
213 |
def trash(self, path): |
214 |
"""Move a file to trash or delete it if it's a directory"""
|
215 |
|
216 |
fullpath = join(self.syncdir, path)
|
217 |
if not exists(fullpath): |
218 |
return
|
219 |
|
220 |
if isfile(fullpath):
|
221 |
hash = merkle(fullpath) |
222 |
trashpath = join(self.trashdir, hash) |
223 |
os.rename(fullpath, trashpath) |
224 |
else:
|
225 |
self.rmdir(path)
|
226 |
|
227 |
def upload(self, path, hash): |
228 |
fullpath = join(self.syncdir, path)
|
229 |
if hash == 'DEL': |
230 |
client.delete_object(self.container, path)
|
231 |
elif hash == 'DIR': |
232 |
client.create_directory_marker(self.container, path)
|
233 |
else:
|
234 |
prefix, name = split(path) |
235 |
if prefix:
|
236 |
prefix += '/'
|
237 |
print 'Uploading %s...' % path |
238 |
upload(client, fullpath, self.container, prefix, name)
|
239 |
|
240 |
remote = self.remote_hash(path)
|
241 |
assert remote == hash, "Uploaded file does not match hash" |
242 |
self.save(path, hash) |
243 |
|
244 |
|
245 |
def sync(path, state): |
246 |
previous = state.previous_hash(path) |
247 |
current = state.current_hash(path) |
248 |
remote = state.remote_hash(path) |
249 |
|
250 |
if current == previous:
|
251 |
# No local changes, download any remote changes
|
252 |
if remote != previous:
|
253 |
state.download(path, remote) |
254 |
elif remote == previous:
|
255 |
# No remote changes, upload any local changes
|
256 |
if current != previous:
|
257 |
state.upload(path, current) |
258 |
else:
|
259 |
# Both local and remote file have changes since last sync
|
260 |
if current == remote:
|
261 |
state.save(path, remote) # Local and remote changes match
|
262 |
else:
|
263 |
state.resolve_conflict(path, remote) |
264 |
|
265 |
|
266 |
def walk(dir, container): |
267 |
"""Iterates on the files of the hierarchy created by merging the files
|
268 |
in `dir` and the objects in `container`."""
|
269 |
|
270 |
pending = ['']
|
271 |
|
272 |
while pending:
|
273 |
dirs = set()
|
274 |
files = set()
|
275 |
root = pending.pop(0) # Depth First Traversal |
276 |
if root == TRASH_DIR:
|
277 |
continue
|
278 |
if root:
|
279 |
yield root
|
280 |
|
281 |
dirpath = join(dir, root)
|
282 |
if exists(dirpath):
|
283 |
for filename in os.listdir(dirpath): |
284 |
path = join(root, filename) |
285 |
if isdir(join(dir, path)): |
286 |
dirs.add(path) |
287 |
else:
|
288 |
files.add(path) |
289 |
|
290 |
for object in client.list_objects(container, format='json', |
291 |
prefix=root, delimiter='/'):
|
292 |
if 'subdir' in object: |
293 |
continue
|
294 |
name = object['name'] |
295 |
if object['content_type'].split(';', 1)[0].strip() == 'application/directory': |
296 |
dirs.add(name) |
297 |
else:
|
298 |
files.add(name) |
299 |
|
300 |
pending += sorted(dirs)
|
301 |
for path in files: |
302 |
yield path
|
303 |
|
304 |
|
305 |
def main(): |
306 |
if len(sys.argv) != 2: |
307 |
print 'syntax: %s <dir>' % sys.argv[0] |
308 |
sys.exit(1)
|
309 |
|
310 |
syncdir = sys.argv[1]
|
311 |
|
312 |
_makedirs(SETTINGS_DIR) |
313 |
container = os.environ.get('PITHOS_SYNC_CONTAINER', DEFAULT_CONTAINER)
|
314 |
client.create_container(container) |
315 |
|
316 |
state = State(syncdir, container) |
317 |
|
318 |
now = int(time())
|
319 |
for path in walk(syncdir, container): |
320 |
print 'Syncing', path |
321 |
sync(path, state) |
322 |
state.touch(path, now) |
323 |
|
324 |
state.delete_inactive(now) |
325 |
state.empty_trash() |
326 |
state.remove_deleted_dirs() |
327 |
|
328 |
|
329 |
if __name__ == '__main__': |
330 |
main() |