gnt-debug: Extend job queue tests
[ganeti-local] / scripts / gnt-debug
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Debugging commands"""
22
23 # pylint: disable-msg=W0401,W0614,C0103
24 # W0401: Wildcard import ganeti.cli
25 # W0614: Unused import %s from wildcard import (since we need cli)
26 # C0103: Invalid name gnt-backup
27
28 import sys
29 import simplejson
30 import time
31 import socket
32 import logging
33
34 from ganeti.cli import *
35 from ganeti import cli
36 from ganeti import constants
37 from ganeti import opcodes
38 from ganeti import utils
39 from ganeti import errors
40
41
42 def Delay(opts, args):
43   """Sleeps for a while
44
45   @param opts: the command line options selected by the user
46   @type args: list
47   @param args: should contain only one element, the duration
48       the sleep
49   @rtype: int
50   @return: the desired exit code
51
52   """
53   delay = float(args[0])
54   op = opcodes.OpTestDelay(duration=delay,
55                            on_master=opts.on_master,
56                            on_nodes=opts.on_nodes,
57                            repeat=opts.repeat)
58   SubmitOpCode(op, opts=opts)
59
60   return 0
61
62
63 def GenericOpCodes(opts, args):
64   """Send any opcode to the master.
65
66   @param opts: the command line options selected by the user
67   @type args: list
68   @param args: should contain only one element, the path of
69       the file with the opcode definition
70   @rtype: int
71   @return: the desired exit code
72
73   """
74   cl = cli.GetClient()
75   jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)
76
77   job_cnt = 0
78   op_cnt = 0
79   if opts.timing_stats:
80     ToStdout("Loading...")
81   for job_idx in range(opts.rep_job):
82     for fname in args:
83       # pylint: disable-msg=W0142
84       op_data = simplejson.loads(utils.ReadFile(fname))
85       op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
86       op_list = op_list * opts.rep_op
87       jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
88       op_cnt += len(op_list)
89       job_cnt += 1
90
91   if opts.timing_stats:
92     t1 = time.time()
93     ToStdout("Submitting...")
94
95   jex.SubmitPending(each=opts.each)
96
97   if opts.timing_stats:
98     t2 = time.time()
99     ToStdout("Executing...")
100
101   jex.GetResults()
102   if opts.timing_stats:
103     t3 = time.time()
104     ToStdout("C:op     %4d" % op_cnt)
105     ToStdout("C:job    %4d" % job_cnt)
106     ToStdout("T:submit %4.4f" % (t2-t1))
107     ToStdout("T:exec   %4.4f" % (t3-t2))
108     ToStdout("T:total  %4.4f" % (t3-t1))
109   return 0
110
111
112 def TestAllocator(opts, args):
113   """Runs the test allocator opcode.
114
115   @param opts: the command line options selected by the user
116   @type args: list
117   @param args: should contain only one element, the iallocator name
118   @rtype: int
119   @return: the desired exit code
120
121   """
122   try:
123     disks = [{"size": utils.ParseUnit(val), "mode": 'w'}
124              for val in opts.disks.split(",")]
125   except errors.UnitParseError, err:
126     ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
127     return 1
128
129   nics = [val.split("/") for val in opts.nics.split(",")]
130   for row in nics:
131     while len(row) < 3:
132       row.append(None)
133     for i in range(3):
134       if row[i] == '':
135         row[i] = None
136   nic_dict = [{"mac": v[0], "ip": v[1], "bridge": v[2]} for v in nics]
137
138   if opts.tags is None:
139     opts.tags = []
140   else:
141     opts.tags = opts.tags.split(",")
142
143   op = opcodes.OpTestAllocator(mode=opts.mode,
144                                name=args[0],
145                                evac_nodes=args,
146                                mem_size=opts.mem,
147                                disks=disks,
148                                disk_template=opts.disk_template,
149                                nics=nic_dict,
150                                os=opts.os,
151                                vcpus=opts.vcpus,
152                                tags=opts.tags,
153                                direction=opts.direction,
154                                allocator=opts.iallocator,
155                                )
156   result = SubmitOpCode(op, opts=opts)
157   ToStdout("%s" % result)
158   return 0
159
160
161 class _JobQueueTestReporter(cli.StdioJobPollReportCb):
162   def __init__(self):
163     """Initializes this class.
164
165     """
166     cli.StdioJobPollReportCb.__init__(self)
167     self._expected_msgcount = 0
168     self._all_testmsgs = []
169     self._testmsgs = None
170     self._job_id = None
171
172   def GetTestMessages(self):
173     """Returns all test log messages received so far.
174
175     """
176     return self._all_testmsgs
177
178   def GetJobId(self):
179     """Returns the job ID.
180
181     """
182     return self._job_id
183
184   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
185     """Handles a log message.
186
187     """
188     if self._job_id is None:
189       self._job_id = job_id
190     elif self._job_id != job_id:
191       raise errors.ProgrammerError("The same reporter instance was used for"
192                                    " more than one job")
193
194     if log_type == constants.ELOG_JQUEUE_TEST:
195       (sockname, test, arg) = log_msg
196       return self._ProcessTestMessage(job_id, sockname, test, arg)
197
198     elif (log_type == constants.ELOG_MESSAGE and
199           log_msg.startswith(constants.JQT_MSGPREFIX)):
200       if self._testmsgs is None:
201         raise errors.OpExecError("Received test message without a preceding"
202                                  " start message")
203       testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
204       self._testmsgs.append(testmsg)
205       self._all_testmsgs.append(testmsg)
206       return
207
208     return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
209                                                      timestamp, log_type,
210                                                      log_msg)
211
212   def _ProcessTestMessage(self, job_id, sockname, test, arg):
213     """Handles a job queue test message.
214
215     """
216     if test not in constants.JQT_ALL:
217       raise errors.OpExecError("Received invalid test message %s" % test)
218
219     sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
220     try:
221       sock.settimeout(30.0)
222
223       logging.debug("Connecting to %s", sockname)
224       sock.connect(sockname)
225
226       logging.debug("Checking status")
227       jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
228       if not jobdetails:
229         raise errors.OpExecError("Can't find job %s" % job_id)
230
231       status = jobdetails[0]
232
233       logging.debug("Status of job %s is %s", job_id, status)
234
235       if test == constants.JQT_EXPANDNAMES:
236         if status != constants.JOB_STATUS_WAITLOCK:
237           raise errors.OpExecError("Job status while expanding names is '%s',"
238                                    " not '%s' as expected" %
239                                    (status, constants.JOB_STATUS_WAITLOCK))
240       elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
241         if status != constants.JOB_STATUS_RUNNING:
242           raise errors.OpExecError("Job status while executing opcode is '%s',"
243                                    " not '%s' as expected" %
244                                    (status, constants.JOB_STATUS_RUNNING))
245
246       if test == constants.JQT_STARTMSG:
247         logging.debug("Expecting %s test messages", arg)
248         self._testmsgs = []
249       elif test == constants.JQT_LOGMSG:
250         if len(self._testmsgs) != arg:
251           raise errors.OpExecError("Received %s test messages when %s are"
252                                    " expected" % (len(self._testmsgs), arg))
253     finally:
254       logging.debug("Closing socket")
255       sock.close()
256
257
258 def TestJobqueue(opts, _):
259   """Runs a few tests on the job queue.
260
261   """
262   (TM_SUCCESS,
263    TM_MULTISUCCESS,
264    TM_FAIL,
265    TM_PARTFAIL) = range(4)
266   TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
267
268   for mode in TM_ALL:
269     test_messages = [
270       "Testing mode %s" % mode,
271       "Hello World",
272       "A",
273       "",
274       "B"
275       "Foo|bar|baz",
276       utils.TimestampForFilename(),
277       ]
278
279     fail = mode in (TM_FAIL, TM_PARTFAIL)
280
281     if mode == TM_PARTFAIL:
282       ToStdout("Testing partial job failure")
283       ops = [
284         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
285                                log_messages=test_messages, fail=False),
286         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
287                                log_messages=test_messages, fail=False),
288         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
289                                log_messages=test_messages, fail=True),
290         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
291                                log_messages=test_messages, fail=False),
292         ]
293       expect_messages = 3 * [test_messages]
294       expect_opstatus = [
295         constants.OP_STATUS_SUCCESS,
296         constants.OP_STATUS_SUCCESS,
297         constants.OP_STATUS_ERROR,
298         constants.OP_STATUS_ERROR,
299         ]
300       expect_resultlen = 2
301     elif mode == TM_MULTISUCCESS:
302       ToStdout("Testing multiple successful opcodes")
303       ops = [
304         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
305                                log_messages=test_messages, fail=False),
306         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
307                                log_messages=test_messages, fail=False),
308         ]
309       expect_messages = 2 * [test_messages]
310       expect_opstatus = [
311         constants.OP_STATUS_SUCCESS,
312         constants.OP_STATUS_SUCCESS,
313         ]
314       expect_resultlen = 2
315     else:
316       if mode == TM_SUCCESS:
317         ToStdout("Testing job success")
318         expect_opstatus = [constants.OP_STATUS_SUCCESS]
319       elif mode == TM_FAIL:
320         ToStdout("Testing job failure")
321         expect_opstatus = [constants.OP_STATUS_ERROR]
322       else:
323         raise errors.ProgrammerError("Unknown test mode %s" % mode)
324
325       ops = [
326         opcodes.OpTestJobqueue(notify_waitlock=True,
327                                notify_exec=True,
328                                log_messages=test_messages,
329                                fail=fail)
330         ]
331       expect_messages = [test_messages]
332       expect_resultlen = 1
333
334     cl = cli.GetClient()
335     cli.SetGenericOpcodeOpts(ops, opts)
336
337     # Send job to master daemon
338     job_id = cli.SendJob(ops, cl=cl)
339
340     reporter = _JobQueueTestReporter()
341     results = None
342
343     try:
344       results = cli.PollJob(job_id, cl=cl, reporter=reporter)
345     except errors.OpExecError, err:
346       if not fail:
347         raise
348       ToStdout("Ignoring error: %s", err)
349     else:
350       if fail:
351         raise errors.OpExecError("Job didn't fail when it should")
352
353     # Check length of result
354     if fail:
355       if results is not None:
356         raise errors.OpExecError("Received result from failed job")
357     elif len(results) != expect_resultlen:
358       raise errors.OpExecError("Received %s results (%s), expected %s" %
359                                (len(results), results, expect_resultlen))
360
361     # Check received log messages
362     all_messages = [i for j in expect_messages for i in j]
363     if reporter.GetTestMessages() != all_messages:
364       raise errors.OpExecError("Received test messages don't match input"
365                                " (input %r, received %r)" %
366                                (all_messages, reporter.GetTestMessages()))
367
368     # Check final status
369     reported_job_id = reporter.GetJobId()
370     if reported_job_id != job_id:
371       raise errors.OpExecError("Reported job ID %s doesn't match"
372                                "submission job ID %s" %
373                                (reported_job_id, job_id))
374
375     jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
376     if not jobdetails:
377       raise errors.OpExecError("Can't find job %s" % job_id)
378
379     if fail:
380       exp_status = constants.JOB_STATUS_ERROR
381     else:
382       exp_status = constants.JOB_STATUS_SUCCESS
383
384     (final_status, final_opstatus) = jobdetails
385     if final_status != exp_status:
386       raise errors.OpExecError("Final job status is %s, not %s as expected" %
387                                (final_status, exp_status))
388     if len(final_opstatus) != len(ops):
389       raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
390                                " expected %s)" %
391                                (len(final_opstatus), len(ops)))
392     if final_opstatus != expect_opstatus:
393       raise errors.OpExecError("Opcode status is %s, expected %s" %
394                                (final_opstatus, expect_opstatus))
395
396   ToStdout("Job queue test successful")
397
398   return 0
399
400
401 commands = {
402   'delay': (
403     Delay, [ArgUnknown(min=1, max=1)],
404     [cli_option("--no-master", dest="on_master", default=True,
405                 action="store_false", help="Do not sleep in the master code"),
406      cli_option("-n", dest="on_nodes", default=[],
407                 action="append", help="Select nodes to sleep on"),
408      cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
409                 help="Number of times to repeat the sleep"),
410      ],
411     "[opts...] <duration>", "Executes a TestDelay OpCode"),
412   'submit-job': (
413     GenericOpCodes, [ArgFile(min=1)],
414     [VERBOSE_OPT,
415      cli_option("--op-repeat", type="int", default="1", dest="rep_op",
416                 help="Repeat the opcode sequence this number of times"),
417      cli_option("--job-repeat", type="int", default="1", dest="rep_job",
418                 help="Repeat the job this number of times"),
419      cli_option("--timing-stats", default=False,
420                 action="store_true", help="Show timing stats"),
421      cli_option("--each", default=False, action="store_true",
422                 help="Submit each job separately"),
423      ],
424     "<op_list_file...>", "Submits jobs built from json files"
425     " containing a list of serialized opcodes"),
426   'allocator': (
427     TestAllocator, [ArgUnknown(min=1)],
428     [cli_option("--dir", dest="direction",
429                 default="in", choices=["in", "out"],
430                 help="Show allocator input (in) or allocator"
431                 " results (out)"),
432      IALLOCATOR_OPT,
433      cli_option("-m", "--mode", default="relocate",
434                 choices=["relocate", "allocate", "multi-evacuate"],
435                 help="Request mode, either allocate or relocate"),
436      cli_option("--mem", default=128, type="unit",
437                 help="Memory size for the instance (MiB)"),
438      cli_option("--disks", default="4096,4096",
439                 help="Comma separated list of disk sizes (MiB)"),
440      DISK_TEMPLATE_OPT,
441      cli_option("--nics", default="00:11:22:33:44:55",
442                 help="Comma separated list of nics, each nic"
443                 " definition is of form mac/ip/bridge, if"
444                 " missing values are replace by None"),
445      OS_OPT,
446      cli_option("-p", "--vcpus", default=1, type="int",
447                 help="Select number of VCPUs for the instance"),
448      cli_option("--tags", default=None,
449                 help="Comma separated list of tags"),
450      ],
451     "{opts...} <instance>", "Executes a TestAllocator OpCode"),
452   "test-jobqueue": (
453     TestJobqueue, ARGS_NONE, [],
454     "", "Test a few aspects of the job queue")
455   }
456
457
458 if __name__ == '__main__':
459   sys.exit(GenericMain(commands))