root / ganeti / ganeti-0mqd.py @ 432fc8c3
History | View | Annotate | Download (3.2 kB)
1 | b80d0a1e | Vangelis Koukis | #!/usr/bin/env python
|
---|---|---|---|
2 | b80d0a1e | Vangelis Koukis | #
|
3 | b80d0a1e | Vangelis Koukis | # Copyright (c) 2010 Greek Research and Technology Network
|
4 | b80d0a1e | Vangelis Koukis | #
|
5 | b80d0a1e | Vangelis Koukis | """ A daemon to monitor the Ganeti job queue and emit job progress notifications over 0mq. """
|
6 | b80d0a1e | Vangelis Koukis | |
7 | b80d0a1e | Vangelis Koukis | import os |
8 | b80d0a1e | Vangelis Koukis | import sys |
9 | b80d0a1e | Vangelis Koukis | import zmq |
10 | b80d0a1e | Vangelis Koukis | import time |
11 | b80d0a1e | Vangelis Koukis | import json |
12 | b80d0a1e | Vangelis Koukis | import logging |
13 | b80d0a1e | Vangelis Koukis | import pyinotify |
14 | b80d0a1e | Vangelis Koukis | |
15 | b80d0a1e | Vangelis Koukis | from ganeti import utils |
16 | b80d0a1e | Vangelis Koukis | from ganeti import jqueue |
17 | b80d0a1e | Vangelis Koukis | from ganeti import constants |
18 | b80d0a1e | Vangelis Koukis | from ganeti import serializer |
19 | b80d0a1e | Vangelis Koukis | |
20 | b80d0a1e | Vangelis Koukis | GANETI_ZMQ_PUBLISHER = "tcp://*:5801" # FIXME: move to settings.py |
21 | b80d0a1e | Vangelis Koukis | |
22 | b80d0a1e | Vangelis Koukis | |
23 | b80d0a1e | Vangelis Koukis | class JobFileHandler(pyinotify.ProcessEvent): |
24 | b80d0a1e | Vangelis Koukis | def __init__(self, publisher): |
25 | b80d0a1e | Vangelis Koukis | pyinotify.ProcessEvent.__init__(self)
|
26 | b80d0a1e | Vangelis Koukis | self.publisher = publisher
|
27 | b80d0a1e | Vangelis Koukis | |
28 | b80d0a1e | Vangelis Koukis | def process_IN_CLOSE_WRITE(self, event): |
29 | b80d0a1e | Vangelis Koukis | jobfile = os.path.join(event.path, event.name) |
30 | b80d0a1e | Vangelis Koukis | if not event.name.startswith("job-"): |
31 | b80d0a1e | Vangelis Koukis | logging.debug("Not a job file: %s" % event.path)
|
32 | b80d0a1e | Vangelis Koukis | return
|
33 | b80d0a1e | Vangelis Koukis | |
34 | b80d0a1e | Vangelis Koukis | try:
|
35 | b80d0a1e | Vangelis Koukis | data = utils.ReadFile(jobfile) |
36 | b80d0a1e | Vangelis Koukis | except IOError: |
37 | b80d0a1e | Vangelis Koukis | |
38 | b80d0a1e | Vangelis Koukis | return
|
39 | b80d0a1e | Vangelis Koukis | |
40 | b80d0a1e | Vangelis Koukis | data = serializer.LoadJson(data) |
41 | b80d0a1e | Vangelis Koukis | job = jqueue._QueuedJob.Restore(None, data)
|
42 | b80d0a1e | Vangelis Koukis | |
43 | b80d0a1e | Vangelis Koukis | for op in job.ops: |
44 | b80d0a1e | Vangelis Koukis | instances = ""
|
45 | b80d0a1e | Vangelis Koukis | try:
|
46 | b80d0a1e | Vangelis Koukis | instances = " ".join(op.input.instances)
|
47 | b80d0a1e | Vangelis Koukis | except AttributeError: |
48 | b80d0a1e | Vangelis Koukis | pass
|
49 | b80d0a1e | Vangelis Koukis | |
50 | b80d0a1e | Vangelis Koukis | try:
|
51 | b80d0a1e | Vangelis Koukis | instances = op.input.instance_name |
52 | b80d0a1e | Vangelis Koukis | except AttributeError: |
53 | b80d0a1e | Vangelis Koukis | pass
|
54 | b80d0a1e | Vangelis Koukis | |
55 | b80d0a1e | Vangelis Koukis | # Get the last line of the op log as message
|
56 | b80d0a1e | Vangelis Koukis | try:
|
57 | b80d0a1e | Vangelis Koukis | logmsg = op.log[-1][-1] |
58 | b80d0a1e | Vangelis Koukis | except IndexError: |
59 | b80d0a1e | Vangelis Koukis | logmsg = None
|
60 | b80d0a1e | Vangelis Koukis | |
61 | b80d0a1e | Vangelis Koukis | logging.debug("%d: %s(%s) %s %s" % (int(job.id), op.input.OP_ID, instances, op.status, logmsg)) |
62 | b80d0a1e | Vangelis Koukis | if op.status in constants.JOBS_FINALIZED: |
63 | b80d0a1e | Vangelis Koukis | logging.info("%d: %s" % (int(job.id), op.status)) |
64 | b80d0a1e | Vangelis Koukis | |
65 | b80d0a1e | Vangelis Koukis | # Construct message
|
66 | b80d0a1e | Vangelis Koukis | msg = { |
67 | d08a5f6f | Vangelis Koukis | "type": "ganeti-op-status", |
68 | b80d0a1e | Vangelis Koukis | "instance": instances,
|
69 | b80d0a1e | Vangelis Koukis | "operation": op.input.OP_ID,
|
70 | b80d0a1e | Vangelis Koukis | "jobId": int(job.id), |
71 | d08a5f6f | Vangelis Koukis | "status": op.status,
|
72 | d08a5f6f | Vangelis Koukis | "logmsg": logmsg
|
73 | b80d0a1e | Vangelis Koukis | } |
74 | b80d0a1e | Vangelis Koukis | if logmsg:
|
75 | b80d0a1e | Vangelis Koukis | msg["message"] = logmsg
|
76 | b80d0a1e | Vangelis Koukis | |
77 | b80d0a1e | Vangelis Koukis | # Output as JSON
|
78 | b80d0a1e | Vangelis Koukis | print json.dumps(msg)
|
79 | b80d0a1e | Vangelis Koukis | |
80 | b80d0a1e | Vangelis Koukis | self.publisher.send_json(msg)
|
81 | b80d0a1e | Vangelis Koukis | |
82 | b80d0a1e | Vangelis Koukis | |
83 | b80d0a1e | Vangelis Koukis | def main(): |
84 | b80d0a1e | Vangelis Koukis | zmqc = zmq.Context() |
85 | b80d0a1e | Vangelis Koukis | publisher = zmqc.socket(zmq.PUB) |
86 | b80d0a1e | Vangelis Koukis | publisher.bind(GANETI_ZMQ_PUBLISHER) |
87 | b80d0a1e | Vangelis Koukis | logging.info("Now publishing on %s" % GANETI_ZMQ_PUBLISHER)
|
88 | b80d0a1e | Vangelis Koukis | |
89 | b80d0a1e | Vangelis Koukis | wm = pyinotify.WatchManager() |
90 | b80d0a1e | Vangelis Koukis | mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
|
91 | b80d0a1e | Vangelis Koukis | handler = JobFileHandler(publisher) |
92 | b80d0a1e | Vangelis Koukis | notifier = pyinotify.Notifier(wm, handler) |
93 | b80d0a1e | Vangelis Koukis | wm.add_watch(constants.QUEUE_DIR, mask) |
94 | b80d0a1e | Vangelis Koukis | |
95 | b80d0a1e | Vangelis Koukis | logging.info("Now watching %s" % constants.QUEUE_DIR)
|
96 | b80d0a1e | Vangelis Koukis | |
97 | b80d0a1e | Vangelis Koukis | while True: # loop forever |
98 | b80d0a1e | Vangelis Koukis | try:
|
99 | b80d0a1e | Vangelis Koukis | # process the queue of events as explained above
|
100 | b80d0a1e | Vangelis Koukis | notifier.process_events() |
101 | b80d0a1e | Vangelis Koukis | if notifier.check_events():
|
102 | b80d0a1e | Vangelis Koukis | # read notified events and enqeue them
|
103 | b80d0a1e | Vangelis Koukis | notifier.read_events() |
104 | b80d0a1e | Vangelis Koukis | except KeyboardInterrupt: |
105 | b80d0a1e | Vangelis Koukis | # destroy the inotify's instance on this interrupt (stop monitoring)
|
106 | b80d0a1e | Vangelis Koukis | notifier.stop() |
107 | b80d0a1e | Vangelis Koukis | break
|
108 | b80d0a1e | Vangelis Koukis | |
109 | b80d0a1e | Vangelis Koukis | |
110 | b80d0a1e | Vangelis Koukis | if __name__ == "__main__": |
111 | b80d0a1e | Vangelis Koukis | logging.basicConfig(level=logging.DEBUG) |
112 | b80d0a1e | Vangelis Koukis | sys.exit(main()) |
113 | b80d0a1e | Vangelis Koukis | |
114 | b80d0a1e | Vangelis Koukis | # vim: set ts=4 sts=4 sw=4 et ai : |