Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 220e3cc0

History | View | Annotate | Download (16.4 kB)

1 b024b97a Georgios Gousios
#!/usr/bin/env python
2 737b0e28 Vangelis Koukis
# -*- coding: utf-8 -*-
3 b024b97a Georgios Gousios
#
4 737b0e28 Vangelis Koukis
# Copyright 2011 GRNET S.A. All rights reserved.
5 737b0e28 Vangelis Koukis
#
6 737b0e28 Vangelis Koukis
# Redistribution and use in source and binary forms, with or
7 737b0e28 Vangelis Koukis
# without modification, are permitted provided that the following
8 737b0e28 Vangelis Koukis
# conditions are met:
9 737b0e28 Vangelis Koukis
#
10 737b0e28 Vangelis Koukis
#   1. Redistributions of source code must retain the above
11 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
12 737b0e28 Vangelis Koukis
#      disclaimer.
13 737b0e28 Vangelis Koukis
#
14 737b0e28 Vangelis Koukis
#   2. Redistributions in binary form must reproduce the above
15 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
16 737b0e28 Vangelis Koukis
#      disclaimer in the documentation and/or other materials
17 737b0e28 Vangelis Koukis
#      provided with the distribution.
18 737b0e28 Vangelis Koukis
#
19 737b0e28 Vangelis Koukis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 737b0e28 Vangelis Koukis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 737b0e28 Vangelis Koukis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 737b0e28 Vangelis Koukis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 737b0e28 Vangelis Koukis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 737b0e28 Vangelis Koukis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 737b0e28 Vangelis Koukis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 737b0e28 Vangelis Koukis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 737b0e28 Vangelis Koukis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 737b0e28 Vangelis Koukis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 737b0e28 Vangelis Koukis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 737b0e28 Vangelis Koukis
# POSSIBILITY OF SUCH DAMAGE.
31 737b0e28 Vangelis Koukis
#
32 737b0e28 Vangelis Koukis
# The views and conclusions contained in the software and
33 737b0e28 Vangelis Koukis
# documentation are those of the authors and should not be
34 737b0e28 Vangelis Koukis
# interpreted as representing official policies, either expressed
35 737b0e28 Vangelis Koukis
# or implied, of GRNET S.A.
36 737b0e28 Vangelis Koukis
#
37 45ebfd48 Vangelis Koukis
38 737b0e28 Vangelis Koukis
"""Ganeti notification daemon with AMQP support
39 b024b97a Georgios Gousios

40 b024b97a Georgios Gousios
A daemon to monitor the Ganeti job queue and publish job progress
41 80bd5072 Georgios Gousios
and Ganeti VM state notifications to the ganeti exchange
42 b024b97a Georgios Gousios
"""
43 b024b97a Georgios Gousios
44 b024b97a Georgios Gousios
import sys
45 b024b97a Georgios Gousios
import os
46 b024b97a Georgios Gousios
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47 b024b97a Georgios Gousios
sys.path.append(path)
48 b024b97a Georgios Gousios
49 b024b97a Georgios Gousios
import json
50 b024b97a Georgios Gousios
import logging
51 b024b97a Georgios Gousios
import pyinotify
52 b024b97a Georgios Gousios
import daemon
53 b024b97a Georgios Gousios
import daemon.pidlockfile
54 cf2a3529 Christos Stavrakakis
import daemon.runner
55 cf2a3529 Christos Stavrakakis
from lockfile import LockTimeout
56 b024b97a Georgios Gousios
from signal import signal, SIGINT, SIGTERM
57 1dc821c9 Christos Stavrakakis
import setproctitle
58 b024b97a Georgios Gousios
59 a9bb2a1a Christos Stavrakakis
from ganeti import utils, jqueue, constants, serializer, pathutils, cli
60 6df16263 Christos Stavrakakis
from ganeti import errors as ganeti_errors
61 a9bb2a1a Christos Stavrakakis
from ganeti.ssconf import SimpleStore
62 e66a7e34 Christos Stavrakakis
63 b024b97a Georgios Gousios
64 6d6b8f88 Kostas Papadimitriou
from synnefo import settings
65 c4e55622 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
66 c4e55622 Christos Stavrakakis
67 27a8e4ae Christos Stavrakakis
68 c4e55622 Christos Stavrakakis
def get_time_from_status(op, job):
69 c4e55622 Christos Stavrakakis
    """Generate a unique message identifier for a ganeti job.
70 c4e55622 Christos Stavrakakis

71 c4e55622 Christos Stavrakakis
    The identifier is based on the timestamp of the job. Since a ganeti
72 c4e55622 Christos Stavrakakis
    job passes from multiple states, we need to pick the timestamp that
73 c4e55622 Christos Stavrakakis
    corresponds to each state.
74 c4e55622 Christos Stavrakakis

75 c4e55622 Christos Stavrakakis
    """
