Statistics
| Branch: | Tag: | Revision:

root / ganeti / hooks.py @ eab0602e

History | View | Annotate | Download (6.3 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Ganeti hooks for Synnefo
6

7
These are the individual Ganeti hooks for Synnefo.
8

9
"""
10

    
11
import sys
12
import os
13

    
14
import time
15
import json
16
import socket
17
import logging
18

    
19
from amqplib import client_0_8 as amqp
20

    
21
from synnefo.util.mac2eui64 import mac2eui64
22
import synnefo.settings as settings
23

    
24
def ganeti_net_status(logger, environ):
25
    """Produce notifications of type 'Ganeti-net-status'
26
    
27
    Process all GANETI_INSTANCE_NICx_y environment variables,
28
    where x is the NIC index, starting at 0,
29
    and y is one of "MAC", "IP", "BRIDGE".
30

31
    The result is returned as a single notification message
32
    of type 'ganeti-net-status', detailing the NIC configuration
33
    of a Ganeti instance.
34

35
    """
36
    nics = {}
37

    
38
    key_to_attr = { 'IP': 'ip', 'MAC': 'mac', 'BRIDGE': 'link' }
39

    
40
    for env in environ.keys():
41
        if env.startswith("GANETI_INSTANCE_NIC"):
42
            s = env.replace("GANETI_INSTANCE_NIC", "").split('_', 1)
43
            if len(s) == 2 and s[0].isdigit() and s[1] in ('MAC', 'IP', 'BRIDGE'):
44
                index = int(s[0])
45
                key = key_to_attr[s[1]]
46

    
47
                if nics.has_key(index):
48
                    nics[index][key] = environ[env]
49
                else:
50
                    nics[index] = { key: environ[env] }
51

    
52
                # IPv6 support:
53
                #
54
                # The IPv6 of NIC with index 0 [the public NIC]
55
                # is derived using an EUI64 scheme.
56
                if index == 0 and key == 'mac':
57
                    nics[0]['ipv6'] = mac2eui64(nics[0]['mac'],
58
                                                settings.PUBLIC_IPV6_PREFIX)
59

    
60
    # Amend notification with firewall settings
61
    tags = environ.get('GANETI_INSTANCE_TAGS', '')
62
    for tag in tags.split(' '):
63
        t = tag.split(':')
64
        if t[0:2] == ['synnefo', 'network']:
65
            if len(t) != 4:
66
                logger.error("Malformed synnefo tag %s", tag)
67
                continue
68
            try:
69
                index = int(t[2])
70
                nics[index]['firewall'] = t[3]
71
            except ValueError:
72
                logger.error("Malformed synnefo tag %s", tag)
73
            except KeyError:
74
                logger.error("Found tag %s for non-existent NIC %d",
75
                             tag, index)
76

    
77
    # Verify our findings are consistent with the Ganeti environment
78
    indexes = list(nics.keys())
79
    ganeti_nic_count = int(environ['GANETI_INSTANCE_NIC_COUNT'])
80
    if len(indexes) != ganeti_nic_count:
81
        logger.error("I have %d NICs, Ganeti says number of NICs is %d",
82
            len(indexes), ganeti_nic_count)
83
        raise Exception("Inconsistent number of NICs in Ganeti environment")
84

    
85
    if indexes != range(0, len(indexes)):
86
        logger.error("Ganeti NIC indexes are not consecutive starting at zero.");
87
        logger.error("NIC indexes are: %s. Environment is: %s", indexes, environ)
88
        raise Exception("Unexpected inconsistency in the Ganeti environment")
89

    
90
    # Construct the notification
91
    instance = environ['GANETI_INSTANCE_NAME']
92

    
93
    nics_list = []
94
    for i in indexes:
95
        nics_list.append(nics[i])
96

    
97
    msg = {
98
        "type": "ganeti-net-status",
99
        "instance": instance,
100
        "nics": nics_list
101
    }
102

    
103
    return msg
104

    
105

    
106
class GanetiHook():
107
    def __init__(self, logger, environ, instance, prefix):
108
        self.logger = logger
109
        self.environ = environ
110
        self.instance = instance
111
        self.prefix = prefix
112

    
113
    def on_master(self):
114
        """Return True if running on the Ganeti master"""
115
        return socket.getfqdn() == self.environ['GANETI_MASTER']
116

    
117
    def publish_msgs(self, msgs):
118
        for (msgtype, msg) in msgs:
119
            routekey = "ganeti.%s.event.%s" % (self.prefix, msgtype)
120
            self.logger.debug("Pushing message to RabbitMQ: %s (key = %s)",
121
                json.dumps(msg), routekey)
122
            msg = amqp.Message(json.dumps(msg))
123
            msg.properties["delivery_mode"] = 2  # Persistent
124

    
125
            # Retry up to five times to open a channel to RabbitMQ.
126
            # The hook needs to abort if this count is exceeded, because it
127
            # runs synchronously with VM creation inside Ganeti, and may only
128
            # run for a finite amount of time.
129
            #
130
            # FIXME: We need a reconciliation mechanism between the DB and
131
            #        Ganeti, for cases exactly like this.
132
            conn = None
133
            sent = False
134
            retry = 0
135
            while not sent and retry < 5:
136
                self.logger.debug("Attempting to publish to RabbitMQ at %s",
137
                    settings.RABBIT_HOST)
138
                try:
139
                    if not conn:
140
                        conn = amqp.Connection(host=settings.RABBIT_HOST,
141
                            userid=settings.RABBIT_USERNAME,
142
                            password=settings.RABBIT_PASSWORD,
143
                            virtual_host=settings.RABBIT_VHOST)
144
                        chann = conn.channel()
145
                        self.logger.debug("Successfully connected to RabbitMQ at %s",
146
                            settings.RABBIT_HOST)
147

    
148
                    chann.basic_publish(msg,
149
                        exchange=settings.EXCHANGE_GANETI,
150
                        routing_key=routekey)
151
                    sent = True
152
                    self.logger.debug("Successfully sent message to RabbitMQ")
153
                except socket.error:
154
                    conn = False
155
                    retry += 1
156
                    self.logger.exception("Publish to RabbitMQ failed, retry=%d in 1s",
157
                        retry)
158
                    time.sleep(1)
159

    
160
            if not sent:
161
                raise Exception("Publish to RabbitMQ failed after %d tries, aborting" % retry)
162

    
163

    
164
class PostStartHook(GanetiHook):
165
    """Post-instance-startup Ganeti Hook.
166

167
    Produce notifications to the rest of the Synnefo
168
    infrastructure in the post-instance-start phase of Ganeti.
169

170
    Currently, this list only contains a single message,
171
    detailing the net configuration of an instance.
172

173
    This hook only runs on the Ganeti master.
174

175
    """
176
    def run(self):
177
        if self.on_master():
178
            notifs = []
179
            notifs.append(("net", ganeti_net_status(self.logger, self.environ)))
180
            
181
            self.publish_msgs(notifs)
182

    
183
        return 0
184

    
185

    
186
class PostStopHook(GanetiHook):
187
    def run(self):
188
        return 0
189