root / ganeti / ganeti-0mqd.py @ f453b337
History | View | Annotate | Download (3.2 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
#
|
3 |
# Copyright (c) 2010 Greek Research and Technology Network
|
4 |
#
|
5 |
""" A daemon to monitor the Ganeti job queue and emit job progress notifications over 0mq. """
|
6 |
|
7 |
import os |
8 |
import sys |
9 |
import zmq |
10 |
import time |
11 |
import json |
12 |
import logging |
13 |
import pyinotify |
14 |
|
15 |
from ganeti import utils |
16 |
from ganeti import jqueue |
17 |
from ganeti import constants |
18 |
from ganeti import serializer |
19 |
|
20 |
GANETI_ZMQ_PUBLISHER = "tcp://*:5801" # FIXME: move to settings.py |
21 |
|
22 |
|
23 |
class JobFileHandler(pyinotify.ProcessEvent): |
24 |
def __init__(self, publisher): |
25 |
pyinotify.ProcessEvent.__init__(self)
|
26 |
self.publisher = publisher
|
27 |
|
28 |
def process_IN_CLOSE_WRITE(self, event): |
29 |
jobfile = os.path.join(event.path, event.name) |
30 |
if not event.name.startswith("job-"): |
31 |
logging.debug("Not a job file: %s" % event.path)
|
32 |
return
|
33 |
|
34 |
try:
|
35 |
data = utils.ReadFile(jobfile) |
36 |
except IOError: |
37 |
|
38 |
return
|
39 |
|
40 |
data = serializer.LoadJson(data) |
41 |
job = jqueue._QueuedJob.Restore(None, data)
|
42 |
|
43 |
for op in job.ops: |
44 |
instances = ""
|
45 |
try:
|
46 |
instances = " ".join(op.input.instances)
|
47 |
except AttributeError: |
48 |
pass
|
49 |
|
50 |
try:
|
51 |
instances = op.input.instance_name |
52 |
except AttributeError: |
53 |
pass
|
54 |
|
55 |
# Get the last line of the op log as message
|
56 |
try:
|
57 |
logmsg = op.log[-1][-1] |
58 |
except IndexError: |
59 |
logmsg = None
|
60 |
|
61 |
logging.debug("%d: %s(%s) %s %s" % (int(job.id), op.input.OP_ID, instances, op.status, logmsg)) |
62 |
if op.status in constants.JOBS_FINALIZED: |
63 |
logging.info("%d: %s" % (int(job.id), op.status)) |
64 |
|
65 |
# Construct message
|
66 |
msg = { |
67 |
"type": "Ganeti-op-status", |
68 |
"instance": instances,
|
69 |
"operation": op.input.OP_ID,
|
70 |
"jobId": int(job.id), |
71 |
"status": op.status
|
72 |
} |
73 |
if logmsg:
|
74 |
msg["message"] = logmsg
|
75 |
|
76 |
# Output as JSON
|
77 |
print json.dumps(msg)
|
78 |
|
79 |
self.publisher.send_json(msg)
|
80 |
|
81 |
|
82 |
def main(): |
83 |
zmqc = zmq.Context() |
84 |
publisher = zmqc.socket(zmq.PUB) |
85 |
publisher.bind(GANETI_ZMQ_PUBLISHER) |
86 |
logging.info("Now publishing on %s" % GANETI_ZMQ_PUBLISHER)
|
87 |
|
88 |
wm = pyinotify.WatchManager() |
89 |
mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
|
90 |
handler = JobFileHandler(publisher) |
91 |
notifier = pyinotify.Notifier(wm, handler) |
92 |
wm.add_watch(constants.QUEUE_DIR, mask) |
93 |
|
94 |
logging.info("Now watching %s" % constants.QUEUE_DIR)
|
95 |
|
96 |
while True: # loop forever |
97 |
try:
|
98 |
# process the queue of events as explained above
|
99 |
notifier.process_events() |
100 |
if notifier.check_events():
|
101 |
# read notified events and enqeue them
|
102 |
notifier.read_events() |
103 |
except KeyboardInterrupt: |
104 |
# destroy the inotify's instance on this interrupt (stop monitoring)
|
105 |
notifier.stop() |
106 |
break
|
107 |
|
108 |
|
109 |
if __name__ == "__main__": |
110 |
logging.basicConfig(level=logging.DEBUG) |
111 |
sys.exit(main()) |
112 |
|
113 |
# vim: set ts=4 sts=4 sw=4 et ai :
|