Revision c4e55622 snf-cyclades-gtools/synnefo/ganeti/hook.py
b/snf-cyclades-gtools/synnefo/ganeti/hook.py | ||
---|---|---|
45 | 45 |
import os |
46 | 46 |
import subprocess |
47 | 47 |
|
48 |
import time |
|
49 | 48 |
import json |
50 | 49 |
import socket |
51 | 50 |
import logging |
52 | 51 |
|
53 |
from amqplib import client_0_8 as amqp
|
|
52 |
from time import time
|
|
54 | 53 |
|
55 | 54 |
from synnefo import settings |
55 |
from synnefo.lib.amqp import AMQPClient |
|
56 |
from synnefo.lib.utils import split_time |
|
56 | 57 |
|
57 | 58 |
|
58 | 59 |
def mac2eui64(mac, prefixstr): |
... | ... | |
135 | 136 |
nics_list.append(nics[i]) |
136 | 137 |
|
137 | 138 |
msg = { |
139 |
"event_time": split_time(time()), |
|
138 | 140 |
"type": "ganeti-net-status", |
139 | 141 |
"instance": instance, |
140 | 142 |
"nics": nics_list |
... | ... | |
149 | 151 |
self.environ = environ |
150 | 152 |
self.instance = instance |
151 | 153 |
self.prefix = prefix |
154 |
# Retry up to two times(per host) to open a channel to RabbitMQ. |
|
155 |
# The hook needs to abort if this count is exceeded, because it |
|
156 |
# runs synchronously with VM creation inside Ganeti, and may only |
|
157 |
# run for a finite amount of time. |
|
158 |
|
|
159 |
# FIXME: We need a reconciliation mechanism between the DB and |
|
160 |
# Ganeti, for cases exactly like this. |
|
161 |
self.client = AMQPClient(max_retries= 2*len(settings.AMQP_HOSTS)) |
|
162 |
self.client.connect() |
|
152 | 163 |
|
153 | 164 |
def on_master(self): |
154 | 165 |
"""Return True if running on the Ganeti master""" |
... | ... | |
158 | 169 |
for (msgtype, msg) in msgs: |
159 | 170 |
routekey = "ganeti.%s.event.%s" % (self.prefix, msgtype) |
160 | 171 |
self.logger.debug("Pushing message to RabbitMQ: %s (key = %s)", |
161 |
json.dumps(msg), routekey) |
|
162 |
msg = amqp.Message(json.dumps(msg)) |
|
163 |
msg.properties["delivery_mode"] = 2 # Persistent |
|
164 |
|
|
165 |
# Retry up to five times to open a channel to RabbitMQ. |
|
166 |
# The hook needs to abort if this count is exceeded, because it |
|
167 |
# runs synchronously with VM creation inside Ganeti, and may only |
|
168 |
# run for a finite amount of time. |
|
169 |
# |
|
170 |
# FIXME: We need a reconciliation mechanism between the DB and |
|
171 |
# Ganeti, for cases exactly like this. |
|
172 |
conn = None |
|
173 |
sent = False |
|
174 |
retry = 0 |
|
175 |
while not sent and retry < 5: |
|
176 |
self.logger.debug("Attempting to publish to RabbitMQ at %s", |
|
177 |
settings.RABBIT_HOST) |
|
178 |
try: |
|
179 |
if not conn: |
|
180 |
conn = amqp.Connection(host=settings.RABBIT_HOST, |
|
181 |
userid=settings.RABBIT_USERNAME, |
|
182 |
password=settings.RABBIT_PASSWORD, |
|
183 |
virtual_host=settings.RABBIT_VHOST) |
|
184 |
chann = conn.channel() |
|
185 |
self.logger.debug("Successfully connected to RabbitMQ at %s", |
|
186 |
settings.RABBIT_HOST) |
|
187 |
|
|
188 |
chann.basic_publish(msg, |
|
189 |
exchange=settings.EXCHANGE_GANETI, |
|
190 |
routing_key=routekey) |
|
191 |
sent = True |
|
192 |
self.logger.debug("Successfully sent message to RabbitMQ") |
|
193 |
except socket.error: |
|
194 |
conn = False |
|
195 |
retry += 1 |
|
196 |
self.logger.exception("Publish to RabbitMQ failed, retry=%d in 1s", |
|
197 |
retry) |
|
198 |
time.sleep(1) |
|
199 |
|
|
200 |
if not sent: |
|
201 |
raise Exception("Publish to RabbitMQ failed after %d tries, aborting" % retry) |
|
202 |
|
|
172 |
json.dumps(msg), routekey) |
|
173 |
msg = json.dumps(msg) |
|
174 |
self.client.basic_publish(exchange=settings.EXCHANGE_GANETI, |
|
175 |
routing_key=routekey, |
|
176 |
body=msg) |
|
177 |
self.client.close() |
|
203 | 178 |
|
204 | 179 |
class PostStartHook(GanetiHook): |
205 | 180 |
"""Post-instance-startup Ganeti Hook. |
Also available in: Unified diff