Revision c0b10ca6 tools/pithos-sync
b/tools/pithos-sync | ||
---|---|---|
36 | 36 |
import os |
37 | 37 |
import sqlite3 |
38 | 38 |
import sys |
39 |
import shutil |
|
40 |
import pickle |
|
41 | 39 |
|
42 |
from lib import transfer |
|
40 |
from os.path import exists, expanduser, isdir, isfile, join, split |
|
41 |
from shutil import copyfile |
|
42 |
from time import time |
|
43 |
|
|
44 |
from lib.transfer import download, upload |
|
43 | 45 |
from lib.client import Pithos_Client, Fault |
44 |
from lib.hashmap import HashMap, merkle
|
|
46 |
from lib.hashmap import merkle |
|
45 | 47 |
from lib.util import get_user, get_auth, get_server |
46 | 48 |
|
47 | 49 |
|
48 | 50 |
DEFAULT_CONTAINER = 'pithos' |
51 |
SETTINGS_DIR = expanduser('~/.pithos') |
|
52 |
TRASH_DIR = '.pithos_trash' |
|
49 | 53 |
|
50 |
def get_container(): |
|
51 |
try: |
|
52 |
return os.environ['PITHOS_SYNC_CONTAINER'] |
|
53 |
except KeyError: |
|
54 |
return DEFAULT_CONTAINER |
|
55 |
|
|
54 |
SQL_CREATE_FILES_TABLE = '''CREATE TABLE IF NOT EXISTS files ( |
|
55 |
path TEXT PRIMARY KEY, |
|
56 |
hash TEXT, |
|
57 |
timestamp INTEGER)''' |
|
56 | 58 |
|
57 |
def create_dir(path): |
|
58 |
if not os.path.exists(path): |
|
59 |
os.makedirs(path) |
|
60 |
if not os.path.isdir(path): |
|
61 |
raise RuntimeError("Cannot open '%s'" % (path,)) |
|
62 | 59 |
|
60 |
client = Pithos_Client(get_server(), get_auth(), get_user()) |
|
63 | 61 |
|
64 |
def copy_file(src, dst): |
|
65 |
print '***', 'COPYING', src, dst |
|
66 |
path = os.path.dirname(dst) |
|
67 |
create_dir(path) |
|
68 |
shutil.copyfile(src, dst) |
|
69 | 62 |
|
63 |
def _makedirs(path): |
|
64 |
try: |
|
65 |
os.makedirs(path) |
|
66 |
except OSError: |
|
67 |
pass |
|
70 | 68 |
|
71 |
client = None |
|
72 |
conf = None |
|
73 |
confdir = None |
|
74 |
trash = None |
|
75 |
lstate = None |
|
76 |
cstate = None |
|
77 |
rstate = None |
|
78 | 69 |
|
70 |
class State(object): |
|
71 |
def __init__(self, syncdir, container): |
|
72 |
self.syncdir = syncdir |
|
73 |
self.container = container |
|
74 |
self.trashdir = join(syncdir, TRASH_DIR) |
|
75 |
self.deleted_dirs = set() |
|
79 | 76 |
|
80 |
class Trash(object): |
|
81 |
def __init__(self): |
|
82 |
self.path = os.path.join(confdir, 'trash') |
|
83 |
create_dir(self.path) |
|
77 |
_makedirs(self.trashdir) |
|
84 | 78 |
|
85 |
dbpath = os.path.join(confdir, 'trash.db')
|
|
79 |
dbpath = join(SETTINGS_DIR, 'sync.db')
|
|
86 | 80 |
self.conn = sqlite3.connect(dbpath) |
87 |
sql = '''CREATE TABLE IF NOT EXISTS files ( |
|
88 |
path TEXT PRIMARY KEY, hash TEXT)''' |
|
89 |
self.conn.execute(sql) |
|
81 |
self.conn.execute(SQL_CREATE_FILES_TABLE) |
|
90 | 82 |
self.conn.commit() |
91 | 83 |
|
92 |
def put(self, fullpath, path, hash):
|
|
93 |
copy_file(fullpath, os.path.join(self.path, path))
|
|
94 |
os.remove(fullpath) |
|
95 |
sql = 'INSERT OR REPLACE INTO files VALUES (?, ?)'
|
|
96 |
self.conn.execute(sql, (path, hash))
|
|
97 |
self.conn.commit()
|
|
98 |
|
|
99 |
def search(self, hash):
|
|
100 |
sql = 'SELECT path FROM files WHERE hash = ?'
|
|
101 |
ret = self.conn.execute(sql, (hash,)).fetchone()
|
|
102 |
return ret[0] if ret else None
|
|
84 |
def current_hash(self, path):
|
|
85 |
"""Return the hash of the file as it exists now in the filesystem"""
|
|
86 |
|
|
87 |
fullpath = join(self.syncdir, path)
|
|
88 |
if fullpath in self.deleted_dirs:
|
|
89 |
return 'DEL'
|
|
90 |
if not exists(fullpath): |
|
91 |
return 'DEL'
|
|
92 |
if isdir(fullpath):
|
|
93 |
return 'DIR'
|
|
94 |
return merkle(fullpath)
|
|
103 | 95 |
|
104 |
def empty(self):
|
|
105 |
sql = 'DELETE FROM files' |
|
106 |
self.conn.execute(sql) |
|
96 |
def delete_inactive(self, timestamp):
|
|
97 |
sql = 'DELETE FROM files WHERE timestamp != ?'
|
|
98 |
self.conn.execute(sql, (timestamp,))
|
|
107 | 99 |
self.conn.commit() |
108 |
shutil.rmtree(self.path) |
|
109 | 100 |
|
110 |
def fullpath(self, path): |
|
111 |
return os.path.join(self.path, path) |
|
112 |
|
|
113 |
|
|
114 |
class LocalState(object): |
|
115 |
def __init__(self, path): |
|
116 |
self.path = path |
|
101 |
def download(self, path, hash): |
|
102 |
fullpath = join(self.syncdir, path) |
|
103 |
if hash == 'DEL': |
|
104 |
self.trash(path) |
|
105 |
elif hash == 'DIR': |
|
106 |
_makedirs(fullpath) |
|
107 |
else: |
|
108 |
self.trash(path) # Trash any old version |
|
109 |
localpath = self.find_hash(hash) |
|
110 |
if localpath: |
|
111 |
copyfile(localpath, fullpath) |
|
112 |
else: |
|
113 |
print 'Downloading %s...' % path |
|
114 |
download(client, self.container, path, fullpath) |
|
117 | 115 |
|
118 |
dbpath = os.path.join(confdir, 'state.db') |
|
119 |
self.conn = sqlite3.connect(dbpath) |
|
120 |
sql = '''CREATE TABLE IF NOT EXISTS files ( |
|
121 |
path TEXT PRIMARY KEY, hash TEXT)''' |
|
122 |
self.conn.execute(sql) |
|
123 |
self.conn.commit() |
|
116 |
current = self.current_hash(path) |
|
117 |
assert current == hash, "Downloaded file does not match hash" |
|
118 |
self.save(path, hash) |
|
124 | 119 |
|
125 |
def get(self, path):
|
|
126 |
sql = 'SELECT hash FROM files WHERE path = ?'
|
|
127 |
ret = self.conn.execute(sql, (path,)).fetchone()
|
|
128 |
return ret[0] if ret else 'DEL'
|
|
120 |
def empty_trash(self):
|
|
121 |
for filename in os.listdir(self.trashdir):
|
|
122 |
path = join(self.trashdir, filename)
|
|
123 |
os.remove(path)
|
|
129 | 124 |
|
130 |
def put(self, path, hash): |
|
131 |
sql = 'INSERT OR REPLACE INTO files VALUES (?, ?)' |
|
132 |
self.conn.execute(sql, (path, hash)) |
|
133 |
self.conn.commit() |
|
134 |
|
|
135 |
def search(self, hash): |
|
125 |
def find_hash(self, hash): |
|
136 | 126 |
sql = 'SELECT path FROM files WHERE hash = ?' |
137 | 127 |
ret = self.conn.execute(sql, (hash,)).fetchone() |
138 |
return ret[0] if ret else None |
|
139 |
|
|
140 |
def fullpath(self, path): |
|
141 |
return os.path.join(self.path, path) |
|
142 |
|
|
143 |
|
|
144 |
class CurrentState(object): |
|
145 |
def __init__(self, path): |
|
146 |
self.path = path |
|
147 |
|
|
148 |
def get(self, path): |
|
149 |
fullpath = os.path.join(self.path, path) |
|
150 |
if os.path.exists(fullpath): |
|
151 |
if os.path.isdir(fullpath): |
|
152 |
return 'DIR' |
|
153 |
else: |
|
154 |
return merkle(fullpath, conf['blocksize'], conf['blockhash']) |
|
155 |
else: |
|
156 |
return 'DEL' |
|
128 |
if ret: |
|
129 |
return join(self.syncdir, ret[0]) |
|
130 |
|
|
131 |
if hash in os.listdir(self.trashdir): |
|
132 |
return join(self.trashdir, hash) |
|
133 |
|
|
134 |
return None |
|
157 | 135 |
|
158 |
def fullpath(self, path): |
|
159 |
return os.path.join(self.path, path) |
|
160 |
|
|
161 |
|
|
162 |
class RemoteState(object): |
|
163 |
def __init__(self, client): |
|
164 |
self.client = client |
|
165 |
self.container = get_container() |
|
136 |
def previous_hash(self, path): |
|
137 |
"""Return the hash of the file according to the previous sync with |
|
138 |
the server. Return DEL if not such entry exists.""" |
|
139 |
|
|
140 |
sql = 'SELECT hash FROM files WHERE path = ?' |
|
141 |
ret = self.conn.execute(sql, (path,)).fetchone() |
|
142 |
return ret[0] if ret else 'DEL' |
|
166 | 143 |
|
167 |
def get(self, path): |
|
144 |
def remote_hash(self, path): |
|
145 |
"""Return the hash of the file according to the server""" |
|
146 |
|
|
168 | 147 |
try: |
169 |
meta = self.client.retrieve_object_metadata(self.container, path)
|
|
148 |
meta = client.retrieve_object_metadata(self.container, path) |
|
170 | 149 |
except Fault: |
171 | 150 |
return 'DEL' |
172 | 151 |
if meta.get('content-type', None) == 'application/directory': |
173 | 152 |
return 'DIR' |
174 | 153 |
else: |
175 | 154 |
return meta['x-object-hash'] |
176 |
|
|
177 |
|
|
178 |
def update_local(path, S): |
|
179 |
# XXX If something is already here, put it in trash and delete it. |
|
180 |
# XXX If we have a directory already here, put all files in trash. |
|
181 |
fullpath = cstate.fullpath(path) |
|
182 |
if S == 'DEL': |
|
183 |
trash.put(fullpath, path, S) |
|
184 |
elif S == 'DIR': |
|
185 |
if os.path.exists(fullpath): |
|
186 |
trash.put(fullpath, path, S) |
|
187 |
# XXX Strip trailing slash (or escape). |
|
188 |
os.mkdir(fullpath) |
|
189 |
else: |
|
190 |
# First, search for local copy |
|
191 |
file = lstate.search(S) |
|
192 |
if file: |
|
193 |
copy_file(lstate.fullpath(file), fullpath) |
|
155 |
|
|
156 |
def remove_deleted_dirs(self): |
|
157 |
for path in sorted(self.deleted_dirs, key=len, reverse=True): |
|
158 |
os.rmdir(path) |
|
159 |
self.deleted_dirs.remove(path) |
|
160 |
|
|
161 |
def resolve_conflict(self, path, hash): |
|
162 |
"""Resolve a sync conflict by renaming the local file and downloading |
|
163 |
the remote one.""" |
|
164 |
|
|
165 |
fullpath = join(self.syncdir, path) |
|
166 |
resolved = fullpath + '.local' |
|
167 |
i = 0 |
|
168 |
while exists(resolved): |
|
169 |
i += 1 |
|
170 |
resolved = fullpath + '.local%d' % i |
|
171 |
|
|
172 |
os.rename(fullpath, resolved) |
|
173 |
self.download(path, hash) |
|
174 |
|
|
175 |
def rmdir(self, path): |
|
176 |
"""Remove a dir or mark for deletion if non-empty |
|
177 |
|
|
178 |
If a dir is empty delete it and check if any of its parents should be |
|
179 |
deleted too. Else mark it for later deletion. |
|
180 |
""" |
|
181 |
|
|
182 |
fullpath = join(self.syncdir, path) |
|
183 |
if not exists(fullpath): |
|
184 |
return |
|
185 |
|
|
186 |
if os.listdir(fullpath): |
|
187 |
# Directory not empty |
|
188 |
self.deleted_dirs.add(fullpath) |
|
189 |
return |
|
190 |
|
|
191 |
os.rmdir(fullpath) |
|
192 |
self.deleted_dirs.discard(fullpath) |
|
193 |
|
|
194 |
parent = dirname(fullpath) |
|
195 |
while parent in self.deleted_dirs: |
|
196 |
os.rmdir(parent) |
|
197 |
self.deleted_dirs.remove(parent) |
|
198 |
parent = dirname(parent) |
|
199 |
|
|
200 |
def save(self, path, hash): |
|
201 |
"""Save the hash value of a file. This value will be later returned |
|
202 |
by `previous_hash`.""" |
|
203 |
|
|
204 |
sql = 'INSERT OR REPLACE INTO files (path, hash) VALUES (?, ?)' |
|
205 |
self.conn.execute(sql, (path, hash)) |
|
206 |
self.conn.commit() |
|
207 |
|
|
208 |
def touch(self, path, now): |
|
209 |
sql = 'UPDATE files SET timestamp = ? WHERE path = ?' |
|
210 |
self.conn.execute(sql, (now, path)) |
|
211 |
self.conn.commit() |
|
212 |
|
|
213 |
def trash(self, path): |
|
214 |
"""Move a file to trash or delete it if it's a directory""" |
|
215 |
|
|
216 |
fullpath = join(self.syncdir, path) |
|
217 |
if not exists(fullpath): |
|
218 |
return |
|
219 |
|
|
220 |
if isfile(fullpath): |
|
221 |
hash = merkle(fullpath) |
|
222 |
trashpath = join(self.trashdir, hash) |
|
223 |
os.rename(fullpath, trashpath) |
|
194 | 224 |
else: |
195 |
# Search for copy in trash |
|
196 |
file = trash.search(S) |
|
197 |
if file: |
|
198 |
# XXX Move from trash (not copy). |
|
199 |
copy_file(trash.fullpath(file), fullpath) |
|
200 |
else: |
|
201 |
# Download |
|
202 |
transfer.download(client, get_container(), path, fullpath) |
|
203 |
assert cstate.get(path) == S |
|
204 |
|
|
205 |
|
|
206 |
def update_remote(path, S): |
|
207 |
fullpath = cstate.fullpath(path) |
|
208 |
if S == 'DEL': |
|
209 |
client.delete_object(get_container(), path) |
|
210 |
elif S == 'DIR': |
|
211 |
client.create_directory_marker(get_container(), path) |
|
212 |
else: |
|
213 |
prefix, name = os.path.split(path) |
|
214 |
if prefix: |
|
215 |
prefix += '/' |
|
216 |
transfer.upload(client, fullpath, get_container(), prefix, name) |
|
217 |
assert rstate.get(path) == S |
|
218 |
|
|
219 |
|
|
220 |
def resolve_conflict(path): |
|
221 |
# XXX Check if this works with dirs. |
|
222 |
fullpath = cstate.fullpath(path) |
|
223 |
if os.path.exists(fullpath): |
|
224 |
os.rename(fullpath, fullpath + '.local') |
|
225 |
|
|
225 |
self.rmdir(path) |
|
226 |
|
|
227 |
def upload(self, path, hash): |
|
228 |
fullpath = join(self.syncdir, path) |
|
229 |
if hash == 'DEL': |
|
230 |
client.delete_object(self.container, path) |
|
231 |
elif hash == 'DIR': |
|
232 |
client.create_directory_marker(self.container, path) |
|
233 |
else: |
|
234 |
prefix, name = split(path) |
|
235 |
if prefix: |
|
236 |
prefix += '/' |
|
237 |
print 'Uploading %s...' % path |
|
238 |
upload(client, fullpath, self.container, prefix, name) |
|
239 |
|
|
240 |
remote = self.remote_hash(path) |
|
241 |
assert remote == hash, "Uploaded file does not match hash" |
|
242 |
self.save(path, hash) |
|
226 | 243 |
|
227 |
def sync(path): |
|
228 |
L = lstate.get(path) |
|
229 |
C = cstate.get(path) |
|
230 |
R = rstate.get(path) |
|
231 | 244 |
|
232 |
if C == L: |
|
233 |
# No local changes |
|
234 |
if R != L: |
|
235 |
update_local(path, R) |
|
236 |
lstate.put(path, R) |
|
237 |
return |
|
245 |
def sync(path, state): |
|
246 |
previous = state.previous_hash(path) |
|
247 |
current = state.current_hash(path) |
|
248 |
remote = state.remote_hash(path) |
|
238 | 249 |
|
239 |
if R == L: |
|
240 |
# No remote changes |
|
241 |
if C != L: |
|
242 |
update_remote(path, C) |
|
243 |
lstate.put(path, C) |
|
244 |
return |
|
245 |
|
|
246 |
# At this point both local and remote states have changes since last sync |
|
247 |
|
|
248 |
if C == R: |
|
249 |
# We were lucky, both had the same change |
|
250 |
lstate.put(path, R) |
|
250 |
if current == previous: |
|
251 |
# No local changes, download any remote changes |
|
252 |
if remote != previous: |
|
253 |
state.download(path, remote) |
|
254 |
elif remote == previous: |
|
255 |
# No remote changes, upload any local changes |
|
256 |
if current != previous: |
|
257 |
state.upload(path, current) |
|
251 | 258 |
else: |
252 |
# Conflict, try to resolve it |
|
253 |
resolve_conflict(path) |
|
254 |
update_local(path, R) |
|
255 |
lstate.put(path, R) |
|
259 |
# Both local and remote file have changes since last sync |
|
260 |
if current == remote: |
|
261 |
state.save(path, remote) # Local and remote changes match |
|
262 |
else: |
|
263 |
state.resolve_conflict(path, remote) |
|
256 | 264 |
|
257 | 265 |
|
258 |
def walk(dir): |
|
266 |
def walk(dir, container): |
|
267 |
"""Iterates on the files of the hierarchy created by merging the files |
|
268 |
in `dir` and the objects in `container`.""" |
|
269 |
|
|
259 | 270 |
pending = [''] |
260 | 271 |
|
261 | 272 |
while pending: |
262 | 273 |
dirs = set() |
263 | 274 |
files = set() |
264 |
root = pending.pop(0) |
|
275 |
root = pending.pop(0) # Depth First Traversal |
|
276 |
if root == TRASH_DIR: |
|
277 |
continue |
|
265 | 278 |
if root: |
266 | 279 |
yield root |
267 | 280 |
|
268 |
dirpath = os.path.join(dir, root)
|
|
269 |
if os.path.exists(dirpath):
|
|
281 |
dirpath = join(dir, root) |
|
282 |
if exists(dirpath): |
|
270 | 283 |
for filename in os.listdir(dirpath): |
271 |
path = os.path.join(root, filename)
|
|
272 |
if os.path.isdir(os.path.join(dir, path)):
|
|
284 |
path = join(root, filename) |
|
285 |
if isdir(join(dir, path)):
|
|
273 | 286 |
dirs.add(path) |
274 | 287 |
else: |
275 | 288 |
files.add(path) |
276 | 289 |
|
277 |
for object in client.list_objects(get_container(), prefix=root, |
|
278 |
delimiter='/', format='json'): |
|
279 |
# XXX Check subdirs. |
|
290 |
for object in client.list_objects(container, format='json', |
|
291 |
prefix=root, delimiter='/'): |
|
280 | 292 |
if 'subdir' in object: |
281 | 293 |
continue |
282 |
name = str(object['name'])
|
|
294 |
name = object['name']
|
|
283 | 295 |
if object['content_type'] == 'application/directory': |
284 | 296 |
dirs.add(name) |
285 | 297 |
else: |
... | ... | |
291 | 303 |
|
292 | 304 |
|
293 | 305 |
def main(): |
294 |
global client, conf, confdir, trash, lstate, cstate, rstate |
|
295 |
|
|
296 | 306 |
if len(sys.argv) != 2: |
297 | 307 |
print 'syntax: %s <dir>' % sys.argv[0] |
298 | 308 |
sys.exit(1) |
299 | 309 |
|
300 |
dir = sys.argv[1] |
|
301 |
client = Pithos_Client(get_server(), get_auth(), get_user()) |
|
310 |
syncdir = sys.argv[1] |
|
302 | 311 |
|
303 |
container = get_container() |
|
304 |
try: |
|
305 |
meta = client.retrieve_container_metadata(container) |
|
306 |
except Fault: |
|
307 |
raise RuntimeError("Cannot open container '%s'" % (container,)) |
|
308 |
|
|
309 |
conf = {'local': dir, |
|
310 |
'remote': container, |
|
311 |
'blocksize': int(meta['x-container-block-size']), |
|
312 |
'blockhash': meta['x-container-block-hash']} |
|
313 |
confdir = os.path.expanduser('~/.pithos-sync/') |
|
312 |
_makedirs(SETTINGS_DIR) |
|
313 |
container = os.environ.get('PITHOS_SYNC_CONTAINER', DEFAULT_CONTAINER) |
|
314 |
client.create_container(container) |
|
314 | 315 |
|
315 |
conffile = os.path.join(confdir, 'config') |
|
316 |
if os.path.isfile(conffile): |
|
317 |
try: |
|
318 |
if (conf != pickle.loads(open(conffile, 'rb').read())): |
|
319 |
raise ValueError |
|
320 |
except: |
|
321 |
shutil.rmtree(confdir) |
|
322 |
create_dir(confdir) |
|
316 |
state = State(syncdir, container) |
|
323 | 317 |
|
324 |
trash = Trash() |
|
325 |
lstate = LocalState(dir) |
|
326 |
cstate = CurrentState(dir) |
|
327 |
rstate = RemoteState(client) |
|
328 |
|
|
329 |
for path in walk(dir): |
|
318 |
now = int(time()) |
|
319 |
for path in walk(syncdir, container): |
|
330 | 320 |
print 'Syncing', path |
331 |
sync(path) |
|
332 |
|
|
333 |
f = open(conffile, 'wb') |
|
334 |
f.write(pickle.dumps(conf)) |
|
335 |
f.close() |
|
321 |
sync(path, state) |
|
322 |
state.touch(path, now) |
|
336 | 323 |
|
337 |
trash.empty() |
|
324 |
state.delete_inactive(now) |
|
325 |
state.empty_trash() |
|
326 |
state.remove_deleted_dirs() |
|
338 | 327 |
|
339 | 328 |
|
340 | 329 |
if __name__ == '__main__': |
Also available in: Unified diff