#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0201

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name, errors.ECODE_INVAL)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused argument and "could
    # be a function" warnings
    # pylint: disable-msg=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
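
  # Illustrative sketch only (hypothetical LU, not part of this module): a
  # minimal subclass following the rules above could look like
  #
  #   class LUExampleQuery(NoHooksLU):
  #     _OP_REQP = ["instance_name"]
  #     REQ_BGL = False
  #
  #     def ExpandNames(self):
  #       # canonicalize self.op.instance_name and declare the instance lock
  #       self._ExpandAndLockInstance()
  #
  #     def CheckPrereq(self):
  #       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
  #
  #     def Exec(self, feedback_fn):
  #       feedback_fn("Instance %s uses the %s disk template" %
  #                   (self.instance.name, self.instance.disk_template))
  #       return self.instance.disk_template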


class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLU.

    This just raises an error.

    """
    assert False, "BuildHooksEnv called for NoHooksLUs"


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError
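
  # Illustrative sketch only (hypothetical names): an LU built on tasklets
  # fills self.tasklets in ExpandNames and inherits CheckPrereq/Exec from
  # LogicalUnit, which then iterate over the tasklets in order, e.g.
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.tasklets = [_ExampleMigrationTasklet(self, self.op.instance_name)]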


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
                               errors.ECODE_INVAL)

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'",
                               errors.ECODE_INVAL)

  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)), errors.ECODE_INVAL)
  setattr(op, name, val)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _RequireFileStorage():
  """Checks that file storage is enabled.

  @raise errors.OpPrereqError: when file storage is disabled

  """
  if not constants.ENABLE_FILE_STORAGE:
    raise errors.OpPrereqError("File storage disabled at configure time",
                               errors.ECODE_INVAL)


def _CheckDiskTemplate(template):
  """Ensure a given disk template is valid.

  """
  if template not in constants.DISK_TEMPLATES:
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
  if template == constants.DT_FILE:
    _RequireFileStorage()


def _CheckStorageType(storage_type):
  """Ensure a given storage type is valid.

  """
  if storage_type not in constants.VALID_STORAGE_TYPES:
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                               errors.ECODE_INVAL)
  if storage_type == constants.ST_FILE:
    _RequireFileStorage()



def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
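
# Example of the resulting environment (values are hypothetical): for an
# instance with one bridged NIC and one disk, _BuildInstanceHookEnv returns
# keys such as
#
#   INSTANCE_NAME=instance1.example.tld    INSTANCE_STATUS=up
#   INSTANCE_NIC_COUNT=1                   INSTANCE_NIC0_MODE=bridged
#   INSTANCE_NIC0_LINK=br0                 INSTANCE_NIC0_BRIDGE=br0
#   INSTANCE_DISK_COUNT=1                  INSTANCE_DISK0_MODE=rw
#
# plus one INSTANCE_BE_*/INSTANCE_HV_* entry per backend/hypervisor parameter;
# the hooks runner later prefixes every key with "GANETI_".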


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                               profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os_obj.supported_variants:
    return
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def _FormatTimestamp(secs):
  """Formats a Unix timestamp with the local timezone.

  """
  return time.strftime("%F %T %Z", time.gmtime(secs))


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    if modify_ssh_setup:
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.CreateBackup(priv_key)
      utils.CreateBackup(pub_key)

    return master


def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
                            warn_days=constants.SSL_CERT_EXPIRATION_WARN,
                            error_days=constants.SSL_CERT_EXPIRATION_ERROR):
  """Verifies certificate details for LUVerifyCluster.

  """
  if expired:
    msg = "Certificate %s is expired" % filename

    if not_before is not None and not_after is not None:
      msg += (" (valid from %s to %s)" %
              (_FormatTimestamp(not_before),
               _FormatTimestamp(not_after)))
    elif not_before is not None:
      msg += " (valid from %s)" % _FormatTimestamp(not_before)
    elif not_after is not None:
      msg += " (valid until %s)" % _FormatTimestamp(not_after)

    return (LUVerifyCluster.ETYPE_ERROR, msg)

  elif not_before is not None and not_before > now:
    return (LUVerifyCluster.ETYPE_WARNING,
            "Certificate %s not yet valid (valid from %s)" %
            (filename, _FormatTimestamp(not_before)))

  elif not_after is not None:
    remaining_days = int((not_after - now) / (24 * 3600))

    msg = ("Certificate %s expires in %d days" % (filename, remaining_days))

    if remaining_days <= error_days:
      return (LUVerifyCluster.ETYPE_ERROR, msg)

    if remaining_days <= warn_days:
      return (LUVerifyCluster.ETYPE_WARNING, msg)

  return (None, None)


def _VerifyCertificate(filename):
  """Verifies a certificate for LUVerifyCluster.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable-msg=W0703
    return (LUVerifyCluster.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  # Depending on the pyOpenSSL version, this can just return (None, None)
  (not_before, not_after) = utils.GetX509CertValidity(cert)

  return _VerifyCertificateInner(filename, cert.has_expired(),
                                 not_before, not_after, time.time())


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")
  ENODETIME = (TNODE, "ENODETIME")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {secondary-node: list of instances} of all peers
        of this node (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)

    """
    def __init__(self, offline=False):
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)
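    # Example output (hypothetical node name): with error_codes set this
    # produces a parseable line such as
    #   - ERROR:ENODERPC:node:node1.example.tld:unable to verify node: no data returned
    # and otherwise the simpler form
    #   - ERROR: node node1.example.tld: unable to verify node: no data returned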
1189

    
1190
  def _ErrorIf(self, cond, *args, **kwargs):
1191
    """Log an error message if the passed condition is True.
1192

1193
    """
1194
    cond = bool(cond) or self.op.debug_simulate_errors
1195
    if cond:
1196
      self._Error(*args, **kwargs)
1197
    # do not mark the operation as failed for WARN cases only
1198
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1199
      self.bad = self.bad or cond
1200

    
1201
  def _VerifyNode(self, ninfo, nresult):
1202
    """Run multiple tests against a node.
1203

1204
    Test list:
1205

1206
      - compares ganeti version
1207
      - checks vg existence and size > 20G
1208
      - checks config file checksum
1209
      - checks ssh to other nodes
1210

1211
    @type ninfo: L{objects.Node}
1212
    @param ninfo: the node to check
1213
    @param nresult: the results from the node
1214
    @rtype: boolean
1215
    @return: whether overall this call was successful (and we can expect
1216
         reasonable values in the respose)
1217

1218
    """
1219
    node = ninfo.name
1220
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1221

    
1222
    # main result, nresult should be a non-empty dict
1223
    test = not nresult or not isinstance(nresult, dict)
1224
    _ErrorIf(test, self.ENODERPC, node,
1225
                  "unable to verify node: no data returned")
1226
    if test:
1227
      return False
1228

    
1229
    # compares ganeti version
1230
    local_version = constants.PROTOCOL_VERSION
1231
    remote_version = nresult.get("version", None)
1232
    test = not (remote_version and
1233
                isinstance(remote_version, (list, tuple)) and
1234
                len(remote_version) == 2)
1235
    _ErrorIf(test, self.ENODERPC, node,
1236
             "connection to node returned invalid data")
1237
    if test:
1238
      return False
1239

    
1240
    test = local_version != remote_version[0]
1241
    _ErrorIf(test, self.ENODEVERSION, node,
1242
             "incompatible protocol versions: master %s,"
1243
             " node %s", local_version, remote_version[0])
1244
    if test:
1245
      return False
1246

    
1247
    # node seems compatible, we can actually try to look into its results
1248

    
1249
    # full package version
1250
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1251
                  self.ENODEVERSION, node,
1252
                  "software version mismatch: master %s, node %s",
1253
                  constants.RELEASE_VERSION, remote_version[1],
1254
                  code=self.ETYPE_WARNING)
1255

    
1256
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1257
    if isinstance(hyp_result, dict):
1258
      for hv_name, hv_result in hyp_result.iteritems():
1259
        test = hv_result is not None
1260
        _ErrorIf(test, self.ENODEHV, node,
1261
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1262

    
1263

    
1264
    test = nresult.get(constants.NV_NODESETUP,
1265
                           ["Missing NODESETUP results"])
1266
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1267
             "; ".join(test))
1268

    
1269
    return True
1270

    
1271
  def _VerifyNodeTime(self, ninfo, nresult,
1272
                      nvinfo_starttime, nvinfo_endtime):
1273
    """Check the node time.
1274

1275
    @type ninfo: L{objects.Node}
1276
    @param ninfo: the node to check
1277
    @param nresult: the remote results for the node
1278
    @param nvinfo_starttime: the start time of the RPC call
1279
    @param nvinfo_endtime: the end time of the RPC call
1280

1281
    """
1282
    node = ninfo.name
1283
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1284

    
1285
    ntime = nresult.get(constants.NV_TIME, None)
1286
    try:
1287
      ntime_merged = utils.MergeTime(ntime)
1288
    except (ValueError, TypeError):
1289
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
1290
      return
1291

    
1292
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1293
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1294
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1295
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1296
    else:
1297
      ntime_diff = None
1298

    
1299
    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1300
             "Node time diverges by at least %s from master node time",
1301
             ntime_diff)
1302

    
1303
  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
1304
    """Check the node time.
1305

1306
    @type ninfo: L{objects.Node}
1307
    @param ninfo: the node to check
1308
    @param nresult: the remote results for the node
1309
    @param vg_name: the configured VG name
1310

1311
    """
1312
    if vg_name is None:
1313
      return
1314

    
1315
    node = ninfo.name
1316
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1317

    
1318
    # checks vg existence and size > 20G
1319
    vglist = nresult.get(constants.NV_VGLIST, None)
1320
    test = not vglist
1321
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1322
    if not test:
1323
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1324
                                            constants.MIN_VG_SIZE)
1325
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1326

    
1327
    # check pv names
1328
    pvlist = nresult.get(constants.NV_PVLIST, None)
1329
    test = pvlist is None
1330
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1331
    if not test:
1332
      # check that ':' is not present in PV names, since it's a
1333
      # special character for lvcreate (denotes the range of PEs to
1334
      # use on the PV)
1335
      for _, pvname, owner_vg in pvlist:
1336
        test = ":" in pvname
1337
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1338
                 " '%s' of VG '%s'", pvname, owner_vg)
1339

    
1340
  def _VerifyNodeNetwork(self, ninfo, nresult):
1341
    """Check the node time.
1342

1343
    @type ninfo: L{objects.Node}
1344
    @param ninfo: the node to check
1345
    @param nresult: the remote results for the node
1346

1347
    """
1348
    node = ninfo.name
1349
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1350

    
1351
    test = constants.NV_NODELIST not in nresult
1352
    _ErrorIf(test, self.ENODESSH, node,
1353
             "node hasn't returned node ssh connectivity data")
1354
    if not test:
1355
      if nresult[constants.NV_NODELIST]:
1356
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1357
          _ErrorIf(True, self.ENODESSH, node,
1358
                   "ssh communication with node '%s': %s", a_node, a_msg)
1359

    
1360
    test = constants.NV_NODENETTEST not in nresult
1361
    _ErrorIf(test, self.ENODENET, node,
1362
             "node hasn't returned node tcp connectivity data")
1363
    if not test:
1364
      if nresult[constants.NV_NODENETTEST]:
1365
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1366
        for anode in nlist:
1367
          _ErrorIf(True, self.ENODENET, node,
1368
                   "tcp communication with node '%s': %s",
1369
                   anode, nresult[constants.NV_NODENETTEST][anode])
1370

    
1371
  def _VerifyInstance(self, instance, instanceconfig, node_image):
1372
    """Verify an instance.
1373

1374
    This function checks to see if the required block devices are
1375
    available on the instance's node.
1376

1377
    """
1378
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1379
    node_current = instanceconfig.primary_node
1380

    
1381
    node_vol_should = {}
1382
    instanceconfig.MapLVsByNode(node_vol_should)
1383

    
1384
    for node in node_vol_should:
1385
      n_img = node_image[node]
1386
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1387
        # ignore missing volumes on offline or broken nodes
1388
        continue
1389
      for volume in node_vol_should[node]:
1390
        test = volume not in n_img.volumes
1391
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1392
                 "volume %s missing on node %s", volume, node)
1393

    
1394
    if instanceconfig.admin_up:
1395
      pri_img = node_image[node_current]
1396
      test = instance not in pri_img.instances and not pri_img.offline
1397
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1398
               "instance not running on its primary node %s",
1399
               node_current)
1400

    
1401
    for node, n_img in node_image.items():
1402
      if (not node == node_current):
1403
        test = instance in n_img.instances
1404
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1405
                 "instance should not run on node %s", node)
1406

    
1407
  def _VerifyOrphanVolumes(self, node_vol_should, node_image):
1408
    """Verify if there are any unknown volumes in the cluster.
1409

1410
    The .os, .swap and backup volumes are ignored. All other volumes are
1411
    reported as unknown.
1412

1413
    """
1414
    for node, n_img in node_image.items():
1415
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1416
        # skip non-healthy nodes
1417
        continue
1418
      for volume in n_img.volumes:
1419
        test = (node not in node_vol_should or
1420
                volume not in node_vol_should[node])
1421
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1422
                      "volume %s is unknown", volume)
1423

    
1424
  def _VerifyOrphanInstances(self, instancelist, node_image):
1425
    """Verify the list of running instances.
1426

1427
    This checks what instances are running but unknown to the cluster.
1428

1429
    """
1430
    for node, n_img in node_image.items():
1431
      for o_inst in n_img.instances:
1432
        test = o_inst not in instancelist
1433
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1434
                      "instance %s on node %s should not exist", o_inst, node)
1435

    
1436
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1437
    """Verify N+1 Memory Resilience.
1438

1439
    Check that if one single node dies we can still start all the
1440
    instances it was primary for.
1441

1442
    """
1443
    for node, n_img in node_image.items():
1444
      # This code checks that every node which is now listed as
1445
      # secondary has enough memory to host all instances it is
1446
      # supposed to should a single other node in the cluster fail.
1447
      # FIXME: not ready for failover to an arbitrary node
1448
      # FIXME: does not support file-backed instances
1449
      # WARNING: we currently take into account down instances as well
1450
      # as up ones, considering that even if they're down someone
1451
      # might want to start them even in the event of a node failure.
1452
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
                       master_files):
    """Verifies and computes the node required file checksums.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param master_files: list of files that only masters should have

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_cksum = nresult.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if test:
      return

    for file_name in file_list:
      node_is_mc = ninfo.master_candidate
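      # regular files must be present on every node, while files listed
      # in master_files are only required on master candidates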
      must_have = (file_name not in master_files) or node_is_mc
      # missing
      test1 = file_name not in remote_cksum
      # invalid checksum
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
      # existing and good
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
               "file '%s' missing", file_name)
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
               "file '%s' has wrong checksum", file_name)
      # not candidate and this is not a must-have file
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist on non master"
               " candidates (and the file is outdated)", file_name)
      # all good, except non-master/non-must have combination
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist"
               " on non master candidates", file_name)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # compute the DRBD minors
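    # node_drbd maps each minor to an (instance name, should_be_active)
    # tuple, i.e. the cluster's expectation for this node's DRBD devices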
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, self.ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, self.ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, self.ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, self.ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, self.ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, self.ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified",
                                 errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    # Check the cluster certificates
    for cert_filename in constants.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.extend(constants.ALL_CERT_FILES)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
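    # each key in node_verify_param selects one check that the remote
    # nodes will run and report back on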
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline))
                      for node in nodeinfo)

    for instance in instancelist:
      inst_config = instanceinfo[instance]

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          # ghost node
          gnode = self.NodeImage()
          gnode.ghost = True
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Verifying node status")
    for node_i in nodeinfo:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeLVM(node_i, nresult, vg_name)
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
                            master_files)
      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, all_drbd_map)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)

      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
      self._UpdateNodeInstances(node_i, nresult, nimg)
      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_image)
      inst_nodes_offline = []

      pnode = inst_config.primary_node
      pnode_img = node_image[pnode]
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)

      if pnode_img.offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if not inst_config.secondary_nodes:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
               instance, "instance has multiple secondary nodes: %s",
               utils.CommaJoin(inst_config.secondary_nodes),
               code=self.ETYPE_WARNING)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        s_img = node_image[snode]
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
                 "instance %s, connection to secondary node failed", instance)

        if s_img.offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               utils.CommaJoin(inst_nodes_offline))
      # ... or ghost nodes
      for node in inst_config.all_nodes:
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                 "instance lives on ghost node %s", node)

    feedback_fn("* Verifying orphan volumes")
    self._VerifyOrphanVolumes(node_vol_should, node_image)

    feedback_fn("* Verifying orphan instances")
    self._VerifyOrphanInstances(instancelist, node_image)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, instanceinfo)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 0

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
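    # 'result' aliases the (nodes, instances, missing) structures built
    # below, so it can be returned unchanged when there is nothing to report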
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, _, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'",
                                 errors.ECODE_INVAL)

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = _ExpandInstanceName(self.cfg, name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
        }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsizes(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
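        # the sizes reported by the nodes are in bytes; shift by 20 bits
        # to convert them to MiB, the unit used for disk.size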
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    all_nodes = self.cfg.GetNodeList()
    return env, [mn], all_nodes

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.GetHostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err), errors.ECODE_INVAL)
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed",
                                   errors.ECODE_INVAL)
    _CheckBooleanOpField(self.op, "maintain_node_health")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist",
                                       errors.ECODE_INVAL)

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip" %
                              (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
                                   errors.ECODE_INVAL)
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      if not isinstance(self.op.os_hvp, dict):
        raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input",
                                   errors.ECODE_INVAL)
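      # merge the new per-OS hypervisor parameters on top of the
      # existing ones, hypervisor by hypervisor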
      for os_name, hvs in self.op.os_hvp.items():
        if not isinstance(hvs, dict):
          raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on"
                                      " input"), errors.ECODE_INVAL)
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member",
                                   errors.ECODE_INVAL)
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" %
                                   utils.CommaJoin(invalid_hvs),
                                   errors.ECODE_INVAL)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      self.cluster.maintain_node_health = self.op.maintain_node_health

    self.cfg.Update(self.cluster, feedback_fn)


def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetOnlineNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)

  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.CONFD_HMAC_KEY,
                   ])

  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, mstat.sync_percent,
                         rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break
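    # not done yet: sleep for at most a minute before polling again,
    # using the last reported completion estimate as a hint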
    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded
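  # a mirrored device is only consistent if all of its children are
  # consistent too, so recurse into them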
  if dev.children:
2640
    for child in dev.children:
2641
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2642

    
2643
  return result
2644

    
2645

    
2646
class LUDiagnoseOS(NoHooksLU):
2647
  """Logical unit for OS diagnose/query.
2648

2649
  """
2650
  _OP_REQP = ["output_fields", "names"]
2651
  REQ_BGL = False
2652
  _FIELDS_STATIC = utils.FieldSet()
2653
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2654
  # Fields that need calculation of global os validity
2655
  _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2656

    
2657
  def ExpandNames(self):
2658
    if self.op.names:
2659
      raise errors.OpPrereqError("Selective OS query not supported",
2660
                                 errors.ECODE_INVAL)
2661

    
2662
    _CheckOutputFields(static=self._FIELDS_STATIC,
2663
                       dynamic=self._FIELDS_DYNAMIC,
2664
                       selected=self.op.output_fields)
2665

    
2666
    # Lock all nodes, in shared mode
2667
    # Temporary removal of locks, should be reverted later
2668
    # TODO: reintroduce locks when they are lighter-weight
2669
    self.needed_locks = {}
2670
    #self.share_locks[locking.LEVEL_NODE] = 1
2671
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2672

    
2673
  def CheckPrereq(self):
2674
    """Check prerequisites.
2675

2676
    """
2677

    
2678
  @staticmethod
2679
  def _DiagnoseByOS(rlist):
2680
    """Remaps a per-node return list into an a per-os per-node dictionary
2681

2682
    @param rlist: a map with node names as keys and OS objects as values
2683

2684
    @rtype: dict
2685
    @return: a dictionary with osnames as keys and as value another map, with
2686
        nodes as keys and tuples of (path, status, diagnose) as values, eg::
2687

2688
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2689
                                     (/srv/..., False, "invalid api")],
2690
                           "node2": [(/srv/..., True, "")]}
2691
          }
2692

2693
    """
2694
    all_os = {}
2695
    # we build here the list of nodes that didn't fail the RPC (at RPC
2696
    # level), so that nodes with a non-responding node daemon don't
2697
    # make all OSes invalid
2698
    good_nodes = [node_name for node_name in rlist
2699
                  if not rlist[node_name].fail_msg]
2700
    for node_name, nr in rlist.items():
2701
      if nr.fail_msg or not nr.payload:
2702
        continue
2703
      for name, path, status, diagnose, variants in nr.payload:
2704
        if name not in all_os:
2705
          # build a list of nodes for this os containing empty lists
2706
          # for each node in node_list
2707
          all_os[name] = {}
2708
          for nname in good_nodes:
2709
            all_os[name][nname] = []
2710
        all_os[name][node_name].append((path, status, diagnose, variants))
2711
    return all_os
2712

    
2713
  def Exec(self, feedback_fn):
2714
    """Compute the list of OSes.
2715

2716
    """
2717
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2718
    node_data = self.rpc.call_os_diagnose(valid_nodes)
2719
    pol = self._DiagnoseByOS(node_data)
2720
    output = []
2721
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2722
    calc_variants = "variants" in self.op.output_fields
2723

    
2724
    for os_name, os_data in pol.items():
2725
      row = []
2726
      if calc_valid:
2727
        valid = True
2728
        variants = None
2729
        for osl in os_data.values():
2730
          valid = valid and osl and osl[0][1]
2731
          if not valid:
2732
            variants = None
2733
            break
2734
          if calc_variants:
2735
            node_variants = osl[0][3]
2736
            if variants is None:
2737
              variants = node_variants
2738
            else:
2739
              variants = [v for v in variants if v in node_variants]
2740

    
2741
      for field in self.op.output_fields:
2742
        if field == "name":
2743
          val = os_name
2744
        elif field == "valid":
2745
          val = valid
2746
        elif field == "node_status":
2747
          # this is just a copy of the dict
2748
          val = {}
2749
          for node_name, nos_list in os_data.items():
2750
            val[node_name] = nos_list
2751
        elif field == "variants":
2752
          val =  variants
2753
        else:
2754
          raise errors.ParameterError(field)
2755
        row.append(val)
2756
      output.append(row)
2757

    
2758
    return output
2759

    
2760

    
2761
class LURemoveNode(LogicalUnit):
2762
  """Logical unit for removing a node.
2763

2764
  """
2765
  HPATH = "node-remove"
2766
  HTYPE = constants.HTYPE_NODE
2767
  _OP_REQP = ["node_name"]
2768

    
2769
  def BuildHooksEnv(self):
2770
    """Build hooks env.
2771

2772
    This doesn't run on the target node in the pre phase as a failed
2773
    node would then be impossible to remove.
2774

2775
    """
2776
    env = {
2777
      "OP_TARGET": self.op.node_name,
2778
      "NODE_NAME": self.op.node_name,
2779
      }
2780
    all_nodes = self.cfg.GetNodeList()
2781
    try:
2782
      all_nodes.remove(self.op.node_name)
2783
    except ValueError:
2784
      logging.warning("Node %s which is about to be removed not found"
2785
                      " in the all nodes list", self.op.node_name)
2786
    return env, all_nodes, all_nodes
2787

    
2788
  def CheckPrereq(self):
2789
    """Check prerequisites.
2790

2791
    This checks:
2792
     - the node exists in the configuration
2793
     - it does not have primary or secondary instances
2794
     - it's not the master
2795

2796
    Any errors are signaled by raising errors.OpPrereqError.
2797

2798
    """
2799
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
2800
    node = self.cfg.GetNodeInfo(self.op.node_name)
2801
    assert node is not None
2802

    
2803
    instance_list = self.cfg.GetInstanceList()
2804

    
2805
    masternode = self.cfg.GetMasterNode()
2806
    if node.name == masternode:
2807
      raise errors.OpPrereqError("Node is the master node,"
2808
                                 " you need to failover first.",
2809
                                 errors.ECODE_INVAL)
2810

    
2811
    for instance_name in instance_list:
2812
      instance = self.cfg.GetInstanceInfo(instance_name)
2813
      if node.name in instance.all_nodes:
2814
        raise errors.OpPrereqError("Instance %s is still running on the node,"
2815
                                   " please remove first." % instance_name,
2816
                                   errors.ECODE_INVAL)
2817
    self.op.node_name = node.name
2818
    self.node = node
2819

    
2820
  def Exec(self, feedback_fn):
2821
    """Removes the node from the cluster.
2822

2823
    """
2824
    node = self.node
2825
    logging.info("Stopping the node daemon and removing configs from node %s",
2826
                 node.name)
2827

    
2828
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2829

    
2830
    # Promote nodes to master candidate as needed
2831
    _AdjustCandidatePool(self, exceptions=[node.name])
2832
    self.context.RemoveNode(node.name)
2833

    
2834
    # Run post hooks on the node before it's removed
2835
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2836
    try:
2837
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2838
    except:
2839
      # pylint: disable-msg=W0702
2840
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
2841

    
2842
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2843
    msg = result.fail_msg
2844
    if msg:
2845
      self.LogWarning("Errors encountered on the remote node while leaving"
2846
                      " the cluster: %s", msg)
2847

    
2848

    
2849
class LUQueryNodes(NoHooksLU):
2850
  """Logical unit for querying nodes.
2851

2852
  """
2853
  # pylint: disable-msg=W0142
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False

  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained"]

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in _GetWantedNodes if the
    # list is not empty; if it is empty, there is no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      inst_data = self.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field in self._SIMPLE_FIELDS:
          val = getattr(node, field)
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
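        # The node role is reported as a single letter: "M" (master),
        # "C" (master candidate), "D" (drained), "O" (offline) or
        # "R" (regular node), as computed below.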
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)

  def CheckArguments(self):
    _CheckStorageType(self.op.storage_type)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.op.name = getattr(self.op, "name", None)

    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

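    # field_idx maps each remotely-queried field to its column in the rows
    # returned by call_storage_list; SF_NODE and SF_TYPE are filled in
    # locally when building the output below.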
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    _CheckStorageType(self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def CheckArguments(self):
    # validate/normalize the node name
    self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.GetHostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given",
                                 errors.ECODE_INVAL)
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

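      # keyarray now holds, in order: the host DSA key pair, the host RSA
      # key pair and the cluster root ssh key pair read above; they are
      # passed to call_node_add as positional arguments in that order.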
      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True, ecode=errors.ECODE_ENVIRON)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    _CheckBooleanOpField(self.op, 'auto_promote')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we're offlining or draining the node
    self.offline_or_drain = (self.op.offline == True or
                             self.op.drained == True)
    self.deoffline_or_drain = (self.op.offline == False or
                               self.op.drained == False)
    self.might_demote = (self.op.master_candidate == False or
                         self.offline_or_drain)

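    # If this change may demote the node and auto-promotion was requested,
    # all node locks are needed so the candidate pool can be adjusted later
    # in Exec via _AdjustCandidatePool.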
    self.lock_all = self.op.auto_promote and self.might_demote


  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    else:
      self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover",
                                   errors.ECODE_INVAL)


    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto-promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto_promote to allow promotion",
                                   errors.ECODE_INVAL)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.deoffline_or_drain and not self.offline_or_drain and not
        self.op.master_candidate == True and not node.master_candidate):
      self.op.master_candidate = _DecideSelfPromotion(self)
      if self.op.master_candidate:
        self.LogInfo("Autopromoting node to master candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # we locked all nodes, we adjust the CP before updating this node
    if self.lock_all:
      _AdjustCandidatePool(self, [node.name])

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequsites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
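    # os_hvp maps OS name -> hypervisor name -> parameter overrides; only
    # hypervisors that are enabled cluster-wide are kept.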
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

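    # Each entry is (primary node, iv_name, device path on that node);
    # dev_path stays None if the primary-mode assembly above failed.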
    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  _CheckInstanceDown(lu, instance, "cannot shutdown disks")
  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node,
                       prereq=True, ecode=errors.ECODE_ENVIRON)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem),
                               errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem),