Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 27a8e4ae

History | View | Annotate | Download (12.3 kB)

1 b024b97a Georgios Gousios
#!/usr/bin/env python
2 737b0e28 Vangelis Koukis
# -*- coding: utf-8 -*-
3 b024b97a Georgios Gousios
#
4 737b0e28 Vangelis Koukis
# Copyright 2011 GRNET S.A. All rights reserved.
5 737b0e28 Vangelis Koukis
#
6 737b0e28 Vangelis Koukis
# Redistribution and use in source and binary forms, with or
7 737b0e28 Vangelis Koukis
# without modification, are permitted provided that the following
8 737b0e28 Vangelis Koukis
# conditions are met:
9 737b0e28 Vangelis Koukis
#
10 737b0e28 Vangelis Koukis
#   1. Redistributions of source code must retain the above
11 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
12 737b0e28 Vangelis Koukis
#      disclaimer.
13 737b0e28 Vangelis Koukis
#
14 737b0e28 Vangelis Koukis
#   2. Redistributions in binary form must reproduce the above
15 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
16 737b0e28 Vangelis Koukis
#      disclaimer in the documentation and/or other materials
17 737b0e28 Vangelis Koukis
#      provided with the distribution.
18 737b0e28 Vangelis Koukis
#
19 737b0e28 Vangelis Koukis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 737b0e28 Vangelis Koukis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 737b0e28 Vangelis Koukis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 737b0e28 Vangelis Koukis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 737b0e28 Vangelis Koukis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 737b0e28 Vangelis Koukis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 737b0e28 Vangelis Koukis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 737b0e28 Vangelis Koukis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 737b0e28 Vangelis Koukis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 737b0e28 Vangelis Koukis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 737b0e28 Vangelis Koukis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 737b0e28 Vangelis Koukis
# POSSIBILITY OF SUCH DAMAGE.
31 737b0e28 Vangelis Koukis
#
32 737b0e28 Vangelis Koukis
# The views and conclusions contained in the software and
33 737b0e28 Vangelis Koukis
# documentation are those of the authors and should not be
34 737b0e28 Vangelis Koukis
# interpreted as representing official policies, either expressed
35 737b0e28 Vangelis Koukis
# or implied, of GRNET S.A.
36 737b0e28 Vangelis Koukis
#
37 45ebfd48 Vangelis Koukis
38 737b0e28 Vangelis Koukis
"""Ganeti notification daemon with AMQP support
39 b024b97a Georgios Gousios

40 b024b97a Georgios Gousios
A daemon to monitor the Ganeti job queue and publish job progress
41 80bd5072 Georgios Gousios
and Ganeti VM state notifications to the ganeti exchange
42 b024b97a Georgios Gousios
"""
43 b024b97a Georgios Gousios
44 b024b97a Georgios Gousios
import sys
45 b024b97a Georgios Gousios
import os
46 b024b97a Georgios Gousios
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47 b024b97a Georgios Gousios
sys.path.append(path)
48 b024b97a Georgios Gousios
49 b024b97a Georgios Gousios
import json
50 b024b97a Georgios Gousios
import logging
51 b024b97a Georgios Gousios
import pyinotify
52 b024b97a Georgios Gousios
import daemon
53 b024b97a Georgios Gousios
import daemon.pidlockfile
54 cf2a3529 Christos Stavrakakis
import daemon.runner
55 cf2a3529 Christos Stavrakakis
from lockfile import LockTimeout
56 b024b97a Georgios Gousios
from signal import signal, SIGINT, SIGTERM
57 b024b97a Georgios Gousios
58 b024b97a Georgios Gousios
from ganeti import utils
59 b024b97a Georgios Gousios
from ganeti import jqueue
60 b024b97a Georgios Gousios
from ganeti import constants
61 b024b97a Georgios Gousios
from ganeti import serializer
62 b024b97a Georgios Gousios
63 6d6b8f88 Kostas Papadimitriou
from synnefo import settings
64 c4e55622 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
65 c4e55622 Christos Stavrakakis
66 27a8e4ae Christos Stavrakakis
67 c4e55622 Christos Stavrakakis
def get_time_from_status(op, job):
68 c4e55622 Christos Stavrakakis
    """Generate a unique message identifier for a ganeti job.
69 c4e55622 Christos Stavrakakis

70 c4e55622 Christos Stavrakakis
    The identifier is based on the timestamp of the job. Since a ganeti
71 c4e55622 Christos Stavrakakis
    job passes from multiple states, we need to pick the timestamp that
72 c4e55622 Christos Stavrakakis
    corresponds to each state.
73 c4e55622 Christos Stavrakakis

74 c4e55622 Christos Stavrakakis
    """
