root / lib / cmdlib.py @ 9f3ac970

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0201

# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions

import os
import os.path
import time
import re
import platform
import logging
import copy
import OpenSSL

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name, errors.ECODE_INVAL)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes to return, an empty list (and not None) should
    be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # API must be kept, thus we ignore the unused-argument and
    # could-be-a-function warnings
    # pylint: disable-msg=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
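

# Editor's note: the class below is an illustrative sketch added for this
# review, not part of Ganeti. It only shows how the LogicalUnit API described
# above fits together for a hypothetical, read-only opcode: ExpandNames
# declares the instance lock, DeclareLocks adds the node locks once the
# instance lock is held (via _LockInstancesNodes), BuildHooksEnv returns the
# (env, pre-nodes, post-nodes) triple, and CheckPrereq/Exec do the real work.
# The opcode and all names here are hypothetical.
class _LUExampleCheckInstance(LogicalUnit):
  """Illustrative LU: check that an instance's primary node is online."""
  HPATH = "instance-example-check"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes(primary_only=True)

  def BuildHooksEnv(self):
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
    return env, nl, nl

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    feedback_fn("Instance %s looks healthy" % self.instance.name)
    return True
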


class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLu.

    This just raises an error.

    """
    assert False, "BuildHooksEnv called for NoHooksLUs"


class Tasklet:
  """Tasklet base class.

  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
  tasklets know nothing about locks.

  Subclasses must follow these rules:
    - Implement CheckPrereq
    - Implement Exec

  """
  def __init__(self, lu):
    self.lu = lu

    # Shortcuts
    self.cfg = lu.cfg
    self.rpc = lu.rpc

  def CheckPrereq(self):
    """Check prerequisites for this tasklet.

    This method should check whether the prerequisites for the execution of
    this tasklet are fulfilled. It can do internode communication, but it
    should be idempotent - no cluster or system changes are allowed.

    The method should raise errors.OpPrereqError in case something is not
    fulfilled. Its return value is ignored.

    This method should also update all parameters to their canonical form if it
    hasn't been done before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the tasklet.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in code, or
    expected.

    """
    raise NotImplementedError
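

# Editor's note: an illustrative sketch added for this review, not part of
# Ganeti. It shows how an LU is typically wired to tasklets: the LU still
# does the locking in ExpandNames and then simply lists its tasklets; the
# base LogicalUnit.CheckPrereq and LogicalUnit.Exec iterate over
# self.tasklets, so the LU needs no CheckPrereq or Exec of its own. All
# names here are hypothetical.
class _TLExampleReport(Tasklet):
  """Illustrative tasklet: report an instance's primary node."""
  def __init__(self, lu, instance_name):
    Tasklet.__init__(self, lu)
    self.instance_name = instance_name

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.instance_name, errors.ECODE_NOENT)

  def Exec(self, feedback_fn):
    feedback_fn("Instance %s lives on %s" %
                (self.instance.name, self.instance.primary_node))


class _LUExampleReport(NoHooksLU):
  """Illustrative LU built entirely out of the tasklet above."""
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.tasklets = [_TLExampleReport(self, self.op.instance_name)]
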


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
                               errors.ECODE_INVAL)

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'",
                               errors.ECODE_INVAL)

  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)), errors.ECODE_INVAL)
  setattr(op, name, val)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global params.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node does not support the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _RequireFileStorage():
  """Checks that file storage is enabled.

  @raise errors.OpPrereqError: when file storage is disabled

  """
  if not constants.ENABLE_FILE_STORAGE:
    raise errors.OpPrereqError("File storage disabled at configure time",
                               errors.ECODE_INVAL)


def _CheckDiskTemplate(template):
  """Ensure a given disk template is valid.

  """
  if template not in constants.DISK_TEMPLATES:
    msg = ("Invalid disk template name '%s', valid templates are: %s" %
           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
  if template == constants.DT_FILE:
    _RequireFileStorage()


def _CheckStorageType(storage_type):
  """Ensure a given storage type is valid.

  """
  if storage_type not in constants.VALID_STORAGE_TYPES:
    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                               errors.ECODE_INVAL)
  if storage_type == constants.ST_FILE:
    _RequireFileStorage()



def _CheckInstanceDown(lu, instance, reason):
  """Ensure that an instance is not running."""
  if instance.admin_up:
    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
                               (instance.name, reason), errors.ECODE_STATE)

  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s for instance information" % pnode,
              prereq=True, ecode=errors.ECODE_ENVIRON)

  if instance.name in ins_l.payload:
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, reason), errors.ECODE_STATE)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
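

# Editor's note: an illustrative helper added for this review, not part of
# Ganeti. It simply calls _BuildInstanceHookEnv above with made-up values to
# show the shape of the resulting environment: fixed keys such as
# INSTANCE_NAME and INSTANCE_STATUS, per-NIC keys such as INSTANCE_NIC0_MAC,
# per-disk keys such as INSTANCE_DISK0_SIZE, and one INSTANCE_BE_*/
# INSTANCE_HV_* entry per backend/hypervisor parameter. The instance data and
# the chosen constants are examples only.
def _ExampleInstanceHookEnv():
  """Illustrative only: build the hook env for a fictional instance."""
  return _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
                               ["node2.example.com"], "debian-image", True,
                               512, 1,
                               [("198.51.100.10", "aa:00:00:11:22:33",
                                 constants.NIC_MODE_BRIDGED, "xen-br0")],
                               constants.DT_DRBD8, [(10240, "rw")],
                               {constants.BE_MEMORY: 512}, {},
                               constants.HT_XEN_PVM)
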


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu:  L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os_obj.supported_variants:
    return
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  """Return the indices of the instance's disks reported faulty on a node.

  """
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def _FormatTimestamp(secs):
  """Formats a Unix timestamp with the local timezone.

  """
  return time.strftime("%F %T %Z", time.gmtime(secs))


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    if modify_ssh_setup:
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.CreateBackup(priv_key)
      utils.CreateBackup(pub_key)

    return master


def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
                            warn_days=constants.SSL_CERT_EXPIRATION_WARN,
                            error_days=constants.SSL_CERT_EXPIRATION_ERROR):
  """Verifies certificate details for LUVerifyCluster.

  """
  if expired:
    msg = "Certificate %s is expired" % filename

    if not_before is not None and not_after is not None:
      msg += (" (valid from %s to %s)" %
              (_FormatTimestamp(not_before),
               _FormatTimestamp(not_after)))
    elif not_before is not None:
      msg += " (valid from %s)" % _FormatTimestamp(not_before)
    elif not_after is not None:
      msg += " (valid until %s)" % _FormatTimestamp(not_after)

    return (LUVerifyCluster.ETYPE_ERROR, msg)

  elif not_before is not None and not_before > now:
    return (LUVerifyCluster.ETYPE_WARNING,
            "Certificate %s not yet valid (valid from %s)" %
            (filename, _FormatTimestamp(not_before)))

  elif not_after is not None:
    remaining_days = int((not_after - now) / (24 * 3600))

    msg = ("Certificate %s expires in %d days" % (filename, remaining_days))

    if remaining_days <= error_days:
      return (LUVerifyCluster.ETYPE_ERROR, msg)

    if remaining_days <= warn_days:
      return (LUVerifyCluster.ETYPE_WARNING, msg)

  return (None, None)


def _VerifyCertificate(filename):
  """Verifies a certificate for LUVerifyCluster.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable-msg=W0703
    return (LUVerifyCluster.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  # Depending on the pyOpenSSL version, this can just return (None, None)
  (not_before, not_after) = utils.GetX509CertValidity(cert)

  return _VerifyCertificateInner(filename, cert.has_expired(),
                                 not_before, not_after, time.time())
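

# Editor's note: an illustrative helper added for this review, not part of
# Ganeti. _VerifyCertificateInner above is a pure function, so its thresholds
# are easy to exercise directly: an expired certificate is always an error, a
# certificate expiring within error_days is an error, one expiring within
# warn_days a warning, and anything further out yields (None, None). The
# file name and thresholds below are examples only.
def _ExampleCertificateClassification():
  """Illustrative only: classify a certificate expiring in ten days."""
  now = time.time()
  not_after = now + 10 * 24 * 3600
  # with warn_days=30 and error_days=7 this returns an ETYPE_WARNING tuple
  return _VerifyCertificateInner("/tmp/example.pem", False, None, not_after,
                                 now, warn_days=30, error_days=7)
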


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")
  ENODETIME = (TNODE, "ENODETIME")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {secondary-node: list of instances} for all
        peers of this node (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this node is unknown to the configuration (config)

    """
    def __init__(self, offline=False):
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = bool(cond) or self.op.debug_simulate_errors
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond

  def _VerifyNode(self, ninfo, nresult):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, self.ENODERPC, node,
             "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)


    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
    """Check the node LVM data.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # check pv names
    pvlist = nresult.get(constants.NV_PVLIST, None)
    test = pvlist is None
    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
    if not test:
      # check that ':' is not present in PV names, since it's a
      # special character for lvcreate (denotes the range of PEs to
      # use on the PV)
      for _, pvname, owner_vg in pvlist:
        test = ":" in pvname
        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                 " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

  def _VerifyInstance(self, instance, instanceconfig, node_image):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      pri_img = node_image[node_current]
      test = instance not in pri_img.instances and not pri_img.offline
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node, n_img in node_image.items():
      if node != node_current:
        test = instance in n_img.instances
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_image):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node, n_img in node_image.items():
      for o_inst in n_img.instances:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
                       master_files):
    """Verifies and computes the node required file checksums.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param master_files: list of files that only masters should have

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    remote_cksum = nresult.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if test:
      return

    for file_name in file_list:
      node_is_mc = ninfo.master_candidate
      must_have = (file_name not in master_files) or node_is_mc
      # missing
      test1 = file_name not in remote_cksum
      # invalid checksum
      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
      # existing and good
      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
               "file '%s' missing", file_name)
      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
               "file '%s' has wrong checksum", file_name)
      # not candidate and this is not a must-have file
      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist on non master"
               " candidates (and the file is outdated)", file_name)
      # all good, except non-master/non-must have combination
      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
               "file '%s' should not exist"
               " on non master candidates", file_name)

1508
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, self.ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, self.ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, self.ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, self.ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
                  " (instancelist): %s", utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, self.ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, self.ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified",
                                 errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
1673
    """Verify integrity of cluster, performing various test on nodes.
1674

1675
    """
1676
    self.bad = False
1677
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1678
    verbose = self.op.verbose
1679
    self._feedback_fn = feedback_fn
1680
    feedback_fn("* Verifying global settings")
1681
    for msg in self.cfg.VerifyConfig():
1682
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1683

    
1684
    # Check the cluster certificates
1685
    for cert_filename in constants.ALL_CERT_FILES:
1686
      (errcode, msg) = _VerifyCertificate(cert_filename)
1687
      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
1688

    
1689
    vg_name = self.cfg.GetVGName()
1690
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1691
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
1692
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1693
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1694
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1695
                        for iname in instancelist)
1696
    i_non_redundant = [] # Non redundant instances
1697
    i_non_a_balanced = [] # Non auto-balanced instances
1698
    n_offline = 0 # Count of offline nodes
1699
    n_drained = 0 # Count of nodes being drained
1700
    node_vol_should = {}
1701

    
1702
    # FIXME: verify OS list
1703
    # do local checksums
1704
    master_files = [constants.CLUSTER_CONF_FILE]
1705

    
1706
    file_names = ssconf.SimpleStore().GetFileList()
1707
    file_names.extend(constants.ALL_CERT_FILES)
1708
    file_names.extend(master_files)
1709

    
1710
    local_checksums = utils.FingerprintFiles(file_names)
1711

    
1712
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1713
    node_verify_param = {
1714
      constants.NV_FILELIST: file_names,
1715
      constants.NV_NODELIST: [node.name for node in nodeinfo
1716
                              if not node.offline],
1717
      constants.NV_HYPERVISOR: hypervisors,
1718
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1719
                                  node.secondary_ip) for node in nodeinfo
1720
                                 if not node.offline],
1721
      constants.NV_INSTANCELIST: hypervisors,
1722
      constants.NV_VERSION: None,
1723
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1724
      constants.NV_NODESETUP: None,
1725
      constants.NV_TIME: None,
1726
      }
1727

    
1728
    if vg_name is not None:
1729
      node_verify_param[constants.NV_VGLIST] = None
1730
      node_verify_param[constants.NV_LVLIST] = vg_name
1731
      node_verify_param[constants.NV_PVLIST] = [vg_name]
1732
      node_verify_param[constants.NV_DRBDLIST] = None
1733

    
1734
    # Build our expected cluster state
1735
    node_image = dict((node.name, self.NodeImage(offline=node.offline))
1736
                      for node in nodeinfo)
1737

    
1738
    for instance in instancelist:
1739
      inst_config = instanceinfo[instance]
1740

    
1741
      for nname in inst_config.all_nodes:
1742
        if nname not in node_image:
1743
          # ghost node
1744
          gnode = self.NodeImage()
1745
          gnode.ghost = True
1746
          node_image[nname] = gnode
1747

    
1748
      inst_config.MapLVsByNode(node_vol_should)
1749

    
1750
      pnode = inst_config.primary_node
1751
      node_image[pnode].pinst.append(instance)
1752

    
1753
      for snode in inst_config.secondary_nodes:
1754
        nimg = node_image[snode]
1755
        nimg.sinst.append(instance)
1756
        if pnode not in nimg.sbp:
1757
          nimg.sbp[pnode] = []
1758
        nimg.sbp[pnode].append(instance)
1759

    
1760
    # At this point, we have the in-memory data structures complete,
1761
    # except for the runtime information, which we'll gather next
1762

    
1763
    # Due to the way our RPC system works, exact response times cannot be
1764
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1765
    # time before and after executing the request, we can at least have a time
1766
    # window.
1767
    nvinfo_starttime = time.time()
1768
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1769
                                           self.cfg.GetClusterName())
1770
    nvinfo_endtime = time.time()
1771

    
1772
    cluster = self.cfg.GetClusterInfo()
1773
    master_node = self.cfg.GetMasterNode()
1774
    all_drbd_map = self.cfg.ComputeDRBDMap()
1775

    
1776
    feedback_fn("* Verifying node status")
1777
    for node_i in nodeinfo:
1778
      node = node_i.name
1779
      nimg = node_image[node]
1780

    
1781
      if node_i.offline:
1782
        if verbose:
1783
          feedback_fn("* Skipping offline node %s" % (node,))
1784
        n_offline += 1
1785
        continue
1786

    
1787
      if node == master_node:
1788
        ntype = "master"
1789
      elif node_i.master_candidate:
1790
        ntype = "master candidate"
1791
      elif node_i.drained:
1792
        ntype = "drained"
1793
        n_drained += 1
1794
      else:
1795
        ntype = "regular"
1796
      if verbose:
1797
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1798

    
1799
      msg = all_nvinfo[node].fail_msg
1800
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1801
      if msg:
1802
        nimg.rpc_fail = True
1803
        continue
1804

    
1805
      nresult = all_nvinfo[node].payload
1806

    
1807
      nimg.call_ok = self._VerifyNode(node_i, nresult)
1808
      self._VerifyNodeNetwork(node_i, nresult)
1809
      self._VerifyNodeLVM(node_i, nresult, vg_name)
1810
      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
1811
                            master_files)
1812
      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, all_drbd_map)
1813
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
1814

    
1815
      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
1816
      self._UpdateNodeInstances(node_i, nresult, nimg)
1817
      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
1818

    
1819
    feedback_fn("* Verifying instance status")
1820
    for instance in instancelist:
1821
      if verbose:
1822
        feedback_fn("* Verifying instance %s" % instance)
1823
      inst_config = instanceinfo[instance]
1824
      self._VerifyInstance(instance, inst_config, node_image)
1825
      inst_nodes_offline = []
1826

    
1827
      pnode = inst_config.primary_node
1828
      pnode_img = node_image[pnode]
1829
      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1830
               self.ENODERPC, pnode, "instance %s, connection to"
1831
               " primary node failed", instance)
1832

    
1833
      if pnode_img.offline:
1834
        inst_nodes_offline.append(pnode)
1835

    
1836
      # If the instance is non-redundant we cannot survive losing its primary
1837
      # node, so we are not N+1 compliant. On the other hand we have no disk
1838
      # templates with more than one secondary so that situation is not well
1839
      # supported either.
1840
      # FIXME: does not support file-backed instances
1841
      if not inst_config.secondary_nodes:
1842
        i_non_redundant.append(instance)
1843
      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
1844
               instance, "instance has multiple secondary nodes: %s",
1845
               utils.CommaJoin(inst_config.secondary_nodes),
1846
               code=self.ETYPE_WARNING)
1847

    
1848
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1849
        i_non_a_balanced.append(instance)
1850

    
1851
      for snode in inst_config.secondary_nodes:
1852
        s_img = node_image[snode]
1853
        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
1854
                 "instance %s, connection to secondary node failed", instance)
1855

    
1856
        if s_img.offline:
1857
          inst_nodes_offline.append(snode)
1858

    
1859
      # warn that the instance lives on offline nodes
1860
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1861
               "instance lives on offline node(s) %s",
1862
               utils.CommaJoin(inst_nodes_offline))
1863
      # ... or ghost nodes
1864
      for node in inst_config.all_nodes:
1865
        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
1866
                 "instance lives on ghost node %s", node)
1867

    
1868
    feedback_fn("* Verifying orphan volumes")
1869
    self._VerifyOrphanVolumes(node_vol_should, node_image)
1870

    
1871
    feedback_fn("* Verifying oprhan instances")
1872
    self._VerifyOrphanInstances(instancelist, node_image)
1873

    
1874
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1875
      feedback_fn("* Verifying N+1 Memory redundancy")
1876
      self._VerifyNPlusOneMemory(node_image, instanceinfo)
1877

    
1878
    feedback_fn("* Other Notes")
1879
    if i_non_redundant:
1880
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1881
                  % len(i_non_redundant))
1882

    
1883
    if i_non_a_balanced:
1884
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1885
                  % len(i_non_a_balanced))
1886

    
1887
    if n_offline:
1888
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
1889

    
1890
    if n_drained:
1891
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
1892

    
1893
    return not self.bad
1894

    
1895
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1896
    """Analyze the post-hooks' result
1897

1898
    This method analyses the hook result, handles it, and sends some
1899
    nicely-formatted feedback back to the user.
1900

1901
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1902
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1903
    @param hooks_results: the results of the multi-node hooks rpc call
1904
    @param feedback_fn: function used send feedback back to the caller
1905
    @param lu_result: previous Exec result
1906
    @return: the new Exec result, based on the previous result
1907
        and hook results
1908

1909
    """
1910
    # We only really run POST phase hooks, and are only interested in
1911
    # their results
1912
    if phase == constants.HOOKS_PHASE_POST:
1913
      # Used to change hooks' output to proper indentation
1914
      indent_re = re.compile('^', re.M)
1915
      feedback_fn("* Hooks Results")
1916
      assert hooks_results, "invalid result from hooks"
1917

    
1918
      for node_name in hooks_results:
1919
        res = hooks_results[node_name]
1920
        msg = res.fail_msg
1921
        test = msg and not res.offline
1922
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
1923
                      "Communication failure in hooks execution: %s", msg)
1924
        if res.offline or msg:
1925
          # No need to investigate payload if node is offline or gave an error.
1926
          # override manually lu_result here as _ErrorIf only
1927
          # overrides self.bad
1928
          lu_result = 1
1929
          continue
1930
        for script, hkr, output in res.payload:
1931
          test = hkr == constants.HKR_FAIL
1932
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
1933
                        "Script %s failed, output:", script)
1934
          if test:
1935
            output = indent_re.sub('      ', output)
1936
            feedback_fn("%s" % output)
1937
            lu_result = 0
1938

    
1939
      return lu_result
1940

    
1941

    
1942
class LUVerifyDisks(NoHooksLU):
1943
  """Verifies the cluster disks status.
1944

1945
  """
1946
  _OP_REQP = []
1947
  REQ_BGL = False
1948

    
1949
  def ExpandNames(self):
1950
    self.needed_locks = {
1951
      locking.LEVEL_NODE: locking.ALL_SET,
1952
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1953
    }
1954
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1955

    
1956
  def CheckPrereq(self):
1957
    """Check prerequisites.
1958

1959
    This has no prerequisites.
1960

1961
    """
1962
    pass
1963

    
1964
  def Exec(self, feedback_fn):
1965
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
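    # Illustrative return value (an assumption, not from the source):
    #   ({"node2": "rpc error"}, ["instance3"],
    #    {"instance4": [("node1", "xenvg/disk0_data")]})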
    result = res_nodes, res_instances, res_missing = {}, [], {}
1974

    
1975
    vg_name = self.cfg.GetVGName()
1976
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1977
    instances = [self.cfg.GetInstanceInfo(name)
1978
                 for name in self.cfg.GetInstanceList()]
1979

    
1980
    nv_dict = {}
1981
    for inst in instances:
1982
      inst_lvs = {}
1983
      if (not inst.admin_up or
1984
          inst.disk_template not in constants.DTS_NET_MIRROR):
1985
        continue
1986
      inst.MapLVsByNode(inst_lvs)
1987
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1988
      for node, vol_list in inst_lvs.iteritems():
1989
        for vol in vol_list:
1990
          nv_dict[(node, vol)] = inst
1991

    
1992
    if not nv_dict:
1993
      return result
1994

    
1995
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1996

    
1997
    for node in nodes:
1998
      # node_volume
1999
      node_res = node_lvs[node]
2000
      if node_res.offline:
2001
        continue
2002
      msg = node_res.fail_msg
2003
      if msg:
2004
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
2005
        res_nodes[node] = msg
2006
        continue
2007

    
2008
      lvs = node_res.payload
2009
      for lv_name, (_, _, lv_online) in lvs.items():
2010
        inst = nv_dict.pop((node, lv_name), None)
2011
        if (not lv_online and inst is not None
2012
            and inst.name not in res_instances):
2013
          res_instances.append(inst.name)
2014

    
2015
    # any leftover items in nv_dict are missing LVs, let's arrange the
2016
    # data better
2017
    for key, inst in nv_dict.iteritems():
2018
      if inst.name not in res_missing:
2019
        res_missing[inst.name] = []
2020
      res_missing[inst.name].append(key)
2021

    
2022
    return result
2023

    
2024

    
2025
class LURepairDiskSizes(NoHooksLU):
2026
  """Verifies the cluster disks sizes.
2027

2028
  """
2029
  _OP_REQP = ["instances"]
2030
  REQ_BGL = False
2031

    
2032
  def ExpandNames(self):
2033
    if not isinstance(self.op.instances, list):
2034
      raise errors.OpPrereqError("Invalid argument type 'instances'",
2035
                                 errors.ECODE_INVAL)
2036

    
2037
    if self.op.instances:
2038
      self.wanted_names = []
2039
      for name in self.op.instances:
2040
        full_name = _ExpandInstanceName(self.cfg, name)
2041
        self.wanted_names.append(full_name)
2042
      self.needed_locks = {
2043
        locking.LEVEL_NODE: [],
2044
        locking.LEVEL_INSTANCE: self.wanted_names,
2045
        }
2046
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2047
    else:
2048
      self.wanted_names = None
2049
      self.needed_locks = {
2050
        locking.LEVEL_NODE: locking.ALL_SET,
2051
        locking.LEVEL_INSTANCE: locking.ALL_SET,
2052
        }
2053
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
2054

    
2055
  def DeclareLocks(self, level):
2056
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
2057
      self._LockInstancesNodes(primary_only=True)
2058

    
2059
  def CheckPrereq(self):
2060
    """Check prerequisites.
2061

2062
    This only checks the optional instance list against the existing names.
2063

2064
    """
2065
    if self.wanted_names is None:
2066
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2067

    
2068
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
2069
                             in self.wanted_names]
2070

    
2071
  def _EnsureChildSizes(self, disk):
2072
    """Ensure children of the disk have the needed disk size.
2073

2074
    This is valid mainly for DRBD8 and fixes an issue where the
2075
    children have smaller disk size.
2076

2077
    @param disk: an L{ganeti.objects.Disk} object
2078

2079
    """
2080
    if disk.dev_type == constants.LD_DRBD8:
2081
      assert disk.children, "Empty children for DRBD8?"
2082
      fchild = disk.children[0]
2083
      mismatch = fchild.size < disk.size
2084
      if mismatch:
2085
        self.LogInfo("Child disk has size %d, parent %d, fixing",
2086
                     fchild.size, disk.size)
2087
        fchild.size = disk.size
2088

    
2089
      # and we recurse on this child only, not on the metadev
2090
      return self._EnsureChildSizes(fchild) or mismatch
2091
    else:
2092
      return False
2093

    
2094
  def Exec(self, feedback_fn):
2095
    """Verify the size of cluster disks.
2096

2097
    """
2098
    # TODO: check child disks too
2099
    # TODO: check differences in size between primary/secondary nodes
2100
    per_node_disks = {}
2101
    for instance in self.wanted_instances:
2102
      pnode = instance.primary_node
2103
      if pnode not in per_node_disks:
2104
        per_node_disks[pnode] = []
2105
      for idx, disk in enumerate(instance.disks):
2106
        per_node_disks[pnode].append((instance, idx, disk))
2107

    
2108
    changed = []
2109
    for node, dskl in per_node_disks.items():
2110
      newl = [v[2].Copy() for v in dskl]
2111
      for dsk in newl:
2112
        self.cfg.SetDiskID(dsk, node)
2113
      result = self.rpc.call_blockdev_getsizes(node, newl)
2114
      if result.fail_msg:
2115
        self.LogWarning("Failure in blockdev_getsizes call to node"
2116
                        " %s, ignoring", node)
2117
        continue
2118
      if len(result.data) != len(dskl):
2119
        self.LogWarning("Invalid result from node %s, ignoring node results",
2120
                        node)
2121
        continue
2122
      for ((instance, idx, disk), size) in zip(dskl, result.data):
2123
        if size is None:
2124
          self.LogWarning("Disk %d of instance %s did not return size"
2125
                          " information, ignoring", idx, instance.name)
2126
          continue
2127
        if not isinstance(size, (int, long)):
2128
          self.LogWarning("Disk %d of instance %s did not return valid"
2129
                          " size information, ignoring", idx, instance.name)
2130
          continue
2131
        size = size >> 20
2132
        if size != disk.size:
2133
          self.LogInfo("Disk %d of instance %s has mismatched size,"
2134
                       " correcting: recorded %d, actual %d", idx,
2135
                       instance.name, disk.size, size)
2136
          disk.size = size
2137
          self.cfg.Update(instance, feedback_fn)
2138
          changed.append((instance.name, idx, size))
2139
        if self._EnsureChildSizes(disk):
2140
          self.cfg.Update(instance, feedback_fn)
2141
          changed.append((instance.name, idx, disk.size))
2142
    return changed
2143

    
2144

    
2145
class LURenameCluster(LogicalUnit):
2146
  """Rename the cluster.
2147

2148
  """
2149
  HPATH = "cluster-rename"
2150
  HTYPE = constants.HTYPE_CLUSTER
2151
  _OP_REQP = ["name"]
2152

    
2153
  def BuildHooksEnv(self):
2154
    """Build hooks env.
2155

2156
    """
2157
    env = {
2158
      "OP_TARGET": self.cfg.GetClusterName(),
2159
      "NEW_NAME": self.op.name,
2160
      }
2161
    mn = self.cfg.GetMasterNode()
2162
    all_nodes = self.cfg.GetNodeList()
2163
    return env, [mn], all_nodes
2164

    
2165
  def CheckPrereq(self):
2166
    """Verify that the passed name is a valid one.
2167

2168
    """
2169
    hostname = utils.GetHostInfo(self.op.name)
2170

    
2171
    new_name = hostname.name
2172
    self.ip = new_ip = hostname.ip
2173
    old_name = self.cfg.GetClusterName()
2174
    old_ip = self.cfg.GetMasterIP()
2175
    if new_name == old_name and new_ip == old_ip:
2176
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
2177
                                 " cluster has changed",
2178
                                 errors.ECODE_INVAL)
2179
    if new_ip != old_ip:
2180
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
2181
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
2182
                                   " reachable on the network. Aborting." %
2183
                                   new_ip, errors.ECODE_NOTUNIQUE)
2184

    
2185
    self.op.name = new_name
2186

    
2187
  def Exec(self, feedback_fn):
2188
    """Rename the cluster.
2189

2190
    """
2191
    clustername = self.op.name
2192
    ip = self.ip
2193

    
2194
    # shutdown the master IP
2195
    master = self.cfg.GetMasterNode()
2196
    result = self.rpc.call_node_stop_master(master, False)
2197
    result.Raise("Could not disable the master role")
2198

    
2199
    try:
2200
      cluster = self.cfg.GetClusterInfo()
2201
      cluster.cluster_name = clustername
2202
      cluster.master_ip = ip
2203
      self.cfg.Update(cluster, feedback_fn)
2204

    
2205
      # update the known hosts file
2206
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
2207
      node_list = self.cfg.GetNodeList()
2208
      try:
2209
        node_list.remove(master)
2210
      except ValueError:
2211
        pass
2212
      result = self.rpc.call_upload_file(node_list,
2213
                                         constants.SSH_KNOWN_HOSTS_FILE)
2214
      for to_node, to_result in result.iteritems():
2215
        msg = to_result.fail_msg
2216
        if msg:
2217
          msg = ("Copy of file %s to node %s failed: %s" %
2218
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
2219
          self.proc.LogWarning(msg)
2220

    
2221
    finally:
2222
      result = self.rpc.call_node_start_master(master, False, False)
2223
      msg = result.fail_msg
2224
      if msg:
2225
        self.LogWarning("Could not re-enable the master role on"
2226
                        " the master, please restart manually: %s", msg)
2227

    
2228

    
2229
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
2246
  """Change the parameters of the cluster.
2247

2248
  """
2249
  HPATH = "cluster-modify"
2250
  HTYPE = constants.HTYPE_CLUSTER
2251
  _OP_REQP = []
2252
  REQ_BGL = False
2253

    
2254
  def CheckArguments(self):
2255
    """Check parameters
2256

2257
    """
2258
    if not hasattr(self.op, "candidate_pool_size"):
2259
      self.op.candidate_pool_size = None
2260
    if self.op.candidate_pool_size is not None:
2261
      try:
2262
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
2263
      except (ValueError, TypeError), err:
2264
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
2265
                                   str(err), errors.ECODE_INVAL)
2266
      if self.op.candidate_pool_size < 1:
2267
        raise errors.OpPrereqError("At least one master candidate needed",
2268
                                   errors.ECODE_INVAL)
2269

    
2270
    _CheckBooleanOpField(self.op, "maintain_node_health")
2271

    
2272
    if self.op.uid_pool:
2273
      uidpool.CheckUidPool(self.op.uid_pool)
2274

    
2275
    if self.op.add_uids:
2276
      uidpool.CheckUidPool(self.op.add_uids)
2277

    
2278
    if self.op.remove_uids:
2279
      uidpool.CheckUidPool(self.op.remove_uids)
2280

    
2281
  def ExpandNames(self):
2282
    # FIXME: in the future maybe other cluster params won't require checking on
2283
    # all nodes to be modified.
2284
    self.needed_locks = {
2285
      locking.LEVEL_NODE: locking.ALL_SET,
2286
    }
2287
    self.share_locks[locking.LEVEL_NODE] = 1
2288

    
2289
  def BuildHooksEnv(self):
2290
    """Build hooks env.
2291

2292
    """
2293
    env = {
2294
      "OP_TARGET": self.cfg.GetClusterName(),
2295
      "NEW_VG_NAME": self.op.vg_name,
2296
      }
2297
    mn = self.cfg.GetMasterNode()
2298
    return env, [mn], [mn]
2299

    
2300
  def CheckPrereq(self):
2301
    """Check prerequisites.
2302

2303
    This checks whether the given params don't conflict and
2304
    if the given volume group is valid.
2305

2306
    """
2307
    if self.op.vg_name is not None and not self.op.vg_name:
2308
      instances = self.cfg.GetAllInstancesInfo().values()
2309
      for inst in instances:
2310
        for disk in inst.disks:
2311
          if _RecursiveCheckIfLVMBased(disk):
2312
            raise errors.OpPrereqError("Cannot disable lvm storage while"
2313
                                       " lvm-based instances exist",
2314
                                       errors.ECODE_INVAL)
2315

    
2316
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2317

    
2318
    # if vg_name not None, checks given volume group on all nodes
2319
    if self.op.vg_name:
2320
      vglist = self.rpc.call_vg_list(node_list)
2321
      for node in node_list:
2322
        msg = vglist[node].fail_msg
2323
        if msg:
2324
          # ignoring down node
2325
          self.LogWarning("Error while gathering data on node %s"
2326
                          " (ignoring node): %s", node, msg)
2327
          continue
2328
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2329
                                              self.op.vg_name,
2330
                                              constants.MIN_VG_SIZE)
2331
        if vgstatus:
2332
          raise errors.OpPrereqError("Error on node '%s': %s" %
2333
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2334

    
2335
    self.cluster = cluster = self.cfg.GetClusterInfo()
2336
    # validate params changes
2337
    if self.op.beparams:
2338
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2339
      self.new_beparams = objects.FillDict(
2340
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2341

    
2342
    if self.op.nicparams:
2343
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2344
      self.new_nicparams = objects.FillDict(
2345
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2346
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2347
      nic_errors = []
2348

    
2349
      # check all instances for consistency
2350
      for instance in self.cfg.GetAllInstancesInfo().values():
2351
        for nic_idx, nic in enumerate(instance.nics):
2352
          params_copy = copy.deepcopy(nic.nicparams)
2353
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
2354

    
2355
          # check parameter syntax
2356
          try:
2357
            objects.NIC.CheckParameterSyntax(params_filled)
2358
          except errors.ConfigurationError, err:
2359
            nic_errors.append("Instance %s, nic/%d: %s" %
2360
                              (instance.name, nic_idx, err))
2361

    
2362
          # if we're moving instances to routed, check that they have an ip
2363
          target_mode = params_filled[constants.NIC_MODE]
2364
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2365
            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
2366
                              (instance.name, nic_idx))
2367
      if nic_errors:
2368
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2369
                                   "\n".join(nic_errors))
2370

    
2371
    # hypervisor list/parameters
2372
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
2373
    if self.op.hvparams:
2374
      if not isinstance(self.op.hvparams, dict):
2375
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2376
                                   errors.ECODE_INVAL)
2377
      for hv_name, hv_dict in self.op.hvparams.items():
2378
        if hv_name not in self.new_hvparams:
2379
          self.new_hvparams[hv_name] = hv_dict
2380
        else:
2381
          self.new_hvparams[hv_name].update(hv_dict)
2382

    
2383
    # os hypervisor parameters
2384
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
2385
    if self.op.os_hvp:
2386
      if not isinstance(self.op.os_hvp, dict):
2387
        raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input",
2388
                                   errors.ECODE_INVAL)
2389
      for os_name, hvs in self.op.os_hvp.items():
2390
        if not isinstance(hvs, dict):
2391
          raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on"
2392
                                      " input"), errors.ECODE_INVAL)
2393
        if os_name not in self.new_os_hvp:
2394
          self.new_os_hvp[os_name] = hvs
2395
        else:
2396
          for hv_name, hv_dict in hvs.items():
2397
            if hv_name not in self.new_os_hvp[os_name]:
2398
              self.new_os_hvp[os_name][hv_name] = hv_dict
2399
            else:
2400
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
2401

    
2402
    # changes to the hypervisor list
2403
    if self.op.enabled_hypervisors is not None:
2404
      self.hv_list = self.op.enabled_hypervisors
2405
      if not self.hv_list:
2406
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2407
                                   " least one member",
2408
                                   errors.ECODE_INVAL)
2409
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2410
      if invalid_hvs:
2411
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2412
                                   " entries: %s" %
2413
                                   utils.CommaJoin(invalid_hvs),
2414
                                   errors.ECODE_INVAL)
2415
      for hv in self.hv_list:
2416
        # if the hypervisor doesn't already exist in the cluster
2417
        # hvparams, we initialize it to empty, and then (in both
2418
        # cases) we make sure to fill the defaults, as we might not
2419
        # have a complete defaults list if the hypervisor wasn't
2420
        # enabled before
2421
        if hv not in new_hvp:
2422
          new_hvp[hv] = {}
2423
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
2424
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
2425
    else:
2426
      self.hv_list = cluster.enabled_hypervisors
2427

    
2428
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
2429
      # either the enabled list has changed, or the parameters have, validate
2430
      for hv_name, hv_params in self.new_hvparams.items():
2431
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
2432
            (self.op.enabled_hypervisors and
2433
             hv_name in self.op.enabled_hypervisors)):
2434
          # either this is a new hypervisor, or its parameters have changed
2435
          hv_class = hypervisor.GetHypervisor(hv_name)
2436
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2437
          hv_class.CheckParameterSyntax(hv_params)
2438
          _CheckHVParams(self, node_list, hv_name, hv_params)
2439

    
2440
    if self.op.os_hvp:
2441
      # no need to check any newly-enabled hypervisors, since the
2442
      # defaults have already been checked in the above code-block
2443
      for os_name, os_hvp in self.new_os_hvp.items():
2444
        for hv_name, hv_params in os_hvp.items():
2445
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2446
          # we need to fill in the new os_hvp on top of the actual hv_p
2447
          cluster_defaults = self.new_hvparams.get(hv_name, {})
2448
          new_osp = objects.FillDict(cluster_defaults, hv_params)
2449
          hv_class = hypervisor.GetHypervisor(hv_name)
2450
          hv_class.CheckParameterSyntax(new_osp)
2451
          _CheckHVParams(self, node_list, hv_name, new_osp)
2452

    
2453

    
2454
  def Exec(self, feedback_fn):
2455
    """Change the parameters of the cluster.
2456

2457
    """
2458
    if self.op.vg_name is not None:
2459
      new_volume = self.op.vg_name
2460
      if not new_volume:
2461
        new_volume = None
2462
      if new_volume != self.cfg.GetVGName():
2463
        self.cfg.SetVGName(new_volume)
2464
      else:
2465
        feedback_fn("Cluster LVM configuration already in desired"
2466
                    " state, not changing")
2467
    if self.op.hvparams:
2468
      self.cluster.hvparams = self.new_hvparams
2469
    if self.op.os_hvp:
2470
      self.cluster.os_hvp = self.new_os_hvp
2471
    if self.op.enabled_hypervisors is not None:
2472
      self.cluster.hvparams = self.new_hvparams
2473
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2474
    if self.op.beparams:
2475
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2476
    if self.op.nicparams:
2477
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2478

    
2479
    if self.op.candidate_pool_size is not None:
2480
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
2481
      # we need to update the pool size here, otherwise the save will fail
2482
      _AdjustCandidatePool(self, [])
2483

    
2484
    if self.op.maintain_node_health is not None:
2485
      self.cluster.maintain_node_health = self.op.maintain_node_health
2486

    
2487
    if self.op.add_uids is not None:
2488
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
2489

    
2490
    if self.op.remove_uids is not None:
2491
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
2492

    
2493
    if self.op.uid_pool is not None:
2494
      self.cluster.uid_pool = self.op.uid_pool
2495

    
2496
    self.cfg.Update(self.cluster, feedback_fn)
2497

    
2498

    
2499
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2500
  """Distribute additional files which are part of the cluster configuration.
2501

2502
  ConfigWriter takes care of distributing the config and ssconf files, but
2503
  there are more files which should be distributed to all nodes. This function
2504
  makes sure those are copied.
2505

2506
  @param lu: calling logical unit
2507
  @param additional_nodes: list of nodes not in the config to distribute to
2508

2509
  """
2510
  # 1. Gather target nodes
2511
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2512
  dist_nodes = lu.cfg.GetOnlineNodeList()
2513
  if additional_nodes is not None:
2514
    dist_nodes.extend(additional_nodes)
2515
  if myself.name in dist_nodes:
2516
    dist_nodes.remove(myself.name)
2517

    
2518
  # 2. Gather files to distribute
2519
  dist_files = set([constants.ETC_HOSTS,
2520
                    constants.SSH_KNOWN_HOSTS_FILE,
2521
                    constants.RAPI_CERT_FILE,
2522
                    constants.RAPI_USERS_FILE,
2523
                    constants.CONFD_HMAC_KEY,
2524
                   ])
2525

    
2526
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2527
  for hv_name in enabled_hypervisors:
2528
    hv_class = hypervisor.GetHypervisor(hv_name)
2529
    dist_files.update(hv_class.GetAncillaryFiles())
2530

    
2531
  # 3. Perform the files upload
2532
  for fname in dist_files:
2533
    if os.path.exists(fname):
2534
      result = lu.rpc.call_upload_file(dist_nodes, fname)
2535
      for to_node, to_result in result.items():
2536
        msg = to_result.fail_msg
2537
        if msg:
2538
          msg = ("Copy of file %s to node %s failed: %s" %
2539
                 (fname, to_node, msg))
2540
          lu.proc.LogWarning(msg)
2541

    
2542

    
2543
class LURedistributeConfig(NoHooksLU):
2544
  """Force the redistribution of cluster configuration.
2545

2546
  This is a very simple LU.
2547

2548
  """
2549
  _OP_REQP = []
2550
  REQ_BGL = False
2551

    
2552
  def ExpandNames(self):
2553
    self.needed_locks = {
2554
      locking.LEVEL_NODE: locking.ALL_SET,
2555
    }
2556
    self.share_locks[locking.LEVEL_NODE] = 1
2557

    
2558
  def CheckPrereq(self):
2559
    """Check prerequisites.
2560

2561
    """
2562

    
2563
  def Exec(self, feedback_fn):
2564
    """Redistribute the configuration.
2565

2566
    """
2567
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2568
    _RedistributeAncillaryFiles(self)
2569

    
2570

    
2571
def _WaitForSync(lu, instance, oneshot=False):
2572
  """Sleep and poll for an instance's disk to sync.
2573

2574
  """
2575
  if not instance.disks:
2576
    return True
2577

    
2578
  if not oneshot:
2579
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2580

    
2581
  node = instance.primary_node
2582

    
2583
  for dev in instance.disks:
2584
    lu.cfg.SetDiskID(dev, node)
2585

    
2586
  # TODO: Convert to utils.Retry
2587

    
2588
  retries = 0
2589
  degr_retries = 10 # in seconds, as we sleep 1 second each time
2590
  while True:
2591
    max_time = 0
2592
    done = True
2593
    cumul_degraded = False
2594
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2595
    msg = rstats.fail_msg
2596
    if msg:
2597
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2598
      retries += 1
2599
      if retries >= 10:
2600
        raise errors.RemoteError("Can't contact node %s for mirror data,"
2601
                                 " aborting." % node)
2602
      time.sleep(6)
2603
      continue
2604
    rstats = rstats.payload
2605
    retries = 0
2606
    for i, mstat in enumerate(rstats):
2607
      if mstat is None:
2608
        lu.LogWarning("Can't compute data for node %s/%s",
2609
                           node, instance.disks[i].iv_name)
2610
        continue
2611

    
2612
      cumul_degraded = (cumul_degraded or
2613
                        (mstat.is_degraded and mstat.sync_percent is None))
2614
      if mstat.sync_percent is not None:
2615
        done = False
2616
        if mstat.estimated_time is not None:
2617
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2618
          max_time = mstat.estimated_time
2619
        else:
2620
          rem_time = "no time estimate"
2621
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2622
                        (instance.disks[i].iv_name, mstat.sync_percent,
2623
                         rem_time))
2624

    
2625
    # if we're done but degraded, let's do a few small retries, to
2626
    # make sure we see a stable and not transient situation; therefore
2627
    # we force restart of the loop
2628
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
2629
      logging.info("Degraded disks found, %d retries left", degr_retries)
2630
      degr_retries -= 1
2631
      time.sleep(1)
2632
      continue
2633

    
2634
    if done or oneshot:
2635
      break
2636

    
2637
    time.sleep(min(60, max_time))
2638

    
2639
  if done:
2640
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2641
  return not cumul_degraded
2642

    
2643

    
2644
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2645
  """Check that mirrors are not degraded.
2646

2647
  The ldisk parameter, if True, will change the test from the
2648
  is_degraded attribute (which represents overall non-ok status for
2649
  the device(s)) to the ldisk (representing the local storage status).
2650

2651
  """
2652
  lu.cfg.SetDiskID(dev, node)
2653

    
2654
  result = True
2655

    
2656
  if on_primary or dev.AssembleOnSecondary():
2657
    rstats = lu.rpc.call_blockdev_find(node, dev)
2658
    msg = rstats.fail_msg
2659
    if msg:
2660
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2661
      result = False
2662
    elif not rstats.payload:
2663
      lu.LogWarning("Can't find disk on node %s", node)
2664
      result = False
2665
    else:
2666
      if ldisk:
2667
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2668
      else:
2669
        result = result and not rstats.payload.is_degraded
2670

    
2671
  if dev.children:
2672
    for child in dev.children:
2673
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2674

    
2675
  return result
2676

    
2677

    
2678
class LUDiagnoseOS(NoHooksLU):
2679
  """Logical unit for OS diagnose/query.
2680

2681
  """
2682
  _OP_REQP = ["output_fields", "names"]
2683
  REQ_BGL = False
2684
  _FIELDS_STATIC = utils.FieldSet()
2685
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2686
  # Fields that need calculation of global os validity
2687
  _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2688

    
2689
  def ExpandNames(self):
2690
    if self.op.names:
2691
      raise errors.OpPrereqError("Selective OS query not supported",
2692
                                 errors.ECODE_INVAL)
2693

    
2694
    _CheckOutputFields(static=self._FIELDS_STATIC,
2695
                       dynamic=self._FIELDS_DYNAMIC,
2696
                       selected=self.op.output_fields)
2697

    
2698
    # Lock all nodes, in shared mode
2699
    # Temporary removal of locks, should be reverted later
2700
    # TODO: reintroduce locks when they are lighter-weight
2701
    self.needed_locks = {}
2702
    #self.share_locks[locking.LEVEL_NODE] = 1
2703
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2704

    
2705
  def CheckPrereq(self):
2706
    """Check prerequisites.
2707

2708
    """
2709

    
2710
  @staticmethod
2711
  def _DiagnoseByOS(rlist):
2712
    """Remaps a per-node return list into an a per-os per-node dictionary
2713

2714
    @param rlist: a map with node names as keys and OS objects as values
2715

2716
    @rtype: dict
2717
    @return: a dictionary with osnames as keys and as value another map, with
2718
        nodes as keys and tuples of (path, status, diagnose) as values, eg::
2719

2720
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2721
                                     (/srv/..., False, "invalid api")],
2722
                           "node2": [(/srv/..., True, "")]}
2723
          }
2724

2725
    """
2726
    all_os = {}
2727
    # we build here the list of nodes that didn't fail the RPC (at RPC
2728
    # level), so that nodes with a non-responding node daemon don't
2729
    # make all OSes invalid
2730
    good_nodes = [node_name for node_name in rlist
2731
                  if not rlist[node_name].fail_msg]
2732
    for node_name, nr in rlist.items():
2733
      if nr.fail_msg or not nr.payload:
2734
        continue
2735
      for name, path, status, diagnose, variants in nr.payload:
2736
        if name not in all_os:
2737
          # build a list of nodes for this os containing empty lists
2738
          # for each node in node_list
2739
          all_os[name] = {}
2740
          for nname in good_nodes:
2741
            all_os[name][nname] = []
2742
        all_os[name][node_name].append((path, status, diagnose, variants))
2743
    return all_os
2744

    
2745
  def Exec(self, feedback_fn):
2746
    """Compute the list of OSes.
2747

2748
    """
2749
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2750
    node_data = self.rpc.call_os_diagnose(valid_nodes)
2751
    pol = self._DiagnoseByOS(node_data)
2752
    output = []
2753
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2754
    calc_variants = "variants" in self.op.output_fields
2755

    
2756
    for os_name, os_data in pol.items():
2757
      row = []
2758
      if calc_valid:
2759
        valid = True
2760
        variants = None
2761
        for osl in os_data.values():
2762
          valid = valid and osl and osl[0][1]
2763
          if not valid:
2764
            variants = None
2765
            break
2766
          if calc_variants:
2767
            node_variants = osl[0][3]
2768
            if variants is None:
2769
              variants = node_variants
2770
            else:
2771
              variants = [v for v in variants if v in node_variants]
2772

    
2773
      for field in self.op.output_fields:
2774
        if field == "name":
2775
          val = os_name
2776
        elif field == "valid":
2777
          val = valid
2778
        elif field == "node_status":
2779
          # this is just a copy of the dict
2780
          val = {}
2781
          for node_name, nos_list in os_data.items():
2782
            val[node_name] = nos_list
2783
        elif field == "variants":
2784
          val = variants
2785
        else:
2786
          raise errors.ParameterError(field)
2787
        row.append(val)
2788
      output.append(row)
2789

    
2790
    return output
2791

    
2792

    
2793
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)


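# Illustrative sketch (not part of the module): for the removal of a node
# named "node3.example.com", BuildHooksEnv above produces
#
#   env = {"OP_TARGET": "node3.example.com",
#          "NODE_NAME": "node3.example.com"}
#
# and the hooks node lists exclude the node being removed, so a failed
# pre-phase hook there cannot block its own removal.  The post-phase hooks
# are additionally run on the removed node itself from Exec, just before
# call_node_leave_cluster tears it down (the host name is made up).
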
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False

  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained"]

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      inst_data = self.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field in self._SIMPLE_FIELDS:
          val = getattr(node, field)
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


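# Illustrative sketch (not part of the module): LUQueryNodes.Exec returns
# one positional row per node, ordered like self.op.output_fields.  For
# output_fields=["name", "pinst_cnt", "role", "mfree"] the result could
# look like (all values made up):
#
#   [["node1.example.com", 2, "M", 3841],
#    ["node2.example.com", 0, "C", 7682]]
#
# "mfree" is a dynamic field, so requesting it sets do_node_query above and
# triggers the call_node_info RPC; a query restricted to static fields is
# answered purely from the configuration, without contacting the nodes.
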
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


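# Illustrative sketch (not part of the module): every value above is
# stringified, so each row is a list of strings in output_fields order.
# With output_fields=["node", "phys", "size", "instance"], a row could be
#
#   ["node1.example.com", "/dev/xenvg/disk0", "10240", "instance1"]
#
# and the "instance" column falls back to "-" when the logical volume is
# not mapped to any instance (the concrete names here are made up).
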
class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)

  def CheckArguments(self):
    _CheckStorageType(self.op.storage_type)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.op.name = getattr(self.op, "name", None)

    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


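# Illustrative sketch (not part of the module): the LU always asks the
# backend for SF_NAME (to build name_idx and to sort), but answers the
# caller only for the requested output_fields.  Assuming SF_SIZE is among
# constants.VALID_STORAGE_FIELDS, a request such as
#
#   output_fields=[constants.SF_NODE, constants.SF_SIZE]
#
# would still yield rows grouped per node and sorted by storage-unit name,
# e.g. ["node1.example.com", 20480] per unit (sizes made up).
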
class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    _CheckStorageType(self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Modifies a storage volume on a node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


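# Illustrative sketch (not part of the module): self.op.changes is a dict of
# field -> new value, and only keys listed in
# constants.MODIFIABLE_STORAGE_FIELDS[storage_type] pass CheckPrereq.  For
# example, if LVM physical volumes allow toggling allocatability, a change
# set could look like {constants.SF_ALLOCATABLE: False}; the authoritative
# list of modifiable fields per storage type lives in constants.py.
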
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def CheckArguments(self):
    # validate/normalize the node name
    self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.GetHostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given",
                                 errors.ECODE_INVAL)
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        new_node.primary_ip = self.op.primary_ip

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True, ecode=errors.ECODE_ENVIRON)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())


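# Illustrative sketch (not part of the module): the single/dual-homed check
# in CheckPrereq compares topology only, not concrete addresses.  If the
# master has primary_ip == secondary_ip (single-homed), then adding
# "node4.example.com" with a distinct secondary_ip such as "192.0.2.44"
# fails with ECODE_INVAL before any RPC reaches the new node; the reverse
# (dual-homed cluster, single-homed candidate) fails the same way.  The
# host name and address above are made up for illustration.
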
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    _CheckBooleanOpField(self.op, 'auto_promote')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we're offlining or draining the node
    self.offline_or_drain = (self.op.offline == True or
                             self.op.drained == True)
    self.deoffline_or_drain = (self.op.offline == False or
                               self.op.drained == False)
    self.might_demote = (self.op.master_candidate == False or
                         self.offline_or_drain)

    self.lock_all = self.op.auto_promote and self.might_demote


  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    else:
      self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover",
                                   errors.ECODE_INVAL)


    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto-promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto_promote to allow promotion",
                                   errors.ECODE_INVAL)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.deoffline_or_drain and not self.offline_or_drain and not
        self.op.master_candidate == True and not node.master_candidate):
      self.op.master_candidate = _DecideSelfPromotion(self)
      if self.op.master_candidate:
        self.LogInfo("Autopromoting node to master candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # we locked all nodes, we adjust the CP before updating this node
    if self.lock_all:
      _AdjustCandidatePool(self, [node.name])

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


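# Illustrative sketch (not part of the module): LUSetNodeParams.Exec returns
# a list of (parameter, new value) pairs describing what actually changed,
# which callers can print as feedback.  Offlining a node that was a master
# candidate would yield something like:
#
#   [("offline", "True"),
#    ("master_candidate", "auto-demotion due to offline")]
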
class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      }

    return result


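# Illustrative sketch (not part of the module): the dict returned by Exec is
# what "gnt-cluster info"-style clients consume.  A trimmed example (values
# made up) could be:
#
#   {"software_version": "2.4.0",
#    "enabled_hypervisors": ["xen-pvm"],
#    "default_hypervisor": "xen-pvm",
#    "os_hvp": {"debian-image": {"xen-pvm": {"kernel_path": "/boot/vmlinuz"}}},
#    "candidate_pool_size": 10}
#
# Note that os_hvp is filtered above so only enabled hypervisors appear.
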
class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


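# Illustrative sketch (not part of the module): values are returned
# positionally, matching output_fields.  Asking for
# ["cluster_name", "drain_flag"] could yield ["cluster.example.com", False],
# with drain_flag simply reflecting whether
# constants.JOB_QUEUE_DRAIN_FILE exists on the master (the cluster name is
# made up).
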
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a tuple of (disks_ok, device_info), where device_info is a
      list of (host, instance_visible_name, node_visible_name) tuples
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


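# Illustrative sketch (not part of the module): a typical successful return
# from _AssembleInstanceDisks for a one-disk DRBD instance would be
#
#   (True, [("node1.example.com", "disk/0", "/dev/drbd0")])
#
# i.e. (disks_ok, device_info), where each device_info entry maps the
# primary node and the instance-visible disk name to the node-visible
# device path; LUActivateInstanceDisks returns only the device_info part.
# The concrete node name and device path above are made up.
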
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  _CheckInstanceDown(lu, instance, "cannot shutdown disks")
  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


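# Illustrative sketch (not part of the module): _ShutdownInstanceDisks is
# best-effort; it walks every disk on every node, logs warnings, and only
# reports an overall boolean.  A caller that must not proceed on failure
# would typically follow the pattern used elsewhere in this module for
# disks_ok, along the lines of:
#
#   if not _ShutdownInstanceDisks(lu, instance):
#     raise errors.OpExecError("Cannot shutdown instance disks")
#
# (the exact error message here is made up).
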
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.
