root / ganeti / ganeti-0mqd.py @ 432fc8c3
History | View | Annotate | Download (3.2 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
#
|
3 |
# Copyright (c) 2010 Greek Research and Technology Network
|
4 |
#
|
5 |
""" A daemon to monitor the Ganeti job queue and emit job progress notifications over 0mq. """
|
6 |
|
7 |
import os |
8 |
import sys |
9 |
import zmq |
10 |
import time |
11 |
import json |
12 |
import logging |
13 |
import pyinotify |
14 |
|
15 |
from ganeti import utils |
16 |
from ganeti import jqueue |
17 |
from ganeti import constants |
18 |
from ganeti import serializer |
19 |
|
20 |
GANETI_ZMQ_PUBLISHER = "tcp://*:5801" # FIXME: move to settings.py |
21 |
|
22 |
|
23 |
class JobFileHandler(pyinotify.ProcessEvent): |
24 |
def __init__(self, publisher): |
25 |
pyinotify.ProcessEvent.__init__(self)
|
26 |
self.publisher = publisher
|
27 |
|
28 |
def process_IN_CLOSE_WRITE(self, event): |
29 |
jobfile = os.path.join(event.path, event.name) |
30 |
if not event.name.startswith("job-"): |
31 |
logging.debug("Not a job file: %s" % event.path)
|
32 |
return
|
33 |
|
34 |
try:
|
35 |
data = utils.ReadFile(jobfile) |
36 |
except IOError: |
37 |
|
38 |
return
|
39 |
|
40 |
data = serializer.LoadJson(data) |
41 |
job = jqueue._QueuedJob.Restore(None, data)
|
42 |
|
43 |
for op in job.ops: |
44 |
instances = ""
|
45 |
try:
|
46 |
instances = " ".join(op.input.instances)
|
47 |
except AttributeError: |
48 |
pass
|
49 |
|
50 |
try:
|
51 |
instances = op.input.instance_name |
52 |
except AttributeError: |
53 |
pass
|
54 |
|
55 |
# Get the last line of the op log as message
|
56 |
try:
|
57 |
logmsg = op.log[-1][-1] |
58 |
except IndexError: |
59 |
logmsg = None
|
60 |
|
61 |
logging.debug("%d: %s(%s) %s %s" % (int(job.id), op.input.OP_ID, instances, op.status, logmsg)) |
62 |
if op.status in constants.JOBS_FINALIZED: |
63 |
logging.info("%d: %s" % (int(job.id), op.status)) |
64 |
|
65 |
# Construct message
|
66 |
msg = { |
67 |
"type": "ganeti-op-status", |
68 |
"instance": instances,
|
69 |
"operation": op.input.OP_ID,
|
70 |
"jobId": int(job.id), |
71 |
"status": op.status,
|
72 |
"logmsg": logmsg
|
73 |
} |
74 |
if logmsg:
|
75 |
msg["message"] = logmsg
|
76 |
|
77 |
# Output as JSON
|
78 |
print json.dumps(msg)
|
79 |
|
80 |
self.publisher.send_json(msg)
|
81 |
|
82 |
|
83 |
def main(): |
84 |
zmqc = zmq.Context() |
85 |
publisher = zmqc.socket(zmq.PUB) |
86 |
publisher.bind(GANETI_ZMQ_PUBLISHER) |
87 |
logging.info("Now publishing on %s" % GANETI_ZMQ_PUBLISHER)
|
88 |
|
89 |
wm = pyinotify.WatchManager() |
90 |
mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
|
91 |
handler = JobFileHandler(publisher) |
92 |
notifier = pyinotify.Notifier(wm, handler) |
93 |
wm.add_watch(constants.QUEUE_DIR, mask) |
94 |
|
95 |
logging.info("Now watching %s" % constants.QUEUE_DIR)
|
96 |
|
97 |
while True: # loop forever |
98 |
try:
|
99 |
# process the queue of events as explained above
|
100 |
notifier.process_events() |
101 |
if notifier.check_events():
|
102 |
# read notified events and enqeue them
|
103 |
notifier.read_events() |
104 |
except KeyboardInterrupt: |
105 |
# destroy the inotify's instance on this interrupt (stop monitoring)
|
106 |
notifier.stop() |
107 |
break
|
108 |
|
109 |
|
110 |
if __name__ == "__main__": |
111 |
logging.basicConfig(level=logging.DEBUG) |
112 |
sys.exit(main()) |
113 |
|
114 |
# vim: set ts=4 sts=4 sw=4 et ai :
|