Statistics
| Branch: | Tag: | Revision:

root / snf-ganeti-tools / synnefo / ganeti / hook.py @ 45ebfd48

History | View | Annotate | Download (9.5 kB)

1
#!/usr/bin/env python
2
#
3
# -*- coding: utf-8 -*-
4
#
5
# Copyright 2011 GRNET S.A. All rights reserved.
6
#
7
# Redistribution and use in source and binary forms, with or
8
# without modification, are permitted provided that the following
9
# conditions are met:
10
#
11
#   1. Redistributions of source code must retain the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer.
14
#
15
#   2. Redistributions in binary form must reproduce the above
16
#      copyright notice, this list of conditions and the following
17
#      disclaimer in the documentation and/or other materials
18
#      provided with the distribution.
19
#
20
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
21
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
24
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
27
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
28
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31
# POSSIBILITY OF SUCH DAMAGE.
32
#
33
# The views and conclusions contained in the software and
34
# documentation are those of the authors and should not be
35
# interpreted as representing official policies, either expressed
36
# or implied, of GRNET S.A.
37
#
38
"""Ganeti hooks for Synnefo
39

40
These are the individual Ganeti hooks for Synnefo.
41

42
"""
43

    
44
import sys
45
import os
46
import subprocess
47

    
48
import time
49
import json
50
import socket
51
import logging
52

    
53
from amqplib import client_0_8 as amqp
54

    
55
try:
56
    conf_dir = os.environ["SYNNEFO_CONFIG_DIR"]
57
    import config
58
    settings = config.load(conf_dir)
59
except KeyError:
60
    import synnefo.settings as settings
61

    
62

    
63
def mac2eui64(mac, prefixstr):
64
    process = subprocess.Popen(["mac2eui64", mac, prefixstr],
65
                                stdout=subprocess.PIPE)
66
    return process.stdout.read().rstrip()
67

    
68
def ganeti_net_status(logger, environ):
69
    """Produce notifications of type 'Ganeti-net-status'
70

71
    Process all GANETI_INSTANCE_NICx_y environment variables,
72
    where x is the NIC index, starting at 0,
73
    and y is one of "MAC", "IP", "BRIDGE".
74

75
    The result is returned as a single notification message
76
    of type 'ganeti-net-status', detailing the NIC configuration
77
    of a Ganeti instance.
78

79
    """
80
    nics = {}
81

    
82
    key_to_attr = { 'IP': 'ip', 'MAC': 'mac', 'BRIDGE': 'link' }
83

    
84
    for env in environ.keys():
85
        if env.startswith("GANETI_INSTANCE_NIC"):
86
            s = env.replace("GANETI_INSTANCE_NIC", "").split('_', 1)
87
            if len(s) == 2 and s[0].isdigit() and s[1] in ('MAC', 'IP', 'BRIDGE'):
88
                index = int(s[0])
89
                key = key_to_attr[s[1]]
90

    
91
                if nics.has_key(index):
92
                    nics[index][key] = environ[env]
93
                else:
94
                    nics[index] = { key: environ[env] }
95

    
96
                # IPv6 support:
97
                #
98
                # The IPv6 of NIC with index 0 [the public NIC]
99
                # is derived using an EUI64 scheme.
100
                if index == 0 and key == 'mac':
101
                    nics[0]['ipv6'] = mac2eui64(nics[0]['mac'],
102
                                                settings.PUBLIC_IPV6_PREFIX)
103

    
104
    # Amend notification with firewall settings
105
    tags = environ.get('GANETI_INSTANCE_TAGS', '')
106
    for tag in tags.split(' '):
107
        t = tag.split(':')
108
        if t[0:2] == ['synnefo', 'network']:
109
            if len(t) != 4:
110
                logger.error("Malformed synnefo tag %s", tag)
111
                continue
112
            try:
113
                index = int(t[2])
114
                nics[index]['firewall'] = t[3]
115
            except ValueError:
116
                logger.error("Malformed synnefo tag %s", tag)
117
            except KeyError:
118
                logger.error("Found tag %s for non-existent NIC %d",
119
                             tag, index)
120

    
121
    # Verify our findings are consistent with the Ganeti environment
122
    indexes = list(nics.keys())
123
    ganeti_nic_count = int(environ['GANETI_INSTANCE_NIC_COUNT'])
124
    if len(indexes) != ganeti_nic_count:
125
        logger.error("I have %d NICs, Ganeti says number of NICs is %d",
126
            len(indexes), ganeti_nic_count)
127
        raise Exception("Inconsistent number of NICs in Ganeti environment")
128

    
129
    if indexes != range(0, len(indexes)):
130
        logger.error("Ganeti NIC indexes are not consecutive starting at zero.");
131
        logger.error("NIC indexes are: %s. Environment is: %s", indexes, environ)
132
        raise Exception("Unexpected inconsistency in the Ganeti environment")
133

    
134
    # Construct the notification
135
    instance = environ['GANETI_INSTANCE_NAME']
136

    
137
    nics_list = []
138
    for i in indexes:
139
        nics_list.append(nics[i])
140

    
141
    msg = {
142
        "type": "ganeti-net-status",
143
        "instance": instance,
144
        "nics": nics_list
145
    }
146

    
147
    return msg
148

    
149

    
150
class GanetiHook():
151
    def __init__(self, logger, environ, instance, prefix):
152
        self.logger = logger
153
        self.environ = environ
154
        self.instance = instance
155
        self.prefix = prefix
156

    
157
    def on_master(self):
158
        """Return True if running on the Ganeti master"""
159
        return socket.getfqdn() == self.environ['GANETI_MASTER']
160

    
161
    def publish_msgs(self, msgs):
162
        for (msgtype, msg) in msgs:
163
            routekey = "ganeti.%s.event.%s" % (self.prefix, msgtype)
164
            self.logger.debug("Pushing message to RabbitMQ: %s (key = %s)",
165
                json.dumps(msg), routekey)
166
            msg = amqp.Message(json.dumps(msg))
167
            msg.properties["delivery_mode"] = 2  # Persistent
168

    
169
            # Retry up to five times to open a channel to RabbitMQ.
170
            # The hook needs to abort if this count is exceeded, because it
171
            # runs synchronously with VM creation inside Ganeti, and may only
172
            # run for a finite amount of time.
173
            #
174
            # FIXME: We need a reconciliation mechanism between the DB and
175
            #        Ganeti, for cases exactly like this.
176
            conn = None
177
            sent = False
178
            retry = 0
179
            while not sent and retry < 5:
180
                self.logger.debug("Attempting to publish to RabbitMQ at %s",
181
                    settings.RABBIT_HOST)
182
                try:
183
                    if not conn:
184
                        conn = amqp.Connection(host=settings.RABBIT_HOST,
185
                            userid=settings.RABBIT_USERNAME,
186
                            password=settings.RABBIT_PASSWORD,
187
                            virtual_host=settings.RABBIT_VHOST)
188
                        chann = conn.channel()
189
                        self.logger.debug("Successfully connected to RabbitMQ at %s",
190
                            settings.RABBIT_HOST)
191

    
192
                    chann.basic_publish(msg,
193
                        exchange=settings.EXCHANGE_GANETI,
194
                        routing_key=routekey)
195
                    sent = True
196
                    self.logger.debug("Successfully sent message to RabbitMQ")
197
                except socket.error:
198
                    conn = False
199
                    retry += 1
200
                    self.logger.exception("Publish to RabbitMQ failed, retry=%d in 1s",
201
                        retry)
202
                    time.sleep(1)
203

    
204
            if not sent:
205
                raise Exception("Publish to RabbitMQ failed after %d tries, aborting" % retry)
206

    
207

    
208
class PostStartHook(GanetiHook):
209
    """Post-instance-startup Ganeti Hook.
210

211
    Produce notifications to the rest of the Synnefo
212
    infrastructure in the post-instance-start phase of Ganeti.
213

214
    Currently, this list only contains a single message,
215
    detailing the net configuration of an instance.
216

217
    This hook only runs on the Ganeti master.
218

219
    """
220
    def run(self):
221
        if self.on_master():
222
            notifs = []
223
            notifs.append(("net", ganeti_net_status(self.logger, self.environ)))
224

    
225
            self.publish_msgs(notifs)
226

    
227
        return 0
228

    
229

    
230
class PostStopHook(GanetiHook):
231
    def run(self):
232
        return 0
233

    
234

    
235
def main():
236
    logging.basicConfig(level=logging.DEBUG)
237
    logger = logging.getLogger("synnefo.ganeti")
238

    
239
    try:
240
        instance = os.environ['GANETI_INSTANCE_NAME']
241
        op = os.environ['GANETI_HOOKS_PATH']
242
        phase = os.environ['GANETI_HOOKS_PHASE']
243
    except KeyError:
244
        raise Exception("Environment missing one of: " \
245
            "GANETI_INSTANCE_NAME, GANETI_HOOKS_PATH, GANETI_HOOKS_PHASE")
246

    
247
    prefix = instance.split('-')[0]
248

    
249
    # FIXME: The hooks should only run for Synnefo instances.
250
    # Uncomment the following lines for a shared Ganeti deployment.
251
    # Currently, the following code is commented out because multiple
252
    # backend prefixes are used in the same Ganeti installation during development.
253
    #if not instance.startswith(settings.BACKEND_PREFIX_ID):
254
    #    logger.warning("Ignoring non-Synnefo instance %s", instance)
255
    #    return 0
256

    
257
    hooks = {
258
        ("instance-add", "post"): PostStartHook,
259
        ("instance-start", "post"): PostStartHook,
260
        ("instance-reboot", "post"): PostStartHook,
261
        ("instance-stop", "post"): PostStopHook,
262
        ("instance-modify", "post"): PostStartHook
263
    }
264

    
265
    try:
266
        hook = hooks[(op, phase)](logger, os.environ, instance, prefix)
267
    except KeyError:
268
        raise Exception("No hook found for operation = '%s', phase = '%s'" % (op, phase))
269
    return hook.run()
270

    
271
if __name__ == "__main__":
272
    sys.exit(main())