Statistics
| Branch: | Tag: | Revision:

root / ganeti / ganeti-0mqd.py @ 432fc8c3

History | View | Annotate | Download (3.2 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
""" A daemon to monitor the Ganeti job queue and emit job progress notifications over 0mq. """
6

    
7
import os
8
import sys
9
import zmq
10
import time
11
import json
12
import logging
13
import pyinotify
14

    
15
from ganeti import utils
16
from ganeti import jqueue
17
from ganeti import constants
18
from ganeti import serializer
19

    
20
GANETI_ZMQ_PUBLISHER = "tcp://*:5801" # FIXME: move to settings.py
21

    
22

    
23
class JobFileHandler(pyinotify.ProcessEvent):
24
    def __init__(self, publisher):
25
            pyinotify.ProcessEvent.__init__(self)
26
            self.publisher = publisher
27
                  
28
    def process_IN_CLOSE_WRITE(self, event):
29
        jobfile = os.path.join(event.path, event.name)
30
        if not event.name.startswith("job-"):
31
            logging.debug("Not a job file: %s" % event.path)
32
            return
33

    
34
        try:
35
            data = utils.ReadFile(jobfile)
36
        except IOError:
37

    
38
            return
39

    
40
        data = serializer.LoadJson(data)
41
        job = jqueue._QueuedJob.Restore(None, data)
42

    
43
        for op in job.ops:
44
            instances = ""
45
            try:
46
                instances = " ".join(op.input.instances)
47
            except AttributeError:
48
                pass
49

    
50
            try:
51
                instances = op.input.instance_name
52
            except AttributeError:
53
                pass
54

    
55
            # Get the last line of the op log as message
56
            try:
57
                logmsg = op.log[-1][-1]
58
            except IndexError:
59
                logmsg = None
60
            
61
            logging.debug("%d: %s(%s) %s %s" % (int(job.id), op.input.OP_ID, instances, op.status, logmsg))
62
            if op.status in constants.JOBS_FINALIZED:
63
                logging.info("%d: %s" % (int(job.id), op.status))
64

    
65
            # Construct message
66
            msg = {
67
                "type": "ganeti-op-status",
68
                "instance": instances,
69
                "operation": op.input.OP_ID,
70
                "jobId": int(job.id),
71
                "status": op.status,
72
                "logmsg": logmsg
73
            }
74
            if logmsg:
75
                msg["message"] = logmsg
76
            
77
            # Output as JSON
78
            print json.dumps(msg)
79
            
80
            self.publisher.send_json(msg)
81

    
82

    
83
def main():
84
    zmqc = zmq.Context()
85
    publisher = zmqc.socket(zmq.PUB)
86
    publisher.bind(GANETI_ZMQ_PUBLISHER)
87
    logging.info("Now publishing on %s" % GANETI_ZMQ_PUBLISHER)
88
    
89
    wm = pyinotify.WatchManager()
90
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
91
    handler = JobFileHandler(publisher)
92
    notifier = pyinotify.Notifier(wm, handler)
93
    wm.add_watch(constants.QUEUE_DIR, mask)
94

    
95
    logging.info("Now watching %s" % constants.QUEUE_DIR)
96

    
97
    while True:    # loop forever
98
        try:
99
            # process the queue of events as explained above
100
            notifier.process_events()
101
            if notifier.check_events():
102
                # read notified events and enqeue them
103
                notifier.read_events()
104
        except KeyboardInterrupt:
105
            # destroy the inotify's instance on this interrupt (stop monitoring)
106
            notifier.stop()
107
            break
108

    
109

    
110
if __name__ == "__main__":
111
    logging.basicConfig(level=logging.DEBUG)
112
    sys.exit(main())
113

    
114
# vim: set ts=4 sts=4 sw=4 et ai :