gnt-debug: New iallocator mode
[ganeti-local] / lib / client / gnt_debug.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Debugging commands"""
22
23 # pylint: disable-msg=W0401,W0614,C0103
24 # W0401: Wildcard import ganeti.cli
25 # W0614: Unused import %s from wildcard import (since we need cli)
26 # C0103: Invalid name gnt-backup
27
28 import simplejson
29 import time
30 import socket
31 import logging
32
33 from ganeti.cli import *
34 from ganeti import cli
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import compat
40
41
42 #: Default fields for L{ListLocks}
43 _LIST_LOCKS_DEF_FIELDS = [
44   "name",
45   "mode",
46   "owner",
47   "pending",
48   ]
49
50
51 def Delay(opts, args):
52   """Sleeps for a while
53
54   @param opts: the command line options selected by the user
55   @type args: list
56   @param args: should contain only one element, the duration
57       the sleep
58   @rtype: int
59   @return: the desired exit code
60
61   """
62   delay = float(args[0])
63   op = opcodes.OpTestDelay(duration=delay,
64                            on_master=opts.on_master,
65                            on_nodes=opts.on_nodes,
66                            repeat=opts.repeat)
67   SubmitOpCode(op, opts=opts)
68
69   return 0
70
71
72 def GenericOpCodes(opts, args):
73   """Send any opcode to the master.
74
75   @param opts: the command line options selected by the user
76   @type args: list
77   @param args: should contain only one element, the path of
78       the file with the opcode definition
79   @rtype: int
80   @return: the desired exit code
81
82   """
83   cl = cli.GetClient()
84   jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)
85
86   job_cnt = 0
87   op_cnt = 0
88   if opts.timing_stats:
89     ToStdout("Loading...")
90   for job_idx in range(opts.rep_job):
91     for fname in args:
92       # pylint: disable-msg=W0142
93       op_data = simplejson.loads(utils.ReadFile(fname))
94       op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
95       op_list = op_list * opts.rep_op
96       jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
97       op_cnt += len(op_list)
98       job_cnt += 1
99
100   if opts.timing_stats:
101     t1 = time.time()
102     ToStdout("Submitting...")
103
104   jex.SubmitPending(each=opts.each)
105
106   if opts.timing_stats:
107     t2 = time.time()
108     ToStdout("Executing...")
109
110   jex.GetResults()
111   if opts.timing_stats:
112     t3 = time.time()
113     ToStdout("C:op     %4d" % op_cnt)
114     ToStdout("C:job    %4d" % job_cnt)
115     ToStdout("T:submit %4.4f" % (t2-t1))
116     ToStdout("T:exec   %4.4f" % (t3-t2))
117     ToStdout("T:total  %4.4f" % (t3-t1))
118   return 0
119
120
121 def TestAllocator(opts, args):
122   """Runs the test allocator opcode.
123
124   @param opts: the command line options selected by the user
125   @type args: list
126   @param args: should contain only one element, the iallocator name
127   @rtype: int
128   @return: the desired exit code
129
130   """
131   try:
132     disks = [{
133       constants.IDISK_SIZE: utils.ParseUnit(val),
134       constants.IDISK_MODE: constants.DISK_RDWR,
135       } for val in opts.disks.split(",")]
136   except errors.UnitParseError, err:
137     ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
138     return 1
139
140   nics = [val.split("/") for val in opts.nics.split(",")]
141   for row in nics:
142     while len(row) < 3:
143       row.append(None)
144     for i in range(3):
145       if row[i] == '':
146         row[i] = None
147   nic_dict = [{
148     constants.INIC_MAC: v[0],
149     constants.INIC_IP: v[1],
150     # The iallocator interface defines a "bridge" item
151     "bridge": v[2],
152     } for v in nics]
153
154   if opts.tags is None:
155     opts.tags = []
156   else:
157     opts.tags = opts.tags.split(",")
158
159   op = opcodes.OpTestAllocator(mode=opts.mode,
160                                name=args[0],
161                                evac_nodes=args,
162                                instances=args,
163                                mem_size=opts.mem,
164                                disks=disks,
165                                disk_template=opts.disk_template,
166                                nics=nic_dict,
167                                os=opts.os,
168                                vcpus=opts.vcpus,
169                                tags=opts.tags,
170                                direction=opts.direction,
171                                allocator=opts.iallocator,
172                                reloc_mode=opts.reloc_mode,
173                                target_groups=opts.target_groups)
174   result = SubmitOpCode(op, opts=opts)
175   ToStdout("%s" % result)
176   return 0
177
178
179 def _TestJobSubmission(opts):
180   """Tests submitting jobs.
181
182   """
183   ToStdout("Testing job submission")
184
185   testdata = [
186     (0, 0, constants.OP_PRIO_LOWEST),
187     (0, 0, constants.OP_PRIO_HIGHEST),
188     ]
189
190   for priority in (constants.OP_PRIO_SUBMIT_VALID |
191                    frozenset([constants.OP_PRIO_LOWEST,
192                               constants.OP_PRIO_HIGHEST])):
193     for offset in [-1, +1]:
194       testdata.extend([
195         (0, 0, priority + offset),
196         (3, 0, priority + offset),
197         (0, 3, priority + offset),
198         (4, 2, priority + offset),
199         ])
200
201   cl = cli.GetClient()
202
203   for before, after, failpriority in testdata:
204     ops = []
205     ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
206     ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
207     ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])
208
209     try:
210       cl.SubmitJob(ops)
211     except errors.GenericError, err:
212       if opts.debug:
213         ToStdout("Ignoring error: %s", err)
214     else:
215       raise errors.OpExecError("Submitting opcode with priority %s did not"
216                                " fail when it should (allowed are %s)" %
217                                (failpriority, constants.OP_PRIO_SUBMIT_VALID))
218
219     jobs = [
220       [opcodes.OpTestDelay(duration=0),
221        opcodes.OpTestDelay(duration=0, dry_run=False),
222        opcodes.OpTestDelay(duration=0, dry_run=True)],
223       ops,
224       ]
225     result = cl.SubmitManyJobs(jobs)
226     if not (len(result) == 2 and
227             compat.all(len(i) == 2 for i in result) and
228             compat.all(isinstance(i[1], basestring) for i in result) and
229             result[0][0] and not result[1][0]):
230       raise errors.OpExecError("Submitting multiple jobs did not work as"
231                                " expected, result %s" % result)
232     assert len(result) == 2
233
234   ToStdout("Job submission tests were successful")
235
236
237 class _JobQueueTestReporter(cli.StdioJobPollReportCb):
238   def __init__(self):
239     """Initializes this class.
240
241     """
242     cli.StdioJobPollReportCb.__init__(self)
243     self._expected_msgcount = 0
244     self._all_testmsgs = []
245     self._testmsgs = None
246     self._job_id = None
247
248   def GetTestMessages(self):
249     """Returns all test log messages received so far.
250
251     """
252     return self._all_testmsgs
253
254   def GetJobId(self):
255     """Returns the job ID.
256
257     """
258     return self._job_id
259
260   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
261     """Handles a log message.
262
263     """
264     if self._job_id is None:
265       self._job_id = job_id
266     elif self._job_id != job_id:
267       raise errors.ProgrammerError("The same reporter instance was used for"
268                                    " more than one job")
269
270     if log_type == constants.ELOG_JQUEUE_TEST:
271       (sockname, test, arg) = log_msg
272       return self._ProcessTestMessage(job_id, sockname, test, arg)
273
274     elif (log_type == constants.ELOG_MESSAGE and
275           log_msg.startswith(constants.JQT_MSGPREFIX)):
276       if self._testmsgs is None:
277         raise errors.OpExecError("Received test message without a preceding"
278                                  " start message")
279       testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
280       self._testmsgs.append(testmsg)
281       self._all_testmsgs.append(testmsg)
282       return
283
284     return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
285                                                      timestamp, log_type,
286                                                      log_msg)
287
288   def _ProcessTestMessage(self, job_id, sockname, test, arg):
289     """Handles a job queue test message.
290
291     """
292     if test not in constants.JQT_ALL:
293       raise errors.OpExecError("Received invalid test message %s" % test)
294
295     sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
296     try:
297       sock.settimeout(30.0)
298
299       logging.debug("Connecting to %s", sockname)
300       sock.connect(sockname)
301
302       logging.debug("Checking status")
303       jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
304       if not jobdetails:
305         raise errors.OpExecError("Can't find job %s" % job_id)
306
307       status = jobdetails[0]
308
309       logging.debug("Status of job %s is %s", job_id, status)
310
311       if test == constants.JQT_EXPANDNAMES:
312         if status != constants.JOB_STATUS_WAITLOCK:
313           raise errors.OpExecError("Job status while expanding names is '%s',"
314                                    " not '%s' as expected" %
315                                    (status, constants.JOB_STATUS_WAITLOCK))
316       elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
317         if status != constants.JOB_STATUS_RUNNING:
318           raise errors.OpExecError("Job status while executing opcode is '%s',"
319                                    " not '%s' as expected" %
320                                    (status, constants.JOB_STATUS_RUNNING))
321
322       if test == constants.JQT_STARTMSG:
323         logging.debug("Expecting %s test messages", arg)
324         self._testmsgs = []
325       elif test == constants.JQT_LOGMSG:
326         if len(self._testmsgs) != arg:
327           raise errors.OpExecError("Received %s test messages when %s are"
328                                    " expected" % (len(self._testmsgs), arg))
329     finally:
330       logging.debug("Closing socket")
331       sock.close()
332
333
334 def TestJobqueue(opts, _):
335   """Runs a few tests on the job queue.
336
337   """
338   _TestJobSubmission(opts)
339
340   (TM_SUCCESS,
341    TM_MULTISUCCESS,
342    TM_FAIL,
343    TM_PARTFAIL) = range(4)
344   TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
345
346   for mode in TM_ALL:
347     test_messages = [
348       "Testing mode %s" % mode,
349       "Hello World",
350       "A",
351       "",
352       "B"
353       "Foo|bar|baz",
354       utils.TimestampForFilename(),
355       ]
356
357     fail = mode in (TM_FAIL, TM_PARTFAIL)
358
359     if mode == TM_PARTFAIL:
360       ToStdout("Testing partial job failure")
361       ops = [
362         opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
363                              log_messages=test_messages, fail=False),
364         opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
365                              log_messages=test_messages, fail=False),
366         opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
367                              log_messages=test_messages, fail=True),
368         opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
369                              log_messages=test_messages, fail=False),
370         ]
371       expect_messages = 3 * [test_messages]
372       expect_opstatus = [
373         constants.OP_STATUS_SUCCESS,
374         constants.OP_STATUS_SUCCESS,
375         constants.OP_STATUS_ERROR,
376         constants.OP_STATUS_ERROR,
377         ]
378       expect_resultlen = 2
379     elif mode == TM_MULTISUCCESS:
380       ToStdout("Testing multiple successful opcodes")
381       ops = [
382         opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
383                              log_messages=test_messages, fail=False),
384         opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
385                              log_messages=test_messages, fail=False),
386         ]
387       expect_messages = 2 * [test_messages]
388       expect_opstatus = [
389         constants.OP_STATUS_SUCCESS,
390         constants.OP_STATUS_SUCCESS,
391         ]
392       expect_resultlen = 2
393     else:
394       if mode == TM_SUCCESS:
395         ToStdout("Testing job success")
396         expect_opstatus = [constants.OP_STATUS_SUCCESS]
397       elif mode == TM_FAIL:
398         ToStdout("Testing job failure")
399         expect_opstatus = [constants.OP_STATUS_ERROR]
400       else:
401         raise errors.ProgrammerError("Unknown test mode %s" % mode)
402
403       ops = [
404         opcodes.OpTestJqueue(notify_waitlock=True,
405                              notify_exec=True,
406                              log_messages=test_messages,
407                              fail=fail)
408         ]
409       expect_messages = [test_messages]
410       expect_resultlen = 1
411
412     cl = cli.GetClient()
413     cli.SetGenericOpcodeOpts(ops, opts)
414
415     # Send job to master daemon
416     job_id = cli.SendJob(ops, cl=cl)
417
418     reporter = _JobQueueTestReporter()
419     results = None
420
421     try:
422       results = cli.PollJob(job_id, cl=cl, reporter=reporter)
423     except errors.OpExecError, err:
424       if not fail:
425         raise
426       ToStdout("Ignoring error: %s", err)
427     else:
428       if fail:
429         raise errors.OpExecError("Job didn't fail when it should")
430
431     # Check length of result
432     if fail:
433       if results is not None:
434         raise errors.OpExecError("Received result from failed job")
435     elif len(results) != expect_resultlen:
436       raise errors.OpExecError("Received %s results (%s), expected %s" %
437                                (len(results), results, expect_resultlen))
438
439     # Check received log messages
440     all_messages = [i for j in expect_messages for i in j]
441     if reporter.GetTestMessages() != all_messages:
442       raise errors.OpExecError("Received test messages don't match input"
443                                " (input %r, received %r)" %
444                                (all_messages, reporter.GetTestMessages()))
445
446     # Check final status
447     reported_job_id = reporter.GetJobId()
448     if reported_job_id != job_id:
449       raise errors.OpExecError("Reported job ID %s doesn't match"
450                                "submission job ID %s" %
451                                (reported_job_id, job_id))
452
453     jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
454     if not jobdetails:
455       raise errors.OpExecError("Can't find job %s" % job_id)
456
457     if fail:
458       exp_status = constants.JOB_STATUS_ERROR
459     else:
460       exp_status = constants.JOB_STATUS_SUCCESS
461
462     (final_status, final_opstatus) = jobdetails
463     if final_status != exp_status:
464       raise errors.OpExecError("Final job status is %s, not %s as expected" %
465                                (final_status, exp_status))
466     if len(final_opstatus) != len(ops):
467       raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
468                                " expected %s)" %
469                                (len(final_opstatus), len(ops)))
470     if final_opstatus != expect_opstatus:
471       raise errors.OpExecError("Opcode status is %s, expected %s" %
472                                (final_opstatus, expect_opstatus))
473
474   ToStdout("Job queue test successful")
475
476   return 0
477
478
479 def ListLocks(opts, args): # pylint: disable-msg=W0613
480   """List all locks.
481
482   @param opts: the command line options selected by the user
483   @type args: list
484   @param args: should be an empty list
485   @rtype: int
486   @return: the desired exit code
487
488   """
489   selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
490
491   def _DashIfNone(fn):
492     def wrapper(value):
493       if not value:
494         return "-"
495       return fn(value)
496     return wrapper
497
498   def _FormatPending(value):
499     """Format pending acquires.
500
501     """
502     return utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
503                            for mode, threads in value)
504
505   # Format raw values
506   fmtoverride = {
507     "mode": (_DashIfNone(str), False),
508     "owner": (_DashIfNone(",".join), False),
509     "pending": (_DashIfNone(_FormatPending), False),
510     }
511
512   while True:
513     ret = GenericList(constants.QR_LOCK, selected_fields, None, None,
514                       opts.separator, not opts.no_headers,
515                       format_override=fmtoverride, verbose=opts.verbose)
516
517     if ret != constants.EXIT_SUCCESS:
518       return ret
519
520     if not opts.interval:
521       break
522
523     ToStdout("")
524     time.sleep(opts.interval)
525
526   return 0
527
528
529 commands = {
530   'delay': (
531     Delay, [ArgUnknown(min=1, max=1)],
532     [cli_option("--no-master", dest="on_master", default=True,
533                 action="store_false", help="Do not sleep in the master code"),
534      cli_option("-n", dest="on_nodes", default=[],
535                 action="append", help="Select nodes to sleep on"),
536      cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
537                 help="Number of times to repeat the sleep"),
538      DRY_RUN_OPT, PRIORITY_OPT,
539      ],
540     "[opts...] <duration>", "Executes a TestDelay OpCode"),
541   'submit-job': (
542     GenericOpCodes, [ArgFile(min=1)],
543     [VERBOSE_OPT,
544      cli_option("--op-repeat", type="int", default="1", dest="rep_op",
545                 help="Repeat the opcode sequence this number of times"),
546      cli_option("--job-repeat", type="int", default="1", dest="rep_job",
547                 help="Repeat the job this number of times"),
548      cli_option("--timing-stats", default=False,
549                 action="store_true", help="Show timing stats"),
550      cli_option("--each", default=False, action="store_true",
551                 help="Submit each job separately"),
552      DRY_RUN_OPT, PRIORITY_OPT,
553      ],
554     "<op_list_file...>", "Submits jobs built from json files"
555     " containing a list of serialized opcodes"),
556   'allocator': (
557     TestAllocator, [ArgUnknown(min=1)],
558     [cli_option("--dir", dest="direction", default=constants.IALLOCATOR_DIR_IN,
559                 choices=list(constants.VALID_IALLOCATOR_DIRECTIONS),
560                 help="Show allocator input (in) or allocator"
561                 " results (out)"),
562      IALLOCATOR_OPT,
563      cli_option("-m", "--mode", default="relocate",
564                 choices=list(constants.VALID_IALLOCATOR_MODES),
565                 help=("Request mode (one of %s)" %
566                       utils.CommaJoin(constants.VALID_IALLOCATOR_MODES))),
567      cli_option("--mem", default=128, type="unit",
568                 help="Memory size for the instance (MiB)"),
569      cli_option("--disks", default="4096,4096",
570                 help="Comma separated list of disk sizes (MiB)"),
571      DISK_TEMPLATE_OPT,
572      cli_option("--nics", default="00:11:22:33:44:55",
573                 help="Comma separated list of nics, each nic"
574                 " definition is of form mac/ip/bridge, if"
575                 " missing values are replace by None"),
576      OS_OPT,
577      cli_option("-p", "--vcpus", default=1, type="int",
578                 help="Select number of VCPUs for the instance"),
579      cli_option("--tags", default=None,
580                 help="Comma separated list of tags"),
581      cli_option("--reloc-mode", default=constants.IALLOCATOR_MRELOC_ANY,
582                 choices=list(constants.IALLOCATOR_MRELOC_MODES),
583                 help=("Instance relocation mode (one of %s)" %
584                       utils.CommaJoin(constants.IALLOCATOR_MRELOC_MODES))),
585      cli_option("--target-groups", help="Target groups for relocation"),
586      DRY_RUN_OPT, PRIORITY_OPT,
587      ],
588     "{opts...} <instance>", "Executes a TestAllocator OpCode"),
589   "test-jobqueue": (
590     TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
591     "", "Test a few aspects of the job queue"),
592   "locks": (
593     ListLocks, ARGS_NONE,
594     [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT],
595     "[--interval N]", "Show a list of locks in the master daemon"),
596   }
597
598
599 def Main():
600   return GenericMain(commands)