76 c4e55622 Christos Stavrakakis
    status = op.status
77 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_QUEUED:
78 765ff3ff Christos Stavrakakis
        time = job.received_timestamp
79 27a8e4ae Christos Stavrakakis
    try:  # Compatibility with Ganeti version
80 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITLOCK:
81 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
82 39ccdb18 Christos Stavrakakis
    except AttributeError:
83 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITING:
84 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
85 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_CANCELING:
86 765ff3ff Christos Stavrakakis
        time = op.start_timestamp
87 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_RUNNING:
88 765ff3ff Christos Stavrakakis
        time = op.exec_timestamp
89 c4e55622 Christos Stavrakakis
    if status in constants.JOBS_FINALIZED:
90 765ff3ff Christos Stavrakakis
        time = op.end_timestamp
91 765ff3ff Christos Stavrakakis
92 765ff3ff Christos Stavrakakis
    return time and time or job.end_timestamp
93 c4e55622 Christos Stavrakakis
94 27a8e4ae Christos Stavrakakis
    raise InvalidBackendStatus(status, job)
95 c4e55622 Christos Stavrakakis
96 c4e55622 Christos Stavrakakis
97 0e1f3323 Christos Stavrakakis
def get_instance_nics(instance, logger):
98 0e1f3323 Christos Stavrakakis
    """Query Ganeti to a get the instance's NICs.
99 0e1f3323 Christos Stavrakakis

100 6df16263 Christos Stavrakakis
    Get instance's NICs from Ganeti configuration data. If running on master,
101 6df16263 Christos Stavrakakis
    query Ganeti via Ganeti CLI client. Otherwise, get the nics from Ganeti
102 6df16263 Christos Stavrakakis
    configuration file.
103 6df16263 Christos Stavrakakis

104 0e1f3323 Christos Stavrakakis
    @type instance: string
105 0e1f3323 Christos Stavrakakis
    @param instance: the name of the instance
106 0e1f3323 Christos Stavrakakis
    @rtype: List of dicts
107 6df16263 Christos Stavrakakis
    @return: Dictionary containing the instance's NICs. Each dictionary
108 0e1f3323 Christos Stavrakakis
             contains the following keys: 'network', 'ip', 'mac', 'mode',
109 0e1f3323 Christos Stavrakakis
             'link' and 'firewall'
110 0e1f3323 Christos Stavrakakis

111 0e1f3323 Christos Stavrakakis
    """
112 6df16263 Christos Stavrakakis
    try:
113 6df16263 Christos Stavrakakis
        client = cli.GetClient()
114 220e3cc0 Christos Stavrakakis
        fields = ["nic.names", "nic.networks.names", "nic.ips", "nic.macs",
115 220e3cc0 Christos Stavrakakis
                  "nic.modes", "nic.links", "tags"]
116 6df16263 Christos Stavrakakis
        info = client.QueryInstances([instance], fields, use_locking=False)
117 220e3cc0 Christos Stavrakakis
        names, networks, ips, macs, modes, links, tags = info[0]
118 220e3cc0 Christos Stavrakakis
        nic_keys = ["name", "network", "ip", "mac", "mode", "link"]
119 220e3cc0 Christos Stavrakakis
        nics = zip(names, networks, ips, macs, modes, links)
120 6df16263 Christos Stavrakakis
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
121 6df16263 Christos Stavrakakis
    except ganeti_errors.OpPrereqError:
122 6df16263 Christos Stavrakakis
        # Not running on master! Load the conf file
123 6df16263 Christos Stavrakakis
        raw_data = utils.ReadFile(constants.CLUSTER_CONF_FILE)
124 6df16263 Christos Stavrakakis
        config = serializer.LoadJson(raw_data)
125 6df16263 Christos Stavrakakis
        i = config["instances"][instance]
126 6df16263 Christos Stavrakakis
        nics = []
