Revision 0827883e snf-cyclades-gtools/synnefo/ganeti/progress_monitor.py
b/snf-cyclades-gtools/synnefo/ganeti/progress_monitor.py | ||
---|---|---|
1 | 1 |
#!/usr/bin/env python |
2 | 2 |
# -*- coding: utf-8 -*- |
3 | 3 |
# |
4 |
# Copyright 2011 GRNET S.A. All rights reserved. |
|
4 |
# Copyright 2011, 2012 GRNET S.A. All rights reserved.
|
|
5 | 5 |
# |
6 | 6 |
# Redistribution and use in source and binary forms, with or |
7 | 7 |
# without modification, are permitted provided that the following |
... | ... | |
36 | 36 |
# |
37 | 37 |
"""Utility to monitor the progress of image deployment |
38 | 38 |
|
39 |
A small utility to monitor the progress of image deployment |
|
40 |
by watching the contents of /proc/<pid>/io and producing |
|
41 |
notifications of type 'ganeti-create-progress' to the rest |
|
42 |
of the Synnefo infrastructure over AMQP. |
|
43 |
|
|
39 |
A small utility that collects various monitoring messages from snf-image and |
|
40 |
forwards them to the rest of the Synnefo infrastructure over AMQP. |
|
44 | 41 |
""" |
45 | 42 |
|
46 | 43 |
import os |
47 | 44 |
import sys |
48 | 45 |
import time |
49 | 46 |
import json |
50 |
import prctl |
|
51 |
import signal |
|
52 |
import socket |
|
53 | 47 |
|
54 | 48 |
from synnefo import settings |
55 | 49 |
from synnefo.lib.amqp import AMQPClient |
56 | 50 |
from synnefo.lib.utils import split_time |
57 | 51 |
|
52 |
# Name this script was invoked as (basename of argv[0]); used in the usage
# message printed by main().
PROGNAME = os.path.basename(sys.argv[0])
|
58 | 53 |
|
59 |
def parse_arguments(args):
    """Parse the command line of the progress monitor.

    `args` is the argument list without the program name. Option parsing
    stops at the first positional argument, so everything from the command
    name onwards is returned untouched.

    Returns a tuple (opts, args): the parsed option values and the remaining
    positional arguments (the command to run followed by its arguments).

    Writes an error message to stderr, prints the help text and exits with
    status 1 when '-i' is missing, when neither '-r' nor '-w' is given a
    non-zero value, or when no command is specified.
    """
    from optparse import OptionParser

    description = (
        "%prog runs 'command' with the specified arguments, monitoring the "
        "number of bytes read and written by it. 'command' is assumed to be "
        "A program used to install the OS for a Ganeti instance. %prog "
        "periodically issues notifications of type 'ganeti-create-progress' "
        "to the rest of the Synnefo infrastructure over AMQP."
    )
    parser = OptionParser(usage="%prog [options] command [args...]",
                          description=description)
    # Options that follow the command name belong to the command, not to us.
    parser.disable_interspersed_args()

    # The two byte-count options differ only in their names and help text.
    byte_count_options = (
        ("-r", "--read-bytes", "read_bytes", "BYTES_TO_READ",
         "The expected number of bytes to be read, "
         "used to compute input progress"),
        ("-w", "--write-bytes", "write_bytes", "BYTES_TO_WRITE",
         "The expected number of bytes to be written, "
         "used to compute output progress"),
    )
    for short_flag, long_flag, dest, metavar, help_text in byte_count_options:
        parser.add_option(short_flag, long_flag,
                          action="store", type="int", dest=dest,
                          metavar=metavar, help=help_text, default=0)

    parser.add_option("-i", "--instance-name",
                      dest="instance_name",
                      metavar="GANETI_INSTANCE",
                      help="The Ganeti instance name to be used in AMQP "
                           "notifications")

    opts, command = parser.parse_args(args)

    def fail(message):
        # Report the problem, show the help text and abort with status 1.
        sys.stderr.write(message)
        parser.print_help()
        sys.exit(1)

    no_byte_counts = opts.read_bytes == 0 and opts.write_bytes == 0
    if opts.instance_name is None or no_byte_counts:
        fail("Fatal: Options '-i' and at least one of '-r' "
             "or '-w' are mandatory.\n")

    if not command:
        fail("Fatal: You need to specify the command to run.\n")

    return (opts, command)
|
106 |
|
|
107 |
|
|
108 |
def report_wait_status(pid, status):
    """Describe on stderr how child `pid` changed state.

    `status` is a wait status as returned by os.waitpid(). A line is written
    for a normal exit, a death by signal or a stop by signal, and stderr is
    then flushed.

    A status that matches none of these cases is reported as an internal
    error and the process exits with status 1.
    """
    # (predicate, message template, detail extractor), tested in this order.
    known_cases = (
        (os.WIFEXITED,
         "Child PID = %d exited, status = %d\n",
         os.WEXITSTATUS),
        (os.WIFSIGNALED,
         "Child PID = %d died by signal, signal = %d\n",
         os.WTERMSIG),
        (os.WIFSTOPPED,
         "Child PID = %d stopped by signal, signal = %d\n",
         os.WSTOPSIG),
    )

    for applies, template, detail in known_cases:
        if applies(status):
            sys.stderr.write(template % (pid, detail(status)))
            sys.stderr.flush()
            return

    sys.stderr.write("Internal error: Unhandled case, "
                     "PID = %d, status = %d\n" % (pid, status))
    sys.exit(1)
|
123 | 54 |
|
55 |
def jsonstream(file):
    """Yield the JSON values found in a stream of concatenated documents.

    Data is read in chunks of up to 512 bytes straight from the descriptor
    returned by file.fileno() using os.read(), so a message can be yielded
    as soon as its last byte arrives. Documents may be separated by any
    amount of whitespace and may span several chunks.

    The generator finishes when os.read() returns no data (end of file).
    Whatever is left in the buffer at that point -- an incomplete or
    malformed document -- is discarded without notice.

    :param file: any object providing fileno(), e.g. sys.stdin
    """
    import codecs

    # Decode incrementally so that a multi-byte UTF-8 character split across
    # two chunks is reassembled; undecodable bytes are replaced rather than
    # aborting the stream.
    utf8 = codecs.getincrementaldecoder("utf-8")(errors="replace")
    decoder = json.JSONDecoder()
    fd = file.fileno()
    buf = ""

    while True:
        new_data = os.read(fd, 512)
        if not new_data:
            break

        # Append the chunk verbatim: stripping it here would delete
        # whitespace that belongs inside a JSON string whenever that
        # whitespace happens to fall on a chunk boundary.
        buf += utf8.decode(new_data)

        while True:
            # raw_decode() does not skip leading whitespace itself, so drop
            # the separators between documents before each attempt.
            buf = buf.lstrip()
            if not buf:
                break
            try:
                msg, idx = decoder.raw_decode(buf)
            except ValueError:
                # Document not complete yet; wait for more data.
                break
            yield msg
            buf = buf[idx:]
|
124 | 71 |
|
125 | 72 |
def main():
    """Forward JSON monitoring messages from stdin to AMQP.

    Expects exactly one command-line argument, the Ganeti instance name.
    Every JSON object read from stdin through jsonstream() gets an
    'event_time' entry (split_time() of the current time) and an 'instance'
    entry (the instance name), and is then published to the
    settings.EXCHANGE_GANETI exchange with routing key
    "ganeti.<prefix>.event.progress".

    Returns 1 after printing a usage line when the argument count is wrong,
    and 0 once stdin reaches end of file.
    """
    if len(sys.argv) != 2:
        sys.stderr.write("Usage: %s <instance_name>\n" % PROGNAME)
        return 1

    instance_name = sys.argv[1]

    # WARNING: This assumes that instance names
    # are of the form prefix-id, and uses prefix to
    # determine the routekey for AMQP
    prefix = instance_name.split('-')[0]
    routekey = "ganeti.%s.event.progress" % prefix
    amqp_client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=2)
    amqp_client.connect()

    try:
        amqp_client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        for msg in jsonstream(sys.stdin):
            # Only JSON objects can carry the extra fields; anything else
            # would raise on item assignment and stop all further forwarding.
            if not isinstance(msg, dict):
                sys.stderr.write("%s: ignoring JSON message that is not "
                                 "an object\n" % PROGNAME)
                continue

            msg['event_time'] = split_time(time.time())
            msg['instance'] = instance_name

            # and send it over AMQP
            amqp_client.basic_publish(exchange=settings.EXCHANGE_GANETI,
                                      routing_key=routekey,
                                      body=json.dumps(msg))
    finally:
        # Close the connection even if reading or publishing fails.
        amqp_client.close()

    return 0
|
208 | 102 |
|
209 | 103 |
# When executed as a script, run main() and use its return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
105 |
|
|
106 |
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai : |
Also available in: Unified diff