75 c4e55622 Christos Stavrakakis
    status = op.status
76 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_QUEUED:
77 c4e55622 Christos Stavrakakis
        return job.received_timestamp
78 27a8e4ae Christos Stavrakakis
    try:  # Compatibility with Ganeti version
79 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITLOCK:
80 39ccdb18 Christos Stavrakakis
            return op.start_timestamp
81 39ccdb18 Christos Stavrakakis
    except AttributeError:
82 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITING:
83 39ccdb18 Christos Stavrakakis
            return op.start_timestamp
84 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_CANCELING:
85 c4e55622 Christos Stavrakakis
        return op.start_timestamp
86 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_RUNNING:
87 c4e55622 Christos Stavrakakis
        return op.exec_timestamp
88 c4e55622 Christos Stavrakakis
    if status in constants.JOBS_FINALIZED:
89 c4e55622 Christos Stavrakakis
        # success, canceled, error
90 c4e55622 Christos Stavrakakis
        return op.end_timestamp
91 c4e55622 Christos Stavrakakis
92 27a8e4ae Christos Stavrakakis
    raise InvalidBackendStatus(status, job)
93 c4e55622 Christos Stavrakakis
94 c4e55622 Christos Stavrakakis
95 c4e55622 Christos Stavrakakis
class InvalidBackendStatus(Exception):
96 c4e55622 Christos Stavrakakis
    def __init__(self, status, job):
97 c4e55622 Christos Stavrakakis
        self.status = status
98 c4e55622 Christos Stavrakakis
        self.job = job
99 c4e55622 Christos Stavrakakis
100 c4e55622 Christos Stavrakakis
    def __str__(self):
101 c4e55622 Christos Stavrakakis
        return repr("Invalid backend status: %s in job %s"
102 c4e55622 Christos Stavrakakis
                    % (self.status, self.job))
103 c4e55622 Christos Stavrakakis
104 27a8e4ae Christos Stavrakakis
105 39ccdb18 Christos Stavrakakis
def prefix_from_name(name):
106 39ccdb18 Christos Stavrakakis
    return name.split('-')[0]
107 39ccdb18 Christos Stavrakakis
108 39ccdb18 Christos Stavrakakis
109 39ccdb18 Christos Stavrakakis
def get_field(from_, field):
110 39ccdb18 Christos Stavrakakis
    try:
111 39ccdb18 Christos Stavrakakis
        return getattr(from_, field)
112 39ccdb18 Christos Stavrakakis
    except AttributeError:
113 39ccdb18 Christos Stavrakakis
        None
114 39ccdb18 Christos Stavrakakis
115 45ebfd48 Vangelis Koukis
116 b024b97a Georgios Gousios
class JobFileHandler(pyinotify.ProcessEvent):
117 27a8e4ae Christos Stavrakakis
    def __init__(self, logger, cluster_name):
118 348f53de Georgios Gousios
        pyinotify.ProcessEvent.__init__(self)
119 348f53de Georgios Gousios
        self.logger = logger
120 27a8e4ae Christos Stavrakakis
        self.cluster_name = cluster_name
121 27a8e4ae Christos Stavrakakis
122 b85320d0 Christos Stavrakakis
        self.client = AMQPClient(confirm_buffer=25)
123 c4e55622 Christos Stavrakakis
        handler_logger.info("Attempting to connect to RabbitMQ hosts")
124 27a8e4ae Christos Stavrakakis
125 c4e55622 Christos Stavrakakis
        self.client.connect()
126 c4e55622 Christos Stavrakakis
        handler_logger.info("Connected succesfully")
127 b024b97a Georgios Gousios
128 27a8e4ae Christos Stavrakakis
        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
129 27a8e4ae Christos Stavrakakis
130 39ccdb18 Christos Stavrakakis
        self.op_handlers = {"INSTANCE": self.process_instance_op,
131 39ccdb18 Christos Stavrakakis
                            "NETWORK": self.process_network_op}
132 39ccdb18 Christos Stavrakakis
                            # "GROUP": self.process_group_op}
