1 |
1 |
#!/usr/bin/env python
|
2 |
2 |
# -*- coding: utf-8 -*-
|
3 |
3 |
#
|
4 |
|
# Copyright 2011 GRNET S.A. All rights reserved.
|
|
4 |
# Copyright 2011, 2012 GRNET S.A. All rights reserved.
|
5 |
5 |
#
|
6 |
6 |
# Redistribution and use in source and binary forms, with or
|
7 |
7 |
# without modification, are permitted provided that the following
|
... | ... | |
36 |
36 |
#
|
37 |
37 |
"""Utility to monitor the progress of image deployment
|
38 |
38 |
|
39 |
|
A small utility to monitor the progress of image deployment
|
40 |
|
by watching the contents of /proc/<pid>/io and producing
|
41 |
|
notifications of type 'ganeti-create-progress' to the rest
|
42 |
|
of the Synnefo infrastructure over AMQP.
|
43 |
|
|
|
39 |
A small utility that collects various monitoring messages from snf-image and
|
|
40 |
forwards them to the rest of the Synnefo infrastructure over AMQP.
|
44 |
41 |
"""
|
45 |
42 |
|
46 |
43 |
import os
|
47 |
44 |
import sys
|
48 |
45 |
import time
|
49 |
46 |
import json
|
50 |
|
import prctl
|
51 |
|
import signal
|
52 |
|
import socket
|
53 |
47 |
|
54 |
48 |
from synnefo import settings
|
55 |
49 |
from synnefo.lib.amqp import AMQPClient
|
56 |
50 |
from synnefo.lib.utils import split_time
|
57 |
51 |
|
|
52 |
PROGNAME = os.path.basename(sys.argv[0])
|
58 |
53 |
|
59 |
|
def parse_arguments(args):
|
60 |
|
from optparse import OptionParser
|
61 |
|
|
62 |
|
kw = {}
|
63 |
|
kw['usage'] = "%prog [options] command [args...]"
|
64 |
|
kw['description'] = \
|
65 |
|
"%prog runs 'command' with the specified arguments, monitoring the " \
|
66 |
|
"number of bytes read and written by it. 'command' is assumed to be " \
|
67 |
|
"A program used to install the OS for a Ganeti instance. %prog " \
|
68 |
|
"periodically issues notifications of type 'ganeti-create-progress' " \
|
69 |
|
"to the rest of the Synnefo infrastructure over AMQP."
|
70 |
|
|
71 |
|
parser = OptionParser(**kw)
|
72 |
|
parser.disable_interspersed_args()
|
73 |
|
parser.add_option("-r", "--read-bytes",
|
74 |
|
action="store", type="int", dest="read_bytes",
|
75 |
|
metavar="BYTES_TO_READ",
|
76 |
|
help="The expected number of bytes to be read, " \
|
77 |
|
"used to compute input progress",
|
78 |
|
default=0)
|
79 |
|
parser.add_option("-w", "--write-bytes",
|
80 |
|
action="store", type="int", dest="write_bytes",
|
81 |
|
metavar="BYTES_TO_WRITE",
|
82 |
|
help="The expected number of bytes to be written, " \
|
83 |
|
"used to compute output progress",
|
84 |
|
default=0)
|
85 |
|
parser.add_option("-i", "--instance-name",
|
86 |
|
dest="instance_name",
|
87 |
|
metavar="GANETI_INSTANCE",
|
88 |
|
help="The Ganeti instance name to be used in AMQP " \
|
89 |
|
"notifications")
|
90 |
|
|
91 |
|
(opts, args) = parser.parse_args(args)
|
92 |
|
|
93 |
|
if opts.instance_name is None or (opts.read_bytes == 0 and
|
94 |
|
opts.write_bytes == 0):
|
95 |
|
sys.stderr.write("Fatal: Options '-i' and at least one of '-r' " \
|
96 |
|
"or '-w' are mandatory.\n")
|
97 |
|
parser.print_help()
|
98 |
|
sys.exit(1)
|
99 |
|
|
100 |
|
if len(args) == 0:
|
101 |
|
sys.stderr.write("Fatal: You need to specify the command to run.\n")
|
102 |
|
parser.print_help()
|
103 |
|
sys.exit(1)
|
104 |
|
|
105 |
|
return (opts, args)
|
106 |
|
|
107 |
|
|
108 |
|
def report_wait_status(pid, status):
|
109 |
|
if os.WIFEXITED(status):
|
110 |
|
sys.stderr.write("Child PID = %d exited, status = %d\n" %
|
111 |
|
(pid, os.WEXITSTATUS(status)))
|
112 |
|
elif os.WIFSIGNALED(status):
|
113 |
|
sys.stderr.write("Child PID = %d died by signal, signal = %d\n" %
|
114 |
|
(pid, os.WTERMSIG(status)))
|
115 |
|
elif os.WIFSTOPPED(status):
|
116 |
|
sys.stderr.write("Child PID = %d stopped by signal, signal = %d\n" %
|
117 |
|
(pid, os.WSTOPSIG(status)))
|
118 |
|
else:
|
119 |
|
sys.stderr.write("Internal error: Unhandled case, " \
|
120 |
|
"PID = %d, status = %d\n" % (pid, status))
|
121 |
|
sys.exit(1)
|
122 |
|
sys.stderr.flush()
|
123 |
54 |
|
|
55 |
def jsonstream(file):
|
|
56 |
buf = ""
|
|
57 |
decoder = json.JSONDecoder()
|
|
58 |
while True:
|
|
59 |
new_data = os.read(file.fileno(), 512)
|
|
60 |
if not len(new_data):
|
|
61 |
break
|
|
62 |
|
|
63 |
buf += new_data.strip()
|
|
64 |
while 1:
|
|
65 |
try:
|
|
66 |
msg, idx = decoder.raw_decode(buf)
|
|
67 |
except ValueError:
|
|
68 |
break
|
|
69 |
yield msg
|
|
70 |
buf = buf[idx:].strip()
|
124 |
71 |
|
125 |
72 |
def main():
|
126 |
|
(opts, args) = parse_arguments(sys.argv[1:])
|
|
73 |
|
|
74 |
usage = "Usage: %s <instance_name>\n" % PROGNAME
|
|
75 |
|
|
76 |
if len(sys.argv) != 2:
|
|
77 |
sys.stderr.write(usage)
|
|
78 |
return 1
|
|
79 |
|
|
80 |
instance_name = sys.argv[1]
|
127 |
81 |
|
128 |
82 |
# WARNING: This assumes that instance names
|
129 |
83 |
# are of the form prefix-id, and uses prefix to
|
130 |
84 |
# determine the routekey for AMPQ
|
131 |
|
prefix = opts.instance_name.split('-')[0]
|
|
85 |
prefix = instance_name.split('-')[0]
|
132 |
86 |
routekey = "ganeti.%s.event.progress" % prefix
|
133 |
87 |
amqp_client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=2)
|
134 |
88 |
amqp_client.connect()
|
135 |
89 |
amqp_client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
|
136 |
90 |
|
137 |
|
pid = os.fork()
|
138 |
|
if pid == 0:
|
139 |
|
# In child process:
|
140 |
|
|
141 |
|
# Make sure we die with the parent and are not left behind
|
142 |
|
# WARNING: This uses the prctl(2) call and is Linux-specific.
|
143 |
|
prctl.set_pdeathsig(signal.SIGHUP)
|
144 |
|
|
145 |
|
# exec command specified in arguments,
|
146 |
|
# searching the $PATH, keeping all environment
|
147 |
|
os.execvpe(args[0], args, os.environ)
|
148 |
|
sys.stderr.write("execvpe failed, exiting with non-zero status")
|
149 |
|
os.exit(1)
|
150 |
|
|
151 |
|
# In parent process:
|
152 |
|
iofname = "/proc/%d/io" % pid
|
153 |
|
iof = open(iofname, "r", 0) # 0: unbuffered open
|
154 |
|
sys.stderr.write("%s: created child PID = %d, monitoring file %s\n" %
|
155 |
|
(sys.argv[0], pid, iofname))
|
156 |
|
|
157 |
|
while True:
|
158 |
|
# check if the child process is still alive
|
159 |
|
(wpid, status) = os.waitpid(pid, os.WNOHANG)
|
160 |
|
if wpid == pid:
|
161 |
|
report_wait_status(pid, status)
|
162 |
|
if (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
|
163 |
|
if not (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0):
|
164 |
|
return 1
|
165 |
|
else:
|
166 |
|
# send a final notification
|
167 |
|
final_msg = dict(type="ganeti-create-progress",
|
168 |
|
instance=opts.instance_name)
|
169 |
|
final_msg['event_time'] = split_time(time.time())
|
170 |
|
if opts.read_bytes:
|
171 |
|
final_msg['rprogress'] = float(100)
|
172 |
|
if opts.write_bytes:
|
173 |
|
final_msg['wprogress'] = float(100)
|
174 |
|
amqp_client.basic_publish(exchange=settings.EXCHANGE_GANETI,
|
175 |
|
routing_key=routekey,
|
176 |
|
body=json.dumps(final_msg))
|
177 |
|
return 0
|
178 |
|
|
179 |
|
# retrieve the current values of the read/write byte counters
|
180 |
|
iof.seek(0)
|
181 |
|
for l in iof.readlines():
|
182 |
|
if l.startswith("rchar:"):
|
183 |
|
rchar = int(l.split(': ')[1])
|
184 |
|
if l.startswith("wchar:"):
|
185 |
|
wchar = int(l.split(': ')[1])
|
186 |
|
|
187 |
|
# Construct notification of type 'ganeti-create-progress'
|
188 |
|
msg = dict(type="ganeti-create-progress",
|
189 |
|
instance=opts.instance_name)
|
|
91 |
for msg in jsonstream(sys.stdin):
|
190 |
92 |
msg['event_time'] = split_time(time.time())
|
191 |
|
if opts.read_bytes:
|
192 |
|
msg['rprogress'] = float("%2.2f" %
|
193 |
|
(rchar * 100.0 / opts.read_bytes))
|
194 |
|
if opts.write_bytes:
|
195 |
|
msg['wprogress'] = float("%2.2f" %
|
196 |
|
(wchar * 100.0 / opts.write_bytes))
|
|
93 |
msg['instance'] = instance_name
|
197 |
94 |
|
198 |
95 |
# and send it over AMQP
|
199 |
96 |
amqp_client.basic_publish(exchange=settings.EXCHANGE_GANETI,
|
200 |
97 |
routing_key=routekey,
|
201 |
98 |
body=json.dumps(msg))
|
202 |
99 |
|
203 |
|
# Sleep for a while
|
204 |
|
time.sleep(3)
|
205 |
|
|
206 |
100 |
amqp_client.close()
|
207 |
|
|
|
101 |
return 0
|
208 |
102 |
|
209 |
103 |
if __name__ == "__main__":
|
210 |
104 |
sys.exit(main())
|
|
105 |
|
|
106 |
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :
|