From: Giorgos Verigakis
Date: Fri, 2 Dec 2011 14:20:50 +0000 (+0200)
Subject: Improve pithos-sync
X-Git-Tag: pithos/v0.8.1~2
X-Git-Url: https://code.grnet.gr/git/pithos/commitdiff_plain/c0b10ca658f625f38bf7f9ad0acdbe89db360eb7

Improve pithos-sync

* Do not redownload files that exist locally.
* General refactoring to simplify the code.

Refs #1495
---

diff --git a/tools/lib/transfer.py b/tools/lib/transfer.py
index e5783eb..a854d4e 100644
--- a/tools/lib/transfer.py
+++ b/tools/lib/transfer.py
@@ -87,7 +87,6 @@ def download(client, container, object, path):
     blockhash = res['block_hash']
     bytes = res['bytes']
     map = res['hashes']
-    print res
 
     if os.path.exists(path):
         h = HashMap(blocksize, blockhash)
diff --git a/tools/pithos-sync b/tools/pithos-sync
index d4b64d5..1c69769 100755
--- a/tools/pithos-sync
+++ b/tools/pithos-sync
@@ -36,250 +36,262 @@ import os
 import sqlite3
 import sys
 
-import shutil
-import pickle
-from lib import transfer
+from os.path import dirname, exists, expanduser, isdir, isfile, join, split
+from shutil import copyfile
+from time import time
+
+from lib.transfer import download, upload
 from lib.client import Pithos_Client, Fault
-from lib.hashmap import HashMap, merkle
+from lib.hashmap import merkle
 from lib.util import get_user, get_auth, get_server
 
 
 DEFAULT_CONTAINER = 'pithos'
+SETTINGS_DIR = expanduser('~/.pithos')
+TRASH_DIR = '.pithos_trash'
 
-def get_container():
-    try:
-        return os.environ['PITHOS_SYNC_CONTAINER']
-    except KeyError:
-        return DEFAULT_CONTAINER
-
+SQL_CREATE_FILES_TABLE = '''CREATE TABLE IF NOT EXISTS files (
+                                path TEXT PRIMARY KEY,
+                                hash TEXT,
+                                timestamp INTEGER)'''
 
-def create_dir(path):
-    if not os.path.exists(path):
-        os.makedirs(path)
-    if not os.path.isdir(path):
-        raise RuntimeError("Cannot open '%s'" % (path,))
+client = Pithos_Client(get_server(), get_auth(), get_user())
 
 
-def copy_file(src, dst):
-    print '***', 'COPYING', src, dst
-    path = os.path.dirname(dst)
-    create_dir(path)
-    shutil.copyfile(src, dst)
+def _makedirs(path):
+    try:
+        os.makedirs(path)
+    except OSError:
+        pass
 
 
-client = None
-conf = None
-confdir = None
-trash = None
-lstate = None
-cstate = None
-rstate = None
+class State(object):
+    def __init__(self, syncdir, container):
+        self.syncdir = syncdir
+        self.container = container
+        self.trashdir = join(syncdir, TRASH_DIR)
+        self.deleted_dirs = set()
 
-class Trash(object):
-    def __init__(self):
-        self.path = os.path.join(confdir, 'trash')
-        create_dir(self.path)
+        _makedirs(self.trashdir)
 
-        dbpath = os.path.join(confdir, 'trash.db')
+        dbpath = join(SETTINGS_DIR, 'sync.db')
         self.conn = sqlite3.connect(dbpath)
-        sql = '''CREATE TABLE IF NOT EXISTS files (
-                    path TEXT PRIMARY KEY, hash TEXT)'''
-        self.conn.execute(sql)
+        self.conn.execute(SQL_CREATE_FILES_TABLE)
         self.conn.commit()
 
-    def put(self, fullpath, path, hash):
-        copy_file(fullpath, os.path.join(self.path, path))
-        os.remove(fullpath)
-        sql = 'INSERT OR REPLACE INTO files VALUES (?, ?)'
-        self.conn.execute(sql, (path, hash))
-        self.conn.commit()
-
-    def search(self, hash):
-        sql = 'SELECT path FROM files WHERE hash = ?'
-        ret = self.conn.execute(sql, (hash,)).fetchone()
-        return ret[0] if ret else None
+    def current_hash(self, path):
+        """Return the hash of the file as it exists now in the filesystem"""
+
+        fullpath = join(self.syncdir, path)
+        if fullpath in self.deleted_dirs:
+            return 'DEL'
+        if not exists(fullpath):
+            return 'DEL'
+        if isdir(fullpath):
+            return 'DIR'
+        return merkle(fullpath)
 
-    def empty(self):
-        sql = 'DELETE FROM files'
-        self.conn.execute(sql)
+    def delete_inactive(self, timestamp):
+        sql = 'DELETE FROM files WHERE timestamp != ?'
+        self.conn.execute(sql, (timestamp,))
         self.conn.commit()
-        shutil.rmtree(self.path)
 
-    def fullpath(self, path):
-        return os.path.join(self.path, path)
-
-
-class LocalState(object):
-    def __init__(self, path):
-        self.path = path
+    def download(self, path, hash):
+        fullpath = join(self.syncdir, path)
+        if hash == 'DEL':
+            self.trash(path)
+        elif hash == 'DIR':
+            _makedirs(fullpath)
+        else:
+            self.trash(path)  # Trash any old version
+            localpath = self.find_hash(hash)
+            if localpath:
+                copyfile(localpath, fullpath)
+            else:
+                print 'Downloading %s...' % path
+                download(client, self.container, path, fullpath)
 
-        dbpath = os.path.join(confdir, 'state.db')
-        self.conn = sqlite3.connect(dbpath)
-        sql = '''CREATE TABLE IF NOT EXISTS files (
-                    path TEXT PRIMARY KEY, hash TEXT)'''
-        self.conn.execute(sql)
-        self.conn.commit()
+        current = self.current_hash(path)
+        assert current == hash, "Downloaded file does not match hash"
+        self.save(path, hash)
 
-    def get(self, path):
-        sql = 'SELECT hash FROM files WHERE path = ?'
-        ret = self.conn.execute(sql, (path,)).fetchone()
-        return ret[0] if ret else 'DEL'
+    def empty_trash(self):
+        for filename in os.listdir(self.trashdir):
+            path = join(self.trashdir, filename)
+            os.remove(path)
 
-    def put(self, path, hash):
-        sql = 'INSERT OR REPLACE INTO files VALUES (?, ?)'
-        self.conn.execute(sql, (path, hash))
-        self.conn.commit()
-
-    def search(self, hash):
+    def find_hash(self, hash):
         sql = 'SELECT path FROM files WHERE hash = ?'
         ret = self.conn.execute(sql, (hash,)).fetchone()
-        return ret[0] if ret else None
-
-    def fullpath(self, path):
-        return os.path.join(self.path, path)
-
-
-class CurrentState(object):
-    def __init__(self, path):
-        self.path = path
-
-    def get(self, path):
-        fullpath = os.path.join(self.path, path)
-        if os.path.exists(fullpath):
-            if os.path.isdir(fullpath):
-                return 'DIR'
-            else:
-                return merkle(fullpath, conf['blocksize'], conf['blockhash'])
-        else:
-            return 'DEL'
+        if ret:
+            return join(self.syncdir, ret[0])
+
+        if hash in os.listdir(self.trashdir):
+            return join(self.trashdir, hash)
+
+        return None
 
-    def fullpath(self, path):
-        return os.path.join(self.path, path)
-
-
-class RemoteState(object):
-    def __init__(self, client):
-        self.client = client
-        self.container = get_container()
+    def previous_hash(self, path):
+        """Return the hash of the file according to the previous sync with
+        the server. Return DEL if no such entry exists."""
+
+        sql = 'SELECT hash FROM files WHERE path = ?'
+        ret = self.conn.execute(sql, (path,)).fetchone()
+        return ret[0] if ret else 'DEL'
 
-    def get(self, path):
+    def remote_hash(self, path):
+        """Return the hash of the file according to the server"""
+
         try:
-            meta = self.client.retrieve_object_metadata(self.container, path)
+            meta = client.retrieve_object_metadata(self.container, path)
         except Fault:
             return 'DEL'
         if meta.get('content-type', None) == 'application/directory':
             return 'DIR'
         else:
             return meta['x-object-hash']
-
-
-def update_local(path, S):
-    # XXX If something is already here, put it in trash and delete it.
-    # XXX If we have a directory already here, put all files in trash.
-    fullpath = cstate.fullpath(path)
-    if S == 'DEL':
-        trash.put(fullpath, path, S)
-    elif S == 'DIR':
-        if os.path.exists(fullpath):
-            trash.put(fullpath, path, S)
-        # XXX Strip trailing slash (or escape).
-        os.mkdir(fullpath)
-    else:
-        # First, search for local copy
-        file = lstate.search(S)
-        if file:
-            copy_file(lstate.fullpath(file), fullpath)
+
+    def remove_deleted_dirs(self):
+        for path in sorted(self.deleted_dirs, key=len, reverse=True):
+            os.rmdir(path)
+            self.deleted_dirs.remove(path)
+
+    def resolve_conflict(self, path, hash):
+        """Resolve a sync conflict by renaming the local file and downloading
+        the remote one."""
+
+        fullpath = join(self.syncdir, path)
+        resolved = fullpath + '.local'
+        i = 0
+        while exists(resolved):
+            i += 1
+            resolved = fullpath + '.local%d' % i
+
+        os.rename(fullpath, resolved)
+        self.download(path, hash)
+
+    def rmdir(self, path):
+        """Remove a dir or mark for deletion if non-empty
+
+        If a dir is empty delete it and check if any of its parents should be
+        deleted too. Else mark it for later deletion.
+        """
+
+        fullpath = join(self.syncdir, path)
+        if not exists(fullpath):
+            return
+
+        if os.listdir(fullpath):
+            # Directory not empty
+            self.deleted_dirs.add(fullpath)
+            return
+
+        os.rmdir(fullpath)
+        self.deleted_dirs.discard(fullpath)
+
+        parent = dirname(fullpath)
+        while parent in self.deleted_dirs:
+            os.rmdir(parent)
+            self.deleted_dirs.remove(parent)
+            parent = dirname(parent)
+
+    def save(self, path, hash):
+        """Save the hash value of a file. This value will be later returned
+        by `previous_hash`."""
+
+        sql = 'INSERT OR REPLACE INTO files (path, hash) VALUES (?, ?)'
+        self.conn.execute(sql, (path, hash))
+        self.conn.commit()
+
+    def touch(self, path, now):
+        sql = 'UPDATE files SET timestamp = ? WHERE path = ?'
+        self.conn.execute(sql, (now, path))
+        self.conn.commit()
+
+    def trash(self, path):
+        """Move a file to trash or delete it if it's a directory"""
+
+        fullpath = join(self.syncdir, path)
+        if not exists(fullpath):
+            return
+
+        if isfile(fullpath):
+            hash = merkle(fullpath)
+            trashpath = join(self.trashdir, hash)
+            os.rename(fullpath, trashpath)
         else:
-            # Search for copy in trash
-            file = trash.search(S)
-            if file:
-                # XXX Move from trash (not copy).
-                copy_file(trash.fullpath(file), fullpath)
-            else:
-                # Download
-                transfer.download(client, get_container(), path, fullpath)
-    assert cstate.get(path) == S
-
-
-def update_remote(path, S):
-    fullpath = cstate.fullpath(path)
-    if S == 'DEL':
-        client.delete_object(get_container(), path)
-    elif S == 'DIR':
-        client.create_directory_marker(get_container(), path)
-    else:
-        prefix, name = os.path.split(path)
-        if prefix:
-            prefix += '/'
-        transfer.upload(client, fullpath, get_container(), prefix, name)
-    assert rstate.get(path) == S
-
-
-def resolve_conflict(path):
-    # XXX Check if this works with dirs.
-    fullpath = cstate.fullpath(path)
-    if os.path.exists(fullpath):
-        os.rename(fullpath, fullpath + '.local')
-
+            self.rmdir(path)
+
+    def upload(self, path, hash):
+        fullpath = join(self.syncdir, path)
+        if hash == 'DEL':
+            client.delete_object(self.container, path)
+        elif hash == 'DIR':
+            client.create_directory_marker(self.container, path)
+        else:
+            prefix, name = split(path)
+            if prefix:
+                prefix += '/'
+            print 'Uploading %s...' % path
+            upload(client, fullpath, self.container, prefix, name)
+
+        remote = self.remote_hash(path)
+        assert remote == hash, "Uploaded file does not match hash"
+        self.save(path, hash)
 
 
-def sync(path):
-    L = lstate.get(path)
-    C = cstate.get(path)
-    R = rstate.get(path)
-    if C == L:
-        # No local changes
-        if R != L:
-            update_local(path, R)
-            lstate.put(path, R)
-        return
+def sync(path, state):
+    previous = state.previous_hash(path)
+    current = state.current_hash(path)
+    remote = state.remote_hash(path)
 
-    if R == L:
-        # No remote changes
-        if C != L:
-            update_remote(path, C)
-            lstate.put(path, C)
-        return
-
-    # At this point both local and remote states have changes since last sync
-
-    if C == R:
-        # We were lucky, both had the same change
-        lstate.put(path, R)
+    if current == previous:
+        # No local changes, download any remote changes
+        if remote != previous:
+            state.download(path, remote)
+    elif remote == previous:
+        # No remote changes, upload any local changes
+        if current != previous:
+            state.upload(path, current)
     else:
-        # Conflict, try to resolve it
-        resolve_conflict(path)
-        update_local(path, R)
-        lstate.put(path, R)
+        # Both the local and the remote file have changed since the last sync
+        if current == remote:
+            state.save(path, remote)  # Local and remote changes match
+        else:
+            state.resolve_conflict(path, remote)
 
 
-def walk(dir):
+def walk(dir, container):
+    """Iterate over the files of the hierarchy created by merging the files
+    in `dir` and the objects in `container`."""
+
     pending = ['']
     while pending:
         dirs = set()
         files = set()
-        root = pending.pop(0)
+        root = pending.pop(0)  # Depth First Traversal
+        if root == TRASH_DIR:
+            continue
         if root:
             yield root
-        dirpath = os.path.join(dir, root)
-        if os.path.exists(dirpath):
+        dirpath = join(dir, root)
+        if exists(dirpath):
             for filename in os.listdir(dirpath):
-                path = os.path.join(root, filename)
-                if os.path.isdir(os.path.join(dir, path)):
+                path = join(root, filename)
+                if isdir(join(dir, path)):
                     dirs.add(path)
                 else:
                     files.add(path)
 
-        for object in client.list_objects(get_container(), prefix=root,
-                                          delimiter='/', format='json'):
-            # XXX Check subdirs.
+        for object in client.list_objects(container, format='json',
+                                          prefix=root, delimiter='/'):
             if 'subdir' in object:
                 continue
-            name = str(object['name'])
+            name = object['name']
             if object['content_type'] == 'application/directory':
                 dirs.add(name)
             else:
@@ -291,50 +303,27 @@ def walk(dir):
 
 
 def main():
-    global client, conf, confdir, trash, lstate, cstate, rstate
-
    if len(sys.argv) != 2:
         print 'syntax: %s <dir>' % sys.argv[0]
         sys.exit(1)
 
-    dir = sys.argv[1]
-    client = Pithos_Client(get_server(), get_auth(), get_user())
+    syncdir = sys.argv[1]
 
-    container = get_container()
-    try:
-        meta = client.retrieve_container_metadata(container)
-    except Fault:
-        raise RuntimeError("Cannot open container '%s'" % (container,))
-
-    conf = {'local': dir,
-            'remote': container,
-            'blocksize': int(meta['x-container-block-size']),
-            'blockhash': meta['x-container-block-hash']}
-    confdir = os.path.expanduser('~/.pithos-sync/')
+    _makedirs(SETTINGS_DIR)
+    container = os.environ.get('PITHOS_SYNC_CONTAINER', DEFAULT_CONTAINER)
+    client.create_container(container)
 
-    conffile = os.path.join(confdir, 'config')
-    if os.path.isfile(conffile):
-        try:
-            if (conf != pickle.loads(open(conffile, 'rb').read())):
-                raise ValueError
-        except:
-            shutil.rmtree(confdir)
-    create_dir(confdir)
+    state = State(syncdir, container)
 
-    trash = Trash()
-    lstate = LocalState(dir)
-    cstate = CurrentState(dir)
-    rstate = RemoteState(client)
-
-    for path in walk(dir):
+    now = int(time())
+    for path in walk(syncdir, container):
         print 'Syncing', path
-        sync(path)
-
-    f = open(conffile, 'wb')
-    f.write(pickle.dumps(conf))
-    f.close()
+        sync(path, state)
+        state.touch(path, now)
 
-    trash.empty()
+    state.delete_inactive(now)
+    state.empty_trash()
+    state.remove_deleted_dirs()
 
 
 if __name__ == '__main__':
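
The hash-first lookup below is a minimal, self-contained sketch (not part of the patch) of the behaviour the commit message describes, namely not re-downloading data that already exists locally. It mirrors the order used by State.download and State.find_hash above: reuse another synced file with the same Merkle hash, then a trashed copy stored under its hash, and only then fetch from the server. The function name, the synced_hashes mapping and the download_cb callback are illustrative stand-ins, not code from the repository.

    import os
    from shutil import copyfile

    def fetch(path, hash, synced_hashes, syncdir, trashdir, download_cb):
        # Place `path` with content `hash` under `syncdir`, reusing local
        # data when possible instead of downloading it again.
        target = os.path.join(syncdir, path)

        # 1. Another already-synced file has the same content: copy it.
        local = synced_hashes.get(hash)  # maps hash -> relative path
        if local:
            copyfile(os.path.join(syncdir, local), target)
            return 'copied'

        # 2. A trashed version is stored under its hash: copy it back.
        trashed = os.path.join(trashdir, hash)
        if os.path.exists(trashed):
            copyfile(trashed, target)
            return 'restored'

        # 3. No local copy exists; fall back to an actual download.
        download_cb(path, target)
        return 'downloaded'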