from time import time
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
+from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
-from sqlalchemy.sql import func, and_, or_, null, select, bindparam
+from sqlalchemy.sql import func, and_, or_, null, select, bindparam, text
from sqlalchemy.ext.compiler import compiles
+#from sqlalchemy.dialects.mysql import VARBINARY
+from sqlalchemy.engine.reflection import Inspector
from dbworker import DBWorker
ondelete='CASCADE',
onupdate='CASCADE'),
autoincrement=False))
- columns.append(Column('path', String(2048), default='', nullable=False))
+ path_length = 2048
+ path_length_in_bytes = path_length * 4
+ columns.append(Column('path', Text(path_length_in_bytes), default='', nullable=False))
self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
# place an index on path
- Index('idx_nodes_path', self.nodes.c.path)
+ #Index('idx_nodes_path', self.nodes.c.path)
#create policy table
columns=[]
metadata.create_all(self.engine)
+ # the following code creates an index of specific length
+ # this can be accompliced in sqlalchemy >= 0.7.3
+ # providing mysql_length option during index creation
+ insp = Inspector.from_engine(self.engine)
+ indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
+ if 'idx_nodes_path' not in indexes:
+ s = text('CREATE INDEX idx_nodes_path ON nodes (path(%s))' %path_length_in_bytes)
+ self.conn.execute(s).close()
+
s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
self.nodes.c.parent == ROOTNODE))
rp = self.conn.execute(s)
Return None if the path is not found.
"""
- s = select([self.nodes.c.node], self.nodes.c.path == path)
+ s = select([self.nodes.c.node], self.nodes.c.path.like(path))
r = self.conn.execute(s)
row = r.fetchone()
r.close()
self.assert_extended(objects, 'xml', 'object')
node_name = objects.getElementsByTagName('name')[0]
self.assertEqual(node_name.firstChild.data, '/objectname')
-
- #objects = self.client.list_objects('test', prefix='/')
- #self.assertEqual(objects, ['/objectname'])
- #
- #objects = self.client.list_objects('test', path='/')
- #self.assertEqual(objects, ['/objectname'])
- #
- #objects = self.client.list_objects('test', prefix='/', delimiter='n')
- #self.assertEqual(objects, ['/object'])
def test_list_objects_with_limit_marker(self):
objects = self.client.list_objects(self.container[0], limit=2)
self.objects[0]['meta'])
self.assertEqual(o, self.objects[0]['data'])
+ def test_objects_with_trailing_spaces(self):
+ self.client.create_container('test')
+ #create 'a' object
+ self.upload_random_data('test', 'a')
+ #look for 'a ' object
+ self.assert_raises_fault(404, self.client.retrieve_object,
+ 'test', 'a ')
+
+ #delete 'a' object
+ self.client.delete_object('test', 'a')
+ self.assert_raises_fault(404, self.client.retrieve_object,
+ 'test', 'a')
+
+ #create 'a ' object
+ self.upload_random_data('test', 'a ')
+ #look for 'a' object
+ self.assert_raises_fault(404, self.client.retrieve_object,
+ 'test', 'a')
+
def test_get_invalid(self):
self.assert_raises_fault(404, self.client.retrieve_object,
self.containers[0], self.objects[0]['name'])