Revision 0827883e snf-cyclades-gtools/synnefo/ganeti/progress_monitor.py

b/snf-cyclades-gtools/synnefo/ganeti/progress_monitor.py
1 1
#!/usr/bin/env python
2 2
# -*- coding: utf-8 -*-
3 3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
4
# Copyright 2011, 2012 GRNET S.A. All rights reserved.
5 5
#
6 6
# Redistribution and use in source and binary forms, with or
7 7
# without modification, are permitted provided that the following
......
36 36
#
37 37
"""Utility to monitor the progress of image deployment
38 38

  
39
A small utility to monitor the progress of image deployment
40
by watching the contents of /proc/<pid>/io and producing
41
notifications of type 'ganeti-create-progress' to the rest
42
of the Synnefo infrastructure over AMQP.
43

  
39
A small utility that collects various monitoring messages from snf-image and
40
forwards them to the rest of the Synnefo infrastructure over AMQP.
44 41
"""
45 42

  
46 43
import os
47 44
import sys
48 45
import time
49 46
import json
50
import prctl
51
import signal
52
import socket
53 47

  
54 48
from synnefo import settings
55 49
from synnefo.lib.amqp import AMQPClient
56 50
from synnefo.lib.utils import split_time
57 51

  
52
PROGNAME = os.path.basename(sys.argv[0])
58 53

  
59
def parse_arguments(args):
60
    from optparse import OptionParser
61

  
62
    kw = {}
63
    kw['usage'] = "%prog [options] command [args...]"
64
    kw['description'] = \
65
        "%prog runs 'command' with the specified arguments, monitoring the " \
66
        "number of bytes read and written by it. 'command' is assumed to be " \
67
        "A program used to install the OS for a Ganeti instance. %prog " \
68
        "periodically issues notifications of type 'ganeti-create-progress' " \
69
        "to the rest of the Synnefo infrastructure over AMQP."
70

  
71
    parser = OptionParser(**kw)
72
    parser.disable_interspersed_args()
73
    parser.add_option("-r", "--read-bytes",
74
                      action="store", type="int", dest="read_bytes",
75
                      metavar="BYTES_TO_READ",
76
                      help="The expected number of bytes to be read, " \
77
                           "used to compute input progress",
78
                      default=0)
79
    parser.add_option("-w", "--write-bytes",
80
                      action="store", type="int", dest="write_bytes",
81
                      metavar="BYTES_TO_WRITE",
82
                      help="The expected number of bytes to be written, " \
83
                           "used to compute output progress",
84
                      default=0)
85
    parser.add_option("-i", "--instance-name",
86
                      dest="instance_name",
87
                      metavar="GANETI_INSTANCE",
88
                      help="The Ganeti instance name to be used in AMQP " \
89
                           "notifications")
90

  
91
    (opts, args) = parser.parse_args(args)
92

  
93
    if opts.instance_name is None or (opts.read_bytes == 0 and
94
                                      opts.write_bytes == 0):
95
        sys.stderr.write("Fatal: Options '-i' and at least one of '-r' " \
96
                         "or '-w' are mandatory.\n")
97
        parser.print_help()
98
        sys.exit(1)
99

  
100
    if len(args) == 0:
101
        sys.stderr.write("Fatal: You need to specify the command to run.\n")
102
        parser.print_help()
103
        sys.exit(1)
104

  
105
    return (opts, args)
106

  
107

  
108
def report_wait_status(pid, status):
109
    if os.WIFEXITED(status):
110
        sys.stderr.write("Child PID = %d exited, status = %d\n" %
111
                         (pid, os.WEXITSTATUS(status)))
112
    elif os.WIFSIGNALED(status):
113
        sys.stderr.write("Child PID = %d died by signal, signal = %d\n" %
114
                         (pid, os.WTERMSIG(status)))
115
    elif os.WIFSTOPPED(status):
116
        sys.stderr.write("Child PID = %d stopped by signal, signal = %d\n" %
117
                         (pid, os.WSTOPSIG(status)))
118
    else:
119
        sys.stderr.write("Internal error: Unhandled case, " \
120
                         "PID = %d, status = %d\n" % (pid, status))
121
        sys.exit(1)
122
    sys.stderr.flush()
123 54

  
55
def jsonstream(file):
56
    buf = ""
57
    decoder = json.JSONDecoder()
58
    while True:
59
        new_data = os.read(file.fileno(), 512)
60
        if not len(new_data):
61
            break
62

  
63
        buf += new_data.strip()
64
        while 1:
65
            try:
66
                msg, idx = decoder.raw_decode(buf)
67
            except ValueError:
68
                break
69
            yield msg
70
            buf = buf[idx:].strip()
124 71

  
125 72
def main():
126
    (opts, args) = parse_arguments(sys.argv[1:])
73

  
74
    usage = "Usage: %s <instance_name>\n" % PROGNAME
75

  
76
    if len(sys.argv) != 2:
77
        sys.stderr.write(usage)
78
        return 1
79

  
80
    instance_name = sys.argv[1]
127 81

  
128 82
    # WARNING: This assumes that instance names
129 83
    # are of the form prefix-id, and uses prefix to
130 84
    # determine the routekey for AMPQ
131
    prefix = opts.instance_name.split('-')[0]
85
    prefix = instance_name.split('-')[0]
132 86
    routekey = "ganeti.%s.event.progress" % prefix
133 87
    amqp_client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=2)
134 88
    amqp_client.connect()
135 89
    amqp_client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
136 90

  
137
    pid = os.fork()
138
    if pid == 0:
139
        # In child process:
140

  
141
        # Make sure we die with the parent and are not left behind
142
        # WARNING: This uses the prctl(2) call and is Linux-specific.
143
        prctl.set_pdeathsig(signal.SIGHUP)
144

  
145
        # exec command specified in arguments,
146
        # searching the $PATH, keeping all environment
147
        os.execvpe(args[0], args, os.environ)
148
        sys.stderr.write("execvpe failed, exiting with non-zero status")
149
        os.exit(1)
150

  
151
    # In parent process:
152
    iofname = "/proc/%d/io" % pid
153
    iof = open(iofname, "r", 0)   # 0: unbuffered open
154
    sys.stderr.write("%s: created child PID = %d, monitoring file %s\n" %
155
                     (sys.argv[0], pid, iofname))
156

  
157
    while True:
158
        # check if the child process is still alive
159
        (wpid, status) = os.waitpid(pid, os.WNOHANG)
160
        if wpid == pid:
161
            report_wait_status(pid, status)
162
            if (os.WIFEXITED(status) or os.WIFSIGNALED(status)):
163
                if not (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0):
164
                    return 1
165
                else:
166
                    # send a final notification
167
                    final_msg = dict(type="ganeti-create-progress",
168
                                     instance=opts.instance_name)
169
                    final_msg['event_time'] = split_time(time.time())
170
                    if opts.read_bytes:
171
                        final_msg['rprogress'] = float(100)
172
                    if opts.write_bytes:
173
                        final_msg['wprogress'] = float(100)
174
                    amqp_client.basic_publish(exchange=settings.EXCHANGE_GANETI,
175
                                              routing_key=routekey,
176
                                              body=json.dumps(final_msg))
177
                    return 0
178

  
179
        # retrieve the current values of the read/write byte counters
180
        iof.seek(0)
181
        for l in iof.readlines():
182
            if l.startswith("rchar:"):
183
                rchar = int(l.split(': ')[1])
184
            if l.startswith("wchar:"):
185
                wchar = int(l.split(': ')[1])
186

  
187
        # Construct notification of type 'ganeti-create-progress'
188
        msg = dict(type="ganeti-create-progress",
189
                   instance=opts.instance_name)
91
    for msg in jsonstream(sys.stdin):
190 92
        msg['event_time'] = split_time(time.time())
191
        if opts.read_bytes:
192
            msg['rprogress'] = float("%2.2f" %
193
                                     (rchar * 100.0 / opts.read_bytes))
194
        if opts.write_bytes:
195
            msg['wprogress'] = float("%2.2f" %
196
                                     (wchar * 100.0 / opts.write_bytes))
93
        msg['instance'] = instance_name
197 94

  
198 95
        # and send it over AMQP
199 96
        amqp_client.basic_publish(exchange=settings.EXCHANGE_GANETI,
200 97
                                  routing_key=routekey,
201 98
                                  body=json.dumps(msg))
202 99

  
203
        # Sleep for a while
204
        time.sleep(3)
205

  
206 100
    amqp_client.close()
207

  
101
    return 0
208 102

  
209 103
if __name__ == "__main__":
210 104
    sys.exit(main())
105

  
106
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :

Also available in: Unified diff