Improve pithos-sync
author Giorgos Verigakis <verigak@gmail.com>
Fri, 2 Dec 2011 14:20:50 +0000 (16:20 +0200)
committer Giorgos Verigakis <verigak@gmail.com>
Fri, 2 Dec 2011 14:20:50 +0000 (16:20 +0200)
* Do not redownload files that exist locally.
* General refactoring to simplify the code.

Refs #1495

tools/lib/transfer.py
tools/pithos-sync

index e5783eb..a854d4e 100644 (file)
@@ -87,7 +87,6 @@ def download(client, container, object, path):
     blockhash = res['block_hash']
     bytes = res['bytes']
     map = res['hashes']
-    print res
     
     if os.path.exists(path):
         h = HashMap(blocksize, blockhash)
index d4b64d5..1c69769 100755 (executable)
 import os
 import sqlite3
 import sys
-import shutil
-import pickle
 
-from lib import transfer
+from os.path import dirname, exists, expanduser, isdir, isfile, join, split
+from shutil import copyfile
+from time import time
+
+from lib.transfer import download, upload
 from lib.client import Pithos_Client, Fault
-from lib.hashmap import HashMap, merkle
+from lib.hashmap import merkle
 from lib.util import get_user, get_auth, get_server
 
 
 DEFAULT_CONTAINER = 'pithos'
+SETTINGS_DIR = expanduser('~/.pithos')    # Holds sync.db, the sync database
+TRASH_DIR = '.pithos_trash'    # Trash dir name, created inside the synced dir
 
-def get_container():
-    try:
-        return os.environ['PITHOS_SYNC_CONTAINER']
-    except KeyError:
-        return DEFAULT_CONTAINER
-
+# One row per synced path: the hash saved at the last sync (State.save) and
+# the start time of the last sync run that visited the path (State.touch).
+SQL_CREATE_FILES_TABLE = '''CREATE TABLE IF NOT EXISTS files (
+                                path TEXT PRIMARY KEY,
+                                hash TEXT,
+                                timestamp INTEGER)'''
 
-def create_dir(path):
-    if not os.path.exists(path):
-        os.makedirs(path)
-    if not os.path.isdir(path):
-        raise RuntimeError("Cannot open '%s'" % (path,))
 
+# Global client used by State and walk(). NOTE(review): it is constructed at
+# import time, so merely importing this script builds it; consider main().
+client = Pithos_Client(get_server(), get_auth(), get_user())
 
-def copy_file(src, dst):
-    print '***', 'COPYING', src, dst
-    path = os.path.dirname(dst)
-    create_dir(path)
-    shutil.copyfile(src, dst)
 
+def _makedirs(path):
+    """Create directory `path` and any missing parents.
+    
+    An already existing directory is fine. Any other failure (a regular file
+    in the way, missing permissions, ...) is re-raised instead of being
+    silently ignored, so callers never continue with a missing directory.
+    """
+    try:
+        os.makedirs(path)
+    except OSError:
+        if not isdir(path):
+            raise
 
-client = None
-conf = None
-confdir = None
-trash = None
-lstate = None
-cstate = None
-rstate = None
 
+class State(object):
+    def __init__(self, syncdir, container):
+        """Bind the sync state to local dir `syncdir` and remote `container`.
+        
+        Creates the trash dir inside `syncdir` and opens (creating it if
+        needed) the `files` table in SETTINGS_DIR/sync.db.
+        """
+        self.syncdir = syncdir
+        self.container = container
+        self.trashdir = join(syncdir, TRASH_DIR)
+        # Full paths (joined with syncdir) of non-empty dirs whose deletion
+        # was postponed by rmdir()
+        self.deleted_dirs = set()
 
-class Trash(object):
-    def __init__(self):
-        self.path = os.path.join(confdir, 'trash')
-        create_dir(self.path)
+        _makedirs(self.trashdir)
         
-        dbpath = os.path.join(confdir, 'trash.db')
+        # NOTE(review): the same sync.db is used whatever syncdir/container is
+        # given, so hashes saved while syncing another dir/container are reused.
+        dbpath = join(SETTINGS_DIR, 'sync.db')
         self.conn = sqlite3.connect(dbpath)
-        sql = '''CREATE TABLE IF NOT EXISTS files (
-                    path TEXT PRIMARY KEY, hash TEXT)'''
-        self.conn.execute(sql)
+        self.conn.execute(SQL_CREATE_FILES_TABLE)
         self.conn.commit()
     
-    def put(self, fullpath, path, hash):
-        copy_file(fullpath, os.path.join(self.path, path))
-        os.remove(fullpath)
-        sql = 'INSERT OR REPLACE INTO files VALUES (?, ?)'
-        self.conn.execute(sql, (path, hash))
-        self.conn.commit()
-    
-    def search(self, hash):
-        sql = 'SELECT path FROM files WHERE hash = ?'
-        ret = self.conn.execute(sql, (hash,)).fetchone()
-        return ret[0] if ret else None
+    def current_hash(self, path):
+        """Describe `path` as it is on disk right now.
+        
+        Returns 'DEL' if it is missing or is a dir already scheduled for
+        deletion, 'DIR' for a directory and the merkle hash for a file.
+        """
+        fullpath = join(self.syncdir, path)
+        gone = fullpath in self.deleted_dirs or not exists(fullpath)
+        if gone:
+            return 'DEL'
+        return 'DIR' if isdir(fullpath) else merkle(fullpath)
     
-    def empty(self):
-        sql = 'DELETE FROM files'
-        self.conn.execute(sql)
+    def delete_inactive(self, timestamp):
+        """Forget every path that the sync run started at `timestamp` did not
+           visit, i.e. whose row was not stamped by `touch` in that run.
+        
+           Rows with a NULL timestamp (saved but never touched, e.g. when an
+           earlier run was interrupted) are dropped too: in SQL a plain `!=`
+           never matches NULL, so they would otherwise linger forever.
+        """
+        sql = 'DELETE FROM files WHERE timestamp IS NULL OR timestamp != ?'
+        self.conn.execute(sql, (timestamp,))
         self.conn.commit()
-        shutil.rmtree(self.path)
     
-    def fullpath(self, path):
-        return os.path.join(self.path, path)
-
-
-class LocalState(object):
-    def __init__(self, path):
-        self.path = path
+    def download(self, path, hash):
+        """Make the local copy of `path` match the remote `hash`.
+        
+        `hash` is 'DEL', 'DIR' or a content hash. File contents are copied
+        from an identical local or trashed file when `find_hash` finds one,
+        otherwise they are downloaded. The result is verified and saved.
+        """
+        fullpath = join(self.syncdir, path)
+        if hash == 'DEL':
+            self.trash(path)
+        elif hash == 'DIR':
+            if isfile(fullpath):
+                self.trash(path)    # A local file is in the way of the dir
+            _makedirs(fullpath)
+        else:
+            self.trash(path)    # Trash any old version
+            localpath = self.find_hash(hash)
+            if localpath:
+                copyfile(localpath, fullpath)
+            else:
+                print 'Downloading %s...' % path
+                download(client, self.container, path, fullpath)
         
-        dbpath = os.path.join(confdir, 'state.db')
-        self.conn = sqlite3.connect(dbpath)
-        sql = '''CREATE TABLE IF NOT EXISTS files (
-                    path TEXT PRIMARY KEY, hash TEXT)'''
-        self.conn.execute(sql)
-        self.conn.commit()
+        current = self.current_hash(path)
+        assert current == hash, "Downloaded file does not match hash"
+        self.save(path, hash)
     
-    def get(self, path):
-        sql = 'SELECT hash FROM files WHERE path = ?'
-        ret = self.conn.execute(sql, (path,)).fetchone()
-        return ret[0] if ret else 'DEL'
+    def empty_trash(self):
+        """Permanently delete everything `trash` moved into the trash dir."""
+        for name in os.listdir(self.trashdir):
+            os.remove(join(self.trashdir, name))
     
-    def put(self, path, hash):
-        sql = 'INSERT OR REPLACE INTO files VALUES (?, ?)'
-        self.conn.execute(sql, (path, hash))
-        self.conn.commit()
-
-    def search(self, hash):
+    def find_hash(self, hash):
+        """Return the full path of a local file whose content is `hash`, so it
+           can be copied instead of downloaded, or None if there is none.
+        
+           A file recorded in the database is only used if it is still on
+           disk with that content: it may have been trashed earlier in this
+           sync or modified locally since it was recorded. Otherwise the
+           trash, where files are stored under their hash, is checked.
+        """
         sql = 'SELECT path FROM files WHERE hash = ?'
         ret = self.conn.execute(sql, (hash,)).fetchone()
