Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_debug.py @ b469eb4d

History | View | Annotate | Download (18.6 kB)

#
#

# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

    
21
"""Debugging commands"""
22

    
23
# pylint: disable-msg=W0401,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0614: Unused import %s from wildcard import (since we need cli)
26
# C0103: Invalid name gnt-backup
27

    
28
import simplejson
29
import time
30
import socket
31
import logging
32

    
33
from ganeti.cli import *
34
from ganeti import cli
35
from ganeti import constants
36
from ganeti import opcodes
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import compat
40

    
41

    
42
#: Default fields for L{ListLocks}
43
_LIST_LOCKS_DEF_FIELDS = [
44
  "name",
45
  "mode",
46
  "owner",
47
  "pending",
48
  ]
49

    
50

    
51
def Delay(opts, args):
  """Sleeps for a while

  Builds an L{opcodes.OpTestDelay} from the command line and submits it.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the duration
      of the sleep
  @rtype: int
  @return: the desired exit code; 1 if the duration is not a valid number

  """
  try:
    delay = float(args[0])
  except ValueError:
    # Report a malformed duration the same way TestAllocator reports bad
    # parameters instead of dying with a traceback
    ToStderr("Invalid duration '%s'", args[0])
    return 1

  op = opcodes.OpTestDelay(duration=delay,
                           on_master=opts.on_master,
                           on_nodes=opts.on_nodes,
                           repeat=opts.repeat)
  SubmitOpCode(op, opts=opts)

  return 0
70

    
71

    
72
def GenericOpCodes(opts, args):
  """Submits jobs built from opcode definition files to the master.

  Every file named in C{args} is read and decoded as a JSON list of
  serialized opcodes. The resulting opcode sequence is repeated
  C{opts.rep_op} times and queued as one job; the whole set of files is
  queued C{opts.rep_job} times. All queued jobs are then submitted and
  their results collected. With C{opts.timing_stats} set, opcode/job counts
  and the time spent submitting and executing are printed.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: the paths of the files with the opcode definitions
  @rtype: int
  @return: the desired exit code

  """
  client = cli.GetClient()
  executor = cli.JobExecutor(cl=client, verbose=opts.verbose, opts=opts)

  num_jobs = 0
  num_ops = 0

  if opts.timing_stats:
    ToStdout("Loading...")

  for round_idx in range(opts.rep_job):
    for path in args:
      # pylint: disable-msg=W0142
      serialized = simplejson.loads(utils.ReadFile(path))
      job_ops = [opcodes.OpCode.LoadOpCode(item)
                 for item in serialized] * opts.rep_op
      executor.QueueJob("file %s/%d" % (path, round_idx), *job_ops)
      num_ops += len(job_ops)
      num_jobs += 1

  if opts.timing_stats:
    submit_start = time.time()
    ToStdout("Submitting...")

  executor.SubmitPending(each=opts.each)

  if opts.timing_stats:
    exec_start = time.time()
    ToStdout("Executing...")

  executor.GetResults()

  if opts.timing_stats:
    exec_end = time.time()
    # The timestamps above are only taken when timing stats are requested,
    # hence the report has to stay behind the same condition
    report = [
      "C:op     %4d" % num_ops,
      "C:job    %4d" % num_jobs,
      "T:submit %4.4f" % (exec_start - submit_start),
      "T:exec   %4.4f" % (exec_end - exec_start),
      "T:total  %4.4f" % (exec_end - submit_start),
      ]
    for line in report:
      ToStdout(line)

  return 0
119

    
120

    
121
def TestAllocator(opts, args):
122
  """Runs the test allocator opcode.
123

124
  @param opts: the command line options selected by the user
125
  @type args: list
126
  @param args: should contain only one element, the iallocator name
127
  @rtype: int
128
  @return: the desired exit code
129

130
  """
131
  try:
132
    disks = [{"size": utils.ParseUnit(val), "mode": 'w'}
133
             for val in opts.disks.split(",")]
134
  except errors.UnitParseError, err:
135
    ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
136
    return 1
137

    
138
  nics = [val.split("/") for val in opts.nics.split(",")]
139
  for row in nics:
140
    while len(row) < 3:
141
      row.append(None)
