from unittest import TestCase, TestSuite, makeSuite, TextTestRunner
from argparse import ArgumentParser
from sys import stdout
-from traceback import extract_stack, print_stack
from progress.bar import ShadyBar
from kamaki.cli.config import Config
+from kamaki.cli.utils import spiner
def _add_value(foo, value):
class Generic(TestCase):
+ _waits = []
_cnf = None
_grp = None
_fetched = {}
super(Generic, self).__init__(specific)
self._cnf = Config(config_file)
self._grp = group
+ self._waits.append(0.71828)
+ for i in range(10):
+ self._waits.append(self._waits[-1] * 2.71828)
def __getitem__(self, key):
key = self._key(key)
def _safe_progress_bar(self, msg):
"""Try to get a progress bar, but do not raise errors"""
try:
- progress_bar = ShadyBar()
- gen = progress_bar.get_generator(msg)
+ wait_bar = ShadyBar(msg)
+
+ def wait_gen(n):
+ for i in wait_bar.iter(range(int(n))):
+ yield
+ yield
+ wait_cb = wait_gen
except Exception:
- return (None, None)
- return (progress_bar, gen)
+ stdout.write('%s:' % msg)
+ (wait_bar, wait_cb) = (None, spiner)
+ return (wait_bar, wait_cb)
def _safe_progress_bar_finish(self, progress_bar):
try:
progress_bar.finish()
except Exception:
- pass
-
- def do_with_progress_bar(self, action, msg, list, *args, **kwargs):
- (action_bar, action_cb) = self._safe_progress_bar('')
+ print(' DONE')
+
+ def do_with_progress_bar(self, action, msg, items):
+ if not items:
+ print('%s: DONE' % msg)
+ return
+ (action_bar, action_cb) = self._safe_progress_bar(msg)
+ action_gen = action_cb(len(items))
+ for item in items:
+ action(item)
+ action_gen.next()
+ self._safe_progress_bar_finish(action_bar)
def assert_dicts_are_deeply_equal(self, d1, d2):
for k, v in d1.items():
suiteFew.addTest(unittest.makeSuite(testPithos))
else:
suiteFew.addTest(testPithos('test_' + argv[1]))
- if len(argv) == 0 or argv[0] == 'cyclades':
- if len(argv) == 1:
- #suiteFew.addTest(unittest.makeSuite(testCyclades))
- suiteFew.addTest(testCyclades('test_000'))
- else:
- suiteFew.addTest(testCyclades('test_' + argv[1]))
"""
+ if len(argv) == 0 or argv[0] == 'cyclades':
+ from kamaki.clients.tests.cyclades import Cyclades
+ test_method = 'test_%s' % (argv[1] if len(argv) > 1 else '000')
+ suiteFew.addTest(Cyclades(test_method))
if len(argv) == 0 or argv[0] == 'image':
from kamaki.clients.tests.image import Image
test_method = 'test_%s' % (argv[1] if len(argv) > 1 else '000')
# or implied, of GRNET S.A.
import time
+from progress.bar import ShadyBar
-from kamaki.clients import tests
+from kamaki.clients import tests, ClientError
from kamaki.clients.cyclades import CycladesClient
def tearDown(self):
"""Destoy servers used in testing"""
- print
- for netid in self.networks.keys():
- self._delete_network(netid)
- if 0 >= len(self.servers):
- return
- print('-> Found %s servers to delete' % len(self.servers))
- for server in self.servers.values():
- self._delete_server(server['id'])
+ self.do_with_progress_bar(
+ self._delete_network,
+ 'Delete %s networks' % len(self.networks),
+ self.networks.keys())
+ server_list = [server['id'] for server in self.servers.values()]
+ self.do_with_progress_bar(
+ self._delete_server,
+ 'Delete %s servers %s' % (len(server_list), server_list),
+ server_list)
def _create_server(self, servername, flavorid, imageid, personality=None):
server = self.client.create_server(servername,
return
except:
return
- self.client.delete_server(servid)
self._wait_for_status(servid, current_state)
+ self.client.delete_server(servid)
def _create_network(self, netname, **kwargs):
net = self.client.create_network(netname, **kwargs)
return net
def _delete_network(self, netid):
- sys.stdout.write('\tDelete network %s ' % netid)
+ print('Disconnect nics of network %s' % netid)
self.client.disconnect_network_nics(netid)
- wait = 3
- while True:
+
+ def netwait(self, wait):
try:
self.client.delete_network(netid)
- print('\n\tSUCCESFULL COMMIT delete network %s' % netid)
- break
- except ClientError as err:
- self.assertEqual(err.status, 421)
+ except ClientError:
time.sleep(wait)
- wait += 3
- sys.stdout.write('.')
+ self.do_with_progress_bar(
+ netwait,
+ 'Delete network %s' % netid,
+ self._waits[:5])
+
+ def _wait_for_network(self, netid, status):
+
+ def netwait(self, wait):
+ r = self.client.get_network_details(netid)
+ if r['status'] == status:
+ return
+ time.sleep(wait)
+ self.do_with_progress_bar(
+ netwait,
+ 'Wait network %s to reach status %s' % (netid, status),
+ self._waits[:5])
+
+ def _wait_for_nic(self, netid, servid, in_creation=True):
+ self._wait_for_network(netid, 'ACTIVE')
+
+ def nicwait(self, wait):
+ nics = self.client.list_server_nics(servid)
+ for net in nics:
+ found_nic = net['network_id'] == netid
+ if (in_creation and found_nic)\
+ or not (in_creation or found_nic):
+ return
+ time.sleep(wait)
+ self.do_with_progress_bar(
+ nicwait,
+ 'Wait nic-%s-%s to %sconnect' % (
+ netid,
+ servid,
+ '' if in_creation else 'dis'),
+ self._waits[:5])
+
+ def _has_status(self, servid, status):
+ r = self.client.get_server_details(servid)
+ return r['status'] == status
+
+ def _wait_for_status(self, servid, status):
+ (wait_bar, wait_cb) = self._safe_progress_bar(
+ 'Server %s in %s' % (servid, status))
+ self.client.wait_server(servid, status, wait_cb=wait_cb)
+ self._safe_progress_bar_finish(wait_bar)
+
+ def test_list_servers(self):
+ """Test list servers"""
+ self.server1 = self._create_server(
+ self.servname1,
+ self.flavorid,
+ self.img)
+ return
+ self.server2 = self._create_server(
+ self.servname2,
+ self.flavorid + 2,
+ self.img)
+ self._test_list_servers()
+
+ def _test_list_servers(self):
+ servers = self.client.list_servers()
+ dservers = self.client.list_servers(detail=True)
+
+ """detailed and simple are same size"""
+ self.assertEqual(len(dservers), len(servers))
+ for i in range(len(servers)):
+ for field in ('created',
+ 'flavorRef',
+ 'hostId',
+ 'imageRef',
+ 'progress',
+ 'status',
+ 'updated'):
+ self.assertFalse(field in servers[i])
+ self.assertTrue(field in dservers[i])
+
+ """detailed and simple contain same names"""
+ names = sorted(map(lambda x: x["name"], servers))
+ dnames = sorted(map(lambda x: x["name"], dservers))
+ self.assertEqual(names, dnames)