3 # Copyright 2013 GRNET S.A. All rights reserved.
5 # Redistribution and use in source and binary forms, with or
6 # without modification, are permitted provided that the following
9 # 1. Redistributions of source code must retain the above
10 # copyright notice, this list of conditions and the following
13 # 2. Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials
16 # provided with the distribution.
18 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 # POSSIBILITY OF SUCH DAMAGE.
31 # The views and conclusions contained in the software and
32 # documentation are those of the authors and should not be
33 # interpreted as representing official policies, either expressed
34 # or implied, of GRNET S.A.
42 import fabric.api as fabric
43 from ConfigParser import ConfigParser
45 from kamaki.clients.astakos import AstakosClient
46 from kamaki.clients.cyclades import CycladesClient
47 from kamaki.clients.image import ImageClient
# Module-wide Fabric defaults; they apply to every remote operation below.
# Skip loading the user's known_hosts file (see the Fabric env documentation).
fabric.env.disable_known_hosts = True
# Shell wrapper Fabric uses when executing remote commands.
fabric.env.shell = "/bin/bash -c"
# Number of connection attempts Fabric makes to a host before giving up.
fabric.env.connection_attempts = 10
#fabric.env.output_prefix = None


#dir = os.path.dirname(os.path.abspath(__file__))
def get_list_items(s):
    """Split a delimited string into a list of whitespace-stripped items.

    NOTE(review): the statement that builds the list iterated by the
    comprehension is absent from the visible source (the name ``l`` is
    never bound). A comma separator is assumed here -- confirm against
    the config file format.

    :param s: string such as ``"node1, node2"``
    :returns: list of items with surrounding whitespace removed
    """
    return [item.strip() for item in s.split(",")]
# TODO: add support for a separate output file
def _run(cmd):
    """Run ``cmd`` through ``fabric.run`` and return whatever it returns.

    NOTE(review): the ``def`` line is absent from the visible source; the
    name and signature are taken from the ``_run(cmd)`` call made elsewhere
    in this file -- confirm.

    :param cmd: command line to execute on the current Fabric host
    """
    return fabric.run(cmd)
def _red(msg):
    """Return ``str(msg)`` wrapped in the ANSI escape codes for red text.

    NOTE(review): the ``def`` line is absent from the visible source; the
    name is taken from the ``_red(...)`` call sites in this file.
    """
    return "\x1b[31m" + str(msg) + "\x1b[0m"
def _yellow(msg):
    """Return ``str(msg)`` wrapped in the ANSI escape codes for yellow text.

    NOTE(review): the ``def`` line is absent from the visible source; the
    name is taken from the ``_yellow(...)`` call sites in this file.
    """
    return "\x1b[33m" + str(msg) + "\x1b[0m"
def _green(msg):
    """Return ``str(msg)`` wrapped in the ANSI escape codes for green text.

    NOTE(review): the ``def`` line is absent from the visible source; the
    name is taken from the ``_green(...)`` call sites in this file.
    """
    return "\x1b[32m" + str(msg) + "\x1b[0m"
def _check_fabric(fun):
    """Check if fabric env has been set

    Decorator for methods of objects exposing a ``fabric_installed`` flag:
    the flag is tested before every call to the wrapped method.

    :param fun: method to wrap; it receives ``self`` plus the caller's
        positional and keyword arguments unchanged
    :returns: the wrapping function
    """
    import functools

    @functools.wraps(fun)  # keep the wrapped method's name and docstring
    def wrapper(self, *args, **kwargs):
        """wrapper function"""
        if not self.fabric_installed:
            # NOTE(review): the statement executed in this branch is absent
            # from the visible source; a call to ``self.setup_fabric()`` is
            # presumed -- confirm against the original file.
            self.setup_fabric()
        # Keyword arguments are forwarded too, so decorated methods can be
        # called with named parameters.
        return fun(self, *args, **kwargs)
    return wrapper
def capture_streams(func):
    """Decorator that captures what ``func`` writes to stdout and stderr.

    The wrapped callable accepts three extra keyword arguments, which are
    removed before ``func`` is invoked:

    * ``stdout`` -- path of a file that receives the captured stdout text
    * ``stderr`` -- path of a file that receives the captured stderr text
    * ``mock`` -- defaults to True

    NOTE(review): several lines of this function are absent from the
    visible source. The guards around the file writes and the condition
    in front of the final echo are presumed to be ``if stdout:`` /
    ``if stderr:`` and ``if not mock:`` respectively -- confirm.

    :param func: callable to wrap
    :returns: wrapper returning whatever ``func`` returns
    """
    import functools
    try:
        from StringIO import StringIO  # Python 2
    except ImportError:
        from io import StringIO

    def write_stream(path, text):
        """Write ``text`` to ``path`` when a path was supplied."""
        if path:
            with open(path, 'w+') as stream_file:
                stream_file.write(text)

    @functools.wraps(func)
    def inner(*args, **kwargs):
        stdout = kwargs.pop('stdout', None)
        stderr = kwargs.pop('stderr', None)
        mock = kwargs.pop('mock', True)
        # Fresh buffers for every call: buffers created once at decoration
        # time would keep accumulating the output of earlier calls.
        myout = StringIO()
        myerr = StringIO()
        saved_stdout, saved_stderr = sys.stdout, sys.stderr
        sys.stdout, sys.stderr = myout, myerr
        try:
            ret = func(*args, **kwargs)
        finally:
            # Restore the real streams first so they come back even when
            # func() raises or a capture file cannot be written.
            sys.stdout, sys.stderr = saved_stdout, saved_stderr
            write_stream(stdout, myout.getvalue())
            write_stream(stderr, myerr.getvalue())
            if not mock:
                # Both captured streams are echoed on stdout, one per line.
                sys.stdout.write(myout.getvalue() + "\n")
                sys.stdout.write(myerr.getvalue() + "\n")
        return ret
    return inner
def get_port_from_ip(ip):
    """Derive a port number from the last two octets of a dotted IPv4 string.

    The result is ``10000 + third_octet * 256 + fourth_octet``.

    NOTE(review): the line that splits ``ip`` into octets and the final
    ``return`` are absent from the visible source; indexing positions 2 and
    3 with ``int()`` only makes sense on the dot-separated parts.

    :param ip: address such as ``"10.0.1.2"``
    :returns: the computed port as an int
    """
    octets = ip.split(".")
    return 10000 + int(octets[2]) * 256 + int(octets[3])
class Timeout(Exception):
    """Raised when an operation did not finish within ``timeout`` seconds."""

    def __init__(self, timeout):
        # Pass the value to Exception as well so ``args`` is populated.
        Exception.__init__(self, timeout)
        self.timeout = timeout

    def __str__(self):
        return "Timed out after %d secs" % self.timeout
class RemoteCommandFailed(Exception):
    """Raised when a command executed on a remote host did not succeed."""

    def __init__(self, cmd):
        # Pass the value to Exception as well so ``args`` is populated.
        Exception.__init__(self, cmd)
        # __str__ reads this attribute, so it has to be stored here.
        self.cmd = cmd

    def __str__(self):
        return "Remote command failed: %s" % self.cmd
class RemotePutFailed(Exception):
    """Raised when copying a local file to a remote path did not succeed."""

    def __init__(self, local, remote):
        # Pass the values to Exception as well so ``args`` is populated.
        Exception.__init__(self, local, remote)
        self.local_file = local
        self.remote_path = remote

    def __str__(self):
        return "Failed to put local file %s to remote path %s" % \
            (self.local_file, self.remote_path)
class _MyFormatter(logging.Formatter):
    """Logging Formatter

    Picks the format string from the record's level: DEBUG and INFO print
    the bare message, WARNING and ERROR add a coloured ``[W]`` / ``[E]``
    prefix. Every other level uses the formatter's configured format.
    """

    def format(self, record):
        """Return ``record`` rendered with the format for its level."""
        if record.levelno == logging.DEBUG:
            level_fmt = " %(msg)s"
        elif record.levelno == logging.INFO:
            level_fmt = "%(msg)s"
        elif record.levelno == logging.WARNING:
            level_fmt = _yellow("[W] %(msg)s")
        elif record.levelno == logging.ERROR:
            level_fmt = _red("[E] %(msg)s")
        else:
            return logging.Formatter.format(self, record)

        # Python 3 formatters render through a style object that keeps its
        # own copy of the format string; Python 2 reads self._fmt directly.
        style = getattr(self, "_style", None)
        format_orig = self._fmt
        self._fmt = level_fmt
        if style is not None:
            style._fmt = level_fmt
        try:
            return logging.Formatter.format(self, record)
        finally:
            # Always restore the configured format, even if formatting
            # raised, so the next record is not affected.
            self._fmt = format_orig
            if style is not None:
                style._fmt = format_orig
class ConfigClient(object):
    """Access to the configuration file, shared at class level.

    ``logger``, ``config`` and ``conffile`` are class attributes: the first
    instance to be created initialises them and later instances reuse them.
    """

    # NOTE(review): the attribute declarations are absent from the visible
    # source; __init__ tests ``self.logger is None`` and
    # ``self.config is None``, so both have to exist and default to None.
    logger = None
    config = None
    conffile = None

    def __init__(self, conffile=None, **kwargs):
        """Set up logger and config if needed, then store ``kwargs``.

        :param conffile: path of the config file; when None a file named
            ``config`` next to this module is used
        :param kwargs: options written to the ``Global`` section
        """
        if self.logger is None:
            ConfigClient.set_logger('ConfigClient')

        if self.config is None:
            ConfigClient.set_config(conffile)

        for option, value in kwargs.items():
            self._write_config('Global', option, value)

    @classmethod
    def set_logger(cls, name):
        """Attach the logger called ``name`` to the class at INFO level."""
        cls.logger = logging.getLogger(name)
        cls.logger.setLevel(logging.INFO)
        # getLogger() returns the same object for the same name; adding a
        # second handler would print every message twice.
        if not cls.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(_MyFormatter())
            cls.logger.addHandler(handler)

    @classmethod
    def set_config(cls, conffile):
        """Read config file"""
        if conffile is None:
            ci_dir = os.path.dirname(os.path.abspath(__file__))
            cls.conffile = os.path.join(ci_dir, "config")
        else:
            cls.conffile = conffile
        cls.config = ConfigParser()
        # Keep option names case-sensitive.
        cls.config.optionxform = str
        cls.config.read(cls.conffile)

    def _save_config(self):
        """Write the in-memory configuration to ``self.conffile``."""
        # Text mode works on Python 2 and is required by the Python 3
        # config parser, whose write() emits str.
        with open(self.conffile, 'w') as configfile:
            self.config.write(configfile)

    def _write_config(self, section, option, value):
        """Write changes back to config file"""
        if not self.config.has_section(section):
            self.config.add_section(section)
        self.config.set(section, option, str(value))
        self._save_config()

    def _remove_config(self, section, option):
        """Remove ``option`` from ``section`` and write the file back.

        Does nothing when the section does not exist.
        """
        if not self.config.has_section(section):
            return
        self.config.remove_option(section, option)
        self._save_config()
class CloudClient(ConfigClient):
    """Holds the kamaki API clients, shared at class level.

    The URLs, the token and the three clients are class attributes; the
    first instance to be created initialises them and later ones reuse them.
    """

    # NOTE(review): only ``cyclades_client`` and ``astakos_client`` are
    # declared in the visible source; the others are listed because
    # __init__ tests each of them against None.
    auth_url = None
    token = None
    cyclades_url = None
    cyclades_client = None
    astakos_client = None
    image_client = None

    def __init__(self, conffile=None):
        """Initialise config, then every shared attribute still unset."""
        ConfigClient.__init__(self, conffile)
        if self.logger is None:
            CloudClient.set_logger('CloudClient')

        if self.auth_url is None:
            CloudClient.set_auth_url()

        if self.token is None:
            CloudClient.set_token()

        if self.astakos_client is None:
            CloudClient.set_astakos_client(self.auth_url, self.token)

        if self.cyclades_url is None:
            CloudClient.set_cyclades_url()

        if self.cyclades_client is None:
            CloudClient.set_cyclades_client(self.cyclades_url, self.token)

        if self.image_client is None:
            CloudClient.set_image_client(self.cyclades_url, self.token)

    @classmethod
    def set_logger(cls, name):
        """Attach the logger called ``name`` to the class at INFO level."""
        cls.logger = logging.getLogger(name)
        cls.logger.setLevel(logging.INFO)
        # getLogger() returns the same object for the same name; adding a
        # second handler would print every message twice.
        if not cls.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(_MyFormatter())
            cls.logger.addHandler(handler)

    @classmethod
    def set_auth_url(cls):
        """Read the authentication URL from the ``Global`` config section."""
        cls.logger.info("Setup kamaki client..")
        cls.auth_url = cls.config.get('Global', 'auth_url')
        cls.logger.debug("Authentication URL is %s" % _green(cls.auth_url))

    @classmethod
    def set_cyclades_url(cls):
        """Look up the public URL of the ``compute`` service endpoint."""
        cls.cyclades_url = \
            cls.astakos_client.get_service_endpoints('compute')['publicURL']
        cls.logger.debug("Cyclades API url is %s" % _green(cls.cyclades_url))

    # NOTE(review): the ``def`` line of this method is absent from the
    # visible source; the name comes from the ``CloudClient.set_token()``
    # call in __init__.
    @classmethod
    def set_token(cls):
        """Read the API token from the ``Global`` config section."""
        cls.token = cls.config.get('Global', 'token')
        cls.logger.debug("Token is %s" % _green(cls.token))

    @classmethod
    def set_astakos_client(cls, auth_url, token):
        """Create the shared Astakos client."""
        cls.astakos_client = AstakosClient(auth_url, token)

    @classmethod
    def set_cyclades_client(cls, cyclades_url, token):
        """Create the shared Cyclades client."""
        cls.cyclades_client = CycladesClient(cyclades_url, token)
        cls.cyclades_client.CONNECTION_RETRY_LIMIT = 2

    @classmethod
    def set_image_client(cls, cyclades_url, token):
        """Create the shared Image client.

        ``cyclades_url`` is kept for interface compatibility but is not
        used: the client is built from the ``image`` service endpoint.
        """
        image_url = \
            cls.astakos_client.get_service_endpoints('image')['publicURL']
        cls.logger.debug("Images API url is %s" % _green(image_url))
        # The image endpoint was previously looked up and logged, yet the
        # client was created with the compute URL.
        cls.image_client = ImageClient(image_url, token)
        cls.image_client.CONNECTION_RETRY_LIMIT = 2

    def _wait_transition(self, server_id, new_status):
        """Wait for server to go to new_status

        NOTE(review): only the status check, the timeout message and the
        sleep are present in the visible source. The poll interval, the
        loop shape, the returned values and the timeout bookkeeping below
        are presumed -- confirm against the original file.

        :returns: the server details once its status equals ``new_status``;
            presumably None when ``build_timeout`` seconds have passed
        """
        self.logger.debug("Waiting for server to become %s" % new_status)
        timeout = self.config.getint('Global', 'build_timeout')
        sleep_time = 5  # presumed poll interval in seconds -- TODO confirm
        while True:
            server = self.cyclades_client.get_server_details(server_id)
            if server['status'] == new_status:
                return server
            if timeout < 0:
                self.logger.error(
                    "Waiting for server to become %s timed out" % new_status)
                return None
            time.sleep(sleep_time)
            timeout -= sleep_time
325 class Server(CloudClient):
342 def __init__(self, config_id):
343 CloudClient.__init__(self)
344 self.logger = logging.getLogger(config_id)
345 self.logger.setLevel(logging.INFO)
346 handler = logging.StreamHandler()
347 handler.setFormatter(_MyFormatter())
348 self.logger.addHandler(handler)
350 self.config_id = config_id
351 self.__get_flavor_id()
352 self.__get_image_id()
354 self.__get_packages()
355 self.__get_update_cmd()
356 self.__get_install_cmd()
359 if self.config.has_option(self.config_id, 'server_id'):
360 self.server_id = int(self.config.get(self.config_id, 'server_id'))
361 if self.config.has_option(self.config_id, 'user'):
362 self.user = self.config.get(self.config_id, 'user')
363 if self.config.has_option(self.config_id, 'passwd'):
364 self.passwd = self.config.get(self.config_id, 'passwd')
366 if self.flavor_id is None:
367 raise Exception("Flavor id not found for %s" % self.config_id)
369 if self.image_id is None:
370 raise Exception("Image id not found for %s" % self.config_id)
372 if self.server_id is not None:
373 server = self.cyclades_client.get_server_details(self.server_id)
375 raise Exception("Invalid server id")
378 def __get_packages(self):
380 if self.config.has_option(self.config_id, 'packages'):
381 packages += get_list_items(self.config.get(self.config_id, 'packages'))
382 if self.config.has_option('Global', 'packages'):
383 packages += get_list_items(self.config.get('Global', 'packages'))
385 if len(packages) > 0:
386 self.packages = packages
388 def __get_files(self):
392 if self.config.has_option(self.config_id, 'files'):
393 tmp1 = get_list_items(self.config.get(self.config_id, 'files'))
394 if len(tmp1) > 0 and len(tmp1) % 2 == 0:
396 if self.config.has_option('Global', 'files'):
397 tmp1 += get_list_items(self.config.get('Global', 'files'))
398 if len(tmp1) > 0 and len(tmp1) % 2 == 0:
401 for i in range(0, len(tmp), 2):
402 t = (tmp[i], tmp[i+1])
408 def __get_flavor_id(self):
409 if self.config.has_option(self.config_id, 'flavor_id'):
410 self.flavor_id = self.config.get(self.config_id, 'flavor_id')
411 elif self.config.has_option('Global', 'flavor_id'):
412 self.flavor_id = self.config.get('Global', 'flavor_id')
414 def __get_image_id(self):
415 if self.config.has_option(self.config_id, 'image_id'):
416 self.image_id = self.config.get(self.config_id, 'image_id')
417 elif self.config.has_option('Global', 'image_id'):
418 self.image_id = self.config.get('Global', 'image_id')
420 def __get_name(self):
421 if self.config.has_option(self.config_id, 'name'):
422 self.name = self.config.get(self.config_id, 'name')
424 self.name = self.config_id
426 def __get_update_cmd(self):
427 if self.config.has_option(self.config_id, 'update_cmd'):
428 self.update_cmd = self.config.get(self.config_id, 'update_cmd')
429 elif self.config.has_option('Global', 'update_cmd'):
430 self.update_cmd = self.config.get('Global', 'update_cmd')
432 def __get_install_cmd(self):
433 if self.config.has_option(self.config_id, 'install_cmd'):
434 self.install_cmd = self.config.get(self.config_id, 'install_cmd')
435 elif self.config.has_option('Global', 'install_cmd'):
436 self.install_cmd = self.config.get('Global', 'install_cmd')
439 def wait_transition(self, new_status):
441 server = self._wait_transition(self.server_id, new_status)
448 def create(self, wait=False):
449 if self.status and self.status != "DELETED":
451 self.logger.info("Create a new server..")
452 server = self.cyclades_client.create_server(self.name, self.flavor_id,
455 self.server_id = server['id']
456 self.logger.debug("Server got id %s" % _green(self.server_id))
457 self.user = server['metadata']['users']
458 self.logger.debug("Server's admin user is %s" % _green(self.user))
459 self.passwd= server['adminPass']
461 "Server's admin password is %s" % _green(self.passwd))
463 server = self.wait_transition("ACTIVE")
468 def destroy(self, wait=False):
469 if self.server_id is None:
470 self.logger.debug("Server %s does not have server_id" %
474 if self.status == "DELETED":
475 self.logger.debug("Server %d is marked as DELETED" %
480 self.logger.info("Destoying server with id %s " % self.server_id)
481 self.cyclades_client.delete_server(self.server_id)
483 return self.wait_transition("DELETED")
486 def update(self, server=None):
488 if self.server_id is not None:
489 server = self.cyclades_client.get_server_details(self.server_id)
491 raise Exception("Server id is not set")
493 server_ip = server['attachments'][0]['ipv4']
497 if server_ip and self.config.has_option('Global', 'okeanos_io'):
498 io = self.config.getboolean('Global', 'okeanos_io')
500 server_port = get_port_from_ip(server_ip)
501 server_ip = "gate.demo.synnefo.org"
503 self.logger.debug("Server's IPv4 is %s" % _green(server_ip))
504 self.logger.debug("Server's ssh port is %s" % _green(server_port))
505 self.ipv4 = server_ip
506 self.port = server_port
507 self.status = server['status']
510 def write_config(self):
511 if self.status == "DELETED":
512 return self.clear_config()
514 self._write_config(self.config_id, 'server_id', self.server_id)
516 self._remove_config(self.config_id, 'server_id')
518 self._write_config(self.config_id, 'user', self.user)
520 self._remove_config(self.config_id, 'user')
522 self._write_config(self.config_id, 'passwd', self.passwd)
524 self._remove_config(self.config_id, 'passwd')
526 self._write_config(self.config_id, 'ipv4', self.ipv4)
528 self._remove_config(self.config_id, 'ipv4')
530 self._write_config(self.config_id, 'port', self.port)
532 self._remove_config(self.config_id, 'port')
534 def clear_config(self):
535 self._remove_config(self.config_id, 'server_id')
536 self._remove_config(self.config_id, 'user')
537 self._remove_config(self.config_id, 'passwd')
538 self._remove_config(self.config_id, 'ipv4')
539 self._remove_config(self.config_id, 'port')
542 def ping(self, timeout):
545 while timeout == 0 or time.time() - start < timeout:
546 self.logger.info("Pinging host %s(%s)" % \
547 (_green(self.config_id), _green(self.ipv4)))
548 cmd = ['ping', '-c', '1', '-w', '20', self.ipv4]
549 ping = subprocess.Popen(cmd, shell=False,
550 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
551 (stdout, stderr) = ping.communicate()
554 self.logger.info("Pinging host %s %s" % \
555 (self.config_id, _green("succeeded")))
557 self.logger.info("Pinging host %s failed. %s" % \
558 (self.config_id, _yellow("Retrying")))
560 raise Timeout(timeout)
563 def __execute_command(self, cmd):
564 with fabric.settings(host_string=str(self.ipv4), port=self.port,
565 user=self.user, password=self.passwd, warn_only=True):
566 if not _run(cmd).succeeded:
567 raise RemoteCommandFailed(cmd)
569 def execute_command(self, cmd, verbose=False):
570 host = "%s@%s:%d" % (self.user, self.ipv4, self.port)
571 self.logger.info("Executing cmd \"%s\" on host %s " % \
572 (_yellow(cmd), _green(host)))
574 return self.__execute_command(cmd, mock=mock, stdout='/tmp/cmd_out',
575 stderr='/tmp/cmd_err')
577 def install_packages(self, packages=None):
579 packages = self.packages
584 self.logger.info("Installing packages \"%s\" on host %s " % \
585 (_green(' '.join(packages)), _green(self.config_id)))
586 self.execute_command("""
589 """.format(self.update_cmd, self.install_cmd, ' '.join(packages)))
592 def _inject_file(self, local_file, remote_path):
593 with fabric.settings(host_string=str(self.ipv4), port=self.port,
594 user=self.user, password=self.passwd, warn_only=True):
595 if not fabric.put(local_file, remote_path).succeeded:
596 raise RemotePutFailed(local_file, remote_path)
598 def inject_file(self, local_file, remote_path, verbose=False):
599 self.logger.info("Putting file %s on host %s" %
600 (_yellow(local_file), _green(self.config_id)))
602 self._inject_file(local_file, remote_path, mock=mock)
604 def inject_files(self):
606 self.inject_file(f[0], f[1])
608 class Cluster(ConfigClient):
610 def __init__(self, **kwargs):
611 ConfigClient.__init__(self, **kwargs)
614 self.logger = logging.getLogger('Cluster')
615 self.logger.setLevel(logging.INFO)
616 handler = logging.StreamHandler()
617 handler.setFormatter(_MyFormatter())
618 self.logger.addHandler(handler)
619 self.server_list = self.config.get('Global', 'servers')
620 self.server_list = get_list_items(self.server_list)
622 self.cluster_created = True
623 for s in self.server_list:
625 if not server.status or server.status == "DELETED":
626 self.cluster_created = False
627 self.servers.append(server)
629 self.destroy_on_error = False
630 if self.config.has_option('Global', 'destroy_on_error'):
631 self.destroy_on_error = \
632 self.config.getboolean('Global', 'destroy_on_error')
634 self.cleanup_servers = False
635 if self.config.has_option('Global', 'cleanup_servers'):
636 self.cleanup_servers = \
637 self.config.getboolean('Global', 'cleanup_servers')
640 if self.cleanup_servers:
645 for s in self.servers:
646 if not s.status or s.status != "ACTIVE":
650 self.wait_status(submitted, "ACTIVE")
652 if self.config.has_option('Global', 'okeanos_io'):
653 io = self.config.getboolean('Global', 'okeanos_io')
655 for s in self.servers:
658 for s in self.servers:
661 for s in self.servers:
664 except Exception as e:
665 if self.destroy_on_error:
668 self.cluster_created = True
672 for s in self.servers:
676 self.wait_status(submitted, "DELETED")
677 self.cluster_created = False
679 def execute_command(self, cmd, verbose=False):
680 for s in self.servers:
681 s.execute_command(cmd, verbose)
683 def inject_file(self, local_file, remote_path, verbose=False):
684 for s in self.servers:
685 s.inject_file(local_file, remote_path, verbose=verbose)
687 def install_packages(self, packages=None):
688 for s in self.servers:
689 s.install_packages(packages)
692 def wait_status(self, servers, status):
693 self.logger.debug("Waiting for %d servers to become %s" %
694 (len(servers), status ))
696 if s.wait_transition(status):
697 self.logger.debug("Server %d became %s" % (s.server_id, status))
702 def get_server(self, name):
703 for s in self.servers:
704 if s.config_id == name:
708 if __name__ == '__main__':