Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ e5a92bec

History | View | Annotate | Download (16 kB)

1 b024b97a Georgios Gousios
#!/usr/bin/env python
2 737b0e28 Vangelis Koukis
# -*- coding: utf-8 -*-
3 b024b97a Georgios Gousios
#
4 737b0e28 Vangelis Koukis
# Copyright 2011 GRNET S.A. All rights reserved.
5 737b0e28 Vangelis Koukis
#
6 737b0e28 Vangelis Koukis
# Redistribution and use in source and binary forms, with or
7 737b0e28 Vangelis Koukis
# without modification, are permitted provided that the following
8 737b0e28 Vangelis Koukis
# conditions are met:
9 737b0e28 Vangelis Koukis
#
10 737b0e28 Vangelis Koukis
#   1. Redistributions of source code must retain the above
11 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
12 737b0e28 Vangelis Koukis
#      disclaimer.
13 737b0e28 Vangelis Koukis
#
14 737b0e28 Vangelis Koukis
#   2. Redistributions in binary form must reproduce the above
15 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
16 737b0e28 Vangelis Koukis
#      disclaimer in the documentation and/or other materials
17 737b0e28 Vangelis Koukis
#      provided with the distribution.
18 737b0e28 Vangelis Koukis
#
19 737b0e28 Vangelis Koukis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 737b0e28 Vangelis Koukis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 737b0e28 Vangelis Koukis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 737b0e28 Vangelis Koukis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 737b0e28 Vangelis Koukis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 737b0e28 Vangelis Koukis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 737b0e28 Vangelis Koukis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 737b0e28 Vangelis Koukis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 737b0e28 Vangelis Koukis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 737b0e28 Vangelis Koukis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 737b0e28 Vangelis Koukis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 737b0e28 Vangelis Koukis
# POSSIBILITY OF SUCH DAMAGE.
31 737b0e28 Vangelis Koukis
#
32 737b0e28 Vangelis Koukis
# The views and conclusions contained in the software and
33 737b0e28 Vangelis Koukis
# documentation are those of the authors and should not be
34 737b0e28 Vangelis Koukis
# interpreted as representing official policies, either expressed
35 737b0e28 Vangelis Koukis
# or implied, of GRNET S.A.
36 737b0e28 Vangelis Koukis
#
37 45ebfd48 Vangelis Koukis
38 737b0e28 Vangelis Koukis
"""Ganeti notification daemon with AMQP support
39 b024b97a Georgios Gousios

40 b024b97a Georgios Gousios
A daemon to monitor the Ganeti job queue and publish job progress
41 80bd5072 Georgios Gousios
and Ganeti VM state notifications to the ganeti exchange
42 b024b97a Georgios Gousios
"""
43 b024b97a Georgios Gousios
44 b024b97a Georgios Gousios
import sys
45 b024b97a Georgios Gousios
import os
46 b024b97a Georgios Gousios
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47 b024b97a Georgios Gousios
sys.path.append(path)
48 b024b97a Georgios Gousios
49 b024b97a Georgios Gousios
import json
50 b024b97a Georgios Gousios
import logging
51 b024b97a Georgios Gousios
import pyinotify
52 b024b97a Georgios Gousios
import daemon
53 b024b97a Georgios Gousios
import daemon.pidlockfile
54 cf2a3529 Christos Stavrakakis
import daemon.runner
55 cf2a3529 Christos Stavrakakis
from lockfile import LockTimeout
56 b024b97a Georgios Gousios
from signal import signal, SIGINT, SIGTERM
57 1dc821c9 Christos Stavrakakis
import setproctitle
58 b024b97a Georgios Gousios
59 a9bb2a1a Christos Stavrakakis
from ganeti import utils, jqueue, constants, serializer, pathutils, cli
60 6df16263 Christos Stavrakakis
from ganeti import errors as ganeti_errors
61 a9bb2a1a Christos Stavrakakis
from ganeti.ssconf import SimpleStore
62 e66a7e34 Christos Stavrakakis
63 b024b97a Georgios Gousios
64 6d6b8f88 Kostas Papadimitriou
from synnefo import settings
65 c4e55622 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
66 c4e55622 Christos Stavrakakis
67 27a8e4ae Christos Stavrakakis
68 c4e55622 Christos Stavrakakis
def get_time_from_status(op, job):
69 c4e55622 Christos Stavrakakis
    """Generate a unique message identifier for a ganeti job.
70 c4e55622 Christos Stavrakakis

71 c4e55622 Christos Stavrakakis
    The identifier is based on the timestamp of the job. Since a ganeti
72 c4e55622 Christos Stavrakakis
    job passes from multiple states, we need to pick the timestamp that
73 c4e55622 Christos Stavrakakis
    corresponds to each state.
74 c4e55622 Christos Stavrakakis

75 c4e55622 Christos Stavrakakis
    """
76 c4e55622 Christos Stavrakakis
    status = op.status
77 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_QUEUED:
78 765ff3ff Christos Stavrakakis
        time = job.received_timestamp
79 27a8e4ae Christos Stavrakakis
    try:  # Compatibility with Ganeti version
80 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITLOCK:
81 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
82 39ccdb18 Christos Stavrakakis
    except AttributeError:
83 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITING:
84 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
85 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_CANCELING:
86 765ff3ff Christos Stavrakakis
        time = op.start_timestamp
87 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_RUNNING:
88 765ff3ff Christos Stavrakakis
        time = op.exec_timestamp
89 c4e55622 Christos Stavrakakis
    if status in constants.JOBS_FINALIZED:
90 765ff3ff Christos Stavrakakis
        time = op.end_timestamp
91 765ff3ff Christos Stavrakakis
92 765ff3ff Christos Stavrakakis
    return time and time or job.end_timestamp
93 c4e55622 Christos Stavrakakis
94 27a8e4ae Christos Stavrakakis
    raise InvalidBackendStatus(status, job)
95 c4e55622 Christos Stavrakakis
96 c4e55622 Christos Stavrakakis
97 0e1f3323 Christos Stavrakakis
def get_instance_nics(instance, logger):
98 0e1f3323 Christos Stavrakakis
    """Query Ganeti to a get the instance's NICs.
99 0e1f3323 Christos Stavrakakis

100 6df16263 Christos Stavrakakis
    Get instance's NICs from Ganeti configuration data. If running on master,
101 6df16263 Christos Stavrakakis
    query Ganeti via Ganeti CLI client. Otherwise, get the nics from Ganeti
102 6df16263 Christos Stavrakakis
    configuration file.
103 6df16263 Christos Stavrakakis

104 0e1f3323 Christos Stavrakakis
    @type instance: string
105 0e1f3323 Christos Stavrakakis
    @param instance: the name of the instance
106 0e1f3323 Christos Stavrakakis
    @rtype: List of dicts
107 6df16263 Christos Stavrakakis
    @return: Dictionary containing the instance's NICs. Each dictionary
108 0e1f3323 Christos Stavrakakis
             contains the following keys: 'network', 'ip', 'mac', 'mode',
109 0e1f3323 Christos Stavrakakis
             'link' and 'firewall'
110 0e1f3323 Christos Stavrakakis

111 0e1f3323 Christos Stavrakakis
    """
112 6df16263 Christos Stavrakakis
    try:
113 6df16263 Christos Stavrakakis
        client = cli.GetClient()
114 220e3cc0 Christos Stavrakakis
        fields = ["nic.names", "nic.networks.names", "nic.ips", "nic.macs",
115 220e3cc0 Christos Stavrakakis
                  "nic.modes", "nic.links", "tags"]
116 6df16263 Christos Stavrakakis
        info = client.QueryInstances([instance], fields, use_locking=False)
117 220e3cc0 Christos Stavrakakis
        names, networks, ips, macs, modes, links, tags = info[0]
118 220e3cc0 Christos Stavrakakis
        nic_keys = ["name", "network", "ip", "mac", "mode", "link"]
119 220e3cc0 Christos Stavrakakis
        nics = zip(names, networks, ips, macs, modes, links)
120 6df16263 Christos Stavrakakis
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
121 6df16263 Christos Stavrakakis
    except ganeti_errors.OpPrereqError:
122 6df16263 Christos Stavrakakis
        # Not running on master! Load the conf file
123 6df16263 Christos Stavrakakis
        raw_data = utils.ReadFile(constants.CLUSTER_CONF_FILE)
124 6df16263 Christos Stavrakakis
        config = serializer.LoadJson(raw_data)
125 6df16263 Christos Stavrakakis
        i = config["instances"][instance]
126 6df16263 Christos Stavrakakis
        nics = []
127 6df16263 Christos Stavrakakis
        for nic in i["nics"]:
128 6df16263 Christos Stavrakakis
            params = nic.pop("nicparams")
129 6df16263 Christos Stavrakakis
            nic["mode"] = params["mode"]
130 6df16263 Christos Stavrakakis
            nic["link"] = params["link"]
131 6df16263 Christos Stavrakakis
            nics.append(nic)
132 6df16263 Christos Stavrakakis
        tags = i.get("tags", [])
133 0e1f3323 Christos Stavrakakis
    # Get firewall from instance Tags
134 0e1f3323 Christos Stavrakakis
    # Tags are of the form synnefo:network:N:firewall_mode
135 0e1f3323 Christos Stavrakakis
    for tag in tags:
136 0e1f3323 Christos Stavrakakis
        t = tag.split(":")
137 0e1f3323 Christos Stavrakakis
        if t[0:2] == ["synnefo", "network"]:
138 0e1f3323 Christos Stavrakakis
            if len(t) != 4:
139 0e1f3323 Christos Stavrakakis
                logger.error("Malformed synefo tag %s", tag)
140 0e1f3323 Christos Stavrakakis
                continue
141 d0545590 Christos Stavrakakis
            nic_name = t[2]
142 d0545590 Christos Stavrakakis
            firewall = t[3]
143 d0545590 Christos Stavrakakis
            [nic.setdefault("firewall", firewall)
144 d0545590 Christos Stavrakakis
             for nic in nics if nic["name"] == nic_name]
145 0e1f3323 Christos Stavrakakis
    return nics
146 0e1f3323 Christos Stavrakakis
147 0e1f3323 Christos Stavrakakis
148 c4e55622 Christos Stavrakakis
class InvalidBackendStatus(Exception):
149 c4e55622 Christos Stavrakakis
    def __init__(self, status, job):
150 c4e55622 Christos Stavrakakis
        self.status = status
151 c4e55622 Christos Stavrakakis
        self.job = job
152 c4e55622 Christos Stavrakakis
153 c4e55622 Christos Stavrakakis
    def __str__(self):
154 c4e55622 Christos Stavrakakis
        return repr("Invalid backend status: %s in job %s"
155 c4e55622 Christos Stavrakakis
                    % (self.status, self.job))
156 c4e55622 Christos Stavrakakis
157 27a8e4ae Christos Stavrakakis
158 39ccdb18 Christos Stavrakakis
def prefix_from_name(name):
159 39ccdb18 Christos Stavrakakis
    return name.split('-')[0]
160 39ccdb18 Christos Stavrakakis
161 39ccdb18 Christos Stavrakakis
162 39ccdb18 Christos Stavrakakis
def get_field(from_, field):
163 39ccdb18 Christos Stavrakakis
    try:
164 39ccdb18 Christos Stavrakakis
        return getattr(from_, field)
165 39ccdb18 Christos Stavrakakis
    except AttributeError:
166 39ccdb18 Christos Stavrakakis
        None
167 39ccdb18 Christos Stavrakakis
168 45ebfd48 Vangelis Koukis
169 b024b97a Georgios Gousios
class JobFileHandler(pyinotify.ProcessEvent):
170 27a8e4ae Christos Stavrakakis
    def __init__(self, logger, cluster_name):
171 348f53de Georgios Gousios
        pyinotify.ProcessEvent.__init__(self)
172 348f53de Georgios Gousios
        self.logger = logger
173 27a8e4ae Christos Stavrakakis
        self.cluster_name = cluster_name
174 27a8e4ae Christos Stavrakakis
175 2ef10562 Christos Stavrakakis
        # Set max_retries to 0 for unlimited retries.
176 2ef10562 Christos Stavrakakis
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
177 a8858945 Christos Stavrakakis
                                 max_retries=0, logger=logger)
178 2ef10562 Christos Stavrakakis
179 c4e55622 Christos Stavrakakis
        handler_logger.info("Attempting to connect to RabbitMQ hosts")
180 27a8e4ae Christos Stavrakakis
181 c4e55622 Christos Stavrakakis
        self.client.connect()
182 c4e55622 Christos Stavrakakis
        handler_logger.info("Connected succesfully")
183 b024b97a Georgios Gousios
184 27a8e4ae Christos Stavrakakis
        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
185 27a8e4ae Christos Stavrakakis
186 39ccdb18 Christos Stavrakakis
        self.op_handlers = {"INSTANCE": self.process_instance_op,
187 36d450e8 Christos Stavrakakis
                            "NETWORK": self.process_network_op,
188 36d450e8 Christos Stavrakakis
                            "CLUSTER": self.process_cluster_op}
189 39ccdb18 Christos Stavrakakis
                            # "GROUP": self.process_group_op}
190 39ccdb18 Christos Stavrakakis
191 b024b97a Georgios Gousios
    def process_IN_CLOSE_WRITE(self, event):
192 80dcd124 Vangelis Koukis
        self.process_IN_MOVED_TO(event)
193 80dcd124 Vangelis Koukis
194 80dcd124 Vangelis Koukis
    def process_IN_MOVED_TO(self, event):
195 b024b97a Georgios Gousios
        jobfile = os.path.join(event.path, event.name)
196 b024b97a Georgios Gousios
        if not event.name.startswith("job-"):
197 b024b97a Georgios Gousios
            self.logger.debug("Not a job file: %s" % event.path)
198 b024b97a Georgios Gousios
            return
199 b024b97a Georgios Gousios
200 b024b97a Georgios Gousios
        try:
201 b024b97a Georgios Gousios
            data = utils.ReadFile(jobfile)
202 b024b97a Georgios Gousios
        except IOError:
203 b024b97a Georgios Gousios
            return
204 b024b97a Georgios Gousios
205 b024b97a Georgios Gousios
        data = serializer.LoadJson(data)
206 a9bb2a1a Christos Stavrakakis
        job = jqueue._QueuedJob.Restore(None, data, False, False)
207 4ed30eed Christos Stavrakakis
208 39ccdb18 Christos Stavrakakis
        job_id = int(job.id)
209 b024b97a Georgios Gousios
210 b024b97a Georgios Gousios
        for op in job.ops:
211 39ccdb18 Christos Stavrakakis
            op_id = op.input.OP_ID
212 b024b97a Georgios Gousios
213 39ccdb18 Christos Stavrakakis
            msg = None
214 b024b97a Georgios Gousios
            try:
215 39ccdb18 Christos Stavrakakis
                handler_fn = self.op_handlers[op_id.split('_')[1]]
216 39ccdb18 Christos Stavrakakis
                msg, routekey = handler_fn(op, job_id)
217 39ccdb18 Christos Stavrakakis
            except KeyError:
218 b024b97a Georgios Gousios
                pass
219 b024b97a Georgios Gousios
220 39ccdb18 Christos Stavrakakis
            if not msg:
221 39ccdb18 Christos Stavrakakis
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
222 07e4ab22 Christos Stavrakakis
                continue
223 07e4ab22 Christos Stavrakakis
224 39ccdb18 Christos Stavrakakis
            # Generate a unique message identifier
225 39ccdb18 Christos Stavrakakis
            event_time = get_time_from_status(op, job)
226 39ccdb18 Christos Stavrakakis
227 b024b97a Georgios Gousios
            # Get the last line of the op log as message
228 b024b97a Georgios Gousios
            try:
229 b024b97a Georgios Gousios
                logmsg = op.log[-1][-1]
230 b024b97a Georgios Gousios
            except IndexError:
231 b024b97a Georgios Gousios
                logmsg = None
232 348f53de Georgios Gousios
233 39ccdb18 Christos Stavrakakis
            # Add shared attributes for all operations
234 39ccdb18 Christos Stavrakakis
            msg.update({"event_time": event_time,
235 39ccdb18 Christos Stavrakakis
                        "operation": op_id,
236 39ccdb18 Christos Stavrakakis
                        "status": op.status,
237 27a8e4ae Christos Stavrakakis
                        "cluster": self.cluster_name,
238 39ccdb18 Christos Stavrakakis
                        "logmsg": logmsg,
239 39ccdb18 Christos Stavrakakis
                        "jobId": job_id})
240 c4e55622 Christos Stavrakakis
241 12ee1ad8 Christos Stavrakakis
            if op.status == "success":
242 12ee1ad8 Christos Stavrakakis
                msg["result"] = op.result
243 12ee1ad8 Christos Stavrakakis
244 c4e55622 Christos Stavrakakis
            msg = json.dumps(msg)
245 39ccdb18 Christos Stavrakakis
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)
246 c4e55622 Christos Stavrakakis
247 c4e55622 Christos Stavrakakis
            # Send the message to RabbitMQ
248 db400d82 Christos Stavrakakis
            self.client.basic_publish(settings.EXCHANGE_GANETI,
249 db400d82 Christos Stavrakakis
                                      routekey,
250 db400d82 Christos Stavrakakis
                                      msg)
251 39ccdb18 Christos Stavrakakis
252 39ccdb18 Christos Stavrakakis
    def process_instance_op(self, op, job_id):
253 39ccdb18 Christos Stavrakakis
        """ Process OP_INSTANCE_* opcodes.
254 39ccdb18 Christos Stavrakakis

255 39ccdb18 Christos Stavrakakis
        """
256 39ccdb18 Christos Stavrakakis
        input = op.input
257 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
258 39ccdb18 Christos Stavrakakis
259 39ccdb18 Christos Stavrakakis
        instances = None
260 39ccdb18 Christos Stavrakakis
        instances = get_field(input, 'instance_name')
261 39ccdb18 Christos Stavrakakis
        if not instances:
262 39ccdb18 Christos Stavrakakis
            instances = get_field(input, 'instances')
263 39ccdb18 Christos Stavrakakis
            if not instances or len(instances) > 1:
264 39ccdb18 Christos Stavrakakis
                # Do not publish messages for jobs with no or multiple
265 449787d3 Christos Stavrakakis
                # instances.  Currently snf-dispatcher can not normally handle
266 449787d3 Christos Stavrakakis
                # these messages
267 39ccdb18 Christos Stavrakakis
                return None, None
268 39ccdb18 Christos Stavrakakis
            else:
269 39ccdb18 Christos Stavrakakis
                instances = instances[0]
270 39ccdb18 Christos Stavrakakis
271 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
272 39ccdb18 Christos Stavrakakis
                          instances, op.status)
273 39ccdb18 Christos Stavrakakis
274 e6fbada1 Christos Stavrakakis
        job_fields = {}
275 e6fbada1 Christos Stavrakakis
        if op_id in ["OP_INSTANCE_SET_PARAMS", "OP_INSTANCE_CREATE"]:
276 e6fbada1 Christos Stavrakakis
            job_fields = {"nics": get_field(input, "nics"),
277 e6fbada1 Christos Stavrakakis
                          "disks": get_field(input, "disks"),
278 e6fbada1 Christos Stavrakakis
                          "beparams": get_field(input, "beparams")}
279 e6fbada1 Christos Stavrakakis
280 39ccdb18 Christos Stavrakakis
        msg = {"type": "ganeti-op-status",
281 39ccdb18 Christos Stavrakakis
               "instance": instances,
282 e6fbada1 Christos Stavrakakis
               "operation": op_id,
283 e6fbada1 Christos Stavrakakis
               "job_fields": job_fields}
284 39ccdb18 Christos Stavrakakis
285 e5a92bec Christos Stavrakakis
        if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
286 e5a92bec Christos Stavrakakis
             op.status == "success") or
287 e5a92bec Christos Stavrakakis
            (op_id == "OP_INSTANCE_SET_PARAMS" and
288 e5a92bec Christos Stavrakakis
             op.status in ["success", "error", "cancelled"])):
289 e6fbada1 Christos Stavrakakis
                nics = get_instance_nics(msg["instance"], self.logger)
290 e6fbada1 Christos Stavrakakis
                msg["instance_nics"] = nics
291 12ee1ad8 Christos Stavrakakis
292 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)
293 39ccdb18 Christos Stavrakakis
294 39ccdb18 Christos Stavrakakis
        return msg, routekey
295 39ccdb18 Christos Stavrakakis
296 39ccdb18 Christos Stavrakakis
    def process_network_op(self, op, job_id):
297 39ccdb18 Christos Stavrakakis
        """ Process OP_NETWORK_* opcodes.
298 39ccdb18 Christos Stavrakakis

299 39ccdb18 Christos Stavrakakis
        """
300 39ccdb18 Christos Stavrakakis
301 39ccdb18 Christos Stavrakakis
        input = op.input
302 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
303 39ccdb18 Christos Stavrakakis
        network_name = get_field(input, 'network_name')
304 39ccdb18 Christos Stavrakakis
305 39ccdb18 Christos Stavrakakis
        if not network_name:
306 39ccdb18 Christos Stavrakakis
            return None, None
307 39ccdb18 Christos Stavrakakis
308 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
309 39ccdb18 Christos Stavrakakis
                          network_name, op.status)
310 39ccdb18 Christos Stavrakakis
311 e6fbada1 Christos Stavrakakis
        job_fields = {
312 e6fbada1 Christos Stavrakakis
            'subnet': get_field(input, 'network'),
313 e6fbada1 Christos Stavrakakis
            'gateway': get_field(input, 'gateway'),
314 e6fbada1 Christos Stavrakakis
            "add_reserved_ips": get_field(input, "add_reserved_ips"),
315 e6fbada1 Christos Stavrakakis
            "remove_reserved_ips": get_field(input, "remove_reserved_ips"),
316 e6fbada1 Christos Stavrakakis
            # 'network_mode': get_field(input, 'network_mode'),
317 e6fbada1 Christos Stavrakakis
            # 'network_link': get_field(input, 'network_link'),
318 e6fbada1 Christos Stavrakakis
            'group_name': get_field(input, 'group_name')}
319 e6fbada1 Christos Stavrakakis
320 39ccdb18 Christos Stavrakakis
        msg = {'operation':    op_id,
321 39ccdb18 Christos Stavrakakis
               'type':         "ganeti-network-status",
322 39ccdb18 Christos Stavrakakis
               'network':      network_name,
323 e6fbada1 Christos Stavrakakis
               'job_fields':   job_fields}
324 e6fbada1 Christos Stavrakakis
325 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)
326 39ccdb18 Christos Stavrakakis
327 39ccdb18 Christos Stavrakakis
        return msg, routekey
328 39ccdb18 Christos Stavrakakis
329 36d450e8 Christos Stavrakakis
    def process_cluster_op(self, op, job_id):
330 36d450e8 Christos Stavrakakis
        """ Process OP_CLUSTER_* opcodes.
331 39ccdb18 Christos Stavrakakis

332 36d450e8 Christos Stavrakakis
        """
333 36d450e8 Christos Stavrakakis
334 36d450e8 Christos Stavrakakis
        input = op.input
335 36d450e8 Christos Stavrakakis
        op_id = input.OP_ID
336 36d450e8 Christos Stavrakakis
337 36d450e8 Christos Stavrakakis
        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)
338 39ccdb18 Christos Stavrakakis
339 36d450e8 Christos Stavrakakis
        msg = {'operation':    op_id,
340 36d450e8 Christos Stavrakakis
               'type':         "ganeti-cluster-status"}
341 36d450e8 Christos Stavrakakis
342 36d450e8 Christos Stavrakakis
        routekey = "ganeti.event.cluster"
343 36d450e8 Christos Stavrakakis
344 36d450e8 Christos Stavrakakis
        return msg, routekey
345 39ccdb18 Christos Stavrakakis
346 39ccdb18 Christos Stavrakakis
347 27a8e4ae Christos Stavrakakis
def find_cluster_name():
348 51fc0054 Christos Stavrakakis
    global handler_logger
349 51fc0054 Christos Stavrakakis
    try:
350 a9bb2a1a Christos Stavrakakis
        ss = SimpleStore()
351 a9bb2a1a Christos Stavrakakis
        name = ss.GetClusterName()
352 51fc0054 Christos Stavrakakis
    except Exception as e:
353 51fc0054 Christos Stavrakakis
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
354 51fc0054 Christos Stavrakakis
        raise e
355 51fc0054 Christos Stavrakakis
356 27a8e4ae Christos Stavrakakis
    return name
357 39ccdb18 Christos Stavrakakis
358 449787d3 Christos Stavrakakis
359 b024b97a Georgios Gousios
handler_logger = None
360 449787d3 Christos Stavrakakis
361 449787d3 Christos Stavrakakis
362 b024b97a Georgios Gousios
def fatal_signal_handler(signum, frame):
363 b024b97a Georgios Gousios
    global handler_logger
364 b024b97a Georgios Gousios
365 b024b97a Georgios Gousios
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
366 2cd99e7a Georgios Gousios
                        signum)
367 b024b97a Georgios Gousios
    raise SystemExit
368 b024b97a Georgios Gousios
369 449787d3 Christos Stavrakakis
370 b024b97a Georgios Gousios
def parse_arguments(args):
371 b024b97a Georgios Gousios
    from optparse import OptionParser
372 b024b97a Georgios Gousios
373 b024b97a Georgios Gousios
    parser = OptionParser()
374 b024b97a Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
375 2cd99e7a Georgios Gousios
                      help="Enable debugging information")
376 b024b97a Georgios Gousios
    parser.add_option("-l", "--log", dest="log_file",
377 45ebfd48 Vangelis Koukis
                      default="/var/log/snf-ganeti-eventd.log",
378 2cd99e7a Georgios Gousios
                      metavar="FILE",
379 2cd99e7a Georgios Gousios
                      help="Write log to FILE instead of %s" %
380 449787d3 Christos Stavrakakis
                           "/var/log/snf-ganeti-eventd.log")
381 b024b97a Georgios Gousios
    parser.add_option('--pid-file', dest="pid_file",
382 45ebfd48 Vangelis Koukis
                      default="/var/run/snf-ganeti-eventd.pid",
383 2cd99e7a Georgios Gousios
                      metavar='PIDFILE',
384 2cd99e7a Georgios Gousios
                      help="Save PID to file (default: %s)" %
385 449787d3 Christos Stavrakakis
                           "/var/run/snf-ganeti-eventd.pid")
386 b024b97a Georgios Gousios
387 b024b97a Georgios Gousios
    return parser.parse_args(args)
388 b024b97a Georgios Gousios
389 449787d3 Christos Stavrakakis
390 b024b97a Georgios Gousios
def main():
391 b024b97a Georgios Gousios
    global handler_logger
392 b024b97a Georgios Gousios
393 b024b97a Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
394 b024b97a Georgios Gousios
395 b024b97a Georgios Gousios
    # Initialize logger
396 b024b97a Georgios Gousios
    lvl = logging.DEBUG if opts.debug else logging.INFO
397 03a4b970 Georgios Gousios
    logger = logging.getLogger("ganeti.eventd")
398 b024b97a Georgios Gousios
    logger.setLevel(lvl)
399 36cf1973 Vangelis Koukis
    formatter = logging.Formatter(
400 36cf1973 Vangelis Koukis
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
401 36cf1973 Vangelis Koukis
        "%Y-%m-%d %H:%M:%S")
402 b024b97a Georgios Gousios
    handler = logging.FileHandler(opts.log_file)
403 b024b97a Georgios Gousios
    handler.setFormatter(formatter)
404 b024b97a Georgios Gousios
    logger.addHandler(handler)
405 b024b97a Georgios Gousios
    handler_logger = logger
406 b024b97a Georgios Gousios
407 1dc821c9 Christos Stavrakakis
    # Rename this process so 'ps' output looks like this is a native
408 1dc821c9 Christos Stavrakakis
    # executable.  Can not seperate command-line arguments from actual name of
409 1dc821c9 Christos Stavrakakis
    # the executable by NUL bytes, so only show the name of the executable
410 1dc821c9 Christos Stavrakakis
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
411 1dc821c9 Christos Stavrakakis
    setproctitle.setproctitle(sys.argv[0])
412 1dc821c9 Christos Stavrakakis
413 cf2a3529 Christos Stavrakakis
    # Create pidfile
414 cf2a3529 Christos Stavrakakis
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
415 cf2a3529 Christos Stavrakakis
416 cf2a3529 Christos Stavrakakis
    # Remove any stale PID files, left behind by previous invocations
417 cf2a3529 Christos Stavrakakis
    if daemon.runner.is_pidfile_stale(pidf):
418 cf2a3529 Christos Stavrakakis
        logger.warning("Removing stale PID lock file %s", pidf.path)
419 cf2a3529 Christos Stavrakakis
        pidf.break_lock()
420 cf2a3529 Christos Stavrakakis
421 348f53de Georgios Gousios
    # Become a daemon:
422 348f53de Georgios Gousios
    # Redirect stdout and stderr to handler.stream to catch
423 348f53de Georgios Gousios
    # early errors in the daemonization process [e.g., pidfile creation]
424 348f53de Georgios Gousios
    # which will otherwise go to /dev/null.
425 348f53de Georgios Gousios
    daemon_context = daemon.DaemonContext(
426 449787d3 Christos Stavrakakis
        pidfile=pidf,
427 449787d3 Christos Stavrakakis
        umask=022,
428 449787d3 Christos Stavrakakis
        stdout=handler.stream,
429 449787d3 Christos Stavrakakis
        stderr=handler.stream,
430 449787d3 Christos Stavrakakis
        files_preserve=[handler.stream])
431 cf2a3529 Christos Stavrakakis
    try:
432 cf2a3529 Christos Stavrakakis
        daemon_context.open()
433 cf2a3529 Christos Stavrakakis
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
434 cf2a3529 Christos Stavrakakis
        logger.critical("Failed to lock pidfile %s, another instance running?",
435 cf2a3529 Christos Stavrakakis
                        pidf.path)
436 cf2a3529 Christos Stavrakakis
        sys.exit(1)
437 cf2a3529 Christos Stavrakakis
438 348f53de Georgios Gousios
    logger.info("Became a daemon")
439 348f53de Georgios Gousios
440 348f53de Georgios Gousios
    # Catch signals to ensure graceful shutdown
441 348f53de Georgios Gousios
    signal(SIGINT, fatal_signal_handler)
442 348f53de Georgios Gousios
    signal(SIGTERM, fatal_signal_handler)
443 348f53de Georgios Gousios
444 b024b97a Georgios Gousios
    # Monitor the Ganeti job queue, create and push notifications
445 b024b97a Georgios Gousios
    wm = pyinotify.WatchManager()
446 449787d3 Christos Stavrakakis
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
447 449787d3 Christos Stavrakakis
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])
448 51fc0054 Christos Stavrakakis
449 27a8e4ae Christos Stavrakakis
    cluster_name = find_cluster_name()
450 51fc0054 Christos Stavrakakis
451 27a8e4ae Christos Stavrakakis
    handler = JobFileHandler(logger, cluster_name)
452 b024b97a Georgios Gousios
    notifier = pyinotify.Notifier(wm, handler)
453 b024b97a Georgios Gousios
454 b024b97a Georgios Gousios
    try:
455 b024b97a Georgios Gousios
        # Fail if adding the inotify() watch fails for any reason
456 a9bb2a1a Christos Stavrakakis
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
457 a9bb2a1a Christos Stavrakakis
        if res[pathutils.QUEUE_DIR] < 0:
458 36cf1973 Vangelis Koukis
            raise Exception("pyinotify add_watch returned negative descriptor")
459 348f53de Georgios Gousios
460 a9bb2a1a Christos Stavrakakis
        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
461 a9bb2a1a Christos Stavrakakis
                    cluster_name))
462 b024b97a Georgios Gousios
463 b024b97a Georgios Gousios
        while True:    # loop forever
464 348f53de Georgios Gousios
            # process the queue of events as explained above
465 b024b97a Georgios Gousios
            notifier.process_events()
466 b024b97a Georgios Gousios
            if notifier.check_events():
467 b024b97a Georgios Gousios
                # read notified events and enqeue them
468 b024b97a Georgios Gousios
                notifier.read_events()
469 b024b97a Georgios Gousios
    except SystemExit:
470 b024b97a Georgios Gousios
        logger.info("SystemExit")
471 b024b97a Georgios Gousios
    except:
472 b024b97a Georgios Gousios
        logger.exception("Caught exception, terminating")
473 b024b97a Georgios Gousios
    finally:
474 b024b97a Georgios Gousios
        # destroy the inotify's instance on this interrupt (stop monitoring)
475 b024b97a Georgios Gousios
        notifier.stop()
476 b024b97a Georgios Gousios
        raise
477 b024b97a Georgios Gousios
478 b024b97a Georgios Gousios
if __name__ == "__main__":
479 b024b97a Georgios Gousios
    sys.exit(main())
480 b024b97a Georgios Gousios
481 348f53de Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :