Revision b80d0a1e

b/ganeti/events.py
1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
""" Receive Ganeti events over 0mq """
6

  
7
import sys
8
import zmq
9
import time
10
import json
11
import logging
12

  
13
GANETI_ZMQ_PUBLISHER = "tcp://ganeti-master:5801" # FIXME: move to settings.py
14

  
15
def main():
16
    # Connect to ganeti-0mqd
17
    zmqc = zmq.Context()
18
    subscriber = zmqc.socket(zmq.SUB)
19
    subscriber.setsockopt(zmq.IDENTITY, "DBController")
20
    subscriber.setsockopt(zmq.SUBSCRIBE, "")
21
    subscriber.connect(GANETI_ZMQ_PUBLISHER)
22

  
23
    print "Connected to %s." % GANETI_ZMQ_PUBLISHER
24

  
25
    # Get updates, expect random Ctrl-C death
26
    while True:
27
        data = subscriber.recv()
28
        print data
29

  
30
if __name__ == "__main__":
31
    logging.basicConfig(level=logging.DEBUG)
32
    sys.exit(main())
33

  
34
# vim: set ts=4 sts=4 sw=4 et ai :
b/ganeti/jobwatcher.py
1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
""" A daemon to monitor the Ganeti job queue and emit job progress notifications over 0mq. """
6

  
7
import os
8
import sys
9
import zmq
10
import time
11
import json
12
import logging
13
import pyinotify
14

  
15
from ganeti import utils
16
from ganeti import jqueue
17
from ganeti import constants
18
from ganeti import serializer
19

  
20
GANETI_ZMQ_PUBLISHER = "tcp://*:5801" # FIXME: move to settings.py
21

  
22

  
23
class JobFileHandler(pyinotify.ProcessEvent):
24
    def __init__(self, publisher):
25
            pyinotify.ProcessEvent.__init__(self)
26
            self.publisher = publisher
27
                  
28
    def process_IN_CLOSE_WRITE(self, event):
29
        jobfile = os.path.join(event.path, event.name)
30
        if not event.name.startswith("job-"):
31
            logging.debug("Not a job file: %s" % event.path)
32
            return
33

  
34
        try:
35
            data = utils.ReadFile(jobfile)
36
        except IOError:
37

  
38
            return
39

  
40
        data = serializer.LoadJson(data)
41
        job = jqueue._QueuedJob.Restore(None, data)
42

  
43
        for op in job.ops:
44
            instances = ""
45
            try:
46
                instances = " ".join(op.input.instances)
47
            except AttributeError:
48
                pass
49

  
50
            try:
51
                instances = op.input.instance_name
52
            except AttributeError:
53
                pass
54

  
55
            # Get the last line of the op log as message
56
            try:
57
                logmsg = op.log[-1][-1]
58
            except IndexError:
59
                logmsg = None
60
            
61
            logging.debug("%d: %s(%s) %s %s" % (int(job.id), op.input.OP_ID, instances, op.status, logmsg))
62
            if op.status in constants.JOBS_FINALIZED:
63
                logging.info("%d: %s" % (int(job.id), op.status))
64

  
65
            # Construct message
66
            msg = {
67
                "type": "Ganeti-op-status",
68
                "instance": instances,
69
                "operation": op.input.OP_ID,
70
                "jobId": int(job.id),
71
                "status": op.status
72
            }
73
            if logmsg:
74
                msg["message"] = logmsg
75
            
76
            # Output as JSON
77
            print json.dumps(msg)
78
            
79
            self.publisher.send_json(msg)
80

  
81

  
82
def main():
83
    zmqc = zmq.Context()
84
    publisher = zmqc.socket(zmq.PUB)
85
    publisher.bind(GANETI_ZMQ_PUBLISHER)
86
    logging.info("Now publishing on %s" % GANETI_ZMQ_PUBLISHER)
87
    
88
    wm = pyinotify.WatchManager()
89
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
90
    handler = JobFileHandler(publisher)
91
    notifier = pyinotify.Notifier(wm, handler)
92
    wm.add_watch(constants.QUEUE_DIR, mask)
93

  
94
    logging.info("Now watching %s" % constants.QUEUE_DIR)
95

  
96
    while True:    # loop forever
97
        try:
98
            # process the queue of events as explained above
99
            notifier.process_events()
100
            if notifier.check_events():
101
                # read notified events and enqeue them
102
                notifier.read_events()
103
        except KeyboardInterrupt:
104
            # destroy the inotify's instance on this interrupt (stop monitoring)
105
            notifier.stop()
106
            break
107

  
108

  
109
if __name__ == "__main__":
110
    logging.basicConfig(level=logging.DEBUG)
111
    sys.exit(main())
112

  
113
# vim: set ts=4 sts=4 sw=4 et ai :

Also available in: Unified diff