Statistics
| Branch: | Tag: | Revision:

root / snf-tools / synnefo_tools / burnin / common.py @ c2f037ff

History | View | Annotate | Download (23.6 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
"""
35
Common utils for burnin tests
36

37
"""
38

    
39
import re
40
import shutil
41
import unittest
42
import datetime
43
import tempfile
44
import traceback
45

    
46
from kamaki.clients.cyclades import CycladesClient, CycladesNetworkClient
47
from kamaki.clients.astakos import AstakosClient, parse_endpoints
48
from kamaki.clients.compute import ComputeClient
49
from kamaki.clients.pithos import PithosClient
50
from kamaki.clients.image import ImageClient
51

    
52
from synnefo_tools.burnin.logger import Log
53

    
54

    
55
# --------------------------------------------------------------------
56
# Global variables
57
logger = None   # pylint: disable=invalid-name
58
success = None  # pylint: disable=invalid-name
59
SNF_TEST_PREFIX = "snf-test-"
60
CONNECTION_RETRY_LIMIT = 2
61
SYSTEM_USERS = ["images@okeanos.grnet.gr", "images@demo.synnefo.org"]
62
KB = 2**10
63
MB = 2**20
64
GB = 2**30
65

    
66
QADD = 1
67
QREMOVE = -1
68

    
69
QDISK = "cyclades.disk"
70
QVM = "cyclades.vm"
71
QPITHOS = "pithos.diskspace"
72
QRAM = "cyclades.ram"
73
QIP = "cyclades.floating_ip"
74
QCPU = "cyclades.cpu"
75
QNET = "cyclades.network.private"
76

    
77

    
78
# --------------------------------------------------------------------
79
# BurninTestResult class
80
class BurninTestResult(unittest.TestResult):
81
    """Modify the TextTestResult class"""
82
    def __init__(self):
83
        super(BurninTestResult, self).__init__()
84

    
85
        # Test parameters
86
        self.failfast = True
87

    
88
    def startTest(self, test):  # noqa
89
        """Called when the test case test is about to be run"""
90
        super(BurninTestResult, self).startTest(test)
91
        logger.log(test.__class__.__name__, test.shortDescription())
92

    
93
    # pylint: disable=no-self-use
94
    def _test_failed(self, test, err):
95
        """Test failed"""
96
        # Get class name
97
        if test.__class__.__name__ == "_ErrorHolder":
98
            class_name = test.id().split('.')[-1].rstrip(')')
99
        else:
100
            class_name = test.__class__.__name__
101
        err_msg = str(test) + "... failed (%s)."
102
        timestamp = datetime.datetime.strftime(
103
            datetime.datetime.now(), "%a %b %d %Y %H:%M:%S")
104
        logger.error(class_name, err_msg, timestamp)
105
        (err_type, err_value, err_trace) = err
106
        trcback = traceback.format_exception(err_type, err_value, err_trace)
107
        logger.info(class_name, trcback)
108

    
109
    def addError(self, test, err):  # noqa
110
        """Called when the test case test raises an unexpected exception"""
111
        super(BurninTestResult, self).addError(test, err)
112
        self._test_failed(test, err)
113

    
114
    def addFailure(self, test, err):  # noqa
115
        """Called when the test case test signals a failure"""
116
        super(BurninTestResult, self).addFailure(test, err)
117
        self._test_failed(test, err)
118

    
119
    # pylint: disable=fixme
120
    def addSkip(self, test, reason):  # noqa
121
        """Called when the test case test is skipped
122

123
        If reason starts with "__SkipClass__: " then
124
        we should stop the execution of all the TestSuite.
125

126
        TODO: There should be a better way to do this
127

128
        """
129
        super(BurninTestResult, self).addSkip(test, reason)
130
        if reason.startswith("__SkipClass__: "):
131
            self.stop()
132

    
133

    
134
# --------------------------------------------------------------------
135
# Helper Classes
136
# pylint: disable=too-few-public-methods
137
# pylint: disable=too-many-instance-attributes
138
class Clients(object):
139
    """Our kamaki clients"""
140
    auth_url = None
141
    token = None
142
    # Astakos
143
    astakos = None
144
    retry = CONNECTION_RETRY_LIMIT
145
    # Compute
146
    compute = None
147
    compute_url = None
148
    # Cyclades
149
    cyclades = None
150
    # Network
151
    network = None
152
    network_url = None
153
    # Pithos
154
    pithos = None
155
    pithos_url = None
156
    # Image
157
    image = None
158
    image_url = None
159

    
160
    def initialize_clients(self):
161
        """Initialize all the Kamaki Clients"""
162
        self.astakos = AstakosClient(self.auth_url, self.token)
163
        self.astakos.CONNECTION_RETRY_LIMIT = self.retry
164

    
165
        endpoints = self.astakos.authenticate()
166

    
167
        self.compute_url = _get_endpoint_url(endpoints, "compute")
168
        self.compute = ComputeClient(self.compute_url, self.token)
169
        self.compute.CONNECTION_RETRY_LIMIT = self.retry
170

    
171
        self.cyclades = CycladesClient(self.compute_url, self.token)
172
        self.cyclades.CONNECTION_RETRY_LIMIT = self.retry
173

    
174
        self.network_url = _get_endpoint_url(endpoints, "network")
175
        self.network = CycladesNetworkClient(self.network_url, self.token)
176
        self.network.CONNECTION_RETRY_LIMIT = self.retry
177

    
178
        self.pithos_url = _get_endpoint_url(endpoints, "object-store")
179
        self.pithos = PithosClient(self.pithos_url, self.token)
180
        self.pithos.CONNECTION_RETRY_LIMIT = self.retry
181

    
182
        self.image_url = _get_endpoint_url(endpoints, "image")
183
        self.image = ImageClient(self.image_url, self.token)
184
        self.image.CONNECTION_RETRY_LIMIT = self.retry
185

    
186

    
187
def _get_endpoint_url(endpoints, endpoint_type):
188
    """Get the publicURL for the specified endpoint"""
189

    
190
    service_catalog = parse_endpoints(endpoints, ep_type=endpoint_type)
191
    return service_catalog[0]['endpoints'][0]['publicURL']
192

    
193

    
194
class Proper(object):
195
    """A descriptor used by tests implementing the TestCase class
196

197
    Since each instance of the TestCase will only be used to run a single
198
    test method (a new fixture is created for each test) the attributes can
199
    not be saved in the class instances. Instead we use descriptors.
200

201
    """
202
    def __init__(self, value=None):
203
        self.val = value
204

    
205
    def __get__(self, obj, objtype=None):
206
        return self.val
207

    
208
    def __set__(self, obj, value):
209
        self.val = value
210

    
211

    
212
# --------------------------------------------------------------------
213
# BurninTests class
214
# pylint: disable=too-many-public-methods
215
class BurninTests(unittest.TestCase):
216
    """Common class that all burnin tests should implement"""
217
    clients = Clients()
218
    run_id = None
219
    use_ipv6 = None
220
    action_timeout = None
221
    action_warning = None
222
    query_interval = None
223
    system_user = None
224
    images = None
225
    flavors = None
226
    delete_stale = False
227
    temp_directory = None
228
    failfast = None
229

    
230
    quotas = Proper(value=None)
231
    uuid = Proper(value=None)
232

    
233
    @classmethod
234
    def setUpClass(cls):  # noqa
235
        """Initialize BurninTests"""
236
        cls.suite_name = cls.__name__
237
        logger.testsuite_start(cls.suite_name)
238

    
239
        # Set test parameters
240
        cls.longMessage = True
241

    
242
    def test_000_clients_setup(self):
243
        """Initializing astakos/cyclades/pithos clients"""
244
        # Update class attributes
245
        self.clients.initialize_clients()
246
        self.info("Astakos auth url is %s", self.clients.auth_url)
247
        self.info("Cyclades url is %s", self.clients.compute_url)
248
        self.info("Network url is %s", self.clients.network_url)
249
        self.info("Pithos url is %s", self.clients.pithos_url)
250
        self.info("Image url is %s", self.clients.image_url)
251

    
252
        user_uuid = self._get_uuid()
253
        self.quotas = self._get_quotas()
254
        for puuid, quotas in self.quotas.items():
255
            project_name = self._get_project_name(puuid, user_uuid)
256
            self.info("  Project %s:", project_name)
257
            self.info("    Disk usage is         %s bytes",
258
                      quotas['cyclades.disk']['usage'])
259
            self.info("    VM usage is           %s",
260
                      quotas['cyclades.vm']['usage'])
261
            self.info("    DiskSpace usage is    %s bytes",
262
                      quotas['pithos.diskspace']['usage'])
263
            self.info("    Ram usage is          %s bytes",
264
                      quotas['cyclades.ram']['usage'])
265
            self.info("    Floating IPs usage is %s",
266
                      quotas['cyclades.floating_ip']['usage'])
267
            self.info("    CPU usage is          %s",
268
                      quotas['cyclades.cpu']['usage'])
269
            self.info("    Network usage is      %s",
270
                      quotas['cyclades.network.private']['usage'])
271

    
272
    def _run_tests(self, tcases):
273
        """Run some generated testcases"""
274
        global success  # pylint: disable=invalid-name, global-statement
275

    
276
        for tcase in tcases:
277
            self.info("Running testsuite %s", tcase.__name__)
278
            success = run_test(tcase) and success
279
            if self.failfast and not success:
280
                break
281

    
282
    # ----------------------------------
283
    # Loggers helper functions
284
    def log(self, msg, *args):
285
        """Pass the section value to logger"""
286
        logger.log(self.suite_name, msg, *args)
287

    
288
    def info(self, msg, *args):
289
        """Pass the section value to logger"""
290
        logger.info(self.suite_name, msg, *args)
291

    
292
    def debug(self, msg, *args):
293
        """Pass the section value to logger"""
294
        logger.debug(self.suite_name, msg, *args)
295

    
296
    def warning(self, msg, *args):
297
        """Pass the section value to logger"""
298
        logger.warning(self.suite_name, msg, *args)
299

    
300
    def error(self, msg, *args):
301
        """Pass the section value to logger"""
302
        logger.error(self.suite_name, msg, *args)
303
        self.fail(msg % args)
304

    
305
    # ----------------------------------
306
    # Helper functions that every testsuite may need
307
    def _get_uuid(self):
308
        """Get our uuid"""
309
        if self.uuid is None:
310
            authenticate = self.clients.astakos.authenticate()
311
            self.uuid = authenticate['access']['user']['id']
312
            self.info("User's uuid is %s", self.uuid)
313
        return self.uuid
314

    
315
    def _get_username(self):
316
        """Get our User Name"""
317
        authenticate = self.clients.astakos.authenticate()
318
        username = authenticate['access']['user']['name']
319
        self.info("User's name is %s", username)
320
        return username
321

    
322
    def _create_tmp_directory(self):
323
        """Create a tmp directory"""
324
        temp_dir = tempfile.mkdtemp(dir=self.temp_directory)
325
        self.info("Temp directory %s created", temp_dir)
326
        return temp_dir
327

    
328
    def _remove_tmp_directory(self, tmp_dir):
329
        """Remove a tmp directory"""
330
        try:
331
            shutil.rmtree(tmp_dir)
332
            self.info("Temp directory %s deleted", tmp_dir)
333
        except OSError:
334
            pass
335

    
336
    def _get_uuid_of_system_user(self):
337
        """Get the uuid of the system user
338

339
        This is the user that upload the 'official' images.
340

341
        """
342
        self.info("Getting the uuid of the system user")
343
        system_users = None
344
        if self.system_user is not None:
345
            try:
346
                su_type, su_value = parse_typed_option(self.system_user)
347
                if su_type == "name":
348
                    system_users = [su_value]
349
                elif su_type == "id":
350
                    self.info("System user's uuid is %s", su_value)
351
                    return su_value
352
                else:
353
                    self.error("Unrecognized system-user type %s", su_type)
354
            except ValueError:
355
                msg = "Invalid system-user format: %s. Must be [id|name]:.+"
356
                self.warning(msg, self.system_user)
357

    
358
        if system_users is None:
359
            system_users = SYSTEM_USERS
360

    
361
        uuids = self.clients.astakos.get_uuids(system_users)
362
        for su_name in system_users:
363
            self.info("Trying username %s", su_name)
364
            if su_name in uuids:
365
                self.info("System user's uuid is %s", uuids[su_name])
366
                return uuids[su_name]
367

    
368
        self.warning("No system user found")
369
        return None
370

    
371
    def _skip_if(self, condition, msg):
372
        """Skip tests"""
373
        if condition:
374
            self.info("Test skipped: %s" % msg)
375
            self.skipTest(msg)
376

    
377
    def _skip_suite_if(self, condition, msg):
378
        """Skip the whole testsuite"""
379
        if condition:
380
            self.info("TestSuite skipped: %s" % msg)
381
            self.skipTest("__SkipClass__: %s" % msg)
382

    
383
    # ----------------------------------
384
    # Flavors
385
    def _get_list_of_flavors(self, detail=False):
386
        """Get (detailed) list of flavors"""
387
        if detail:
388
            self.info("Getting detailed list of flavors")
389
        else:
390
            self.info("Getting simple list of flavors")
391
        flavors = self.clients.compute.list_flavors(detail=detail)
392
        return flavors
393

    
394
    def _find_flavors(self, patterns, flavors=None):
395
        """Find a list of suitable flavors to use
396

397
        The patterns is a list of `typed_options'. A list of all flavors
398
        matching this patterns will be returned.
399

400
        """
401
        if flavors is None:
402
            flavors = self._get_list_of_flavors(detail=True)
403

    
404
        ret_flavors = []
405
        for ptrn in patterns:
406
            try:
407
                flv_type, flv_value = parse_typed_option(ptrn)
408
            except ValueError:
409
                msg = "Invalid flavor format: %s. Must be [id|name]:.+"
410
                self.warning(msg, ptrn)
411
                continue
412

    
413
            if flv_type == "name":
414
                # Filter flavor by name
415
                msg = "Trying to find a flavor with name %s"
416
                self.info(msg, flv_value)
417
                filtered_flvs = \
418
                    [f for f in flavors if
419
                     re.search(flv_value, f['name'], flags=re.I) is not None]
420
            elif flv_type == "id":
421
                # Filter flavors by id
422
                msg = "Trying to find a flavor with id %s"
423
                self.info(msg, flv_value)
424
                filtered_flvs = \
425
                    [f for f in flavors if str(f['id']) == flv_value]
426
            else:
427
                self.error("Unrecognized flavor type %s", flv_type)
428

    
429
            # Append and continue
430
            ret_flavors.extend(filtered_flvs)
431

    
432
        self.assertGreater(len(ret_flavors), 0,
433
                           "No matching flavors found")
434
        return ret_flavors
435

    
436
    # ----------------------------------
437
    # Images
438
    def _get_list_of_images(self, detail=False):
439
        """Get (detailed) list of images"""
440
        if detail:
441
            self.info("Getting detailed list of images")
442
        else:
443
            self.info("Getting simple list of images")
444
        images = self.clients.image.list_public(detail=detail)
445
        # Remove images registered by burnin
446
        images = [img for img in images
447
                  if not img['name'].startswith(SNF_TEST_PREFIX)]
448
        return images
449

    
450
    def _get_list_of_sys_images(self, images=None):
451
        """Get (detailed) list of images registered by system user or by me"""
452
        self.info("Getting list of images registered by system user or by me")
453
        if images is None:
454
            images = self._get_list_of_images(detail=True)
455

    
456
        su_uuid = self._get_uuid_of_system_user()
457
        my_uuid = self._get_uuid()
458
        ret_images = [i for i in images
459
                      if i['owner'] == su_uuid or i['owner'] == my_uuid]
460

    
461
        return ret_images
462

    
463
    def _find_images(self, patterns, images=None):
464
        """Find a list of suitable images to use
465

466
        The patterns is a list of `typed_options'. A list of all images
467
        matching this patterns will be returned.
468

469
        """
470
        if images is None:
471
            images = self._get_list_of_sys_images()
472

    
473
        ret_images = []
474
        for ptrn in patterns:
475
            try:
476
                img_type, img_value = parse_typed_option(ptrn)
477
            except ValueError:
478
                msg = "Invalid image format: %s. Must be [id|name]:.+"
479
                self.warning(msg, ptrn)
480
                continue
481

    
482
            if img_type == "name":
483
                # Filter image by name
484
                msg = "Trying to find an image with name %s"
485
                self.info(msg, img_value)
486
                filtered_imgs = \
487
                    [i for i in images if
488
                     re.search(img_value, i['name'], flags=re.I) is not None]
489
            elif img_type == "id":
490
                # Filter images by id
491
                msg = "Trying to find an image with id %s"
492
                self.info(msg, img_value)
493
                filtered_imgs = \
494
                    [i for i in images if
495
                     i['id'].lower() == img_value.lower()]
496
            else:
497
                self.error("Unrecognized image type %s", img_type)
498

    
499
            # Append and continue
500
            ret_images.extend(filtered_imgs)
501

    
502
        self.assertGreater(len(ret_images), 0,
503
                           "No matching images found")
504
        return ret_images
505

    
506
    # ----------------------------------
507
    # Pithos
508
    def _set_pithos_account(self, account):
509
        """Set the Pithos account"""
510
        assert account, "No pithos account was given"
511

    
512
        self.info("Setting Pithos account to %s", account)
513
        self.clients.pithos.account = account
514

    
515
    def _set_pithos_container(self, container):
516
        """Set the Pithos container"""
517
        assert container, "No pithos container was given"
518

    
519
        self.info("Setting Pithos container to %s", container)
520
        self.clients.pithos.container = container
521

    
522
    def _get_list_of_containers(self, account=None):
523
        """Get list of containers"""
524
        if account is not None:
525
            self._set_pithos_account(account)
526
        self.info("Getting list of containers")
527
        return self.clients.pithos.list_containers()
528

    
529
    def _create_pithos_container(self, container):
530
        """Create a pithos container
531

532
        If the container exists, nothing will happen
533

534
        """
535
        assert container, "No pithos container was given"
536

    
537
        self.info("Creating pithos container %s", container)
538
        self.clients.pithos.container = container
539
        self.clients.pithos.container_put()
540

    
541
    # ----------------------------------
542
    # Quotas
543
    def _get_quotas(self):
544
        """Get quotas"""
545
        self.info("Getting quotas")
546
        return dict(self.clients.astakos.get_quotas())
547

    
548
    # pylint: disable=invalid-name
549
    # pylint: disable=too-many-arguments
550
    def _check_quotas(self, changes):
551
        """Check that quotas' changes are consistent
552

553
        @param changes: A dict of the changes that have been made in quotas
554

555
        """
556
        if not changes:
557
            return
558

    
559
        self.info("Check that quotas' changes are consistent")
560
        old_quotas = self.quotas
561
        new_quotas = self._get_quotas()
562
        self.quotas = new_quotas
563

    
564
        self.assertListEqual(sorted(old_quotas.keys()),
565
                             sorted(new_quotas.keys()))
566

    
567
        # Take old_quotas and apply changes
568
        for prj, values in changes.items():
569
            self.assertIn(prj, old_quotas.keys())
570
            for q_name, q_mult, q_value, q_unit in values:
571
                if q_unit is None:
572
                    q_unit = 1
573
                q_value = q_mult*int(q_value)*q_unit
574
                assert isinstance(q_value, int), \
575
                    "Project %s: %s value has to be integer" % (prj, q_name)
576
                old_quotas[prj][q_name]['usage'] += q_value
577
                old_quotas[prj][q_name]['project_usage'] += q_value
578

    
579
        self.assertEqual(old_quotas, new_quotas)
580

    
581
    # ----------------------------------
582
    # Projects
583
    def _get_project_name(self, puuid, uuid=None):
584
        """Get the name of a project"""
585
        if uuid is None:
586
            uuid = self._get_uuid()
587
        if puuid == uuid:
588
            return "base"
589
        else:
590
            project_info = self.clients.astakos.get_project(puuid)
591
            return project_info['name']
592

    
593

    
594
# --------------------------------------------------------------------
595
# Initialize Burnin
596
def initialize(opts, testsuites, stale_testsuites):
597
    """Initalize burnin
598

599
    Initialize our logger and burnin state
600

601
    """
602
    # Initialize logger
603
    global logger  # pylint: disable=invalid-name, global-statement
604
    curr_time = datetime.datetime.now()
605
    logger = Log(opts.log_folder, verbose=opts.verbose,
606
                 use_colors=opts.use_colors, in_parallel=False,
607
                 log_level=opts.log_level, curr_time=curr_time)
608

    
609
    # Initialize clients
610
    Clients.auth_url = opts.auth_url
611
    Clients.token = opts.token
612

    
613
    # Pass the rest options to BurninTests
614
    BurninTests.use_ipv6 = opts.use_ipv6
615
    BurninTests.action_timeout = opts.action_timeout
616
    BurninTests.action_warning = opts.action_warning
617
    BurninTests.query_interval = opts.query_interval
618
    BurninTests.system_user = opts.system_user
619
    BurninTests.flavors = opts.flavors
620
    BurninTests.images = opts.images
621
    BurninTests.delete_stale = opts.delete_stale
622
    BurninTests.temp_directory = opts.temp_directory
623
    BurninTests.failfast = opts.failfast
624
    BurninTests.run_id = SNF_TEST_PREFIX + \
625
        datetime.datetime.strftime(curr_time, "%Y%m%d%H%M%S")
626

    
627
    # Choose tests to run
628
    if opts.show_stale:
629
        # We will run the stale_testsuites
630
        return (stale_testsuites, True)
631

    
632
    if opts.tests != "all":
633
        testsuites = opts.tests
634
    if opts.exclude_tests is not None:
635
        testsuites = [tsuite for tsuite in testsuites
636
                      if tsuite not in opts.exclude_tests]
637

    
638
    return (testsuites, opts.failfast)
639

    
640

    
641
# --------------------------------------------------------------------
642
# Run Burnin
643
def run_burnin(testsuites, failfast=False):
644
    """Run burnin testsuites"""
645
    # pylint: disable=invalid-name,global-statement
646
    # pylint: disable=global-variable-not-assigned
647
    global logger, success
648

    
649
    success = True
650
    run_tests(testsuites, failfast=failfast)
651

    
652
    # Clean up our logger
653
    del logger
654

    
655
    # Return
656
    return 0 if success else 1
657

    
658

    
659
def run_tests(tcases, failfast=False):
660
    """Run some testcases"""
661
    # pylint: disable=invalid-name,global-statement
662
    # pylint: disable=global-variable-not-assigned
663
    global success
664

    
665
    for tcase in tcases:
666
        was_success = run_test(tcase)
667
        success = success and was_success
668
        if failfast and not success:
669
            break
670

    
671

    
672
def run_test(tcase):
673
    """Run a testcase"""
674
    tsuite = unittest.TestLoader().loadTestsFromTestCase(tcase)
675
    results = tsuite.run(BurninTestResult())
676

    
677
    return was_successful(tcase.__name__, results.wasSuccessful())
678

    
679

    
680
# --------------------------------------------------------------------
681
# Helper functions
682
def was_successful(tsuite, successful):
683
    """Handle whether a testsuite was succesful or not"""
684
    if successful:
685
        logger.testsuite_success(tsuite)
686
        return True
687
    else:
688
        logger.testsuite_failure(tsuite)
689
        return False
690

    
691

    
692
def parse_typed_option(value):
693
    """Parse typed options (flavors and images)
694

695
    The options are in the form 'id:123-345' or 'name:^Debian Base$'
696

697
    """
698
    try:
699
        [type_, val] = value.strip().split(':')
700
        if type_ not in ["id", "name"]:
701
            raise ValueError
702
        return type_, val
703
    except ValueError:
704
        raise