Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_debug.py @ fdbe29ee

History | View | Annotate | Download (18.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Debugging commands"""
22

    
23
# pylint: disable-msg=W0401,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0614: Unused import %s from wildcard import (since we need cli)
26
# C0103: Invalid name gnt-backup
27

    
28
import simplejson
29
import time
30
import socket
31
import logging
32

    
33
from ganeti.cli import *
34
from ganeti import cli
35
from ganeti import constants
36
from ganeti import opcodes
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import compat
40

    
41

    
42
#: Default fields for L{ListLocks}
43
_LIST_LOCKS_DEF_FIELDS = [
44
  "name",
45
  "mode",
46
  "owner",
47
  "pending",
48
  ]
49

    
50

    
51
def Delay(opts, args):
52
  """Sleeps for a while
53

54
  @param opts: the command line options selected by the user
55
  @type args: list
56
  @param args: should contain only one element, the duration
57
      the sleep
58
  @rtype: int
59
  @return: the desired exit code
60

61
  """
62
  delay = float(args[0])
63
  op = opcodes.OpTestDelay(duration=delay,
64
                           on_master=opts.on_master,
65
                           on_nodes=opts.on_nodes,
66
                           repeat=opts.repeat)
67
  SubmitOpCode(op, opts=opts)
68

    
69
  return 0
70

    
71

    
72
def GenericOpCodes(opts, args):
73
  """Send any opcode to the master.
74

75
  @param opts: the command line options selected by the user
76
  @type args: list
77
  @param args: should contain only one element, the path of
78
      the file with the opcode definition
79
  @rtype: int
80
  @return: the desired exit code
81

82
  """
83
  cl = cli.GetClient()
84
  jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)
85

    
86
  job_cnt = 0
87
  op_cnt = 0
88
  if opts.timing_stats:
89
    ToStdout("Loading...")
90
  for job_idx in range(opts.rep_job):
91
    for fname in args:
92
      # pylint: disable-msg=W0142
93
      op_data = simplejson.loads(utils.ReadFile(fname))
94
      op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
95
      op_list = op_list * opts.rep_op
96
      jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
97
      op_cnt += len(op_list)
98
      job_cnt += 1
99

    
100
  if opts.timing_stats:
101
    t1 = time.time()
102
    ToStdout("Submitting...")
103

    
104
  jex.SubmitPending(each=opts.each)
105

    
106
  if opts.timing_stats:
107
    t2 = time.time()
108
    ToStdout("Executing...")
109

    
110
  jex.GetResults()
111
  if opts.timing_stats:
112
    t3 = time.time()
113
    ToStdout("C:op     %4d" % op_cnt)
114
    ToStdout("C:job    %4d" % job_cnt)
115
    ToStdout("T:submit %4.4f" % (t2-t1))
116
    ToStdout("T:exec   %4.4f" % (t3-t2))
117
    ToStdout("T:total  %4.4f" % (t3-t1))
118
  return 0
119

    
120

    
121
def TestAllocator(opts, args):
122
  """Runs the test allocator opcode.
123

124
  @param opts: the command line options selected by the user
125
  @type args: list
126
  @param args: should contain only one element, the iallocator name
127
  @rtype: int
128
  @return: the desired exit code
129

130
  """
131
  try:
132
    disks = [{
133
      constants.IDISK_SIZE: utils.ParseUnit(val),
134
      constants.IDISK_MODE: constants.DISK_RDWR,
135
      } for val in opts.disks.split(",")]
136
  except errors.UnitParseError, err:
137
    ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
138
    return 1
139

    
140
  nics = [val.split("/") for val in opts.nics.split(",")]
141
  for row in nics:
142
    while len(row) < 3:
143
      row.append(None)
144
    for i in range(3):
145
      if row[i] == '':
146
        row[i] = None
147
  nic_dict = [{
148
    constants.INIC_MAC: v[0],
149
    constants.INIC_IP: v[1],
150
    # The iallocator interface defines a "bridge" item
151
    "bridge": v[2],
152
    } for v in nics]
153

    
154
  if opts.tags is None:
155
    opts.tags = []
156
  else:
157
    opts.tags = opts.tags.split(",")
158

    
159
  op = opcodes.OpTestAllocator(mode=opts.mode,
160
                               name=args[0],
161
                               evac_nodes=args,
162
                               mem_size=opts.mem,
163
                               disks=disks,
164
                               disk_template=opts.disk_template,
165
                               nics=nic_dict,
166
                               os=opts.os,
167
                               vcpus=opts.vcpus,
168
                               tags=opts.tags,
169
                               direction=opts.direction,
170
                               allocator=opts.iallocator,
171
                               )
172
  result = SubmitOpCode(op, opts=opts)
173
  ToStdout("%s" % result)
174
  return 0
175

    
176

    
177
def _TestJobSubmission(opts):
178
  """Tests submitting jobs.
179

180
  """
181
  ToStdout("Testing job submission")
182

    
183
  testdata = [
184
    (0, 0, constants.OP_PRIO_LOWEST),
185
    (0, 0, constants.OP_PRIO_HIGHEST),
186
    ]
187

    
188
  for priority in (constants.OP_PRIO_SUBMIT_VALID |
189
                   frozenset([constants.OP_PRIO_LOWEST,
190
                              constants.OP_PRIO_HIGHEST])):
191
    for offset in [-1, +1]:
192
      testdata.extend([
193
        (0, 0, priority + offset),
194
        (3, 0, priority + offset),
195
        (0, 3, priority + offset),
196
        (4, 2, priority + offset),
197
        ])
198

    
199
  cl = cli.GetClient()
200

    
201
  for before, after, failpriority in testdata:
202
    ops = []
203
    ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
204
    ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
205
    ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])
206

    
207
    try:
208
      cl.SubmitJob(ops)
209
    except errors.GenericError, err:
210
      if opts.debug:
211
        ToStdout("Ignoring error: %s", err)
212
    else:
213
      raise errors.OpExecError("Submitting opcode with priority %s did not"
214
                               " fail when it should (allowed are %s)" %
215
                               (failpriority, constants.OP_PRIO_SUBMIT_VALID))
216

    
217
    jobs = [
218
      [opcodes.OpTestDelay(duration=0),
219
       opcodes.OpTestDelay(duration=0, dry_run=False),
220
       opcodes.OpTestDelay(duration=0, dry_run=True)],
221
      ops,
222
      ]
223
    result = cl.SubmitManyJobs(jobs)
224
    if not (len(result) == 2 and
225
            compat.all(len(i) == 2 for i in result) and
226
            compat.all(isinstance(i[1], basestring) for i in result) and
227
            result[0][0] and not result[1][0]):
228
      raise errors.OpExecError("Submitting multiple jobs did not work as"
229
                               " expected, result %s" % result)
230
    assert len(result) == 2
231

    
232
  ToStdout("Job submission tests were successful")
233

    
234

    
235
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
236
  def __init__(self):
237
    """Initializes this class.
238

239
    """
240
    cli.StdioJobPollReportCb.__init__(self)
241
    self._expected_msgcount = 0
242
    self._all_testmsgs = []
243
    self._testmsgs = None
244
    self._job_id = None
245

    
246
  def GetTestMessages(self):
247
    """Returns all test log messages received so far.
248

249
    """
250
    return self._all_testmsgs
251

    
252
  def GetJobId(self):
253
    """Returns the job ID.
254

255
    """
256
    return self._job_id
257

    
258
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
259
    """Handles a log message.
260

261
    """
262
    if self._job_id is None:
263
      self._job_id = job_id
264
    elif self._job_id != job_id:
265
      raise errors.ProgrammerError("The same reporter instance was used for"
266
                                   " more than one job")
267

    
268
    if log_type == constants.ELOG_JQUEUE_TEST:
269
      (sockname, test, arg) = log_msg
270
      return self._ProcessTestMessage(job_id, sockname, test, arg)
271

    
272
    elif (log_type == constants.ELOG_MESSAGE and
273
          log_msg.startswith(constants.JQT_MSGPREFIX)):
274
      if self._testmsgs is None:
275
        raise errors.OpExecError("Received test message without a preceding"
276
                                 " start message")
277
      testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
278
      self._testmsgs.append(testmsg)
279
      self._all_testmsgs.append(testmsg)
280
      return
281

    
282
    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
283
                                                     timestamp, log_type,
284
                                                     log_msg)
285

    
286
  def _ProcessTestMessage(self, job_id, sockname, test, arg):
287
    """Handles a job queue test message.
288

289
    """
290
    if test not in constants.JQT_ALL:
291
      raise errors.OpExecError("Received invalid test message %s" % test)
292

    
293
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
294
    try:
295
      sock.settimeout(30.0)
296

    
297
      logging.debug("Connecting to %s", sockname)
298
      sock.connect(sockname)
299

    
300
      logging.debug("Checking status")
301
      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
302
      if not jobdetails:
303
        raise errors.OpExecError("Can't find job %s" % job_id)
304

    
305
      status = jobdetails[0]
306

    
307
      logging.debug("Status of job %s is %s", job_id, status)
308

    
309
      if test == constants.JQT_EXPANDNAMES:
310
        if status != constants.JOB_STATUS_WAITLOCK:
311
          raise errors.OpExecError("Job status while expanding names is '%s',"
312
                                   " not '%s' as expected" %
313
                                   (status, constants.JOB_STATUS_WAITLOCK))
314
      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
315
        if status != constants.JOB_STATUS_RUNNING:
316
          raise errors.OpExecError("Job status while executing opcode is '%s',"
317
                                   " not '%s' as expected" %
318
                                   (status, constants.JOB_STATUS_RUNNING))
319

    
320
      if test == constants.JQT_STARTMSG:
321
        logging.debug("Expecting %s test messages", arg)
322
        self._testmsgs = []
323
      elif test == constants.JQT_LOGMSG:
324
        if len(self._testmsgs) != arg:
325
          raise errors.OpExecError("Received %s test messages when %s are"
326
                                   " expected" % (len(self._testmsgs), arg))
327
    finally:
328
      logging.debug("Closing socket")
329
      sock.close()
330

    
331

    
332
def TestJobqueue(opts, _):
333
  """Runs a few tests on the job queue.
334

335
  """
336
  _TestJobSubmission(opts)
337

    
338
  (TM_SUCCESS,
339
   TM_MULTISUCCESS,
340
   TM_FAIL,
341
   TM_PARTFAIL) = range(4)
342
  TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
343

    
344
  for mode in TM_ALL:
345
    test_messages = [
346
      "Testing mode %s" % mode,
347
      "Hello World",
348
      "A",
349
      "",
350
      "B"
351
      "Foo|bar|baz",
352
      utils.TimestampForFilename(),
353
      ]
354

    
355
    fail = mode in (TM_FAIL, TM_PARTFAIL)
356

    
357
    if mode == TM_PARTFAIL:
358
      ToStdout("Testing partial job failure")
359
      ops = [
360
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
361
                             log_messages=test_messages, fail=False),
362
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
363
                             log_messages=test_messages, fail=False),
364
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
365
                             log_messages=test_messages, fail=True),
366
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
367
                             log_messages=test_messages, fail=False),
368
        ]
369
      expect_messages = 3 * [test_messages]
370
      expect_opstatus = [
371
        constants.OP_STATUS_SUCCESS,
372
        constants.OP_STATUS_SUCCESS,
373
        constants.OP_STATUS_ERROR,
374
        constants.OP_STATUS_ERROR,
375
        ]
376
      expect_resultlen = 2
377
    elif mode == TM_MULTISUCCESS:
378
      ToStdout("Testing multiple successful opcodes")
379
      ops = [
380
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
381
                             log_messages=test_messages, fail=False),
382
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
383
                             log_messages=test_messages, fail=False),
384
        ]
385
      expect_messages = 2 * [test_messages]
386
      expect_opstatus = [
387
        constants.OP_STATUS_SUCCESS,
388
        constants.OP_STATUS_SUCCESS,
389
        ]
390
      expect_resultlen = 2
391
    else:
392
      if mode == TM_SUCCESS:
393
        ToStdout("Testing job success")
394
        expect_opstatus = [constants.OP_STATUS_SUCCESS]
395
      elif mode == TM_FAIL:
396
        ToStdout("Testing job failure")
397
        expect_opstatus = [constants.OP_STATUS_ERROR]
398
      else:
399
        raise errors.ProgrammerError("Unknown test mode %s" % mode)
400

    
401
      ops = [
402
        opcodes.OpTestJqueue(notify_waitlock=True,
403
                             notify_exec=True,
404
                             log_messages=test_messages,
405
                             fail=fail)
406
        ]
407
      expect_messages = [test_messages]
408
      expect_resultlen = 1
409

    
410
    cl = cli.GetClient()
411
    cli.SetGenericOpcodeOpts(ops, opts)
412

    
413
    # Send job to master daemon
414
    job_id = cli.SendJob(ops, cl=cl)
415

    
416
    reporter = _JobQueueTestReporter()
417
    results = None
418

    
419
    try:
420
      results = cli.PollJob(job_id, cl=cl, reporter=reporter)
421
    except errors.OpExecError, err:
422
      if not fail:
423
        raise
424
      ToStdout("Ignoring error: %s", err)
425
    else:
426
      if fail:
427
        raise errors.OpExecError("Job didn't fail when it should")
428

    
429
    # Check length of result
430
    if fail:
431
      if results is not None:
432
        raise errors.OpExecError("Received result from failed job")
433
    elif len(results) != expect_resultlen:
434
      raise errors.OpExecError("Received %s results (%s), expected %s" %
435
                               (len(results), results, expect_resultlen))
436

    
437
    # Check received log messages
438
    all_messages = [i for j in expect_messages for i in j]
439
    if reporter.GetTestMessages() != all_messages:
440
      raise errors.OpExecError("Received test messages don't match input"
441
                               " (input %r, received %r)" %
442
                               (all_messages, reporter.GetTestMessages()))
443

    
444
    # Check final status
445
    reported_job_id = reporter.GetJobId()
446
    if reported_job_id != job_id:
447
      raise errors.OpExecError("Reported job ID %s doesn't match"
448
                               "submission job ID %s" %
449
                               (reported_job_id, job_id))
450

    
451
    jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
452
    if not jobdetails:
453
      raise errors.OpExecError("Can't find job %s" % job_id)
454

    
455
    if fail:
456
      exp_status = constants.JOB_STATUS_ERROR
457
    else:
458
      exp_status = constants.JOB_STATUS_SUCCESS
459

    
460
    (final_status, final_opstatus) = jobdetails
461
    if final_status != exp_status:
462
      raise errors.OpExecError("Final job status is %s, not %s as expected" %
463
                               (final_status, exp_status))
464
    if len(final_opstatus) != len(ops):
465
      raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
466
                               " expected %s)" %
467
                               (len(final_opstatus), len(ops)))
468
    if final_opstatus != expect_opstatus:
469
      raise errors.OpExecError("Opcode status is %s, expected %s" %
470
                               (final_opstatus, expect_opstatus))
471

    
472
  ToStdout("Job queue test successful")
473

    
474
  return 0
475

    
476

    
477
def ListLocks(opts, args): # pylint: disable-msg=W0613
478
  """List all locks.
479

480
  @param opts: the command line options selected by the user
481
  @type args: list
482
  @param args: should be an empty list
483
  @rtype: int
484
  @return: the desired exit code
485

486
  """
487
  selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
488

    
489
  def _DashIfNone(fn):
490
    def wrapper(value):
491
      if not value:
492
        return "-"
493
      return fn(value)
494
    return wrapper
495

    
496
  def _FormatPending(value):
497
    """Format pending acquires.
498

499
    """
500
    return utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
501
                           for mode, threads in value)
502

    
503
  # Format raw values
504
  fmtoverride = {
505
    "mode": (_DashIfNone(str), False),
506
    "owner": (_DashIfNone(",".join), False),
507
    "pending": (_DashIfNone(_FormatPending), False),
508
    }
509

    
510
  while True:
511
    ret = GenericList(constants.QR_LOCK, selected_fields, None, None,
512
                      opts.separator, not opts.no_headers,
513
                      format_override=fmtoverride, verbose=opts.verbose)
514

    
515
    if ret != constants.EXIT_SUCCESS:
516
      return ret
517

    
518
    if not opts.interval:
519
      break
520

    
521
    ToStdout("")
522
    time.sleep(opts.interval)
523

    
524
  return 0
525

    
526

    
527
commands = {
528
  'delay': (
529
    Delay, [ArgUnknown(min=1, max=1)],
530
    [cli_option("--no-master", dest="on_master", default=True,
531
                action="store_false", help="Do not sleep in the master code"),
532
     cli_option("-n", dest="on_nodes", default=[],
533
                action="append", help="Select nodes to sleep on"),
534
     cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
535
                help="Number of times to repeat the sleep"),
536
     DRY_RUN_OPT, PRIORITY_OPT,
537
     ],
538
    "[opts...] <duration>", "Executes a TestDelay OpCode"),
539
  'submit-job': (
540
    GenericOpCodes, [ArgFile(min=1)],
541
    [VERBOSE_OPT,
542
     cli_option("--op-repeat", type="int", default="1", dest="rep_op",
543
                help="Repeat the opcode sequence this number of times"),
544
     cli_option("--job-repeat", type="int", default="1", dest="rep_job",
545
                help="Repeat the job this number of times"),
546
     cli_option("--timing-stats", default=False,
547
                action="store_true", help="Show timing stats"),
548
     cli_option("--each", default=False, action="store_true",
549
                help="Submit each job separately"),
550
     DRY_RUN_OPT, PRIORITY_OPT,
551
     ],
552
    "<op_list_file...>", "Submits jobs built from json files"
553
    " containing a list of serialized opcodes"),
554
  'allocator': (
555
    TestAllocator, [ArgUnknown(min=1)],
556
    [cli_option("--dir", dest="direction",
557
                default="in", choices=["in", "out"],
558
                help="Show allocator input (in) or allocator"
559
                " results (out)"),
560
     IALLOCATOR_OPT,
561
     cli_option("-m", "--mode", default="relocate",
562
                choices=["relocate", "allocate", "multi-evacuate"],
563
                help="Request mode, either allocate or relocate"),
564
     cli_option("--mem", default=128, type="unit",
565
                help="Memory size for the instance (MiB)"),
566
     cli_option("--disks", default="4096,4096",
567
                help="Comma separated list of disk sizes (MiB)"),
568
     DISK_TEMPLATE_OPT,
569
     cli_option("--nics", default="00:11:22:33:44:55",
570
                help="Comma separated list of nics, each nic"
571
                " definition is of form mac/ip/bridge, if"
572
                " missing values are replace by None"),
573
     OS_OPT,
574
     cli_option("-p", "--vcpus", default=1, type="int",
575
                help="Select number of VCPUs for the instance"),
576
     cli_option("--tags", default=None,
577
                help="Comma separated list of tags"),
578
     DRY_RUN_OPT, PRIORITY_OPT,
579
     ],
580
    "{opts...} <instance>", "Executes a TestAllocator OpCode"),
581
  "test-jobqueue": (
582
    TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
583
    "", "Test a few aspects of the job queue"),
584
  "locks": (
585
    ListLocks, ARGS_NONE,
586
    [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT],
587
    "[--interval N]", "Show a list of locks in the master daemon"),
588
  }
589

    
590

    
591
def Main():
592
  return GenericMain(commands)