0e6e3ba966b4bfca490390d570877e2c39436803
[archipelago] / ci / cluster.py
1 #!/usr/bin/env python
2
3 # Copyright 2013 GRNET S.A. All rights reserved.
4 #
5 # Redistribution and use in source and binary forms, with or
6 # without modification, are permitted provided that the following
7 # conditions are met:
8 #
9 #   1. Redistributions of source code must retain the above
10 #      copyright notice, this list of conditions and the following
11 #      disclaimer.
12 #
13 #   2. Redistributions in binary form must reproduce the above
14 #      copyright notice, this list of conditions and the following
15 #      disclaimer in the documentation and/or other materials
16 #      provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 # POSSIBILITY OF SUCH DAMAGE.
30 #
31 # The views and conclusions contained in the software and
32 # documentation are those of the authors and should not be
33 # interpreted as representing official policies, either expressed
34 # or implied, of GRNET S.A.
35
36 import os
37 import sys
38 import time
39 import logging
40 import subprocess
41 import StringIO
42 import fabric.api as fabric
43 from ConfigParser import ConfigParser
44
45 from kamaki.clients.astakos import AstakosClient
46 from kamaki.clients.cyclades import CycladesClient
47 from kamaki.clients.image import ImageClient
48
49 fabric.env.disable_known_hosts = True
50 fabric.env.shell = "/bin/bash -c"
51 fabric.env.connection_attempts = 10
52 #fabric.env.output_prefix = None
53
54
55 #dir = os.path.dirname(os.path.abspath(__file__))
56 def get_list_items(s):
57     l = s.split(',')
58     return [ x.strip() for x in l ]
59
60 #TODO add support for seperate output file
61 def _run(cmd):
62     """Run fabric"""
63     return fabric.run(cmd)
64
65 def _red(msg):
66     """Red color"""
67     return "\x1b[31m" + str(msg) + "\x1b[0m"
68
69
70 def _yellow(msg):
71     """Yellow color"""
72     return "\x1b[33m" + str(msg) + "\x1b[0m"
73
74
75 def _green(msg):
76     """Green color"""
77     return "\x1b[32m" + str(msg) + "\x1b[0m"
78
79
80 def _check_fabric(fun):
81     """Check if fabric env has been set"""
82     def wrapper(self, *args):
83         """wrapper function"""
84         if not self.fabric_installed:
85             self.setup_fabric()
86         return fun(self, *args)
87     return wrapper
88
89 def capture_streams(func):
90     myout = StringIO.StringIO()
91     myerr = StringIO.StringIO()
92
93     def write_streams(stdout, stderr):
94         if stdout:
95             f = open(stdout, 'w+')
96             f.write(myout.getvalue())
97             f.close()
98         if stderr:
99             f = open(stderr, 'w+')
100             f.write(myerr.getvalue())
101             f.close()
102
103     def inner(*args, **kwargs):
104         stdout = kwargs.pop('stdout', None)
105         stderr = kwargs.pop('stderr', None)
106         mock = kwargs.pop('mock', True)
107         __stdout = sys.stdout
108         sys.stdout = myout
109         __stderr = sys.stderr
110         sys.stderr = myerr
111         exc = True
112         try:
113             ret = func(*args, **kwargs)
114             exc = False
115         finally:
116             write_streams(stdout, stderr)
117             sys.stdout = __stdout
118             sys.stderr = __stderr
119             if not mock or exc:
120                 print myout.getvalue()
121                 print myerr.getvalue()
122             myout.truncate(0)
123             myerr.truncate(0)
124
125         return ret
126     return inner
127
128 def get_port_from_ip(ip):
129     ip = ip.split('.')
130     port = 10000 + int(ip[2]) * 256 + int(ip[3])
131     return port
132
133 class Timeout(Exception):
134     timeout = 0
135     def __init__(self, timeout):
136         self.timeout = timeout
137
138     def __str__(self):
139         return "Timed out after %d secs" % self.timeout
140
141 class RemoteCommandFailed(Exception):
142     cmd = None
143     def __init__(self, cmd):
144         self.cmd = cmd
145
146     def __str__(self):
147         return "Remote command failed: %s" % self.cmd
148
149 class RemotePutFailed(Exception):
150     local_file = None
151     remote_path = None
152     def __init__(self, local, remote):
153         self.local_file = local
154         self.remote_path = remote
155
156     def __str__(self):
157         return "Failed to put local file %s to remote path %s" % \
158                 (self.local_file, self.remote_path)
159
160
161 class _MyFormatter(logging.Formatter):
162     """Logging Formatter"""
163     def format(self, record):
164         format_orig = self._fmt
165         if record.levelno == logging.DEBUG:
166             self._fmt = "  %(msg)s"
167         elif record.levelno == logging.INFO:
168             self._fmt = "%(msg)s"
169         elif record.levelno == logging.WARNING:
170             self._fmt = _yellow("[W] %(msg)s")
171         elif record.levelno == logging.ERROR:
172             self._fmt = _red("[E] %(msg)s")
173         result = logging.Formatter.format(self, record)
174         self._fmt = format_orig
175         return result
176
177 class ConfigClient(object):
178     logger = None
179     config = None
180
181     def __init__(self, conffile=None, **kwargs):
182         if self.logger is None:
183             ConfigClient.set_logger('ConfigClient')
184
185         if self.config is None:
186             ConfigClient.set_config(conffile)
187
188         for arg in kwargs:
189             self._write_config('Global', arg, kwargs[arg]) 
190
191     @classmethod
192     def set_logger(cls, name):
193         """Foo"""
194         cls.logger = logging.getLogger(name)
195         cls.logger.setLevel(logging.INFO)
196         handler = logging.StreamHandler()
197         handler.setFormatter(_MyFormatter())
198         cls.logger.addHandler(handler)
199
200     @classmethod
201     def set_config(cls, conffile):
202         """Read config file"""
203         if not conffile:
204             ci_dir = os.path.dirname(os.path.abspath(__file__))
205             cls.conffile = os.path.join(ci_dir, "config")
206         else:
207             cls.conffile = conffile
208         cls.config = ConfigParser()
209         cls.config.optionxform = str
210         cls.config.read(cls.conffile)
211
212     def _write_config(self, section, option, value):
213         """Write changes back to config file"""
214         if not self.config.has_section(section):
215             self.config.add_section(section)
216         self.config.set(section, option, str(value))
217         with open(self.conffile, 'wb') as configfile:
218             self.config.write(configfile)
219
220     def _remove_config(self, section, option):
221         """Write changes back to config file"""
222         if not self.config.has_section(section):
223             return
224         self.config.remove_option(section, option)
225         with open(self.conffile, 'wb') as configfile:
226             self.config.write(configfile)
227
228
229 class CloudClient(ConfigClient):
230     image_client = None
231     cyclades_client = None
232     astakos_client = None
233     logger = None
234     auth_url = None
235     token = None
236     cyclades_url = None
237
238     def __init__(self, conffile=None):
239         ConfigClient.__init__(self, conffile)
240         if self.logger is None:
241             CloudClient.set_logger('CloudClient')
242
243         if self.auth_url is None:
244             CloudClient.set_auth_url()
245
246         if self.token is None:
247             CloudClient.set_token()
248
249         if self.astakos_client is None:
250             CloudClient.set_astakos_client(self.auth_url, self.token)
251
252         if self.cyclades_url is None:
253             CloudClient.set_cyclades_url()
254
255         if self.cyclades_client is None:
256             CloudClient.set_cyclades_client(self.cyclades_url, self.token)
257
258         if self.image_client is None:
259             CloudClient.set_image_client(self.cyclades_url, self.token)
260
261     @classmethod
262     def set_logger(cls, name):
263         """Foo"""
264         cls.logger = logging.getLogger(name)
265         cls.logger.setLevel(logging.INFO)
266         handler = logging.StreamHandler()
267         handler.setFormatter(_MyFormatter())
268         cls.logger.addHandler(handler)
269
270     @classmethod
271     def set_auth_url(cls):
272         """Foo"""
273         cls.logger.info("Setup kamaki client..")
274         cls.auth_url = cls.config.get('Global', 'auth_url')
275         cls.logger.debug("Authentication URL is %s" % _green(cls.auth_url))
276
277     @classmethod
278     def set_cyclades_url(cls):
279         """Foo"""
280         cls.cyclades_url = \
281             cls.astakos_client.get_service_endpoints('compute')['publicURL']
282         cls.logger.debug("Cyclades API url is %s" % _green(cls.cyclades_url))
283
284     @classmethod
285     def set_token(cls):
286         """Foo"""
287         cls.token = cls.config.get('Global', 'token')
288         cls.logger.debug("Token is %s" % _green(cls.token))
289
290     @classmethod
291     def set_astakos_client(cls, auth_url, token):
292         """Foo"""
293         cls.astakos_client = AstakosClient(auth_url, token)
294
295     @classmethod
296     def set_cyclades_client(cls, cyclades_url, token):
297         """Foo"""
298         cls.cyclades_client = CycladesClient(cyclades_url, token)
299         cls.cyclades_client.CONNECTION_RETRY_LIMIT = 2
300
301     @classmethod
302     def set_image_client(cls, cyclades_url, token):
303         """Foo"""
304         image_url = \
305             cls.astakos_client.get_service_endpoints('image')['publicURL']
306         cls.logger.debug("Images API url is %s" % _green(image_url))
307         cls.image_client = ImageClient(cyclades_url, token)
308         cls.image_client.CONNECTION_RETRY_LIMIT = 2
309
310     def _wait_transition(self, server_id, new_status):
311         """Wait for server to go to new_status"""
312         self.logger.debug("Waiting for server to become %s" % new_status)
313         timeout = self.config.getint('Global', 'build_timeout')
314         sleep_time = 5
315         while True:
316             server = self.cyclades_client.get_server_details(server_id)
317             if server['status'] == new_status:
318                 return server
319             elif timeout < 0:
320                 self.logger.error(
321                     "Waiting for server to become %s timed out" % new_status)
322                 return None
323             time.sleep(sleep_time)
324
325 class Server(CloudClient):
326     server = None
327     config_id = None
328     name = None
329     flavor_id = None
330     image_id = None
331     server_id = None
332     ipv4 = None
333     port = None
334     user = None
335     passwd = None
336     status = None
337     packages = None
338     install_cmd = None
339     update_cmd = None
340     files = []
341
342     def __init__(self, config_id):
343         CloudClient.__init__(self)
344         self.logger = logging.getLogger(config_id)
345         self.logger.setLevel(logging.INFO)
346         handler = logging.StreamHandler()
347         handler.setFormatter(_MyFormatter())
348         self.logger.addHandler(handler)
349
350         self.config_id = config_id
351         self.__get_flavor_id()
352         self.__get_image_id()
353         self.__get_name()
354         self.__get_packages()
355         self.__get_update_cmd()
356         self.__get_install_cmd()
357         self.__get_files()
358
359         if self.config.has_option(self.config_id, 'server_id'):
360             self.server_id = int(self.config.get(self.config_id, 'server_id'))
361         if self.config.has_option(self.config_id, 'user'):
362             self.user = self.config.get(self.config_id, 'user')
363         if self.config.has_option(self.config_id, 'passwd'):
364             self.passwd = self.config.get(self.config_id, 'passwd')
365
366         if self.flavor_id is None:
367             raise Exception("Flavor id not found for %s" % self.config_id)
368
369         if self.image_id is None:
370             raise Exception("Image id not found for %s" % self.config_id)
371
372         if self.server_id is not None:
373             server = self.cyclades_client.get_server_details(self.server_id)
374             if not server:
375                 raise Exception("Invalid server id")
376             self.update(server)
377
378     def __get_packages(self):
379         packages = []
380         if self.config.has_option(self.config_id, 'packages'):
381             packages += get_list_items(self.config.get(self.config_id, 'packages'))
382         if self.config.has_option('Global', 'packages'):
383             packages += get_list_items(self.config.get('Global', 'packages'))
384
385         if len(packages) > 0:
386             self.packages = packages
387
388     def __get_files(self):
389         files = []
390         tmp = []
391         tmp1 = []
392         if self.config.has_option(self.config_id, 'files'):
393             tmp1 = get_list_items(self.config.get(self.config_id, 'files'))
394             if len(tmp1) > 0 and len(tmp1) % 2 == 0:
395                 tmp += tmp1
396         if self.config.has_option('Global', 'files'):
397             tmp1 += get_list_items(self.config.get('Global', 'files'))
398             if len(tmp1) > 0 and len(tmp1) % 2 == 0:
399                 tmp += tmp1
400
401         for i in range(0, len(tmp), 2):
402             t = (tmp[i], tmp[i+1])
403             files.append(t)
404
405         if len(files) > 0:
406             self.files = files
407
408     def __get_flavor_id(self):
409         if self.config.has_option(self.config_id, 'flavor_id'):
410             self.flavor_id = self.config.get(self.config_id, 'flavor_id')
411         elif self.config.has_option('Global', 'flavor_id'):
412             self.flavor_id = self.config.get('Global', 'flavor_id')
413
414     def __get_image_id(self):
415         if self.config.has_option(self.config_id, 'image_id'):
416             self.image_id = self.config.get(self.config_id, 'image_id')
417         elif self.config.has_option('Global', 'image_id'):
418             self.image_id = self.config.get('Global', 'image_id')
419
420     def __get_name(self):
421         if self.config.has_option(self.config_id, 'name'):
422             self.name = self.config.get(self.config_id, 'name')
423         else:
424             self.name = self.config_id
425
426     def __get_update_cmd(self):
427         if self.config.has_option(self.config_id, 'update_cmd'):
428             self.update_cmd = self.config.get(self.config_id, 'update_cmd')
429         elif self.config.has_option('Global', 'update_cmd'):
430             self.update_cmd = self.config.get('Global', 'update_cmd')
431
432     def __get_install_cmd(self):
433         if self.config.has_option(self.config_id, 'install_cmd'):
434             self.install_cmd = self.config.get(self.config_id, 'install_cmd')
435         elif self.config.has_option('Global', 'install_cmd'):
436             self.install_cmd = self.config.get('Global', 'install_cmd')
437
438
439     def wait_transition(self, new_status):
440         if self.server_id:
441             server = self._wait_transition(self.server_id, new_status)
442             if server:
443                 self.update(server)
444                 return server
445         else:
446             return False
447
448     def create(self, wait=False):
449         if self.status and self.status != "DELETED":
450             return False
451         self.logger.info("Create a new server..")
452         server = self.cyclades_client.create_server(self.name, self.flavor_id,
453                                                     self.image_id)
454         self.server = server
455         self.server_id = server['id']
456         self.logger.debug("Server got id %s" % _green(self.server_id))
457         self.user = server['metadata']['users']
458         self.logger.debug("Server's admin user is %s" % _green(self.user))
459         self.passwd= server['adminPass']
460         self.logger.debug(
461             "Server's admin password is %s" % _green(self.passwd))
462         if wait:
463             server = self.wait_transition("ACTIVE")
464             if not server:
465                 return False
466         return True
467
468     def destroy(self, wait=False):
469         if self.server_id is None:
470             self.logger.debug("Server %s does not have server_id" %
471                     self.config_id)
472             return True
473
474         if self.status == "DELETED":
475             self.logger.debug("Server %d is marked as DELETED" %
476                     self.server_id)
477             self.update()
478             return True
479
480         self.logger.info("Destoying server with id %s " % self.server_id)
481         self.cyclades_client.delete_server(self.server_id)
482         if wait:
483             return self.wait_transition("DELETED")
484         return True
485
486     def update(self, server=None):
487         if not server:
488             if self.server_id is not None:
489                 server = self.cyclades_client.get_server_details(self.server_id)
490             else:
491                 raise Exception("Server id is not set")
492         try:
493             server_ip = server['attachments'][0]['ipv4']
494         except:
495             server_ip = None
496         server_port = 22
497         if server_ip and self.config.has_option('Global', 'okeanos_io'):
498             io = self.config.getboolean('Global', 'okeanos_io')
499             if io:
500                 server_port = get_port_from_ip(server_ip)
501                 server_ip = "gate.demo.synnefo.org"
502
503         self.logger.debug("Server's IPv4 is %s" % _green(server_ip))
504         self.logger.debug("Server's ssh port is %s" % _green(server_port))
505         self.ipv4 = server_ip
506         self.port = server_port
507         self.status = server['status']
508         self.write_config()
509
510     def write_config(self):
511         if self.status == "DELETED":
512             return self.clear_config()
513         if self.server_id:
514             self._write_config(self.config_id, 'server_id', self.server_id)
515         else:
516             self._remove_config(self.config_id, 'server_id')
517         if self.user:
518             self._write_config(self.config_id, 'user', self.user)
519         else:
520             self._remove_config(self.config_id, 'user')
521         if self.passwd:
522             self._write_config(self.config_id, 'passwd', self.passwd)
523         else:
524             self._remove_config(self.config_id, 'passwd')
525         if self.ipv4:
526             self._write_config(self.config_id, 'ipv4', self.ipv4)
527         else:
528             self._remove_config(self.config_id, 'ipv4')
529         if self.port:
530             self._write_config(self.config_id, 'port', self.port)
531         else:
532             self._remove_config(self.config_id, 'port')
533
534     def clear_config(self):
535         self._remove_config(self.config_id, 'server_id')
536         self._remove_config(self.config_id, 'user')
537         self._remove_config(self.config_id, 'passwd')
538         self._remove_config(self.config_id, 'ipv4')
539         self._remove_config(self.config_id, 'port')
540
541
542     def ping(self, timeout):
543         start = time.time()
544
545         while timeout == 0 or time.time() - start < timeout:
546             self.logger.info("Pinging host %s(%s)" % \
547                     (_green(self.config_id), _green(self.ipv4)))
548             cmd = ['ping', '-c', '1', '-w', '20', self.ipv4]
549             ping = subprocess.Popen(cmd, shell=False,
550                                     stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
551             (stdout, stderr) = ping.communicate()
552             ret = ping.wait()
553             if ret == 0:
554                 self.logger.info("Pinging host %s %s" % \
555                         (self.config_id, _green("succeeded")))
556                 return True
557             self.logger.info("Pinging host %s failed. %s" % \
558                     (self.config_id, _yellow("Retrying")))
559
560         raise Timeout(timeout)
561
562     @capture_streams
563     def __execute_command(self, cmd):
564         with fabric.settings(host_string=str(self.ipv4), port=self.port,
565                 user=self.user, password=self.passwd, warn_only=True):
566             if not _run(cmd).succeeded:
567                 raise RemoteCommandFailed(cmd)
568
569     def execute_command(self, cmd, verbose=False):
570         host = "%s@%s:%d" % (self.user, self.ipv4, self.port)
571         self.logger.info("Executing cmd \"%s\" on host %s " % \
572                 (_yellow(cmd), _green(host)))
573         mock = not verbose
574         return self.__execute_command(cmd, mock=mock, stdout='/tmp/cmd_out',
575                 stderr='/tmp/cmd_err')
576
577     def install_packages(self, packages=None):
578         if not packages:
579             packages = self.packages
580
581         if not packages:
582             return
583
584         self.logger.info("Installing packages \"%s\" on host %s " % \
585                 (_green(' '.join(packages)), _green(self.config_id)))
586         self.execute_command("""
587         {0}
588         {1} {2}
589         """.format(self.update_cmd, self.install_cmd, ' '.join(packages)))
590
591     @capture_streams
592     def _inject_file(self, local_file, remote_path):
593         with fabric.settings(host_string=str(self.ipv4), port=self.port,
594                 user=self.user, password=self.passwd, warn_only=True):
595             if not fabric.put(local_file, remote_path).succeeded:
596                 raise RemotePutFailed(local_file, remote_path)
597
598     def inject_file(self, local_file, remote_path, verbose=False):
599         self.logger.info("Putting file %s on host %s" %
600                          (_yellow(local_file), _green(self.config_id)))
601         mock = not verbose
602         self._inject_file(local_file, remote_path, mock=mock)
603
604     def inject_files(self):
605         for f in self.files:
606             self.inject_file(f[0], f[1])
607
608 class Cluster(ConfigClient):
609
610     def __init__(self, **kwargs):
611         ConfigClient.__init__(self, **kwargs)
612
613         # Setup logger
614         self.logger = logging.getLogger('Cluster')
615         self.logger.setLevel(logging.INFO)
616         handler = logging.StreamHandler()
617         handler.setFormatter(_MyFormatter())
618         self.logger.addHandler(handler)
619         self.server_list = self.config.get('Global', 'servers')
620         self.server_list = get_list_items(self.server_list)
621         self.servers = []
622         self.cluster_created = True
623         for s in self.server_list:
624             server = Server(s)
625             if not server.status or server.status == "DELETED":
626                 self.cluster_created = False
627             self.servers.append(server)
628
629         self.destroy_on_error = False
630         if self.config.has_option('Global', 'destroy_on_error'):
631             self.destroy_on_error = \
632                     self.config.getboolean('Global', 'destroy_on_error')
633
634         self.cleanup_servers = False
635         if self.config.has_option('Global', 'cleanup_servers'):
636             self.cleanup_servers = \
637                     self.config.getboolean('Global', 'cleanup_servers')
638
639     def create(self):
640         if self.cleanup_servers:
641             self.destroy()
642
643         try:
644             submitted = []
645             for s in self.servers:
646                 if not s.status or s.status != "ACTIVE":
647                     s.create()
648                     submitted.append(s)
649
650             self.wait_status(submitted, "ACTIVE")
651
652             if self.config.has_option('Global', 'okeanos_io'):
653                 io = self.config.getboolean('Global', 'okeanos_io')
654                 if not io:
655                     for s in self.servers:
656                         s.ping(100)
657
658             for s in self.servers:
659                 s.inject_files()
660
661             for s in self.servers:
662                 s.install_packages()
663
664         except Exception as e:
665             if self.destroy_on_error:
666                 self.destroy()
667             raise e
668         self.cluster_created = True
669
670     def destroy(self):
671         submitted = []
672         for s in self.servers:
673             s.destroy()
674             submitted.append(s)
675
676         self.wait_status(submitted, "DELETED")
677         self.cluster_created = False
678
679     def execute_command(self, cmd, verbose=False):
680         for s in self.servers:
681             s.execute_command(cmd, verbose)
682
683     def inject_file(self, local_file, remote_path, verbose=False):
684         for s in self.servers:
685             s.inject_file(local_file, remote_path, verbose=verbose)
686
687     def install_packages(self, packages=None):
688         for s in self.servers:
689             s.install_packages(packages)
690
691
692     def wait_status(self, servers, status):
693         self.logger.debug("Waiting for %d servers to become %s" %
694                 (len(servers), status ))
695         for s in servers:
696             if s.wait_transition(status):
697                 self.logger.debug("Server %d became %s" % (s.server_id, status))
698             else:
699                 return False
700         return True
701
702     def get_server(self, name):
703         for s in self.servers:
704             if s.config_id == name:
705                 return s
706         return None
707
708 if __name__ == '__main__':
709     c = Cluster()
710     c.create()
711     c.destroy()
712