Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ dc7159be

History | View | Annotate | Download (16.7 kB)

1 b024b97a Georgios Gousios
#!/usr/bin/env python
2 737b0e28 Vangelis Koukis
# -*- coding: utf-8 -*-
3 b024b97a Georgios Gousios
#
4 737b0e28 Vangelis Koukis
# Copyright 2011 GRNET S.A. All rights reserved.
5 737b0e28 Vangelis Koukis
#
6 737b0e28 Vangelis Koukis
# Redistribution and use in source and binary forms, with or
7 737b0e28 Vangelis Koukis
# without modification, are permitted provided that the following
8 737b0e28 Vangelis Koukis
# conditions are met:
9 737b0e28 Vangelis Koukis
#
10 737b0e28 Vangelis Koukis
#   1. Redistributions of source code must retain the above
11 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
12 737b0e28 Vangelis Koukis
#      disclaimer.
13 737b0e28 Vangelis Koukis
#
14 737b0e28 Vangelis Koukis
#   2. Redistributions in binary form must reproduce the above
15 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
16 737b0e28 Vangelis Koukis
#      disclaimer in the documentation and/or other materials
17 737b0e28 Vangelis Koukis
#      provided with the distribution.
18 737b0e28 Vangelis Koukis
#
19 737b0e28 Vangelis Koukis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 737b0e28 Vangelis Koukis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 737b0e28 Vangelis Koukis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 737b0e28 Vangelis Koukis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 737b0e28 Vangelis Koukis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 737b0e28 Vangelis Koukis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 737b0e28 Vangelis Koukis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 737b0e28 Vangelis Koukis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 737b0e28 Vangelis Koukis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 737b0e28 Vangelis Koukis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 737b0e28 Vangelis Koukis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 737b0e28 Vangelis Koukis
# POSSIBILITY OF SUCH DAMAGE.
31 737b0e28 Vangelis Koukis
#
32 737b0e28 Vangelis Koukis
# The views and conclusions contained in the software and
33 737b0e28 Vangelis Koukis
# documentation are those of the authors and should not be
34 737b0e28 Vangelis Koukis
# interpreted as representing official policies, either expressed
35 737b0e28 Vangelis Koukis
# or implied, of GRNET S.A.
36 737b0e28 Vangelis Koukis
#
37 45ebfd48 Vangelis Koukis
38 737b0e28 Vangelis Koukis
"""Ganeti notification daemon with AMQP support
39 b024b97a Georgios Gousios

40 b024b97a Georgios Gousios
A daemon to monitor the Ganeti job queue and publish job progress
41 80bd5072 Georgios Gousios
and Ganeti VM state notifications to the ganeti exchange
42 b024b97a Georgios Gousios
"""
43 b024b97a Georgios Gousios
44 b024b97a Georgios Gousios
import sys
45 b024b97a Georgios Gousios
import os
46 b024b97a Georgios Gousios
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47 b024b97a Georgios Gousios
sys.path.append(path)
48 3bac1e87 Christos Stavrakakis
# Since Ganeti 2.7, debian package ships the majority of the python code in
49 3bac1e87 Christos Stavrakakis
# a private module under '/usr/share/ganeti'. Add this directory to path
50 3bac1e87 Christos Stavrakakis
# in order to be able to import ganeti. Also, add it to the start of path
51 3bac1e87 Christos Stavrakakis
# to allow conflicts with Ganeti RAPI client.
52 3bac1e87 Christos Stavrakakis
sys.path.insert(0, "/usr/share/ganeti")
53 b024b97a Georgios Gousios
54 b024b97a Georgios Gousios
import json
55 b024b97a Georgios Gousios
import logging
56 b024b97a Georgios Gousios
import pyinotify
57 b024b97a Georgios Gousios
import daemon
58 b024b97a Georgios Gousios
import daemon.pidlockfile
59 cf2a3529 Christos Stavrakakis
import daemon.runner
60 cf2a3529 Christos Stavrakakis
from lockfile import LockTimeout
61 b024b97a Georgios Gousios
from signal import signal, SIGINT, SIGTERM
62 1dc821c9 Christos Stavrakakis
import setproctitle
63 b024b97a Georgios Gousios
64 a9bb2a1a Christos Stavrakakis
from ganeti import utils, jqueue, constants, serializer, pathutils, cli
65 6df16263 Christos Stavrakakis
from ganeti import errors as ganeti_errors
66 a9bb2a1a Christos Stavrakakis
from ganeti.ssconf import SimpleStore
67 e66a7e34 Christos Stavrakakis
68 b024b97a Georgios Gousios
69 6d6b8f88 Kostas Papadimitriou
from synnefo import settings
70 c4e55622 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
71 c4e55622 Christos Stavrakakis
72 27a8e4ae Christos Stavrakakis
73 c4e55622 Christos Stavrakakis
def get_time_from_status(op, job):
74 c4e55622 Christos Stavrakakis
    """Generate a unique message identifier for a ganeti job.
75 c4e55622 Christos Stavrakakis

76 c4e55622 Christos Stavrakakis
    The identifier is based on the timestamp of the job. Since a ganeti
77 c4e55622 Christos Stavrakakis
    job passes from multiple states, we need to pick the timestamp that
78 c4e55622 Christos Stavrakakis
    corresponds to each state.
79 c4e55622 Christos Stavrakakis

80 c4e55622 Christos Stavrakakis
    """
81 c4e55622 Christos Stavrakakis
    status = op.status
82 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_QUEUED:
83 765ff3ff Christos Stavrakakis
        time = job.received_timestamp
84 27a8e4ae Christos Stavrakakis
    try:  # Compatibility with Ganeti version
85 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITLOCK:
86 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
87 39ccdb18 Christos Stavrakakis
    except AttributeError:
88 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITING:
89 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
90 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_CANCELING:
91 765ff3ff Christos Stavrakakis
        time = op.start_timestamp
92 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_RUNNING:
93 765ff3ff Christos Stavrakakis
        time = op.exec_timestamp
94 c4e55622 Christos Stavrakakis
    if status in constants.JOBS_FINALIZED:
95 765ff3ff Christos Stavrakakis
        time = op.end_timestamp
96 765ff3ff Christos Stavrakakis
97 765ff3ff Christos Stavrakakis
    return time and time or job.end_timestamp
98 c4e55622 Christos Stavrakakis
99 27a8e4ae Christos Stavrakakis
    raise InvalidBackendStatus(status, job)
100 c4e55622 Christos Stavrakakis
101 c4e55622 Christos Stavrakakis
102 0e1f3323 Christos Stavrakakis
def get_instance_nics(instance, logger):
103 0e1f3323 Christos Stavrakakis
    """Query Ganeti to a get the instance's NICs.
104 0e1f3323 Christos Stavrakakis

105 6df16263 Christos Stavrakakis
    Get instance's NICs from Ganeti configuration data. If running on master,
106 6df16263 Christos Stavrakakis
    query Ganeti via Ganeti CLI client. Otherwise, get the nics from Ganeti
107 6df16263 Christos Stavrakakis
    configuration file.
108 6df16263 Christos Stavrakakis

109 0e1f3323 Christos Stavrakakis
    @type instance: string
110 0e1f3323 Christos Stavrakakis
    @param instance: the name of the instance
111 0e1f3323 Christos Stavrakakis
    @rtype: List of dicts
112 6df16263 Christos Stavrakakis
    @return: Dictionary containing the instance's NICs. Each dictionary
113 0e1f3323 Christos Stavrakakis
             contains the following keys: 'network', 'ip', 'mac', 'mode',
114 0e1f3323 Christos Stavrakakis
             'link' and 'firewall'
115 0e1f3323 Christos Stavrakakis

116 0e1f3323 Christos Stavrakakis
    """
117 6df16263 Christos Stavrakakis
    try:
118 6df16263 Christos Stavrakakis
        client = cli.GetClient()
119 220e3cc0 Christos Stavrakakis
        fields = ["nic.names", "nic.networks.names", "nic.ips", "nic.macs",
120 220e3cc0 Christos Stavrakakis
                  "nic.modes", "nic.links", "tags"]
121 6df16263 Christos Stavrakakis
        info = client.QueryInstances([instance], fields, use_locking=False)
122 220e3cc0 Christos Stavrakakis
        names, networks, ips, macs, modes, links, tags = info[0]
123 220e3cc0 Christos Stavrakakis
        nic_keys = ["name", "network", "ip", "mac", "mode", "link"]
124 220e3cc0 Christos Stavrakakis
        nics = zip(names, networks, ips, macs, modes, links)
125 6df16263 Christos Stavrakakis
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
126 6df16263 Christos Stavrakakis
    except ganeti_errors.OpPrereqError:
127 6df16263 Christos Stavrakakis
        # Not running on master! Load the conf file
128 ffc4b474 Christos Stavrakakis
        raw_data = utils.ReadFile(pathutils.CLUSTER_CONF_FILE)
129 6df16263 Christos Stavrakakis
        config = serializer.LoadJson(raw_data)
130 6df16263 Christos Stavrakakis
        i = config["instances"][instance]
131 6df16263 Christos Stavrakakis
        nics = []
132 6df16263 Christos Stavrakakis
        for nic in i["nics"]:
133 6df16263 Christos Stavrakakis
            params = nic.pop("nicparams")
134 6df16263 Christos Stavrakakis
            nic["mode"] = params["mode"]
135 6df16263 Christos Stavrakakis
            nic["link"] = params["link"]
136 6df16263 Christos Stavrakakis
            nics.append(nic)
137 6df16263 Christos Stavrakakis
        tags = i.get("tags", [])
138 0e1f3323 Christos Stavrakakis
    # Get firewall from instance Tags
139 0e1f3323 Christos Stavrakakis
    # Tags are of the form synnefo:network:N:firewall_mode
140 0e1f3323 Christos Stavrakakis
    for tag in tags:
141 0e1f3323 Christos Stavrakakis
        t = tag.split(":")
142 0e1f3323 Christos Stavrakakis
        if t[0:2] == ["synnefo", "network"]:
143 0e1f3323 Christos Stavrakakis
            if len(t) != 4:
144 0e1f3323 Christos Stavrakakis
                logger.error("Malformed synefo tag %s", tag)
145 0e1f3323 Christos Stavrakakis
                continue
146 d0545590 Christos Stavrakakis
            nic_name = t[2]
147 d0545590 Christos Stavrakakis
            firewall = t[3]
148 d0545590 Christos Stavrakakis
            [nic.setdefault("firewall", firewall)
149 d0545590 Christos Stavrakakis
             for nic in nics if nic["name"] == nic_name]
150 0e1f3323 Christos Stavrakakis
    return nics
151 0e1f3323 Christos Stavrakakis
152 0e1f3323 Christos Stavrakakis
153 c4e55622 Christos Stavrakakis
class InvalidBackendStatus(Exception):
154 c4e55622 Christos Stavrakakis
    def __init__(self, status, job):
155 c4e55622 Christos Stavrakakis
        self.status = status
156 c4e55622 Christos Stavrakakis
        self.job = job
157 c4e55622 Christos Stavrakakis
158 c4e55622 Christos Stavrakakis
    def __str__(self):
159 c4e55622 Christos Stavrakakis
        return repr("Invalid backend status: %s in job %s"
160 c4e55622 Christos Stavrakakis
                    % (self.status, self.job))
161 c4e55622 Christos Stavrakakis
162 27a8e4ae Christos Stavrakakis
163 39ccdb18 Christos Stavrakakis
def prefix_from_name(name):
164 39ccdb18 Christos Stavrakakis
    return name.split('-')[0]
165 39ccdb18 Christos Stavrakakis
166 39ccdb18 Christos Stavrakakis
167 39ccdb18 Christos Stavrakakis
def get_field(from_, field):
168 39ccdb18 Christos Stavrakakis
    try:
169 39ccdb18 Christos Stavrakakis
        return getattr(from_, field)
170 39ccdb18 Christos Stavrakakis
    except AttributeError:
171 39ccdb18 Christos Stavrakakis
        None
172 39ccdb18 Christos Stavrakakis
173 45ebfd48 Vangelis Koukis
174 b024b97a Georgios Gousios
class JobFileHandler(pyinotify.ProcessEvent):
175 27a8e4ae Christos Stavrakakis
    def __init__(self, logger, cluster_name):
176 348f53de Georgios Gousios
        pyinotify.ProcessEvent.__init__(self)
177 348f53de Georgios Gousios
        self.logger = logger
178 27a8e4ae Christos Stavrakakis
        self.cluster_name = cluster_name
179 27a8e4ae Christos Stavrakakis
180 2ef10562 Christos Stavrakakis
        # Set max_retries to 0 for unlimited retries.
181 2ef10562 Christos Stavrakakis
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
182 a8858945 Christos Stavrakakis
                                 max_retries=0, logger=logger)
183 2ef10562 Christos Stavrakakis
184 c4e55622 Christos Stavrakakis
        handler_logger.info("Attempting to connect to RabbitMQ hosts")
185 27a8e4ae Christos Stavrakakis
186 c4e55622 Christos Stavrakakis
        self.client.connect()
187 c4e55622 Christos Stavrakakis
        handler_logger.info("Connected succesfully")
188 b024b97a Georgios Gousios
189 27a8e4ae Christos Stavrakakis
        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
190 27a8e4ae Christos Stavrakakis
191 39ccdb18 Christos Stavrakakis
        self.op_handlers = {"INSTANCE": self.process_instance_op,
192 36d450e8 Christos Stavrakakis
                            "NETWORK": self.process_network_op,
193 36d450e8 Christos Stavrakakis
                            "CLUSTER": self.process_cluster_op}
194 39ccdb18 Christos Stavrakakis
                            # "GROUP": self.process_group_op}
195 39ccdb18 Christos Stavrakakis
196 b024b97a Georgios Gousios
    def process_IN_CLOSE_WRITE(self, event):
197 80dcd124 Vangelis Koukis
        self.process_IN_MOVED_TO(event)
198 80dcd124 Vangelis Koukis
199 80dcd124 Vangelis Koukis
    def process_IN_MOVED_TO(self, event):
200 b024b97a Georgios Gousios
        jobfile = os.path.join(event.path, event.name)
201 b024b97a Georgios Gousios
        if not event.name.startswith("job-"):
202 b024b97a Georgios Gousios
            self.logger.debug("Not a job file: %s" % event.path)
203 b024b97a Georgios Gousios
            return
204 b024b97a Georgios Gousios
205 b024b97a Georgios Gousios
        try:
206 b024b97a Georgios Gousios
            data = utils.ReadFile(jobfile)
207 b024b97a Georgios Gousios
        except IOError:
208 b024b97a Georgios Gousios
            return
209 b024b97a Georgios Gousios
210 b024b97a Georgios Gousios
        data = serializer.LoadJson(data)
211 a9bb2a1a Christos Stavrakakis
        job = jqueue._QueuedJob.Restore(None, data, False, False)
212 4ed30eed Christos Stavrakakis
213 39ccdb18 Christos Stavrakakis
        job_id = int(job.id)
214 b024b97a Georgios Gousios
215 b024b97a Georgios Gousios
        for op in job.ops:
216 39ccdb18 Christos Stavrakakis
            op_id = op.input.OP_ID
217 b024b97a Georgios Gousios
218 39ccdb18 Christos Stavrakakis
            msg = None
219 b024b97a Georgios Gousios
            try:
220 39ccdb18 Christos Stavrakakis
                handler_fn = self.op_handlers[op_id.split('_')[1]]
221 39ccdb18 Christos Stavrakakis
                msg, routekey = handler_fn(op, job_id)
222 39ccdb18 Christos Stavrakakis
            except KeyError:
223 b024b97a Georgios Gousios
                pass
224 b024b97a Georgios Gousios
225 39ccdb18 Christos Stavrakakis
            if not msg:
226 39ccdb18 Christos Stavrakakis
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
227 07e4ab22 Christos Stavrakakis
                continue
228 07e4ab22 Christos Stavrakakis
229 39ccdb18 Christos Stavrakakis
            # Generate a unique message identifier
230 39ccdb18 Christos Stavrakakis
            event_time = get_time_from_status(op, job)
231 39ccdb18 Christos Stavrakakis
232 b024b97a Georgios Gousios
            # Get the last line of the op log as message
233 b024b97a Georgios Gousios
            try:
234 b024b97a Georgios Gousios
                logmsg = op.log[-1][-1]
235 b024b97a Georgios Gousios
            except IndexError:
236 b024b97a Georgios Gousios
                logmsg = None
237 348f53de Georgios Gousios
238 39ccdb18 Christos Stavrakakis
            # Add shared attributes for all operations
239 39ccdb18 Christos Stavrakakis
            msg.update({"event_time": event_time,
240 39ccdb18 Christos Stavrakakis
                        "operation": op_id,
241 39ccdb18 Christos Stavrakakis
                        "status": op.status,
242 27a8e4ae Christos Stavrakakis
                        "cluster": self.cluster_name,
243 39ccdb18 Christos Stavrakakis
                        "logmsg": logmsg,
244 aee560b0 Christos Stavrakakis
                        "result": op.result,
245 39ccdb18 Christos Stavrakakis
                        "jobId": job_id})
246 c4e55622 Christos Stavrakakis
247 12ee1ad8 Christos Stavrakakis
            if op.status == "success":
248 12ee1ad8 Christos Stavrakakis
                msg["result"] = op.result
249 12ee1ad8 Christos Stavrakakis
250 727fb2f9 Christos Stavrakakis
            if op_id == "OP_INSTANCE_CREATE" and op.status == "error":
251 727fb2f9 Christos Stavrakakis
                # In case an instance creation fails send the job input
252 727fb2f9 Christos Stavrakakis
                # so that the job can be retried if needed.
253 727fb2f9 Christos Stavrakakis
                msg["job_fields"] = op.Serialize()["input"]
254 727fb2f9 Christos Stavrakakis
255 c4e55622 Christos Stavrakakis
            msg = json.dumps(msg)
256 727fb2f9 Christos Stavrakakis
257 39ccdb18 Christos Stavrakakis
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)
258 c4e55622 Christos Stavrakakis
259 c4e55622 Christos Stavrakakis
            # Send the message to RabbitMQ
260 db400d82 Christos Stavrakakis
            self.client.basic_publish(settings.EXCHANGE_GANETI,
261 db400d82 Christos Stavrakakis
                                      routekey,
262 db400d82 Christos Stavrakakis
                                      msg)
263 39ccdb18 Christos Stavrakakis
264 39ccdb18 Christos Stavrakakis
    def process_instance_op(self, op, job_id):
265 39ccdb18 Christos Stavrakakis
        """ Process OP_INSTANCE_* opcodes.
266 39ccdb18 Christos Stavrakakis

267 39ccdb18 Christos Stavrakakis
        """
268 39ccdb18 Christos Stavrakakis
        input = op.input
269 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
270 39ccdb18 Christos Stavrakakis
271 39ccdb18 Christos Stavrakakis
        instances = None
272 39ccdb18 Christos Stavrakakis
        instances = get_field(input, 'instance_name')
273 39ccdb18 Christos Stavrakakis
        if not instances:
274 39ccdb18 Christos Stavrakakis
            instances = get_field(input, 'instances')
275 39ccdb18 Christos Stavrakakis
            if not instances or len(instances) > 1:
276 39ccdb18 Christos Stavrakakis
                # Do not publish messages for jobs with no or multiple
277 449787d3 Christos Stavrakakis
                # instances.  Currently snf-dispatcher can not normally handle
278 449787d3 Christos Stavrakakis
                # these messages
279 39ccdb18 Christos Stavrakakis
                return None, None
280 39ccdb18 Christos Stavrakakis
            else:
281 39ccdb18 Christos Stavrakakis
                instances = instances[0]
282 39ccdb18 Christos Stavrakakis
283 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
284 39ccdb18 Christos Stavrakakis
                          instances, op.status)
285 39ccdb18 Christos Stavrakakis
286 e6fbada1 Christos Stavrakakis
        job_fields = {}
287 e6fbada1 Christos Stavrakakis
        if op_id in ["OP_INSTANCE_SET_PARAMS", "OP_INSTANCE_CREATE"]:
288 e6fbada1 Christos Stavrakakis
            job_fields = {"nics": get_field(input, "nics"),
289 e6fbada1 Christos Stavrakakis
                          "disks": get_field(input, "disks"),
290 e6fbada1 Christos Stavrakakis
                          "beparams": get_field(input, "beparams")}
291 e6fbada1 Christos Stavrakakis
292 39ccdb18 Christos Stavrakakis
        msg = {"type": "ganeti-op-status",
293 39ccdb18 Christos Stavrakakis
               "instance": instances,
294 e6fbada1 Christos Stavrakakis
               "operation": op_id,
295 e6fbada1 Christos Stavrakakis
               "job_fields": job_fields}
296 39ccdb18 Christos Stavrakakis
297 e5a92bec Christos Stavrakakis
        if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
298 e5a92bec Christos Stavrakakis
             op.status == "success") or
299 e5a92bec Christos Stavrakakis
            (op_id == "OP_INSTANCE_SET_PARAMS" and
300 e5a92bec Christos Stavrakakis
             op.status in ["success", "error", "cancelled"])):
301 e6fbada1 Christos Stavrakakis
                nics = get_instance_nics(msg["instance"], self.logger)
302 e6fbada1 Christos Stavrakakis
                msg["instance_nics"] = nics
303 12ee1ad8 Christos Stavrakakis
304 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)
305 39ccdb18 Christos Stavrakakis
306 39ccdb18 Christos Stavrakakis
        return msg, routekey
307 39ccdb18 Christos Stavrakakis
308 39ccdb18 Christos Stavrakakis
    def process_network_op(self, op, job_id):
309 39ccdb18 Christos Stavrakakis
        """ Process OP_NETWORK_* opcodes.
310 39ccdb18 Christos Stavrakakis

311 39ccdb18 Christos Stavrakakis
        """
312 39ccdb18 Christos Stavrakakis
313 39ccdb18 Christos Stavrakakis
        input = op.input
314 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
315 39ccdb18 Christos Stavrakakis
        network_name = get_field(input, 'network_name')
316 39ccdb18 Christos Stavrakakis
317 39ccdb18 Christos Stavrakakis
        if not network_name:
318 39ccdb18 Christos Stavrakakis
            return None, None
319 39ccdb18 Christos Stavrakakis
320 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
321 39ccdb18 Christos Stavrakakis
                          network_name, op.status)
322 39ccdb18 Christos Stavrakakis
323 e6fbada1 Christos Stavrakakis
        job_fields = {
324 e6fbada1 Christos Stavrakakis
            'subnet': get_field(input, 'network'),
325 e6fbada1 Christos Stavrakakis
            'gateway': get_field(input, 'gateway'),
326 e6fbada1 Christos Stavrakakis
            "add_reserved_ips": get_field(input, "add_reserved_ips"),
327 e6fbada1 Christos Stavrakakis
            "remove_reserved_ips": get_field(input, "remove_reserved_ips"),
328 e6fbada1 Christos Stavrakakis
            # 'network_mode': get_field(input, 'network_mode'),
329 e6fbada1 Christos Stavrakakis
            # 'network_link': get_field(input, 'network_link'),
330 e6fbada1 Christos Stavrakakis
            'group_name': get_field(input, 'group_name')}
331 e6fbada1 Christos Stavrakakis
332 39ccdb18 Christos Stavrakakis
        msg = {'operation':    op_id,
333 39ccdb18 Christos Stavrakakis
               'type':         "ganeti-network-status",
334 39ccdb18 Christos Stavrakakis
               'network':      network_name,
335 e6fbada1 Christos Stavrakakis
               'job_fields':   job_fields}
336 e6fbada1 Christos Stavrakakis
337 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)
338 39ccdb18 Christos Stavrakakis
339 39ccdb18 Christos Stavrakakis
        return msg, routekey
340 39ccdb18 Christos Stavrakakis
341 36d450e8 Christos Stavrakakis
    def process_cluster_op(self, op, job_id):
342 36d450e8 Christos Stavrakakis
        """ Process OP_CLUSTER_* opcodes.
343 39ccdb18 Christos Stavrakakis

344 36d450e8 Christos Stavrakakis
        """
345 36d450e8 Christos Stavrakakis
346 36d450e8 Christos Stavrakakis
        input = op.input
347 36d450e8 Christos Stavrakakis
        op_id = input.OP_ID
348 36d450e8 Christos Stavrakakis
349 36d450e8 Christos Stavrakakis
        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)
350 39ccdb18 Christos Stavrakakis
351 4b7fbdc6 Christos Stavrakakis
        if op_id != "OP_CLUSTER_SET_PARAMS":
352 4b7fbdc6 Christos Stavrakakis
            # Send only modifications of cluster
353 4b7fbdc6 Christos Stavrakakis
            return None, None
354 4b7fbdc6 Christos Stavrakakis
355 36d450e8 Christos Stavrakakis
        msg = {'operation':    op_id,
356 36d450e8 Christos Stavrakakis
               'type':         "ganeti-cluster-status"}
357 36d450e8 Christos Stavrakakis
358 36d450e8 Christos Stavrakakis
        routekey = "ganeti.event.cluster"
359 36d450e8 Christos Stavrakakis
360 36d450e8 Christos Stavrakakis
        return msg, routekey
361 39ccdb18 Christos Stavrakakis
362 39ccdb18 Christos Stavrakakis
363 27a8e4ae Christos Stavrakakis
def find_cluster_name():
364 51fc0054 Christos Stavrakakis
    global handler_logger
365 51fc0054 Christos Stavrakakis
    try:
366 a9bb2a1a Christos Stavrakakis
        ss = SimpleStore()
367 a9bb2a1a Christos Stavrakakis
        name = ss.GetClusterName()
368 51fc0054 Christos Stavrakakis
    except Exception as e:
369 51fc0054 Christos Stavrakakis
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
370 51fc0054 Christos Stavrakakis
        raise e
371 51fc0054 Christos Stavrakakis
372 27a8e4ae Christos Stavrakakis
    return name
373 39ccdb18 Christos Stavrakakis
374 449787d3 Christos Stavrakakis
375 b024b97a Georgios Gousios
handler_logger = None
376 449787d3 Christos Stavrakakis
377 449787d3 Christos Stavrakakis
378 b024b97a Georgios Gousios
def fatal_signal_handler(signum, frame):
379 b024b97a Georgios Gousios
    global handler_logger
380 b024b97a Georgios Gousios
381 b024b97a Georgios Gousios
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
382 2cd99e7a Georgios Gousios
                        signum)
383 b024b97a Georgios Gousios
    raise SystemExit
384 b024b97a Georgios Gousios
385 449787d3 Christos Stavrakakis
386 b024b97a Georgios Gousios
def parse_arguments(args):
387 b024b97a Georgios Gousios
    from optparse import OptionParser
388 b024b97a Georgios Gousios
389 b024b97a Georgios Gousios
    parser = OptionParser()
390 b024b97a Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
391 2cd99e7a Georgios Gousios
                      help="Enable debugging information")
392 b024b97a Georgios Gousios
    parser.add_option("-l", "--log", dest="log_file",
393 45ebfd48 Vangelis Koukis
                      default="/var/log/snf-ganeti-eventd.log",
394 2cd99e7a Georgios Gousios
                      metavar="FILE",
395 2cd99e7a Georgios Gousios
                      help="Write log to FILE instead of %s" %
396 449787d3 Christos Stavrakakis
                           "/var/log/snf-ganeti-eventd.log")
397 b024b97a Georgios Gousios
    parser.add_option('--pid-file', dest="pid_file",
398 45ebfd48 Vangelis Koukis
                      default="/var/run/snf-ganeti-eventd.pid",
399 2cd99e7a Georgios Gousios
                      metavar='PIDFILE',
400 2cd99e7a Georgios Gousios
                      help="Save PID to file (default: %s)" %
401 449787d3 Christos Stavrakakis
                           "/var/run/snf-ganeti-eventd.pid")
402 b024b97a Georgios Gousios
403 b024b97a Georgios Gousios
    return parser.parse_args(args)
404 b024b97a Georgios Gousios
405 449787d3 Christos Stavrakakis
406 b024b97a Georgios Gousios
def main():
407 b024b97a Georgios Gousios
    global handler_logger
408 b024b97a Georgios Gousios
409 b024b97a Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
410 b024b97a Georgios Gousios
411 b024b97a Georgios Gousios
    # Initialize logger
412 b024b97a Georgios Gousios
    lvl = logging.DEBUG if opts.debug else logging.INFO
413 03a4b970 Georgios Gousios
    logger = logging.getLogger("ganeti.eventd")
414 b024b97a Georgios Gousios
    logger.setLevel(lvl)
415 36cf1973 Vangelis Koukis
    formatter = logging.Formatter(
416 36cf1973 Vangelis Koukis
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
417 36cf1973 Vangelis Koukis
        "%Y-%m-%d %H:%M:%S")
418 b024b97a Georgios Gousios
    handler = logging.FileHandler(opts.log_file)
419 b024b97a Georgios Gousios
    handler.setFormatter(formatter)
420 b024b97a Georgios Gousios
    logger.addHandler(handler)
421 b024b97a Georgios Gousios
    handler_logger = logger
422 b024b97a Georgios Gousios
423 1dc821c9 Christos Stavrakakis
    # Rename this process so 'ps' output looks like this is a native
424 1dc821c9 Christos Stavrakakis
    # executable.  Can not seperate command-line arguments from actual name of
425 1dc821c9 Christos Stavrakakis
    # the executable by NUL bytes, so only show the name of the executable
426 1dc821c9 Christos Stavrakakis
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
427 1dc821c9 Christos Stavrakakis
    setproctitle.setproctitle(sys.argv[0])
428 1dc821c9 Christos Stavrakakis
429 cf2a3529 Christos Stavrakakis
    # Create pidfile
430 cf2a3529 Christos Stavrakakis
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
431 cf2a3529 Christos Stavrakakis
432 cf2a3529 Christos Stavrakakis
    # Remove any stale PID files, left behind by previous invocations
433 cf2a3529 Christos Stavrakakis
    if daemon.runner.is_pidfile_stale(pidf):
434 cf2a3529 Christos Stavrakakis
        logger.warning("Removing stale PID lock file %s", pidf.path)
435 cf2a3529 Christos Stavrakakis
        pidf.break_lock()
436 cf2a3529 Christos Stavrakakis
437 348f53de Georgios Gousios
    # Become a daemon:
438 348f53de Georgios Gousios
    # Redirect stdout and stderr to handler.stream to catch
439 348f53de Georgios Gousios
    # early errors in the daemonization process [e.g., pidfile creation]
440 348f53de Georgios Gousios
    # which will otherwise go to /dev/null.
441 348f53de Georgios Gousios
    daemon_context = daemon.DaemonContext(
442 449787d3 Christos Stavrakakis
        pidfile=pidf,
443 449787d3 Christos Stavrakakis
        umask=022,
444 449787d3 Christos Stavrakakis
        stdout=handler.stream,
445 449787d3 Christos Stavrakakis
        stderr=handler.stream,
446 449787d3 Christos Stavrakakis
        files_preserve=[handler.stream])
447 cf2a3529 Christos Stavrakakis
    try:
448 cf2a3529 Christos Stavrakakis
        daemon_context.open()
449 cf2a3529 Christos Stavrakakis
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
450 cf2a3529 Christos Stavrakakis
        logger.critical("Failed to lock pidfile %s, another instance running?",
451 cf2a3529 Christos Stavrakakis
                        pidf.path)
452 cf2a3529 Christos Stavrakakis
        sys.exit(1)
453 cf2a3529 Christos Stavrakakis
454 348f53de Georgios Gousios
    logger.info("Became a daemon")
455 348f53de Georgios Gousios
456 348f53de Georgios Gousios
    # Catch signals to ensure graceful shutdown
457 348f53de Georgios Gousios
    signal(SIGINT, fatal_signal_handler)
458 348f53de Georgios Gousios
    signal(SIGTERM, fatal_signal_handler)
459 348f53de Georgios Gousios
460 b024b97a Georgios Gousios
    # Monitor the Ganeti job queue, create and push notifications
461 b024b97a Georgios Gousios
    wm = pyinotify.WatchManager()
462 449787d3 Christos Stavrakakis
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
463 449787d3 Christos Stavrakakis
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])
464 51fc0054 Christos Stavrakakis
465 27a8e4ae Christos Stavrakakis
    cluster_name = find_cluster_name()
466 51fc0054 Christos Stavrakakis
467 27a8e4ae Christos Stavrakakis
    handler = JobFileHandler(logger, cluster_name)
468 b024b97a Georgios Gousios
    notifier = pyinotify.Notifier(wm, handler)
469 b024b97a Georgios Gousios
470 b024b97a Georgios Gousios
    try:
471 b024b97a Georgios Gousios
        # Fail if adding the inotify() watch fails for any reason
472 a9bb2a1a Christos Stavrakakis
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
473 a9bb2a1a Christos Stavrakakis
        if res[pathutils.QUEUE_DIR] < 0:
474 36cf1973 Vangelis Koukis
            raise Exception("pyinotify add_watch returned negative descriptor")
475 348f53de Georgios Gousios
476 a9bb2a1a Christos Stavrakakis
        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
477 a9bb2a1a Christos Stavrakakis
                    cluster_name))
478 b024b97a Georgios Gousios
479 b024b97a Georgios Gousios
        while True:    # loop forever
480 348f53de Georgios Gousios
            # process the queue of events as explained above
481 b024b97a Georgios Gousios
            notifier.process_events()
482 b024b97a Georgios Gousios
            if notifier.check_events():
483 b024b97a Georgios Gousios
                # read notified events and enqeue them
484 b024b97a Georgios Gousios
                notifier.read_events()
485 b024b97a Georgios Gousios
    except SystemExit:
486 b024b97a Georgios Gousios
        logger.info("SystemExit")
487 b024b97a Georgios Gousios
    except:
488 b024b97a Georgios Gousios
        logger.exception("Caught exception, terminating")
489 b024b97a Georgios Gousios
    finally:
490 b024b97a Georgios Gousios
        # destroy the inotify's instance on this interrupt (stop monitoring)
491 b024b97a Georgios Gousios
        notifier.stop()
492 b024b97a Georgios Gousios
        raise
493 b024b97a Georgios Gousios
494 b024b97a Georgios Gousios
if __name__ == "__main__":
495 b024b97a Georgios Gousios
    sys.exit(main())
496 b024b97a Georgios Gousios
497 348f53de Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :