Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 449787d3

History | View | Annotate | Download (14.8 kB)

1 b024b97a Georgios Gousios
#!/usr/bin/env python
2 737b0e28 Vangelis Koukis
# -*- coding: utf-8 -*-
3 b024b97a Georgios Gousios
#
4 737b0e28 Vangelis Koukis
# Copyright 2011 GRNET S.A. All rights reserved.
5 737b0e28 Vangelis Koukis
#
6 737b0e28 Vangelis Koukis
# Redistribution and use in source and binary forms, with or
7 737b0e28 Vangelis Koukis
# without modification, are permitted provided that the following
8 737b0e28 Vangelis Koukis
# conditions are met:
9 737b0e28 Vangelis Koukis
#
10 737b0e28 Vangelis Koukis
#   1. Redistributions of source code must retain the above
11 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
12 737b0e28 Vangelis Koukis
#      disclaimer.
13 737b0e28 Vangelis Koukis
#
14 737b0e28 Vangelis Koukis
#   2. Redistributions in binary form must reproduce the above
15 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
16 737b0e28 Vangelis Koukis
#      disclaimer in the documentation and/or other materials
17 737b0e28 Vangelis Koukis
#      provided with the distribution.
18 737b0e28 Vangelis Koukis
#
19 737b0e28 Vangelis Koukis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 737b0e28 Vangelis Koukis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 737b0e28 Vangelis Koukis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 737b0e28 Vangelis Koukis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 737b0e28 Vangelis Koukis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 737b0e28 Vangelis Koukis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 737b0e28 Vangelis Koukis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 737b0e28 Vangelis Koukis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 737b0e28 Vangelis Koukis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 737b0e28 Vangelis Koukis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 737b0e28 Vangelis Koukis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 737b0e28 Vangelis Koukis
# POSSIBILITY OF SUCH DAMAGE.
31 737b0e28 Vangelis Koukis
#
32 737b0e28 Vangelis Koukis
# The views and conclusions contained in the software and
33 737b0e28 Vangelis Koukis
# documentation are those of the authors and should not be
34 737b0e28 Vangelis Koukis
# interpreted as representing official policies, either expressed
35 737b0e28 Vangelis Koukis
# or implied, of GRNET S.A.
36 737b0e28 Vangelis Koukis
#
37 45ebfd48 Vangelis Koukis
38 737b0e28 Vangelis Koukis
"""Ganeti notification daemon with AMQP support
39 b024b97a Georgios Gousios

40 b024b97a Georgios Gousios
A daemon to monitor the Ganeti job queue and publish job progress
41 80bd5072 Georgios Gousios
and Ganeti VM state notifications to the ganeti exchange
42 b024b97a Georgios Gousios
"""
43 b024b97a Georgios Gousios
44 b024b97a Georgios Gousios
import sys
45 b024b97a Georgios Gousios
import os
46 b024b97a Georgios Gousios
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47 b024b97a Georgios Gousios
sys.path.append(path)
48 b024b97a Georgios Gousios
49 b024b97a Georgios Gousios
import json
50 b024b97a Georgios Gousios
import logging
51 b024b97a Georgios Gousios
import pyinotify
52 b024b97a Georgios Gousios
import daemon
53 b024b97a Georgios Gousios
import daemon.pidlockfile
54 cf2a3529 Christos Stavrakakis
import daemon.runner
55 cf2a3529 Christos Stavrakakis
from lockfile import LockTimeout
56 b024b97a Georgios Gousios
from signal import signal, SIGINT, SIGTERM
57 1dc821c9 Christos Stavrakakis
import setproctitle
58 b024b97a Georgios Gousios
59 0e1f3323 Christos Stavrakakis
from ganeti import utils, jqueue, constants, serializer, cli
60 e66a7e34 Christos Stavrakakis
from ganeti.ssconf import SimpleConfigReader
61 e66a7e34 Christos Stavrakakis
62 b024b97a Georgios Gousios
63 6d6b8f88 Kostas Papadimitriou
from synnefo import settings
64 c4e55622 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
65 c4e55622 Christos Stavrakakis
66 27a8e4ae Christos Stavrakakis
67 c4e55622 Christos Stavrakakis
def get_time_from_status(op, job):
68 c4e55622 Christos Stavrakakis
    """Generate a unique message identifier for a ganeti job.
69 c4e55622 Christos Stavrakakis

70 c4e55622 Christos Stavrakakis
    The identifier is based on the timestamp of the job. Since a ganeti
71 c4e55622 Christos Stavrakakis
    job passes from multiple states, we need to pick the timestamp that
72 c4e55622 Christos Stavrakakis
    corresponds to each state.
73 c4e55622 Christos Stavrakakis

74 c4e55622 Christos Stavrakakis
    """
