from StringIO import StringIO
from hashlib import new as newhasher
from binascii import hexlify
+from httplib import HTTPConnection, HTTPSConnection
+from urlparse import urlparse
import json
import unittest
self.client.share_object('φάκελος', 'ο1', ['%s:σεφς' % get_user()])
chef = Pithos_Client(get_url(),
'0009',
- 'διογένης',
- get_api())
+ 'διογένης')
self.assert_not_raises_fault(403, chef.retrieve_object_metadata,
'φάκελος', 'ο1', account=get_user())
def assert_read(self, authorized=[], any=False):
for token, account in OTHER_ACCOUNTS.items():
- cl = Pithos_Client(get_url(), token, account, get_api())
+ cl = Pithos_Client(get_url(), token, account)
if account in authorized or any:
self.assert_not_raises_fault(403, cl.retrieve_object_metadata,
'c', 'o', account=get_user())
#check inheritance
o = self.upload_random_data('c', 'o/also-shared')
for token, account in OTHER_ACCOUNTS.items():
- cl = Pithos_Client(get_url(), token, account, get_api())
+ cl = Pithos_Client(get_url(), token, account)
if account in authorized or any:
self.assert_not_raises_fault(403, cl.retrieve_object_metadata,
'c', 'o/also-shared', account=get_user())
def assert_write(self, o_data, authorized=[], any=False):
for token, account in OTHER_ACCOUNTS.items():
- cl = Pithos_Client(get_url(), token, account, get_api())
+ cl = Pithos_Client(get_url(), token, account)
new_data = get_random_data()
if account in authorized or any:
# test write access
o = self.upload_random_data('c', 'o/also-shared')
o_data = o['data']
for token, account in OTHER_ACCOUNTS.items():
- cl = Pithos_Client(get_url(), token, account, get_api())
+ cl = Pithos_Client(get_url(), token, account)
new_data = get_random_data()
if account in authorized or any:
# test write access
meta = self.client.retrieve_object_metadata('c', 'o')
self.assertTrue('x-object-public' in meta)
url = meta['x-object-public']
- public_client = Pithos_Client(get_url(), get_auth(), get_user(), api='')
- data = public_client.get(url)[2]
+
+ p = urlparse(get_url())
+ if p.scheme == 'http':
+ conn = HTTPConnection(p.netloc)
+ elif p.scheme == 'https':
+ conn = HTTPSConnection(p.netloc)
+ else:
+ raise Exception('Unknown URL scheme')
+
+ conn.request('GET', url)
+ resp = conn.getresponse()
+ length = resp.getheader('content-length', None)
+ data = resp.read(length)
self.assertEqual(o_data, data)
class AssertUUidInvariant(object):