Move cluster handling in server create/delete
authorStavros Sachtouris <saxtouri@admin.grnet.gr>
Thu, 3 Oct 2013 15:46:52 +0000 (18:46 +0300)
committerStavros Sachtouris <saxtouri@admin.grnet.gr>
Thu, 3 Oct 2013 15:46:52 +0000 (18:46 +0300)
Refs: #4429

kamaki/cli/commands/cyclades.py
kamaki/cli/commands/errors.py
kamaki/clients/__init__.py
kamaki/clients/compute/__init__.py
kamaki/clients/cyclades/__init__.py

index 729ee85..2adea2c 100644 (file)
@@ -375,22 +375,41 @@ class server_create(_init_cyclades, _optional_json, _server_wait):
     arguments = dict(
         personality=PersonalityArgument(
             (80 * ' ').join(howto_personality), ('-p', '--personality')),
-        wait=FlagArgument('Wait server to build', ('-w', '--wait'))
+        wait=FlagArgument('Wait server to build', ('-w', '--wait')),
+        cluster_size=IntArgument(
+            'Create a cluster of servers of this size. In this case, the name'
+            'parameter is the prefix of each server in the cluster (e.g.,'
+            'srv1, srv2, etc.',
+            '--cluster-size')
     )
 
+    @errors.cyclades.cluster_size
+    def _create_cluster(self, prefix, flavor_id, image_id, size):
+        servers = [dict(
+            name='%s%s' % (prefix, i),
+            flavor_id=flavor_id,
+            image_id=image_id,
+            personality=self['personality']) for i in range(size)]
+        if size == 1:
+            return [self.client.create_server(**servers[0])]
+        return self.client.async_run(self.client.create_server, servers)
+
     @errors.generic.all
     @errors.cyclades.connection
     @errors.plankton.id
     @errors.cyclades.flavor_id
     def _run(self, name, flavor_id, image_id):
-        r = self.client.create_server(
-            name, int(flavor_id), image_id, personality=self['personality'])
-        usernames = self._uuids2usernames([r['user_id'], r['tenant_id']])
-        r['user_id'] += ' (%s)' % usernames[r['user_id']]
-        r['tenant_id'] += ' (%s)' % usernames[r['tenant_id']]
-        self._print(r, self.print_dict)
-        if self['wait']:
-            self._wait(r['id'], r['status'])
+        for r in self._create_cluster(
+                name, flavor_id, image_id, size=self['cluster_size'] or 1):
+            print 'HEY I GOT A', r
+            print 'MKEY?????????????????'
+            usernames = self._uuids2usernames([r['user_id'], r['tenant_id']])
+            r['user_id'] += ' (%s)' % usernames[r['user_id']]
+            r['tenant_id'] += ' (%s)' % usernames[r['tenant_id']]
+            self._print(r, self.print_dict)
+            if self['wait']:
+                self._wait(r['id'], r['status'])
+            self.error('')
 
     def main(self, name, flavor_id, image_id):
         super(self.__class__, self)._run()
@@ -421,27 +440,41 @@ class server_delete(_init_cyclades, _optional_output_cmd, _server_wait):
     """Delete a virtual server"""
 
     arguments = dict(
-        wait=FlagArgument('Wait server to be destroyed', ('-w', '--wait'))
+        wait=FlagArgument('Wait server to be destroyed', ('-w', '--wait')),
+        cluster=FlagArgument(
+            '(DANGEROUS) Delete all virtual servers prefixed with the cluster '
+            'prefix. In that case, the prefix replaces the server id',
+            '--cluster')
     )
 
+    def _server_ids(self, server_var):
+        if self['cluster']:
+            return [s['id'] for s in self.client.list_servers() if (
+                s['name'].startswith(server_var))]
+
+        @errors.cyclades.server_id
+        def _check_server_id(self, server_id):
+            return server_id
+
+        return [_check_server_id(self, server_id=server_var), ]
+
     @errors.generic.all
     @errors.cyclades.connection
-    @errors.cyclades.server_id
-    def _run(self, server_id):
-            status = 'DELETED'
+    def _run(self, server_var):
+        for server_id in self._server_ids(server_var):
             if self['wait']:
                 details = self.client.get_server_details(server_id)
                 status = details['status']
 
-            r = self.client.delete_server(int(server_id))
+            r = self.client.delete_server(server_id)
             self._optional_output(r)
 
             if self['wait']:
                 self._wait(server_id, status)
 
-    def main(self, server_id):
+    def main(self, server_id_or_cluster_prefix):
         super(self.__class__, self)._run()
-        self._run(server_id=server_id)
+        self._run(server_id_or_cluster_prefix)
 
 
 @command(server_cmds)
@@ -785,52 +818,6 @@ class server_wait(_init_cyclades, _server_wait):
         self._run(server_id=server_id, current_status=current_status)
 
 
-@command(server_cmds)
-class server_cluster_create(_init_cyclades):
-    """Create a cluster of virtual servers
-    All new servers will be named as <prefix><increment> e.g.,
-    mycluster1, mycluster2, etc.
-    All servers in the cluster will run the same image on the same hardware
-    flavor.
-    """
-
-    @errors.generic.all
-    @errors.cyclades.connection
-    @errors.plankton.id
-    @errors.cyclades.flavor_id
-    @errors.cyclades.cluster_size
-    def _run(self, prefix, image_id, flavor_id, size):
-        servers = [dict(
-            name='%s%s' % (prefix, i),
-            flavor_id=flavor_id,
-            image_id=image_id) for i in range(int(size))]
-        self.client.create_cluster(servers)
-
-    def main(self, prefix, image_id, flavor_id, size):
-        super(self.__class__, self)._run()
-        self._run(prefix, image_id=image_id, flavor_id=flavor_id, size=size)
-
-
-@command(server_cmds)
-class server_cluster_delete(_init_cyclades):
-    """Remove all servers that belong to a virtual cluster
-    A virtual cluster consists of the virtual servers with the same name prefix
-    ATTENTION: make sure you want to delete all servers of that prefix
-    To get a list of your servers:  /server list
-    """
-
-    @errors.generic.all
-    @errors.cyclades.connection
-    def _run(self, prefix):
-        servers = [s['id'] for s in self.client.list_servers() if (
-            s['name'].startswith(prefix))]
-        self.client.delete_cluster(servers)
-
-    def main(self, prefix):
-        super(self.__class__, self)._run()
-        self._run(prefix)
-
-
 @command(flavor_cmds)
 class flavor_list(_init_cyclades, _optional_json, _name_filter, _id_filter):
     """List available hardware flavors"""
index ff162a2..5e336e4 100644 (file)
@@ -218,7 +218,7 @@ class cyclades(object):
                 return foo(self, *args, **kwargs)
             except ValueError as ve:
                 msg = 'Invalid network id %s ' % network_id
-                details = ['network id must be a positive integer']
+                details = 'network id must be a positive integer'
                 raiseCLIError(ve, msg, details=details, importance=1)
             except ClientError as ce:
                 if network_id and ce.status == 404 and (
@@ -274,7 +274,7 @@ class cyclades(object):
                 return foo(self, *args, **kwargs)
             except ValueError as ve:
                 msg = 'Invalid flavor id %s ' % flavor_id,
-                details = 'Flavor id must be a positive integer',
+                details = 'Flavor id must be a positive integer'
                 raiseCLIError(ve, msg, details=details, importance=1)
             except ClientError as ce:
                 if flavor_id and ce.status == 404 and (
@@ -294,7 +294,7 @@ class cyclades(object):
                 return foo(self, *args, **kwargs)
             except ValueError as ve:
                 msg = 'Invalid virtual server id %s' % server_id,
-                details = ['id must be a positive integer'],
+                details = 'Server id must be a positive integer'
                 raiseCLIError(ve, msg, details=details, importance=1)
             except ClientError as ce:
                 err_msg = ('%s' % ce).lower()
index 49e46a9..ad7640f 100644 (file)
@@ -369,6 +369,45 @@ class Client(Logged):
             return []
         return threadlist
 
+    def async_run(self, method, kwarg_list):
+        """Fire threads of operations
+
+        :param method: the method to run in each thread
+
+        :param kwarg_list: (list of dicts) the arguments to pass in each method
+            call
+
+        :returns: (list) the results of each method call w.r. to the order of
+            kwarg_list
+        """
+        flying, results = {}, {}
+        self._init_thread_limit()
+        for index, kwargs in enumerate(kwarg_list):
+            self._watch_thread_limit(flying.values())
+            flying[index] = SilentEvent(method=method, **kwargs)
+            flying[index].start()
+            unfinished = {}
+            for key, thread in flying.items():
+                if thread.isAlive():
+                    unfinished[key] = thread
+                elif thread.exception:
+                    print 'HERE IS AN EXCEPTION MK?'
+                    raise thread.exception
+                else:
+                    results[key] = thread.value
+                print 'NO EXCEPTION', thread.value
+            flying = unfinished
+        sendlog.info('- - - wait for threads to finish')
+        for key, thread in flying.items():
+            if thread.isAlive():
+                thread.join()
+            elif thread.exception:
+                print 'HERE IS AN EXCEPTION MK-2?'
+                raise thread.exception
+            results[key] = thread.value
+            print 'NO EXCEPTION-2', thread.value
+        return results.values()
+
     def _raise_for_status(self, r):
         log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
         status_msg = getattr(r, 'status', None) or ''
index a7ab84b..d2649bf 100644 (file)
@@ -33,7 +33,6 @@
 
 from kamaki.clients import ClientError
 from kamaki.clients.compute.rest_api import ComputeRestClient
-from kamaki.clients.utils import path4url
 
 
 class ComputeClient(ComputeRestClient):
index 6c6787b..0461afa 100644 (file)
@@ -72,49 +72,6 @@ class CycladesClient(CycladesRestClient):
             name, flavor_id, image_id,
             metadata=metadata, personality=personality)
 
-    def _async_run(self, method, kwarg_list):
-        """Fire threads of operations
-
-        :param method: the method to run in each thread
-
-        :param kwarg_list: (list of dicts) the arguments to pass in each method
-            call
-        """
-        flying = []
-        self._init_thread_limit()
-        for kwargs in kwarg_list:
-            self._watch_thread_limit(flying)
-            flying.append(SilentEvent(method=method, **kwargs))
-            flying[-1].start()
-            unfinished = []
-            for thread in flying:
-                if thread.isAlive():
-                    unfinished.append(thread)
-                elif thread.exception:
-                    raise thread.exception
-            flying = unfinished
-        sendlog.info('- - - wait for threads to finish')
-        for thread in flying:
-            if thread.isAlive():
-                thread.join()
-            elif thread.exception:
-                raise thread.exception
-
-    def create_cluster(self, servers):
-        """Create multiple servers asynchronously
-        :param servers: (list of dicts) [
-        {name, flavor_id, image_id, metadata, personality}, ...]
-        """
-        #  Perform async server creations
-        return self._async_run(self.create_server, servers)
-
-    def delete_cluster(self, server_ids):
-        """
-        :param server_ids: (list) ids of servers to delete
-        """
-        self._async_run(
-            self.delete_server, [dict(server_id=sid) for sid in server_ids])
-
     def start_server(self, server_id):
         """Submit a startup request