Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / progress_monitor.py @ a31e427d

History | View | Annotate | Download (9 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37
"""Utility to monitor the progress of image deployment
38

39
A small utility to monitor the progress of image deployment
40
by watching the contents of /proc/<pid>/io and producing
41
notifications of type 'ganeti-create-progress' to the rest
42
of the Synnefo infrastructure over AMQP.
43

44
"""
45

    
46
import os
47
import sys
48
import time
49
import json
50
import prctl
51
import signal
52
import socket
53

    
54
from amqplib import client_0_8 as amqp
55

    
56
from synnefo import settings
57

    
58

    
59
class AMQPClient(object):
60
    def __init__(self, routekey):
61
        self.conn = None
62
        self.chan = None
63
        self.routekey = routekey
64

    
65
    def open_channel(self):
66
        if not self.conn:
67
            try:
68
                sys.stderr.write("Attempting to connect to %s\n" %
69
                                 settings.RABBIT_HOST)
70
                self.conn = amqp.Connection(host=settings.RABBIT_HOST,
71
                                            userid=settings.RABBIT_USERNAME,
72
                                            password=settings.RABBIT_PASSWORD,
73
                                            virtual_host=settings.RABBIT_VHOST)
74
            except socket.error:
75
                sys.stderr.write("Connection failed, will retry in 1s\n")
76
                time.sleep(1)
77

    
78
        if self.conn:
79
            sys.stderr.write("Connection succesful, opening channel\n")
80
            self.chan = self.conn.channel()
81

    
82
    def send_message(self, msg):
83
        sys.stderr.write("Delivering msg with key=%s:\n%s\n" %
84
                         (self.routekey, json.dumps(msg)))
85
        msg = amqp.Message(json.dumps(msg))
86
        msg.properties["delivery_mode"] = 2  # Persistent
87

    
88
        if not self.chan:
89
            self.open_channel()
90
        if not self.chan:
91
            return
92

    
93
        try:
94
            self.chan.basic_publish(msg,
95
                                    exchange=settings.EXCHANGE_GANETI,
96
                                    routing_key=self.routekey)
97
        except socket.error:
98
            sys.stderr.write("Server went away, reconnecting...\n")
99
            self.conn = None
100
            self.chan = None
101

    
102

    
103
def parse_arguments(args):
104
    from optparse import OptionParser
105

    
106
    kw = {}
107
    kw['usage'] = "%prog [options] command [args...]"
108
    kw['description'] = \
109
        "%prog runs 'command' with the specified arguments, monitoring the " \
110
        "number of bytes read and written by it. 'command' is assumed to be " \
111
        "A program used to install the OS for a Ganeti instance. %prog " \
112
        "periodically issues notifications of type 'ganeti-create-progress' " \
113
        "to the rest of the Synnefo infrastructure over AMQP."
114

    
115
    parser = OptionParser(**kw)
116
    parser.disable_interspersed_args()
117
    parser.add_option("-r", "--read-bytes",
118
                      action="store", type="int", dest="read_bytes",
119
                      metavar="BYTES_TO_READ",
120
                      help="The expected number of bytes to be read, " \
121
                           "used to compute input progress",
122
                      default=0)
123
    parser.add_option("-w", "--write-bytes",
124
                      action="store", type="int", dest="write_bytes",
125
                      metavar="BYTES_TO_WRITE",
126
                      help="The expected number of bytes to be written, " \
127
                           "used to compute output progress",
128
                      default=0)
129
    parser.add_option("-i", "--instance-name",
130
                      dest="instance_name",
131
                      metavar="GANETI_INSTANCE",
132
                      help="The Ganeti instance name to be used in AMQP " \
133
                           "notifications")
134

    
135
    (opts, args) = parser.parse_args(args)
136

    
137
    if opts.instance_name is None or (opts.read_bytes == 0 and
138
                                      opts.write_bytes == 0):
139
        sys.stderr.write("Fatal: Options '-i' and at least one of '-r' " \
140
                         "or '-w' are mandatory.\n")
141
        parser.print_help()
142
        sys.exit(1)
143

    
144
    if len(args) == 0:
145
        sys.stderr.write("Fatal: You need to specify the command to run.\n")
146
        parser.print_help()
147
        sys.exit(1)
148

    
149
    return (opts, args)
150

    
151

    
152
def report_wait_status(pid, status):
153
    if os.WIFEXITED(status):
154
        sys.stderr.write("Child PID = %d exited, status = %d\n" %
155
                         (pid, os.WEXITSTATUS(status)))
156
    elif os.WIFSIGNALED(status):
157
        sys.stderr.write("Child PID = %d died by signal, signal = %d\n" %
158
                         (pid, os.WTERMSIG(status)))
159
    elif os.WIFSTOPPED(status):
160
        sys.stderr.write("Child PID = %d stopped by signal, signal = %d\n" %
161
                         (pid, os.WSTOPSIG(status)))
162
    else:
163
        sys.stderr.write("Internal error: Unhandled case, " \
164
                         "PID = %d, status = %d\n" % (pid, status))
165
        sys.exit(1)
166
    sys.stderr.flush()
167

    
168

    
169
def main():
170
    (opts, args) = parse_arguments(sys.argv[1:])
171

    
172
    # WARNING: This assumes that instance names
173
    # are of the form prefix-id, and uses prefix to
174
    # determine the routekey for AMPQ
175
    prefix = opts.instance_name.split('-')[0]
176
    routekey = "ganeti.%s.event.progress" % prefix
177
    amqp = AMQPClient(routekey)
178

    
179
    pid = os.fork()
180
    if pid == 0:
181
        # In child process:
182

    
183
        # Make sure we die with the parent and are not left behind
184
        # WARNING: This uses the prctl(2) call and is Linux-specific.
185
        prctl.set_pdeathsig(signal.SIGHUP)
186

    
187
        # exec command specified in arguments,
188
        # searching the $PATH, keeping all environment
189
        os.execvpe(args[0], args, os.environ)
190
        sys.stderr.write("execvpe failed, exiting with non-zero status")
191
        os.exit(1)
192

    
193
    # In parent process:
194
    iofname = "/proc/%d/io" % pid
195
    iof = open(iofname, "r", 0)   # 0: unbuffered open
196
    sys.stderr.write("%s: created child PID = %d, monitoring file %s\n" %
197
                     (sys.argv[0], pid, iofname))
198

    
199
    while True:
200
        # check if the child process is still alive
201
        (wpid, status) = os.waitpid(pid, os.WNOHANG)
202
        if wpid == pid:
203
            report_wait_status(pid, status)
204
            if (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
205
                if not (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0):
206
                    return 1
207
                else:
208
                    # send a final notification
209
                    final_msg = dict(type="ganeti-create-progress",
210
                                     instance=opts.instance_name)
211
                    if opts.read_bytes:
212
                        final_msg['rprogress'] = float(100)
213
                    if opts.write_bytes:
214
                        final_msg['wprogress'] = float(100)
215
                    amqp.send_message(final_msg)
216
                    return 0
217

    
218
        # retrieve the current values of the read/write byte counters
219
        iof.seek(0)
220
        for l in iof.readlines():
221
            if l.startswith("rchar:"):
222
                rchar = int(l.split(': ')[1])
223
            if l.startswith("wchar:"):
224
                wchar = int(l.split(': ')[1])
225

    
226
        # Construct notification of type 'ganeti-create-progress'
227
        msg = dict(type="ganeti-create-progress",
228
                   instance=opts.instance_name)
229
        if opts.read_bytes:
230
            msg['rprogress'] = float("%2.2f" %
231
                                     (rchar * 100.0 / opts.read_bytes))
232
        if opts.write_bytes:
233
            msg['wprogress'] = float("%2.2f" %
234
                                     (wchar * 100.0 / opts.write_bytes))
235

    
236
        # and send it over AMQP
237
        amqp.send_message(msg)
238

    
239
        # Sleep for a while
240
        time.sleep(3)
241

    
242

    
243
if __name__ == "__main__":
244
    sys.exit(main())