Statistics
| Branch: | Tag: | Revision:

root / ganeti / snf-progress-monitor.py @ dcb4a587

History | View | Annotate | Download (8.5 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37
import os
38
import sys
39
import time
40
import json
41
import prctl
42
import signal
43
import socket
44

    
45
from amqplib import client_0_8 as amqp
46

    
47
try:
48
    import synnefo.settings as settings
49
except ImportError:
50
    raise Exception("Cannot import settings, make sure PYTHONPATH contains "
51
                    "the parent directory of the Synnefo Django project.")
52

    
53

    
54
class AMQPClient(object):
55
    def __init__(self, routekey):
56
        self.conn = None
57
        self.chan = None
58
        self.routekey = routekey
59
  
60
    def open_channel(self):
61
        if not self.conn:
62
            try:
63
                sys.stderr.write("Attempting to connect to %s\n" %
64
                                 settings.RABBIT_HOST)
65
                self.conn = amqp.Connection(host=settings.RABBIT_HOST,
66
                                            userid=settings.RABBIT_USERNAME,
67
                                            password=settings.RABBIT_PASSWORD,
68
                                            virtual_host=settings.RABBIT_VHOST)
69
            except socket.error:
70
                sys.stderr.write("Connection failed, will retry in 1s\n")
71
                time.sleep(1)
72

    
73
        if self.conn:
74
            sys.stderr.write("Connection succesful, opening channel\n")
75

    
76
        self.chan = self.conn.channel()
77

    
78
    def send_message(self, msg):
79
        sys.stderr.write("Delivering msg with key=%s:\n%s\n" %
80
                         (self.routekey, json.dumps(msg)))
81
        msg = amqp.Message(json.dumps(msg))
82
        msg.properties["delivery_mode"] = 2  # Persistent
83

    
84
        if not self.chan:
85
            self.open_channel()
86
        if not self.chan:
87
            return
88

    
89
        try:
90
            self.chan.basic_publish(msg,
91
                                    exchange=settings.EXCHANGE_GANETI,
92
                                    routing_key=self.routekey)
93
        except socket.error:
94
            sys.stderr.write("Server went away, reconnecting...\n")
95
            self.conn = None
96
            self.chan = None
97

    
98

    
99
def parse_arguments(args):
100
    from optparse import OptionParser
101

    
102
    kw = {}
103
    kw['usage'] = "%prog [options] command [args...]"
104
    kw['description'] = \
105
        "%prog runs 'command' with the specified arguments, monitoring the " \
106
        "number of bytes read and written by it. 'command' is assumed to be " \
107
        "A program used to install the OS for a Ganeti instance. %prog " \
108
        "periodically issues notifications of type 'ganeti-create-progress' " \
109
        "to the rest of the Synnefo infrastructure over AMQP."
110

    
111
    parser = OptionParser(**kw)
112
    parser.disable_interspersed_args()
113
    parser.add_option("-r", "--read-bytes",
114
                      action="store", type="int", dest="read_bytes",
115
                      metavar="BYTES_TO_READ",
116
                      help="The expected number of bytes to be read, " \
117
                           "used to compute input progress",
118
                      default=0)
119
    parser.add_option("-w", "--write-bytes",
120
                      action="store", type="int", dest="write_bytes",
121
                      metavar="BYTES_TO_WRITE",
122
                      help="The expected number of bytes to be written, " \
123
                           "used to compute output progress",
124
                      default=0)
125
    parser.add_option("-i", "--instance-name",
126
                      dest="instance_name",
127
                      metavar="GANETI_INSTANCE",
128
                      help="The Ganeti instance name to be used in AMQP " \
129
                           "notifications")
130

    
131
    (opts, args) = parser.parse_args(args)
132

    
133
    if opts.instance_name is None or (opts.read_bytes == 0 and
134
                                      opts.write_bytes == 0):
135
        sys.stderr.write("Fatal: Options '-i' and at least one of '-r' " \
136
                         "or '-w' are mandatory.\n")
137
        parser.print_help()
138
        sys.exit(1)
139

    
140
    if len(args) == 0:
141
        sys.stderr.write("Fatal: You need to specify the command to run.\n")
142
        parser.print_help()
143
        sys.exit(1)
144

    
145
    return (opts, args)
146

    
147

    
148
def report_wait_status(pid, status):
149
    if os.WIFEXITED(status):
150
        sys.stderr.write("Child PID = %d exited, status = %d\n" %
151
                         (pid, os.WEXITSTATUS(status)))
152
    elif os.WIFSIGNALED(status):
153
        sys.stderr.write("Child PID = %d died by signal, signal = %d\n" %
154
                         (pid, os.WTERMSIG(status)))
155
    elif os.WIFSTOPPED(status):
156
        sys.stderr.write("Child PID = %d stopped by signal, signal = %d\n" %
157
                         (pid, os.WSTOPSIG(status)))
158
    else:
159
        sys.stderr.write("Internal error: Unhandled case, " \
160
                         "PID = %d, status = %d\n" % (pid, status))
161
        sys.exit(1)
162
    sys.stderr.flush()
163

    
164

    
165
def main():
166
    (opts, args) = parse_arguments(sys.argv[1:])
167

    
168
    # WARNING: This assumes that instance names
169
    # are of the form prefix-id, and uses prefix to
170
    # determine the routekey for AMPQ
171
    prefix = opts.instance_name.split('-')[0]
172
    routekey = "ganeti.%s.event.progress" % prefix
173
    amqp = AMQPClient(routekey)
174

    
175
    pid = os.fork()
176
    if pid == 0:
177
        # In child process:
178

    
179
        # Make sure we die we the parent and are not left behind
180
        # WARNING: This uses the prctl(2) call and is Linux-specific.
181
        prctl.set_pdeathsig(signal.SIGHUP)
182
        
183
        # exec command specified in arguments,
184
        # searching the $PATH, keeping all environment
185
        os.execvpe(args[0], args, os.environ)
186
        sys.stderr.write("execvpe failed, exiting with non-zero status")
187
        os.exit(1)
188

    
189
    # In parent process:
190
    iofname = "/proc/%d/io" % pid
191
    iof = open(iofname, "r", 0)   # 0: unbuffered open
192
    sys.stderr.write("%s: created child PID = %d, monitoring file %s\n" %
193
                     (sys.argv[0], pid, iofname))
194

    
195
    while True:
196
        # check if the child process is still alive
197
        (wpid, status) = os.waitpid(pid, os.WNOHANG)
198
        if wpid == pid:
199
            report_wait_status(pid, status)
200
            if (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
201
                if not (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0):
202
                    return 1
203
                else:
204
                    return 0
205

    
206
        # retrieve the current values of the read/write byte counters
207
        iof.seek(0)
208
        for l in iof.readlines():
209
            if l.startswith("rchar:"):
210
                rchar = int(l.split(': ')[1])
211
            if l.startswith("wchar:"):
212
                wchar = int(l.split(': ')[1])
213

    
214
        # Construct notification of type 'ganeti-create-progress'
215
        msg = dict(type="ganeti-create-progress",
216
                   instance=opts.instance_name)
217
        if opts.read_bytes:
218
            msg['rprogress'] =  float("%2.2f" %
219
                                      (rchar * 100.0 / opts.read_bytes))
220
        if opts.write_bytes:
221
            msg['wprogress'] =  float("%2.2f" %
222
                                      (wchar * 100.0 / opts.write_bytes))
223

    
224
        # and send it over AMQP
225
        amqp.send_message(msg)
226
    
227
        # Sleep for a while
228
        time.sleep(3)
229

    
230

    
231
if __name__ == "__main__":
232
    sys.exit(main())