Statistics
| Branch: | Tag: | Revision:

root / snf-tools / synnefo_tools / burnin / common.py @ 3139b628

History | View | Annotate | Download (23.1 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
"""
35
Common utils for burnin tests
36

37
"""
38

    
39
import re
40
import shutil
41
import unittest
42
import datetime
43
import tempfile
44
import traceback
45

    
46
from kamaki.clients.cyclades import CycladesClient, CycladesNetworkClient
47
from kamaki.clients.astakos import AstakosClient
48
from kamaki.clients.compute import ComputeClient
49
from kamaki.clients.pithos import PithosClient
50
from kamaki.clients.image import ImageClient
51

    
52
from synnefo_tools.burnin.logger import Log
53

    
54

    
55
# --------------------------------------------------------------------
56
# Global variables
57
logger = None   # Invalid constant name. pylint: disable-msg=C0103
58
success = None  # Invalid constant name. pylint: disable-msg=C0103
59
SNF_TEST_PREFIX = "snf-test-"
60
CONNECTION_RETRY_LIMIT = 2
61
SYSTEM_USERS = ["images@okeanos.grnet.gr", "images@demo.synnefo.org"]
62
KB = 2**10
63
MB = 2**20
64
GB = 2**30
65

    
66

    
67
# --------------------------------------------------------------------
68
# BurninTestResult class
69
class BurninTestResult(unittest.TestResult):
70
    """Modify the TextTestResult class"""
71
    def __init__(self):
72
        super(BurninTestResult, self).__init__()
73

    
74
        # Test parameters
75
        self.failfast = True
76

    
77
    def startTest(self, test):  # noqa
78
        """Called when the test case test is about to be run"""
79
        super(BurninTestResult, self).startTest(test)
80
        logger.log(
81
            test.__class__.__name__,
82
            test.shortDescription() or 'Test %s' % test.__class__.__name__)
83

    
84
    # Method could be a function. pylint: disable-msg=R0201
85
    def _test_failed(self, test, err):
86
        """Test failed"""
87
        # Get class name
88
        if test.__class__.__name__ == "_ErrorHolder":
89
            class_name = test.id().split('.')[-1].rstrip(')')
90
        else:
91
            class_name = test.__class__.__name__
92
        err_msg = str(test) + "... failed (%s)."
93
        timestamp = datetime.datetime.strftime(
94
            datetime.datetime.now(), "%a %b %d %Y %H:%M:%S")
95
        logger.error(class_name, err_msg, timestamp)
96
        (err_type, err_value, err_trace) = err
97
        trcback = traceback.format_exception(err_type, err_value, err_trace)
98
        logger.info(class_name, trcback)
99

    
100
    def addError(self, test, err):  # noqa
101
        """Called when the test case test raises an unexpected exception"""
102
        super(BurninTestResult, self).addError(test, err)
103
        self._test_failed(test, err)
104

    
105
    def addFailure(self, test, err):  # noqa
106
        """Called when the test case test signals a failure"""
107
        super(BurninTestResult, self).addFailure(test, err)
108
        self._test_failed(test, err)
109

    
110

    
111
# --------------------------------------------------------------------
112
# Helper Classes
113
# Too few public methods. pylint: disable-msg=R0903
114
# Too many instance attributes. pylint: disable-msg=R0902
115
class Clients(object):
116
    """Our kamaki clients"""
117
    auth_url = None
118
    token = None
119
    # Astakos
120
    astakos = None
121
    retry = CONNECTION_RETRY_LIMIT
122
    # Compute
123
    compute = None
124
    compute_url = None
125
    # Cyclades
126
    cyclades = None
127
    # Network
128
    network = None
129
    network_url = None
130
    # Pithos
131
    pithos = None
132
    pithos_url = None
133
    # Image
134
    image = None
135
    image_url = None
136

    
137
    def initialize_clients(self):
138
        """Initialize all the Kamaki Clients"""
139
        self.astakos = AstakosClient(self.auth_url, self.token)
140
        self.astakos.CONNECTION_RETRY_LIMIT = self.retry
141

    
142
        self.compute_url = \
143
            self.astakos.get_service_endpoints('compute')['publicURL']
144
        self.compute = ComputeClient(self.compute_url, self.token)
145
        self.compute.CONNECTION_RETRY_LIMIT = self.retry
146

    
147
        self.cyclades = CycladesClient(self.compute_url, self.token)
148
        self.cyclades.CONNECTION_RETRY_LIMIT = self.retry
149

    
150
        self.network_url = \
151
            self.astakos.get_service_endpoints('network')['publicURL']
152
        self.network = CycladesNetworkClient(self.network_url, self.token)
153
        self.network.CONNECTION_RETRY_LIMIT = self.retry
154

    
155
        self.pithos_url = self.astakos.\
156
            get_service_endpoints('object-store')['publicURL']
157
        self.pithos = PithosClient(self.pithos_url, self.token)
158
        self.pithos.CONNECTION_RETRY_LIMIT = self.retry
159

    
160
        self.image_url = \
161
            self.astakos.get_service_endpoints('image')['publicURL']
162
        self.image = ImageClient(self.image_url, self.token)
163
        self.image.CONNECTION_RETRY_LIMIT = self.retry
164

    
165

    
166
class Proper(object):
167
    """A descriptor used by tests implementing the TestCase class
168

169
    Since each instance of the TestCase will only be used to run a single
170
    test method (a new fixture is created for each test) the attributes can
171
    not be saved in the class instances. Instead we use descriptors.
172

173
    """
174
    def __init__(self, value=None):
175
        self.val = value
176

    
177
    def __get__(self, obj, objtype=None):
178
        return self.val
179

    
180
    def __set__(self, obj, value):
181
        self.val = value
182

    
183

    
184
# --------------------------------------------------------------------
185
# BurninTests class
186
# Too many public methods (45/20). pylint: disable-msg=R0904
187
class BurninTests(unittest.TestCase):
188
    """Common class that all burnin tests should implement"""
189
    clients = Clients()
190
    run_id = None
191
    use_ipv6 = None
192
    action_timeout = None
193
    action_warning = None
194
    query_interval = None
195
    system_user = None
196
    images = None
197
    flavors = None
198
    delete_stale = False
199
    temp_directory = None
200
    failfast = None
201

    
202
    quotas = Proper(value=None)
203

    
204
    @classmethod
205
    def setUpClass(cls):  # noqa
206
        """Initialize BurninTests"""
207
        cls.suite_name = cls.__name__
208
        logger.testsuite_start(cls.suite_name)
209

    
210
        # Set test parameters
211
        cls.longMessage = True
212

    
213
    def test_000_clients_setup(self):
214
        """Initializing astakos/cyclades/pithos clients"""
215
        # Update class attributes
216
        self.clients.initialize_clients()
217
        self.info("Astakos auth url is %s", self.clients.auth_url)
218
        self.info("Cyclades url is %s", self.clients.compute_url)
219
        self.info("Network url is %s", self.clients.network_url)
220
        self.info("Pithos url is %s", self.clients.pithos_url)
221
        self.info("Image url is %s", self.clients.image_url)
222

    
223
        self.quotas = self._get_quotas()
224
        self.info("  Disk usage is %s bytes",
225
                  self.quotas['system']['cyclades.disk']['usage'])
226
        self.info("  VM usage is %s",
227
                  self.quotas['system']['cyclades.vm']['usage'])
228
        self.info("  DiskSpace usage is %s bytes",
229
                  self.quotas['system']['pithos.diskspace']['usage'])
230
        self.info("  Ram usage is %s bytes",
231
                  self.quotas['system']['cyclades.ram']['usage'])
232
        self.info("  Floating IPs usage is %s",
233
                  self.quotas['system']['cyclades.floating_ip']['usage'])
234
        self.info("  CPU usage is %s",
235
                  self.quotas['system']['cyclades.cpu']['usage'])
236
        self.info("  Network usage is %s",
237
                  self.quotas['system']['cyclades.network.private']['usage'])
238

    
239
    def _run_tests(self, tcases):
240
        """Run some generated testcases"""
241
        global success  # Using global. pylint: disable-msg=C0103,W0603,W0602
242

    
243
        for tcase in tcases:
244
            self.info("Running testsuite %s", tcase.__name__)
245
            success = run_test(tcase) and success
246
            if self.failfast and not success:
247
                break
248

    
249
    # ----------------------------------
250
    # Loggers helper functions
251
    def log(self, msg, *args):
252
        """Pass the section value to logger"""
253
        logger.log(self.suite_name, msg, *args)
254

    
255
    def info(self, msg, *args):
256
        """Pass the section value to logger"""
257
        logger.info(self.suite_name, msg, *args)
258

    
259
    def debug(self, msg, *args):
260
        """Pass the section value to logger"""
261
        logger.debug(self.suite_name, msg, *args)
262

    
263
    def warning(self, msg, *args):
264
        """Pass the section value to logger"""
265
        logger.warning(self.suite_name, msg, *args)
266

    
267
    def error(self, msg, *args):
268
        """Pass the section value to logger"""
269
        logger.error(self.suite_name, msg, *args)
270

    
271
    # ----------------------------------
272
    # Helper functions that every testsuite may need
273
    def _get_uuid(self):
274
        """Get our uuid"""
275
        authenticate = self.clients.astakos.authenticate()
276
        uuid = authenticate['access']['user']['id']
277
        self.info("User's uuid is %s", uuid)
278
        return uuid
279

    
280
    def _get_username(self):
281
        """Get our User Name"""
282
        authenticate = self.clients.astakos.authenticate()
283
        username = authenticate['access']['user']['name']
284
        self.info("User's name is %s", username)
285
        return username
286

    
287
    def _create_tmp_directory(self):
288
        """Create a tmp directory"""
289
        temp_dir = tempfile.mkdtemp(dir=self.temp_directory)
290
        self.info("Temp directory %s created", temp_dir)
291
        return temp_dir
292

    
293
    def _remove_tmp_directory(self, tmp_dir):
294
        """Remove a tmp directory"""
295
        try:
296
            shutil.rmtree(tmp_dir)
297
            self.info("Temp directory %s deleted", tmp_dir)
298
        except OSError:
299
            pass
300

    
301
    def _get_uuid_of_system_user(self):
302
        """Get the uuid of the system user
303

304
        This is the user that upload the 'official' images.
305

306
        """
307
        self.info("Getting the uuid of the system user")
308
        system_users = None
309
        if self.system_user is not None:
310
            parsed_su = parse_typed_option(self.system_user)
311
            if parsed_su is None:
312
                msg = "Invalid system-user format: %s. Must be [id|name]:.+"
313
                self.warning(msg, self.system_user)
314
            else:
315
                su_type, su_value = parsed_su
316
                if su_type == "name":
317
                    system_users = [su_value]
318
                elif su_type == "id":
319
                    self.info("System user's uuid is %s", su_value)
320
                    return su_value
321
                else:
322
                    self.error("Unrecognized system-user type %s", su_type)
323
                    self.fail("Unrecognized system-user type")
324

    
325
        if system_users is None:
326
            system_users = SYSTEM_USERS
327

    
328
        uuids = self.clients.astakos.get_uuids(system_users)
329
        for su_name in system_users:
330
            self.info("Trying username %s", su_name)
331
            if su_name in uuids:
332
                self.info("System user's uuid is %s", uuids[su_name])
333
                return uuids[su_name]
334

    
335
        self.warning("No system user found")
336
        return None
337

    
338
    def _skip_if(self, condition, msg):
339
        """Skip tests"""
340
        if condition:
341
            self.info("Test skipped: %s" % msg)
342
            self.skipTest(msg)
343

    
344
    # ----------------------------------
345
    # Flavors
346
    def _get_list_of_flavors(self, detail=False):
347
        """Get (detailed) list of flavors"""
348
        if detail:
349
            self.info("Getting detailed list of flavors")
350
        else:
351
            self.info("Getting simple list of flavors")
352
        flavors = self.clients.compute.list_flavors(detail=detail)
353
        return flavors
354

    
355
    def _find_flavors(self, patterns, flavors=None):
356
        """Find a list of suitable flavors to use
357

358
        The patterns is a list of `typed_options'. A list of all flavors
359
        matching this patterns will be returned.
360

361
        """
362
        if flavors is None:
363
            flavors = self._get_list_of_flavors(detail=True)
364

    
365
        ret_flavors = []
366
        for ptrn in patterns:
367
            parsed_ptrn = parse_typed_option(ptrn)
368
            if parsed_ptrn is None:
369
                msg = "Invalid flavor format: %s. Must be [id|name]:.+"
370
                self.warning(msg, ptrn)
371
                continue
372
            flv_type, flv_value = parsed_ptrn
373
            if flv_type == "name":
374
                # Filter flavor by name
375
                msg = "Trying to find a flavor with name %s"
376
                self.info(msg, flv_value)
377
                filtered_flvs = \
378
                    [f for f in flavors if
379
                     re.search(flv_value, f['name'], flags=re.I) is not None]
380
            elif flv_type == "id":
381
                # Filter flavors by id
382
                msg = "Trying to find a flavor with id %s"
383
                self.info(msg, flv_value)
384
                filtered_flvs = \
385
                    [f for f in flavors if str(f['id']) == flv_value]
386
            else:
387
                self.error("Unrecognized flavor type %s", flv_type)
388
                self.fail("Unrecognized flavor type")
389

    
390
            # Append and continue
391
            ret_flavors.extend(filtered_flvs)
392

    
393
        self.assertGreater(len(ret_flavors), 0,
394
                           "No matching flavors found")
395
        return ret_flavors
396

    
397
    # ----------------------------------
398
    # Images
399
    def _get_list_of_images(self, detail=False):
400
        """Get (detailed) list of images"""
401
        if detail:
402
            self.info("Getting detailed list of images")
403
        else:
404
            self.info("Getting simple list of images")
405
        images = self.clients.image.list_public(detail=detail)
406
        # Remove images registered by burnin
407
        images = [img for img in images
408
                  if not img['name'].startswith(SNF_TEST_PREFIX)]
409
        return images
410

    
411
    def _get_list_of_sys_images(self, images=None):
412
        """Get (detailed) list of images registered by system user or by me"""
413
        self.info("Getting list of images registered by system user or by me")
414
        if images is None:
415
            images = self._get_list_of_images(detail=True)
416

    
417
        su_uuid = self._get_uuid_of_system_user()
418
        my_uuid = self._get_uuid()
419
        ret_images = [i for i in images
420
                      if i['owner'] == su_uuid or i['owner'] == my_uuid]
421

    
422
        return ret_images
423

    
424
    def _find_images(self, patterns, images=None):
425
        """Find a list of suitable images to use
426

427
        The patterns is a list of `typed_options'. A list of all images
428
        matching this patterns will be returned.
429

430
        """
431
        if images is None:
432
            images = self._get_list_of_sys_images()
433

    
434
        ret_images = []
435
        for ptrn in patterns:
436
            parsed_ptrn = parse_typed_option(ptrn)
437
            if parsed_ptrn is None:
438
                msg = "Invalid image format: %s. Must be [id|name]:.+"
439
                self.warning(msg, ptrn)
440
                continue
441
            img_type, img_value = parsed_ptrn
442
            if img_type == "name":
443
                # Filter image by name
444
                msg = "Trying to find an image with name %s"
445
                self.info(msg, img_value)
446
                filtered_imgs = \
447
                    [i for i in images if
448
                     re.search(img_value, i['name'], flags=re.I) is not None]
449
            elif img_type == "id":
450
                # Filter images by id
451
                msg = "Trying to find an image with id %s"
452
                self.info(msg, img_value)
453
                filtered_imgs = \
454
                    [i for i in images if
455
                     i['id'].lower() == img_value.lower()]
456
            else:
457
                self.error("Unrecognized image type %s", img_type)
458
                self.fail("Unrecognized image type")
459

    
460
            # Append and continue
461
            ret_images.extend(filtered_imgs)
462

    
463
        self.assertGreater(len(ret_images), 0,
464
                           "No matching images found")
465
        return ret_images
466

    
467
    # ----------------------------------
468
    # Pithos
469
    def _set_pithos_account(self, account):
470
        """Set the Pithos account"""
471
        assert account, "No pithos account was given"
472

    
473
        self.info("Setting Pithos account to %s", account)
474
        self.clients.pithos.account = account
475

    
476
    def _set_pithos_container(self, container):
477
        """Set the Pithos container"""
478
        assert container, "No pithos container was given"
479

    
480
        self.info("Setting Pithos container to %s", container)
481
        self.clients.pithos.container = container
482

    
483
    def _get_list_of_containers(self, account=None):
484
        """Get list of containers"""
485
        if account is not None:
486
            self._set_pithos_account(account)
487
        self.info("Getting list of containers")
488
        return self.clients.pithos.list_containers()
489

    
490
    def _create_pithos_container(self, container):
491
        """Create a pithos container
492

493
        If the container exists, nothing will happen
494

495
        """
496
        assert container, "No pithos container was given"
497

    
498
        self.info("Creating pithos container %s", container)
499
        self.clients.pithos.container = container
500
        self.clients.pithos.container_put()
501

    
502
    # ----------------------------------
503
    # Quotas
504
    def _get_quotas(self):
505
        """Get quotas"""
506
        self.info("Getting quotas")
507
        #  astakos_client = self.clients.astakos.get_client()
508
        return self.clients.astakos.get_quotas()
509

    
510
    # Invalid argument name. pylint: disable-msg=C0103
511
    # Too many arguments. pylint: disable-msg=R0913
512
    def _check_quotas(self, disk=None, vm=None, diskspace=None,
513
                      ram=None, ip=None, cpu=None, network=None):
514
        """Check that quotas' changes are consistent"""
515
        assert any(v is None for v in
516
                   [disk, vm, diskspace, ram, ip, cpu, network]), \
517
            "_check_quotas require arguments"
518

    
519
        self.info("Check that quotas' changes are consistent")
520
        old_quotas = self.quotas
521
        new_quotas = self._get_quotas()
522
        self.quotas = new_quotas
523

    
524
        # Check Disk usage
525
        self._check_quotas_aux(
526
            old_quotas, new_quotas, 'cyclades.disk', disk)
527
        # Check VM usage
528
        self._check_quotas_aux(
529
            old_quotas, new_quotas, 'cyclades.vm', vm)
530
        # Check DiskSpace usage
531
        self._check_quotas_aux(
532
            old_quotas, new_quotas, 'pithos.diskspace', diskspace)
533
        # Check Ram usage
534
        self._check_quotas_aux(
535
            old_quotas, new_quotas, 'cyclades.ram', ram)
536
        # Check Floating IPs usage
537
        self._check_quotas_aux(
538
            old_quotas, new_quotas, 'cyclades.floating_ip', ip)
539
        # Check CPU usage
540
        self._check_quotas_aux(
541
            old_quotas, new_quotas, 'cyclades.cpu', cpu)
542
        # Check Network usage
543
        self._check_quotas_aux(
544
            old_quotas, new_quotas, 'cyclades.network.private', network)
545

    
546
    def _check_quotas_aux(self, old_quotas, new_quotas, resource, value):
547
        """Auxiliary function for _check_quotas"""
548
        old_value = old_quotas['system'][resource]['usage']
549
        new_value = new_quotas['system'][resource]['usage']
550
        if value is not None:
551
            assert isinstance(value, int), \
552
                "%s value has to be integer" % resource
553
            old_value += value
554
        self.assertEqual(old_value, new_value,
555
                         "%s quotas don't match" % resource)
556

    
557

    
558
# --------------------------------------------------------------------
559
# Initialize Burnin
560
def initialize(opts, testsuites, stale_testsuites):
561
    """Initalize burnin
562

563
    Initialize our logger and burnin state
564

565
    """
566
    # Initialize logger
567
    global logger  # Using global statement. pylint: disable-msg=C0103,W0603
568
    curr_time = datetime.datetime.now()
569
    logger = Log(opts.log_folder, verbose=opts.verbose,
570
                 use_colors=opts.use_colors, in_parallel=False,
571
                 log_level=opts.log_level, curr_time=curr_time)
572

    
573
    # Initialize clients
574
    Clients.auth_url = opts.auth_url
575
    Clients.token = opts.token
576

    
577
    # Pass the rest options to BurninTests
578
    BurninTests.use_ipv6 = opts.use_ipv6
579
    BurninTests.action_timeout = opts.action_timeout
580
    BurninTests.action_warning = opts.action_warning
581
    BurninTests.query_interval = opts.query_interval
582
    BurninTests.system_user = opts.system_user
583
    BurninTests.flavors = opts.flavors
584
    BurninTests.images = opts.images
585
    BurninTests.delete_stale = opts.delete_stale
586
    BurninTests.temp_directory = opts.temp_directory
587
    BurninTests.failfast = opts.failfast
588
    BurninTests.run_id = SNF_TEST_PREFIX + \
589
        datetime.datetime.strftime(curr_time, "%Y%m%d%H%M%S")
590

    
591
    # Choose tests to run
592
    if opts.show_stale:
593
        # We will run the stale_testsuites
594
        return (stale_testsuites, True)
595

    
596
    if opts.tests != "all":
597
        testsuites = opts.tests
598
    if opts.exclude_tests is not None:
599
        testsuites = [tsuite for tsuite in testsuites
600
                      if tsuite not in opts.exclude_tests]
601

    
602
    return (testsuites, opts.failfast)
603

    
604

    
605
# --------------------------------------------------------------------
606
# Run Burnin
607
def run_burnin(testsuites, failfast=False):
608
    """Run burnin testsuites"""
609
    # Using global. pylint: disable-msg=C0103,W0603,W0602
610
    global logger, success
611

    
612
    success = True
613
    run_tests(testsuites, failfast=failfast)
614

    
615
    # Clean up our logger
616
    del(logger)
617

    
618
    # Return
619
    return 0 if success else 1
620

    
621

    
622
def run_tests(tcases, failfast=False):
623
    """Run some testcases"""
624
    global success  # Using global. pylint: disable-msg=C0103,W0603,W0602
625

    
626
    for tcase in tcases:
627
        was_success = run_test(tcase)
628
        success = success and was_success
629
        if failfast and not success:
630
            break
631

    
632

    
633
def run_test(tcase):
634
    """Run a testcase"""
635
    tsuite = unittest.TestLoader().loadTestsFromTestCase(tcase)
636
    results = tsuite.run(BurninTestResult())
637

    
638
    return was_successful(tcase.__name__, results.wasSuccessful())
639

    
640

    
641
# --------------------------------------------------------------------
642
# Helper functions
643
def was_successful(tsuite, successful):
644
    """Handle whether a testsuite was succesful or not"""
645
    if successful:
646
        logger.testsuite_success(tsuite)
647
        return True
648
    else:
649
        logger.testsuite_failure(tsuite)
650
        return False
651

    
652

    
653
def parse_typed_option(value):
654
    """Parse typed options (flavors and images)
655

656
    The options are in the form 'id:123-345' or 'name:^Debian Base$'
657

658
    """
659
    try:
660
        [type_, val] = value.strip().split(':')
661
        if type_ not in ["id", "name"]:
662
            raise ValueError
663
        return type_, val
664
    except ValueError:
665
        return None