127 6df16263 Christos Stavrakakis
        for nic in i["nics"]:
128 6df16263 Christos Stavrakakis
            params = nic.pop("nicparams")
129 6df16263 Christos Stavrakakis
            nic["mode"] = params["mode"]
130 6df16263 Christos Stavrakakis
            nic["link"] = params["link"]
131 6df16263 Christos Stavrakakis
            nics.append(nic)
132 6df16263 Christos Stavrakakis
        tags = i.get("tags", [])
133 0e1f3323 Christos Stavrakakis
    # Get firewall from instance Tags
134 0e1f3323 Christos Stavrakakis
    # Tags are of the form synnefo:network:N:firewall_mode
135 0e1f3323 Christos Stavrakakis
    for tag in tags:
136 0e1f3323 Christos Stavrakakis
        t = tag.split(":")
137 0e1f3323 Christos Stavrakakis
        if t[0:2] == ["synnefo", "network"]:
138 0e1f3323 Christos Stavrakakis
            if len(t) != 4:
139 0e1f3323 Christos Stavrakakis
                logger.error("Malformed synefo tag %s", tag)
140 0e1f3323 Christos Stavrakakis
                continue
141 0e1f3323 Christos Stavrakakis
            try:
142 0e1f3323 Christos Stavrakakis
                index = int(t[2])
143 0e1f3323 Christos Stavrakakis
                nics[index]['firewall'] = t[3]
144 0e1f3323 Christos Stavrakakis
            except ValueError:
145 0e1f3323 Christos Stavrakakis
                logger.error("Malformed synnefo tag %s", tag)
146 0e1f3323 Christos Stavrakakis
            except IndexError:
147 0e1f3323 Christos Stavrakakis
                logger.error("Found tag %s for non-existent NIC %d",
148 0e1f3323 Christos Stavrakakis
                             tag, index)
149 0e1f3323 Christos Stavrakakis
    return nics
150 0e1f3323 Christos Stavrakakis
151 0e1f3323 Christos Stavrakakis
152 c4e55622 Christos Stavrakakis
class InvalidBackendStatus(Exception):
153 c4e55622 Christos Stavrakakis
    def __init__(self, status, job):
154 c4e55622 Christos Stavrakakis
        self.status = status
155 c4e55622 Christos Stavrakakis
        self.job = job
156 c4e55622 Christos Stavrakakis
157 c4e55622 Christos Stavrakakis
    def __str__(self):
158 c4e55622 Christos Stavrakakis
        return repr("Invalid backend status: %s in job %s"
159 c4e55622 Christos Stavrakakis
                    % (self.status, self.job))
160 c4e55622 Christos Stavrakakis
161 27a8e4ae Christos Stavrakakis
162 39ccdb18 Christos Stavrakakis
def prefix_from_name(name):
163 39ccdb18 Christos Stavrakakis
    return name.split('-')[0]
164 39ccdb18 Christos Stavrakakis
165 39ccdb18 Christos Stavrakakis
166 39ccdb18 Christos Stavrakakis
def get_field(from_, field):
167 39ccdb18 Christos Stavrakakis
    try:
168 39ccdb18 Christos Stavrakakis
        return getattr(from_, field)
169 39ccdb18 Christos Stavrakakis
    except AttributeError:
170 39ccdb18 Christos Stavrakakis
        None
171 39ccdb18 Christos Stavrakakis
172 45ebfd48 Vangelis Koukis
173 b024b97a Georgios Gousios
class JobFileHandler(pyinotify.ProcessEvent):
174 27a8e4ae Christos Stavrakakis
    def __init__(self, logger, cluster_name):
175 348f53de Georgios Gousios
        pyinotify.ProcessEvent.__init__(self)
176 348f53de Georgios Gousios
        self.logger = logger
177 27a8e4ae Christos Stavrakakis
        self.cluster_name = cluster_name
178 27a8e4ae Christos Stavrakakis
179 2ef10562 Christos Stavrakakis
        # Set max_retries to 0 for unlimited retries.
180 2ef10562 Christos Stavrakakis
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
181 a8858945 Christos Stavrakakis
                                 max_retries=0, logger=logger)
182 2ef10562 Christos Stavrakakis
183 c4e55622 Christos Stavrakakis
        handler_logger.info("Attempting to connect to RabbitMQ hosts")
184 27a8e4ae Christos Stavrakakis
185 c4e55622 Christos Stavrakakis
        self.client.connect()
186 c4e55622 Christos Stavrakakis
        handler_logger.info("Connected succesfully")
187 b024b97a Georgios Gousios
188 27a8e4ae Christos Stavrakakis
        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
189 27a8e4ae Christos Stavrakakis
190 39ccdb18 Christos Stavrakakis
        self.op_handlers = {"INSTANCE": self.process_instance_op,
191 36d450e8 Christos Stavrakakis
                            "NETWORK": self.process_network_op,
192 36d450e8 Christos Stavrakakis
                            "CLUSTER": self.process_cluster_op}
193 39ccdb18 Christos Stavrakakis
                            # "GROUP": self.process_group_op}
194 39ccdb18 Christos Stavrakakis
195 b024b97a Georgios Gousios
    def process_IN_CLOSE_WRITE(self, event):
196 80dcd124 Vangelis Koukis
        self.process_IN_MOVED_TO(event)
197 80dcd124 Vangelis Koukis
198 80dcd124 Vangelis Koukis
    def process_IN_MOVED_TO(self, event):
199 b024b97a Georgios Gousios
        jobfile = os.path.join(event.path, event.name)
200 b024b97a Georgios Gousios
        if not event.name.startswith("job-"):
201 b024b97a Georgios Gousios
            self.logger.debug("Not a job file: %s" % event.path)
202 b024b97a Georgios Gousios
            return
203 b024b97a Georgios Gousios
204 b024b97a Georgios Gousios
        try:
205 b024b97a Georgios Gousios
            data = utils.ReadFile(jobfile)
206 b024b97a Georgios Gousios
        except IOError:
207 b024b97a Georgios Gousios
            return
208 b024b97a Georgios Gousios
209 b024b97a Georgios Gousios
        data = serializer.LoadJson(data)
210 a9bb2a1a Christos Stavrakakis
        job = jqueue._QueuedJob.Restore(None, data, False, False)
211 4ed30eed Christos Stavrakakis
212 39ccdb18 Christos Stavrakakis
        job_id = int(job.id)
213 b024b97a Georgios Gousios
214 b024b97a Georgios Gousios
        for op in job.ops:
215 39ccdb18 Christos Stavrakakis
            op_id = op.input.OP_ID
216 b024b97a Georgios Gousios
217 39ccdb18 Christos Stavrakakis
            msg = None
218 b024b97a Georgios Gousios
            try:
219 39ccdb18 Christos Stavrakakis
                handler_fn = self.op_handlers[op_id.split('_')[1]]
220 39ccdb18 Christos Stavrakakis
                msg, routekey = handler_fn(op, job_id)
221 39ccdb18 Christos Stavrakakis
            except KeyError:
222 b024b97a Georgios Gousios
                pass
223 b024b97a Georgios Gousios
224 39ccdb18 Christos Stavrakakis
            if not msg:
225 39ccdb18 Christos Stavrakakis
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
226 07e4ab22 Christos Stavrakakis
                continue
227 07e4ab22 Christos Stavrakakis
228 39ccdb18 Christos Stavrakakis
            # Generate a unique message identifier
229 39ccdb18 Christos Stavrakakis
            event_time = get_time_from_status(op, job)
230 39ccdb18 Christos Stavrakakis
231 b024b97a Georgios Gousios
            # Get the last line of the op log as message
232 b024b97a Georgios Gousios
            try:
233 b024b97a Georgios Gousios
                logmsg = op.log[-1][-1]
234 b024b97a Georgios Gousios
            except IndexError:
235 b024b97a Georgios Gousios
                logmsg = None
236 348f53de Georgios Gousios
237 39ccdb18 Christos Stavrakakis
            # Add shared attributes for all operations
238 39ccdb18 Christos Stavrakakis
            msg.update({"event_time": event_time,
239 39ccdb18 Christos Stavrakakis
                        "operation": op_id,
240 39ccdb18 Christos Stavrakakis
                        "status": op.status,
241 27a8e4ae Christos Stavrakakis
                        "cluster": self.cluster_name,
242 39ccdb18 Christos Stavrakakis
                        "logmsg": logmsg,
243 39ccdb18 Christos Stavrakakis
                        "jobId": job_id})
244 c4e55622 Christos Stavrakakis
245 12ee1ad8 Christos Stavrakakis
            if op.status == "success":
246 12ee1ad8 Christos Stavrakakis
                msg["result"] = op.result
247 12ee1ad8 Christos Stavrakakis
248 90858bda Christos Stavrakakis
            if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
249 90858bda Christos Stavrakakis
                 op.status == "success") or
250 90858bda Christos Stavrakakis
                (op_id == "OP_INSTANCE_SET_PARAMS" and
251 90858bda Christos Stavrakakis
                 op.status in ["success", "error", "cancelled"])):
252 0e1f3323 Christos Stavrakakis
                    nics = get_instance_nics(msg["instance"], self.logger)
253 0e1f3323 Christos Stavrakakis
                    msg["nics"] = nics
254 0e1f3323 Christos Stavrakakis
255 c4e55622 Christos Stavrakakis
            msg = json.dumps(msg)
256 39ccdb18 Christos Stavrakakis
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)
257 c4e55622 Christos Stavrakakis
258 c4e55622 Christos Stavrakakis
            # Send the message to RabbitMQ
259 db400d82 Christos Stavrakakis
            self.client.basic_publish(settings.EXCHANGE_GANETI,
260 db400d82 Christos Stavrakakis
                                      routekey,
261 db400d82 Christos Stavrakakis
                                      msg)
262 39ccdb18 Christos Stavrakakis
263 39ccdb18 Christos Stavrakakis
    def process_instance_op(self, op, job_id):
264 39ccdb18 Christos Stavrakakis
        """ Process OP_INSTANCE_* opcodes.
265 39ccdb18 Christos Stavrakakis

266 39ccdb18 Christos Stavrakakis
        """
267 39ccdb18 Christos Stavrakakis
        input = op.input
268 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
269 39ccdb18 Christos Stavrakakis
270 39ccdb18 Christos Stavrakakis
        instances = None
271 39ccdb18 Christos Stavrakakis
        instances = get_field(input, 'instance_name')
272 39ccdb18 Christos Stavrakakis
        if not instances:
273 39ccdb18 Christos Stavrakakis
            instances = get_field(input, 'instances')
274 39ccdb18 Christos Stavrakakis
            if not instances or len(instances) > 1:
275 39ccdb18 Christos Stavrakakis
                # Do not publish messages for jobs with no or multiple
276 449787d3 Christos Stavrakakis
                # instances.  Currently snf-dispatcher can not normally handle
277 449787d3 Christos Stavrakakis
                # these messages
278 39ccdb18 Christos Stavrakakis
                return None, None
279 39ccdb18 Christos Stavrakakis
            else:
280 39ccdb18 Christos Stavrakakis
                instances = instances[0]
281 39ccdb18 Christos Stavrakakis
282 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
283 39ccdb18 Christos Stavrakakis
                          instances, op.status)
284 39ccdb18 Christos Stavrakakis
285 e6fbada1 Christos Stavrakakis
        job_fields = {}
286 e6fbada1 Christos Stavrakakis
        if op_id in ["OP_INSTANCE_SET_PARAMS", "OP_INSTANCE_CREATE"]:
287 e6fbada1 Christos Stavrakakis
            job_fields = {"nics": get_field(input, "nics"),
288 e6fbada1 Christos Stavrakakis
                          "disks": get_field(input, "disks"),
289 e6fbada1 Christos Stavrakakis
                          "beparams": get_field(input, "beparams")}
290 e6fbada1 Christos Stavrakakis
291 39ccdb18 Christos Stavrakakis
        msg = {"type": "ganeti-op-status",
292 39ccdb18 Christos Stavrakakis
               "instance": instances,
293 e6fbada1 Christos Stavrakakis
               "operation": op_id,
294 e6fbada1 Christos Stavrakakis
               "job_fields": job_fields}
295 39ccdb18 Christos Stavrakakis
296 e6fbada1 Christos Stavrakakis
        if op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_SET_PARAMS",
297 e6fbada1 Christos Stavrakakis
                     "OP_INSTANCE_STARTUP"]:
298 e6fbada1 Christos Stavrakakis
            if op.status == "success":
299 e6fbada1 Christos Stavrakakis
                nics = get_instance_nics(msg["instance"], self.logger)
300 e6fbada1 Christos Stavrakakis
                msg["instance_nics"] = nics
301 12ee1ad8 Christos Stavrakakis
302 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)
303 39ccdb18 Christos Stavrakakis
304 39ccdb18 Christos Stavrakakis
        return msg, routekey
305 39ccdb18 Christos Stavrakakis
306 39ccdb18 Christos Stavrakakis
    def process_network_op(self, op, job_id):
307 39ccdb18 Christos Stavrakakis
        """ Process OP_NETWORK_* opcodes.
308 39ccdb18 Christos Stavrakakis

309 39ccdb18 Christos Stavrakakis
        """
310 39ccdb18 Christos Stavrakakis
311 39ccdb18 Christos Stavrakakis
        input = op.input
312 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
313 39ccdb18 Christos Stavrakakis
        network_name = get_field(input, 'network_name')
314 39ccdb18 Christos Stavrakakis
315 39ccdb18 Christos Stavrakakis
        if not network_name:
316 39ccdb18 Christos Stavrakakis
            return None, None
317 39ccdb18 Christos Stavrakakis
318 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
319 39ccdb18 Christos Stavrakakis
                          network_name, op.status)
320 39ccdb18 Christos Stavrakakis
321 e6fbada1 Christos Stavrakakis
        job_fields = {
322 e6fbada1 Christos Stavrakakis
            'subnet': get_field(input, 'network'),
323 e6fbada1 Christos Stavrakakis
            'gateway': get_field(input, 'gateway'),
324 e6fbada1 Christos Stavrakakis
            "add_reserved_ips": get_field(input, "add_reserved_ips"),
325 e6fbada1 Christos Stavrakakis
            "remove_reserved_ips": get_field(input, "remove_reserved_ips"),
326 e6fbada1 Christos Stavrakakis
            # 'network_mode': get_field(input, 'network_mode'),
327 e6fbada1 Christos Stavrakakis
            # 'network_link': get_field(input, 'network_link'),
328 e6fbada1 Christos Stavrakakis
            'group_name': get_field(input, 'group_name')}
329 e6fbada1 Christos Stavrakakis
330 39ccdb18 Christos Stavrakakis
        msg = {'operation':    op_id,
331 39ccdb18 Christos Stavrakakis
               'type':         "ganeti-network-status",
332 39ccdb18 Christos Stavrakakis
               'network':      network_name,
333 e6fbada1 Christos Stavrakakis
               'job_fields':   job_fields}
334 e6fbada1 Christos Stavrakakis
335 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)
336 39ccdb18 Christos Stavrakakis
337 39ccdb18 Christos Stavrakakis
        return msg, routekey
338 39ccdb18 Christos Stavrakakis
339 36d450e8 Christos Stavrakakis
    def process_cluster_op(self, op, job_id):
340 36d450e8 Christos Stavrakakis
        """ Process OP_CLUSTER_* opcodes.
341 39ccdb18 Christos Stavrakakis

342 36d450e8 Christos Stavrakakis
        """
343 36d450e8 Christos Stavrakakis
344 36d450e8 Christos Stavrakakis
        input = op.input
345 36d450e8 Christos Stavrakakis
        op_id = input.OP_ID
346 36d450e8 Christos Stavrakakis
347 36d450e8 Christos Stavrakakis
        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)
348 39ccdb18 Christos Stavrakakis
349 36d450e8 Christos Stavrakakis
        msg = {'operation':    op_id,
350 36d450e8 Christos Stavrakakis
               'type':         "ganeti-cluster-status"}
351 36d450e8 Christos Stavrakakis
352 36d450e8 Christos Stavrakakis
        routekey = "ganeti.event.cluster"
353 36d450e8 Christos Stavrakakis
354 36d450e8 Christos Stavrakakis
        return msg, routekey
355 39ccdb18 Christos Stavrakakis
356 39ccdb18 Christos Stavrakakis
357 27a8e4ae Christos Stavrakakis
def find_cluster_name():
358 51fc0054 Christos Stavrakakis
    global handler_logger
359 51fc0054 Christos Stavrakakis
    try:
360 a9bb2a1a Christos Stavrakakis
        ss = SimpleStore()
361 a9bb2a1a Christos Stavrakakis
        name = ss.GetClusterName()
362 51fc0054 Christos Stavrakakis
    except Exception as e:
363 51fc0054 Christos Stavrakakis
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
364 51fc0054 Christos Stavrakakis
        raise e
365 51fc0054 Christos Stavrakakis
366 27a8e4ae Christos Stavrakakis
    return name
367 39ccdb18 Christos Stavrakakis
368 449787d3 Christos Stavrakakis
369 b024b97a Georgios Gousios
handler_logger = None
370 449787d3 Christos Stavrakakis
371 449787d3 Christos Stavrakakis
372 b024b97a Georgios Gousios
def fatal_signal_handler(signum, frame):
373 b024b97a Georgios Gousios
    global handler_logger
374 b024b97a Georgios Gousios
375 b024b97a Georgios Gousios
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
376 2cd99e7a Georgios Gousios
                        signum)
377 b024b97a Georgios Gousios
    raise SystemExit
378 b024b97a Georgios Gousios
379 449787d3 Christos Stavrakakis
380 b024b97a Georgios Gousios
def parse_arguments(args):
381 b024b97a Georgios Gousios
    from optparse import OptionParser
382 b024b97a Georgios Gousios
383 b024b97a Georgios Gousios
    parser = OptionParser()
384 b024b97a Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
385 2cd99e7a Georgios Gousios
                      help="Enable debugging information")
386 b024b97a Georgios Gousios
    parser.add_option("-l", "--log", dest="log_file",
387 45ebfd48 Vangelis Koukis
                      default="/var/log/snf-ganeti-eventd.log",
388 2cd99e7a Georgios Gousios
                      metavar="FILE",
389 2cd99e7a Georgios Gousios
                      help="Write log to FILE instead of %s" %
390 449787d3 Christos Stavrakakis
                           "/var/log/snf-ganeti-eventd.log")
391 b024b97a Georgios Gousios
    parser.add_option('--pid-file', dest="pid_file",
392 45ebfd48 Vangelis Koukis
                      default="/var/run/snf-ganeti-eventd.pid",
393 2cd99e7a Georgios Gousios
                      metavar='PIDFILE',
394 2cd99e7a Georgios Gousios
                      help="Save PID to file (default: %s)" %
395 449787d3 Christos Stavrakakis
                           "/var/run/snf-ganeti-eventd.pid")
396 b024b97a Georgios Gousios
397 b024b97a Georgios Gousios
    return parser.parse_args(args)
398 b024b97a Georgios Gousios
399 449787d3 Christos Stavrakakis
400 b024b97a Georgios Gousios
def main():
401 b024b97a Georgios Gousios
    global handler_logger
402 b024b97a Georgios Gousios
403 b024b97a Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
404 b024b97a Georgios Gousios
405 b024b97a Georgios Gousios
    # Initialize logger
406 b024b97a Georgios Gousios
    lvl = logging.DEBUG if opts.debug else logging.INFO
407 03a4b970 Georgios Gousios
    logger = logging.getLogger("ganeti.eventd")
408 b024b97a Georgios Gousios
    logger.setLevel(lvl)
409 36cf1973 Vangelis Koukis
    formatter = logging.Formatter(
410 36cf1973 Vangelis Koukis
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
411 36cf1973 Vangelis Koukis
        "%Y-%m-%d %H:%M:%S")
412 b024b97a Georgios Gousios
    handler = logging.FileHandler(opts.log_file)
413 b024b97a Georgios Gousios
    handler.setFormatter(formatter)
414 b024b97a Georgios Gousios
    logger.addHandler(handler)
415 b024b97a Georgios Gousios
    handler_logger = logger
416 b024b97a Georgios Gousios
417 1dc821c9 Christos Stavrakakis
    # Rename this process so 'ps' output looks like this is a native
418 1dc821c9 Christos Stavrakakis
    # executable.  Can not seperate command-line arguments from actual name of
419 1dc821c9 Christos Stavrakakis
    # the executable by NUL bytes, so only show the name of the executable
420 1dc821c9 Christos Stavrakakis
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
421 1dc821c9 Christos Stavrakakis
    setproctitle.setproctitle(sys.argv[0])
422 1dc821c9 Christos Stavrakakis
423 cf2a3529 Christos Stavrakakis
    # Create pidfile
424 cf2a3529 Christos Stavrakakis
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
425 cf2a3529 Christos Stavrakakis
426 cf2a3529 Christos Stavrakakis
    # Remove any stale PID files, left behind by previous invocations
427 cf2a3529 Christos Stavrakakis
    if daemon.runner.is_pidfile_stale(pidf):
428 cf2a3529 Christos Stavrakakis
        logger.warning("Removing stale PID lock file %s", pidf.path)
429 cf2a3529 Christos Stavrakakis
        pidf.break_lock()
430 cf2a3529 Christos Stavrakakis
431 348f53de Georgios Gousios
    # Become a daemon:
432 348f53de Georgios Gousios
    # Redirect stdout and stderr to handler.stream to catch
433 348f53de Georgios Gousios
    # early errors in the daemonization process [e.g., pidfile creation]
434 348f53de Georgios Gousios
    # which will otherwise go to /dev/null.
435 348f53de Georgios Gousios
    daemon_context = daemon.DaemonContext(
436 449787d3 Christos Stavrakakis
        pidfile=pidf,
437 449787d3 Christos Stavrakakis
        umask=022,
438 449787d3 Christos Stavrakakis
        stdout=handler.stream,
439 449787d3 Christos Stavrakakis
        stderr=handler.stream,
440 449787d3 Christos Stavrakakis
        files_preserve=[handler.stream])
441 cf2a3529 Christos Stavrakakis
    try:
442 cf2a3529 Christos Stavrakakis
        daemon_context.open()
443 cf2a3529 Christos Stavrakakis
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
444 cf2a3529 Christos Stavrakakis
        logger.critical("Failed to lock pidfile %s, another instance running?",
445 cf2a3529 Christos Stavrakakis
                        pidf.path)
446 cf2a3529 Christos Stavrakakis
        sys.exit(1)
447 cf2a3529 Christos Stavrakakis
448 348f53de Georgios Gousios
    logger.info("Became a daemon")
449 348f53de Georgios Gousios
450 348f53de Georgios Gousios
    # Catch signals to ensure graceful shutdown
451 348f53de Georgios Gousios
    signal(SIGINT, fatal_signal_handler)
452 348f53de Georgios Gousios
    signal(SIGTERM, fatal_signal_handler)
453 348f53de Georgios Gousios
454 b024b97a Georgios Gousios
    # Monitor the Ganeti job queue, create and push notifications
455 b024b97a Georgios Gousios
    wm = pyinotify.WatchManager()
456 449787d3 Christos Stavrakakis
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
457 449787d3 Christos Stavrakakis
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])
458 51fc0054 Christos Stavrakakis
459 27a8e4ae Christos Stavrakakis
    cluster_name = find_cluster_name()
460 51fc0054 Christos Stavrakakis
461 27a8e4ae Christos Stavrakakis
    handler = JobFileHandler(logger, cluster_name)
462 b024b97a Georgios Gousios
    notifier = pyinotify.Notifier(wm, handler)
463 b024b97a Georgios Gousios
464 b024b97a Georgios Gousios
    try:
465 b024b97a Georgios Gousios
        # Fail if adding the inotify() watch fails for any reason
466 a9bb2a1a Christos Stavrakakis
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
467 a9bb2a1a Christos Stavrakakis
        if res[pathutils.QUEUE_DIR] < 0:
468 36cf1973 Vangelis Koukis
            raise Exception("pyinotify add_watch returned negative descriptor")
469 348f53de Georgios Gousios
470 a9bb2a1a Christos Stavrakakis
        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
471 a9bb2a1a Christos Stavrakakis
                    cluster_name))
472 b024b97a Georgios Gousios
473 b024b97a Georgios Gousios
        while True:    # loop forever
474 348f53de Georgios Gousios
            # process the queue of events as explained above
475 b024b97a Georgios Gousios
            notifier.process_events()
476 b024b97a Georgios Gousios
            if notifier.check_events():
477 b024b97a Georgios Gousios
                # read notified events and enqeue them
478 b024b97a Georgios Gousios
                notifier.read_events()
479 b024b97a Georgios Gousios
    except SystemExit:
480 b024b97a Georgios Gousios
        logger.info("SystemExit")
481 b024b97a Georgios Gousios
    except:
482 b024b97a Georgios Gousios
        logger.exception("Caught exception, terminating")
483 b024b97a Georgios Gousios
    finally:
484 b024b97a Georgios Gousios
        # destroy the inotify's instance on this interrupt (stop monitoring)
485 b024b97a Georgios Gousios
        notifier.stop()
486 b024b97a Georgios Gousios
        raise
487 b024b97a Georgios Gousios
488 b024b97a Georgios Gousios
if __name__ == "__main__":
489 b024b97a Georgios Gousios
    sys.exit(main())
490 b024b97a Georgios Gousios
491 348f53de Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :