import os
import sqlite3
import sys
-import shutil
-import pickle
-from lib import transfer
+from os.path import exists, expanduser, isdir, isfile, join, split
+from shutil import copyfile
+from time import time
+
+from lib.transfer import download, upload
from lib.client import Pithos_Client, Fault
-from lib.hashmap import HashMap, merkle
+from lib.hashmap import merkle
from lib.util import get_user, get_auth, get_server
# Remote container used when PITHOS_SYNC_CONTAINER is not set.
DEFAULT_CONTAINER = 'pithos'
# Per-user settings directory; holds the sync state database (sync.db).
SETTINGS_DIR = expanduser('~/.pithos')
# Name of the trash directory created inside the synced directory.
TRASH_DIR = '.pithos_trash'

# One row per synced path: its hash after the last sync, plus the timestamp
# of the last run that visited it.
SQL_CREATE_FILES_TABLE = '''CREATE TABLE IF NOT EXISTS files (
    path TEXT PRIMARY KEY,
    hash TEXT,
    timestamp INTEGER)'''

# Shared server connection used by State, walk() and main().
client = Pithos_Client(get_server(), get_auth(), get_user())
-def copy_file(src, dst):
- print '***', 'COPYING', src, dst
- path = os.path.dirname(dst)
- create_dir(path)
- shutil.copyfile(src, dst)
+def _makedirs(path):
+ try:
+ os.makedirs(path)
+ except OSError:
+ pass
-client = None
-conf = None
-confdir = None
-trash = None
-lstate = None
-cstate = None
-rstate = None
class State(object):
    """Sync state for one local directory and one remote container.

    Hash values handled here are either a content hash computed by `merkle`,
    ``'DIR'`` for a directory, or ``'DEL'`` for a path that does not exist.

    NOTE(review): the state database lives at a fixed location
    (SETTINGS_DIR/sync.db) and is keyed only by relative path, so syncing a
    different directory or container reuses the same rows -- confirm this is
    intended.
    """

    def __init__(self, syncdir, container):
        self.syncdir = syncdir
        self.container = container
        # Replaced/deleted local files are parked here, named by their hash,
        # so find_hash() can reuse them instead of downloading again.
        self.trashdir = join(syncdir, TRASH_DIR)
        # Full paths of directories that must be removed once they are empty.
        self.deleted_dirs = set()

        _makedirs(self.trashdir)

        dbpath = join(SETTINGS_DIR, 'sync.db')
        self.conn = sqlite3.connect(dbpath)
        self.conn.execute(SQL_CREATE_FILES_TABLE)
        self.conn.commit()

    def current_hash(self, path):
        """Return the hash of the file as it exists now in the filesystem.

        A directory whose deletion has been postponed already counts as
        deleted.

        NOTE(review): `merkle` is called without block size / block hash
        arguments; confirm its defaults match the container's settings, or
        local and remote hashes will never compare equal.
        """
        fullpath = join(self.syncdir, path)
        if fullpath in self.deleted_dirs:
            return 'DEL'
        if not exists(fullpath):
            return 'DEL'
        if isdir(fullpath):
            return 'DIR'
        return merkle(fullpath)

    def delete_inactive(self, timestamp):
        """Forget every path whose row was not stamped with `timestamp`.

        Rows that never received a timestamp (saved but never touched) are
        dropped too. `timestamp != ?` alone would keep them, because a
        comparison with NULL is never true in SQL.
        """
        sql = 'DELETE FROM files WHERE timestamp IS NULL OR timestamp != ?'
        self.conn.execute(sql, (timestamp,))
        self.conn.commit()

    def download(self, path, hash):
        """Make the local `path` match the remote `hash`, then record it.

        'DEL' trashes the local entry, 'DIR' creates the directory, and any
        other hash fetches the file. A matching local or trashed copy is used
        when available; otherwise the file is downloaded from the server.
        """
        fullpath = join(self.syncdir, path)
        if hash == 'DEL':
            self.trash(path)
        elif hash == 'DIR':
            if isfile(fullpath):
                # A file is in the way of the new directory; park it in trash.
                self.trash(path)
            _makedirs(fullpath)
        else:
            self.trash(path)  # Trash any old version
            localpath = self.find_hash(hash)
            if localpath:
                copyfile(localpath, fullpath)
            else:
                print('Downloading %s...' % path)
                # Module-level lib.transfer.download, not this method.
                download(client, self.container, path, fullpath)

        current = self.current_hash(path)
        assert current == hash, "Downloaded file does not match hash"
        self.save(path, hash)

    def empty_trash(self):
        """Permanently delete every file parked in the trash directory."""
        for filename in os.listdir(self.trashdir):
            path = join(self.trashdir, filename)
            os.remove(path)

    def find_hash(self, hash):
        """Return the full path of a local file whose hash is `hash`, or None.

        The trash is checked first: its files are named by the hash they had
        when trashed, so no hashing is needed. After that, every synced path
        recorded with `hash` is re-checked on disk, because it may have been
        modified or deleted since the last sync.
        """
        trashpath = join(self.trashdir, hash)
        if isfile(trashpath):
            return trashpath

        sql = 'SELECT path FROM files WHERE hash = ?'
        for (path,) in self.conn.execute(sql, (hash,)).fetchall():
            fullpath = join(self.syncdir, path)
            if isfile(fullpath) and merkle(fullpath) == hash:
                return fullpath

        return None

    def previous_hash(self, path):
        """Return the hash of the file according to the previous sync with
        the server. Return DEL if no such entry exists."""

        sql = 'SELECT hash FROM files WHERE path = ?'
        ret = self.conn.execute(sql, (path,)).fetchone()
        return ret[0] if ret else 'DEL'

    def remote_hash(self, path):
        """Return the hash of the file according to the server.

        A failed metadata lookup (Fault) is treated as 'DEL', and an object
        with content type application/directory as 'DIR'.
        """
        try:
            meta = client.retrieve_object_metadata(self.container, path)
        except Fault:
            return 'DEL'
        if meta.get('content-type', None) == 'application/directory':
            return 'DIR'
        else:
            return meta['x-object-hash']

    def remove_deleted_dirs(self):
        """Remove directories whose deletion was postponed, deepest first.

        A directory that no longer exists is simply forgotten. A directory
        that still has entries (for example a conflict copy created inside
        it) is kept rather than aborting the whole run.
        """
        # Longer paths first, so children are handled before their parents.
        for path in sorted(self.deleted_dirs, key=len, reverse=True):
            self.deleted_dirs.discard(path)
            if not isdir(path):
                continue
            if os.listdir(path):
                continue  # Still has content; keep it.
            os.rmdir(path)

    def resolve_conflict(self, path, hash):
        """Resolve a sync conflict by renaming the local file and downloading
        the remote one.

        The local entry is moved to `<path>.local` (or `.local1`,
        `.local2`, ... if taken). If it does not exist locally (for example
        it was deleted while the remote changed), there is nothing to rename.
        """
        fullpath = join(self.syncdir, path)
        if exists(fullpath):
            resolved = fullpath + '.local'
            i = 0
            while exists(resolved):
                i += 1
                resolved = fullpath + '.local%d' % i
            os.rename(fullpath, resolved)

        self.download(path, hash)

    def rmdir(self, path):
        """Remove a dir or mark for deletion if non-empty

        If a dir is empty delete it and check if any of its parents should be
        deleted too. Else mark it for later deletion.
        """
        fullpath = join(self.syncdir, path)
        if not exists(fullpath):
            return

        if os.listdir(fullpath):
            # Directory not empty
            self.deleted_dirs.add(fullpath)
            return

        os.rmdir(fullpath)
        self.deleted_dirs.discard(fullpath)

        # Removing this directory may have emptied a parent pending deletion.
        # Stop at the first parent that still has other entries.
        parent = os.path.dirname(fullpath)
        while (parent in self.deleted_dirs and isdir(parent)
               and not os.listdir(parent)):
            os.rmdir(parent)
            self.deleted_dirs.remove(parent)
            parent = os.path.dirname(parent)

    def save(self, path, hash):
        """Save the hash value of a file. This value will be later returned
        by `previous_hash`. The row's timestamp is reset; `touch` sets it."""

        sql = 'INSERT OR REPLACE INTO files (path, hash) VALUES (?, ?)'
        self.conn.execute(sql, (path, hash))
        self.conn.commit()

    def touch(self, path, now):
        """Stamp the row for `path` with `now` (no-op if there is no row)."""
        sql = 'UPDATE files SET timestamp = ? WHERE path = ?'
        self.conn.execute(sql, (now, path))
        self.conn.commit()

    def trash(self, path):
        """Move a file to trash or delete it if it's a directory

        Files are stored in the trash under their hash. Directories go
        through rmdir(), which postpones deletion while they are non-empty.
        """
        fullpath = join(self.syncdir, path)
        if not exists(fullpath):
            return

        if isfile(fullpath):
            hash = merkle(fullpath)
            trashpath = join(self.trashdir, hash)
            os.rename(fullpath, trashpath)
        else:
            self.rmdir(path)

    def upload(self, path, hash):
        """Make the remote object for `path` match the local `hash`, then
        record it.

        'DEL' deletes the remote object, 'DIR' creates a directory marker,
        and any other hash uploads the local file contents.
        """
        fullpath = join(self.syncdir, path)
        if hash == 'DEL':
            client.delete_object(self.container, path)
        elif hash == 'DIR':
            client.create_directory_marker(self.container, path)
        else:
            prefix, name = split(path)
            if prefix:
                prefix += '/'
            print('Uploading %s...' % path)
            # Module-level lib.transfer.upload, not this method.
            upload(client, fullpath, self.container, prefix, name)

        remote = self.remote_hash(path)
        assert remote == hash, "Uploaded file does not match hash"
        self.save(path, hash)
def sync(path, state):
    """Reconcile a single `path` between the local tree and the server.

    Three hashes are compared: the one recorded by the previous sync, the
    one on disk now, and the one on the server.

    - Only the remote side changed: the remote version is downloaded.
    - Only the local side changed: the local version is uploaded.
    - Both changed to the same value: the new hash is just recorded.
    - Both changed differently: the conflict is handed to
      `state.resolve_conflict`.
    """
    previous = state.previous_hash(path)
    current = state.current_hash(path)
    remote = state.remote_hash(path)

    local_changed = current != previous
    remote_changed = remote != previous

    if not local_changed:
        if remote_changed:
            state.download(path, remote)
        return

    if not remote_changed:
        state.upload(path, current)
        return

    # Both sides diverged from the last synced state.
    if current == remote:
        state.save(path, remote)  # Local and remote changes match
    else:
        state.resolve_conflict(path, remote)
-def walk(dir):
+def walk(dir, container):
+ """Iterates on the files of the hierarchy created by merging the files
+ in `dir` and the objects in `container`."""
+
pending = ['']
while pending:
dirs = set()
files = set()
- root = pending.pop(0)
+ root = pending.pop(0) # Depth First Traversal
+ if root == TRASH_DIR:
+ continue
if root:
yield root
- dirpath = os.path.join(dir, root)
- if os.path.exists(dirpath):
+ dirpath = join(dir, root)
+ if exists(dirpath):
for filename in os.listdir(dirpath):
- path = os.path.join(root, filename)
- if os.path.isdir(os.path.join(dir, path)):
+ path = join(root, filename)
+ if isdir(join(dir, path)):
dirs.add(path)
else:
files.add(path)
- for object in client.list_objects(get_container(), prefix=root,
- delimiter='/', format='json'):
- # XXX Check subdirs.
+ for object in client.list_objects(container, format='json',
+ prefix=root, delimiter='/'):
if 'subdir' in object:
continue
- name = str(object['name'])
+ name = object['name']
if object['content_type'] == 'application/directory':
dirs.add(name)
else:
def main():
    """Command-line entry point: sync the directory given as sole argument.

    The container name comes from PITHOS_SYNC_CONTAINER, falling back to
    DEFAULT_CONTAINER. After every path has been reconciled, rows for paths
    not visited by this run are dropped, the trash is emptied, and postponed
    directory deletions are carried out.
    """
    if len(sys.argv) != 2:
        print('syntax: %s <dir>' % sys.argv[0])
        sys.exit(1)
    syncdir = sys.argv[1]

    _makedirs(SETTINGS_DIR)
    container = os.environ.get('PITHOS_SYNC_CONTAINER', DEFAULT_CONTAINER)
    client.create_container(container)

    state = State(syncdir, container)

    # Every path visited in this run is stamped with the same start time,
    # so delete_inactive() can tell which recorded paths were not seen.
    started = int(time())
    for path in walk(syncdir, container):
        print('Syncing %s' % path)
        sync(path, state)
        state.touch(path, started)

    # Housekeeping once the whole tree has been reconciled.
    state.delete_inactive(started)
    state.empty_trash()
    state.remove_deleted_dirs()
if __name__ == '__main__':