-        return ret[0] if ret else None
-    
-    def fullpath(self, path):
-        return os.path.join(self.path, path)
-
-
-class CurrentState(object):
-    def __init__(self, path):
-        self.path = path
-    
-    def get(self, path):
-        fullpath = os.path.join(self.path, path)
-        if os.path.exists(fullpath):
-            if os.path.isdir(fullpath):
-                return 'DIR'
-            else:
-                return merkle(fullpath, conf['blocksize'], conf['blockhash'])
-        else:
-            return 'DEL'
+        if ret:
+            fullpath = join(self.syncdir, ret[0])
+            if isfile(fullpath) and merkle(fullpath) == hash:
+                return fullpath
+        
+        # Membership test (not a path join) so a hash value can never point
+        # outside the trash dir.
+        if hash in os.listdir(self.trashdir):
+            return join(self.trashdir, hash)
+        
+        return None
     
-    def fullpath(self, path):
-        return os.path.join(self.path, path)
-
-
-class RemoteState(object):
-    def __init__(self, client):
-        self.client = client
-        self.container = get_container()
+    def previous_hash(self, path):
+        """Return the hash saved for `path` by the previous sync with the
+           server, or 'DEL' if no such entry exists."""
+        
+        row = self.conn.execute('SELECT hash FROM files WHERE path = ?',
+                                (path,)).fetchone()
+        if row is None:
+            return 'DEL'
+        return row[0]
     
-    def get(self, path):
+    def remote_hash(self, path):
+        """Return the hash of the file according to the server: 'DIR' for a
+           directory marker, 'DEL' when the metadata request raises Fault,
+           otherwise the object's x-object-hash."""
+        
         try:
-            meta = self.client.retrieve_object_metadata(self.container, path)
+            meta = client.retrieve_object_metadata(self.container, path)
         except Fault:
+            # NOTE(review): any Fault is taken to mean "no such object". If a
+            # Fault can also signal e.g. an auth or server error, sync() will
+            # treat the object as remotely deleted and trash the local copy.
             return 'DEL'
         if meta.get('content-type', None) == 'application/directory':
             return 'DIR'
         else:
             return meta['x-object-hash']
-
-
-def update_local(path, S):
-    # XXX If something is already here, put it in trash and delete it.
-    # XXX If we have a directory already here, put all files in trash.
-    fullpath = cstate.fullpath(path)
-    if S == 'DEL':
-        trash.put(fullpath, path, S)
-    elif S == 'DIR':
-        if os.path.exists(fullpath):
-            trash.put(fullpath, path, S)
-        # XXX Strip trailing slash (or escape).
-        os.mkdir(fullpath)
-    else:
-        # First, search for local copy
-        file = lstate.search(S)
-        if file:
-            copy_file(lstate.fullpath(file), fullpath)
+    
+    def remove_deleted_dirs(self):
+        """Delete the dirs whose removal `rmdir` postponed.
+        
+        Longest (deepest) paths go first, so children are handled before
+        their parents. A dir that is already gone or still has contents
+        (e.g. new local files that were uploaded during this sync) is left
+        alone instead of letting os.rmdir raise and abort the cleanup.
+        """
+        for path in sorted(self.deleted_dirs, key=len, reverse=True):
+            if exists(path) and not os.listdir(path):
+                os.rmdir(path)
+            self.deleted_dirs.discard(path)
+    
+    def resolve_conflict(self, path, hash):
+        """Resolve a sync conflict by renaming the local file and downloading
+           the remote one.
+        
+           The local version is kept as `<path>.local` (or `.local1`,
+           `.local2`, ... if that name is taken). When the local side of the
+           conflict is a deletion there is nothing to rename, and only the
+           remote version is downloaded.
+        """
+        
+        fullpath = join(self.syncdir, path)
+        if exists(fullpath):
+            resolved = fullpath + '.local'
+            i = 0
+            while exists(resolved):
+                i += 1
+                resolved = fullpath + '.local%d' % i
+            os.rename(fullpath, resolved)
+        
+        self.download(path, hash)
+    
+    def rmdir(self, path):
+        """Remove a dir or mark for deletion if non-empty
+        
+        If a dir is empty delete it and check if any of its parents should be
+        deleted too. Else mark it for later deletion. A marked parent is only
+        removed once it is empty; otherwise it stays marked and is handled by
+        `remove_deleted_dirs` at the end of the sync.
+        """
+        
+        fullpath = join(self.syncdir, path)
+        if not exists(fullpath):
+            return
+        
+        if os.listdir(fullpath):
+            # Directory not empty
+            self.deleted_dirs.add(fullpath)
+            return
+        
+        os.rmdir(fullpath)
+        self.deleted_dirs.discard(fullpath)
+        
+        # Climb through parents already marked for deletion, removing each one
+        # this removal left empty. A parent may still hold children that have
+        # not been synced yet, and os.rmdir would raise on it.
+        parent = dirname(fullpath)
+        while parent in self.deleted_dirs and not os.listdir(parent):
+            os.rmdir(parent)
+            self.deleted_dirs.remove(parent)
+            parent = dirname(parent)
+    
+    def save(self, path, hash):
+        """Record `hash` as the synced state of `path`; `previous_hash` will
+           return it from now on. INSERT OR REPLACE rewrites the whole row,
+           so the row's timestamp is reset to NULL until `touch` sets it."""
+        
+        self.conn.execute('INSERT OR REPLACE INTO files (path, hash) '
+                          'VALUES (?, ?)', (path, hash))
+        self.conn.commit()
+    
+    def touch(self, path, now):
+        """Stamp the row of `path` with the sync run time `now`; does nothing
+           when `path` has no row."""
+        self.conn.execute('UPDATE files SET timestamp = ? WHERE path = ?',
+                          (now, path))
+        self.conn.commit()
+    
+    def trash(self, path):
+        """Move a file to the trash dir, or hand a directory to `rmdir`.
+        
+           A file is stored in the trash under its merkle hash, which lets
+           `find_hash` reuse it later in this sync. A directory is deleted
+           now if empty, otherwise marked for deletion. Missing paths are
+           ignored.
+        """
+        
+        fullpath = join(self.syncdir, path)
+        if not exists(fullpath):
+            return
+        
+        if isfile(fullpath):
+            hash = merkle(fullpath)
+            trashpath = join(self.trashdir, hash)
+            os.rename(fullpath, trashpath)
         else:
