Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / progress_monitor.py @ 996e5d53

History | View | Annotate | Download (8.1 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37
"""Utility to monitor the progress of image deployment
38

39
A small utility to monitor the progress of image deployment
40
by watching the contents of /proc/<pid>/io and producing
41
notifications of type 'ganeti-create-progress' to the rest
42
of the Synnefo infrastructure over AMQP.
43

44
"""
45

    
46
import os
47
import sys
48
import time
49
import json
50
import prctl
51
import signal
52
import socket
53

    
54
from synnefo import settings
55
from synnefo.lib.amqp import AMQPClient
56
from synnefo.lib.utils import split_time
57

    
58

    
59
def parse_arguments(args):
60
    from optparse import OptionParser
61

    
62
    kw = {}
63
    kw['usage'] = "%prog [options] command [args...]"
64
    kw['description'] = \
65
        "%prog runs 'command' with the specified arguments, monitoring the " \
66
        "number of bytes read and written by it. 'command' is assumed to be " \
67
        "A program used to install the OS for a Ganeti instance. %prog " \
68
        "periodically issues notifications of type 'ganeti-create-progress' " \
69
        "to the rest of the Synnefo infrastructure over AMQP."
70

    
71
    parser = OptionParser(**kw)
72
    parser.disable_interspersed_args()
73
    parser.add_option("-r", "--read-bytes",
74
                      action="store", type="int", dest="read_bytes",
75
                      metavar="BYTES_TO_READ",
76
                      help="The expected number of bytes to be read, " \
77
                           "used to compute input progress",
78
                      default=0)
79
    parser.add_option("-w", "--write-bytes",
80
                      action="store", type="int", dest="write_bytes",
81
                      metavar="BYTES_TO_WRITE",
82
                      help="The expected number of bytes to be written, " \
83
                           "used to compute output progress",
84
                      default=0)
85
    parser.add_option("-i", "--instance-name",
86
                      dest="instance_name",
87
                      metavar="GANETI_INSTANCE",
88
                      help="The Ganeti instance name to be used in AMQP " \
89
                           "notifications")
90

    
91
    (opts, args) = parser.parse_args(args)
92

    
93
    if opts.instance_name is None or (opts.read_bytes == 0 and
94
                                      opts.write_bytes == 0):
95
        sys.stderr.write("Fatal: Options '-i' and at least one of '-r' " \
96
                         "or '-w' are mandatory.\n")
97
        parser.print_help()
98
        sys.exit(1)
99

    
100
    if len(args) == 0:
101
        sys.stderr.write("Fatal: You need to specify the command to run.\n")
102
        parser.print_help()
103
        sys.exit(1)
104

    
105
    return (opts, args)
106

    
107

    
108
def report_wait_status(pid, status):
109
    if os.WIFEXITED(status):
110
        sys.stderr.write("Child PID = %d exited, status = %d\n" %
111
                         (pid, os.WEXITSTATUS(status)))
112
    elif os.WIFSIGNALED(status):
113
        sys.stderr.write("Child PID = %d died by signal, signal = %d\n" %
114
                         (pid, os.WTERMSIG(status)))
115
    elif os.WIFSTOPPED(status):
116
        sys.stderr.write("Child PID = %d stopped by signal, signal = %d\n" %
117
                         (pid, os.WSTOPSIG(status)))
118
    else:
119
        sys.stderr.write("Internal error: Unhandled case, " \
120
                         "PID = %d, status = %d\n" % (pid, status))
121
        sys.exit(1)
122
    sys.stderr.flush()
123

    
124

    
125
def main():
126
    (opts, args) = parse_arguments(sys.argv[1:])
127

    
128
    # WARNING: This assumes that instance names
129
    # are of the form prefix-id, and uses prefix to
130
    # determine the routekey for AMPQ
131
    prefix = opts.instance_name.split('-')[0]
132
    routekey = "ganeti.%s.event.progress" % prefix
133
    amqp_client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=2)
134
    amqp_client.connect()
135
    amqp_client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
136

    
137
    pid = os.fork()
138
    if pid == 0:
139
        # In child process:
140

    
141
        # Make sure we die with the parent and are not left behind
142
        # WARNING: This uses the prctl(2) call and is Linux-specific.
143
        prctl.set_pdeathsig(signal.SIGHUP)
144

    
145
        # exec command specified in arguments,
146
        # searching the $PATH, keeping all environment
147
        os.execvpe(args[0], args, os.environ)
148
        sys.stderr.write("execvpe failed, exiting with non-zero status")
149
        os.exit(1)
150

    
151
    # In parent process:
152
    iofname = "/proc/%d/io" % pid
153
    iof = open(iofname, "r", 0)   # 0: unbuffered open
154
    sys.stderr.write("%s: created child PID = %d, monitoring file %s\n" %
155
                     (sys.argv[0], pid, iofname))
156

    
157
    while True:
158
        # check if the child process is still alive
159
        (wpid, status) = os.waitpid(pid, os.WNOHANG)
160
        if wpid == pid:
161
            report_wait_status(pid, status)
162
            if (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
163
                if not (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0):
164
                    return 1
165
                else:
166
                    # send a final notification
167
                    final_msg = dict(type="ganeti-create-progress",
168
                                     instance=opts.instance_name)
169
                    final_msg['event_time'] = split_time(time.time())
170
                    if opts.read_bytes:
171
                        final_msg['rprogress'] = float(100)
172
                    if opts.write_bytes:
173
                        final_msg['wprogress'] = float(100)
174
                    amqp_client.basic_publish(exchange=settings.EXCHANGE_GANETI,
175
                                              routing_key=routekey,
176
                                              body=json.dumps(final_msg))
177
                    return 0
178

    
179
        # retrieve the current values of the read/write byte counters
180
        iof.seek(0)
181
        for l in iof.readlines():
182
            if l.startswith("rchar:"):
183
                rchar = int(l.split(': ')[1])
184
            if l.startswith("wchar:"):
185
                wchar = int(l.split(': ')[1])
186

    
187
        # Construct notification of type 'ganeti-create-progress'
188
        msg = dict(type="ganeti-create-progress",
189
                   instance=opts.instance_name)
190
        msg['event_time'] = split_time(time.time())
191
        if opts.read_bytes:
192
            msg['rprogress'] = float("%2.2f" %
193
                                     (rchar * 100.0 / opts.read_bytes))
194
        if opts.write_bytes:
195
            msg['wprogress'] = float("%2.2f" %
196
                                     (wchar * 100.0 / opts.write_bytes))
197

    
198
        # and send it over AMQP
199
        amqp_client.basic_publish(exchange=settings.EXCHANGE_GANETI,
200
                                  routing_key=routekey,
201
                                  body=json.dumps(msg))
202

    
203
        # Sleep for a while
204
        time.sleep(3)
205

    
206
    amqp_client.close()
207

    
208

    
209
if __name__ == "__main__":
210
    sys.exit(main())