Statistics
| Branch: | Tag: | Revision:

root / snf-tools / synnefo_tools / burnin / common.py @ 411cbbf4

History | View | Annotate | Download (23.5 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
"""
35
Common utils for burnin tests
36

37
"""
38

    
39
import re
40
import shutil
41
import unittest
42
import datetime
43
import tempfile
44
import traceback
45

    
46
from kamaki.clients.cyclades import CycladesClient, CycladesNetworkClient
47
from kamaki.clients.astakos import AstakosClient, parse_endpoints
48
from kamaki.clients.compute import ComputeClient
49
from kamaki.clients.pithos import PithosClient
50
from kamaki.clients.image import ImageClient
51

    
52
from synnefo_tools.burnin.logger import Log
53

    
54

    
55
# --------------------------------------------------------------------
56
# Global variables
57
logger = None   # pylint: disable=invalid-name
58
success = None  # pylint: disable=invalid-name
59
SNF_TEST_PREFIX = "snf-test-"
60
CONNECTION_RETRY_LIMIT = 2
61
SYSTEM_USERS = ["images@okeanos.grnet.gr", "images@demo.synnefo.org"]
62
KB = 2**10
63
MB = 2**20
64
GB = 2**30
65

    
66
QADD = 1
67
QREMOVE = -1
68

    
69
QDISK = "cyclades.disk"
70
QVM = "cyclades.vm"
71
QPITHOS = "pithos.diskspace"
72
QRAM = "cyclades.ram"
73
QIP = "cyclades.floating_ip"
74
QCPU = "cyclades.cpu"
75
QNET = "cyclades.network.private"
76

    
77

    
78
# --------------------------------------------------------------------
79
# BurninTestResult class
80
class BurninTestResult(unittest.TestResult):
81
    """Modify the TextTestResult class"""
82
    def __init__(self):
83
        super(BurninTestResult, self).__init__()
84

    
85
        # Test parameters
86
        self.failfast = True
87

    
88
    def startTest(self, test):  # noqa
89
        """Called when the test case test is about to be run"""
90
        super(BurninTestResult, self).startTest(test)
91
        logger.log(test.__class__.__name__, test.shortDescription())
92

    
93
    # pylint: disable=no-self-use
94
    def _test_failed(self, test, err):
95
        """Test failed"""
96
        # Get class name
97
        if test.__class__.__name__ == "_ErrorHolder":
98
            class_name = test.id().split('.')[-1].rstrip(')')
99
        else:
100
            class_name = test.__class__.__name__
101
        err_msg = str(test) + "... failed (%s)."
102
        timestamp = datetime.datetime.strftime(
103
            datetime.datetime.now(), "%a %b %d %Y %H:%M:%S")
104
        logger.error(class_name, err_msg, timestamp)
105
        (err_type, err_value, err_trace) = err
106
        trcback = traceback.format_exception(err_type, err_value, err_trace)
107
        logger.info(class_name, trcback)
108

    
109
    def addError(self, test, err):  # noqa
110
        """Called when the test case test raises an unexpected exception"""
111
        super(BurninTestResult, self).addError(test, err)
112
        self._test_failed(test, err)
113

    
114
    def addFailure(self, test, err):  # noqa
115
        """Called when the test case test signals a failure"""
116
        super(BurninTestResult, self).addFailure(test, err)
117
        self._test_failed(test, err)
118

    
119
    # pylint: disable=fixme
120
    def addSkip(self, test, reason):  # noqa
121
        """Called when the test case test is skipped
122

123
        If reason starts with "__SkipClass__: " then
124
        we should stop the execution of all the TestSuite.
125

126
        TODO: There should be a better way to do this
127

128
        """
129
        super(BurninTestResult, self).addSkip(test, reason)
130
        if reason.startswith("__SkipClass__: "):
131
            self.stop()
132

    
133

    
134
# --------------------------------------------------------------------
135
# Helper Classes
136
# pylint: disable=too-few-public-methods
137
# pylint: disable=too-many-instance-attributes
138
class Clients(object):
139
    """Our kamaki clients"""
140
    auth_url = None
141
    token = None
142
    # Astakos
143
    astakos = None
144
    retry = CONNECTION_RETRY_LIMIT
145
    # Compute
146
    compute = None
147
    compute_url = None
148
    # Cyclades
149
    cyclades = None
150
    # Network
151
    network = None
152
    network_url = None
153
    # Pithos
154
    pithos = None
155
    pithos_url = None
156
    # Image
157
    image = None
158
    image_url = None
159

    
160
    def initialize_clients(self):
161
        """Initialize all the Kamaki Clients"""
162
        self.astakos = AstakosClient(self.auth_url, self.token)
163
        self.astakos.CONNECTION_RETRY_LIMIT = self.retry
164

    
165
        endpoints = self.astakos.authenticate()
166

    
167
        self.compute_url = _get_endpoint_url(endpoints, "compute")
168
        self.compute = ComputeClient(self.compute_url, self.token)
169
        self.compute.CONNECTION_RETRY_LIMIT = self.retry
170

    
171
        self.cyclades = CycladesClient(self.compute_url, self.token)
172
        self.cyclades.CONNECTION_RETRY_LIMIT = self.retry
173

    
174
        self.network_url = _get_endpoint_url(endpoints, "network")
175
        self.network = CycladesNetworkClient(self.network_url, self.token)
176
        self.network.CONNECTION_RETRY_LIMIT = self.retry
177

    
178
        self.pithos_url = _get_endpoint_url(endpoints, "object-store")
179
        self.pithos = PithosClient(self.pithos_url, self.token)
180
        self.pithos.CONNECTION_RETRY_LIMIT = self.retry
181

    
182
        self.image_url = _get_endpoint_url(endpoints, "image")
183
        self.image = ImageClient(self.image_url, self.token)
184
        self.image.CONNECTION_RETRY_LIMIT = self.retry
185

    
186

    
187
def _get_endpoint_url(endpoints, endpoint_type):
188
    """Get the publicURL for the specified endpoint"""
189

    
190
    service_catalog = parse_endpoints(endpoints, ep_type=endpoint_type)
191
    return service_catalog[0]['endpoints'][0]['publicURL']
192

    
193

    
194
class Proper(object):
195
    """A descriptor used by tests implementing the TestCase class
196

197
    Since each instance of the TestCase will only be used to run a single
198
    test method (a new fixture is created for each test) the attributes can
199
    not be saved in the class instances. Instead we use descriptors.
200

201
    """
202
    def __init__(self, value=None):
203
        self.val = value
204

    
205
    def __get__(self, obj, objtype=None):
206
        return self.val
207

    
208
    def __set__(self, obj, value):
209
        self.val = value
210

    
211

    
212
# --------------------------------------------------------------------
213
# BurninTests class
214
# pylint: disable=too-many-public-methods
215
class BurninTests(unittest.TestCase):
216
    """Common class that all burnin tests should implement"""
217
    clients = Clients()
218
    run_id = None
219
    use_ipv6 = None
220
    action_timeout = None
221
    action_warning = None
222
    query_interval = None
223
    system_user = None
224
    images = None
225
    flavors = None
226
    delete_stale = False
227
    temp_directory = None
228
    failfast = None
229

    
230
    quotas = Proper(value=None)
231
    uuid = Proper(value=None)
232

    
233
    @classmethod
234
    def setUpClass(cls):  # noqa
235
        """Initialize BurninTests"""
236
        cls.suite_name = cls.__name__
237
        logger.testsuite_start(cls.suite_name)
238

    
239
        # Set test parameters
240
        cls.longMessage = True
241

    
242
    def test_000_clients_setup(self):
243
        """Initializing astakos/cyclades/pithos clients"""
244
        # Update class attributes
245
        self.clients.initialize_clients()
246
        self.info("Astakos auth url is %s", self.clients.auth_url)
247
        self.info("Cyclades url is %s", self.clients.compute_url)
248
        self.info("Network url is %s", self.clients.network_url)
249
        self.info("Pithos url is %s", self.clients.pithos_url)
250
        self.info("Image url is %s", self.clients.image_url)
251

    
252
        self.quotas = self._get_quotas()
253
        for puuid, quotas in self.quotas.items():
254
            project_name = self._get_project_name(puuid)
255
            self.info("  Project %s:", project_name)
256
            self.info("    Disk usage is         %s bytes",
257
                      quotas['cyclades.disk']['usage'])
258
            self.info("    VM usage is           %s",
259
                      quotas['cyclades.vm']['usage'])
260
            self.info("    DiskSpace usage is    %s bytes",
261
                      quotas['pithos.diskspace']['usage'])
262
            self.info("    Ram usage is          %s bytes",
263
                      quotas['cyclades.ram']['usage'])
264
            self.info("    Floating IPs usage is %s",
265
                      quotas['cyclades.floating_ip']['usage'])
266
            self.info("    CPU usage is          %s",
267
                      quotas['cyclades.cpu']['usage'])
268
            self.info("    Network usage is      %s",
269
                      quotas['cyclades.network.private']['usage'])
270

    
271
    def _run_tests(self, tcases):
272
        """Run some generated testcases"""
273
        global success  # pylint: disable=invalid-name, global-statement
274

    
275
        for tcase in tcases:
276
            self.info("Running testsuite %s", tcase.__name__)
277
            success = run_test(tcase) and success
278
            if self.failfast and not success:
279
                break
280

    
281
    # ----------------------------------
282
    # Loggers helper functions
283
    def log(self, msg, *args):
284
        """Pass the section value to logger"""
285
        logger.log(self.suite_name, msg, *args)
286

    
287
    def info(self, msg, *args):
288
        """Pass the section value to logger"""
289
        logger.info(self.suite_name, msg, *args)
290

    
291
    def debug(self, msg, *args):
292
        """Pass the section value to logger"""
293
        logger.debug(self.suite_name, msg, *args)
294

    
295
    def warning(self, msg, *args):
296
        """Pass the section value to logger"""
297
        logger.warning(self.suite_name, msg, *args)
298

    
299
    def error(self, msg, *args):
300
        """Pass the section value to logger"""
301
        logger.error(self.suite_name, msg, *args)
302
        self.fail(msg % args)
303

    
304
    # ----------------------------------
305
    # Helper functions that every testsuite may need
306
    def _get_uuid(self):
307
        """Get our uuid"""
308
        if self.uuid is None:
309
            authenticate = self.clients.astakos.authenticate()
310
            self.uuid = authenticate['access']['user']['id']
311
            self.info("User's uuid is %s", self.uuid)
312
        return self.uuid
313

    
314
    def _get_username(self):
315
        """Get our User Name"""
316
        authenticate = self.clients.astakos.authenticate()
317
        username = authenticate['access']['user']['name']
318
        self.info("User's name is %s", username)
319
        return username
320

    
321
    def _create_tmp_directory(self):
322
        """Create a tmp directory"""
323
        temp_dir = tempfile.mkdtemp(dir=self.temp_directory)
324
        self.info("Temp directory %s created", temp_dir)
325
        return temp_dir
326

    
327
    def _remove_tmp_directory(self, tmp_dir):
328
        """Remove a tmp directory"""
329
        try:
330
            shutil.rmtree(tmp_dir)
331
            self.info("Temp directory %s deleted", tmp_dir)
332
        except OSError:
333
            pass
334

    
335
    def _get_uuid_of_system_user(self):
336
        """Get the uuid of the system user
337

338
        This is the user that upload the 'official' images.
339

340
        """
341
        self.info("Getting the uuid of the system user")
342
        system_users = None
343
        if self.system_user is not None:
344
            try:
345
                su_type, su_value = parse_typed_option(self.system_user)
346
                if su_type == "name":
347
                    system_users = [su_value]
348
                elif su_type == "id":
349
                    self.info("System user's uuid is %s", su_value)
350
                    return su_value
351
                else:
352
                    self.error("Unrecognized system-user type %s", su_type)
353
            except ValueError:
354
                msg = "Invalid system-user format: %s. Must be [id|name]:.+"
355
                self.warning(msg, self.system_user)
356

    
357
        if system_users is None:
358
            system_users = SYSTEM_USERS
359

    
360
        uuids = self.clients.astakos.get_uuids(system_users)
361
        for su_name in system_users:
362
            self.info("Trying username %s", su_name)
363
            if su_name in uuids:
364
                self.info("System user's uuid is %s", uuids[su_name])
365
                return uuids[su_name]
366

    
367
        self.warning("No system user found")
368
        return None
369

    
370
    def _skip_if(self, condition, msg):
371
        """Skip tests"""
372
        if condition:
373
            self.info("Test skipped: %s" % msg)
374
            self.skipTest(msg)
375

    
376
    def _skip_suite_if(self, condition, msg):
377
        """Skip the whole testsuite"""
378
        if condition:
379
            self.info("TestSuite skipped: %s" % msg)
380
            self.skipTest("__SkipClass__: %s" % msg)
381

    
382
    # ----------------------------------
383
    # Flavors
384
    def _get_list_of_flavors(self, detail=False):
385
        """Get (detailed) list of flavors"""
386
        if detail:
387
            self.info("Getting detailed list of flavors")
388
        else:
389
            self.info("Getting simple list of flavors")
390
        flavors = self.clients.compute.list_flavors(detail=detail)
391
        return flavors
392

    
393
    def _find_flavors(self, patterns, flavors=None):
394
        """Find a list of suitable flavors to use
395

396
        The patterns is a list of `typed_options'. A list of all flavors
397
        matching this patterns will be returned.
398

399
        """
400
        if flavors is None:
401
            flavors = self._get_list_of_flavors(detail=True)
402

    
403
        ret_flavors = []
404
        for ptrn in patterns:
405
            try:
406
                flv_type, flv_value = parse_typed_option(ptrn)
407
            except ValueError:
408
                msg = "Invalid flavor format: %s. Must be [id|name]:.+"
409
                self.warning(msg, ptrn)
410
                continue
411

    
412
            if flv_type == "name":
413
                # Filter flavor by name
414
                msg = "Trying to find a flavor with name %s"
415
                self.info(msg, flv_value)
416
                filtered_flvs = \
417
                    [f for f in flavors if
418
                     re.search(flv_value, f['name'], flags=re.I) is not None]
419
            elif flv_type == "id":
420
                # Filter flavors by id
421
                msg = "Trying to find a flavor with id %s"
422
                self.info(msg, flv_value)
423
                filtered_flvs = \
424
                    [f for f in flavors if str(f['id']) == flv_value]
425
            else:
426
                self.error("Unrecognized flavor type %s", flv_type)
427

    
428
            # Append and continue
429
            ret_flavors.extend(filtered_flvs)
430

    
431
        self.assertGreater(len(ret_flavors), 0,
432
                           "No matching flavors found")
433
        return ret_flavors
434

    
435
    # ----------------------------------
436
    # Images
437
    def _get_list_of_images(self, detail=False):
438
        """Get (detailed) list of images"""
439
        if detail:
440
            self.info("Getting detailed list of images")
441
        else:
442
            self.info("Getting simple list of images")
443
        images = self.clients.image.list_public(detail=detail)
444
        # Remove images registered by burnin
445
        images = [img for img in images
446
                  if not img['name'].startswith(SNF_TEST_PREFIX)]
447
        return images
448

    
449
    def _get_list_of_sys_images(self, images=None):
450
        """Get (detailed) list of images registered by system user or by me"""
451
        self.info("Getting list of images registered by system user or by me")
452
        if images is None:
453
            images = self._get_list_of_images(detail=True)
454

    
455
        su_uuid = self._get_uuid_of_system_user()
456
        my_uuid = self._get_uuid()
457
        ret_images = [i for i in images
458
                      if i['owner'] == su_uuid or i['owner'] == my_uuid]
459

    
460
        return ret_images
461

    
462
    def _find_images(self, patterns, images=None):
463
        """Find a list of suitable images to use
464

465
        The patterns is a list of `typed_options'. A list of all images
466
        matching this patterns will be returned.
467

468
        """
469
        if images is None:
470
            images = self._get_list_of_sys_images()
471

    
472
        ret_images = []
473
        for ptrn in patterns:
474
            try:
475
                img_type, img_value = parse_typed_option(ptrn)
476
            except ValueError:
477
                msg = "Invalid image format: %s. Must be [id|name]:.+"
478
                self.warning(msg, ptrn)
479
                continue
480

    
481
            if img_type == "name":
482
                # Filter image by name
483
                msg = "Trying to find an image with name %s"
484
                self.info(msg, img_value)
485
                filtered_imgs = \
486
                    [i for i in images if
487
                     re.search(img_value, i['name'], flags=re.I) is not None]
488
            elif img_type == "id":
489
                # Filter images by id
490
                msg = "Trying to find an image with id %s"
491
                self.info(msg, img_value)
492
                filtered_imgs = \
493
                    [i for i in images if
494
                     i['id'].lower() == img_value.lower()]
495
            else:
496
                self.error("Unrecognized image type %s", img_type)
497

    
498
            # Append and continue
499
            ret_images.extend(filtered_imgs)
500

    
501
        self.assertGreater(len(ret_images), 0,
502
                           "No matching images found")
503
        return ret_images
504

    
505
    # ----------------------------------
506
    # Pithos
507
    def _set_pithos_account(self, account):
508
        """Set the Pithos account"""
509
        assert account, "No pithos account was given"
510

    
511
        self.info("Setting Pithos account to %s", account)
512
        self.clients.pithos.account = account
513

    
514
    def _set_pithos_container(self, container):
515
        """Set the Pithos container"""
516
        assert container, "No pithos container was given"
517

    
518
        self.info("Setting Pithos container to %s", container)
519
        self.clients.pithos.container = container
520

    
521
    def _get_list_of_containers(self, account=None):
522
        """Get list of containers"""
523
        if account is not None:
524
            self._set_pithos_account(account)
525
        self.info("Getting list of containers")
526
        return self.clients.pithos.list_containers()
527

    
528
    def _create_pithos_container(self, container):
529
        """Create a pithos container
530

531
        If the container exists, nothing will happen
532

533
        """
534
        assert container, "No pithos container was given"
535

    
536
        self.info("Creating pithos container %s", container)
537
        self.clients.pithos.container = container
538
        self.clients.pithos.container_put()
539

    
540
    # ----------------------------------
541
    # Quotas
542
    def _get_quotas(self):
543
        """Get quotas"""
544
        self.info("Getting quotas")
545
        return dict(self.clients.astakos.get_quotas())
546

    
547
    # pylint: disable=invalid-name
548
    # pylint: disable=too-many-arguments
549
    def _check_quotas(self, changes):
550
        """Check that quotas' changes are consistent
551

552
        @param changes: A dict of the changes that have been made in quotas
553

554
        """
555
        if not changes:
556
            return
557

    
558
        self.info("Check that quotas' changes are consistent")
559
        old_quotas = self.quotas
560
        new_quotas = self._get_quotas()
561
        self.quotas = new_quotas
562

    
563
        self.assertListEqual(sorted(old_quotas.keys()),
564
                             sorted(new_quotas.keys()))
565

    
566
        # Take old_quotas and apply changes
567
        for prj, values in changes.items():
568
            self.assertIn(prj, old_quotas.keys())
569
            for q_name, q_mult, q_value, q_unit in values:
570
                if q_unit is None:
571
                    q_unit = 1
572
                q_value = q_mult*int(q_value)*q_unit
573
                assert isinstance(q_value, int), \
574
                    "Project %s: %s value has to be integer" % (prj, q_name)
575
                old_quotas[prj][q_name]['usage'] += q_value
576
                old_quotas[prj][q_name]['project_usage'] += q_value
577

    
578
        self.assertEqual(old_quotas, new_quotas)
579

    
580
    # ----------------------------------
581
    # Projects
582
    def _get_project_name(self, puuid):
583
        """Get the name of a project"""
584
        uuid = self._get_uuid()
585
        if puuid == uuid:
586
            return "base"
587
        else:
588
            project_info = self.clients.astakos.get_project(puuid)
589
            return project_info['name']
590

    
591

    
592
# --------------------------------------------------------------------
593
# Initialize Burnin
594
def initialize(opts, testsuites, stale_testsuites):
595
    """Initalize burnin
596

597
    Initialize our logger and burnin state
598

599
    """
600
    # Initialize logger
601
    global logger  # pylint: disable=invalid-name, global-statement
602
    curr_time = datetime.datetime.now()
603
    logger = Log(opts.log_folder, verbose=opts.verbose,
604
                 use_colors=opts.use_colors, in_parallel=False,
605
                 log_level=opts.log_level, curr_time=curr_time)
606

    
607
    # Initialize clients
608
    Clients.auth_url = opts.auth_url
609
    Clients.token = opts.token
610

    
611
    # Pass the rest options to BurninTests
612
    BurninTests.use_ipv6 = opts.use_ipv6
613
    BurninTests.action_timeout = opts.action_timeout
614
    BurninTests.action_warning = opts.action_warning
615
    BurninTests.query_interval = opts.query_interval
616
    BurninTests.system_user = opts.system_user
617
    BurninTests.flavors = opts.flavors
618
    BurninTests.images = opts.images
619
    BurninTests.delete_stale = opts.delete_stale
620
    BurninTests.temp_directory = opts.temp_directory
621
    BurninTests.failfast = opts.failfast
622
    BurninTests.run_id = SNF_TEST_PREFIX + \
623
        datetime.datetime.strftime(curr_time, "%Y%m%d%H%M%S")
624

    
625
    # Choose tests to run
626
    if opts.show_stale:
627
        # We will run the stale_testsuites
628
        return (stale_testsuites, True)
629

    
630
    if opts.tests != "all":
631
        testsuites = opts.tests
632
    if opts.exclude_tests is not None:
633
        testsuites = [tsuite for tsuite in testsuites
634
                      if tsuite not in opts.exclude_tests]
635

    
636
    return (testsuites, opts.failfast)
637

    
638

    
639
# --------------------------------------------------------------------
640
# Run Burnin
641
def run_burnin(testsuites, failfast=False):
642
    """Run burnin testsuites"""
643
    # pylint: disable=invalid-name,global-statement
644
    # pylint: disable=global-variable-not-assigned
645
    global logger, success
646

    
647
    success = True
648
    run_tests(testsuites, failfast=failfast)
649

    
650
    # Clean up our logger
651
    del logger
652

    
653
    # Return
654
    return 0 if success else 1
655

    
656

    
657
def run_tests(tcases, failfast=False):
658
    """Run some testcases"""
659
    # pylint: disable=invalid-name,global-statement
660
    # pylint: disable=global-variable-not-assigned
661
    global success
662

    
663
    for tcase in tcases:
664
        was_success = run_test(tcase)
665
        success = success and was_success
666
        if failfast and not success:
667
            break
668

    
669

    
670
def run_test(tcase):
671
    """Run a testcase"""
672
    tsuite = unittest.TestLoader().loadTestsFromTestCase(tcase)
673
    results = tsuite.run(BurninTestResult())
674

    
675
    return was_successful(tcase.__name__, results.wasSuccessful())
676

    
677

    
678
# --------------------------------------------------------------------
679
# Helper functions
680
def was_successful(tsuite, successful):
681
    """Handle whether a testsuite was succesful or not"""
682
    if successful:
683
        logger.testsuite_success(tsuite)
684
        return True
685
    else:
686
        logger.testsuite_failure(tsuite)
687
        return False
688

    
689

    
690
def parse_typed_option(value):
691
    """Parse typed options (flavors and images)
692

693
    The options are in the form 'id:123-345' or 'name:^Debian Base$'
694

695
    """
696
    try:
697
        [type_, val] = value.strip().split(':')
698
        if type_ not in ["id", "name"]:
699
            raise ValueError
700
        return type_, val
701
    except ValueError:
702
        raise