-            # Search for copy in trash
-            file = trash.search(S)
-            if file:
-                # XXX Move from trash (not copy).
-                copy_file(trash.fullpath(file), fullpath)
-            else:
-                # Download
-                transfer.download(client, get_container(), path, fullpath)
-        assert cstate.get(path) == S
-
-
-def update_remote(path, S):
-    fullpath = cstate.fullpath(path)
-    if S == 'DEL':
-        client.delete_object(get_container(), path)
-    elif S == 'DIR':
-        client.create_directory_marker(get_container(), path)
-    else:
-        prefix, name = os.path.split(path)
-        if prefix:
-            prefix += '/'
-        transfer.upload(client, fullpath, get_container(), prefix, name)
-        assert rstate.get(path) == S
-
-
-def resolve_conflict(path):
-    # XXX Check if this works with dirs.
-    fullpath = cstate.fullpath(path)
-    if os.path.exists(fullpath):
-        os.rename(fullpath, fullpath + '.local')
-
+            self.rmdir(path)
+    
+    def upload(self, path, hash):
+        """Make the server copy of `path` match the local `hash` ('DEL', 'DIR'
+           or a content hash), then verify it and record it with `save`."""
+        if hash == 'DEL':
+            client.delete_object(self.container, path)
+        elif hash == 'DIR':
+            client.create_directory_marker(self.container, path)
+        else:
+            fullpath = join(self.syncdir, path)
+            prefix, name = split(path)
+            prefix = prefix + '/' if prefix else prefix
+            print 'Uploading %s...' % path
+            upload(client, fullpath, self.container, prefix, name)
+        
+        remote = self.remote_hash(path)
+        assert remote == hash, "Uploaded file does not match hash"
+        self.save(path, hash)
 
-def sync(path):
-    L = lstate.get(path)
-    C = cstate.get(path)
-    R = rstate.get(path)
 
-    if C == L:
-        # No local changes
-        if R != L:
-            update_local(path, R)
-            lstate.put(path, R)
-        return
+def sync(path, state):
+    """Synchronize one path with a three-way comparison.
+    
+    `previous` is the hash saved by the last sync, `current` the one on disk
+    and `remote` the one on the server. Remote-only changes are downloaded,
+    local-only changes are uploaded, identical changes on both sides are just
+    recorded, and differing ones go to `State.resolve_conflict` (the local
+    version is renamed and the remote one takes the original name).
+    """
+    previous = state.previous_hash(path)
+    current = state.current_hash(path)
+    remote = state.remote_hash(path)
     
