Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 32e2716a

History | View | Annotate | Download (16 kB)

1 b024b97a Georgios Gousios
#!/usr/bin/env python
2 737b0e28 Vangelis Koukis
# -*- coding: utf-8 -*-
3 b024b97a Georgios Gousios
#
4 737b0e28 Vangelis Koukis
# Copyright 2011 GRNET S.A. All rights reserved.
5 737b0e28 Vangelis Koukis
#
6 737b0e28 Vangelis Koukis
# Redistribution and use in source and binary forms, with or
7 737b0e28 Vangelis Koukis
# without modification, are permitted provided that the following
8 737b0e28 Vangelis Koukis
# conditions are met:
9 737b0e28 Vangelis Koukis
#
10 737b0e28 Vangelis Koukis
#   1. Redistributions of source code must retain the above
11 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
12 737b0e28 Vangelis Koukis
#      disclaimer.
13 737b0e28 Vangelis Koukis
#
14 737b0e28 Vangelis Koukis
#   2. Redistributions in binary form must reproduce the above
15 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
16 737b0e28 Vangelis Koukis
#      disclaimer in the documentation and/or other materials
17 737b0e28 Vangelis Koukis
#      provided with the distribution.
18 737b0e28 Vangelis Koukis
#
19 737b0e28 Vangelis Koukis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 737b0e28 Vangelis Koukis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 737b0e28 Vangelis Koukis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 737b0e28 Vangelis Koukis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 737b0e28 Vangelis Koukis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 737b0e28 Vangelis Koukis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 737b0e28 Vangelis Koukis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 737b0e28 Vangelis Koukis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 737b0e28 Vangelis Koukis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 737b0e28 Vangelis Koukis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 737b0e28 Vangelis Koukis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 737b0e28 Vangelis Koukis
# POSSIBILITY OF SUCH DAMAGE.
31 737b0e28 Vangelis Koukis
#
32 737b0e28 Vangelis Koukis
# The views and conclusions contained in the software and
33 737b0e28 Vangelis Koukis
# documentation are those of the authors and should not be
34 737b0e28 Vangelis Koukis
# interpreted as representing official policies, either expressed
35 737b0e28 Vangelis Koukis
# or implied, of GRNET S.A.
36 737b0e28 Vangelis Koukis
#
37 45ebfd48 Vangelis Koukis
38 737b0e28 Vangelis Koukis
"""Ganeti notification daemon with AMQP support
39 b024b97a Georgios Gousios

40 b024b97a Georgios Gousios
A daemon to monitor the Ganeti job queue and publish job progress
41 80bd5072 Georgios Gousios
and Ganeti VM state notifications to the ganeti exchange
42 b024b97a Georgios Gousios
"""
43 b024b97a Georgios Gousios
44 b024b97a Georgios Gousios
import sys
45 b024b97a Georgios Gousios
import os
46 b024b97a Georgios Gousios
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47 b024b97a Georgios Gousios
sys.path.append(path)
48 b024b97a Georgios Gousios
49 b024b97a Georgios Gousios
import json
50 b024b97a Georgios Gousios
import logging
51 b024b97a Georgios Gousios
import pyinotify
52 b024b97a Georgios Gousios
import daemon
53 b024b97a Georgios Gousios
import daemon.pidlockfile
54 cf2a3529 Christos Stavrakakis
import daemon.runner
55 cf2a3529 Christos Stavrakakis
from lockfile import LockTimeout
56 b024b97a Georgios Gousios
from signal import signal, SIGINT, SIGTERM
57 1dc821c9 Christos Stavrakakis
import setproctitle
58 b024b97a Georgios Gousios
59 a9bb2a1a Christos Stavrakakis
from ganeti import utils, jqueue, constants, serializer, pathutils, cli
60 6df16263 Christos Stavrakakis
from ganeti import errors as ganeti_errors
61 a9bb2a1a Christos Stavrakakis
from ganeti.ssconf import SimpleStore
62 e66a7e34 Christos Stavrakakis
63 b024b97a Georgios Gousios
64 6d6b8f88 Kostas Papadimitriou
from synnefo import settings
65 c4e55622 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
66 c4e55622 Christos Stavrakakis
67 27a8e4ae Christos Stavrakakis
68 c4e55622 Christos Stavrakakis
def get_time_from_status(op, job):
69 c4e55622 Christos Stavrakakis
    """Generate a unique message identifier for a ganeti job.
70 c4e55622 Christos Stavrakakis

71 c4e55622 Christos Stavrakakis
    The identifier is based on the timestamp of the job. Since a ganeti
72 c4e55622 Christos Stavrakakis
    job passes from multiple states, we need to pick the timestamp that
73 c4e55622 Christos Stavrakakis
    corresponds to each state.
74 c4e55622 Christos Stavrakakis

75 c4e55622 Christos Stavrakakis
    """