142
    for i in range(3):
143
      if row[i] == '':
144
        row[i] = None
145
  nic_dict = [{"mac": v[0], "ip": v[1], "bridge": v[2]} for v in nics]
146

    
147
  if opts.tags is None:
148
    opts.tags = []
149
  else:
150
    opts.tags = opts.tags.split(",")
151

    
152
  op = opcodes.OpTestAllocator(mode=opts.mode,
153
                               name=args[0],
154
                               evac_nodes=args,
155
                               mem_size=opts.mem,
156
                               disks=disks,
157
                               disk_template=opts.disk_template,
158
                               nics=nic_dict,
159
                               os=opts.os,
160
                               vcpus=opts.vcpus,
161
                               tags=opts.tags,
162
                               direction=opts.direction,
163
                               allocator=opts.iallocator,
164
                               )
165
  result = SubmitOpCode(op, opts=opts)
166
  ToStdout("%s" % result)
167
  return 0
168

    
169

    
170
def _TestJobSubmission(opts):
171
  """Tests submitting jobs.
172

173
  """
174
  ToStdout("Testing job submission")
175

    
176
  testdata = [
177
    (0, 0, constants.OP_PRIO_LOWEST),
178
    (0, 0, constants.OP_PRIO_HIGHEST),
179
    ]
180

    
181
  for priority in (constants.OP_PRIO_SUBMIT_VALID |
182
                   frozenset([constants.OP_PRIO_LOWEST,
183
                              constants.OP_PRIO_HIGHEST])):
184
    for offset in [-1, +1]:
185
      testdata.extend([
186
        (0, 0, priority + offset),
187
        (3, 0, priority + offset),
188
        (0, 3, priority + offset),
189
        (4, 2, priority + offset),
190
        ])
191

    
192
  cl = cli.GetClient()
193

    
194
  for before, after, failpriority in testdata:
195
    ops = []
196
    ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
197
    ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
198
    ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])
199

    
200
    try:
201
      cl.SubmitJob(ops)
202
    except errors.GenericError, err:
203
      if opts.debug:
204
        ToStdout("Ignoring error: %s", err)
205
    else:
206
      raise errors.OpExecError("Submitting opcode with priority %s did not"
207
                               " fail when it should (allowed are %s)" %
208
                               (failpriority, constants.OP_PRIO_SUBMIT_VALID))
209

    
210
    jobs = [
211
      [opcodes.OpTestDelay(duration=0),
212
       opcodes.OpTestDelay(duration=0, dry_run=False),
213
       opcodes.OpTestDelay(duration=0, dry_run=True)],
214
      ops,
215
      ]
216
    result = cl.SubmitManyJobs(jobs)
217
    if not (len(result) == 2 and
218
            compat.all(len(i) == 2 for i in result) and
219
            compat.all(isinstance(i[1], basestring) for i in result) and
220
            result[0][0] and not result[1][0]):
221
      raise errors.OpExecError("Submitting multiple jobs did not work as"
222
                               " expected, result %s" % result)
223
    assert len(result) == 2
224

    
225
  ToStdout("Job submission tests were successful")
226

    
227

    
228
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
  """Job poll reporter used by the job queue tests.

  Job queue test notifications and test log messages are intercepted and
  verified or recorded; all other log messages are passed on to
  L{cli.StdioJobPollReportCb}. An instance must only be used for a single
  job.

  """
  def __init__(self):
    """Initializes this class.

    """
    cli.StdioJobPollReportCb.__init__(self)
    self._expected_msgcount = 0
    # All test messages received during the lifetime of this reporter
    self._all_testmsgs = []
    # Test messages received since the last start message; None until the
    # first start message arrives
    self._testmsgs = None
    self._job_id = None

  def GetTestMessages(self):
    """Returns all test log messages received so far.

    """
    return self._all_testmsgs

  def GetJobId(self):
    """Returns the job ID.

    """
    return self._job_id

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    @raise errors.ProgrammerError: if messages of more than one job are
        reported to the same instance
    @raise errors.OpExecError: if a test message arrives before any start
        message

    """
    if self._job_id is None:
      self._job_id = job_id
    elif self._job_id != job_id:
      raise errors.ProgrammerError("The same reporter instance was used for"
                                   " more than one job")

    if log_type == constants.ELOG_JQUEUE_TEST:
      (sockname, test, arg) = log_msg
      return self._ProcessTestMessage(job_id, sockname, test, arg)

    elif (log_type == constants.ELOG_MESSAGE and
          log_msg.startswith(constants.JQT_MSGPREFIX)):
      if self._testmsgs is None:
        raise errors.OpExecError("Received test message without a preceding"
                                 " start message")
      # Only the part after the prefix is recorded
      testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
      self._testmsgs.append(testmsg)
      self._all_testmsgs.append(testmsg)
      return

    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
                                                     timestamp, log_type,
                                                     log_msg)

  def _ProcessTestMessage(self, job_id, sockname, test, arg):
    """Handles a job queue test message.

    Connects to the Unix socket named in the message, verifies that the job
    status matches the reported test stage and keeps track of the expected
    test messages. The socket is always closed again.

    @param job_id: the ID of the job which sent the message
    @param sockname: path of the Unix socket to connect to
    @param test: the test stage, one of L{constants.JQT_ALL}
    @param arg: stage specific argument; for L{constants.JQT_LOGMSG} the
        number of test messages expected to have been received so far
    @raise errors.OpExecError: if the message or the job status is not as
        expected

    """
    if test not in constants.JQT_ALL:
      raise errors.OpExecError("Received invalid test message %s" % test)

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
      sock.settimeout(30.0)

      # NOTE(review): presumably the opcode waits for this connection before
      # it continues -- confirm against the opcode implementation
      logging.debug("Connecting to %s", sockname)
      sock.connect(sockname)

      logging.debug("Checking status")
      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
      if not jobdetails:
        raise errors.OpExecError("Can't find job %s" % job_id)

      status = jobdetails[0]

      logging.debug("Status of job %s is %s", job_id, status)

      if test == constants.JQT_EXPANDNAMES:
        if status != constants.JOB_STATUS_WAITLOCK:
          raise errors.OpExecError("Job status while expanding names is '%s',"
                                   " not '%s' as expected" %
                                   (status, constants.JOB_STATUS_WAITLOCK))
      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
        if status != constants.JOB_STATUS_RUNNING:
          raise errors.OpExecError("Job status while executing opcode is '%s',"
                                   " not '%s' as expected" %
                                   (status, constants.JOB_STATUS_RUNNING))

      if test == constants.JQT_STARTMSG:
        logging.debug("Expecting %s test messages", arg)
        self._testmsgs = []
      elif test == constants.JQT_LOGMSG:
        # Without a start message there is no list to count; fail with a
        # proper error instead of a TypeError from len(None)
        if self._testmsgs is None:
          raise errors.OpExecError("Received log message notification without"
                                   " a preceding start message")
        if len(self._testmsgs) != arg:
          raise errors.OpExecError("Received %s test messages when %s are"
                                   " expected" % (len(self._testmsgs), arg))
    finally:
      logging.debug("Closing socket")
      sock.close()
323

    
324

    
325
def TestJobqueue(opts, _):
326
  """Runs a few tests on the job queue.
327

328
  """
329
  _TestJobSubmission(opts)
330

    
331
  (TM_SUCCESS,
332
   TM_MULTISUCCESS,
333
   TM_FAIL,
334
   TM_PARTFAIL) = range(4)
335
  TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
336

    
337
  for mode in TM_ALL:
338
    test_messages = [
339
      "Testing mode %s" % mode,
340
      "Hello World",
341
      "A",
342
      "",
343
      "B"
344
      "Foo|bar|baz",
345
      utils.TimestampForFilename(),
346
      ]
347

    
348
    fail = mode in (TM_FAIL, TM_PARTFAIL)
349

    
350
    if mode == TM_PARTFAIL:
351
      ToStdout("Testing partial job failure")
352
      ops = [
353
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
354
                             log_messages=test_messages, fail=False),
355
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
356
                             log_messages=test_messages, fail=False),
357
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
358
                             log_messages=test_messages, fail=True),
359
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
360
                             log_messages=test_messages, fail=False),
361
        ]
362
      expect_messages = 3 * [test_messages]
363
      expect_opstatus = [
364
        constants.OP_STATUS_SUCCESS,
365
        constants.OP_STATUS_SUCCESS,
366
        constants.OP_STATUS_ERROR,
367
        constants.OP_STATUS_ERROR,
368
        ]
369
      expect_resultlen = 2
370
    elif mode == TM_MULTISUCCESS:
371
      ToStdout("Testing multiple successful opcodes")
372
      ops = [
373
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
374
                             log_messages=test_messages, fail=False),
375
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
376
                             log_messages=test_messages, fail=False),
377
        ]
378
      expect_messages = 2 * [test_messages]
379
      expect_opstatus = [
380
        constants.OP_STATUS_SUCCESS,
381
        constants.OP_STATUS_SUCCESS,
382
        ]
383
      expect_resultlen = 2
384
    else:
385
      if mode == TM_SUCCESS:
386
        ToStdout("Testing job success")
387
        expect_opstatus = [constants.OP_STATUS_SUCCESS]
388
      elif mode == TM_FAIL:
389
        ToStdout("Testing job failure")
390
        expect_opstatus = [constants.OP_STATUS_ERROR]
391
      else:
392
        raise errors.ProgrammerError("Unknown test mode %s" % mode)
393

    
394
      ops = [
395
        opcodes.OpTestJqueue(notify_waitlock=True,
396
                             notify_exec=True,
397
                             log_messages=test_messages,
398
                             fail=fail)
399
        ]
400
      expect_messages = [test_messages]
401
      expect_resultlen = 1
402

    
403
    cl = cli.GetClient()
404
    cli.SetGenericOpcodeOpts(ops, opts)
405

    
406
    # Send job to master daemon
407
    job_id = cli.SendJob(ops, cl=cl)
408

    
409
    reporter = _JobQueueTestReporter()
410
    results = None
411

    
412
    try:
413
      results = cli.PollJob(job_id, cl=cl, reporter=reporter)
414
    except errors.OpExecError, err:
415
      if not fail:
416
        raise
417
      ToStdout("Ignoring error: %s", err)
418
    else:
419
      if fail:
420
        raise errors.OpExecError("Job didn't fail when it should")
421

    
422
    # Check length of result
423
    if fail:
424
      if results is not None:
425
        raise errors.OpExecError("Received result from failed job")
426
    elif len(results) != expect_resultlen:
427
      raise errors.OpExecError("Received %s results (%s), expected %s" %
428
                               (len(results), results, expect_resultlen))
429

    
430
    # Check received log messages
431
    all_messages = [i for j in expect_messages for i in j]
432
    if reporter.GetTestMessages() != all_messages:
433
      raise errors.OpExecError("Received test messages don't match input"
434
                               " (input %r, received %r)" %
435
                               (all_messages, reporter.GetTestMessages()))
436

    
437
    # Check final status
438
    reported_job_id = reporter.GetJobId()
439
    if reported_job_id != job_id:
440
      raise errors.OpExecError("Reported job ID %s doesn't match"
441
                               "submission job ID %s" %
442
                               (reported_job_id, job_id))
443

    
444
    jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
445
    if not jobdetails:
446
      raise errors.OpExecError("Can't find job %s" % job_id)
447

    
448
    if fail:
449
      exp_status = constants.JOB_STATUS_ERROR
450
    else:
451
      exp_status = constants.JOB_STATUS_SUCCESS
452

    
453
    (final_status, final_opstatus) = jobdetails
454
    if final_status != exp_status:
455
      raise errors.OpExecError("Final job status is %s, not %s as expected" %
456
                               (final_status, exp_status))
457
    if len(final_opstatus) != len(ops):
458
      raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
459
                               " expected %s)" %
460
                               (len(final_opstatus), len(ops)))
461
    if final_opstatus != expect_opstatus:
462
      raise errors.OpExecError("Opcode status is %s, expected %s" %
463
                               (final_opstatus, expect_opstatus))
464

    
465
  ToStdout("Job queue test successful")
466

    
467
  return 0
468

    
469

    
470
def ListLocks(opts, args): # pylint: disable-msg=W0613
  """List all locks.

  The lock list is printed once, or again and again with a pause of
  C{opts.interval} between the runs if an interval was given.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)

  def _DashIfEmpty(formatter):
    """Wraps a formatter so that false values are shown as a dash.

    """
    def _Format(value):
      if value:
        return formatter(value)
      return "-"
    return _Format

  def _JoinPending(pending):
    """Format pending acquires.

    """
    parts = []
    for (mode, threads) in pending:
      parts.append("%s:%s" % (mode, ",".join(threads)))
    return utils.CommaJoin(parts)

  # Format raw values
  overrides = {}
  overrides["mode"] = (_DashIfEmpty(str), False)
  overrides["owner"] = (_DashIfEmpty(",".join), False)
  overrides["pending"] = (_DashIfEmpty(_JoinPending), False)

  while True:
    status = GenericList(constants.QR_LOCK, fields, None, None,
                         opts.separator, not opts.no_headers,
                         format_override=overrides)

    if status != constants.EXIT_SUCCESS:
      return status

    if not opts.interval:
      return 0

    # Separate consecutive listings with an empty line
    ToStdout("")
    time.sleep(opts.interval)
518

    
519

    
520
# Command table handed to GenericMain by Main(). Each entry holds, in this
# order: the function implementing the command, the positional argument
# definitions, the list of options, the usage string and the description.
commands = {
  "delay": (
    Delay, [ArgUnknown(min=1, max=1)],
    [cli_option("--no-master", dest="on_master", default=True,
                action="store_false", help="Do not sleep in the master code"),
     cli_option("-n", dest="on_nodes", default=[],
                action="append", help="Select nodes to sleep on"),
     cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
                help="Number of times to repeat the sleep"),
     DRY_RUN_OPT, PRIORITY_OPT,
     ],
    "[opts...] <duration>", "Executes a TestDelay OpCode"),
  "submit-job": (
    GenericOpCodes, [ArgFile(min=1)],
    [VERBOSE_OPT,
     cli_option("--op-repeat", type="int", default="1", dest="rep_op",
                help="Repeat the opcode sequence this number of times"),
     cli_option("--job-repeat", type="int", default="1", dest="rep_job",
                help="Repeat the job this number of times"),
     cli_option("--timing-stats", default=False,
                action="store_true", help="Show timing stats"),
     cli_option("--each", default=False, action="store_true",
                help="Submit each job separately"),
     DRY_RUN_OPT, PRIORITY_OPT,
     ],
    "<op_list_file...>", "Submits jobs built from json files"
    " containing a list of serialized opcodes"),
  "allocator": (
    TestAllocator, [ArgUnknown(min=1)],
    [cli_option("--dir", dest="direction",
                default="in", choices=["in", "out"],
                help="Show allocator input (in) or allocator"
                " results (out)"),
     IALLOCATOR_OPT,
     cli_option("-m", "--mode", default="relocate",
                choices=["relocate", "allocate", "multi-evacuate"],
                help="Request mode, one of relocate, allocate or"
                " multi-evacuate"),
     cli_option("--mem", default=128, type="unit",
                help="Memory size for the instance (MiB)"),
     cli_option("--disks", default="4096,4096",
                help="Comma separated list of disk sizes (MiB)"),
     DISK_TEMPLATE_OPT,
     cli_option("--nics", default="00:11:22:33:44:55",
                help="Comma separated list of nics, each nic"
                " definition is of form mac/ip/bridge, if"
                " missing values are replaced by None"),
     OS_OPT,
     cli_option("-p", "--vcpus", default=1, type="int",
                help="Select number of VCPUs for the instance"),
     cli_option("--tags", default=None,
                help="Comma separated list of tags"),
     DRY_RUN_OPT, PRIORITY_OPT,
     ],
    "[opts...] <instance>", "Executes a TestAllocator OpCode"),
  "test-jobqueue": (
    TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
    "", "Test a few aspects of the job queue"),
  "locks": (
    ListLocks, ARGS_NONE, [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT],
    "[--interval N]", "Show a list of locks in the master daemon"),
  }
581

    
582

    
583
def Main():
  """Runs L{GenericMain} on the L{commands} table of this module.

  @return: the value returned by L{GenericMain}

  """
  result = GenericMain(commands)
  return result