Apply metadata domain to the backend.
authorAntony Chazapis <chazapis@gmail.com>
Thu, 15 Dec 2011 21:23:27 +0000 (23:23 +0200)
committerAntony Chazapis <chazapis@gmail.com>
Thu, 15 Dec 2011 21:23:27 +0000 (23:23 +0200)
Refs #1782

pithos/backends/lib/sqlite/node.py
pithos/backends/modular.py
pithos/lib/filter.py

index e4e0b73..b4ea7d5 100644 (file)
@@ -157,9 +157,10 @@ class Node(DBWorker):
         
         execute(""" create table if not exists attributes
                           ( serial integer,
+                            domain text,
                             key    text,
                             value  text,
-                            primary key (serial, key)
+                            primary key (serial, domain, key)
                             foreign key (serial)
                             references versions(serial)
                             on update cascade
@@ -546,7 +547,7 @@ class Node(DBWorker):
         self.execute(q, (serial,))
         return hash
     
-    def attribute_get(self, serial, keys=()):
+    def attribute_get(self, serial, domain, keys=()):
         """Return a list of (key, value) pairs of the version specified by serial.
            If keys is empty, return all attributes.
            Othwerise, return only those specified.
@@ -556,48 +557,49 @@ class Node(DBWorker):
         if keys:
             marks = ','.join('?' for k in keys)
             q = ("select key, value from attributes "
-                 "where key in (%s) and serial = ?" % (marks,))
-            execute(q, keys + (serial,))
+                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
+            execute(q, keys + (serial, domain))
         else:
             q = "select key, value from attributes where serial = ?"
             execute(q, (serial,))
         return self.fetchall()
     
-    def attribute_set(self, serial, items):
+    def attribute_set(self, serial, domain, items):
         """Set the attributes of the version specified by serial.
            Receive attributes as an iterable of (key, value) pairs.
         """
         
-        q = ("insert or replace into attributes (serial, key, value) "
-             "values (?, ?, ?)")
-        self.executemany(q, ((serial, k, v) for k, v in items))
+        q = ("insert or replace into attributes (serial, domain, key, value) "
+             "values (?, ?, ?, ?)")
+        self.executemany(q, ((serial, domain, k, v) for k, v in items))
     
-    def attribute_del(self, serial, keys=()):
+    def attribute_del(self, serial, domain, keys=()):
         """Delete attributes of the version specified by serial.
            If keys is empty, delete all attributes.
            Otherwise delete those specified.
         """
         
         if keys:
-            q = "delete from attributes where serial = ? and key = ?"
-            self.executemany(q, ((serial, key) for key in keys))
+            q = "delete from attributes where serial = ? and domain = ? and key = ?"
+            self.executemany(q, ((serial, domain, key) for key in keys))
         else:
-            q = "delete from attributes where serial = ?"
-            self.execute(q, (serial,))
+            q = "delete from attributes where serial = ? and domain = ?"
+            self.execute(q, (serial, domain))
     
     def attribute_copy(self, source, dest):
         q = ("insert or replace into attributes "
-             "select ?, key, value from attributes "
+             "select ?, domain, key, value from attributes "
              "where serial = ?")
         self.execute(q, (dest, source))
     
-    def _construct_filters(self, filterq):
-        if not filterq:
+    def _construct_filters(self, domain, filterq):
+        print '***', domain, filterq
+        if not domain or not filterq:
             return None, None
         
         subqlist = []
         append = subqlist.append
-        included, excluded, opers = parse_filters(filterq.split(','))
+        included, excluded, opers = parse_filters(filterq)
         args = []
         
         if included:
@@ -616,13 +618,14 @@ class Node(DBWorker):
             t = (("(a.key = ? and a.value %s ?)" % (o,)) for k, o, v in opers)
             subq = "(" + ' or '.join(t) + ")"
             for k, o, v in opers:
-                args += (k, v)
+                args += [k, v]
             append(subq)
         
         if not subqlist:
             return None, None
         
-        subq = ' ' + ' and '.join(subqlist)
+        subq = ' and a.domain = ? and ' + ' and '.join(subqlist)
+        args = [domain] + args
         
         return subq, args
     
@@ -637,7 +640,7 @@ class Node(DBWorker):
         
         return subq, args
     
-    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
+    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
         """Return a list with all keys pairs defined
            for all latest versions under parent that
            do not belong to the cluster.
@@ -654,8 +657,9 @@ class Node(DBWorker):
                            "from nodes "
                            "where parent = ?) "
              "and a.serial = v.serial "
+             "and a.domain = ? "
              "and n.node = v.node")
-        args = (before, except_cluster, parent)
+        args = (before, except_cluster, parent, domain)
         subq, subargs = self._construct_paths(pathq)
         if subq is not None:
             q += subq
@@ -665,7 +669,7 @@ class Node(DBWorker):
     
     def latest_version_list(self, parent, prefix='', delimiter=None,
                             start='', limit=10000, before=inf,
-                            except_cluster=0, pathq=[], filterq=None):
+                            except_cluster=0, pathq=[], domain=None, filterq=[]):
         """Return a (list of (path, serial) tuples, list of common prefixes)
            for the current versions of the paths with the given parent,
            matching the following criteria.
@@ -736,7 +740,7 @@ class Node(DBWorker):
         if subq is not None:
             q += subq
             args += subargs
-        subq, subargs = self._construct_filters(filterq)
+        subq, subargs = self._construct_filters(domain, filterq)
         if subq is not None:
             q += subq
             args += subargs
index 0a29c53..ccb7211 100644 (file)
@@ -146,7 +146,7 @@ class ModularBackend(BaseBackend):
         else:
             meta = {}
             if props is not None:
-                meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
+                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
             if until is not None:
                 meta.update({'until_timestamp': tstamp})
             meta.update({'name': account, 'count': count, 'bytes': bytes})
@@ -161,7 +161,7 @@ class ModularBackend(BaseBackend):
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_account(account, True)
-        self._put_metadata(user, node, meta, replace)
+        self._put_metadata(user, node, domain, meta, replace)
     
     @backend_method
     def get_account_groups(self, user, account):
@@ -261,7 +261,7 @@ class ModularBackend(BaseBackend):
             start, limit = self._list_limits(allowed, marker, limit)
             return allowed[start:start + limit]
         node = self.node.node_lookup(account)
-        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
+        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
     
     @backend_method
     def get_container_meta(self, user, account, container, domain, until=None):
@@ -285,7 +285,7 @@ class ModularBackend(BaseBackend):
         if user != account:
             meta = {'name': container}
         else:
-            meta = dict(self.node.attribute_get(props[self.SERIAL]))
+            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
             if until is not None:
                 meta.update({'until_timestamp': tstamp})
             meta.update({'name': container, 'count': count, 'bytes': bytes})
@@ -300,7 +300,7 @@ class ModularBackend(BaseBackend):
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_container(account, container)
-        self._put_metadata(user, node, meta, replace)
+        self._put_metadata(user, node, domain, meta, replace)
     
     @backend_method
     def get_container_policy(self, user, account, container):
@@ -386,7 +386,7 @@ class ModularBackend(BaseBackend):
                 if not allowed:
                     return []
         path, node = self._lookup_container(account, container)
-        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
+        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, allowed)
     
     @backend_method
     def list_object_meta(self, user, account, container, domain, until=None):
@@ -402,7 +402,7 @@ class ModularBackend(BaseBackend):
                 raise NotAllowedError
         path, node = self._lookup_container(account, container)
         before = until if until is not None else inf
-        return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
+        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
     
     @backend_method
     def get_object_meta(self, user, account, container, name, domain, version=None):
@@ -423,7 +423,7 @@ class ModularBackend(BaseBackend):
                     raise NameError('Object does not exist')
                 modified = del_props[self.MTIME]
         
-        meta = dict(self.node.attribute_get(props[self.SERIAL]))
+        meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
         meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
         meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
@@ -436,7 +436,7 @@ class ModularBackend(BaseBackend):
         logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
         self._can_write(user, account, container, name)
         path, node = self._lookup_object(account, container, name)
-        src_version_id, dest_version_id = self._put_metadata(user, node, meta, replace)
+        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
         self._apply_versioning(account, container, src_version_id)
         return dest_version_id
     
@@ -505,7 +505,7 @@ class ModularBackend(BaseBackend):
         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
     
-    def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
+    def _update_object_hash(self, user, account, container, name, size, hash, permissions=None):
         if permissions is not None and user != account:
             raise NotAllowedError
         self._can_write(user, account, container, name)
@@ -528,13 +528,10 @@ class ModularBackend(BaseBackend):
                 # This must be executed in a transaction, so the version is never created if it fails.
                 raise QuotaError
         
-        if not replace_meta and src_version_id is not None:
-            self.node.attribute_copy(src_version_id, dest_version_id)
-        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
         if permissions is not None:
             self.permissions.access_set(path, permissions)
         self._apply_versioning(account, container, src_version_id)
-        return dest_version_id
+        return src_version_id, dest_version_id
     
     @backend_method
     def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
@@ -552,11 +549,12 @@ class ModularBackend(BaseBackend):
             raise ie
         
         hash = map.hash()
-        dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
+        src_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
+        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
         self.store.map_put(hash, map)
         return dest_version_id
     
-    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
         self._can_read(user, src_account, src_container, src_name)
         path, node = self._lookup_object(src_account, src_container, src_name)
         props = self._get_version(node, src_version)
@@ -564,17 +562,8 @@ class ModularBackend(BaseBackend):
         hash = props[self.HASH]
         size = props[self.SIZE]
         
-        if (src_account, src_container, src_name) == (dest_account, dest_container, dest_name):
-            dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, dest_meta, replace_meta, permissions)
-        else:
-            if replace_meta:
-                meta = dest_meta
-            else:
-                meta = {}
-            dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
-            if not replace_meta:
-                self.node.attribute_copy(src_version_id, dest_version_id)
-                self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
+        src_v_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions)
+        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
         return dest_version_id
     
     @backend_method
@@ -582,7 +571,7 @@ class ModularBackend(BaseBackend):
         """Copy an object's data and metadata."""
         
         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
-        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
+        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
     
     @backend_method
     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
@@ -591,7 +580,7 @@ class ModularBackend(BaseBackend):
         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
         if user != src_account:
             raise NotAllowedError
-        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, meta, replace_meta, permissions, None)
+        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None)
         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
             self._delete_object(user, src_account, src_container, src_name)
         return dest_version_id
@@ -768,19 +757,21 @@ class ModularBackend(BaseBackend):
         dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
         return src_version_id, dest_version_id
     
-    def _put_metadata(self, user, node, meta, replace=False):
+    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
+        if src_version_id is not None:
+            self.node.attribute_copy(src_version_id, dest_version_id)
+        if not replace:
+            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
+            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
+        else:
+            self.node.attribute_del(dest_version_id, domain)
+            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
+    
+    def _put_metadata(self, user, node, domain, meta, replace=False):
         """Create a new version and store metadata."""
         
         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
-        
-        # TODO: Merge with other functions that update metadata...
-        if not replace:
-            if src_version_id is not None:
-                self.node.attribute_copy(src_version_id, dest_version_id)
-            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
-            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
-        else:
-            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
+        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
         return src_version_id, dest_version_id
     
     def _list_limits(self, listing, marker, limit):
@@ -794,14 +785,14 @@ class ModularBackend(BaseBackend):
             limit = 10000
         return start, limit
     
-    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
+    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, allowed=[]):
         cont_prefix = path + '/'
         prefix = cont_prefix + prefix
         start = cont_prefix + marker if marker else None
         before = until if until is not None else inf
-        filterq = ','.join(keys) if keys else None
+        filterq = keys if domain else []
         
-        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
+        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq)
         objects.extend([(p, None) for p in prefixes] if virtual else [])
         objects.sort(key=lambda x: x[0])
         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
index 0f64180..df935dd 100644 (file)
@@ -34,7 +34,7 @@
 import re
 
 
-_regexfilter = re.compile('(!?)\s*([\w-]+)\s*(=|!=|<=|>=|<|>)?\s*(.*)$', re.UNICODE)
+_regexfilter = re.compile('(!?)\s*(.+)\s*(=|!=|<=|>=|<|>)?\s*(.*)$', re.UNICODE)
 
 
 def parse_filters(terms):