76 c4e55622 Christos Stavrakakis
    status = op.status
77 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_QUEUED:
78 765ff3ff Christos Stavrakakis
        time = job.received_timestamp
79 27a8e4ae Christos Stavrakakis
    try:  # Compatibility with Ganeti version
80 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITLOCK:
81 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
82 39ccdb18 Christos Stavrakakis
    except AttributeError:
83 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITING:
84 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
85 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_CANCELING:
86 765ff3ff Christos Stavrakakis
        time = op.start_timestamp
87 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_RUNNING:
88 765ff3ff Christos Stavrakakis
        time = op.exec_timestamp
89 c4e55622 Christos Stavrakakis
    if status in constants.JOBS_FINALIZED:
90 765ff3ff Christos Stavrakakis
        time = op.end_timestamp
91 765ff3ff Christos Stavrakakis
92 765ff3ff Christos Stavrakakis
    return time and time or job.end_timestamp
93 c4e55622 Christos Stavrakakis
94 27a8e4ae Christos Stavrakakis
    raise InvalidBackendStatus(status, job)
95 c4e55622 Christos Stavrakakis
96 c4e55622 Christos Stavrakakis
97 0e1f3323 Christos Stavrakakis
def get_instance_nics(instance, logger):
98 0e1f3323 Christos Stavrakakis
    """Query Ganeti to a get the instance's NICs.
99 0e1f3323 Christos Stavrakakis

100 6df16263 Christos Stavrakakis
    Get instance's NICs from Ganeti configuration data. If running on master,
101 6df16263 Christos Stavrakakis
    query Ganeti via Ganeti CLI client. Otherwise, get the nics from Ganeti
102 6df16263 Christos Stavrakakis
    configuration file.
103 6df16263 Christos Stavrakakis

104 0e1f3323 Christos Stavrakakis
    @type instance: string
105 0e1f3323 Christos Stavrakakis
    @param instance: the name of the instance
106 0e1f3323 Christos Stavrakakis
    @rtype: List of dicts
107 6df16263 Christos Stavrakakis
    @return: Dictionary containing the instance's NICs. Each dictionary
108 0e1f3323 Christos Stavrakakis
             contains the following keys: 'network', 'ip', 'mac', 'mode',
109 0e1f3323 Christos Stavrakakis
             'link' and 'firewall'
110 0e1f3323 Christos Stavrakakis

111 0e1f3323 Christos Stavrakakis
    """
112 6df16263 Christos Stavrakakis
    try:
113 6df16263 Christos Stavrakakis
        client = cli.GetClient()
114 32e2716a Christos Stavrakakis
        fields = ["nic.networks.names", "nic.ips", "nic.macs", "nic.modes",
115 6df16263 Christos Stavrakakis
                  "nic.links", "tags"]
116 6df16263 Christos Stavrakakis
        info = client.QueryInstances([instance], fields, use_locking=False)
117 6df16263 Christos Stavrakakis
        networks, ips, macs, modes, links, tags = info[0]
118 6df16263 Christos Stavrakakis
        nic_keys = ["network", "ip", "mac", "mode", "link"]
119 6df16263 Christos Stavrakakis
        nics = zip(networks, ips, macs, modes, links)
120 6df16263 Christos Stavrakakis
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
121 6df16263 Christos Stavrakakis
    except ganeti_errors.OpPrereqError:
122 6df16263 Christos Stavrakakis
        # Not running on master! Load the conf file
123 6df16263 Christos Stavrakakis
        raw_data = utils.ReadFile(constants.CLUSTER_CONF_FILE)
124 6df16263 Christos Stavrakakis
        config = serializer.LoadJson(raw_data)
