#!/usr/bin/env python

# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import os
import sqlite3
import sys
import shutil
import pickle
import binascii

from lib import transfer
from lib.client import Pithos_Client, Fault
from lib.hashmap import HashMap, merkle
from lib.util import get_user, get_auth, get_server


DEFAULT_CONTAINER = 'pithos'


def get_container():
    """Return the name of the container to sync against.

    The PITHOS_SYNC_CONTAINER environment variable takes precedence;
    DEFAULT_CONTAINER is used when it is not set.
    """
    return os.environ.get('PITHOS_SYNC_CONTAINER', DEFAULT_CONTAINER)


def create_dir(path):
    """Make sure `path` exists and is a directory, creating it if needed.

    Raises RuntimeError when `path` exists but is not a directory.
    """
    if not os.path.exists(path):
        os.makedirs(path)
    if not os.path.isdir(path):
        raise RuntimeError("Cannot open '%s'" % (path,))


def copy_file(src, dst):
    """Copy the file `src` to `dst`, creating the parent directory of `dst`."""
    parent = os.path.dirname(dst)
    # A bare file name has no directory component, and os.makedirs('')
    # would fail, so only create the parent when there is one.
    if parent:
        create_dir(parent)
    shutil.copyfile(src, dst)


# Module-level state; every name below is assigned in main().
client = None
conf = None
confdir = None
trash = None
lstate = None
cstate = None
rstate = None


class Trash(object):
    """A directory of file copies plus an sqlite index mapping path -> hash.

    Both live under the module-level `confdir`, which must be set before
    an instance is created.
    """

    def __init__(self):
        self.path = os.path.join(confdir, 'trash')
        create_dir(self.path)

        dbpath = os.path.join(confdir, 'trash.db')
        self.conn = sqlite3.connect(dbpath)
        sql = '''CREATE TABLE IF NOT EXISTS files (
                    path TEXT PRIMARY KEY, hash TEXT)'''
        self.conn.execute(sql)
        self.conn.commit()

    def put(self, path, hash):
        """Copy the file at `path` into the trash and index it under `hash`."""
        copy_file(path, os.path.join(self.path, path))
        sql = 'INSERT OR REPLACE INTO files VALUES (?, ?)'
        self.conn.execute(sql, (path, hash))
        self.conn.commit()

    def search(self, hash):
        """Return the path of one indexed file with `hash`, or None."""
        sql = 'SELECT path FROM files WHERE hash = ?'
        row = self.conn.execute(sql, (hash,)).fetchone()
        return row[0] if row else None

    def empty(self):
        """Drop every index row and remove the trash directory tree."""
        sql = 'DELETE FROM files'
        self.conn.execute(sql)
        self.conn.commit()
        shutil.rmtree(self.path)

    def fullpath(self, path):
        """Return `path` joined onto the trash directory."""
        return os.path.join(self.path, path)


class LocalState(object):
    """The state recorded for each path, kept in `confdir`/state.db.

    main() records a value here after each path has been synced.
    """

    def __init__(self, path):
        self.path = path

        dbpath = os.path.join(confdir, 'state.db')
        self.conn = sqlite3.connect(dbpath)
        sql = '''CREATE TABLE IF NOT EXISTS files (
                    path TEXT PRIMARY KEY, hash TEXT)'''
        self.conn.execute(sql)
        self.conn.commit()

    def get(self, path):
        """Return the recorded state of `path`, or 'DEL' if none is stored."""
        sql = 'SELECT hash FROM files WHERE path = ?'
        row = self.conn.execute(sql, (path,)).fetchone()
        return row[0] if row else 'DEL'

    def put(self, path, hash):
        """Record `hash` as the state of `path`."""
        sql = 'INSERT OR REPLACE INTO files VALUES (?, ?)'
        self.conn.execute(sql, (path, hash))
        self.conn.commit()

    def search(self, hash):
        """Return one recorded path whose state is `hash`, or None."""
        sql = 'SELECT path FROM files WHERE hash = ?'
        row = self.conn.execute(sql, (hash,)).fetchone()
        return row[0] if row else None

    def fullpath(self, path):
        """Return `path` joined onto the local sync directory."""
        return os.path.join(self.path, path)


class CurrentState(object):
    """The state of paths as they are on the local filesystem right now."""

    def __init__(self, path):
        self.path = path

    def get(self, path):
        """Return 'DEL' if `path` is missing, 'DIR' if it is a directory,
        otherwise the value merkle() computes for the file using the
        block size and block hash stored in `conf`.
        """
        fullpath = os.path.join(self.path, path)
        if not os.path.exists(fullpath):
            return 'DEL'
        if os.path.isdir(fullpath):
            return 'DIR'
        return merkle(fullpath, conf['blocksize'], conf['blockhash'])

    def fullpath(self, path):
        """Return `path` joined onto the local sync directory."""
        return os.path.join(self.path, path)


class RemoteState(object):
    """The state of objects as reported by the server for the container."""

    def __init__(self, client):
        self.client = client
        self.container = get_container()

    def get(self, path):
        """Return 'DEL' if the metadata request raises Fault, 'DIR' if the
        object's content type is 'application/directory', otherwise the
        hex-encoded HashMap hash built from the object's block hashes.
        """
        try:
            meta = self.client.retrieve_object_metadata(self.container, path)
        except Fault:
            return 'DEL'
        if meta.get('content-type', None) == 'application/directory':
            return 'DIR'

        # Use the client this instance was given rather than the module
        # global, matching the metadata call above.
        data = self.client.retrieve_object(self.container, path,
                                           format='json')
        hashmap = HashMap(conf['blocksize'], conf['blockhash'])
        hashmap += [binascii.unhexlify(x) for x in data['hashes']]
        return binascii.hexlify(hashmap.hash())


def update_local(path, S):
    """Bring the local copy of `path` to state `S`.

    `S` is 'DEL', 'DIR', or a file hash.  For a hash, the content is
    copied from a path recorded in the local state, else from the trash,
    and is downloaded only when neither has it.
    """
    fullpath = cstate.fullpath(path)
    if S == 'DEL':
        # The path may already be gone: resolve_conflict() renames the
        # local copy out of the way before this function is called.
        # os.rmdir only removes empty directories; a non-empty one still
        # raises OSError.
        if os.path.isdir(fullpath):
            os.rmdir(fullpath)
        elif os.path.exists(fullpath):
            os.remove(fullpath)
    elif S == 'DIR':
        if os.path.exists(fullpath):
            os.remove(fullpath)
        os.mkdir(fullpath)
    else:
        # First, search for local copy
        found = lstate.search(S)
        if found:
            copy_file(lstate.fullpath(found), fullpath)
        else:
            # Search for copy in trash
            found = trash.search(S)
            if found:
                copy_file(trash.fullpath(found), fullpath)
            else:
                # Download
                transfer.download(client, get_container(), path, fullpath)
    # Sanity check: the filesystem must now report the requested state.
    assert cstate.get(path) == S


def update_remote(path, S):
    """Bring the remote object `path` to state `S` ('DEL', 'DIR' or a hash)."""
    fullpath = cstate.fullpath(path)
    if S == 'DEL':
        client.delete_object(get_container(), path)
    elif S == 'DIR':
        client.create_directory_marker(get_container(), path)
    else:
        prefix, name = os.path.split(path)
        if prefix:
            prefix += '/'
        transfer.upload(client, fullpath, get_container(), prefix, name)
    # Sanity check: the server must now report the requested state.
    assert rstate.get(path) == S


def resolve_conflict(path):
    """Move an existing local `path` aside by appending '.local' to it."""
    fullpath = cstate.fullpath(path)
    if os.path.exists(fullpath):
        os.rename(fullpath, fullpath + '.local')


def sync(path):
    """Reconcile `path` between the local directory and the container.

    Compares the recorded state (L), the current local state (C) and the
    remote state (R), propagates whichever side changed, and records the
    result.  When both sides changed differently the local copy is moved
    aside and the remote version is taken.
    """
    L = lstate.get(path)
    C = cstate.get(path)
    R = rstate.get(path)

    if C == L:
        # No local changes
        if R != L:
            update_local(path, R)
            lstate.put(path, R)
        return

    if R == L:
        # No remote changes
        if C != L:
            update_remote(path, C)
            lstate.put(path, C)
        return

    # At this point both local and remote states have changes since last sync
    if C == R:
        # We were lucky, both had the same change
        lstate.put(path, R)
    else:
        # Conflict, try to resolve it
        resolve_conflict(path)
        update_local(path, R)
        lstate.put(path, R)


def walk(dir):
    """Yield every path found under `dir` locally or in the container.

    Paths are relative to `dir`.  Each directory is yielded before its
    contents are listed, so a caller may create or remove it first.
    """
    pending = ['']
    while pending:
        dirs = set()
        files = set()

        root = pending.pop(0)
        if root:
            yield root

        dirpath = os.path.join(dir, root)
        if os.path.exists(dirpath):
            for filename in os.listdir(dirpath):
                path = os.path.join(root, filename)
                if os.path.isdir(os.path.join(dir, path)):
                    dirs.add(path)
                else:
                    files.add(path)

        listing = client.list_objects(get_container(), prefix=root,
                                      delimiter='/', format='json')
        for entry in listing:
            if 'subdir' in entry:
                continue
            name = str(entry['name'])
            if entry['content_type'] == 'application/directory':
                dirs.add(name)
            else:
                files.add(name)

        pending += sorted(dirs)
        for path in files:
            yield path


def main():
    """Sync the directory named on the command line with the container.

    Exits with status 1 when the argument count is wrong and raises
    RuntimeError when the container metadata cannot be retrieved.
    """
    global client, conf, confdir, trash, lstate, cstate, rstate

    if len(sys.argv) != 2:
        print('syntax: %s ' % sys.argv[0])
        sys.exit(1)

    root = sys.argv[1]

    client = Pithos_Client(get_server(), get_auth(), get_user())

    container = get_container()
    try:
        meta = client.retrieve_container_metadata(container)
    except Fault:
        raise RuntimeError("Cannot open container '%s'" % (container,))

    conf = {'local': root, 'remote': container,
            'blocksize': int(meta['x-container-block-size']),
            'blockhash': meta['x-container-block-hash']}
    confdir = os.path.expanduser('~/.pithos-sync/')

    conffile = os.path.join(confdir, 'config')
    if os.path.isfile(conffile):
        # NOTE: pickle.loads executes whatever the file encodes; this is
        # only acceptable because the file is written by this program.
        try:
            with open(conffile, 'rb') as f:
                saved = pickle.loads(f.read())
        except Exception:
            saved = None
        # An unreadable or different configuration invalidates every
        # piece of stored state, so start from an empty config directory.
        if saved != conf:
            shutil.rmtree(confdir)
    create_dir(confdir)

    trash = Trash()
    lstate = LocalState(root)
    cstate = CurrentState(root)
    rstate = RemoteState(client)

    for path in walk(root):
        print('Syncing %s' % (path,))
        sync(path)

    with open(conffile, 'wb') as f:
        f.write(pickle.dumps(conf))

    trash.empty()


if __name__ == '__main__':
    main()