-    if R == L:
-        # No remote changes
-        if C != L:
-            update_remote(path, C)
-            lstate.put(path, C)
-        return
-    
-    # At this point both local and remote states have changes since last sync
-
-    if C == R:
-        # We were lucky, both had the same change
-        lstate.put(path, R)
+    if current == previous:
+        # No local changes, download any remote changes
+        if remote != previous:
+            state.download(path, remote)
+    elif remote == previous:
+        # No remote changes, upload any local changes
+        if current != previous:    # Always true here (see first branch)
+            state.upload(path, current)
     else:
-        # Conflict, try to resolve it
-        resolve_conflict(path)
-        update_local(path, R)
-        lstate.put(path, R)
+        # Both local and remote file have changes since last sync
+        if current == remote:
+            state.save(path, remote)    # Local and remote changes match
+        else:
+            state.resolve_conflict(path, remote)
 
 
-def walk(dir):
+def walk(dir, container):
+    """Iterates on the files of the hierarchy created by merging the files
+       in `dir` and the objects in `container`."""
+    
     pending = ['']
     
     while pending:
         dirs = set()
         files = set()
-        root = pending.pop(0)
+        root = pending.pop(0)   # Depth First Traversal
+        if root == TRASH_DIR:
+            continue
         if root:
             yield root
         
-        dirpath = os.path.join(dir, root)
-        if os.path.exists(dirpath):
+        dirpath = join(dir, root)
+        if exists(dirpath):
             for filename in os.listdir(dirpath):
-                path = os.path.join(root, filename)
-                if os.path.isdir(os.path.join(dir, path)):
+                path = join(root, filename)
+                if isdir(join(dir, path)):
                     dirs.add(path)
                 else:
                     files.add(path)
         
-        for object in client.list_objects(get_container(), prefix=root,
-                                            delimiter='/', format='json'):
-            # XXX Check subdirs.
+        for object in client.list_objects(container, format='json',
+                prefix=root, delimiter='/'):
             if 'subdir' in object:
                 continue
-            name = str(object['name'])
+            name = object['name']
             if object['content_type'] == 'application/directory':
                 dirs.add(name)
             else:
@@ -291,50 +303,27 @@ def walk(dir):
 
 
 def main():
+    """Sync the directory given on the command line with the container
+    named by $PITHOS_SYNC_CONTAINER (DEFAULT_CONTAINER if unset)."""
-    global client, conf, confdir, trash, lstate, cstate, rstate
-    
     if len(sys.argv) != 2:
         print 'syntax: %s <dir>' % sys.argv[0]
         sys.exit(1)
     
-    dir = sys.argv[1]
-    client = Pithos_Client(get_server(), get_auth(), get_user())
+    syncdir = sys.argv[1]
     
-    container = get_container()
-    try:
-        meta = client.retrieve_container_metadata(container)
-    except Fault:
-        raise RuntimeError("Cannot open container '%s'" % (container,))
-    
-    conf = {'local': dir,
-            'remote': container,
-            'blocksize': int(meta['x-container-block-size']),
-            'blockhash': meta['x-container-block-hash']}
-    confdir = os.path.expanduser('~/.pithos-sync/')
+    _makedirs(SETTINGS_DIR)
+    container = os.environ.get('PITHOS_SYNC_CONTAINER', DEFAULT_CONTAINER)
+    # NOTE(review): called on every run; confirm it is harmless when the
+    # container already exists.
+    client.create_container(container)
     
-    conffile = os.path.join(confdir, 'config')
-    if os.path.isfile(conffile):
-        try:
-            if (conf != pickle.loads(open(conffile, 'rb').read())):
-                raise ValueError
-        except:
-            shutil.rmtree(confdir)
-    create_dir(confdir)
+    state = State(syncdir, container)
     
-    trash = Trash()
-    lstate = LocalState(dir)
-    cstate = CurrentState(dir)
-    rstate = RemoteState(client)
-    
-    for path in walk(dir):
+    # Every path visited in this run is stamped with `now`; rows that stay
+    # unstamped are forgotten by delete_inactive() below.
+    now = int(time())
+    for path in walk(syncdir, container):
         print 'Syncing', path
-        sync(path)
-    
-    f = open(conffile, 'wb')
-    f.write(pickle.dumps(conf))
-    f.close()
+        sync(path, state)
+        state.touch(path, now)
     
-    trash.empty()
+    # Cleanup only once the whole tree is synced: until then find_hash() may
+    # still reuse trashed files.
+    state.delete_inactive(now)
+    state.empty_trash()
+    state.remove_deleted_dirs()
 
 
 if __name__ == '__main__':