133 39ccdb18 Christos Stavrakakis
134 b024b97a Georgios Gousios
    def process_IN_CLOSE_WRITE(self, event):
135 80dcd124 Vangelis Koukis
        self.process_IN_MOVED_TO(event)
136 80dcd124 Vangelis Koukis
137 80dcd124 Vangelis Koukis
    def process_IN_MOVED_TO(self, event):
138 b024b97a Georgios Gousios
        jobfile = os.path.join(event.path, event.name)
139 b024b97a Georgios Gousios
        if not event.name.startswith("job-"):
140 b024b97a Georgios Gousios
            self.logger.debug("Not a job file: %s" % event.path)
141 b024b97a Georgios Gousios
            return
142 b024b97a Georgios Gousios
143 b024b97a Georgios Gousios
        try:
144 b024b97a Georgios Gousios
            data = utils.ReadFile(jobfile)
145 b024b97a Georgios Gousios
        except IOError:
146 b024b97a Georgios Gousios
            return
147 b024b97a Georgios Gousios
148 b024b97a Georgios Gousios
        data = serializer.LoadJson(data)
149 27a8e4ae Christos Stavrakakis
        try:  # Compatibility with Ganeti version
150 4ed30eed Christos Stavrakakis
            job = jqueue._QueuedJob.Restore(None, data, False)
151 4ed30eed Christos Stavrakakis
        except TypeError:
152 4ed30eed Christos Stavrakakis
            job = jqueue._QueuedJob.Restore(None, data)
153 4ed30eed Christos Stavrakakis
154 39ccdb18 Christos Stavrakakis
        job_id = int(job.id)
155 b024b97a Georgios Gousios
156 b024b97a Georgios Gousios
        for op in job.ops:
157 39ccdb18 Christos Stavrakakis
            op_id = op.input.OP_ID
158 b024b97a Georgios Gousios
159 39ccdb18 Christos Stavrakakis
            msg = None
160 b024b97a Georgios Gousios
            try:
161 39ccdb18 Christos Stavrakakis
                handler_fn = self.op_handlers[op_id.split('_')[1]]
162 39ccdb18 Christos Stavrakakis
                msg, routekey = handler_fn(op, job_id)
163 39ccdb18 Christos Stavrakakis
            except KeyError:
164 b024b97a Georgios Gousios
                pass
165 b024b97a Georgios Gousios
166 39ccdb18 Christos Stavrakakis
            if not msg:
167 39ccdb18 Christos Stavrakakis
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
168 07e4ab22 Christos Stavrakakis
                continue
169 07e4ab22 Christos Stavrakakis
170 39ccdb18 Christos Stavrakakis
            # Generate a unique message identifier
171 39ccdb18 Christos Stavrakakis
            event_time = get_time_from_status(op, job)
172 39ccdb18 Christos Stavrakakis
173 b024b97a Georgios Gousios
            # Get the last line of the op log as message
174 b024b97a Georgios Gousios
            try:
175 b024b97a Georgios Gousios
                logmsg = op.log[-1][-1]
176 b024b97a Georgios Gousios
            except IndexError:
177 b024b97a Georgios Gousios
                logmsg = None
178 348f53de Georgios Gousios
179 39ccdb18 Christos Stavrakakis
            # Add shared attributes for all operations
180 39ccdb18 Christos Stavrakakis
            msg.update({"event_time": event_time,
181 39ccdb18 Christos Stavrakakis
                        "operation": op_id,
182 39ccdb18 Christos Stavrakakis
                        "status": op.status,
183 27a8e4ae Christos Stavrakakis
                        "cluster": self.cluster_name,
184 39ccdb18 Christos Stavrakakis
                        "logmsg": logmsg,
185 39ccdb18 Christos Stavrakakis
                        "jobId": job_id})
186 c4e55622 Christos Stavrakakis
187 c4e55622 Christos Stavrakakis
            msg = json.dumps(msg)
188 39ccdb18 Christos Stavrakakis
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)
189 c4e55622 Christos Stavrakakis
190 c4e55622 Christos Stavrakakis
            # Send the message to RabbitMQ
191 db400d82 Christos Stavrakakis
            self.client.basic_publish(settings.EXCHANGE_GANETI,
192 db400d82 Christos Stavrakakis
                                      routekey,
193 db400d82 Christos Stavrakakis
                                      msg)
194 39ccdb18 Christos Stavrakakis
195 39ccdb18 Christos Stavrakakis
    def process_instance_op(self, op, job_id):
196 39ccdb18 Christos Stavrakakis
        """ Process OP_INSTANCE_* opcodes.
197 39ccdb18 Christos Stavrakakis

198 39ccdb18 Christos Stavrakakis
        """
199 39ccdb18 Christos Stavrakakis
        input = op.input
200 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
201 39ccdb18 Christos Stavrakakis
202 39ccdb18 Christos Stavrakakis
        instances = None
203 39ccdb18 Christos Stavrakakis
        instances = get_field(input, 'instance_name')
204 39ccdb18 Christos Stavrakakis
        if not instances:
205 39ccdb18 Christos Stavrakakis
            instances = get_field(input, 'instances')
206 39ccdb18 Christos Stavrakakis
            if not instances or len(instances) > 1:
207 39ccdb18 Christos Stavrakakis
                # Do not publish messages for jobs with no or multiple
208 39ccdb18 Christos Stavrakakis
                # instances.
209 39ccdb18 Christos Stavrakakis
                # Currently snf-dispatcher can not normally handle these messages
210 39ccdb18 Christos Stavrakakis
                return None, None
211 39ccdb18 Christos Stavrakakis
            else:
212 39ccdb18 Christos Stavrakakis
                instances = instances[0]
213 39ccdb18 Christos Stavrakakis
214 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
215 39ccdb18 Christos Stavrakakis
                          instances, op.status)
216 39ccdb18 Christos Stavrakakis
217 39ccdb18 Christos Stavrakakis
        msg = {"type": "ganeti-op-status",
218 39ccdb18 Christos Stavrakakis
               "instance": instances,
219 39ccdb18 Christos Stavrakakis
               "operation": op_id}
220 39ccdb18 Christos Stavrakakis
221 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)
222 39ccdb18 Christos Stavrakakis
223 39ccdb18 Christos Stavrakakis
        return msg, routekey
224 39ccdb18 Christos Stavrakakis
225 39ccdb18 Christos Stavrakakis
    def process_network_op(self, op, job_id):
226 39ccdb18 Christos Stavrakakis
        """ Process OP_NETWORK_* opcodes.
227 39ccdb18 Christos Stavrakakis

228 39ccdb18 Christos Stavrakakis
        """
229 39ccdb18 Christos Stavrakakis
230 39ccdb18 Christos Stavrakakis
        input = op.input
231 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
232 39ccdb18 Christos Stavrakakis
        network_name = get_field(input, 'network_name')
233 39ccdb18 Christos Stavrakakis
234 39ccdb18 Christos Stavrakakis
        if not network_name:
235 39ccdb18 Christos Stavrakakis
            return None, None
236 39ccdb18 Christos Stavrakakis
237 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
238 39ccdb18 Christos Stavrakakis
                          network_name, op.status)
239 39ccdb18 Christos Stavrakakis
240 39ccdb18 Christos Stavrakakis
        msg = {'operation':    op_id,
241 39ccdb18 Christos Stavrakakis
               'type':         "ganeti-network-status",
242 39ccdb18 Christos Stavrakakis
               'network':      network_name,
243 39ccdb18 Christos Stavrakakis
               'subnet':       get_field(input, 'network'),
244 39ccdb18 Christos Stavrakakis
               # 'network_mode': get_field(input, 'network_mode'),
245 39ccdb18 Christos Stavrakakis
               # 'network_link': get_field(input, 'network_link'),
246 39ccdb18 Christos Stavrakakis
               'gateway':      get_field(input, 'gateway'),
247 39ccdb18 Christos Stavrakakis
               # 'reserved_ips': get_field(input, 'reserved_ips'),
248 39ccdb18 Christos Stavrakakis
               'group_name':   get_field(input, 'group_name')}
249 39ccdb18 Christos Stavrakakis
250 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)
251 39ccdb18 Christos Stavrakakis
252 39ccdb18 Christos Stavrakakis
        return msg, routekey
253 39ccdb18 Christos Stavrakakis
254 39ccdb18 Christos Stavrakakis
255 39ccdb18 Christos Stavrakakis
    # def process_group_op(self, op, job_id):
256 39ccdb18 Christos Stavrakakis
    #     """ Process OP_GROUP_* opcodes.
257 39ccdb18 Christos Stavrakakis
258 39ccdb18 Christos Stavrakakis
    #     """
259 39ccdb18 Christos Stavrakakis
    #     return None, None
260 39ccdb18 Christos Stavrakakis
261 39ccdb18 Christos Stavrakakis
262 39ccdb18 Christos Stavrakakis
263 39ccdb18 Christos Stavrakakis
264 27a8e4ae Christos Stavrakakis
def find_cluster_name():
265 27a8e4ae Christos Stavrakakis
    path = constants.DATA_DIR + "/ssconf_" + constants.SS_CLUSTER_NAME
266 27a8e4ae Christos Stavrakakis
    f = open(path)
267 27a8e4ae Christos Stavrakakis
    name = f.readline().rstrip()
268 27a8e4ae Christos Stavrakakis
    f.close()
269 27a8e4ae Christos Stavrakakis
    return name
270 39ccdb18 Christos Stavrakakis
271 b024b97a Georgios Gousios
handler_logger = None
272 b024b97a Georgios Gousios
def fatal_signal_handler(signum, frame):
273 b024b97a Georgios Gousios
    global handler_logger
274 b024b97a Georgios Gousios
275 b024b97a Georgios Gousios
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
276 2cd99e7a Georgios Gousios
                        signum)
277 b024b97a Georgios Gousios
    raise SystemExit
278 b024b97a Georgios Gousios
279 b024b97a Georgios Gousios
def parse_arguments(args):
280 b024b97a Georgios Gousios
    from optparse import OptionParser
281 b024b97a Georgios Gousios
282 b024b97a Georgios Gousios
    parser = OptionParser()
283 b024b97a Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
284 2cd99e7a Georgios Gousios
                      help="Enable debugging information")
285 b024b97a Georgios Gousios
    parser.add_option("-l", "--log", dest="log_file",
286 45ebfd48 Vangelis Koukis
                      default="/var/log/snf-ganeti-eventd.log",
287 2cd99e7a Georgios Gousios
                      metavar="FILE",
288 2cd99e7a Georgios Gousios
                      help="Write log to FILE instead of %s" %
289 45ebfd48 Vangelis Koukis
                          "/var/log/snf-ganeti-eventd.log")
290 b024b97a Georgios Gousios
    parser.add_option('--pid-file', dest="pid_file",
291 45ebfd48 Vangelis Koukis
                      default="/var/run/snf-ganeti-eventd.pid",
292 2cd99e7a Georgios Gousios
                      metavar='PIDFILE',
293 2cd99e7a Georgios Gousios
                      help="Save PID to file (default: %s)" %
294 45ebfd48 Vangelis Koukis
                          "/var/run/snf-ganeti-eventd.pid")
295 b024b97a Georgios Gousios
296 b024b97a Georgios Gousios
    return parser.parse_args(args)
297 b024b97a Georgios Gousios
298 b024b97a Georgios Gousios
def main():
299 b024b97a Georgios Gousios
    global handler_logger
300 b024b97a Georgios Gousios
301 b024b97a Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
302 b024b97a Georgios Gousios
303 b024b97a Georgios Gousios
304 b024b97a Georgios Gousios
    # Initialize logger
305 b024b97a Georgios Gousios
    lvl = logging.DEBUG if opts.debug else logging.INFO
306 03a4b970 Georgios Gousios
    logger = logging.getLogger("ganeti.eventd")
307 b024b97a Georgios Gousios
    logger.setLevel(lvl)
308 36cf1973 Vangelis Koukis
    formatter = logging.Formatter(
309 36cf1973 Vangelis Koukis
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
310 36cf1973 Vangelis Koukis
        "%Y-%m-%d %H:%M:%S")
311 b024b97a Georgios Gousios
    handler = logging.FileHandler(opts.log_file)
312 b024b97a Georgios Gousios
    handler.setFormatter(formatter)
313 b024b97a Georgios Gousios
    logger.addHandler(handler)
314 b024b97a Georgios Gousios
    handler_logger = logger
315 b024b97a Georgios Gousios
316 cf2a3529 Christos Stavrakakis
    # Create pidfile
317 cf2a3529 Christos Stavrakakis
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
318 cf2a3529 Christos Stavrakakis
319 cf2a3529 Christos Stavrakakis
    # Remove any stale PID files, left behind by previous invocations
320 cf2a3529 Christos Stavrakakis
    if daemon.runner.is_pidfile_stale(pidf):
321 cf2a3529 Christos Stavrakakis
        logger.warning("Removing stale PID lock file %s", pidf.path)
322 cf2a3529 Christos Stavrakakis
        pidf.break_lock()
323 cf2a3529 Christos Stavrakakis
324 348f53de Georgios Gousios
    # Become a daemon:
325 348f53de Georgios Gousios
    # Redirect stdout and stderr to handler.stream to catch
326 348f53de Georgios Gousios
    # early errors in the daemonization process [e.g., pidfile creation]
327 348f53de Georgios Gousios
    # which will otherwise go to /dev/null.
328 348f53de Georgios Gousios
    daemon_context = daemon.DaemonContext(
329 348f53de Georgios Gousios
            pidfile=pidf,
330 348f53de Georgios Gousios
            umask=022,
331 348f53de Georgios Gousios
            stdout=handler.stream,
332 348f53de Georgios Gousios
            stderr=handler.stream,
333 348f53de Georgios Gousios
            files_preserve=[handler.stream])
334 cf2a3529 Christos Stavrakakis
    try:
335 cf2a3529 Christos Stavrakakis
        daemon_context.open()
336 cf2a3529 Christos Stavrakakis
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
337 cf2a3529 Christos Stavrakakis
        logger.critical("Failed to lock pidfile %s, another instance running?",
338 cf2a3529 Christos Stavrakakis
                        pidf.path)
339 cf2a3529 Christos Stavrakakis
        sys.exit(1)
340 cf2a3529 Christos Stavrakakis
341 348f53de Georgios Gousios
    logger.info("Became a daemon")
342 348f53de Georgios Gousios
343 348f53de Georgios Gousios
    # Catch signals to ensure graceful shutdown
344 348f53de Georgios Gousios
    signal(SIGINT, fatal_signal_handler)
345 348f53de Georgios Gousios
    signal(SIGTERM, fatal_signal_handler)
346 348f53de Georgios Gousios
347 b024b97a Georgios Gousios
    # Monitor the Ganeti job queue, create and push notifications
348 b024b97a Georgios Gousios
    wm = pyinotify.WatchManager()
349 80dcd124 Vangelis Koukis
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
350 80dcd124 Vangelis Koukis
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
351 27a8e4ae Christos Stavrakakis
    cluster_name = find_cluster_name()
352 27a8e4ae Christos Stavrakakis
    handler = JobFileHandler(logger, cluster_name)
353 b024b97a Georgios Gousios
    notifier = pyinotify.Notifier(wm, handler)
354 b024b97a Georgios Gousios
355 b024b97a Georgios Gousios
    try:
356 b024b97a Georgios Gousios
        # Fail if adding the inotify() watch fails for any reason
357 b024b97a Georgios Gousios
        res = wm.add_watch(constants.QUEUE_DIR, mask)
358 b024b97a Georgios Gousios
        if res[constants.QUEUE_DIR] < 0:
359 36cf1973 Vangelis Koukis
            raise Exception("pyinotify add_watch returned negative descriptor")
360 348f53de Georgios Gousios
361 27a8e4ae Christos Stavrakakis
        logger.info("Now watching %s of %s" % (constants.QUEUE_DIR,
362 27a8e4ae Christos Stavrakakis
                cluster_name))
363 b024b97a Georgios Gousios
364 b024b97a Georgios Gousios
        while True:    # loop forever
365 348f53de Georgios Gousios
            # process the queue of events as explained above
366 b024b97a Georgios Gousios
            notifier.process_events()
367 b024b97a Georgios Gousios
            if notifier.check_events():
368 b024b97a Georgios Gousios
                # read notified events and enqeue them
369 b024b97a Georgios Gousios
                notifier.read_events()
370 b024b97a Georgios Gousios
    except SystemExit:
371 b024b97a Georgios Gousios
        logger.info("SystemExit")
372 b024b97a Georgios Gousios
    except:
373 b024b97a Georgios Gousios
        logger.exception("Caught exception, terminating")
374 b024b97a Georgios Gousios
    finally:
375 b024b97a Georgios Gousios
        # destroy the inotify's instance on this interrupt (stop monitoring)
376 b024b97a Georgios Gousios
        notifier.stop()
377 b024b97a Georgios Gousios
        raise
378 b024b97a Georgios Gousios
379 b024b97a Georgios Gousios
if __name__ == "__main__":
380 b024b97a Georgios Gousios
    sys.exit(main())
381 b024b97a Georgios Gousios
382 348f53de Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :