Statistics
| Branch: | Tag: | Revision:

root / ganeti / ganeti-0mqd.py @ 432fc8c3

History | View | Annotate | Download (3.2 kB)

1 b80d0a1e Vangelis Koukis
#!/usr/bin/env python
2 b80d0a1e Vangelis Koukis
#
3 b80d0a1e Vangelis Koukis
# Copyright (c) 2010 Greek Research and Technology Network
4 b80d0a1e Vangelis Koukis
#
5 b80d0a1e Vangelis Koukis
""" A daemon to monitor the Ganeti job queue and emit job progress notifications over 0mq. """
6 b80d0a1e Vangelis Koukis
7 b80d0a1e Vangelis Koukis
import os
8 b80d0a1e Vangelis Koukis
import sys
9 b80d0a1e Vangelis Koukis
import zmq
10 b80d0a1e Vangelis Koukis
import time
11 b80d0a1e Vangelis Koukis
import json
12 b80d0a1e Vangelis Koukis
import logging
13 b80d0a1e Vangelis Koukis
import pyinotify
14 b80d0a1e Vangelis Koukis
15 b80d0a1e Vangelis Koukis
from ganeti import utils
16 b80d0a1e Vangelis Koukis
from ganeti import jqueue
17 b80d0a1e Vangelis Koukis
from ganeti import constants
18 b80d0a1e Vangelis Koukis
from ganeti import serializer
19 b80d0a1e Vangelis Koukis
20 b80d0a1e Vangelis Koukis
GANETI_ZMQ_PUBLISHER = "tcp://*:5801" # FIXME: move to settings.py
21 b80d0a1e Vangelis Koukis
22 b80d0a1e Vangelis Koukis
23 b80d0a1e Vangelis Koukis
class JobFileHandler(pyinotify.ProcessEvent):
24 b80d0a1e Vangelis Koukis
    def __init__(self, publisher):
25 b80d0a1e Vangelis Koukis
            pyinotify.ProcessEvent.__init__(self)
26 b80d0a1e Vangelis Koukis
            self.publisher = publisher
27 b80d0a1e Vangelis Koukis
                  
28 b80d0a1e Vangelis Koukis
    def process_IN_CLOSE_WRITE(self, event):
29 b80d0a1e Vangelis Koukis
        jobfile = os.path.join(event.path, event.name)
30 b80d0a1e Vangelis Koukis
        if not event.name.startswith("job-"):
31 b80d0a1e Vangelis Koukis
            logging.debug("Not a job file: %s" % event.path)
32 b80d0a1e Vangelis Koukis
            return
33 b80d0a1e Vangelis Koukis
34 b80d0a1e Vangelis Koukis
        try:
35 b80d0a1e Vangelis Koukis
            data = utils.ReadFile(jobfile)
36 b80d0a1e Vangelis Koukis
        except IOError:
37 b80d0a1e Vangelis Koukis
38 b80d0a1e Vangelis Koukis
            return
39 b80d0a1e Vangelis Koukis
40 b80d0a1e Vangelis Koukis
        data = serializer.LoadJson(data)
41 b80d0a1e Vangelis Koukis
        job = jqueue._QueuedJob.Restore(None, data)
42 b80d0a1e Vangelis Koukis
43 b80d0a1e Vangelis Koukis
        for op in job.ops:
44 b80d0a1e Vangelis Koukis
            instances = ""
45 b80d0a1e Vangelis Koukis
            try:
46 b80d0a1e Vangelis Koukis
                instances = " ".join(op.input.instances)
47 b80d0a1e Vangelis Koukis
            except AttributeError:
48 b80d0a1e Vangelis Koukis
                pass
49 b80d0a1e Vangelis Koukis
50 b80d0a1e Vangelis Koukis
            try:
51 b80d0a1e Vangelis Koukis
                instances = op.input.instance_name
52 b80d0a1e Vangelis Koukis
            except AttributeError:
53 b80d0a1e Vangelis Koukis
                pass
54 b80d0a1e Vangelis Koukis
55 b80d0a1e Vangelis Koukis
            # Get the last line of the op log as message
56 b80d0a1e Vangelis Koukis
            try:
57 b80d0a1e Vangelis Koukis
                logmsg = op.log[-1][-1]
58 b80d0a1e Vangelis Koukis
            except IndexError:
59 b80d0a1e Vangelis Koukis
                logmsg = None
60 b80d0a1e Vangelis Koukis
            
61 b80d0a1e Vangelis Koukis
            logging.debug("%d: %s(%s) %s %s" % (int(job.id), op.input.OP_ID, instances, op.status, logmsg))
62 b80d0a1e Vangelis Koukis
            if op.status in constants.JOBS_FINALIZED:
63 b80d0a1e Vangelis Koukis
                logging.info("%d: %s" % (int(job.id), op.status))
64 b80d0a1e Vangelis Koukis
65 b80d0a1e Vangelis Koukis
            # Construct message
66 b80d0a1e Vangelis Koukis
            msg = {
67 d08a5f6f Vangelis Koukis
                "type": "ganeti-op-status",
68 b80d0a1e Vangelis Koukis
                "instance": instances,
69 b80d0a1e Vangelis Koukis
                "operation": op.input.OP_ID,
70 b80d0a1e Vangelis Koukis
                "jobId": int(job.id),
71 d08a5f6f Vangelis Koukis
                "status": op.status,
72 d08a5f6f Vangelis Koukis
                "logmsg": logmsg
73 b80d0a1e Vangelis Koukis
            }
74 b80d0a1e Vangelis Koukis
            if logmsg:
75 b80d0a1e Vangelis Koukis
                msg["message"] = logmsg
76 b80d0a1e Vangelis Koukis
            
77 b80d0a1e Vangelis Koukis
            # Output as JSON
78 b80d0a1e Vangelis Koukis
            print json.dumps(msg)
79 b80d0a1e Vangelis Koukis
            
80 b80d0a1e Vangelis Koukis
            self.publisher.send_json(msg)
81 b80d0a1e Vangelis Koukis
82 b80d0a1e Vangelis Koukis
83 b80d0a1e Vangelis Koukis
def main():
84 b80d0a1e Vangelis Koukis
    zmqc = zmq.Context()
85 b80d0a1e Vangelis Koukis
    publisher = zmqc.socket(zmq.PUB)
86 b80d0a1e Vangelis Koukis
    publisher.bind(GANETI_ZMQ_PUBLISHER)
87 b80d0a1e Vangelis Koukis
    logging.info("Now publishing on %s" % GANETI_ZMQ_PUBLISHER)
88 b80d0a1e Vangelis Koukis
    
89 b80d0a1e Vangelis Koukis
    wm = pyinotify.WatchManager()
90 b80d0a1e Vangelis Koukis
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
91 b80d0a1e Vangelis Koukis
    handler = JobFileHandler(publisher)
92 b80d0a1e Vangelis Koukis
    notifier = pyinotify.Notifier(wm, handler)
93 b80d0a1e Vangelis Koukis
    wm.add_watch(constants.QUEUE_DIR, mask)
94 b80d0a1e Vangelis Koukis
95 b80d0a1e Vangelis Koukis
    logging.info("Now watching %s" % constants.QUEUE_DIR)
96 b80d0a1e Vangelis Koukis
97 b80d0a1e Vangelis Koukis
    while True:    # loop forever
98 b80d0a1e Vangelis Koukis
        try:
99 b80d0a1e Vangelis Koukis
            # process the queue of events as explained above
100 b80d0a1e Vangelis Koukis
            notifier.process_events()
101 b80d0a1e Vangelis Koukis
            if notifier.check_events():
102 b80d0a1e Vangelis Koukis
                # read notified events and enqeue them
103 b80d0a1e Vangelis Koukis
                notifier.read_events()
104 b80d0a1e Vangelis Koukis
        except KeyboardInterrupt:
105 b80d0a1e Vangelis Koukis
            # destroy the inotify's instance on this interrupt (stop monitoring)
106 b80d0a1e Vangelis Koukis
            notifier.stop()
107 b80d0a1e Vangelis Koukis
            break
108 b80d0a1e Vangelis Koukis
109 b80d0a1e Vangelis Koukis
110 b80d0a1e Vangelis Koukis
if __name__ == "__main__":
111 b80d0a1e Vangelis Koukis
    logging.basicConfig(level=logging.DEBUG)
112 b80d0a1e Vangelis Koukis
    sys.exit(main())
113 b80d0a1e Vangelis Koukis
114 b80d0a1e Vangelis Koukis
# vim: set ts=4 sts=4 sw=4 et ai :