Create/expose methods for mass VM create/delete
authorStavros Sachtouris <saxtouri@admin.grnet.gr>
Thu, 3 Oct 2013 13:18:05 +0000 (16:18 +0300)
committerStavros Sachtouris <saxtouri@admin.grnet.gr>
Thu, 3 Oct 2013 13:18:05 +0000 (16:18 +0300)
Refs: #4429

kamaki/cli/commands/cyclades.py
kamaki/cli/commands/errors.py
kamaki/cli/commands/pithos.py
kamaki/clients/cyclades/__init__.py

index 85a034d..729ee85 100644 (file)
@@ -383,7 +383,6 @@ class server_create(_init_cyclades, _optional_json, _server_wait):
     @errors.plankton.id
     @errors.cyclades.flavor_id
     def _run(self, name, flavor_id, image_id):
-        print 'hey, wha?'
         r = self.client.create_server(
             name, int(flavor_id), image_id, personality=self['personality'])
         usernames = self._uuids2usernames([r['user_id'], r['tenant_id']])
@@ -786,6 +785,52 @@ class server_wait(_init_cyclades, _server_wait):
         self._run(server_id=server_id, current_status=current_status)
 
 
+@command(server_cmds)
+class server_cluster_create(_init_cyclades):
+    """Create a cluster of virtual servers
+    All new servers will be named as <prefix><increment> e.g.,
+    mycluster1, mycluster2, etc.
+    All servers in the cluster will run the same image on the same hardware
+    flavor.
+    """
+
+    @errors.generic.all
+    @errors.cyclades.connection
+    @errors.plankton.id
+    @errors.cyclades.flavor_id
+    @errors.cyclades.cluster_size
+    def _run(self, prefix, image_id, flavor_id, size):
+        servers = [dict(
+            name='%s%s' % (prefix, i),
+            flavor_id=flavor_id,
+            image_id=image_id) for i in range(int(size))]
+        self.client.create_cluster(servers)
+
+    def main(self, prefix, image_id, flavor_id, size):
+        super(self.__class__, self)._run()
+        self._run(prefix, image_id=image_id, flavor_id=flavor_id, size=size)
+
+
+@command(server_cmds)
+class server_cluster_delete(_init_cyclades):
+    """Remove all servers that belong to a virtual cluster
+    A virtual cluster consists of the virtual servers with the same name prefix
+    ATTENTION: make sure you want to delete all servers of that prefix
+    To get a list of your servers:  /server list
+    """
+
+    @errors.generic.all
+    @errors.cyclades.connection
+    def _run(self, prefix):
+        servers = [s['id'] for s in self.client.list_servers() if (
+            s['name'].startswith(prefix))]
+        self.client.delete_cluster(servers)
+
+    def main(self, prefix):
+        super(self.__class__, self)._run()
+        self._run(prefix)
+
+
 @command(flavor_cmds)
 class flavor_list(_init_cyclades, _optional_json, _name_filter, _id_filter):
     """List available hardware flavors"""
index c75cc34..ff162a2 100644 (file)
@@ -191,6 +191,25 @@ class cyclades(object):
         return _raise
 
     @classmethod
+    def cluster_size(this, foo):
+        def _raise(self, *args, **kwargs):
+            size = kwargs.get('size', None)
+            try:
+                size = int(size)
+                assert size > 0, 'Cluster size must be a positive integer'
+                return foo(self, *args, **kwargs)
+            except ValueError as ve:
+                msg = 'Invalid cluster size value %s' % size
+                raiseCLIError(ve, msg, importance=1, details=[
+                    'Cluster size must be a positive integer'])
+            except AssertionError as ae:
+                raiseCLIError(
+                    ae, 'Invalid cluster size %s' % size, importance=1)
+            except ClientError:
+                raise
+        return _raise
+
+    @classmethod
     def network_id(this, foo):
         def _raise(self, *args, **kwargs):
             network_id = kwargs.get('network_id', None)
index 28850eb..57cfdd2 100644 (file)
@@ -231,9 +231,7 @@ class _file_container_command(_file_account_command):
             'Set container to work with (temporary)', ('-C', '--container'))
 
     def extract_container_and_path(
-            self,
-            container_with_path,
-            path_is_optional=True):
+            self, container_with_path, path_is_optional=True):
         """Contains all heuristics for deciding what should be used as
         container or path. Options are:
         * user string of the form container:path
index b86a5a5..6c6787b 100644 (file)
 # interpreted as representing official policies, either expressed
 # or implied, of GRNET S.A.
 
-from sys import stdout
 from time import sleep
 
 from kamaki.clients.cyclades.rest_api import CycladesRestClient
-from kamaki.clients import ClientError
+from kamaki.clients import ClientError, SilentEvent, sendlog
 
 
 class CycladesClient(CycladesRestClient):
@@ -73,6 +72,49 @@ class CycladesClient(CycladesRestClient):
             name, flavor_id, image_id,
             metadata=metadata, personality=personality)
 
+    def _async_run(self, method, kwarg_list):
+        """Fire threads of operations
+
+        :param method: the method to run in each thread
+
+        :param kwarg_list: (list of dicts) the arguments to pass in each method
+            call
+        """
+        flying = []
+        self._init_thread_limit()
+        for kwargs in kwarg_list:
+            self._watch_thread_limit(flying)
+            flying.append(SilentEvent(method=method, **kwargs))
+            flying[-1].start()
+            unfinished = []
+            for thread in flying:
+                if thread.isAlive():
+                    unfinished.append(thread)
+                elif thread.exception:
+                    raise thread.exception
+            flying = unfinished
+        sendlog.info('- - - wait for threads to finish')
+        for thread in flying:
+            if thread.isAlive():
+                thread.join()
+            elif thread.exception:
+                raise thread.exception
+
+    def create_cluster(self, servers):
+        """Create multiple servers asynchronously
+        :param servers: (list of dicts) [
+        {name, flavor_id, image_id, metadata, personality}, ...]
+        """
+        #  Perform async server creations
+        return self._async_run(self.create_server, servers)
+
+    def delete_cluster(self, server_ids):
+        """
+        :param server_ids: (list) ids of servers to delete
+        """
+        self._async_run(
+            self.delete_server, [dict(server_id=sid) for sid in server_ids])
+
     def start_server(self, server_id):
         """Submit a startup request