75 c4e55622 Christos Stavrakakis
    status = op.status
76 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_QUEUED:
77 765ff3ff Christos Stavrakakis
        time = job.received_timestamp
78 27a8e4ae Christos Stavrakakis
    try:  # Compatibility with Ganeti version
79 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITLOCK:
80 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
81 39ccdb18 Christos Stavrakakis
    except AttributeError:
82 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITING:
83 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
84 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_CANCELING:
85 765ff3ff Christos Stavrakakis
        time = op.start_timestamp
86 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_RUNNING:
87 765ff3ff Christos Stavrakakis
        time = op.exec_timestamp
88 c4e55622 Christos Stavrakakis
    if status in constants.JOBS_FINALIZED:
89 765ff3ff Christos Stavrakakis
        time = op.end_timestamp
90 765ff3ff Christos Stavrakakis
91 765ff3ff Christos Stavrakakis
    return time and time or job.end_timestamp
92 c4e55622 Christos Stavrakakis
93 27a8e4ae Christos Stavrakakis
    raise InvalidBackendStatus(status, job)
94 c4e55622 Christos Stavrakakis
95 c4e55622 Christos Stavrakakis
96 0e1f3323 Christos Stavrakakis
def get_instance_nics(instance, logger):
97 0e1f3323 Christos Stavrakakis
    """Query Ganeti to a get the instance's NICs.
98 0e1f3323 Christos Stavrakakis

99 0e1f3323 Christos Stavrakakis
    @type instance: string
100 0e1f3323 Christos Stavrakakis
    @param instance: the name of the instance
101 0e1f3323 Christos Stavrakakis
    @rtype: List of dicts
102 0e1f3323 Christos Stavrakakis
    @retrun: Dictionary containing the instance's NICs. Each dictionary
103 0e1f3323 Christos Stavrakakis
             contains the following keys: 'network', 'ip', 'mac', 'mode',
104 0e1f3323 Christos Stavrakakis
             'link' and 'firewall'
105 0e1f3323 Christos Stavrakakis

106 0e1f3323 Christos Stavrakakis
    """
107 0e1f3323 Christos Stavrakakis
    fields = ["nic.networks", "nic.ips", "nic.macs", "nic.modes", "nic.links",
108 0e1f3323 Christos Stavrakakis
              "tags"]
109 0e1f3323 Christos Stavrakakis
    # Get Ganeti client
110 0e1f3323 Christos Stavrakakis
    client = cli.GetClient()
111 0e1f3323 Christos Stavrakakis
    info = client.QueryInstances([instance], fields, use_locking=False)
112 0e1f3323 Christos Stavrakakis
    networks, ips, macs, modes, links, tags = info[0]
113 0e1f3323 Christos Stavrakakis
    nic_keys = ["network", "ip", "mac", "mode", "link"]
114 0e1f3323 Christos Stavrakakis
    nics = zip(networks, ips, macs, modes, links)
115 0e1f3323 Christos Stavrakakis
    nics = map(lambda x: dict(zip(nic_keys, x)), nics)
116 0e1f3323 Christos Stavrakakis
    # Get firewall from instance Tags
117 0e1f3323 Christos Stavrakakis
    # Tags are of the form synnefo:network:N:firewall_mode
118 0e1f3323 Christos Stavrakakis
    for tag in tags:
119 0e1f3323 Christos Stavrakakis
        t = tag.split(":")
120 0e1f3323 Christos Stavrakakis
        if t[0:2] == ["synnefo", "network"]:
121 0e1f3323 Christos Stavrakakis
            if len(t) != 4:
122 0e1f3323 Christos Stavrakakis
                logger.error("Malformed synefo tag %s", tag)
123 0e1f3323 Christos Stavrakakis
                continue
124 0e1f3323 Christos Stavrakakis
            try:
125 0e1f3323 Christos Stavrakakis
                index = int(t[2])
126 0e1f3323 Christos Stavrakakis
                nics[index]['firewall'] = t[3]
127 0e1f3323 Christos Stavrakakis
            except ValueError:
128 0e1f3323 Christos Stavrakakis
                logger.error("Malformed synnefo tag %s", tag)
129 0e1f3323 Christos Stavrakakis
            except IndexError:
130 0e1f3323 Christos Stavrakakis
                logger.error("Found tag %s for non-existent NIC %d",
131 0e1f3323 Christos Stavrakakis
                             tag, index)
132 0e1f3323 Christos Stavrakakis
    return nics
133 0e1f3323 Christos Stavrakakis
134 0e1f3323 Christos Stavrakakis
135 c4e55622 Christos Stavrakakis
class InvalidBackendStatus(Exception):
136 c4e55622 Christos Stavrakakis
    def __init__(self, status, job):
137 c4e55622 Christos Stavrakakis
        self.status = status
138 c4e55622 Christos Stavrakakis
        self.job = job
139 c4e55622 Christos Stavrakakis
140 c4e55622 Christos Stavrakakis
    def __str__(self):
141 c4e55622 Christos Stavrakakis
        return repr("Invalid backend status: %s in job %s"
142 c4e55622 Christos Stavrakakis
                    % (self.status, self.job))
143 c4e55622 Christos Stavrakakis
144 27a8e4ae Christos Stavrakakis
145 39ccdb18 Christos Stavrakakis
def prefix_from_name(name):
146 39ccdb18 Christos Stavrakakis
    return name.split('-')[0]
147 39ccdb18 Christos Stavrakakis
148 39ccdb18 Christos Stavrakakis
149 39ccdb18 Christos Stavrakakis
def get_field(from_, field):
150 39ccdb18 Christos Stavrakakis
    try:
151 39ccdb18 Christos Stavrakakis
        return getattr(from_, field)
152 39ccdb18 Christos Stavrakakis
    except AttributeError:
153 39ccdb18 Christos Stavrakakis
        None
154 39ccdb18 Christos Stavrakakis
155 45ebfd48 Vangelis Koukis
156 b024b97a Georgios Gousios
class JobFileHandler(pyinotify.ProcessEvent):
157 27a8e4ae Christos Stavrakakis
    def __init__(self, logger, cluster_name):
158 348f53de Georgios Gousios
        pyinotify.ProcessEvent.__init__(self)
159 348f53de Georgios Gousios
        self.logger = logger
160 27a8e4ae Christos Stavrakakis
        self.cluster_name = cluster_name
161 27a8e4ae Christos Stavrakakis
162 2ef10562 Christos Stavrakakis
        # Set max_retries to 0 for unlimited retries.
163 2ef10562 Christos Stavrakakis
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
164 a8858945 Christos Stavrakakis
                                 max_retries=0, logger=logger)
165 2ef10562 Christos Stavrakakis
166 c4e55622 Christos Stavrakakis
        handler_logger.info("Attempting to connect to RabbitMQ hosts")
167 27a8e4ae Christos Stavrakakis
168 c4e55622 Christos Stavrakakis
        self.client.connect()
169 c4e55622 Christos Stavrakakis
        handler_logger.info("Connected succesfully")
170 b024b97a Georgios Gousios
171 27a8e4ae Christos Stavrakakis
        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
172 27a8e4ae Christos Stavrakakis
173 39ccdb18 Christos Stavrakakis
        self.op_handlers = {"INSTANCE": self.process_instance_op,
174 39ccdb18 Christos Stavrakakis
                            "NETWORK": self.process_network_op}
175 39ccdb18 Christos Stavrakakis
                            # "GROUP": self.process_group_op}
176 39ccdb18 Christos Stavrakakis
177 b024b97a Georgios Gousios
    def process_IN_CLOSE_WRITE(self, event):
178 80dcd124 Vangelis Koukis
        self.process_IN_MOVED_TO(event)
179 80dcd124 Vangelis Koukis
180 80dcd124 Vangelis Koukis
    def process_IN_MOVED_TO(self, event):
181 b024b97a Georgios Gousios
        jobfile = os.path.join(event.path, event.name)
182 b024b97a Georgios Gousios
        if not event.name.startswith("job-"):
183 b024b97a Georgios Gousios
            self.logger.debug("Not a job file: %s" % event.path)
184 b024b97a Georgios Gousios
            return
185 b024b97a Georgios Gousios
186 b024b97a Georgios Gousios
        try:
187 b024b97a Georgios Gousios
            data = utils.ReadFile(jobfile)
188 b024b97a Georgios Gousios
        except IOError:
189 b024b97a Georgios Gousios
            return
190 b024b97a Georgios Gousios
191 b024b97a Georgios Gousios
        data = serializer.LoadJson(data)
192 27a8e4ae Christos Stavrakakis
        try:  # Compatibility with Ganeti version
193 4ed30eed Christos Stavrakakis
            job = jqueue._QueuedJob.Restore(None, data, False)
194 4ed30eed Christos Stavrakakis
        except TypeError:
195 4ed30eed Christos Stavrakakis
            job = jqueue._QueuedJob.Restore(None, data)
196 4ed30eed Christos Stavrakakis
197 39ccdb18 Christos Stavrakakis
        job_id = int(job.id)
198 b024b97a Georgios Gousios
199 b024b97a Georgios Gousios
        for op in job.ops:
200 39ccdb18 Christos Stavrakakis
            op_id = op.input.OP_ID
201 b024b97a Georgios Gousios
202 39ccdb18 Christos Stavrakakis
            msg = None
203 b024b97a Georgios Gousios
            try:
204 39ccdb18 Christos Stavrakakis
                handler_fn = self.op_handlers[op_id.split('_')[1]]
205 39ccdb18 Christos Stavrakakis
                msg, routekey = handler_fn(op, job_id)
206 39ccdb18 Christos Stavrakakis
            except KeyError:
207 b024b97a Georgios Gousios
                pass
208 b024b97a Georgios Gousios
209 39ccdb18 Christos Stavrakakis
            if not msg:
210 39ccdb18 Christos Stavrakakis
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
211 07e4ab22 Christos Stavrakakis
                continue
212 07e4ab22 Christos Stavrakakis
213 39ccdb18 Christos Stavrakakis
            # Generate a unique message identifier
214 39ccdb18 Christos Stavrakakis
            event_time = get_time_from_status(op, job)
215 39ccdb18 Christos Stavrakakis
216 b024b97a Georgios Gousios
            # Get the last line of the op log as message
217 b024b97a Georgios Gousios
            try:
218 b024b97a Georgios Gousios
                logmsg = op.log[-1][-1]
219 b024b97a Georgios Gousios
            except IndexError:
220 b024b97a Georgios Gousios
                logmsg = None
221 348f53de Georgios Gousios
222 39ccdb18 Christos Stavrakakis
            # Add shared attributes for all operations
223 39ccdb18 Christos Stavrakakis
            msg.update({"event_time": event_time,
224 39ccdb18 Christos Stavrakakis
                        "operation": op_id,
225 39ccdb18 Christos Stavrakakis
                        "status": op.status,
226 27a8e4ae Christos Stavrakakis
                        "cluster": self.cluster_name,
227 39ccdb18 Christos Stavrakakis
                        "logmsg": logmsg,
228 39ccdb18 Christos Stavrakakis
                        "jobId": job_id})
229 c4e55622 Christos Stavrakakis
230 0e1f3323 Christos Stavrakakis
            if op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_SET_PARAMS",
231 0e1f3323 Christos Stavrakakis
                         "OP_INSTANCE_STARTUP"]:
232 0e1f3323 Christos Stavrakakis
                if op.status == "success":
233 0e1f3323 Christos Stavrakakis
                    nics = get_instance_nics(msg["instance"], self.logger)
234 0e1f3323 Christos Stavrakakis
                    msg["nics"] = nics
235 0e1f3323 Christos Stavrakakis
236 c4e55622 Christos Stavrakakis
            msg = json.dumps(msg)
237 39ccdb18 Christos Stavrakakis
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)
238 c4e55622 Christos Stavrakakis
239 c4e55622 Christos Stavrakakis
            # Send the message to RabbitMQ
240 db400d82 Christos Stavrakakis
            self.client.basic_publish(settings.EXCHANGE_GANETI,
241 db400d82 Christos Stavrakakis
                                      routekey,
242 db400d82 Christos Stavrakakis
                                      msg)
243 39ccdb18 Christos Stavrakakis
244 39ccdb18 Christos Stavrakakis
    def process_instance_op(self, op, job_id):
245 39ccdb18 Christos Stavrakakis
        """ Process OP_INSTANCE_* opcodes.
246 39ccdb18 Christos Stavrakakis

247 39ccdb18 Christos Stavrakakis
        """
248 39ccdb18 Christos Stavrakakis
        input = op.input
249 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
250 39ccdb18 Christos Stavrakakis
251 39ccdb18 Christos Stavrakakis
        instances = None
252 39ccdb18 Christos Stavrakakis
        instances = get_field(input, 'instance_name')
253 39ccdb18 Christos Stavrakakis
        if not instances:
254 39ccdb18 Christos Stavrakakis
            instances = get_field(input, 'instances')
255 39ccdb18 Christos Stavrakakis
            if not instances or len(instances) > 1:
256 39ccdb18 Christos Stavrakakis
                # Do not publish messages for jobs with no or multiple
257 449787d3 Christos Stavrakakis
                # instances.  Currently snf-dispatcher can not normally handle
258 449787d3 Christos Stavrakakis
                # these messages
259 39ccdb18 Christos Stavrakakis
                return None, None
260 39ccdb18 Christos Stavrakakis
            else:
261 39ccdb18 Christos Stavrakakis
                instances = instances[0]
262 39ccdb18 Christos Stavrakakis
263 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
264 39ccdb18 Christos Stavrakakis
                          instances, op.status)
265 39ccdb18 Christos Stavrakakis
266 39ccdb18 Christos Stavrakakis
        msg = {"type": "ganeti-op-status",
267 39ccdb18 Christos Stavrakakis
               "instance": instances,
268 39ccdb18 Christos Stavrakakis
               "operation": op_id}
269 39ccdb18 Christos Stavrakakis
270 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)
271 39ccdb18 Christos Stavrakakis
272 39ccdb18 Christos Stavrakakis
        return msg, routekey
273 39ccdb18 Christos Stavrakakis
274 39ccdb18 Christos Stavrakakis
    def process_network_op(self, op, job_id):
275 39ccdb18 Christos Stavrakakis
        """ Process OP_NETWORK_* opcodes.
276 39ccdb18 Christos Stavrakakis

277 39ccdb18 Christos Stavrakakis
        """
278 39ccdb18 Christos Stavrakakis
279 39ccdb18 Christos Stavrakakis
        input = op.input
280 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
281 39ccdb18 Christos Stavrakakis
        network_name = get_field(input, 'network_name')
282 39ccdb18 Christos Stavrakakis
283 39ccdb18 Christos Stavrakakis
        if not network_name:
284 39ccdb18 Christos Stavrakakis
            return None, None
285 39ccdb18 Christos Stavrakakis
286 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
287 39ccdb18 Christos Stavrakakis
                          network_name, op.status)
288 39ccdb18 Christos Stavrakakis
289 39ccdb18 Christos Stavrakakis
        msg = {'operation':    op_id,
290 39ccdb18 Christos Stavrakakis
               'type':         "ganeti-network-status",
291 39ccdb18 Christos Stavrakakis
               'network':      network_name,
292 39ccdb18 Christos Stavrakakis
               'subnet':       get_field(input, 'network'),
293 39ccdb18 Christos Stavrakakis
               # 'network_mode': get_field(input, 'network_mode'),
294 39ccdb18 Christos Stavrakakis
               # 'network_link': get_field(input, 'network_link'),
295 39ccdb18 Christos Stavrakakis
               'gateway':      get_field(input, 'gateway'),
296 39ccdb18 Christos Stavrakakis
               'group_name':   get_field(input, 'group_name')}
297 39ccdb18 Christos Stavrakakis
298 fd2bdbb2 Christos Stavrakakis
        if op_id == "OP_NETWORK_SET_PARAMS":
299 449787d3 Christos Stavrakakis
            msg["add_reserved_ips"] = get_field(input, "add_reserved_ips")
300 449787d3 Christos Stavrakakis
            msg["remove_reserved_ips"] = get_field(input,
301 449787d3 Christos Stavrakakis
                                                   "remove_reserved_ips")
302 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)
303 39ccdb18 Christos Stavrakakis
304 39ccdb18 Christos Stavrakakis
        return msg, routekey
305 39ccdb18 Christos Stavrakakis
306 39ccdb18 Christos Stavrakakis
307 39ccdb18 Christos Stavrakakis
    # def process_group_op(self, op, job_id):
308 39ccdb18 Christos Stavrakakis
    #     """ Process OP_GROUP_* opcodes.
309 39ccdb18 Christos Stavrakakis
310 39ccdb18 Christos Stavrakakis
    #     """
311 39ccdb18 Christos Stavrakakis
    #     return None, None
312 39ccdb18 Christos Stavrakakis
313 39ccdb18 Christos Stavrakakis
314 27a8e4ae Christos Stavrakakis
def find_cluster_name():
315 51fc0054 Christos Stavrakakis
    global handler_logger
316 51fc0054 Christos Stavrakakis
    try:
317 e66a7e34 Christos Stavrakakis
        scr = SimpleConfigReader()
318 e66a7e34 Christos Stavrakakis
        name = scr.GetClusterName()
319 51fc0054 Christos Stavrakakis
    except Exception as e:
320 51fc0054 Christos Stavrakakis
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
321 51fc0054 Christos Stavrakakis
        raise e
322 51fc0054 Christos Stavrakakis
323 27a8e4ae Christos Stavrakakis
    return name
324 39ccdb18 Christos Stavrakakis
325 449787d3 Christos Stavrakakis
326 b024b97a Georgios Gousios
handler_logger = None
327 449787d3 Christos Stavrakakis
328 449787d3 Christos Stavrakakis
329 b024b97a Georgios Gousios
def fatal_signal_handler(signum, frame):
330 b024b97a Georgios Gousios
    global handler_logger
331 b024b97a Georgios Gousios
332 b024b97a Georgios Gousios
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
333 2cd99e7a Georgios Gousios
                        signum)
334 b024b97a Georgios Gousios
    raise SystemExit
335 b024b97a Georgios Gousios
336 449787d3 Christos Stavrakakis
337 b024b97a Georgios Gousios
def parse_arguments(args):
338 b024b97a Georgios Gousios
    from optparse import OptionParser
339 b024b97a Georgios Gousios
340 b024b97a Georgios Gousios
    parser = OptionParser()
341 b024b97a Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
342 2cd99e7a Georgios Gousios
                      help="Enable debugging information")
343 b024b97a Georgios Gousios
    parser.add_option("-l", "--log", dest="log_file",
344 45ebfd48 Vangelis Koukis
                      default="/var/log/snf-ganeti-eventd.log",
345 2cd99e7a Georgios Gousios
                      metavar="FILE",
346 2cd99e7a Georgios Gousios
                      help="Write log to FILE instead of %s" %
347 449787d3 Christos Stavrakakis
                           "/var/log/snf-ganeti-eventd.log")
348 b024b97a Georgios Gousios
    parser.add_option('--pid-file', dest="pid_file",
349 45ebfd48 Vangelis Koukis
                      default="/var/run/snf-ganeti-eventd.pid",
350 2cd99e7a Georgios Gousios
                      metavar='PIDFILE',
351 2cd99e7a Georgios Gousios
                      help="Save PID to file (default: %s)" %
352 449787d3 Christos Stavrakakis
                           "/var/run/snf-ganeti-eventd.pid")
353 b024b97a Georgios Gousios
354 b024b97a Georgios Gousios
    return parser.parse_args(args)
355 b024b97a Georgios Gousios
356 449787d3 Christos Stavrakakis
357 b024b97a Georgios Gousios
def main():
358 b024b97a Georgios Gousios
    global handler_logger
359 b024b97a Georgios Gousios
360 b024b97a Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
361 b024b97a Georgios Gousios
362 b024b97a Georgios Gousios
    # Initialize logger
363 b024b97a Georgios Gousios
    lvl = logging.DEBUG if opts.debug else logging.INFO
364 03a4b970 Georgios Gousios
    logger = logging.getLogger("ganeti.eventd")
365 b024b97a Georgios Gousios
    logger.setLevel(lvl)
366 36cf1973 Vangelis Koukis
    formatter = logging.Formatter(
367 36cf1973 Vangelis Koukis
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
368 36cf1973 Vangelis Koukis
        "%Y-%m-%d %H:%M:%S")
369 b024b97a Georgios Gousios
    handler = logging.FileHandler(opts.log_file)
370 b024b97a Georgios Gousios
    handler.setFormatter(formatter)
371 b024b97a Georgios Gousios
    logger.addHandler(handler)
372 b024b97a Georgios Gousios
    handler_logger = logger
373 b024b97a Georgios Gousios
374 1dc821c9 Christos Stavrakakis
    # Rename this process so 'ps' output looks like this is a native
375 1dc821c9 Christos Stavrakakis
    # executable.  Can not seperate command-line arguments from actual name of
376 1dc821c9 Christos Stavrakakis
    # the executable by NUL bytes, so only show the name of the executable
377 1dc821c9 Christos Stavrakakis
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
378 1dc821c9 Christos Stavrakakis
    setproctitle.setproctitle(sys.argv[0])
379 1dc821c9 Christos Stavrakakis
380 cf2a3529 Christos Stavrakakis
    # Create pidfile
381 cf2a3529 Christos Stavrakakis
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
382 cf2a3529 Christos Stavrakakis
383 cf2a3529 Christos Stavrakakis
    # Remove any stale PID files, left behind by previous invocations
384 cf2a3529 Christos Stavrakakis
    if daemon.runner.is_pidfile_stale(pidf):
385 cf2a3529 Christos Stavrakakis
        logger.warning("Removing stale PID lock file %s", pidf.path)
386 cf2a3529 Christos Stavrakakis
        pidf.break_lock()
387 cf2a3529 Christos Stavrakakis
388 348f53de Georgios Gousios
    # Become a daemon:
389 348f53de Georgios Gousios
    # Redirect stdout and stderr to handler.stream to catch
390 348f53de Georgios Gousios
    # early errors in the daemonization process [e.g., pidfile creation]
391 348f53de Georgios Gousios
    # which will otherwise go to /dev/null.
392 348f53de Georgios Gousios
    daemon_context = daemon.DaemonContext(
393 449787d3 Christos Stavrakakis
        pidfile=pidf,
394 449787d3 Christos Stavrakakis
        umask=022,
395 449787d3 Christos Stavrakakis
        stdout=handler.stream,
396 449787d3 Christos Stavrakakis
        stderr=handler.stream,
397 449787d3 Christos Stavrakakis
        files_preserve=[handler.stream])
398 cf2a3529 Christos Stavrakakis
    try:
399 cf2a3529 Christos Stavrakakis
        daemon_context.open()
400 cf2a3529 Christos Stavrakakis
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
401 cf2a3529 Christos Stavrakakis
        logger.critical("Failed to lock pidfile %s, another instance running?",
402 cf2a3529 Christos Stavrakakis
                        pidf.path)
403 cf2a3529 Christos Stavrakakis
        sys.exit(1)
404 cf2a3529 Christos Stavrakakis
405 348f53de Georgios Gousios
    logger.info("Became a daemon")
406 348f53de Georgios Gousios
407 348f53de Georgios Gousios
    # Catch signals to ensure graceful shutdown
408 348f53de Georgios Gousios
    signal(SIGINT, fatal_signal_handler)
409 348f53de Georgios Gousios
    signal(SIGTERM, fatal_signal_handler)
410 348f53de Georgios Gousios
411 b024b97a Georgios Gousios
    # Monitor the Ganeti job queue, create and push notifications
412 b024b97a Georgios Gousios
    wm = pyinotify.WatchManager()
413 449787d3 Christos Stavrakakis
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
414 449787d3 Christos Stavrakakis
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])
415 51fc0054 Christos Stavrakakis
416 27a8e4ae Christos Stavrakakis
    cluster_name = find_cluster_name()
417 51fc0054 Christos Stavrakakis
418 27a8e4ae Christos Stavrakakis
    handler = JobFileHandler(logger, cluster_name)
419 b024b97a Georgios Gousios
    notifier = pyinotify.Notifier(wm, handler)
420 b024b97a Georgios Gousios
421 b024b97a Georgios Gousios
    try:
422 b024b97a Georgios Gousios
        # Fail if adding the inotify() watch fails for any reason
423 b024b97a Georgios Gousios
        res = wm.add_watch(constants.QUEUE_DIR, mask)
424 b024b97a Georgios Gousios
        if res[constants.QUEUE_DIR] < 0:
425 36cf1973 Vangelis Koukis
            raise Exception("pyinotify add_watch returned negative descriptor")
426 348f53de Georgios Gousios
427 449787d3 Christos Stavrakakis
        logger.info("Now watching %s of %s" %
428 449787d3 Christos Stavrakakis
                    (constants.QUEUE_DIR, cluster_name))
429 b024b97a Georgios Gousios
430 b024b97a Georgios Gousios
        while True:    # loop forever
431 348f53de Georgios Gousios
            # process the queue of events as explained above
432 b024b97a Georgios Gousios
            notifier.process_events()
433 b024b97a Georgios Gousios
            if notifier.check_events():
434 b024b97a Georgios Gousios
                # read notified events and enqeue them
435 b024b97a Georgios Gousios
                notifier.read_events()
436 b024b97a Georgios Gousios
    except SystemExit:
437 b024b97a Georgios Gousios
        logger.info("SystemExit")
438 b024b97a Georgios Gousios
    except:
439 b024b97a Georgios Gousios
        logger.exception("Caught exception, terminating")
440 b024b97a Georgios Gousios
    finally:
441 b024b97a Georgios Gousios
        # destroy the inotify's instance on this interrupt (stop monitoring)
442 b024b97a Georgios Gousios
        notifier.stop()
443 b024b97a Georgios Gousios
        raise
444 b024b97a Georgios Gousios
445 b024b97a Georgios Gousios
if __name__ == "__main__":
446 b024b97a Georgios Gousios
    sys.exit(main())
447 b024b97a Georgios Gousios
448 348f53de Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :