Statistics
| Branch: | Tag: | Revision:

root / ganeti / ganeti-0mqd.py @ f453b337

History | View | Annotate | Download (3.2 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
""" A daemon to monitor the Ganeti job queue and emit job progress notifications over 0mq. """
6

    
7
import os
8
import sys
9
import zmq
10
import time
11
import json
12
import logging
13
import pyinotify
14

    
15
from ganeti import utils
16
from ganeti import jqueue
17
from ganeti import constants
18
from ganeti import serializer
19

    
20
GANETI_ZMQ_PUBLISHER = "tcp://*:5801" # FIXME: move to settings.py
21

    
22

    
23
class JobFileHandler(pyinotify.ProcessEvent):
24
    def __init__(self, publisher):
25
            pyinotify.ProcessEvent.__init__(self)
26
            self.publisher = publisher
27
                  
28
    def process_IN_CLOSE_WRITE(self, event):
29
        jobfile = os.path.join(event.path, event.name)
30
        if not event.name.startswith("job-"):
31
            logging.debug("Not a job file: %s" % event.path)
32
            return
33

    
34
        try:
35
            data = utils.ReadFile(jobfile)
36
        except IOError:
37

    
38
            return
39

    
40
        data = serializer.LoadJson(data)
41
        job = jqueue._QueuedJob.Restore(None, data)
42

    
43
        for op in job.ops:
44
            instances = ""
45
            try:
46
                instances = " ".join(op.input.instances)
47
            except AttributeError:
48
                pass
49

    
50
            try:
51
                instances = op.input.instance_name
52
            except AttributeError:
53
                pass
54

    
55
            # Get the last line of the op log as message
56
            try:
57
                logmsg = op.log[-1][-1]
58
            except IndexError:
59
                logmsg = None
60
            
61
            logging.debug("%d: %s(%s) %s %s" % (int(job.id), op.input.OP_ID, instances, op.status, logmsg))
62
            if op.status in constants.JOBS_FINALIZED:
63
                logging.info("%d: %s" % (int(job.id), op.status))
64

    
65
            # Construct message
66
            msg = {
67
                "type": "Ganeti-op-status",
68
                "instance": instances,
69
                "operation": op.input.OP_ID,
70
                "jobId": int(job.id),
71
                "status": op.status
72
            }
73
            if logmsg:
74
                msg["message"] = logmsg
75
            
76
            # Output as JSON
77
            print json.dumps(msg)
78
            
79
            self.publisher.send_json(msg)
80

    
81

    
82
def main():
83
    zmqc = zmq.Context()
84
    publisher = zmqc.socket(zmq.PUB)
85
    publisher.bind(GANETI_ZMQ_PUBLISHER)
86
    logging.info("Now publishing on %s" % GANETI_ZMQ_PUBLISHER)
87
    
88
    wm = pyinotify.WatchManager()
89
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
90
    handler = JobFileHandler(publisher)
91
    notifier = pyinotify.Notifier(wm, handler)
92
    wm.add_watch(constants.QUEUE_DIR, mask)
93

    
94
    logging.info("Now watching %s" % constants.QUEUE_DIR)
95

    
96
    while True:    # loop forever
97
        try:
98
            # process the queue of events as explained above
99
            notifier.process_events()
100
            if notifier.check_events():
101
                # read notified events and enqeue them
102
                notifier.read_events()
103
        except KeyboardInterrupt:
104
            # destroy the inotify's instance on this interrupt (stop monitoring)
105
            notifier.stop()
106
            break
107

    
108

    
109
if __name__ == "__main__":
110
    logging.basicConfig(level=logging.DEBUG)
111
    sys.exit(main())
112

    
113
# vim: set ts=4 sts=4 sw=4 et ai :