From 17d862157cf9562e8e150449cdbff52c91eeb2e0 Mon Sep 17 00:00:00 2001 From: Stavros Sachtouris Date: Thu, 7 Feb 2013 14:06:22 +0200 Subject: [PATCH] Create a cached access api to Astakos >= 0.13 --- kamaki/cli/commands/astakos_cli.py | 106 -- kamaki/clients/tests.py | 2367 ------------------------------------ 2 files changed, 2473 deletions(-) delete mode 100644 kamaki/clients/tests.py diff --git a/kamaki/cli/commands/astakos_cli.py b/kamaki/cli/commands/astakos_cli.py index 0a13e23..b76fb81 100644 --- a/kamaki/cli/commands/astakos_cli.py +++ b/kamaki/cli/commands/astakos_cli.py @@ -33,14 +33,8 @@ from kamaki.cli import command from kamaki.clients.astakos import AstakosClient -<<<<<<< HEAD -from kamaki.cli.utils import print_dict, print_items -from kamaki.cli.errors import raiseCLIError, CLISyntaxError -from kamaki.cli.commands import _command_init -======= from kamaki.cli.utils import print_dict from kamaki.cli.commands import _command_init, errors ->>>>>>> develop from kamaki.cli.command_tree import CommandTree from kamaki.cli.argument import ValueArgument @@ -65,105 +59,6 @@ class _astakos_init(_command_init): @command(astakos_cmds) class astakos_authenticate(_astakos_init): -<<<<<<< HEAD - """Authenticate a user, show user information""" - - def main(self, token=None): - super(self.__class__, self).main() - try: - reply = self.client.authenticate(token) - except Exception as err: - raiseCLIError(err) - print_dict(reply) - - -@command(astakos_cmds) -class astakos_admin_userinfo(_astakos_init): - """Get user info, provided you have admin privileges""" - - def __init__(self, arguments={}): - super(self.__class__, self).__init__(arguments) - self.arguments['email'] = ValueArgument('target user email', '--email') - self.arguments['username'] = ValueArgument('target username', - '--username') - - def main(self, admin_token=None): - super(self.__class__, self).main() - email = self.get_argument('email') - username = self.get_argument('username') - try: - if email and username: - raise CLISyntaxError( - 'Arguments %s and %s are mutually exclusive' % ( - self.arguments['email'].parsed_name, - self.arguments['username'].parsed_name - ), importance=1) - elif email: - reply = self.client.get_user_by_email(email, - admin=True, - token=admin_token) - elif username: - reply = self.client.get_user_by_username(username, - admin=True, - token=admin_token) - else: - raise CLISyntaxError( - 'Exactly one of %s or %s is obligatory' % ( - self.arguments['email'].parsed_name, - self.arguments['username'].parsed_name - ), importance=1) - except Exception as err: - raiseCLIError(err) - print_dict(reply) - - -@command(astakos_cmds) -class astakos_service_userinfo(_astakos_init): - """Get user info with service token""" - - def __init__(self, arguments={}): - super(self.__class__, self).__init__(arguments) - self.arguments['email'] = ValueArgument('target user email', '--email') - self.arguments['username'] = ValueArgument('target username', - '--username') - - def main(self, service_token=None): - super(self.__class__, self).main() - email = self.get_argument('email') - username = self.get_argument('username') - try: - if email and username: - raise CLISyntaxError( - 'Arguments %s and %s are mutually exclusive' % ( - self.arguments['email'].parsed_name, - self.arguments['username'].parsed_name - ), importance=1) - elif email: - reply = self.client.get_user_by_email(email, - token=service_token) - elif username: - reply = self.client.get_user_by_username(username, - token=service_token) - else: - raise CLISyntaxError( - 'Exactly one of %s or %s is obligatory' % ( - self.arguments['email'].parsed_name, - self.arguments['username'].parsed_name - ), importance=1) - except Exception as err: - raiseCLIError(err) - print_dict(reply) - - -@command(astakos_cmds) -class astakos_service_list(_astakos_init): - """List cloud services associated with astakos""" - - def main(self, token=None): - super(self.__class__, self).main() - reply = self.client.list_services(token) - print_items(reply) -======= """Authenticate a user Get user information (e.g. unique account name) from token Token should be set in settings: @@ -181,4 +76,3 @@ class astakos_service_list(_astakos_init): def main(self, custom_token=None): self._run(custom_token) ->>>>>>> develop diff --git a/kamaki/clients/tests.py b/kamaki/clients/tests.py deleted file mode 100644 index 2d101fd..0000000 --- a/kamaki/clients/tests.py +++ /dev/null @@ -1,2367 +0,0 @@ -# Copyright 2011 GRNET S.A. All rights reserved. -# -# Redistribution and use in source and binary forms, with or -# without modification, are permitted provided that the following -# conditions are met: -# -# 1. Redistributions of source code must retain the above -# copyright notice, this list of conditions and the following -# disclaimer. -# -# 2. Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following -# disclaimer in the documentation and/or other materials -# provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF -# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED -# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN -# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. -# -# The views and conclusions contained in the software and -# documentation are those of the authors and should not be -# interpreted as representing official policies, either expressed -# or implied, of GRNET S.A. - -from argparse import ArgumentParser -import unittest -import time -import datetime -import os -import sys - -try: - from progress.bar import FillingCirclesBar as IncrementalBar -except ImportError: - print('No progress bars in testing!') - pass - -from kamaki.clients import ClientError -from kamaki.clients.pithos import PithosClient as pithos -from kamaki.clients.cyclades import CycladesClient as cyclades -from kamaki.clients.image import ImageClient as image -from kamaki.clients.astakos import AstakosClient as astakos -from kamaki.cli.config import Config - -TEST_ALL = False - -cnf = Config() -global_username = None -token = None - - -def _init_cnf(): - global cnf - global global_username - global_username = cnf.get('test', 'account') or\ - cnf.get('global', 'account') - global token - token = cnf.get('test', 'token') or cnf.get('global', 'token') - - -class testAstakos(unittest.TestCase): - def setUp(self): - _init_cnf() - global cnf - url = cnf.get('test', 'astakos_url') or cnf.get('astakos', 'url') - global token - self.client = astakos(url, token) - - def tearDown(self): - pass - - def test_authenticate(self): - r = self.client.authenticate() - for term in ('username', - 'auth_token_expires', - 'auth_token', - 'auth_token_created', - 'groups', - 'uniq', - 'has_credits', - 'has_signed_terms'): - self.assertTrue(term in r) - - def test_list_services(self): - r = self.client.list_services() - self.assertTrue(len(r) > 1) - for service in r: - self.assertTrue(set(('id', 'name')).issubset(service.keys())) - - -class testImage(unittest.TestCase): - def setUp(self): - _init_cnf() - global cnf - cyclades_url = cnf.get('compute', 'url') - url = cnf.get('image', 'url') - global token - self.token = token - self.imgid = 'b2dffe52-64a4-48c3-8a4c-8214cc3165cf' - self.now = time.mktime(time.gmtime()) - self.imgname = 'img_%s' % self.now - global global_username - self.imglocation = 'pithos://%s/pithos/my.img'\ - % global_username - self.client = image(url, self.token) - self.cyclades = cyclades(cyclades_url, self.token) - self._imglist = {} - - def _prepare_img(self): - global cnf - global global_username - username = global_username.split('@')[0] - imglocalpath =\ - '/home/%s/src/kamaki-settings/files/centos.diskdump' % username - f = open(imglocalpath, 'rb') - pithcli = pithos(cnf.get('store', 'url'), - self.token, - global_username, - 'pithos') - print('\t- Upload an image at %s...' % imglocalpath) - pithcli.upload_object('my.img', f) - print('\t- ok') - f.close() - - self.client.register(self.imgname, - self.imglocation, - params=dict(is_public=True)) - img = self._get_img_by_name(self.imgname) - self._imglist[self.imgname] = img - - def tearDown(self): - for img in self._imglist.values(): - self.cyclades.delete_image(img['id']) - - def _get_img_by_name(self, name): - r = self.cyclades.list_images() - for img in r: - if img['name'] == name: - return img - return None - - def assert_dicts_are_deeply_equal(self, d1, d2): - for k, v in d1.items(): - self.assertTrue(k in d2) - if isinstance(v, dict): - self.assert_dicts_are_deeply_equal(v, d2[k]) - else: - self.assertEqual(unicode(v), unicode(d2[k])) - - def test_list_public(self): - """Test list_public""" - r = self.client.list_public() - r0 = self.client.list_public(order='-') - self.assertTrue(len(r) > 0) - for img in r: - for term in ('status', - 'name', - 'container_format', - 'disk_format', - 'id', - 'size'): - self.assertTrue(term in img) - self.assertTrue(len(r), len(r0)) - r0.reverse() - for i, img in enumerate(r): - self.assert_dicts_are_deeply_equal(img, r0[i]) - r1 = self.client.list_public(detail=True) - for img in r1: - for term in ('status', - 'name', - 'checksum', - 'created_at', - 'disk_format', - 'updated_at', - 'id', - 'location', - 'container_format', - 'owner', - 'is_public', - 'deleted_at', - 'properties', - 'size'): - self.assertTrue(term in img) - for interm in ( - 'osfamily', - 'users', - 'os', - 'root_partition', - 'description'): - self.assertTrue(interm in img['properties']) - size_max = 1000000000 - r2 = self.client.list_public(filters=dict(size_max=size_max)) - self.assertTrue(len(r2) <= len(r)) - for img in r2: - self.assertTrue(int(img['size']) <= size_max) - - def test_get_meta(self): - """Test get_meta""" - r = self.client.get_meta(self.imgid) - self.assertEqual(r['id'], self.imgid) - for term in ('status', - 'name', - 'checksum', - 'updated-at', - 'created-at', - 'deleted-at', - 'location', - 'is-public', - 'owner', - 'disk-format', - 'size', - 'container-format'): - self.assertTrue(term in r) - for interm in ('kernel', - 'osfamily', - 'users', - 'gui', 'sortorder', - 'root-partition', - 'os', - 'description'): - self.assertTrue(interm in r['properties']) - - def test_register(self): - """Test register""" - self._prepare_img() - self.assertTrue(len(self._imglist) > 0) - for img in self._imglist.values(): - self.assertTrue(img != None) - - def test_reregister(self): - """Test reregister""" - self._prepare_img() - self.client.reregister(self.imglocation, - properties=dict(my_property='some_value')) - - def test_set_members(self): - """Test set_members""" - self._prepare_img() - members = ['%s@fake.net' % self.now] - for img in self._imglist.values(): - self.client.set_members(img['id'], members) - r = self.client.list_members(img['id']) - self.assertEqual(r[0]['member_id'], members[0]) - - def test_list_members(self): - """Test list_members""" - self.test_set_members() - - def test_remove_members(self): - """Test remove_members - NO CHECK""" - return - self._prepare_img() - members = ['%s@fake.net' % self.now, '%s_v2@fake.net' % self.now] - for img in self._imglist.values(): - self.client.set_members(img['id'], members) - r = self.client.list_members(img['id']) - self.assertTrue(len(r) > 1) - self.client.remove_member(img['id'], members[0]) - r0 = self.client.list_members(img['id']) - self.assertEqual(len(r), 1 + len(r0)) - self.assertEqual(r0[0]['member_id'], members[1]) - - def test_list_shared(self): - """Test list_shared - NOT CHECKED""" - #No way to test this, if I dont have member images - pass - - -class testCyclades(unittest.TestCase): - """Set up a Cyclades thorough test""" - def setUp(self): - """okeanos""" - _init_cnf() - global cnf - url = cnf.get('compute', 'url') - global token - global global_username - self.img = 'b2dffe52-64a4-48c3-8a4c-8214cc3165cf' - self.img_details = { - u'status': u'ACTIVE', - u'updated': u'2012-11-19T13:52:16+00:00', - u'name': u'Debian Base', - u'created': u'2012-10-16T09:03:12+00:00', - u'progress': 100, - u'id': self.img, - u'metadata': { - u'values': { - u'kernel': u'2.6.32', - u'osfamily': u'linux', - u'users': u'root', - u'gui': u'No GUI', - u'sortorder': u'1', - u'os': u'debian', - u'root_partition': u'1', - u'description': u'Debian 6.0.6 (Squeeze) Base System'} - } - } - self.flavor_details = {u'name': u'C1R1024D20', - u'ram': 1024, - u'id': 1, - u'SNF:disk_template': u'drbd', - u'disk': 20, - u'cpu': 1} - self.PROFILES = ('ENABLED', 'DISABLED', 'PROTECTED') - - """okeanos.io""" - """ - self.img = 'b3e68235-3abd-4d60-adfe-1379a4f8d3fe' - self.img_details = { - u'status': u'ACTIVE', - u'updated': u'2012-11-19T15:29:51+00:00', - u'name': u'Debian Base', - u'created': u'2012-11-19T14:54:57+00:00', - u'progress': 100, - u'id': self.img, - u'metadata': { - u'values': { - u'kernel': u'2.6.32', - u'osfamily': u'linux', - u'users': u'root', - u'gui': u'No GUI', - u'sortorder': u'1', - u'os': u'debian', - u'root_partition': u'1', - u'description': u'Debian 6.0.6 (Squeeze) Base System'} - } - } - """ - - self.servers = {} - self.now = time.mktime(time.gmtime()) - self.servname1 = 'serv' + unicode(self.now) - self.servname2 = self.servname1 + '_v2' - self.flavorid = 1 - #servers have to be created at the begining... - self.networks = {} - self.netname1 = 'net' + unicode(self.now) - self.netname2 = 'net' + unicode(self.now) + '_v2' - - self.client = cyclades(url, token) - pass - - def tearDown(self): - """Destoy servers used in testing""" - print - for netid in self.networks.keys(): - self._delete_network(netid) - if 0 >= len(self.servers): - return - print('-> Found %s servers to delete' % len(self.servers)) - for server in self.servers.values(): - self._delete_server(server['id']) - - def _create_server(self, servername, flavorid, imageid, personality=None): - server = self.client.create_server(servername, - flavorid, - imageid, - personality) - self.servers[servername] = server - return server - - def _delete_server(self, servid): - try: - current_state = self.client.get_server_details(servid) - current_state = current_state['status'] - if current_state == 'DELETED': - return - except: - return - self.client.delete_server(servid) - self._wait_for_status(servid, current_state) - - def _create_network(self, netname, **kwargs): - net = self.client.create_network(netname, **kwargs) - self.networks[net['id']] = net - return net - - def _delete_network(self, netid): - sys.stdout.write('\tDelete network %s ' % netid) - self.client.disconnect_network_nics(netid) - wait = 3 - while True: - try: - self.client.delete_network(netid) - print('\n\tSUCCESFULL COMMIT delete network %s' % netid) - break - except ClientError as err: - self.assertEqual(err.status, 421) - time.sleep(wait) - wait += 3 - sys.stdout.write('.') - - def if_not_all(foo): - global TEST_ALL - if TEST_ALL: - return None - return foo - - def assert_dicts_are_deeply_equal(self, d1, d2): - for k, v in d1.items(): - self.assertTrue(k in d2) - if isinstance(v, dict): - self.assert_dicts_are_deeply_equal(v, d2[k]) - else: - self.assertEqual(unicode(v), unicode(d2[k])) - - def test_000(self): - "Prepare a full Cyclades test scenario" - global TEST_ALL - TEST_ALL = True - - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self.server2 = self._create_server(self.servname2, - self.flavorid + 2, - self.img) - - print('testing') - sys.stdout.write(' test create server') - self._test_create_server() - print('...ok') - - sys.stdout.write(' test list servers') - self._test_list_servers() - print('...ok') - - print('- wait for test servers to build') - self._wait_for_status(self.server1['id'], 'BUILD') - self._wait_for_status(self.server2['id'], 'BUILD') - print('- ok') - - sys.stdout.write(' test get server details') - self._test_get_server_details() - print('...ok') - - sys.stdout.write(' test get image details') - self._test_get_image_details() - print('...ok') - - sys.stdout.write(' test update_server_name') - self._test_update_server_name() - print('...ok') - - sys.stdout.write(' test reboot_server') - self._test_reboot_server() - print('...ok') - - print('- wait for test servers to boot') - self._wait_for_status(self.server1['id'], 'REBOOT') - self._wait_for_status(self.server2['id'], 'REBOOT') - print('- ok') - - sys.stdout.write(' test create_server_metadata') - self._test_create_server_metadata() - print('...ok') - - sys.stdout.write(' test get_server_metadata') - self._test_get_server_metadata() - print('...ok') - - sys.stdout.write(' test update_server_metadata') - self._test_update_server_metadata() - print('...ok') - - sys.stdout.write(' test delete_server_metadata') - self._test_delete_server_metadata() - print('...ok') - - sys.stdout.write(' test list_flavors') - self._test_list_flavors() - print('...ok') - - sys.stdout.write(' test get_flavor_details') - self._test_get_flavor_details() - print('...ok') - - sys.stdout.write(' test list_images') - self._test_list_images() - print('...ok') - - sys.stdout.write(' test get_image_details') - self._test_get_image_details() - print('...ok') - - sys.stdout.write(' test get_image_metadata') - self._test_get_image_metadata() - print('...ok') - - sys.stdout.write(' test shutdown_server') - self._test_shutdown_server() - print('...ok') - - sys.stdout.write(' test start_server') - self._test_start_server() - print('...ok') - - sys.stdout.write(' test get_server_console') - self._test_get_server_console() - print('...ok') - - sys.stdout.write(' test get_firewall_profile') - print('\tSKIP') - #self._test_get_firewall_profile() - print('...ok') - - sys.stdout.write(' test set_firewall_profile') - print('\tSKIP') - #self._test_set_firewall_profile() - print('...ok') - - sys.stdout.write(' test get_server_stats') - self._test_get_server_stats() - print('...ok') - - self.network1 = self._create_network(self.netname1) - - sys.stdout.write(' test create_network') - self._test_create_network() - print('...ok') - - print('- wait for network to be activated') - self._wait_for_network(self.network1['id'], 'ACTIVE') - print('- ok') - - sys.stdout.write(' test connect_server') - self._test_connect_server() - print('...ok') - - sys.stdout.write(' test disconnect_server') - self._test_disconnect_server() - print('...ok') - - self.network2 = self._create_network(self.netname2) - print('- wait for network to be activated') - self._wait_for_network(self.network2['id'], 'ACTIVE') - print('- ok') - - sys.stdout.write(' test list_server_nics') - self._test_list_server_nics() - print('...ok') - - sys.stdout.write(' test list_networks') - self._test_list_networks() - print('...ok') - - sys.stdout.write(' test get_network_details') - self._test_get_network_details() - print('...ok') - - sys.stdout.write(' test update_network_name') - self._test_update_network_name() - print('...ok') - - """Don't have auth for these: - sys.stdout.write(' test delete_image') - self._test_delete_image() - print('...ok') - sys.stdout.write(' test create_image_metadata') - self._test_create_image_metadata() - print('...ok') - sys.stdout.write(' test update_image_metadata') - self._test_update_image_metadata() - print('...ok') - sys.stdout.write(' test delete_image_metadata') - self._test_delete_image_metadata() - print('...ok') - """ - - @if_not_all - def test_parallel_creation(self): - """test create with multiple threads""" - from kamaki.clients import SilentEvent - c1 = SilentEvent(self._create_server, - self.servname1, - self.flavorid, - self.img) - c2 = SilentEvent(self._create_server, - self.servname2, - self.flavorid + 2, - self.img) - c3 = SilentEvent(self._create_server, - self.servname1, - self.flavorid, - self.img) - c4 = SilentEvent(self._create_server, - self.servname2, - self.flavorid + 2, - self.img) - c5 = SilentEvent(self._create_server, - self.servname1, - self.flavorid, - self.img) - c6 = SilentEvent(self._create_server, - self.servname2, - self.flavorid + 2, - self.img) - c7 = SilentEvent(self._create_server, - self.servname1, - self.flavorid, - self.img) - c8 = SilentEvent(self._create_server, - self.servname2, - self.flavorid + 2, - self.img) - c1.start() - c2.start() - c3.start() - c4.start() - c5.start() - c6.start() - c7.start() - c8.start() - - def _wait_for_network(self, netid, status): - wait = 3 - limit = 50 - c = ['|', '/', '-', '\\'] - sys.stdout.write('\t- make net %s %s ' % (netid, status)) - while wait < limit: - r = self.client.get_network_details(netid) - if r['status'] == status: - print('\tOK') - return True - sys.stdout.write('\tit is now %s, wait %ss '\ - % (r['status'], wait)) - for i in range(wait * 4): - sys.stdout.write('\b%s' % c[i % 4]) - sys.stdout.flush() - time.sleep(0.25) - print('\b ') - wait += 3 - return False - - def _wait_for_nic(self, netid, servid, in_creation=True): - self._wait_for_network(netid, 'ACTIVE') - c = ['|', '/', '-', '\\'] - limit = 50 - wait = 3 - largetry = 0 - while wait < limit: - nics = self.client.list_server_nics(servid) - for net in nics: - found_nic = net['network_id'] == netid - if (in_creation and found_nic)\ - or not (in_creation or found_nic): - return True - dis = '' if in_creation else 'dis' - sys.stdout.write('\twait nic %s to %sconnect to %s: %ss '\ - % (netid, dis, servid, wait)) - for i in range(wait * 4): - sys.stdout.write('\b%s' % c[i % 4]) - sys.stdout.flush() - time.sleep(0.25) - print('\b ') - wait += 3 - if wait >= limit and largetry < 3: - wait = 3 - largetry += 1 - return False - - def _has_status(self, servid, status): - r = self.client.get_server_details(servid) - return r['status'] == status - - def _wait_for_status(self, servid, status): - withbar = True - try: - wait_bar = IncrementalBar('\tServer[%s] in %s ' % (servid, status)) - except NameError: - withbar = False - - wait_cb = None - if withbar: - wait_bar.start() - - def progress_gen(n): - for i in wait_bar.iter(range(int(n))): - yield - yield - - wait_cb = progress_gen - - time.sleep(0.5) - self.client.wait_server(servid, status, wait_cb=wait_cb) - if withbar: - wait_bar.finish() - - @if_not_all - def test_list_servers(self): - """Test list servers""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self.server2 = self._create_server(self.servname2, - self.flavorid + 1, - self.img) - self._test_list_servers() - - def _test_list_servers(self): - servers = self.client.list_servers() - dservers = self.client.list_servers(detail=True) - - """detailed and simple are same size""" - self.assertEqual(len(dservers), len(servers)) - for i in range(len(servers)): - for field in ('created', - 'flavorRef', - 'hostId', - 'imageRef', - 'progress', - 'status', - 'updated'): - self.assertFalse(field in servers[i]) - self.assertTrue(field in dservers[i]) - - """detailed and simple contain same names""" - names = sorted(map(lambda x: x["name"], servers)) - dnames = sorted(map(lambda x: x["name"], dservers)) - self.assertEqual(names, dnames) - - @if_not_all - def test_create_server(self): - """Test create_server""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._wait_for_status(self.server1['id'], 'BUILD') - self._test_create_server() - - def _test_create_server(self): - self.assertEqual(self.server1["name"], self.servname1) - self.assertEqual(self.server1["flavorRef"], self.flavorid) - self.assertEqual(self.server1["imageRef"], self.img) - self.assertEqual(self.server1["status"], "BUILD") - - @if_not_all - def test_get_server_details(self): - """Test get_server_details""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._wait_for_status(self.server1['id'], 'BUILD') - self._test_get_server_details() - - def _test_get_server_details(self): - r = self.client.get_server_details(self.server1['id']) - self.assertEqual(r["name"], self.servname1) - self.assertEqual(r["flavorRef"], self.flavorid) - self.assertEqual(r["imageRef"], self.img) - self.assertEqual(r["status"], "ACTIVE") - - @if_not_all - def test_update_server_name(self): - """Test update_server_name""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._test_update_server_name() - - def _test_update_server_name(self): - new_name = self.servname1 + '_new_name' - self.client.update_server_name(self.server1['id'], new_name) - r = self.client.get_server_details(self.server1['id'], - success=(200, 400)) - self.assertEqual(r['name'], new_name) - changed = self.servers.pop(self.servname1) - changed['name'] = new_name - self.servers[new_name] = changed - - @if_not_all - def test_reboot_server(self): - """Test reboot server""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._wait_for_status(self.server1['id'], 'BUILD') - self.server2 = self._create_server(self.servname2, - self.flavorid + 1, - self.img) - self._wait_for_status(self.server2['id'], 'BUILD') - self._test_reboot_server() - self._wait_for_status(self.server1['id'], 'REBOOT') - self._wait_for_status(self.server2['id'], 'REBOOT') - - def _test_reboot_server(self): - self.client.reboot_server(self.server1['id']) - self.assertTrue(self._has_status(self.server1['id'], 'REBOOT')) - self.client.reboot_server(self.server2['id'], hard=True) - self.assertTrue(self._has_status(self.server2['id'], 'REBOOT')) - - @if_not_all - def test_get_server_metadata(self): - """Test get server_metadata""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._test_get_server_metadata() - - def _test_get_server_metadata(self): - self.client.create_server_metadata(self.server1['id'], - 'mymeta_0', - 'val_0') - r = self.client.get_server_metadata(self.server1['id'], 'mymeta_0') - self.assertEqual(r['mymeta_0'], 'val_0') - - @if_not_all - def test_create_server_metadata(self): - """Test create_server_metadata""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._test_create_server_metadata() - - def _test_create_server_metadata(self): - r1 = self.client.create_server_metadata(self.server1['id'], - 'mymeta', - 'mymeta val') - self.assertTrue('mymeta' in r1) - r2 = self.client.get_server_metadata(self.server1['id'], 'mymeta') - self.assert_dicts_are_deeply_equal(r1, r2) - - @if_not_all - def test_update_server_metadata(self): - """Test update_server_metadata""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._test_update_server_metadata() - - def _test_update_server_metadata(self): - r1 = self.client.create_server_metadata(self.server1['id'], - 'mymeta3', - 'val2') - self.assertTrue('mymeta3'in r1) - r2 = self.client.update_server_metadata(self.server1['id'], - mymeta3='val3') - self.assertTrue(r2['mymeta3'], 'val3') - - @if_not_all - def test_delete_server_metadata(self): - """Test delete_server_metadata""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._test_delete_server_metadata() - - def _test_delete_server_metadata(self): - r1 = self.client.create_server_metadata(self.server1['id'], - 'mymeta', - 'val') - self.assertTrue('mymeta' in r1) - self.client.delete_server_metadata(self.server1['id'], 'mymeta') - try: - self.client.get_server_metadata(self.server1['id'], 'mymeta') - raise ClientError('Wrong Error', status=100) - except ClientError as err: - self.assertEqual(err.status, 404) - - @if_not_all - def test_list_flavors(self): - """Test flavors_get""" - self._test_list_flavors() - - def _test_list_flavors(self): - r = self.client.list_flavors() - self.assertTrue(len(r) > 1) - r = self.client.list_flavors(detail=True) - self.assertTrue('SNF:disk_template' in r[0]) - - @if_not_all - def test_get_flavor_details(self): - """Test test_get_flavor_details""" - self._test_get_flavor_details() - - def _test_get_flavor_details(self): - r = self.client.get_flavor_details(self.flavorid) - self.assert_dicts_are_deeply_equal(self.flavor_details, r) - - @if_not_all - def test_list_images(self): - """Test list_images""" - self._test_list_images() - - def _test_list_images(self): - r = self.client.list_images() - self.assertTrue(len(r) > 1) - r = self.client.list_images(detail=True) - for detailed_img in r: - if detailed_img['id'] == self.img: - break - self.assert_dicts_are_deeply_equal(detailed_img, self.img_details) - - @if_not_all - def test_get_image_details(self): - """Test image_details""" - self._test_get_image_details() - - def _test_get_image_details(self): - r = self.client.get_image_details(self.img) - r.pop('updated') - self.assert_dicts_are_deeply_equal(r, self.img_details) - - @if_not_all - def test_get_image_metadata(self): - """Test get_image_metadata""" - self._test_get_image_metadata() - - def _test_get_image_metadata(self): - r = self.client.get_image_metadata(self.img) - self.assert_dicts_are_deeply_equal( - self.img_details['metadata']['values'], r) - for key, val in self.img_details['metadata']['values'].items(): - r = self.client.get_image_metadata(self.img, key) - self.assertEqual(r[key], val) - - @if_not_all - def test_start_server(self): - """Test start_server""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._wait_for_status(self.server1['id'], 'BUILD') - self.client.shutdown_server(self.server1['id']) - self._wait_for_status(self.server1['id'], 'ACTIVE') - self._test_start_server() - - def _test_start_server(self): - self.client.start_server(self.server1['id']) - self._wait_for_status(self.server1['id'], 'STOPPED') - r = self.client.get_server_details(self.server1['id']) - self.assertEqual(r['status'], 'ACTIVE') - - @if_not_all - def test_shutdown_server(self): - """Test shutdown_server""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._wait_for_status(self.server1['id'], 'BUILD') - self._test_shutdown_server() - - def _test_shutdown_server(self): - self.client.shutdown_server(self.server1['id']) - self._wait_for_status(self.server1['id'], 'ACTIVE') - r = self.client.get_server_details(self.server1['id']) - self.assertEqual(r['status'], 'STOPPED') - - @if_not_all - def test_get_server_console(self): - """Test get_server_console""" - self.server2 = self._create_server(self.servname2, - self.flavorid + 2, - self.img) - self._wait_for_status(self.server2['id'], 'BUILD') - self._test_get_server_console() - - def _test_get_server_console(self): - r = self.client.get_server_console(self.server2['id']) - self.assertTrue('host' in r) - self.assertTrue('password' in r) - self.assertTrue('port' in r) - self.assertTrue('type' in r) - - @if_not_all - def test_get_firewall_profile(self): - """Test get_firewall_profile""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._test_get_firewall_profile() - - def _test_get_firewall_profile(self): - self._wait_for_status(self.server1['id'], 'BUILD') - fprofile = self.client.get_firewall_profile(self.server1['id']) - self.assertTrue(fprofile in self.PROFILES) - - @if_not_all - def test_set_firewall_profile(self): - """Test set_firewall_profile""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._test_set_firewall_profile() - - def _test_set_firewall_profile(self): - - self._wait_for_status(self.server1['id'], 'BUILD') - PROFILES = ['DISABLED', 'ENABLED', 'PROTECTED'] - fprofile = self.client.get_firewall_profile(self.server1['id']) - print('') - count_success = 0 - for counter, fprofile in enumerate(PROFILES): - npos = counter + 1 - try: - nprofile = PROFILES[npos] - except IndexError: - nprofile = PROFILES[0] - print('\tprofile swap %s: %s -> %s' % (npos, fprofile, nprofile)) - self.client.set_firewall_profile(self.server1['id'], nprofile) - time.sleep(0.5) - self.client.reboot_server(self.server1['id'], hard=True) - time.sleep(1) - self._wait_for_status(self.server1['id'], 'REBOOT') - time.sleep(0.5) - changed = self.client.get_firewall_profile(self.server1['id']) - try: - self.assertEqual(changed, nprofile) - except AssertionError as err: - if count_success: - print('\tFAIL in swap #%s' % npos) - break - else: - raise err - count_success += 1 - - @if_not_all - def test_get_server_stats(self): - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self._test_get_server_stats() - - def _test_get_server_stats(self): - r = self.client.get_server_stats(self.server1['id']) - for term in ('cpuBar', - 'cpuTimeSeries', - 'netBar', - 'netTimeSeries', - 'refresh'): - self.assertTrue(term in r) - - @if_not_all - def test_list_networks(self): - """Test list_network""" - self.network1 = self._create_network(self.netname1) - self._wait_for_network(self.network1['id'], 'ACTIVE') - self._test_list_networks() - - def _test_list_networks(self): - r = self.client.list_networks() - self.assertTrue(len(r) > 1) - ids = [net['id'] for net in r] - names = [net['name'] for net in r] - self.assertTrue('1' in ids) - #self.assertTrue('public' in names) - self.assertTrue(self.network1['id'] in ids) - self.assertTrue(self.network1['name'] in names) - - r = self.client.list_networks(detail=True) - ids = [net['id'] for net in r] - names = [net['name'] for net in r] - for net in r: - self.assertTrue(net['id'] in ids) - self.assertTrue(net['name'] in names) - for term in ('status', 'updated', 'created'): - self.assertTrue(term in net.keys()) - - @if_not_all - def test_create_network(self): - """Test create_network""" - self.network1 = self._create_network(self.netname1) - self._test_create_network() - - def _test_create_network(self): - nets = self.client.list_networks(self.network1['id']) - chosen = [net for net in nets if net['id'] == self.network1['id']][0] - chosen.pop('updated') - net1 = dict(self.network1) - net1.pop('updated') - self.assert_dicts_are_deeply_equal(chosen, net1) - - @if_not_all - def test_connect_server(self): - """Test connect_server""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self.network1 = self._create_network(self.netname1) - self._wait_for_status(self.server1['id'], 'BUILD') - self._wait_for_network(self.network1['id'], 'ACTIVE') - self._test_connect_server() - - def _test_connect_server(self): - self.client.connect_server(self.server1['id'], self.network1['id']) - self.assertTrue(self._wait_for_nic(self.network1['id'], - self.server1['id'])) - - @if_not_all - def test_disconnect_server(self): - """Test disconnect_server""" - self.test_connect_server() - self._test_disconnect_server() - - def _test_disconnect_server(self): - self.client.disconnect_server(self.server1['id'], self.network1['id']) - self.assertTrue(self._wait_for_nic(self.network1['id'], - self.server1['id'], - in_creation=False)) - - @if_not_all - def test_list_server_nics(self): - """Test list_server_nics""" - self.server1 = self._create_server(self.servname1, - self.flavorid, - self.img) - self.network2 = self._create_network(self.netname2) - self._wait_for_status(self.server1['id'], 'BUILD') - self._wait_for_network(self.network2['id'], 'ACTIVE') - self._test_list_server_nics() - - def _test_list_server_nics(self): - r = self.client.list_server_nics(self.server1['id']) - len0 = len(r) - - self.client.connect_server(self.server1['id'], self.network2['id']) - self.assertTrue(self._wait_for_nic(self.network2['id'], - self.server1['id'])) - r = self.client.list_server_nics(self.server1['id']) - self.assertTrue(len(r) > len0) - - @if_not_all - def test_get_network_details(self): - """Test get_network_details""" - self.network1 = self._create_network(self.netname1) - self._test_get_network_details() - - def _test_get_network_details(self): - r = self.client.get_network_details(self.network1['id']) - net1 = dict(self.network1) - net1.pop('status') - net1.pop('updated', None) - net1.pop('attachments') - r.pop('status') - r.pop('updated', None) - r.pop('attachments') - self.assert_dicts_are_deeply_equal(net1, r) - - @if_not_all - def test_update_network_name(self): - self.network2 = self._create_network(self.netname2) - self._test_update_network_name() - - def _test_update_network_name(self): - updated_name = self.netname2 + '_upd' - self.client.update_network_name(self.network2['id'], updated_name) - wait = 3 - c = ['|', '/', '-', '\\'] - r = self.client.get_network_details(self.network2['id']) - while wait < 50: - if r['name'] == updated_name: - break - sys.stdout.write( - '\twait for %s renaming (%s->%s) %ss ' % (self.network2['id'], - self.network2['name'], - updated_name, wait)) - for i in range(4 * wait): - sys.stdout.write('\b%s' % c[i % 4]) - sys.stdout.flush() - time.sleep(0.25) - print('') - wait += 3 - r = self.client.get_network_details(self.network2['id']) - self.assertEqual(r['name'], updated_name) - - """ Don't have auth to test this - @if_not_all - def test_delete_image(self): - ""Test delete_image"" - self._test_delete_image() - def _test_delete_image(self): - images = self.client.list_images() - self.client.delete_image(images[2]['id']) - try: - r = self.client.get_image_details(images[2]['id'], success=(400)) - except ClientError as err: - self.assertEqual(err.status, 404) - - @if_not_all - def test_create_image_metadata(self): - ""Test create_image_metadata"" - self._test_create_image_metadata() - def _test_create_image_metadata(self): - r = self.client.create_image_metadata(self.img, 'mykey', 'myval') - self.assertEqual(r['mykey'], 'myval') - - @if_not_all - def test_update_image_metadata(self): - ""Test update_image_metadata"" - self._test_update_image_metadata() - def _test_update_image_metadata(self): - r = self.client.create_image_metadata(self.img, 'mykey0', 'myval') - r = self.client.update_image_metadata(self.img, 'mykey0', 'myval0') - self.assertEqual(r['mykey0'], 'myval0') - - @if_not_all - def test_delete_image_metadata(self): - ""Test delete_image_metadata"" - self._test_delete_image_metadata() - def _test_delete_image_metadata(self): - self.client.create_image_metadata(self.img, 'mykey1', 'myval1') - self.client.delete_image_metadata(self.img, 'mykey1') - r = self.client.get_image_metadata(self.img) - self.assertNotEqual('mykey1' in r) - """ - - -class testPithos(unittest.TestCase): - """Set up a Pithos+ thorough test""" - def setUp(self): - _init_cnf() - global cnf - url = cnf.get('store', 'url') - - global token - global global_username - account = global_username - - """ - url='https://pithos.okeanos.io/v1' - """ - - """ - def add_handler(name, level, prefix=''): - h = logging.StreamHandler() - fmt = logging.Formatter(prefix + '%(message)s') - h.setFormatter(fmt) - logger = logging.getLogger(name) - logger.addHandler(h) - logger.setLevel(level) - import logging - sendlog = logging.getLogger('clients.send') - recvlog = logging.getLogger('clients.recv') - add_handler('requests', logging.INFO, prefix='* ') - add_handler('clients.send', logging.INFO, prefix='> ') - add_handler('clients.recv', logging.INFO, prefix='< ') - """ - - self.fname = None - container = None - self.client = pithos(url, token, account, container) - self.now = time.mktime(time.gmtime()) - self.c1 = 'c1_' + unicode(self.now) - self.c2 = 'c2_' + unicode(self.now) - self.c3 = 'c3_' + unicode(self.now) - - self.client.create_container(self.c1) - self.client.create_container(self.c2) - self.client.create_container(self.c3) - self.makeNewObject(self.c1, 'test') - self.makeNewObject(self.c2, 'test') - self.now_unformated = datetime.datetime.utcnow() - self.makeNewObject(self.c1, 'test1') - self.makeNewObject(self.c2, 'test1') - """Prepare an object to be shared - also its container""" - self.client.container = self.c1 - self.client.object_post('test', - update=True, - permissions={'read': 'someUser'}) - - self.makeNewObject(self.c1, 'another.test') - - def makeNewObject(self, container, obj): - self.client.container = container - self.client.object_put(obj, - content_type='application/octet-stream', - data='file %s that lives in %s' % (obj, container), - metadata={'incontainer': container}) - - def forceDeleteContainer(self, container): - self.client.container = container - try: - r = self.client.list_objects() - except ClientError: - return - for obj in r: - name = obj['name'] - self.client.del_object(name) - r = self.client.container_delete() - self.container = '' - - def tearDown(self): - """Destroy test cases""" - if self.fname is not None: - try: - os.remove(self.fname) - except OSError: - pass - self.fname = None - self.forceDeleteContainer(self.c1) - self.forceDeleteContainer(self.c2) - try: - self.forceDeleteContainer(self.c3) - except ClientError: - pass - self.client.container = '' - - def test_000(self): - """Perform a full Pithos+ kamaki support test""" - - def test_account_head(self): - """Test account_HEAD""" - r = self.client.account_head() - self.assertEqual(r.status_code, 204) - - r = self.client.account_head(until='1000000000') - self.assertEqual(r.status_code, 204) - - r = self.client.get_account_info(until='1000000000') - datestring = unicode(r['x-account-until-timestamp']) - self.assertEqual(u'Sun, 09 Sep 2001 01:46:40 GMT', datestring) - - r = self.client.get_account_quota() - self.assertTrue('x-account-policy-quota' in r) - - r = self.client.get_account_versioning() - self.assertTrue('x-account-policy-versioning' in r) - - """Check if(un)modified_since""" - for format in self.client.DATE_FORMATS: - now_formated = self.now_unformated.strftime(format) - r1 = self.client.account_head(if_modified_since=now_formated, - success=(204, 304, 412)) - sc1 = r1.status_code - r1.release() - r2 = self.client.account_head(if_unmodified_since=now_formated, - success=(204, 304, 412)) - sc2 = r2.status_code - r2.release() - self.assertNotEqual(sc1, sc2) - - def test_account_get(self): - """Test account_GET""" - #r = self.client.account_get() - #self.assertEqual(r.status_code, 200) - r = self.client.list_containers() - fullLen = len(r) - self.assertTrue(fullLen > 2) - - r = self.client.account_get(limit=1) - self.assertEqual(len(r.json), 1) - - r = self.client.account_get(marker='c2_') - temp_c0 = r.json[0]['name'] - temp_c2 = r.json[2]['name'] - - r = self.client.account_get(limit=2, marker='c2_') - conames = [container['name'] for container in r.json \ - if container['name'].lower().startswith('c2_')] - self.assertTrue(temp_c0 in conames) - self.assertFalse(temp_c2 in conames) - - r = self.client.account_get(show_only_shared=True) - self.assertTrue(self.c1 in [c['name'] for c in r.json]) - - r = self.client.account_get(until=1342609206) - self.assertTrue(len(r.json) <= fullLen) - - """Check if(un)modified_since""" - for format in self.client.DATE_FORMATS: - now_formated = self.now_unformated.strftime(format) - r1 = self.client.account_get(if_modified_since=now_formated, - success=(200, 304, 412)) - sc1 = r1.status_code - r1.release() - r2 = self.client.account_get(if_unmodified_since=now_formated, - success=(200, 304, 412)) - sc2 = r2.status_code - r2.release() - self.assertNotEqual(sc1, sc2) - - """Check sharing_accounts""" - r = self.client.get_sharing_accounts() - self.assertTrue(len(r) > 0) - - def test_account_post(self): - """Test account_POST""" - r = self.client.account_post() - self.assertEqual(r.status_code, 202) - grpName = 'grp' + unicode(self.now) - - """Method set/del_account_meta and set_account_groupcall use - account_post internally - """ - self.client.set_account_group(grpName, ['u1', 'u2']) - r = self.client.get_account_group() - self.assertEqual(r['x-account-group-' + grpName], 'u1,u2') - self.client.del_account_group(grpName) - r = self.client.get_account_group() - self.assertTrue('x-account-group-' + grpName not in r) - - mprefix = 'meta' + unicode(self.now) - self.client.set_account_meta({mprefix + '1': 'v1', - mprefix + '2': 'v2'}) - r = self.client.get_account_meta() - self.assertEqual(r['x-account-meta-' + mprefix + '1'], 'v1') - self.assertEqual(r['x-account-meta-' + mprefix + '2'], 'v2') - - self.client.del_account_meta(mprefix + '1') - r = self.client.get_account_meta() - self.assertTrue('x-account-meta-' + mprefix + '1' not in r) - - self.client.del_account_meta(mprefix + '2') - r = self.client.get_account_meta() - self.assertTrue('x-account-meta-' + mprefix + '2' not in r) - - """Missing testing for quota, versioning, because normally - you don't have permissions to modify those at account level - """ - - newquota = 1000000 - self.client.set_account_quota(newquota) - #r = self.client.get_account_info() - #print(unicode(r)) - #r = self.client.get_account_quota() - #self.assertEqual(r['x-account-policy-quota'], newquota) - self.client.set_account_versioning('auto') - - def test_container_head(self): - """Test container_HEAD""" - self.client.container = self.c1 - - r = self.client.container_head() - self.assertEqual(r.status_code, 204) - - """Check until""" - r = self.client.container_head(until=1000000, success=(204, 404)) - self.assertEqual(r.status_code, 404) - - """Check and if(un)modified_since""" - for format in self.client.DATE_FORMATS: - now_formated = self.now_unformated.strftime(format) - r1 = self.client.container_head(if_modified_since=now_formated, - success=(204, 304, 412)) - sc1 = r1.status_code - r1.release() - r2 = self.client.container_head(if_unmodified_since=now_formated, - success=(204, 304, 412)) - sc2 = r2.status_code - r2.release() - self.assertNotEqual(sc1, sc2) - - """Check container object meta""" - r = self.client.get_container_object_meta() - self.assertEqual(r['x-container-object-meta'], 'Incontainer') - - def test_container_get(self): - """Test container_GET""" - self.client.container = self.c1 - - r = self.client.container_get() - self.assertEqual(r.status_code, 200) - fullLen = len(r.json) - - r = self.client.container_get(prefix='test') - lalobjects = [obj for obj in r.json if obj['name'].startswith('test')] - self.assertTrue(len(r.json) > 1) - self.assertEqual(len(r.json), len(lalobjects)) - - r = self.client.container_get(limit=1) - self.assertEqual(len(r.json), 1) - - r = self.client.container_get(marker='another') - self.assertTrue(len(r.json) > 1) - neobjects = [obj for obj in r.json if obj['name'] > 'another'] - self.assertEqual(len(r.json), len(neobjects)) - - r = self.client.container_get(prefix='another.test', delimiter='.') - self.assertTrue(fullLen > len(r.json)) - - r = self.client.container_get(path='/') - self.assertEqual(fullLen, len(r.json)) - - r = self.client.container_get(format='xml') - self.assertEqual(r.text.split()[4], 'name="' + self.c1 + '">') - - r = self.client.container_get(meta=['incontainer']) - self.assertTrue(len(r.json) > 0) - - r = self.client.container_get(show_only_shared=True) - self.assertTrue(len(r.json) < fullLen) - - try: - r = self.client.container_get(until=1000000000) - datestring = unicode(r.headers['x-account-until-timestamp']) - self.assertEqual(u'Sun, 09 Sep 2001 01:46:40 GMT', datestring) - - except ClientError: - - pass - - """Check and if un/modified_since""" - for format in self.client.DATE_FORMATS: - now_formated = self.now_unformated.strftime(format) - r1 = self.client.container_get(if_modified_since=now_formated, - success=(200, 304, 412)) - sc1 = r1.status_code - r1.release() - r2 = self.client.container_get(if_unmodified_since=now_formated, - success=(200, 304, 412)) - sc2 = r2.status_code - r2.release() - self.assertNotEqual(sc1, sc2) - - def test_container_put(self): - """Test container_PUT""" - self.client.container = self.c2 - - r = self.client.container_put() - self.assertEqual(r.status_code, 202) - - r = self.client.get_container_quota(self.client.container) - cquota = r.values()[0] - newquota = 2 * int(cquota) - - r = self.client.container_put(quota=newquota) - self.assertEqual(r.status_code, 202) - - r = self.client.get_container_quota(self.client.container) - xquota = int(r.values()[0]) - self.assertEqual(newquota, xquota) - - r = self.client.container_put(versioning='auto') - self.assertEqual(r.status_code, 202) - - r = self.client.get_container_versioning(self.client.container) - nvers = r.values()[0] - self.assertEqual('auto', nvers) - - r = self.client.container_put(versioning='none') - self.assertEqual(r.status_code, 202) - - r = self.client.get_container_versioning(self.client.container) - nvers = r.values()[0] - self.assertEqual('none', nvers) - - r = self.client.container_put(metadata={'m1': 'v1', 'm2': 'v2'}) - self.assertEqual(r.status_code, 202) - - r = self.client.get_container_meta(self.client.container) - self.assertTrue('x-container-meta-m1' in r) - self.assertEqual(r['x-container-meta-m1'], 'v1') - self.assertTrue('x-container-meta-m2' in r) - self.assertEqual(r['x-container-meta-m2'], 'v2') - - r = self.client.container_put(metadata={'m1': '', 'm2': 'v2a'}) - self.assertEqual(r.status_code, 202) - - r = self.client.get_container_meta(self.client.container) - self.assertTrue('x-container-meta-m1' not in r) - self.assertTrue('x-container-meta-m2' in r) - self.assertEqual(r['x-container-meta-m2'], 'v2a') - - self.client.del_container_meta(self.client.container) - - def test_container_post(self): - """Test container_POST""" - self.client.container = self.c2 - - """Simple post""" - r = self.client.container_post() - self.assertEqual(r.status_code, 202) - - """post meta""" - self.client.set_container_meta({'m1': 'v1', 'm2': 'v2'}) - r = self.client.get_container_meta(self.client.container) - self.assertTrue('x-container-meta-m1' in r) - self.assertEqual(r['x-container-meta-m1'], 'v1') - self.assertTrue('x-container-meta-m2' in r) - self.assertEqual(r['x-container-meta-m2'], 'v2') - - """post/2del meta""" - r = self.client.del_container_meta('m1') - r = self.client.set_container_meta({'m2': 'v2a'}) - r = self.client.get_container_meta(self.client.container) - self.assertTrue('x-container-meta-m1' not in r) - self.assertTrue('x-container-meta-m2' in r) - self.assertEqual(r['x-container-meta-m2'], 'v2a') - - """check quota""" - r = self.client.get_container_quota(self.client.container) - cquota = r.values()[0] - newquota = 2 * int(cquota) - r = self.client.set_container_quota(newquota) - r = self.client.get_container_quota(self.client.container) - xquota = int(r.values()[0]) - self.assertEqual(newquota, xquota) - r = self.client.set_container_quota(cquota) - r = self.client.get_container_quota(self.client.container) - xquota = r.values()[0] - self.assertEqual(cquota, xquota) - - """Check versioning""" - self.client.set_container_versioning('auto') - r = self.client.get_container_versioning(self.client.container) - nvers = r.values()[0] - self.assertEqual('auto', nvers) - self.client.set_container_versioning('none') - r = self.client.get_container_versioning(self.client.container) - nvers = r.values()[0] - self.assertEqual('none', nvers) - - """put_block uses content_type and content_length to - post blocks of data 2 container. All that in upload_object""" - """Change a file at fs""" - self.create_large_file(1024 * 1024 * 100, 'l100M.' + unicode(self.now)) - """Upload it at a directory in container""" - self.client.create_directory('dir') - newf = open(self.fname, 'r') - self.client.upload_object('/dir/sample.file', newf) - newf.close() - """Check if file has been uploaded""" - r = self.client.get_object_info('/dir/sample.file') - self.assertTrue(int(r['content-length']) > 100000000) - - """WTF is tranfer_encoding? What should I check about th** s**t? """ - #TODO - - """Check update=False""" - r = self.client.object_post('test', - update=False, - metadata={'newmeta': 'newval'}) - - r = self.client.get_object_info('test') - self.assertTrue('x-object-meta-newmeta' in r) - self.assertFalse('x-object-meta-incontainer' in r) - - r = self.client.del_container_meta('m2') - - def test_container_delete(self): - """Test container_DELETE""" - - """Fail to delete a non-empty container""" - self.client.container = self.c2 - r = self.client.container_delete(success=409) - self.assertEqual(r.status_code, 409) - - """Fail to delete c3 (empty) container""" - self.client.container = self.c3 - r = self.client.container_delete(until='1000000000') - self.assertEqual(r.status_code, 204) - - """Delete c3 (empty) container""" - r = self.client.container_delete() - self.assertEqual(r.status_code, 204) - - """Purge container(empty a container), check versionlist""" - self.client.container = self.c1 - r = self.client.object_head('test', success=(200, 404)) - self.assertEqual(r.status_code, 200) - self.client.del_container(delimiter='/') - r = self.client.object_head('test', success=(200, 404)) - self.assertEqual(r.status_code, 404) - r = self.client.get_object_versionlist('test') - self.assertTrue(len(r) > 0) - self.assertTrue(len(r[0]) > 1) - self.client.purge_container() - self.assertRaises(ClientError, - self.client.get_object_versionlist, - 'test') - - def test_object_head(self): - """Test object_HEAD""" - self.client.container = self.c2 - obj = 'test' - - r = self.client.object_head(obj) - self.assertEqual(r.status_code, 200) - etag = r.headers['etag'] - - r = self.client.object_head(obj, version=40) - self.assertEqual(r.headers['x-object-version'], '40') - - r = self.client.object_head(obj, if_etag_match=etag) - self.assertEqual(r.status_code, 200) - - r = self.client.object_head(obj, - if_etag_not_match=etag, - success=(200, 412, 304)) - self.assertNotEqual(r.status_code, 200) - - r = self.client.object_head(obj, - version=40, - if_etag_match=etag, - success=412) - self.assertEqual(r.status_code, 412) - - """Check and if(un)modified_since""" - for format in self.client.DATE_FORMATS: - now_formated = self.now_unformated.strftime(format) - r1 = self.client.object_head(obj, if_modified_since=now_formated, - success=(200, 304, 412)) - sc1 = r1.status_code - r1.release() - r2 = self.client.object_head(obj, if_unmodified_since=now_formated, - success=(200, 304, 412)) - sc2 = r2.status_code - r2.release() - self.assertNotEqual(sc1, sc2) - - def test_object_get(self): - """Test object_GET""" - self.client.container = self.c1 - obj = 'test' - - r = self.client.object_get(obj) - self.assertEqual(r.status_code, 200) - - osize = int(r.headers['content-length']) - etag = r.headers['etag'] - - r = self.client.object_get(obj, hashmap=True) - self.assertTrue('hashes' in r.json\ - and 'block_hash' in r.json\ - and 'block_size' in r.json\ - and 'bytes' in r.json) - - r = self.client.object_get(obj, format='xml', hashmap=True) - self.assertEqual(len(r.text.split('hash>')), 3) - - rangestr = 'bytes=%s-%s' % (osize / 3, osize / 2) - r = self.client.object_get(obj, - data_range=rangestr, - success=(200, 206)) - partsize = int(r.headers['content-length']) - self.assertTrue(0 < partsize and partsize <= 1 + osize / 3) - - rangestr = 'bytes=%s-%s' % (osize / 3, osize / 2) - r = self.client.object_get(obj, - data_range=rangestr, - if_range=True, - success=(200, 206)) - partsize = int(r.headers['content-length']) - self.assertTrue(0 < partsize and partsize <= 1 + osize / 3) - - r = self.client.object_get(obj, if_etag_match=etag) - self.assertEqual(r.status_code, 200) - - r = self.client.object_get(obj, if_etag_not_match=etag + 'LALALA') - self.assertEqual(r.status_code, 200) - - """Check and if(un)modified_since""" - for format in self.client.DATE_FORMATS: - now_formated = self.now_unformated.strftime(format) - r1 = self.client.object_get(obj, if_modified_since=now_formated, - success=(200, 304, 412)) - sc1 = r1.status_code - r1.release() - r2 = self.client.object_get(obj, - if_unmodified_since=now_formated, - success=(200, 304, 412)) - sc2 = r2.status_code - r2.release() - self.assertNotEqual(sc1, sc2) - - """Upload an object to download""" - src_fname = '/tmp/localfile1_%s' % self.now - dnl_fname = '/tmp/localfile2_%s' % self.now - trg_fname = 'remotefile_%s' % self.now - f_size = 59247824 - self.create_large_file(f_size, src_fname) - src_f = open(src_fname, 'rb+') - print('\tUploading...') - self.client.upload_object(trg_fname, src_f) - src_f.close() - print('\tDownloading...') - dnl_f = open(dnl_fname, 'wb+') - self.client.download_object(trg_fname, dnl_f) - dnl_f.close() - - print('\tCheck if files match...') - src_f = open(src_fname) - dnl_f = open(dnl_fname) - for pos in (0, f_size / 2, f_size - 20): - src_f.seek(pos) - dnl_f.seek(pos) - self.assertEqual(src_f.read(10), dnl_f.read(10)) - src_f.close() - dnl_f.close() - - os.remove(src_fname) - os.remove(dnl_fname) - - def test_object_put(self): - """Test object_PUT""" - - self.client.container = self.c2 - obj = 'another.test' - - """create the object""" - r = self.client.object_put(obj, - data='a', - content_type='application/octer-stream', - permissions={ - 'read': ['accX:groupA', 'u1', 'u2'], - 'write': ['u2', 'u3']}, - metadata={'key1': 'val1', 'key2': 'val2'}, - content_encoding='UTF-8', - content_disposition='attachment; filename="fname.ext"') - self.assertEqual(r.status_code, 201) - etag = r.headers['etag'] - - """Check content-disposition""" - r = self.client.get_object_info(obj) - self.assertTrue('content-disposition' in r) - - """Check permissions""" - r = self.client.get_object_sharing(obj) - self.assertTrue('accx:groupa' in r['read']) - self.assertTrue('u1' in r['read']) - self.assertTrue('u2' in r['write']) - self.assertTrue('u3' in r['write']) - - """Check metadata""" - r = self.client.get_object_meta(obj) - self.assertEqual(r['x-object-meta-key1'], 'val1') - self.assertEqual(r['x-object-meta-key2'], 'val2') - - """Check public and if_etag_match""" - r = self.client.object_put(obj, if_etag_match=etag, data='b', - content_type='application/octet-stream', public=True) - - r = self.client.object_get(obj) - self.assertTrue('x-object-public' in r.headers) - vers2 = int(r.headers['x-object-version']) - etag = r.headers['etag'] - self.assertEqual(r.text, 'b') - - """Check if_etag_not_match""" - r = self.client.object_put(obj, if_etag_not_match=etag, data='c', - content_type='application/octet-stream', success=(201, 412)) - self.assertEqual(r.status_code, 412) - - """Check content_type and content_length""" - tmpdir = 'dir' + unicode(self.now) - r = self.client.object_put(tmpdir, - content_type='application/directory', - content_length=0) - - r = self.client.get_object_info(tmpdir) - self.assertEqual(r['content-type'], 'application/directory') - - """Check copy_from, content_encoding""" - r = self.client.object_put('%s/%s' % (tmpdir, obj), - format=None, - copy_from='/%s/%s' % (self.client.container, obj), - content_encoding='application/octet-stream', - source_account=self.client.account, - content_length=0, success=201) - self.assertEqual(r.status_code, 201) - - """Test copy_object for cross-conctainer copy""" - self.client.copy_object(src_container=self.c2, - src_object='%s/%s' % (tmpdir, obj), - dst_container=self.c1, - dst_object=obj) - self.client.container = self.c1 - r1 = self.client.get_object_info(obj) - self.client.container = self.c2 - r2 = self.client.get_object_info('%s/%s' % (tmpdir, obj)) - self.assertEqual(r1['x-object-hash'], r2['x-object-hash']) - - """Check cross-container copy_from, content_encoding""" - self.client.container = self.c1 - fromstr = '/%s/%s/%s' % (self.c2, tmpdir, obj) - r = self.client.object_put(obj, - format=None, - copy_from=fromstr, - content_encoding='application/octet-stream', - source_account=self.client.account, - content_length=0, success=201) - - self.assertEqual(r.status_code, 201) - r = self.client.get_object_info(obj) - self.assertEqual(r['etag'], etag) - - """Check source_account""" - self.client.container = self.c2 - fromstr = '/%s/%s' % (self.c1, obj) - r = self.client.object_put(obj + 'v2', - format=None, - move_from=fromstr, - content_encoding='application/octet-stream', - source_account='nonExistendAddress@NeverLand.com', - content_length=0, - success=(201, 403)) - self.assertEqual(r.status_code, 403) - - """Check cross-container move_from""" - self.client.container = self.c1 - r1 = self.client.get_object_info(obj) - self.client.container = self.c2 - self.client.move_object(src_container=self.c1, - src_object=obj, - dst_container=self.c2, - dst_object=obj + 'v0') - r0 = self.client.get_object_info(obj + 'v0') - self.assertEqual(r1['x-object-hash'], r0['x-object-hash']) - - """Check move_from""" - r = self.client.object_put(obj + 'v1', - format=None, - move_from='/%s/%s' % (self.c2, obj), - source_version=vers2, - content_encoding='application/octet-stream', - content_length=0, success=201) - - """Check manifest""" - mobj = 'manifest.test' - txt = '' - for i in range(10): - txt += '%s' % i - r = self.client.object_put('%s/%s' % (mobj, i), - data='%s' % i, - content_length=1, - success=201, - content_type='application/octet-stream', - content_encoding='application/octet-stream') - - r = self.client.object_put(mobj, - content_length=0, - content_type='application/octet-stream', - manifest='%s/%s' % (self.client.container, mobj)) - - r = self.client.object_get(mobj) - self.assertEqual(r.text, txt) - - """Upload a local file with one request""" - self.create_large_file(1024 * 10, 'l10K.' + unicode(self.now)) - newf = open(self.fname, 'r') - self.client.upload_object('sample.file', newf) - newf.close() - """Check if file has been uploaded""" - r = self.client.get_object_info('sample.file') - self.assertEqual(int(r['content-length']), 10260) - - """Some problems with transfer-encoding?""" - - def test_object_copy(self): - """Test object_COPY""" - self.client.container = self.c2 - obj = 'test2' - - data = '{"key1":"val1", "key2":"val2"}' - r = self.client.object_put(obj + 'orig', - content_type='application/octet-stream', - data=data, - metadata={'mkey1': 'mval1', 'mkey2': 'mval2'}, - permissions={ - 'read': ['accX:groupA', 'u1', 'u2'], - 'write': ['u2', 'u3']}, - content_disposition='attachment; filename="fname.ext"') - - r = self.client.object_copy(obj + 'orig', - destination='/%s/%s' % (self.client.container, obj), - ignore_content_type=False, content_type='application/json', - metadata={'mkey2': 'mval2a', 'mkey3': 'mval3'}, - permissions={'write': ['u5', 'accX:groupB']}) - self.assertEqual(r.status_code, 201) - - """Check content-disposition""" - r = self.client.get_object_info(obj) - self.assertTrue('content-disposition' in r) - - """Check Metadata""" - r = self.client.get_object_meta(obj) - self.assertEqual(r['x-object-meta-mkey1'], 'mval1') - self.assertEqual(r['x-object-meta-mkey2'], 'mval2a') - self.assertEqual(r['x-object-meta-mkey3'], 'mval3') - - """Check permissions""" - r = self.client.get_object_sharing(obj) - self.assertFalse('read' in r or 'u2' in r['write']) - self.assertTrue('accx:groupb' in r['write']) - - """Check destination account""" - r = self.client.object_copy(obj, - destination='/%s/%s' % (self.c1, obj), - content_encoding='utf8', - content_type='application/json', - destination_account='nonExistendAddress@NeverLand.com', - success=(201, 403)) - self.assertEqual(r.status_code, 403) - - """Check destination being another container - and also content_type and content encoding""" - r = self.client.object_copy(obj, - destination='/%s/%s' % (self.c1, obj), - content_encoding='utf8', - content_type='application/json') - self.assertEqual(r.status_code, 201) - self.assertEqual(r.headers['content-type'], - 'application/json; charset=UTF-8') - - """Check ignore_content_type and content_type""" - r = self.client.object_get(obj) - etag = r.headers['etag'] - ctype = r.headers['content-type'] - self.assertEqual(ctype, 'application/json') - - r = self.client.object_copy(obj + 'orig', - destination='/%s/%s0' % (self.client.container, obj), - ignore_content_type=True, - content_type='application/json') - self.assertEqual(r.status_code, 201) - self.assertNotEqual(r.headers['content-type'], 'application/json') - - """Check if_etag_(not_)match""" - r = self.client.object_copy(obj, - destination='/%s/%s1' % (self.client.container, obj), - if_etag_match=etag) - self.assertEqual(r.status_code, 201) - - r = self.client.object_copy(obj, - destination='/%s/%s2' % (self.client.container, obj), - if_etag_not_match='lalala') - self.assertEqual(r.status_code, 201) - vers2 = r.headers['x-object-version'] - - """Check source_version, public and format """ - r = self.client.object_copy(obj + '2', - destination='/%s/%s3' % (self.client.container, obj), - source_version=vers2, - format='xml', - public=True) - self.assertEqual(r.status_code, 201) - self.assertTrue(r.headers['content-type'].index('xml') > 0) - - r = self.client.get_object_info(obj + '3') - self.assertTrue('x-object-public' in r) - - def test_object_move(self): - """Test object_MOVE""" - self.client.container = self.c2 - obj = 'test2' - - data = '{"key1": "val1", "key2": "val2"}' - r = self.client.object_put(obj + 'orig', - content_type='application/octet-stream', - data=data, - metadata={'mkey1': 'mval1', 'mkey2': 'mval2'}, - permissions={'read': ['accX:groupA', 'u1', 'u2'], - 'write': ['u2', 'u3']}) - - r = self.client.object_move(obj + 'orig', - destination='/%s/%s' % (self.client.container, obj), - ignore_content_type=False, - content_type='application/json', - metadata={'mkey2': 'mval2a', 'mkey3': 'mval3'}, - permissions={'write': ['u5', 'accX:groupB']}) - self.assertEqual(r.status_code, 201) - - """Check Metadata""" - r = self.client.get_object_meta(obj) - self.assertEqual(r['x-object-meta-mkey1'], 'mval1') - self.assertEqual(r['x-object-meta-mkey2'], 'mval2a') - self.assertEqual(r['x-object-meta-mkey3'], 'mval3') - - """Check permissions""" - r = self.client.get_object_sharing(obj) - self.assertFalse('read' in r) - self.assertTrue('u5' in r['write']) - self.assertTrue('accx:groupb' in r['write']) - - """Check destination account""" - r = self.client.object_move(obj, - destination='/%s/%s' % (self.c1, obj), - content_encoding='utf8', - content_type='application/json', - destination_account='nonExistendAddress@NeverLand.com', - success=(201, 403)) - self.assertEqual(r.status_code, 403) - - """Check destination being another container and also - content_type, content_disposition and content encoding""" - r = self.client.object_move(obj, - destination='/%s/%s' % (self.c1, obj), - content_encoding='utf8', - content_type='application/json', - content_disposition='attachment; filename="fname.ext"') - self.assertEqual(r.status_code, 201) - self.assertEqual(r.headers['content-type'], - 'application/json; charset=UTF-8') - self.client.container = self.c1 - r = self.client.get_object_info(obj) - self.assertTrue('content-disposition' in r\ - and 'fname.ext' in r['content-disposition']) - etag = r['etag'] - ctype = r['content-type'] - self.assertEqual(ctype, 'application/json') - - """Check ignore_content_type and content_type""" - r = self.client.object_move(obj, - destination='/%s/%s' % (self.c2, obj), - ignore_content_type=True, - content_type='application/json') - self.assertEqual(r.status_code, 201) - self.assertNotEqual(r.headers['content-type'], 'application/json') - - """Check if_etag_(not_)match""" - self.client.container = self.c2 - r = self.client.object_move(obj, - destination='/%s/%s0' % (self.client.container, obj), - if_etag_match=etag) - self.assertEqual(r.status_code, 201) - - r = self.client.object_move(obj + '0', - destination='/%s/%s1' % (self.client.container, obj), - if_etag_not_match='lalala') - self.assertEqual(r.status_code, 201) - - """Check public and format """ - r = self.client.object_move(obj + '1', - destination='/%s/%s2' % (self.client.container, obj), - format='xml', public=True) - self.assertEqual(r.status_code, 201) - self.assertTrue(r.headers['content-type'].index('xml') > 0) - - r = self.client.get_object_info(obj + '2') - self.assertTrue('x-object-public' in r) - - def test_object_post(self): - """Test object_POST""" - self.client.container = self.c2 - obj = 'test2' - """create a filesystem file""" - self.fname = obj - newf = open(self.fname, 'w') - newf.writelines(['ello!\n', - 'This is a test line\n', - 'inside a test file\n']) - newf.close() - """create a file on container""" - r = self.client.object_put(obj, - content_type='application/octet-stream', - data='H', - metadata={'mkey1': 'mval1', 'mkey2': 'mval2'}, - permissions={'read': ['accX:groupA', 'u1', 'u2'], - 'write': ['u2', 'u3']}) - - """Append tests update, content_range, content_type, content_length""" - newf = open(obj, 'r') - self.client.append_object(obj, newf) - r = self.client.object_get(obj) - self.assertTrue(r.text.startswith('Hello!')) - - """Overwrite tests update, - content_type, content_length, content_range - """ - newf.seek(0) - r = self.client.overwrite_object(obj, 0, 10, newf) - r = self.client.object_get(obj) - self.assertTrue(r.text.startswith('ello!')) - newf.close() - - """Truncate tests update, - content_range, content_type, object_bytes and source_object""" - r = self.client.truncate_object(obj, 5) - r = self.client.object_get(obj) - self.assertEqual(r.text, 'ello!') - - """Check metadata""" - self.client.set_object_meta(obj, {'mkey2': 'mval2a', 'mkey3': 'mval3'}) - r = self.client.get_object_meta(obj) - self.assertEqual(r['x-object-meta-mkey1'], 'mval1') - self.assertEqual(r['x-object-meta-mkey2'], 'mval2a') - self.assertEqual(r['x-object-meta-mkey3'], 'mval3') - self.client.del_object_meta(obj, 'mkey1') - r = self.client.get_object_meta(obj) - self.assertFalse('x-object-meta-mkey1' in r) - - """Check permissions""" - self.client.set_object_sharing(obj, - read_permition=['u4', 'u5'], write_permition=['u4']) - r = self.client.get_object_sharing(obj) - self.assertTrue('read' in r) - self.assertTrue('u5' in r['read']) - self.assertTrue('write' in r) - self.assertTrue('u4' in r['write']) - self.client.del_object_sharing(obj) - r = self.client.get_object_sharing(obj) - self.assertTrue(len(r) == 0) - - """Check publish""" - self.client.publish_object(obj) - r = self.client.get_object_info(obj) - self.assertTrue('x-object-public' in r) - self.client.unpublish_object(obj) - r = self.client.get_object_info(obj) - self.assertFalse('x-object-public' in r) - - """Check if_etag_(not)match""" - etag = r['etag'] - """ - r = self.client.object_post(obj, - update=True, - public=True, - if_etag_not_match=etag, - success=(412, 202, 204)) - self.assertEqual(r.status_code, 412) - """ - - r = self.client.object_post(obj, update=True, public=True, - if_etag_match=etag, content_encoding='application/json') - - r = self.client.get_object_info(obj) - helloVersion = r['x-object-version'] - self.assertTrue('x-object-public' in r) - self.assertEqual(r['content-encoding'], 'application/json') - - """Check source_version and source_account and content_disposition""" - r = self.client.object_post(obj, - update=True, - content_type='application/octet-srteam', - content_length=5, - content_range='bytes 1-5/*', - source_object='/%s/%s' % (self.c2, obj), - source_account='thisAccountWillNeverExist@adminland.com', - source_version=helloVersion, - data='12345', - success=(403, 202, 204)) - self.assertEqual(r.status_code, 403) - - r = self.client.object_post(obj, - update=True, - content_type='application/octet-srteam', - content_length=5, - content_range='bytes 1-5/*', - source_object='/%s/%s' % (self.c2, obj), - source_account=self.client.account, - source_version=helloVersion, - data='12345', - content_disposition='attachment; filename="fname.ext"') - - r = self.client.object_get(obj) - self.assertEqual(r.text, 'eello!') - self.assertTrue('content-disposition' in r.headers\ - and 'fname.ext' in r.headers['content-disposition']) - - """Check manifest""" - mobj = 'manifest.test' - txt = '' - for i in range(10): - txt += '%s' % i - r = self.client.object_put('%s/%s' % (mobj, i), - data='%s' % i, - content_length=1, - success=201, - content_encoding='application/octet-stream', - content_type='application/octet-stream') - - self.client.create_object_by_manifestation(mobj, - content_type='application/octet-stream') - - r = self.client.object_post(mobj, - manifest='%s/%s' % (self.client.container, mobj)) - - r = self.client.object_get(mobj) - self.assertEqual(r.text, txt) - - """We need to check transfer_encoding """ - - def test_object_delete(self): - """Test object_DELETE""" - self.client.container = self.c2 - obj = 'test2' - """create a file on container""" - r = self.client.object_put(obj, - content_type='application/octet-stream', - data='H', - metadata={'mkey1': 'mval1', 'mkey2': 'mval2'}, - permissions={'read': ['accX:groupA', 'u1', 'u2'], - 'write': ['u2', 'u3']}) - - """Check with false until""" - r = self.client.object_delete(obj, until=1000000) - - r = self.client.object_get(obj, success=(200, 404)) - self.assertEqual(r.status_code, 200) - - """Check normal case""" - r = self.client.object_delete(obj) - self.assertEqual(r.status_code, 204) - - r = self.client.object_get(obj, success=(200, 404)) - self.assertEqual(r.status_code, 404) - - def create_large_file(self, size, name): - """Create a large file at fs""" - self.fname = name - import random - random.seed(self.now) - rf = open('/dev/urandom', 'r') - f = open(self.fname, 'w') - sys.stdout.write( - ' create random file %s of size %s ' % (name, size)) - for hobyte_id in xrange(size / 8): - #sss = 'hobt%s' % random.randint(1000, 9999) - f.write(rf.read(8)) - if 0 == (hobyte_id * 800) % size: - f.write('\n') - f.flush() - prs = (hobyte_id * 800) // size - sys.stdout.write('\b\b') - if prs > 10: - sys.stdout.write('\b') - sys.stdout.write('%s%%' % prs) - sys.stdout.flush() - print('\b\b\b100%') - f.flush() - rf.close() - f.close() - """""" - - -def init_parser(): - parser = ArgumentParser(add_help=False) - parser.add_argument('-h', '--help', - dest='help', - action='store_true', - default=False, - help="Show this help message and exit") - return parser - - -def main(argv): - - suiteFew = unittest.TestSuite() - if len(argv) == 0 or argv[0] == 'pithos': - if len(argv) == 1: - suiteFew.addTest(unittest.makeSuite(testPithos)) - else: - suiteFew.addTest(testPithos('test_' + argv[1])) - if len(argv) == 0 or argv[0] == 'cyclades': - if len(argv) == 1: - #suiteFew.addTest(unittest.makeSuite(testCyclades)) - suiteFew.addTest(testCyclades('test_000')) - else: - suiteFew.addTest(testCyclades('test_' + argv[1])) - if len(argv) == 0 or argv[0] == 'image': - if len(argv) == 1: - suiteFew.addTest(unittest.makeSuite(testImage)) - else: - suiteFew.addTest(testImage('test_' + argv[1])) - if len(argv) == 0 or argv[0] == 'astakos': - if len(argv) == 1: - suiteFew.addTest(unittest.makeSuite(testAstakos)) - else: - suiteFew.addTest(testAstakos('test_' + argv[1])) - - unittest.TextTestRunner(verbosity=2).run(suiteFew) - -if __name__ == '__main__': - parser = init_parser() - args, argv = parser.parse_known_args() - if len(argv) > 2 or getattr(args, 'help') or len(argv) < 1: - raise Exception('\tusage: tests.py [command]') - main(argv) -- 1.7.10.4