125 6df16263 Christos Stavrakakis
        i = config["instances"][instance]
126 6df16263 Christos Stavrakakis
        nics = []
127 6df16263 Christos Stavrakakis
        for nic in i["nics"]:
128 6df16263 Christos Stavrakakis
            params = nic.pop("nicparams")
129 6df16263 Christos Stavrakakis
            nic["mode"] = params["mode"]
130 6df16263 Christos Stavrakakis
            nic["link"] = params["link"]
131 6df16263 Christos Stavrakakis
            nics.append(nic)
132 6df16263 Christos Stavrakakis
        tags = i.get("tags", [])
133 0e1f3323 Christos Stavrakakis
    # Get firewall from instance Tags
134 0e1f3323 Christos Stavrakakis
    # Tags are of the form synnefo:network:N:firewall_mode
135 0e1f3323 Christos Stavrakakis
    for tag in tags:
136 0e1f3323 Christos Stavrakakis
        t = tag.split(":")
137 0e1f3323 Christos Stavrakakis
        if t[0:2] == ["synnefo", "network"]:
138 0e1f3323 Christos Stavrakakis
            if len(t) != 4:
139 0e1f3323 Christos Stavrakakis
                logger.error("Malformed synefo tag %s", tag)
140 0e1f3323 Christos Stavrakakis
                continue
141 0e1f3323 Christos Stavrakakis
            try:
142 0e1f3323 Christos Stavrakakis
                index = int(t[2])
143 0e1f3323 Christos Stavrakakis
                nics[index]['firewall'] = t[3]
144 0e1f3323 Christos Stavrakakis
            except ValueError:
145 0e1f3323 Christos Stavrakakis
                logger.error("Malformed synnefo tag %s", tag)
146 0e1f3323 Christos Stavrakakis
            except IndexError:
147 0e1f3323 Christos Stavrakakis
                logger.error("Found tag %s for non-existent NIC %d",
148 0e1f3323 Christos Stavrakakis
                             tag, index)
149 0e1f3323 Christos Stavrakakis
    return nics
150 0e1f3323 Christos Stavrakakis
151 0e1f3323 Christos Stavrakakis
152 c4e55622 Christos Stavrakakis
class InvalidBackendStatus(Exception):
153 c4e55622 Christos Stavrakakis
    def __init__(self, status, job):
154 c4e55622 Christos Stavrakakis
        self.status = status
155 c4e55622 Christos Stavrakakis
        self.job = job
156 c4e55622 Christos Stavrakakis
157 c4e55622 Christos Stavrakakis
    def __str__(self):
158 c4e55622 Christos Stavrakakis
        return repr("Invalid backend status: %s in job %s"
159 c4e55622 Christos Stavrakakis
                    % (self.status, self.job))
160 c4e55622 Christos Stavrakakis
161 27a8e4ae Christos Stavrakakis
162 39ccdb18 Christos Stavrakakis
def prefix_from_name(name):
163 39ccdb18 Christos Stavrakakis
    return name.split('-')[0]
164 39ccdb18 Christos Stavrakakis
165 39ccdb18 Christos Stavrakakis
166 39ccdb18 Christos Stavrakakis
def get_field(from_, field):
167 39ccdb18 Christos Stavrakakis
    try:
168 39ccdb18 Christos Stavrakakis
        return getattr(from_, field)
169 39ccdb18 Christos Stavrakakis
    except AttributeError:
170 39ccdb18 Christos Stavrakakis
        None
171 39ccdb18 Christos Stavrakakis
172 45ebfd48 Vangelis Koukis
173 b024b97a Georgios Gousios
class JobFileHandler(pyinotify.ProcessEvent):
174 27a8e4ae Christos Stavrakakis
    def __init__(self, logger, cluster_name):
175 348f53de Georgios Gousios
        pyinotify.ProcessEvent.__init__(self)
176 348f53de Georgios Gousios
        self.logger = logger
177 27a8e4ae Christos Stavrakakis
        self.cluster_name = cluster_name
178 27a8e4ae Christos Stavrakakis
179 2ef10562 Christos Stavrakakis
        # Set max_retries to 0 for unlimited retries.
180 2ef10562 Christos Stavrakakis
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
181 a8858945 Christos Stavrakakis
                                 max_retries=0, logger=logger)
182 2ef10562 Christos Stavrakakis
183 c4e55622 Christos Stavrakakis
        handler_logger.info("Attempting to connect to RabbitMQ hosts")
184 27a8e4ae Christos Stavrakakis
185 c4e55622 Christos Stavrakakis
        self.client.connect()
186 c4e55622 Christos Stavrakakis
        handler_logger.info("Connected succesfully")
187 b024b97a Georgios Gousios
188 27a8e4ae Christos Stavrakakis
        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
189 27a8e4ae Christos Stavrakakis
190 39ccdb18 Christos Stavrakakis
        self.op_handlers = {"INSTANCE": self.process_instance_op,
191 36d450e8 Christos Stavrakakis
                            "NETWORK": self.process_network_op,
192 36d450e8 Christos Stavrakakis
                            "CLUSTER": self.process_cluster_op}
193 39ccdb18 Christos Stavrakakis
                            # "GROUP": self.process_group_op}
194 39ccdb18 Christos Stavrakakis
195 b024b97a Georgios Gousios
    def process_IN_CLOSE_WRITE(self, event):
196 80dcd124 Vangelis Koukis
        self.process_IN_MOVED_TO(event)
197 80dcd124 Vangelis Koukis
198 80dcd124 Vangelis Koukis
    def process_IN_MOVED_TO(self, event):
199 b024b97a Georgios Gousios
        jobfile = os.path.join(event.path, event.name)
200 b024b97a Georgios Gousios
        if not event.name.startswith("job-"):
201 b024b97a Georgios Gousios
            self.logger.debug("Not a job file: %s" % event.path)
202 b024b97a Georgios Gousios
            return
203 b024b97a Georgios Gousios
204 b024b97a Georgios Gousios
        try:
205 b024b97a Georgios Gousios
            data = utils.ReadFile(jobfile)
206 b024b97a Georgios Gousios
        except IOError:
207 b024b97a Georgios Gousios
            return
208 b024b97a Georgios Gousios
209 b024b97a Georgios Gousios
        data = serializer.LoadJson(data)
210 a9bb2a1a Christos Stavrakakis
        job = jqueue._QueuedJob.Restore(None, data, False, False)
211 4ed30eed Christos Stavrakakis
212 39ccdb18 Christos Stavrakakis
        job_id = int(job.id)
213 b024b97a Georgios Gousios
214 b024b97a Georgios Gousios
        for op in job.ops:
215 39ccdb18 Christos Stavrakakis
            op_id = op.input.OP_ID
216 b024b97a Georgios Gousios
217 39ccdb18 Christos Stavrakakis
            msg = None
218 b024b97a Georgios Gousios
            try:
219 39ccdb18 Christos Stavrakakis
                handler_fn = self.op_handlers[op_id.split('_')[1]]
220 39ccdb18 Christos Stavrakakis
                msg, routekey = handler_fn(op, job_id)
221 39ccdb18 Christos Stavrakakis
            except KeyError:
222 b024b97a Georgios Gousios
                pass
223 b024b97a Georgios Gousios
224 39ccdb18 Christos Stavrakakis
            if not msg:
225 39ccdb18 Christos Stavrakakis
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
226 07e4ab22 Christos Stavrakakis
                continue
227 07e4ab22 Christos Stavrakakis
228 39ccdb18 Christos Stavrakakis
            # Generate a unique message identifier
229 39ccdb18 Christos Stavrakakis
            event_time = get_time_from_status(op, job)
230 39ccdb18 Christos Stavrakakis
231 b024b97a Georgios Gousios
            # Get the last line of the op log as message
232 b024b97a Georgios Gousios
            try:
233 b024b97a Georgios Gousios
                logmsg = op.log[-1][-1]
234 b024b97a Georgios Gousios
            except IndexError:
235 b024b97a Georgios Gousios
                logmsg = None
236 348f53de Georgios Gousios
237 39ccdb18 Christos Stavrakakis
            # Add shared attributes for all operations
238 39ccdb18 Christos Stavrakakis
            msg.update({"event_time": event_time,
239 39ccdb18 Christos Stavrakakis
                        "operation": op_id,
240 39ccdb18 Christos Stavrakakis
                        "status": op.status,
241 27a8e4ae Christos Stavrakakis
                        "cluster": self.cluster_name,
242 39ccdb18 Christos Stavrakakis
                        "logmsg": logmsg,
243 39ccdb18 Christos Stavrakakis
                        "jobId": job_id})
244 c4e55622 Christos Stavrakakis
245 12ee1ad8 Christos Stavrakakis
            if op.status == "success":
246 12ee1ad8 Christos Stavrakakis
                msg["result"] = op.result
247 12ee1ad8 Christos Stavrakakis
248 90858bda Christos Stavrakakis
            if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
249 90858bda Christos Stavrakakis
                 op.status == "success") or
250 90858bda Christos Stavrakakis
                (op_id == "OP_INSTANCE_SET_PARAMS" and
251 90858bda Christos Stavrakakis
                 op.status in ["success", "error", "cancelled"])):
252 0e1f3323 Christos Stavrakakis
                    nics = get_instance_nics(msg["instance"], self.logger)
253 0e1f3323 Christos Stavrakakis
                    msg["nics"] = nics
254 0e1f3323 Christos Stavrakakis
255 c4e55622 Christos Stavrakakis
            msg = json.dumps(msg)
256 39ccdb18 Christos Stavrakakis
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)
257 c4e55622 Christos Stavrakakis
258 c4e55622 Christos Stavrakakis
            # Send the message to RabbitMQ
259 db400d82 Christos Stavrakakis
            self.client.basic_publish(settings.EXCHANGE_GANETI,
260 db400d82 Christos Stavrakakis
                                      routekey,
261 db400d82 Christos Stavrakakis
                                      msg)
262 39ccdb18 Christos Stavrakakis
263 39ccdb18 Christos Stavrakakis
    def process_instance_op(self, op, job_id):
264 39ccdb18 Christos Stavrakakis
        """ Process OP_INSTANCE_* opcodes.
265 39ccdb18 Christos Stavrakakis

266 39ccdb18 Christos Stavrakakis
        """
267 39ccdb18 Christos Stavrakakis
        input = op.input
268 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
269 39ccdb18 Christos Stavrakakis
270 39ccdb18 Christos Stavrakakis
        instances = None
271 39ccdb18 Christos Stavrakakis
        instances = get_field(input, 'instance_name')
272 39ccdb18 Christos Stavrakakis
        if not instances:
273 39ccdb18 Christos Stavrakakis
            instances = get_field(input, 'instances')
274 39ccdb18 Christos Stavrakakis
            if not instances or len(instances) > 1:
275 39ccdb18 Christos Stavrakakis
                # Do not publish messages for jobs with no or multiple
276 449787d3 Christos Stavrakakis
                # instances.  Currently snf-dispatcher can not normally handle
277 449787d3 Christos Stavrakakis
                # these messages
278 39ccdb18 Christos Stavrakakis
                return None, None
279 39ccdb18 Christos Stavrakakis
            else:
280 39ccdb18 Christos Stavrakakis
                instances = instances[0]
281 39ccdb18 Christos Stavrakakis
282 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
283 39ccdb18 Christos Stavrakakis
                          instances, op.status)
284 39ccdb18 Christos Stavrakakis
285 39ccdb18 Christos Stavrakakis
        msg = {"type": "ganeti-op-status",
286 39ccdb18 Christos Stavrakakis
               "instance": instances,
287 39ccdb18 Christos Stavrakakis
               "operation": op_id}
288 39ccdb18 Christos Stavrakakis
289 12ee1ad8 Christos Stavrakakis
        if op_id == "OP_INSTANCE_SET_PARAMS":
290 12ee1ad8 Christos Stavrakakis
            beparams = get_field(input, "beparams")
291 12ee1ad8 Christos Stavrakakis
            if beparams:
292 12ee1ad8 Christos Stavrakakis
                msg["beparams"] = beparams
293 12ee1ad8 Christos Stavrakakis
294 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)
295 39ccdb18 Christos Stavrakakis
296 39ccdb18 Christos Stavrakakis
        return msg, routekey
297 39ccdb18 Christos Stavrakakis
298 39ccdb18 Christos Stavrakakis
    def process_network_op(self, op, job_id):
299 39ccdb18 Christos Stavrakakis
        """ Process OP_NETWORK_* opcodes.
300 39ccdb18 Christos Stavrakakis

301 39ccdb18 Christos Stavrakakis
        """
302 39ccdb18 Christos Stavrakakis
303 39ccdb18 Christos Stavrakakis
        input = op.input
304 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
305 39ccdb18 Christos Stavrakakis
        network_name = get_field(input, 'network_name')
306 39ccdb18 Christos Stavrakakis
307 39ccdb18 Christos Stavrakakis
        if not network_name:
308 39ccdb18 Christos Stavrakakis
            return None, None
309 39ccdb18 Christos Stavrakakis
310 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
311 39ccdb18 Christos Stavrakakis
                          network_name, op.status)
312 39ccdb18 Christos Stavrakakis
313 39ccdb18 Christos Stavrakakis
        msg = {'operation':    op_id,
314 39ccdb18 Christos Stavrakakis
               'type':         "ganeti-network-status",
315 39ccdb18 Christos Stavrakakis
               'network':      network_name,
316 39ccdb18 Christos Stavrakakis
               'subnet':       get_field(input, 'network'),
317 39ccdb18 Christos Stavrakakis
               # 'network_mode': get_field(input, 'network_mode'),
318 39ccdb18 Christos Stavrakakis
               # 'network_link': get_field(input, 'network_link'),
319 39ccdb18 Christos Stavrakakis
               'gateway':      get_field(input, 'gateway'),
320 39ccdb18 Christos Stavrakakis
               'group_name':   get_field(input, 'group_name')}
321 39ccdb18 Christos Stavrakakis
322 fd2bdbb2 Christos Stavrakakis
        if op_id == "OP_NETWORK_SET_PARAMS":
323 449787d3 Christos Stavrakakis
            msg["add_reserved_ips"] = get_field(input, "add_reserved_ips")
324 449787d3 Christos Stavrakakis
            msg["remove_reserved_ips"] = get_field(input,
325 449787d3 Christos Stavrakakis
                                                   "remove_reserved_ips")
326 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)
327 39ccdb18 Christos Stavrakakis
328 39ccdb18 Christos Stavrakakis
        return msg, routekey
329 39ccdb18 Christos Stavrakakis
330 36d450e8 Christos Stavrakakis
    def process_cluster_op(self, op, job_id):
331 36d450e8 Christos Stavrakakis
        """ Process OP_CLUSTER_* opcodes.
332 39ccdb18 Christos Stavrakakis

333 36d450e8 Christos Stavrakakis
        """
334 36d450e8 Christos Stavrakakis
335 36d450e8 Christos Stavrakakis
        input = op.input
336 36d450e8 Christos Stavrakakis
        op_id = input.OP_ID
337 36d450e8 Christos Stavrakakis
338 36d450e8 Christos Stavrakakis
        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)
339 39ccdb18 Christos Stavrakakis
340 36d450e8 Christos Stavrakakis
        msg = {'operation':    op_id,
341 36d450e8 Christos Stavrakakis
               'type':         "ganeti-cluster-status"}
342 36d450e8 Christos Stavrakakis
343 36d450e8 Christos Stavrakakis
        routekey = "ganeti.event.cluster"
344 36d450e8 Christos Stavrakakis
345 36d450e8 Christos Stavrakakis
        return msg, routekey
346 39ccdb18 Christos Stavrakakis
347 39ccdb18 Christos Stavrakakis
348 27a8e4ae Christos Stavrakakis
def find_cluster_name():
349 51fc0054 Christos Stavrakakis
    global handler_logger
350 51fc0054 Christos Stavrakakis
    try:
351 a9bb2a1a Christos Stavrakakis
        ss = SimpleStore()
352 a9bb2a1a Christos Stavrakakis
        name = ss.GetClusterName()
353 51fc0054 Christos Stavrakakis
    except Exception as e:
354 51fc0054 Christos Stavrakakis
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
355 51fc0054 Christos Stavrakakis
        raise e
356 51fc0054 Christos Stavrakakis
357 27a8e4ae Christos Stavrakakis
    return name
358 39ccdb18 Christos Stavrakakis
359 449787d3 Christos Stavrakakis
360 b024b97a Georgios Gousios
handler_logger = None
361 449787d3 Christos Stavrakakis
362 449787d3 Christos Stavrakakis
363 b024b97a Georgios Gousios
def fatal_signal_handler(signum, frame):
364 b024b97a Georgios Gousios
    global handler_logger
365 b024b97a Georgios Gousios
366 b024b97a Georgios Gousios
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
367 2cd99e7a Georgios Gousios
                        signum)
368 b024b97a Georgios Gousios
    raise SystemExit
369 b024b97a Georgios Gousios
370 449787d3 Christos Stavrakakis
371 b024b97a Georgios Gousios
def parse_arguments(args):
372 b024b97a Georgios Gousios
    from optparse import OptionParser
373 b024b97a Georgios Gousios
374 b024b97a Georgios Gousios
    parser = OptionParser()
375 b024b97a Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
376 2cd99e7a Georgios Gousios
                      help="Enable debugging information")
377 b024b97a Georgios Gousios
    parser.add_option("-l", "--log", dest="log_file",
378 45ebfd48 Vangelis Koukis
                      default="/var/log/snf-ganeti-eventd.log",
379 2cd99e7a Georgios Gousios
                      metavar="FILE",
380 2cd99e7a Georgios Gousios
                      help="Write log to FILE instead of %s" %
381 449787d3 Christos Stavrakakis
                           "/var/log/snf-ganeti-eventd.log")
382 b024b97a Georgios Gousios
    parser.add_option('--pid-file', dest="pid_file",
383 45ebfd48 Vangelis Koukis
                      default="/var/run/snf-ganeti-eventd.pid",
384 2cd99e7a Georgios Gousios
                      metavar='PIDFILE',
385 2cd99e7a Georgios Gousios
                      help="Save PID to file (default: %s)" %
386 449787d3 Christos Stavrakakis
                           "/var/run/snf-ganeti-eventd.pid")
387 b024b97a Georgios Gousios
388 b024b97a Georgios Gousios
    return parser.parse_args(args)
389 b024b97a Georgios Gousios
390 449787d3 Christos Stavrakakis
391 b024b97a Georgios Gousios
def main():
392 b024b97a Georgios Gousios
    global handler_logger
393 b024b97a Georgios Gousios
394 b024b97a Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
395 b024b97a Georgios Gousios
396 b024b97a Georgios Gousios
    # Initialize logger
397 b024b97a Georgios Gousios
    lvl = logging.DEBUG if opts.debug else logging.INFO
398 03a4b970 Georgios Gousios
    logger = logging.getLogger("ganeti.eventd")
399 b024b97a Georgios Gousios
    logger.setLevel(lvl)
400 36cf1973 Vangelis Koukis
    formatter = logging.Formatter(
401 36cf1973 Vangelis Koukis
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
402 36cf1973 Vangelis Koukis
        "%Y-%m-%d %H:%M:%S")
403 b024b97a Georgios Gousios
    handler = logging.FileHandler(opts.log_file)
404 b024b97a Georgios Gousios
    handler.setFormatter(formatter)
405 b024b97a Georgios Gousios
    logger.addHandler(handler)
406 b024b97a Georgios Gousios
    handler_logger = logger
407 b024b97a Georgios Gousios
408 1dc821c9 Christos Stavrakakis
    # Rename this process so 'ps' output looks like this is a native
409 1dc821c9 Christos Stavrakakis
    # executable.  Can not seperate command-line arguments from actual name of
410 1dc821c9 Christos Stavrakakis
    # the executable by NUL bytes, so only show the name of the executable
411 1dc821c9 Christos Stavrakakis
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
412 1dc821c9 Christos Stavrakakis
    setproctitle.setproctitle(sys.argv[0])
413 1dc821c9 Christos Stavrakakis
414 cf2a3529 Christos Stavrakakis
    # Create pidfile
415 cf2a3529 Christos Stavrakakis
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
416 cf2a3529 Christos Stavrakakis
417 cf2a3529 Christos Stavrakakis
    # Remove any stale PID files, left behind by previous invocations
418 cf2a3529 Christos Stavrakakis
    if daemon.runner.is_pidfile_stale(pidf):
419 cf2a3529 Christos Stavrakakis
        logger.warning("Removing stale PID lock file %s", pidf.path)
420 cf2a3529 Christos Stavrakakis
        pidf.break_lock()
421 cf2a3529 Christos Stavrakakis
422 348f53de Georgios Gousios
    # Become a daemon:
423 348f53de Georgios Gousios
    # Redirect stdout and stderr to handler.stream to catch
424 348f53de Georgios Gousios
    # early errors in the daemonization process [e.g., pidfile creation]
425 348f53de Georgios Gousios
    # which will otherwise go to /dev/null.
426 348f53de Georgios Gousios
    daemon_context = daemon.DaemonContext(
427 449787d3 Christos Stavrakakis
        pidfile=pidf,
428 449787d3 Christos Stavrakakis
        umask=022,
429 449787d3 Christos Stavrakakis
        stdout=handler.stream,
430 449787d3 Christos Stavrakakis
        stderr=handler.stream,
431 449787d3 Christos Stavrakakis
        files_preserve=[handler.stream])
432 cf2a3529 Christos Stavrakakis
    try:
433 cf2a3529 Christos Stavrakakis
        daemon_context.open()
434 cf2a3529 Christos Stavrakakis
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
435 cf2a3529 Christos Stavrakakis
        logger.critical("Failed to lock pidfile %s, another instance running?",
436 cf2a3529 Christos Stavrakakis
                        pidf.path)
437 cf2a3529 Christos Stavrakakis
        sys.exit(1)
438 cf2a3529 Christos Stavrakakis
439 348f53de Georgios Gousios
    logger.info("Became a daemon")
440 348f53de Georgios Gousios
441 348f53de Georgios Gousios
    # Catch signals to ensure graceful shutdown
442 348f53de Georgios Gousios
    signal(SIGINT, fatal_signal_handler)
443 348f53de Georgios Gousios
    signal(SIGTERM, fatal_signal_handler)
444 348f53de Georgios Gousios
445 b024b97a Georgios Gousios
    # Monitor the Ganeti job queue, create and push notifications
446 b024b97a Georgios Gousios
    wm = pyinotify.WatchManager()
447 449787d3 Christos Stavrakakis
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
448 449787d3 Christos Stavrakakis
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])
449 51fc0054 Christos Stavrakakis
450 27a8e4ae Christos Stavrakakis
    cluster_name = find_cluster_name()
451 51fc0054 Christos Stavrakakis
452 27a8e4ae Christos Stavrakakis
    handler = JobFileHandler(logger, cluster_name)
453 b024b97a Georgios Gousios
    notifier = pyinotify.Notifier(wm, handler)
454 b024b97a Georgios Gousios
455 b024b97a Georgios Gousios
    try:
456 b024b97a Georgios Gousios
        # Fail if adding the inotify() watch fails for any reason
457 a9bb2a1a Christos Stavrakakis
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
458 a9bb2a1a Christos Stavrakakis
        if res[pathutils.QUEUE_DIR] < 0:
459 36cf1973 Vangelis Koukis
            raise Exception("pyinotify add_watch returned negative descriptor")
460 348f53de Georgios Gousios
461 a9bb2a1a Christos Stavrakakis
        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
462 a9bb2a1a Christos Stavrakakis
                    cluster_name))
463 b024b97a Georgios Gousios
464 b024b97a Georgios Gousios
        while True:    # loop forever
465 348f53de Georgios Gousios
            # process the queue of events as explained above
466 b024b97a Georgios Gousios
            notifier.process_events()
467 b024b97a Georgios Gousios
            if notifier.check_events():
468 b024b97a Georgios Gousios
                # read notified events and enqeue them
469 b024b97a Georgios Gousios
                notifier.read_events()
470 b024b97a Georgios Gousios
    except SystemExit:
471 b024b97a Georgios Gousios
        logger.info("SystemExit")
472 b024b97a Georgios Gousios
    except:
473 b024b97a Georgios Gousios
        logger.exception("Caught exception, terminating")
474 b024b97a Georgios Gousios
    finally:
475 b024b97a Georgios Gousios
        # destroy the inotify's instance on this interrupt (stop monitoring)
476 b024b97a Georgios Gousios
        notifier.stop()
477 b024b97a Georgios Gousios
        raise
478 b024b97a Georgios Gousios
479 b024b97a Georgios Gousios
if __name__ == "__main__":
480 b024b97a Georgios Gousios
    sys.exit(main())
481 b024b97a Georgios Gousios
482 348f53de Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :