Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 11d4d283

History | View | Annotate | Download (18.1 kB)

1 b024b97a Georgios Gousios
#!/usr/bin/env python
2 737b0e28 Vangelis Koukis
# -*- coding: utf-8 -*-
3 b024b97a Georgios Gousios
#
4 737b0e28 Vangelis Koukis
# Copyright 2011 GRNET S.A. All rights reserved.
5 737b0e28 Vangelis Koukis
#
6 737b0e28 Vangelis Koukis
# Redistribution and use in source and binary forms, with or
7 737b0e28 Vangelis Koukis
# without modification, are permitted provided that the following
8 737b0e28 Vangelis Koukis
# conditions are met:
9 737b0e28 Vangelis Koukis
#
10 737b0e28 Vangelis Koukis
#   1. Redistributions of source code must retain the above
11 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
12 737b0e28 Vangelis Koukis
#      disclaimer.
13 737b0e28 Vangelis Koukis
#
14 737b0e28 Vangelis Koukis
#   2. Redistributions in binary form must reproduce the above
15 737b0e28 Vangelis Koukis
#      copyright notice, this list of conditions and the following
16 737b0e28 Vangelis Koukis
#      disclaimer in the documentation and/or other materials
17 737b0e28 Vangelis Koukis
#      provided with the distribution.
18 737b0e28 Vangelis Koukis
#
19 737b0e28 Vangelis Koukis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 737b0e28 Vangelis Koukis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 737b0e28 Vangelis Koukis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 737b0e28 Vangelis Koukis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 737b0e28 Vangelis Koukis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 737b0e28 Vangelis Koukis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 737b0e28 Vangelis Koukis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 737b0e28 Vangelis Koukis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 737b0e28 Vangelis Koukis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 737b0e28 Vangelis Koukis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 737b0e28 Vangelis Koukis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 737b0e28 Vangelis Koukis
# POSSIBILITY OF SUCH DAMAGE.
31 737b0e28 Vangelis Koukis
#
32 737b0e28 Vangelis Koukis
# The views and conclusions contained in the software and
33 737b0e28 Vangelis Koukis
# documentation are those of the authors and should not be
34 737b0e28 Vangelis Koukis
# interpreted as representing official policies, either expressed
35 737b0e28 Vangelis Koukis
# or implied, of GRNET S.A.
36 737b0e28 Vangelis Koukis
#
37 45ebfd48 Vangelis Koukis
38 737b0e28 Vangelis Koukis
"""Ganeti notification daemon with AMQP support
39 b024b97a Georgios Gousios

40 b024b97a Georgios Gousios
A daemon to monitor the Ganeti job queue and publish job progress
41 80bd5072 Georgios Gousios
and Ganeti VM state notifications to the ganeti exchange
42 b024b97a Georgios Gousios
"""
43 b024b97a Georgios Gousios
44 b024b97a Georgios Gousios
import sys
45 b024b97a Georgios Gousios
import os
46 b024b97a Georgios Gousios
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47 b024b97a Georgios Gousios
sys.path.append(path)
48 3bac1e87 Christos Stavrakakis
# Since Ganeti 2.7, debian package ships the majority of the python code in
49 3bac1e87 Christos Stavrakakis
# a private module under '/usr/share/ganeti'. Add this directory to path
50 3bac1e87 Christos Stavrakakis
# in order to be able to import ganeti. Also, add it to the start of path
51 3bac1e87 Christos Stavrakakis
# to allow conflicts with Ganeti RAPI client.
52 3bac1e87 Christos Stavrakakis
sys.path.insert(0, "/usr/share/ganeti")
53 b024b97a Georgios Gousios
54 b024b97a Georgios Gousios
import json
55 b024b97a Georgios Gousios
import logging
56 b024b97a Georgios Gousios
import pyinotify
57 b024b97a Georgios Gousios
import daemon
58 b024b97a Georgios Gousios
import daemon.pidlockfile
59 cf2a3529 Christos Stavrakakis
import daemon.runner
60 cf2a3529 Christos Stavrakakis
from lockfile import LockTimeout
61 b024b97a Georgios Gousios
from signal import signal, SIGINT, SIGTERM
62 1dc821c9 Christos Stavrakakis
import setproctitle
63 b024b97a Georgios Gousios
64 a9bb2a1a Christos Stavrakakis
from ganeti import utils, jqueue, constants, serializer, pathutils, cli
65 6df16263 Christos Stavrakakis
from ganeti import errors as ganeti_errors
66 a9bb2a1a Christos Stavrakakis
from ganeti.ssconf import SimpleStore
67 e66a7e34 Christos Stavrakakis
68 b024b97a Georgios Gousios
69 6d6b8f88 Kostas Papadimitriou
from synnefo import settings
70 c4e55622 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
71 c4e55622 Christos Stavrakakis
72 27a8e4ae Christos Stavrakakis
73 c4e55622 Christos Stavrakakis
def get_time_from_status(op, job):
74 c4e55622 Christos Stavrakakis
    """Generate a unique message identifier for a ganeti job.
75 c4e55622 Christos Stavrakakis

76 c4e55622 Christos Stavrakakis
    The identifier is based on the timestamp of the job. Since a ganeti
77 c4e55622 Christos Stavrakakis
    job passes from multiple states, we need to pick the timestamp that
78 c4e55622 Christos Stavrakakis
    corresponds to each state.
79 c4e55622 Christos Stavrakakis

80 c4e55622 Christos Stavrakakis
    """
81 c4e55622 Christos Stavrakakis
    status = op.status
82 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_QUEUED:
83 765ff3ff Christos Stavrakakis
        time = job.received_timestamp
84 27a8e4ae Christos Stavrakakis
    try:  # Compatibility with Ganeti version
85 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITLOCK:
86 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
87 39ccdb18 Christos Stavrakakis
    except AttributeError:
88 39ccdb18 Christos Stavrakakis
        if status == constants.JOB_STATUS_WAITING:
89 765ff3ff Christos Stavrakakis
            time = op.start_timestamp
90 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_CANCELING:
91 765ff3ff Christos Stavrakakis
        time = op.start_timestamp
92 c4e55622 Christos Stavrakakis
    if status == constants.JOB_STATUS_RUNNING:
93 765ff3ff Christos Stavrakakis
        time = op.exec_timestamp
94 c4e55622 Christos Stavrakakis
    if status in constants.JOBS_FINALIZED:
95 765ff3ff Christos Stavrakakis
        time = op.end_timestamp
96 765ff3ff Christos Stavrakakis
97 765ff3ff Christos Stavrakakis
    return time and time or job.end_timestamp
98 c4e55622 Christos Stavrakakis
99 27a8e4ae Christos Stavrakakis
    raise InvalidBackendStatus(status, job)
100 c4e55622 Christos Stavrakakis
101 c4e55622 Christos Stavrakakis
102 d9b25288 Christos Stavrakakis
def get_instance_attachments(instance, logger):
103 d9b25288 Christos Stavrakakis
    """Query Ganeti to a get the instance's attachments (NICs and Disks)
104 0e1f3323 Christos Stavrakakis

105 d9b25288 Christos Stavrakakis
    Get instance's attachments from Ganeti configuration data. If running on
106 d9b25288 Christos Stavrakakis
    master, query Ganeti via Ganeti CLI client. Otherwise, get attachments
107 d9b25288 Christos Stavrakakis
    straight from Ganeti's configuration file.
108 6df16263 Christos Stavrakakis

109 0e1f3323 Christos Stavrakakis
    @type instance: string
110 0e1f3323 Christos Stavrakakis
    @param instance: the name of the instance
111 d9b25288 Christos Stavrakakis
    @rtype: instance's NICs and Disks
112 d9b25288 Christos Stavrakakis
    @return: Dictionary containing the 'nics' and 'disks' of the instance.
113 0e1f3323 Christos Stavrakakis

114 0e1f3323 Christos Stavrakakis
    """
115 6df16263 Christos Stavrakakis
    try:
116 6df16263 Christos Stavrakakis
        client = cli.GetClient()
117 d9b25288 Christos Stavrakakis
        q_fields = ["nic.names", "nic.networks.names", "nic.ips", "nic.macs",
118 d9b25288 Christos Stavrakakis
                    "nic.modes", "nic.links", "nic.uuids", "tags",
119 d9b25288 Christos Stavrakakis
                    "disk.names", "disk.sizes", "disk.uuids"]
120 d9b25288 Christos Stavrakakis
        info = client.QueryInstances([instance], q_fields, use_locking=False)
121 d9b25288 Christos Stavrakakis
        # Parse NICs
122 d9b25288 Christos Stavrakakis
        names, networks, ips, macs, modes, links, uuids, tags = info[0][:-3]
123 d9b25288 Christos Stavrakakis
        nic_keys = ["name", "network", "ip", "mac", "mode", "link", "uuid"]
124 d9b25288 Christos Stavrakakis
        nics = zip(names, networks, ips, macs, modes, links, uuids)
125 6df16263 Christos Stavrakakis
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
126 d9b25288 Christos Stavrakakis
        # Parse Disks
127 d9b25288 Christos Stavrakakis
        names, sizes, uuids = info[0][-3:]
128 d9b25288 Christos Stavrakakis
        disk_keys = ["name", "size", "uuid"]
129 d9b25288 Christos Stavrakakis
        disks = zip(names, sizes, uuids)
130 d9b25288 Christos Stavrakakis
        disks = map(lambda x: dict(zip(disk_keys, x)), disks)
131 6df16263 Christos Stavrakakis
    except ganeti_errors.OpPrereqError:
132 6df16263 Christos Stavrakakis
        # Not running on master! Load the conf file
133 ffc4b474 Christos Stavrakakis
        raw_data = utils.ReadFile(pathutils.CLUSTER_CONF_FILE)
134 6df16263 Christos Stavrakakis
        config = serializer.LoadJson(raw_data)
135 6df16263 Christos Stavrakakis
        i = config["instances"][instance]
136 d9b25288 Christos Stavrakakis
        # Parse NICs
137 6df16263 Christos Stavrakakis
        nics = []
138 d9b25288 Christos Stavrakakis
        for index, nic in enumerate(i["nics"]):
139 6df16263 Christos Stavrakakis
            params = nic.pop("nicparams")
140 6df16263 Christos Stavrakakis
            nic["mode"] = params["mode"]
141 6df16263 Christos Stavrakakis
            nic["link"] = params["link"]
142 d9b25288 Christos Stavrakakis
            nic["index"] = index
143 6df16263 Christos Stavrakakis
            nics.append(nic)
144 d9b25288 Christos Stavrakakis
        # Parse Disks
145 d9b25288 Christos Stavrakakis
        disks = []
146 d9b25288 Christos Stavrakakis
        for index, disk in enumerate(i["disks"]):
147 d9b25288 Christos Stavrakakis
            disks.append({"name": disk.pop("name"),
148 d9b25288 Christos Stavrakakis
                          "size": disk["size"],
149 d9b25288 Christos Stavrakakis
                          "uuid": disk["uuid"],
150 d9b25288 Christos Stavrakakis
                          "index": index})
151 6df16263 Christos Stavrakakis
        tags = i.get("tags", [])
152 0e1f3323 Christos Stavrakakis
    # Get firewall from instance Tags
153 0e1f3323 Christos Stavrakakis
    # Tags are of the form synnefo:network:N:firewall_mode
154 0e1f3323 Christos Stavrakakis
    for tag in tags:
155 0e1f3323 Christos Stavrakakis
        t = tag.split(":")
156 0e1f3323 Christos Stavrakakis
        if t[0:2] == ["synnefo", "network"]:
157 0e1f3323 Christos Stavrakakis
            if len(t) != 4:
158 0e1f3323 Christos Stavrakakis
                logger.error("Malformed synefo tag %s", tag)
159 0e1f3323 Christos Stavrakakis
                continue
160 d0545590 Christos Stavrakakis
            nic_name = t[2]
161 d0545590 Christos Stavrakakis
            firewall = t[3]
162 d0545590 Christos Stavrakakis
            [nic.setdefault("firewall", firewall)
163 d0545590 Christos Stavrakakis
             for nic in nics if nic["name"] == nic_name]
164 d9b25288 Christos Stavrakakis
    attachments = {"nics": nics,
165 d9b25288 Christos Stavrakakis
                   "disks": disks}
166 d9b25288 Christos Stavrakakis
    return attachments
167 0e1f3323 Christos Stavrakakis
168 0e1f3323 Christos Stavrakakis
169 c4e55622 Christos Stavrakakis
class InvalidBackendStatus(Exception):
170 c4e55622 Christos Stavrakakis
    def __init__(self, status, job):
171 c4e55622 Christos Stavrakakis
        self.status = status
172 c4e55622 Christos Stavrakakis
        self.job = job
173 c4e55622 Christos Stavrakakis
174 c4e55622 Christos Stavrakakis
    def __str__(self):
175 c4e55622 Christos Stavrakakis
        return repr("Invalid backend status: %s in job %s"
176 c4e55622 Christos Stavrakakis
                    % (self.status, self.job))
177 c4e55622 Christos Stavrakakis
178 27a8e4ae Christos Stavrakakis
179 39ccdb18 Christos Stavrakakis
def prefix_from_name(name):
180 39ccdb18 Christos Stavrakakis
    return name.split('-')[0]
181 39ccdb18 Christos Stavrakakis
182 39ccdb18 Christos Stavrakakis
183 39ccdb18 Christos Stavrakakis
def get_field(from_, field):
184 39ccdb18 Christos Stavrakakis
    try:
185 39ccdb18 Christos Stavrakakis
        return getattr(from_, field)
186 39ccdb18 Christos Stavrakakis
    except AttributeError:
187 39ccdb18 Christos Stavrakakis
        None
188 39ccdb18 Christos Stavrakakis
189 45ebfd48 Vangelis Koukis
190 b024b97a Georgios Gousios
class JobFileHandler(pyinotify.ProcessEvent):
191 27a8e4ae Christos Stavrakakis
    def __init__(self, logger, cluster_name):
192 348f53de Georgios Gousios
        pyinotify.ProcessEvent.__init__(self)
193 348f53de Georgios Gousios
        self.logger = logger
194 27a8e4ae Christos Stavrakakis
        self.cluster_name = cluster_name
195 27a8e4ae Christos Stavrakakis
196 2ef10562 Christos Stavrakakis
        # Set max_retries to 0 for unlimited retries.
197 2ef10562 Christos Stavrakakis
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
198 a8858945 Christos Stavrakakis
                                 max_retries=0, logger=logger)
199 2ef10562 Christos Stavrakakis
200 c4e55622 Christos Stavrakakis
        handler_logger.info("Attempting to connect to RabbitMQ hosts")
201 27a8e4ae Christos Stavrakakis
202 c4e55622 Christos Stavrakakis
        self.client.connect()
203 c4e55622 Christos Stavrakakis
        handler_logger.info("Connected succesfully")
204 b024b97a Georgios Gousios
205 27a8e4ae Christos Stavrakakis
        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
206 27a8e4ae Christos Stavrakakis
207 39ccdb18 Christos Stavrakakis
        self.op_handlers = {"INSTANCE": self.process_instance_op,
208 36d450e8 Christos Stavrakakis
                            "NETWORK": self.process_network_op,
209 36d450e8 Christos Stavrakakis
                            "CLUSTER": self.process_cluster_op}
210 39ccdb18 Christos Stavrakakis
                            # "GROUP": self.process_group_op}
211 39ccdb18 Christos Stavrakakis
212 b024b97a Georgios Gousios
    def process_IN_CLOSE_WRITE(self, event):
213 80dcd124 Vangelis Koukis
        self.process_IN_MOVED_TO(event)
214 80dcd124 Vangelis Koukis
215 80dcd124 Vangelis Koukis
    def process_IN_MOVED_TO(self, event):
216 b024b97a Georgios Gousios
        jobfile = os.path.join(event.path, event.name)
217 b024b97a Georgios Gousios
        if not event.name.startswith("job-"):
218 b024b97a Georgios Gousios
            self.logger.debug("Not a job file: %s" % event.path)
219 b024b97a Georgios Gousios
            return
220 b024b97a Georgios Gousios
221 b024b97a Georgios Gousios
        try:
222 b024b97a Georgios Gousios
            data = utils.ReadFile(jobfile)
223 b024b97a Georgios Gousios
        except IOError:
224 b024b97a Georgios Gousios
            return
225 b024b97a Georgios Gousios
226 b024b97a Georgios Gousios
        data = serializer.LoadJson(data)
227 a9bb2a1a Christos Stavrakakis
        job = jqueue._QueuedJob.Restore(None, data, False, False)
228 4ed30eed Christos Stavrakakis
229 39ccdb18 Christos Stavrakakis
        job_id = int(job.id)
230 b024b97a Georgios Gousios
231 b024b97a Georgios Gousios
        for op in job.ops:
232 39ccdb18 Christos Stavrakakis
            op_id = op.input.OP_ID
233 b024b97a Georgios Gousios
234 39ccdb18 Christos Stavrakakis
            msg = None
235 b024b97a Georgios Gousios
            try:
236 39ccdb18 Christos Stavrakakis
                handler_fn = self.op_handlers[op_id.split('_')[1]]
237 39ccdb18 Christos Stavrakakis
                msg, routekey = handler_fn(op, job_id)
238 39ccdb18 Christos Stavrakakis
            except KeyError:
239 b024b97a Georgios Gousios
                pass
240 b024b97a Georgios Gousios
241 39ccdb18 Christos Stavrakakis
            if not msg:
242 39ccdb18 Christos Stavrakakis
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
243 07e4ab22 Christos Stavrakakis
                continue
244 07e4ab22 Christos Stavrakakis
245 39ccdb18 Christos Stavrakakis
            # Generate a unique message identifier
246 39ccdb18 Christos Stavrakakis
            event_time = get_time_from_status(op, job)
247 39ccdb18 Christos Stavrakakis
248 b024b97a Georgios Gousios
            # Get the last line of the op log as message
249 b024b97a Georgios Gousios
            try:
250 b024b97a Georgios Gousios
                logmsg = op.log[-1][-1]
251 b024b97a Georgios Gousios
            except IndexError:
252 b024b97a Georgios Gousios
                logmsg = None
253 348f53de Georgios Gousios
254 39ccdb18 Christos Stavrakakis
            # Add shared attributes for all operations
255 39ccdb18 Christos Stavrakakis
            msg.update({"event_time": event_time,
256 39ccdb18 Christos Stavrakakis
                        "operation": op_id,
257 39ccdb18 Christos Stavrakakis
                        "status": op.status,
258 27a8e4ae Christos Stavrakakis
                        "cluster": self.cluster_name,
259 39ccdb18 Christos Stavrakakis
                        "logmsg": logmsg,
260 aee560b0 Christos Stavrakakis
                        "result": op.result,
261 39ccdb18 Christos Stavrakakis
                        "jobId": job_id})
262 c4e55622 Christos Stavrakakis
263 12ee1ad8 Christos Stavrakakis
            if op.status == "success":
264 12ee1ad8 Christos Stavrakakis
                msg["result"] = op.result
265 12ee1ad8 Christos Stavrakakis
266 727fb2f9 Christos Stavrakakis
            if op_id == "OP_INSTANCE_CREATE" and op.status == "error":
267 727fb2f9 Christos Stavrakakis
                # In case an instance creation fails send the job input
268 727fb2f9 Christos Stavrakakis
                # so that the job can be retried if needed.
269 727fb2f9 Christos Stavrakakis
                msg["job_fields"] = op.Serialize()["input"]
270 727fb2f9 Christos Stavrakakis
271 c4e55622 Christos Stavrakakis
            msg = json.dumps(msg)
272 727fb2f9 Christos Stavrakakis
273 39ccdb18 Christos Stavrakakis
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)
274 c4e55622 Christos Stavrakakis
275 c4e55622 Christos Stavrakakis
            # Send the message to RabbitMQ
276 db400d82 Christos Stavrakakis
            self.client.basic_publish(settings.EXCHANGE_GANETI,
277 db400d82 Christos Stavrakakis
                                      routekey,
278 db400d82 Christos Stavrakakis
                                      msg)
279 39ccdb18 Christos Stavrakakis
280 39ccdb18 Christos Stavrakakis
    def process_instance_op(self, op, job_id):
281 39ccdb18 Christos Stavrakakis
        """ Process OP_INSTANCE_* opcodes.
282 39ccdb18 Christos Stavrakakis

283 39ccdb18 Christos Stavrakakis
        """
284 39ccdb18 Christos Stavrakakis
        input = op.input
285 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
286 39ccdb18 Christos Stavrakakis
287 39ccdb18 Christos Stavrakakis
        instances = None
288 39ccdb18 Christos Stavrakakis
        instances = get_field(input, 'instance_name')
289 39ccdb18 Christos Stavrakakis
        if not instances:
290 39ccdb18 Christos Stavrakakis
            instances = get_field(input, 'instances')
291 39ccdb18 Christos Stavrakakis
            if not instances or len(instances) > 1:
292 39ccdb18 Christos Stavrakakis
                # Do not publish messages for jobs with no or multiple
293 449787d3 Christos Stavrakakis
                # instances.  Currently snf-dispatcher can not normally handle
294 449787d3 Christos Stavrakakis
                # these messages
295 39ccdb18 Christos Stavrakakis
                return None, None
296 39ccdb18 Christos Stavrakakis
            else:
297 39ccdb18 Christos Stavrakakis
                instances = instances[0]
298 39ccdb18 Christos Stavrakakis
299 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
300 39ccdb18 Christos Stavrakakis
                          instances, op.status)
301 39ccdb18 Christos Stavrakakis
302 e6fbada1 Christos Stavrakakis
        job_fields = {}
303 e6fbada1 Christos Stavrakakis
        if op_id in ["OP_INSTANCE_SET_PARAMS", "OP_INSTANCE_CREATE"]:
304 e6fbada1 Christos Stavrakakis
            job_fields = {"nics": get_field(input, "nics"),
305 e6fbada1 Christos Stavrakakis
                          "disks": get_field(input, "disks"),
306 e6fbada1 Christos Stavrakakis
                          "beparams": get_field(input, "beparams")}
307 11d4d283 Christos Stavrakakis
        elif op_id == "OP_INSTANCE_SNAPSHOT":
308 11d4d283 Christos Stavrakakis
            job_fields = {"disks": get_field(input, "disks")}
309 11d4d283 Christos Stavrakakis
            reason = get_field(input, "reason")
310 11d4d283 Christos Stavrakakis
            snapshot_info = None
311 11d4d283 Christos Stavrakakis
            if isinstance(reason, list) and len(reason) > 0:
312 11d4d283 Christos Stavrakakis
                reason = reason[0]
313 11d4d283 Christos Stavrakakis
                if reason[0] == "gnt:user":
314 11d4d283 Christos Stavrakakis
                    snapshot_info = reason[1]
315 11d4d283 Christos Stavrakakis
            self.logger.critical("LALALL %s", job_fields["disks"][0])
316 11d4d283 Christos Stavrakakis
            job_fields["disks"][0][1]["snapshot_info"] = snapshot_info
317 e6fbada1 Christos Stavrakakis
318 39ccdb18 Christos Stavrakakis
        msg = {"type": "ganeti-op-status",
319 39ccdb18 Christos Stavrakakis
               "instance": instances,
320 e6fbada1 Christos Stavrakakis
               "operation": op_id,
321 e6fbada1 Christos Stavrakakis
               "job_fields": job_fields}
322 39ccdb18 Christos Stavrakakis
323 e5a92bec Christos Stavrakakis
        if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
324 e5a92bec Christos Stavrakakis
             op.status == "success") or
325 e5a92bec Christos Stavrakakis
            (op_id == "OP_INSTANCE_SET_PARAMS" and
326 e5a92bec Christos Stavrakakis
             op.status in ["success", "error", "cancelled"])):
327 d9b25288 Christos Stavrakakis
                attachments = get_instance_attachments(msg["instance"],
328 d9b25288 Christos Stavrakakis
                                                       self.logger)
329 d9b25288 Christos Stavrakakis
                msg["instance_nics"] = attachments["nics"]
330 d9b25288 Christos Stavrakakis
                msg["instance_disks"] = attachments["disks"]
331 12ee1ad8 Christos Stavrakakis
332 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)
333 39ccdb18 Christos Stavrakakis
334 39ccdb18 Christos Stavrakakis
        return msg, routekey
335 39ccdb18 Christos Stavrakakis
336 39ccdb18 Christos Stavrakakis
    def process_network_op(self, op, job_id):
337 39ccdb18 Christos Stavrakakis
        """ Process OP_NETWORK_* opcodes.
338 39ccdb18 Christos Stavrakakis

339 39ccdb18 Christos Stavrakakis
        """
340 39ccdb18 Christos Stavrakakis
341 39ccdb18 Christos Stavrakakis
        input = op.input
342 39ccdb18 Christos Stavrakakis
        op_id = input.OP_ID
343 39ccdb18 Christos Stavrakakis
        network_name = get_field(input, 'network_name')
344 39ccdb18 Christos Stavrakakis
345 39ccdb18 Christos Stavrakakis
        if not network_name:
346 39ccdb18 Christos Stavrakakis
            return None, None
347 39ccdb18 Christos Stavrakakis
348 39ccdb18 Christos Stavrakakis
        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
349 39ccdb18 Christos Stavrakakis
                          network_name, op.status)
350 39ccdb18 Christos Stavrakakis
351 e6fbada1 Christos Stavrakakis
        job_fields = {
352 e6fbada1 Christos Stavrakakis
            'subnet': get_field(input, 'network'),
353 e6fbada1 Christos Stavrakakis
            'gateway': get_field(input, 'gateway'),
354 e6fbada1 Christos Stavrakakis
            "add_reserved_ips": get_field(input, "add_reserved_ips"),
355 e6fbada1 Christos Stavrakakis
            "remove_reserved_ips": get_field(input, "remove_reserved_ips"),
356 e6fbada1 Christos Stavrakakis
            # 'network_mode': get_field(input, 'network_mode'),
357 e6fbada1 Christos Stavrakakis
            # 'network_link': get_field(input, 'network_link'),
358 e6fbada1 Christos Stavrakakis
            'group_name': get_field(input, 'group_name')}
359 e6fbada1 Christos Stavrakakis
360 39ccdb18 Christos Stavrakakis
        msg = {'operation':    op_id,
361 39ccdb18 Christos Stavrakakis
               'type':         "ganeti-network-status",
362 39ccdb18 Christos Stavrakakis
               'network':      network_name,
363 e6fbada1 Christos Stavrakakis
               'job_fields':   job_fields}
364 e6fbada1 Christos Stavrakakis
365 39ccdb18 Christos Stavrakakis
        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)
366 39ccdb18 Christos Stavrakakis
367 39ccdb18 Christos Stavrakakis
        return msg, routekey
368 39ccdb18 Christos Stavrakakis
369 36d450e8 Christos Stavrakakis
    def process_cluster_op(self, op, job_id):
370 36d450e8 Christos Stavrakakis
        """ Process OP_CLUSTER_* opcodes.
371 39ccdb18 Christos Stavrakakis

372 36d450e8 Christos Stavrakakis
        """
373 36d450e8 Christos Stavrakakis
374 36d450e8 Christos Stavrakakis
        input = op.input
375 36d450e8 Christos Stavrakakis
        op_id = input.OP_ID
376 36d450e8 Christos Stavrakakis
377 36d450e8 Christos Stavrakakis
        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)
378 39ccdb18 Christos Stavrakakis
379 4b7fbdc6 Christos Stavrakakis
        if op_id != "OP_CLUSTER_SET_PARAMS":
380 4b7fbdc6 Christos Stavrakakis
            # Send only modifications of cluster
381 4b7fbdc6 Christos Stavrakakis
            return None, None
382 4b7fbdc6 Christos Stavrakakis
383 36d450e8 Christos Stavrakakis
        msg = {'operation':    op_id,
384 36d450e8 Christos Stavrakakis
               'type':         "ganeti-cluster-status"}
385 36d450e8 Christos Stavrakakis
386 36d450e8 Christos Stavrakakis
        routekey = "ganeti.event.cluster"
387 36d450e8 Christos Stavrakakis
388 36d450e8 Christos Stavrakakis
        return msg, routekey
389 39ccdb18 Christos Stavrakakis
390 39ccdb18 Christos Stavrakakis
391 27a8e4ae Christos Stavrakakis
def find_cluster_name():
392 51fc0054 Christos Stavrakakis
    global handler_logger
393 51fc0054 Christos Stavrakakis
    try:
394 a9bb2a1a Christos Stavrakakis
        ss = SimpleStore()
395 a9bb2a1a Christos Stavrakakis
        name = ss.GetClusterName()
396 51fc0054 Christos Stavrakakis
    except Exception as e:
397 51fc0054 Christos Stavrakakis
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
398 51fc0054 Christos Stavrakakis
        raise e
399 51fc0054 Christos Stavrakakis
400 27a8e4ae Christos Stavrakakis
    return name
401 39ccdb18 Christos Stavrakakis
402 449787d3 Christos Stavrakakis
403 b024b97a Georgios Gousios
handler_logger = None
404 449787d3 Christos Stavrakakis
405 449787d3 Christos Stavrakakis
406 b024b97a Georgios Gousios
def fatal_signal_handler(signum, frame):
407 b024b97a Georgios Gousios
    global handler_logger
408 b024b97a Georgios Gousios
409 b024b97a Georgios Gousios
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
410 2cd99e7a Georgios Gousios
                        signum)
411 b024b97a Georgios Gousios
    raise SystemExit
412 b024b97a Georgios Gousios
413 449787d3 Christos Stavrakakis
414 b024b97a Georgios Gousios
def parse_arguments(args):
415 b024b97a Georgios Gousios
    from optparse import OptionParser
416 b024b97a Georgios Gousios
417 b024b97a Georgios Gousios
    parser = OptionParser()
418 b024b97a Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
419 2cd99e7a Georgios Gousios
                      help="Enable debugging information")
420 b024b97a Georgios Gousios
    parser.add_option("-l", "--log", dest="log_file",
421 45ebfd48 Vangelis Koukis
                      default="/var/log/snf-ganeti-eventd.log",
422 2cd99e7a Georgios Gousios
                      metavar="FILE",
423 2cd99e7a Georgios Gousios
                      help="Write log to FILE instead of %s" %
424 449787d3 Christos Stavrakakis
                           "/var/log/snf-ganeti-eventd.log")
425 b024b97a Georgios Gousios
    parser.add_option('--pid-file', dest="pid_file",
426 45ebfd48 Vangelis Koukis
                      default="/var/run/snf-ganeti-eventd.pid",
427 2cd99e7a Georgios Gousios
                      metavar='PIDFILE',
428 2cd99e7a Georgios Gousios
                      help="Save PID to file (default: %s)" %
429 449787d3 Christos Stavrakakis
                           "/var/run/snf-ganeti-eventd.pid")
430 b024b97a Georgios Gousios
431 b024b97a Georgios Gousios
    return parser.parse_args(args)
432 b024b97a Georgios Gousios
433 449787d3 Christos Stavrakakis
434 b024b97a Georgios Gousios
def main():
435 b024b97a Georgios Gousios
    global handler_logger
436 b024b97a Georgios Gousios
437 b024b97a Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
438 b024b97a Georgios Gousios
439 b024b97a Georgios Gousios
    # Initialize logger
440 b024b97a Georgios Gousios
    lvl = logging.DEBUG if opts.debug else logging.INFO
441 03a4b970 Georgios Gousios
    logger = logging.getLogger("ganeti.eventd")
442 b024b97a Georgios Gousios
    logger.setLevel(lvl)
443 36cf1973 Vangelis Koukis
    formatter = logging.Formatter(
444 36cf1973 Vangelis Koukis
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
445 36cf1973 Vangelis Koukis
        "%Y-%m-%d %H:%M:%S")
446 b024b97a Georgios Gousios
    handler = logging.FileHandler(opts.log_file)
447 b024b97a Georgios Gousios
    handler.setFormatter(formatter)
448 b024b97a Georgios Gousios
    logger.addHandler(handler)
449 b024b97a Georgios Gousios
    handler_logger = logger
450 b024b97a Georgios Gousios
451 1dc821c9 Christos Stavrakakis
    # Rename this process so 'ps' output looks like this is a native
452 1dc821c9 Christos Stavrakakis
    # executable.  Can not seperate command-line arguments from actual name of
453 1dc821c9 Christos Stavrakakis
    # the executable by NUL bytes, so only show the name of the executable
454 1dc821c9 Christos Stavrakakis
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
455 1dc821c9 Christos Stavrakakis
    setproctitle.setproctitle(sys.argv[0])
456 1dc821c9 Christos Stavrakakis
457 cf2a3529 Christos Stavrakakis
    # Create pidfile
458 cf2a3529 Christos Stavrakakis
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
459 cf2a3529 Christos Stavrakakis
460 cf2a3529 Christos Stavrakakis
    # Remove any stale PID files, left behind by previous invocations
461 cf2a3529 Christos Stavrakakis
    if daemon.runner.is_pidfile_stale(pidf):
462 cf2a3529 Christos Stavrakakis
        logger.warning("Removing stale PID lock file %s", pidf.path)
463 cf2a3529 Christos Stavrakakis
        pidf.break_lock()
464 cf2a3529 Christos Stavrakakis
465 348f53de Georgios Gousios
    # Become a daemon:
466 348f53de Georgios Gousios
    # Redirect stdout and stderr to handler.stream to catch
467 348f53de Georgios Gousios
    # early errors in the daemonization process [e.g., pidfile creation]
468 348f53de Georgios Gousios
    # which will otherwise go to /dev/null.
469 348f53de Georgios Gousios
    daemon_context = daemon.DaemonContext(
470 449787d3 Christos Stavrakakis
        pidfile=pidf,
471 449787d3 Christos Stavrakakis
        umask=022,
472 449787d3 Christos Stavrakakis
        stdout=handler.stream,
473 449787d3 Christos Stavrakakis
        stderr=handler.stream,
474 449787d3 Christos Stavrakakis
        files_preserve=[handler.stream])
475 cf2a3529 Christos Stavrakakis
    try:
476 cf2a3529 Christos Stavrakakis
        daemon_context.open()
477 cf2a3529 Christos Stavrakakis
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
478 cf2a3529 Christos Stavrakakis
        logger.critical("Failed to lock pidfile %s, another instance running?",
479 cf2a3529 Christos Stavrakakis
                        pidf.path)
480 cf2a3529 Christos Stavrakakis
        sys.exit(1)
481 cf2a3529 Christos Stavrakakis
482 348f53de Georgios Gousios
    logger.info("Became a daemon")
483 348f53de Georgios Gousios
484 348f53de Georgios Gousios
    # Catch signals to ensure graceful shutdown
485 348f53de Georgios Gousios
    signal(SIGINT, fatal_signal_handler)
486 348f53de Georgios Gousios
    signal(SIGTERM, fatal_signal_handler)
487 348f53de Georgios Gousios
488 b024b97a Georgios Gousios
    # Monitor the Ganeti job queue, create and push notifications
489 b024b97a Georgios Gousios
    wm = pyinotify.WatchManager()
490 449787d3 Christos Stavrakakis
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
491 449787d3 Christos Stavrakakis
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])
492 51fc0054 Christos Stavrakakis
493 27a8e4ae Christos Stavrakakis
    cluster_name = find_cluster_name()
494 51fc0054 Christos Stavrakakis
495 27a8e4ae Christos Stavrakakis
    handler = JobFileHandler(logger, cluster_name)
496 b024b97a Georgios Gousios
    notifier = pyinotify.Notifier(wm, handler)
497 b024b97a Georgios Gousios
498 b024b97a Georgios Gousios
    try:
499 b024b97a Georgios Gousios
        # Fail if adding the inotify() watch fails for any reason
500 a9bb2a1a Christos Stavrakakis
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
501 a9bb2a1a Christos Stavrakakis
        if res[pathutils.QUEUE_DIR] < 0:
502 36cf1973 Vangelis Koukis
            raise Exception("pyinotify add_watch returned negative descriptor")
503 348f53de Georgios Gousios
504 a9bb2a1a Christos Stavrakakis
        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
505 a9bb2a1a Christos Stavrakakis
                    cluster_name))
506 b024b97a Georgios Gousios
507 b024b97a Georgios Gousios
        while True:    # loop forever
508 348f53de Georgios Gousios
            # process the queue of events as explained above
509 b024b97a Georgios Gousios
            notifier.process_events()
510 b024b97a Georgios Gousios
            if notifier.check_events():
511 b024b97a Georgios Gousios
                # read notified events and enqeue them
512 b024b97a Georgios Gousios
                notifier.read_events()
513 b024b97a Georgios Gousios
    except SystemExit:
514 b024b97a Georgios Gousios
        logger.info("SystemExit")
515 b024b97a Georgios Gousios
    except:
516 b024b97a Georgios Gousios
        logger.exception("Caught exception, terminating")
517 b024b97a Georgios Gousios
    finally:
518 b024b97a Georgios Gousios
        # destroy the inotify's instance on this interrupt (stop monitoring)
519 b024b97a Georgios Gousios
        notifier.stop()
520 b024b97a Georgios Gousios
        raise
521 b024b97a Georgios Gousios
522 b024b97a Georgios Gousios
if __name__ == "__main__":
523 b024b97a Georgios Gousios
    sys.exit(main())
524 b024b97a Georgios Gousios
525 348f53de Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :