Statistics
| Branch: | Tag: | Revision:

root / snf-tools / synnefo_tools / burnin / common.py @ 0a83201b

History | View | Annotate | Download (23 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
"""
35
Common utils for burnin tests
36

37
"""
38

    
39
import re
40
import shutil
41
import unittest
42
import datetime
43
import tempfile
44
import traceback
45

    
46
from kamaki.clients.cyclades import CycladesClient, CycladesNetworkClient
47
from kamaki.clients.astakos import AstakosClient, parse_endpoints
48
from kamaki.clients.compute import ComputeClient
49
from kamaki.clients.pithos import PithosClient
50
from kamaki.clients.image import ImageClient
51

    
52
from synnefo_tools.burnin.logger import Log
53

    
54

    
55
# --------------------------------------------------------------------
56
# Global variables
57
logger = None   # Invalid constant name. pylint: disable-msg=C0103
58
success = None  # Invalid constant name. pylint: disable-msg=C0103
59
SNF_TEST_PREFIX = "snf-test-"
60
CONNECTION_RETRY_LIMIT = 2
61
SYSTEM_USERS = ["images@okeanos.grnet.gr", "images@demo.synnefo.org"]
62
KB = 2**10
63
MB = 2**20
64
GB = 2**30
65

    
66

    
67
# --------------------------------------------------------------------
68
# BurninTestResult class
69
class BurninTestResult(unittest.TestResult):
70
    """Modify the TextTestResult class"""
71
    def __init__(self):
72
        super(BurninTestResult, self).__init__()
73

    
74
        # Test parameters
75
        self.failfast = True
76

    
77
    def startTest(self, test):  # noqa
78
        """Called when the test case test is about to be run"""
79
        super(BurninTestResult, self).startTest(test)
80
        logger.log(test.__class__.__name__, test.shortDescription())
81

    
82
    # Method could be a function. pylint: disable-msg=R0201
83
    def _test_failed(self, test, err):
84
        """Test failed"""
85
        # Get class name
86
        if test.__class__.__name__ == "_ErrorHolder":
87
            class_name = test.id().split('.')[-1].rstrip(')')
88
        else:
89
            class_name = test.__class__.__name__
90
        err_msg = str(test) + "... failed (%s)."
91
        timestamp = datetime.datetime.strftime(
92
            datetime.datetime.now(), "%a %b %d %Y %H:%M:%S")
93
        logger.error(class_name, err_msg, timestamp)
94
        (err_type, err_value, err_trace) = err
95
        trcback = traceback.format_exception(err_type, err_value, err_trace)
96
        logger.info(class_name, trcback)
97

    
98
    def addError(self, test, err):  # noqa
99
        """Called when the test case test raises an unexpected exception"""
100
        super(BurninTestResult, self).addError(test, err)
101
        self._test_failed(test, err)
102

    
103
    def addFailure(self, test, err):  # noqa
104
        """Called when the test case test signals a failure"""
105
        super(BurninTestResult, self).addFailure(test, err)
106
        self._test_failed(test, err)
107

    
108

    
109
# --------------------------------------------------------------------
110
# Helper Classes
111
# Too few public methods. pylint: disable-msg=R0903
112
# Too many instance attributes. pylint: disable-msg=R0902
113
class Clients(object):
114
    """Our kamaki clients"""
115
    auth_url = None
116
    token = None
117
    # Astakos
118
    astakos = None
119
    retry = CONNECTION_RETRY_LIMIT
120
    # Compute
121
    compute = None
122
    compute_url = None
123
    # Cyclades
124
    cyclades = None
125
    # Network
126
    network = None
127
    network_url = None
128
    # Pithos
129
    pithos = None
130
    pithos_url = None
131
    # Image
132
    image = None
133
    image_url = None
134

    
135
    def initialize_clients(self):
136
        """Initialize all the Kamaki Clients"""
137
        self.astakos = AstakosClient(self.auth_url, self.token)
138
        self.astakos.CONNECTION_RETRY_LIMIT = self.retry
139

    
140
        endpoints = self.astakos.authenticate()
141

    
142
        self.compute_url = _get_endpoint_url(endpoints, "compute")
143
        self.compute = ComputeClient(self.compute_url, self.token)
144
        self.compute.CONNECTION_RETRY_LIMIT = self.retry
145

    
146
        self.cyclades = CycladesClient(self.compute_url, self.token)
147
        self.cyclades.CONNECTION_RETRY_LIMIT = self.retry
148

    
149
        self.network_url = _get_endpoint_url(endpoints, "network")
150
        self.network = CycladesNetworkClient(self.network_url, self.token)
151
        self.network.CONNECTION_RETRY_LIMIT = self.retry
152

    
153
        self.pithos_url = _get_endpoint_url(endpoints, "object-store")
154
        self.pithos = PithosClient(self.pithos_url, self.token)
155
        self.pithos.CONNECTION_RETRY_LIMIT = self.retry
156

    
157
        self.image_url = _get_endpoint_url(endpoints, "image")
158
        self.image = ImageClient(self.image_url, self.token)
159
        self.image.CONNECTION_RETRY_LIMIT = self.retry
160

    
161

    
162
def _get_endpoint_url(endpoints, endpoint_type):
163
    """Get the publicURL for the specified endpoint"""
164

    
165
    service_catalog = parse_endpoints(endpoints, ep_type=endpoint_type)
166
    return service_catalog[0]['endpoints'][0]['publicURL']
167

    
168

    
169
class Proper(object):
170
    """A descriptor used by tests implementing the TestCase class
171

172
    Since each instance of the TestCase will only be used to run a single
173
    test method (a new fixture is created for each test) the attributes can
174
    not be saved in the class instances. Instead we use descriptors.
175

176
    """
177
    def __init__(self, value=None):
178
        self.val = value
179

    
180
    def __get__(self, obj, objtype=None):
181
        return self.val
182

    
183
    def __set__(self, obj, value):
184
        self.val = value
185

    
186

    
187
# --------------------------------------------------------------------
188
# BurninTests class
189
# Too many public methods (45/20). pylint: disable-msg=R0904
190
class BurninTests(unittest.TestCase):
191
    """Common class that all burnin tests should implement"""
192
    clients = Clients()
193
    run_id = None
194
    use_ipv6 = None
195
    action_timeout = None
196
    action_warning = None
197
    query_interval = None
198
    system_user = None
199
    images = None
200
    flavors = None
201
    delete_stale = False
202
    temp_directory = None
203
    failfast = None
204

    
205
    quotas = Proper(value=None)
206

    
207
    @classmethod
208
    def setUpClass(cls):  # noqa
209
        """Initialize BurninTests"""
210
        cls.suite_name = cls.__name__
211
        logger.testsuite_start(cls.suite_name)
212

    
213
        # Set test parameters
214
        cls.longMessage = True
215

    
216
    def test_000_clients_setup(self):
217
        """Initializing astakos/cyclades/pithos clients"""
218
        # Update class attributes
219
        self.clients.initialize_clients()
220
        self.info("Astakos auth url is %s", self.clients.auth_url)
221
        self.info("Cyclades url is %s", self.clients.compute_url)
222
        self.info("Network url is %s", self.clients.network_url)
223
        self.info("Pithos url is %s", self.clients.pithos_url)
224
        self.info("Image url is %s", self.clients.image_url)
225

    
226
        self.quotas = self._get_quotas()
227
        self.info("  Disk usage is %s bytes",
228
                  self.quotas['system']['cyclades.disk']['usage'])
229
        self.info("  VM usage is %s",
230
                  self.quotas['system']['cyclades.vm']['usage'])
231
        self.info("  DiskSpace usage is %s bytes",
232
                  self.quotas['system']['pithos.diskspace']['usage'])
233
        self.info("  Ram usage is %s bytes",
234
                  self.quotas['system']['cyclades.ram']['usage'])
235
        self.info("  Floating IPs usage is %s",
236
                  self.quotas['system']['cyclades.floating_ip']['usage'])
237
        self.info("  CPU usage is %s",
238
                  self.quotas['system']['cyclades.cpu']['usage'])
239
        self.info("  Network usage is %s",
240
                  self.quotas['system']['cyclades.network.private']['usage'])
241

    
242
    def _run_tests(self, tcases):
243
        """Run some generated testcases"""
244
        global success  # Using global. pylint: disable-msg=C0103,W0603,W0602
245

    
246
        for tcase in tcases:
247
            self.info("Running testsuite %s", tcase.__name__)
248
            success = run_test(tcase) and success
249
            if self.failfast and not success:
250
                break
251

    
252
    # ----------------------------------
253
    # Loggers helper functions
254
    def log(self, msg, *args):
255
        """Pass the section value to logger"""
256
        logger.log(self.suite_name, msg, *args)
257

    
258
    def info(self, msg, *args):
259
        """Pass the section value to logger"""
260
        logger.info(self.suite_name, msg, *args)
261

    
262
    def debug(self, msg, *args):
263
        """Pass the section value to logger"""
264
        logger.debug(self.suite_name, msg, *args)
265

    
266
    def warning(self, msg, *args):
267
        """Pass the section value to logger"""
268
        logger.warning(self.suite_name, msg, *args)
269

    
270
    def error(self, msg, *args):
271
        """Pass the section value to logger"""
272
        logger.error(self.suite_name, msg, *args)
273

    
274
    # ----------------------------------
275
    # Helper functions that every testsuite may need
276
    def _get_uuid(self):
277
        """Get our uuid"""
278
        authenticate = self.clients.astakos.authenticate()
279
        uuid = authenticate['access']['user']['id']
280
        self.info("User's uuid is %s", uuid)
281
        return uuid
282

    
283
    def _get_username(self):
284
        """Get our User Name"""
285
        authenticate = self.clients.astakos.authenticate()
286
        username = authenticate['access']['user']['name']
287
        self.info("User's name is %s", username)
288
        return username
289

    
290
    def _create_tmp_directory(self):
291
        """Create a tmp directory"""
292
        temp_dir = tempfile.mkdtemp(dir=self.temp_directory)
293
        self.info("Temp directory %s created", temp_dir)
294
        return temp_dir
295

    
296
    def _remove_tmp_directory(self, tmp_dir):
297
        """Remove a tmp directory"""
298
        try:
299
            shutil.rmtree(tmp_dir)
300
            self.info("Temp directory %s deleted", tmp_dir)
301
        except OSError:
302
            pass
303

    
304
    def _get_uuid_of_system_user(self):
305
        """Get the uuid of the system user
306

307
        This is the user that upload the 'official' images.
308

309
        """
310
        self.info("Getting the uuid of the system user")
311
        system_users = None
312
        if self.system_user is not None:
313
            try:
314
                su_type, su_value = parse_typed_option(self.system_user)
315
                if su_type == "name":
316
                    system_users = [su_value]
317
                elif su_type == "id":
318
                    self.info("System user's uuid is %s", su_value)
319
                    return su_value
320
                else:
321
                    self.error("Unrecognized system-user type %s", su_type)
322
                    self.fail("Unrecognized system-user type")
323
            except ValueError:
324
                msg = "Invalid system-user format: %s. Must be [id|name]:.+"
325
                self.warning(msg, self.system_user)
326

    
327
        if system_users is None:
328
            system_users = SYSTEM_USERS
329

    
330
        uuids = self.clients.astakos.get_uuids(system_users)
331
        for su_name in system_users:
332
            self.info("Trying username %s", su_name)
333
            if su_name in uuids:
334
                self.info("System user's uuid is %s", uuids[su_name])
335
                return uuids[su_name]
336

    
337
        self.warning("No system user found")
338
        return None
339

    
340
    def _skip_if(self, condition, msg):
341
        """Skip tests"""
342
        if condition:
343
            self.info("Test skipped: %s" % msg)
344
            self.skipTest(msg)
345

    
346
    # ----------------------------------
347
    # Flavors
348
    def _get_list_of_flavors(self, detail=False):
349
        """Get (detailed) list of flavors"""
350
        if detail:
351
            self.info("Getting detailed list of flavors")
352
        else:
353
            self.info("Getting simple list of flavors")
354
        flavors = self.clients.compute.list_flavors(detail=detail)
355
        return flavors
356

    
357
    def _find_flavors(self, patterns, flavors=None):
358
        """Find a list of suitable flavors to use
359

360
        The patterns is a list of `typed_options'. A list of all flavors
361
        matching this patterns will be returned.
362

363
        """
364
        if flavors is None:
365
            flavors = self._get_list_of_flavors(detail=True)
366

    
367
        ret_flavors = []
368
        for ptrn in patterns:
369
            try:
370
                flv_type, flv_value = parse_typed_option(ptrn)
371
            except ValueError:
372
                msg = "Invalid flavor format: %s. Must be [id|name]:.+"
373
                self.warning(msg, ptrn)
374
                continue
375

    
376
            if flv_type == "name":
377
                # Filter flavor by name
378
                msg = "Trying to find a flavor with name %s"
379
                self.info(msg, flv_value)
380
                filtered_flvs = \
381
                    [f for f in flavors if
382
                     re.search(flv_value, f['name'], flags=re.I) is not None]
383
            elif flv_type == "id":
384
                # Filter flavors by id
385
                msg = "Trying to find a flavor with id %s"
386
                self.info(msg, flv_value)
387
                filtered_flvs = \
388
                    [f for f in flavors if str(f['id']) == flv_value]
389
            else:
390
                self.error("Unrecognized flavor type %s", flv_type)
391
                self.fail("Unrecognized flavor type")
392

    
393
            # Append and continue
394
            ret_flavors.extend(filtered_flvs)
395

    
396
        self.assertGreater(len(ret_flavors), 0,
397
                           "No matching flavors found")
398
        return ret_flavors
399

    
400
    # ----------------------------------
401
    # Images
402
    def _get_list_of_images(self, detail=False):
403
        """Get (detailed) list of images"""
404
        if detail:
405
            self.info("Getting detailed list of images")
406
        else:
407
            self.info("Getting simple list of images")
408
        images = self.clients.image.list_public(detail=detail)
409
        # Remove images registered by burnin
410
        images = [img for img in images
411
                  if not img['name'].startswith(SNF_TEST_PREFIX)]
412
        return images
413

    
414
    def _get_list_of_sys_images(self, images=None):
415
        """Get (detailed) list of images registered by system user or by me"""
416
        self.info("Getting list of images registered by system user or by me")
417
        if images is None:
418
            images = self._get_list_of_images(detail=True)
419

    
420
        su_uuid = self._get_uuid_of_system_user()
421
        my_uuid = self._get_uuid()
422
        ret_images = [i for i in images
423
                      if i['owner'] == su_uuid or i['owner'] == my_uuid]
424

    
425
        return ret_images
426

    
427
    def _find_images(self, patterns, images=None):
428
        """Find a list of suitable images to use
429

430
        The patterns is a list of `typed_options'. A list of all images
431
        matching this patterns will be returned.
432

433
        """
434
        if images is None:
435
            images = self._get_list_of_sys_images()
436

    
437
        ret_images = []
438
        for ptrn in patterns:
439
            try:
440
                img_type, img_value = parse_typed_option(ptrn)
441
            except ValueError:
442
                msg = "Invalid image format: %s. Must be [id|name]:.+"
443
                self.warning(msg, ptrn)
444
                continue
445

    
446
            if img_type == "name":
447
                # Filter image by name
448
                msg = "Trying to find an image with name %s"
449
                self.info(msg, img_value)
450
                filtered_imgs = \
451
                    [i for i in images if
452
                     re.search(img_value, i['name'], flags=re.I) is not None]
453
            elif img_type == "id":
454
                # Filter images by id
455
                msg = "Trying to find an image with id %s"
456
                self.info(msg, img_value)
457
                filtered_imgs = \
458
                    [i for i in images if
459
                     i['id'].lower() == img_value.lower()]
460
            else:
461
                self.error("Unrecognized image type %s", img_type)
462
                self.fail("Unrecognized image type")
463

    
464
            # Append and continue
465
            ret_images.extend(filtered_imgs)
466

    
467
        self.assertGreater(len(ret_images), 0,
468
                           "No matching images found")
469
        return ret_images
470

    
471
    # ----------------------------------
472
    # Pithos
473
    def _set_pithos_account(self, account):
474
        """Set the Pithos account"""
475
        assert account, "No pithos account was given"
476

    
477
        self.info("Setting Pithos account to %s", account)
478
        self.clients.pithos.account = account
479

    
480
    def _set_pithos_container(self, container):
481
        """Set the Pithos container"""
482
        assert container, "No pithos container was given"
483

    
484
        self.info("Setting Pithos container to %s", container)
485
        self.clients.pithos.container = container
486

    
487
    def _get_list_of_containers(self, account=None):
488
        """Get list of containers"""
489
        if account is not None:
490
            self._set_pithos_account(account)
491
        self.info("Getting list of containers")
492
        return self.clients.pithos.list_containers()
493

    
494
    def _create_pithos_container(self, container):
495
        """Create a pithos container
496

497
        If the container exists, nothing will happen
498

499
        """
500
        assert container, "No pithos container was given"
501

    
502
        self.info("Creating pithos container %s", container)
503
        self.clients.pithos.container = container
504
        self.clients.pithos.container_put()
505

    
506
    # ----------------------------------
507
    # Quotas
508
    def _get_quotas(self):
509
        """Get quotas"""
510
        self.info("Getting quotas")
511
        return self.clients.astakos.get_quotas()
512

    
513
    # Invalid argument name. pylint: disable-msg=C0103
514
    # Too many arguments. pylint: disable-msg=R0913
515
    def _check_quotas(self, disk=None, vm=None, diskspace=None,
516
                      ram=None, ip=None, cpu=None, network=None):
517
        """Check that quotas' changes are consistent"""
518
        assert any(v is None for v in
519
                   [disk, vm, diskspace, ram, ip, cpu, network]), \
520
            "_check_quotas require arguments"
521

    
522
        self.info("Check that quotas' changes are consistent")
523
        old_quotas = self.quotas
524
        new_quotas = self._get_quotas()
525
        self.quotas = new_quotas
526

    
527
        # Check Disk usage
528
        self._check_quotas_aux(
529
            old_quotas, new_quotas, 'cyclades.disk', disk)
530
        # Check VM usage
531
        self._check_quotas_aux(
532
            old_quotas, new_quotas, 'cyclades.vm', vm)
533
        # Check DiskSpace usage
534
        self._check_quotas_aux(
535
            old_quotas, new_quotas, 'pithos.diskspace', diskspace)
536
        # Check Ram usage
537
        self._check_quotas_aux(
538
            old_quotas, new_quotas, 'cyclades.ram', ram)
539
        # Check Floating IPs usage
540
        self._check_quotas_aux(
541
            old_quotas, new_quotas, 'cyclades.floating_ip', ip)
542
        # Check CPU usage
543
        self._check_quotas_aux(
544
            old_quotas, new_quotas, 'cyclades.cpu', cpu)
545
        # Check Network usage
546
        self._check_quotas_aux(
547
            old_quotas, new_quotas, 'cyclades.network.private', network)
548

    
549
    def _check_quotas_aux(self, old_quotas, new_quotas, resource, value):
550
        """Auxiliary function for _check_quotas"""
551
        old_value = old_quotas['system'][resource]['usage']
552
        new_value = new_quotas['system'][resource]['usage']
553
        if value is not None:
554
            assert isinstance(value, int), \
555
                "%s value has to be integer" % resource
556
            old_value += value
557
        self.assertEqual(old_value, new_value,
558
                         "%s quotas don't match" % resource)
559

    
560

    
561
# --------------------------------------------------------------------
562
# Initialize Burnin
563
def initialize(opts, testsuites, stale_testsuites):
564
    """Initalize burnin
565

566
    Initialize our logger and burnin state
567

568
    """
569
    # Initialize logger
570
    global logger  # Using global statement. pylint: disable-msg=C0103,W0603
571
    curr_time = datetime.datetime.now()
572
    logger = Log(opts.log_folder, verbose=opts.verbose,
573
                 use_colors=opts.use_colors, in_parallel=False,
574
                 log_level=opts.log_level, curr_time=curr_time)
575

    
576
    # Initialize clients
577
    Clients.auth_url = opts.auth_url
578
    Clients.token = opts.token
579

    
580
    # Pass the rest options to BurninTests
581
    BurninTests.use_ipv6 = opts.use_ipv6
582
    BurninTests.action_timeout = opts.action_timeout
583
    BurninTests.action_warning = opts.action_warning
584
    BurninTests.query_interval = opts.query_interval
585
    BurninTests.system_user = opts.system_user
586
    BurninTests.flavors = opts.flavors
587
    BurninTests.images = opts.images
588
    BurninTests.delete_stale = opts.delete_stale
589
    BurninTests.temp_directory = opts.temp_directory
590
    BurninTests.failfast = opts.failfast
591
    BurninTests.run_id = SNF_TEST_PREFIX + \
592
        datetime.datetime.strftime(curr_time, "%Y%m%d%H%M%S")
593

    
594
    # Choose tests to run
595
    if opts.show_stale:
596
        # We will run the stale_testsuites
597
        return (stale_testsuites, True)
598

    
599
    if opts.tests != "all":
600
        testsuites = opts.tests
601
    if opts.exclude_tests is not None:
602
        testsuites = [tsuite for tsuite in testsuites
603
                      if tsuite not in opts.exclude_tests]
604

    
605
    return (testsuites, opts.failfast)
606

    
607

    
608
# --------------------------------------------------------------------
609
# Run Burnin
610
def run_burnin(testsuites, failfast=False):
611
    """Run burnin testsuites"""
612
    # Using global. pylint: disable-msg=C0103,W0603,W0602
613
    global logger, success
614

    
615
    success = True
616
    run_tests(testsuites, failfast=failfast)
617

    
618
    # Clean up our logger
619
    del logger
620

    
621
    # Return
622
    return 0 if success else 1
623

    
624

    
625
def run_tests(tcases, failfast=False):
626
    """Run some testcases"""
627
    global success  # Using global. pylint: disable-msg=C0103,W0603,W0602
628

    
629
    for tcase in tcases:
630
        was_success = run_test(tcase)
631
        success = success and was_success
632
        if failfast and not success:
633
            break
634

    
635

    
636
def run_test(tcase):
637
    """Run a testcase"""
638
    tsuite = unittest.TestLoader().loadTestsFromTestCase(tcase)
639
    results = tsuite.run(BurninTestResult())
640

    
641
    return was_successful(tcase.__name__, results.wasSuccessful())
642

    
643

    
644
# --------------------------------------------------------------------
645
# Helper functions
646
def was_successful(tsuite, successful):
647
    """Handle whether a testsuite was succesful or not"""
648
    if successful:
649
        logger.testsuite_success(tsuite)
650
        return True
651
    else:
652
        logger.testsuite_failure(tsuite)
653
        return False
654

    
655

    
656
def parse_typed_option(value):
657
    """Parse typed options (flavors and images)
658

659
    The options are in the form 'id:123-345' or 'name:^Debian Base$'
660

661
    """
662
    try:
663
        [type_, val] = value.strip().split(':')
664
        if type_ not in ["id", "name"]:
665
            raise ValueError
666
        return type_, val
667
    except ValueError:
668
        raise