Revision 81c60832

b/kamaki/cli/commands/cyclades.py
375 375
    arguments = dict(
376 376
        personality=PersonalityArgument(
377 377
            (80 * ' ').join(howto_personality), ('-p', '--personality')),
378
        wait=FlagArgument('Wait server to build', ('-w', '--wait'))
378
        wait=FlagArgument('Wait server to build', ('-w', '--wait')),
379
        cluster_size=IntArgument(
380
            'Create a cluster of servers of this size. In this case, the name'
381
            'parameter is the prefix of each server in the cluster (e.g.,'
382
            'srv1, srv2, etc.',
383
            '--cluster-size')
379 384
    )
380 385

  
386
    @errors.cyclades.cluster_size
387
    def _create_cluster(self, prefix, flavor_id, image_id, size):
388
        servers = [dict(
389
            name='%s%s' % (prefix, i),
390
            flavor_id=flavor_id,
391
            image_id=image_id,
392
            personality=self['personality']) for i in range(size)]
393
        if size == 1:
394
            return [self.client.create_server(**servers[0])]
395
        return self.client.async_run(self.client.create_server, servers)
396

  
381 397
    @errors.generic.all
382 398
    @errors.cyclades.connection
383 399
    @errors.plankton.id
384 400
    @errors.cyclades.flavor_id
385 401
    def _run(self, name, flavor_id, image_id):
386
        r = self.client.create_server(
387
            name, int(flavor_id), image_id, personality=self['personality'])
388
        usernames = self._uuids2usernames([r['user_id'], r['tenant_id']])
389
        r['user_id'] += ' (%s)' % usernames[r['user_id']]
390
        r['tenant_id'] += ' (%s)' % usernames[r['tenant_id']]
391
        self._print(r, self.print_dict)
392
        if self['wait']:
393
            self._wait(r['id'], r['status'])
402
        for r in self._create_cluster(
403
                name, flavor_id, image_id, size=self['cluster_size'] or 1):
404
            print 'HEY I GOT A', r
405
            print 'MKEY?????????????????'
406
            usernames = self._uuids2usernames([r['user_id'], r['tenant_id']])
407
            r['user_id'] += ' (%s)' % usernames[r['user_id']]
408
            r['tenant_id'] += ' (%s)' % usernames[r['tenant_id']]
409
            self._print(r, self.print_dict)
410
            if self['wait']:
411
                self._wait(r['id'], r['status'])
412
            self.error('')
394 413

  
395 414
    def main(self, name, flavor_id, image_id):
396 415
        super(self.__class__, self)._run()
......
421 440
    """Delete a virtual server"""
422 441

  
423 442
    arguments = dict(
424
        wait=FlagArgument('Wait server to be destroyed', ('-w', '--wait'))
443
        wait=FlagArgument('Wait server to be destroyed', ('-w', '--wait')),
444
        cluster=FlagArgument(
445
            '(DANGEROUS) Delete all virtual servers prefixed with the cluster '
446
            'prefix. In that case, the prefix replaces the server id',
447
            '--cluster')
425 448
    )
426 449

  
450
    def _server_ids(self, server_var):
451
        if self['cluster']:
452
            return [s['id'] for s in self.client.list_servers() if (
453
                s['name'].startswith(server_var))]
454

  
455
        @errors.cyclades.server_id
456
        def _check_server_id(self, server_id):
457
            return server_id
458

  
459
        return [_check_server_id(self, server_id=server_var), ]
460

  
427 461
    @errors.generic.all
428 462
    @errors.cyclades.connection
429
    @errors.cyclades.server_id
430
    def _run(self, server_id):
431
            status = 'DELETED'
463
    def _run(self, server_var):
464
        for server_id in self._server_ids(server_var):
432 465
            if self['wait']:
433 466
                details = self.client.get_server_details(server_id)
434 467
                status = details['status']
435 468

  
436
            r = self.client.delete_server(int(server_id))
469
            r = self.client.delete_server(server_id)
437 470
            self._optional_output(r)
438 471

  
439 472
            if self['wait']:
440 473
                self._wait(server_id, status)
441 474

  
442
    def main(self, server_id):
475
    def main(self, server_id_or_cluster_prefix):
443 476
        super(self.__class__, self)._run()
444
        self._run(server_id=server_id)
477
        self._run(server_id_or_cluster_prefix)
445 478

  
446 479

  
447 480
@command(server_cmds)
......
785 818
        self._run(server_id=server_id, current_status=current_status)
786 819

  
787 820

  
788
@command(server_cmds)
789
class server_cluster_create(_init_cyclades):
790
    """Create a cluster of virtual servers
791
    All new servers will be named as <prefix><increment> e.g.,
792
    mycluster1, mycluster2, etc.
793
    All servers in the cluster will run the same image on the same hardware
794
    flavor.
795
    """
796

  
797
    @errors.generic.all
798
    @errors.cyclades.connection
799
    @errors.plankton.id
800
    @errors.cyclades.flavor_id
801
    @errors.cyclades.cluster_size
802
    def _run(self, prefix, image_id, flavor_id, size):
803
        servers = [dict(
804
            name='%s%s' % (prefix, i),
805
            flavor_id=flavor_id,
806
            image_id=image_id) for i in range(int(size))]
807
        self.client.create_cluster(servers)
808

  
809
    def main(self, prefix, image_id, flavor_id, size):
810
        super(self.__class__, self)._run()
811
        self._run(prefix, image_id=image_id, flavor_id=flavor_id, size=size)
812

  
813

  
814
@command(server_cmds)
815
class server_cluster_delete(_init_cyclades):
816
    """Remove all servers that belong to a virtual cluster
817
    A virtual cluster consists of the virtual servers with the same name prefix
818
    ATTENTION: make sure you want to delete all servers of that prefix
819
    To get a list of your servers:  /server list
820
    """
821

  
822
    @errors.generic.all
823
    @errors.cyclades.connection
824
    def _run(self, prefix):
825
        servers = [s['id'] for s in self.client.list_servers() if (
826
            s['name'].startswith(prefix))]
827
        self.client.delete_cluster(servers)
828

  
829
    def main(self, prefix):
830
        super(self.__class__, self)._run()
831
        self._run(prefix)
832

  
833

  
834 821
@command(flavor_cmds)
835 822
class flavor_list(_init_cyclades, _optional_json, _name_filter, _id_filter):
836 823
    """List available hardware flavors"""
b/kamaki/cli/commands/errors.py
218 218
                return foo(self, *args, **kwargs)
219 219
            except ValueError as ve:
220 220
                msg = 'Invalid network id %s ' % network_id
221
                details = ['network id must be a positive integer']
221
                details = 'network id must be a positive integer'
222 222
                raiseCLIError(ve, msg, details=details, importance=1)
223 223
            except ClientError as ce:
224 224
                if network_id and ce.status == 404 and (
......
274 274
                return foo(self, *args, **kwargs)
275 275
            except ValueError as ve:
276 276
                msg = 'Invalid flavor id %s ' % flavor_id,
277
                details = 'Flavor id must be a positive integer',
277
                details = 'Flavor id must be a positive integer'
278 278
                raiseCLIError(ve, msg, details=details, importance=1)
279 279
            except ClientError as ce:
280 280
                if flavor_id and ce.status == 404 and (
......
294 294
                return foo(self, *args, **kwargs)
295 295
            except ValueError as ve:
296 296
                msg = 'Invalid virtual server id %s' % server_id,
297
                details = ['id must be a positive integer'],
297
                details = 'Server id must be a positive integer'
298 298
                raiseCLIError(ve, msg, details=details, importance=1)
299 299
            except ClientError as ce:
300 300
                err_msg = ('%s' % ce).lower()
b/kamaki/clients/__init__.py
369 369
            return []
370 370
        return threadlist
371 371

  
372
    def async_run(self, method, kwarg_list):
373
        """Fire threads of operations
374

  
375
        :param method: the method to run in each thread
376

  
377
        :param kwarg_list: (list of dicts) the arguments to pass in each method
378
            call
379

  
380
        :returns: (list) the results of each method call w.r. to the order of
381
            kwarg_list
382
        """
383
        flying, results = {}, {}
384
        self._init_thread_limit()
385
        for index, kwargs in enumerate(kwarg_list):
386
            self._watch_thread_limit(flying.values())
387
            flying[index] = SilentEvent(method=method, **kwargs)
388
            flying[index].start()
389
            unfinished = {}
390
            for key, thread in flying.items():
391
                if thread.isAlive():
392
                    unfinished[key] = thread
393
                elif thread.exception:
394
                    print 'HERE IS AN EXCEPTION MK?'
395
                    raise thread.exception
396
                else:
397
                    results[key] = thread.value
398
                print 'NO EXCEPTION', thread.value
399
            flying = unfinished
400
        sendlog.info('- - - wait for threads to finish')
401
        for key, thread in flying.items():
402
            if thread.isAlive():
403
                thread.join()
404
            elif thread.exception:
405
                print 'HERE IS AN EXCEPTION MK-2?'
406
                raise thread.exception
407
            results[key] = thread.value
408
            print 'NO EXCEPTION-2', thread.value
409
        return results.values()
410

  
372 411
    def _raise_for_status(self, r):
373 412
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
374 413
        status_msg = getattr(r, 'status', None) or ''
b/kamaki/clients/compute/__init__.py
33 33

  
34 34
from kamaki.clients import ClientError
35 35
from kamaki.clients.compute.rest_api import ComputeRestClient
36
from kamaki.clients.utils import path4url
37 36

  
38 37

  
39 38
class ComputeClient(ComputeRestClient):
b/kamaki/clients/cyclades/__init__.py
72 72
            name, flavor_id, image_id,
73 73
            metadata=metadata, personality=personality)
74 74

  
75
    def _async_run(self, method, kwarg_list):
76
        """Fire threads of operations
77

  
78
        :param method: the method to run in each thread
79

  
80
        :param kwarg_list: (list of dicts) the arguments to pass in each method
81
            call
82
        """
83
        flying = []
84
        self._init_thread_limit()
85
        for kwargs in kwarg_list:
86
            self._watch_thread_limit(flying)
87
            flying.append(SilentEvent(method=method, **kwargs))
88
            flying[-1].start()
89
            unfinished = []
90
            for thread in flying:
91
                if thread.isAlive():
92
                    unfinished.append(thread)
93
                elif thread.exception:
94
                    raise thread.exception
95
            flying = unfinished
96
        sendlog.info('- - - wait for threads to finish')
97
        for thread in flying:
98
            if thread.isAlive():
99
                thread.join()
100
            elif thread.exception:
101
                raise thread.exception
102

  
103
    def create_cluster(self, servers):
104
        """Create multiple servers asynchronously
105
        :param servers: (list of dicts) [
106
        {name, flavor_id, image_id, metadata, personality}, ...]
107
        """
108
        #  Perform async server creations
109
        return self._async_run(self.create_server, servers)
110

  
111
    def delete_cluster(self, server_ids):
112
        """
113
        :param server_ids: (list) ids of servers to delete
114
        """
115
        self._async_run(
116
            self.delete_server, [dict(server_id=sid) for sid in server_ids])
117

  
118 75
    def start_server(self, server_id):
119 76
        """Submit a startup request
120 77

  

Also available in: Unified diff