Implement gnt-cluster master-ping
[ganeti-local] / scripts / gnt-debug
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Debugging commands"""
22
23 # pylint: disable-msg=W0401,W0614,C0103
24 # W0401: Wildcard import ganeti.cli
25 # W0614: Unused import %s from wildcard import (since we need cli)
26 # C0103: Invalid name gnt-backup
27
28 import sys
29 import simplejson
30 import time
31 import socket
32 import logging
33
34 from ganeti.cli import *
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import opcodes
38 from ganeti import utils
39 from ganeti import errors
40
41
42 def Delay(opts, args):
43   """Sleeps for a while
44
45   @param opts: the command line options selected by the user
46   @type args: list
47   @param args: should contain only one element, the duration
48       the sleep
49   @rtype: int
50   @return: the desired exit code
51
52   """
53   delay = float(args[0])
54   op = opcodes.OpTestDelay(duration=delay,
55                            on_master=opts.on_master,
56                            on_nodes=opts.on_nodes,
57                            repeat=opts.repeat)
58   SubmitOpCode(op, opts=opts)
59
60   return 0
61
62
63 def GenericOpCodes(opts, args):
64   """Send any opcode to the master.
65
66   @param opts: the command line options selected by the user
67   @type args: list
68   @param args: should contain only one element, the path of
69       the file with the opcode definition
70   @rtype: int
71   @return: the desired exit code
72
73   """
74   cl = cli.GetClient()
75   jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)
76
77   job_cnt = 0
78   op_cnt = 0
79   if opts.timing_stats:
80     ToStdout("Loading...")
81   for job_idx in range(opts.rep_job):
82     for fname in args:
83       # pylint: disable-msg=W0142
84       op_data = simplejson.loads(utils.ReadFile(fname))
85       op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
86       op_list = op_list * opts.rep_op
87       jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
88       op_cnt += len(op_list)
89       job_cnt += 1
90
91   if opts.timing_stats:
92     t1 = time.time()
93     ToStdout("Submitting...")
94
95   jex.SubmitPending(each=opts.each)
96
97   if opts.timing_stats:
98     t2 = time.time()
99     ToStdout("Executing...")
100
101   jex.GetResults()
102   if opts.timing_stats:
103     t3 = time.time()
104     ToStdout("C:op     %4d" % op_cnt)
105     ToStdout("C:job    %4d" % job_cnt)
106     ToStdout("T:submit %4.4f" % (t2-t1))
107     ToStdout("T:exec   %4.4f" % (t3-t2))
108     ToStdout("T:total  %4.4f" % (t3-t1))
109   return 0
110
111
112 def TestAllocator(opts, args):
113   """Runs the test allocator opcode.
114
115   @param opts: the command line options selected by the user
116   @type args: list
117   @param args: should contain only one element, the iallocator name
118   @rtype: int
119   @return: the desired exit code
120
121   """
122   try:
123     disks = [{"size": utils.ParseUnit(val), "mode": 'w'}
124              for val in opts.disks.split(",")]
125   except errors.UnitParseError, err:
126     ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
127     return 1
128
129   nics = [val.split("/") for val in opts.nics.split(",")]
130   for row in nics:
131     while len(row) < 3:
132       row.append(None)
133     for i in range(3):
134       if row[i] == '':
135         row[i] = None
136   nic_dict = [{"mac": v[0], "ip": v[1], "bridge": v[2]} for v in nics]
137
138   if opts.tags is None:
139     opts.tags = []
140   else:
141     opts.tags = opts.tags.split(",")
142
143   op = opcodes.OpTestAllocator(mode=opts.mode,
144                                name=args[0],
145                                evac_nodes=args,
146                                mem_size=opts.mem,
147                                disks=disks,
148                                disk_template=opts.disk_template,
149                                nics=nic_dict,
150                                os=opts.os,
151                                vcpus=opts.vcpus,
152                                tags=opts.tags,
153                                direction=opts.direction,
154                                allocator=opts.iallocator,
155                                )
156   result = SubmitOpCode(op, opts=opts)
157   ToStdout("%s" % result)
158   return 0
159
160
161 class _JobQueueTestReporter(cli.StdioJobPollReportCb):
162   def __init__(self):
163     """Initializes this class.
164
165     """
166     cli.StdioJobPollReportCb.__init__(self)
167     self._testmsgs = []
168     self._job_id = None
169
170   def GetTestMessages(self):
171     """Returns all test log messages received so far.
172
173     """
174     return self._testmsgs
175
176   def GetJobId(self):
177     """Returns the job ID.
178
179     """
180     return self._job_id
181
182   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
183     """Handles a log message.
184
185     """
186     if self._job_id is None:
187       self._job_id = job_id
188     elif self._job_id != job_id:
189       raise errors.ProgrammerError("The same reporter instance was used for"
190                                    " more than one job")
191
192     if log_type == constants.ELOG_JQUEUE_TEST:
193       (sockname, test, arg) = log_msg
194       return self._ProcessTestMessage(job_id, sockname, test, arg)
195
196     elif (log_type == constants.ELOG_MESSAGE and
197           log_msg.startswith(constants.JQT_MSGPREFIX)):
198       self._testmsgs.append(log_msg[len(constants.JQT_MSGPREFIX):])
199       return
200
201     return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
202                                                      timestamp, log_type,
203                                                      log_msg)
204
205   def _ProcessTestMessage(self, job_id, sockname, test, arg):
206     """Handles a job queue test message.
207
208     """
209     if test not in constants.JQT_ALL:
210       raise errors.OpExecError("Received invalid test message %s" % test)
211
212     sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
213     try:
214       sock.settimeout(30.0)
215
216       logging.debug("Connecting to %s", sockname)
217       sock.connect(sockname)
218
219       logging.debug("Checking status")
220       jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
221       if not jobdetails:
222         raise errors.OpExecError("Can't find job %s" % job_id)
223
224       status = jobdetails[0]
225
226       logging.debug("Status of job %s is %s", job_id, status)
227
228       if test == constants.JQT_EXPANDNAMES:
229         if status != constants.JOB_STATUS_WAITLOCK:
230           raise errors.OpExecError("Job status while expanding names is '%s',"
231                                    " not '%s' as expected" %
232                                    (status, constants.JOB_STATUS_WAITLOCK))
233       elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
234         if status != constants.JOB_STATUS_RUNNING:
235           raise errors.OpExecError("Job status while executing opcode is '%s',"
236                                    " not '%s' as expected" %
237                                    (status, constants.JOB_STATUS_RUNNING))
238
239       if test == constants.JQT_LOGMSG:
240         if len(self._testmsgs) != arg:
241           raise errors.OpExecError("Received %s test messages when %s are"
242                                    " expected" % (len(self._testmsgs), arg))
243     finally:
244       logging.debug("Closing socket")
245       sock.close()
246
247
248 def TestJobqueue(opts, _):
249   """Runs a few tests on the job queue.
250
251   """
252   test_messages = [
253     "Hello World",
254     "A",
255     "",
256     "B"
257     "Foo|bar|baz",
258     utils.TimestampForFilename(),
259     ]
260
261   for fail in [False, True]:
262     if fail:
263       ToStdout("Testing job failure")
264     else:
265       ToStdout("Testing job success")
266
267     op = opcodes.OpTestJobqueue(notify_waitlock=True,
268                                 notify_exec=True,
269                                 log_messages=test_messages,
270                                 fail=fail)
271
272     reporter = _JobQueueTestReporter()
273     try:
274       SubmitOpCode(op, reporter=reporter, opts=opts)
275     except errors.OpExecError:
276       if not fail:
277         raise
278       # Ignore error
279     else:
280       if fail:
281         raise errors.OpExecError("Job didn't fail when it should")
282
283     # Check received log messages
284     if reporter.GetTestMessages() != test_messages:
285       raise errors.OpExecError("Received test messages don't match input"
286                                " (input %r, received %r)" %
287                                (test_messages, reporter.GetTestMessages()))
288
289     # Check final status
290     job_id = reporter.GetJobId()
291
292     jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
293     if not jobdetails:
294       raise errors.OpExecError("Can't find job %s" % job_id)
295
296     if fail:
297       exp_status = constants.JOB_STATUS_ERROR
298     else:
299       exp_status = constants.JOB_STATUS_SUCCESS
300
301     final_status = jobdetails[0]
302     if final_status != exp_status:
303       raise errors.OpExecError("Final job status is %s, not %s as expected" %
304                                (final_status, exp_status))
305
306   return 0
307
308
309 commands = {
310   'delay': (
311     Delay, [ArgUnknown(min=1, max=1)],
312     [cli_option("--no-master", dest="on_master", default=True,
313                 action="store_false", help="Do not sleep in the master code"),
314      cli_option("-n", dest="on_nodes", default=[],
315                 action="append", help="Select nodes to sleep on"),
316      cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
317                 help="Number of times to repeat the sleep"),
318      ],
319     "[opts...] <duration>", "Executes a TestDelay OpCode"),
320   'submit-job': (
321     GenericOpCodes, [ArgFile(min=1)],
322     [VERBOSE_OPT,
323      cli_option("--op-repeat", type="int", default="1", dest="rep_op",
324                 help="Repeat the opcode sequence this number of times"),
325      cli_option("--job-repeat", type="int", default="1", dest="rep_job",
326                 help="Repeat the job this number of times"),
327      cli_option("--timing-stats", default=False,
328                 action="store_true", help="Show timing stats"),
329      cli_option("--each", default=False, action="store_true",
330                 help="Submit each job separately"),
331      ],
332     "<op_list_file...>", "Submits jobs built from json files"
333     " containing a list of serialized opcodes"),
334   'allocator': (
335     TestAllocator, [ArgUnknown(min=1)],
336     [cli_option("--dir", dest="direction",
337                 default="in", choices=["in", "out"],
338                 help="Show allocator input (in) or allocator"
339                 " results (out)"),
340      IALLOCATOR_OPT,
341      cli_option("-m", "--mode", default="relocate",
342                 choices=["relocate", "allocate", "multi-evacuate"],
343                 help="Request mode, either allocate or relocate"),
344      cli_option("--mem", default=128, type="unit",
345                 help="Memory size for the instance (MiB)"),
346      cli_option("--disks", default="4096,4096",
347                 help="Comma separated list of disk sizes (MiB)"),
348      DISK_TEMPLATE_OPT,
349      cli_option("--nics", default="00:11:22:33:44:55",
350                 help="Comma separated list of nics, each nic"
351                 " definition is of form mac/ip/bridge, if"
352                 " missing values are replace by None"),
353      OS_OPT,
354      cli_option("-p", "--vcpus", default=1, type="int",
355                 help="Select number of VCPUs for the instance"),
356      cli_option("--tags", default=None,
357                 help="Comma separated list of tags"),
358      ],
359     "{opts...} <instance>", "Executes a TestAllocator OpCode"),
360   "test-jobqueue": (
361     TestJobqueue, ARGS_NONE, [],
362     "", "Test a few aspects of the job queue")
363   }
364
365
366 if __name__ == '__main__':
367   sys.exit(GenericMain(commands))