Revision c4e55622 snf-cyclades-gtools/synnefo/ganeti/hook.py

b/snf-cyclades-gtools/synnefo/ganeti/hook.py
45 45
import os
46 46
import subprocess
47 47

  
48
import time
49 48
import json
50 49
import socket
51 50
import logging
52 51

  
53
from amqplib import client_0_8 as amqp
52
from time import time
54 53

  
55 54
from synnefo import settings
55
from synnefo.lib.amqp import AMQPClient
56
from synnefo.lib.utils import split_time
56 57

  
57 58

  
58 59
def mac2eui64(mac, prefixstr):
......
135 136
        nics_list.append(nics[i])
136 137

  
137 138
    msg = {
139
        "event_time": split_time(time()),
138 140
        "type": "ganeti-net-status",
139 141
        "instance": instance,
140 142
        "nics": nics_list
......
149 151
        self.environ = environ
150 152
        self.instance = instance
151 153
        self.prefix = prefix
154
        # Retry up to two times(per host) to open a channel to RabbitMQ.
155
        # The hook needs to abort if this count is exceeded, because it
156
        # runs synchronously with VM creation inside Ganeti, and may only
157
        # run for a finite amount of time.
158

  
159
        # FIXME: We need a reconciliation mechanism between the DB and
160
        #        Ganeti, for cases exactly like this.
161
        self.client = AMQPClient(max_retries= 2*len(settings.AMQP_HOSTS))
162
        self.client.connect()
152 163

  
153 164
    def on_master(self):
154 165
        """Return True if running on the Ganeti master"""
......
158 169
        for (msgtype, msg) in msgs:
159 170
            routekey = "ganeti.%s.event.%s" % (self.prefix, msgtype)
160 171
            self.logger.debug("Pushing message to RabbitMQ: %s (key = %s)",
161
                json.dumps(msg), routekey)
162
            msg = amqp.Message(json.dumps(msg))
163
            msg.properties["delivery_mode"] = 2  # Persistent
164

  
165
            # Retry up to five times to open a channel to RabbitMQ.
166
            # The hook needs to abort if this count is exceeded, because it
167
            # runs synchronously with VM creation inside Ganeti, and may only
168
            # run for a finite amount of time.
169
            #
170
            # FIXME: We need a reconciliation mechanism between the DB and
171
            #        Ganeti, for cases exactly like this.
172
            conn = None
173
            sent = False
174
            retry = 0
175
            while not sent and retry < 5:
176
                self.logger.debug("Attempting to publish to RabbitMQ at %s",
177
                    settings.RABBIT_HOST)
178
                try:
179
                    if not conn:
180
                        conn = amqp.Connection(host=settings.RABBIT_HOST,
181
                            userid=settings.RABBIT_USERNAME,
182
                            password=settings.RABBIT_PASSWORD,
183
                            virtual_host=settings.RABBIT_VHOST)
184
                        chann = conn.channel()
185
                        self.logger.debug("Successfully connected to RabbitMQ at %s",
186
                            settings.RABBIT_HOST)
187

  
188
                    chann.basic_publish(msg,
189
                        exchange=settings.EXCHANGE_GANETI,
190
                        routing_key=routekey)
191
                    sent = True
192
                    self.logger.debug("Successfully sent message to RabbitMQ")
193
                except socket.error:
194
                    conn = False
195
                    retry += 1
196
                    self.logger.exception("Publish to RabbitMQ failed, retry=%d in 1s",
197
                        retry)
198
                    time.sleep(1)
199

  
200
            if not sent:
201
                raise Exception("Publish to RabbitMQ failed after %d tries, aborting" % retry)
202

  
172
                              json.dumps(msg), routekey)
173
            msg = json.dumps(msg)
174
            self.client.basic_publish(exchange=settings.EXCHANGE_GANETI,
175
                                      routing_key=routekey,
176
                                      body=msg)
177
        self.client.close()
203 178

  
204 179
class PostStartHook(GanetiHook):
205 180
    """Post-instance-startup Ganeti Hook.

Also available in: Unified diff