1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
import os
30
import os.path
31
import time
32
import re
33
import platform
34
import logging
35
import copy
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import serializer
45
from ganeti import ssconf
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq (except when tasklets are used)
54
    - implement Exec (except when tasklets are used)
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59

60
  Note that all commands require root permissions.
61

62
  @ivar dry_run_result: the value (if any) that will be returned to the caller
63
      in dry-run mode (signalled by opcode dry_run parameter)
64

65
  """
66
  HPATH = None
67
  HTYPE = None
68
  _OP_REQP = []
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, rpc):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.context = context
82
    self.rpc = rpc
83
    # Dicts used to declare locking needs to mcpu
84
    self.needed_locks = None
85
    self.acquired_locks = {}
86
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
87
    self.add_locks = {}
88
    self.remove_locks = {}
89
    # Used to force good behavior when calling helper functions
90
    self.recalculate_locks = {}
91
    self.__ssh = None
92
    # logging
93
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
94
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
95
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
96
    # support for dry-run
97
    self.dry_run_result = None
98

    
99
    # Tasklets
100
    self.tasklets = None
101

    
102
    for attr_name in self._OP_REQP:
103
      attr_val = getattr(op, attr_name, None)
104
      if attr_val is None:
105
        raise errors.OpPrereqError("Required parameter '%s' missing" %
106
                                   attr_name, errors.ECODE_INVAL)
107

    
108
    self.CheckArguments()
109

    
110
  def __GetSSH(self):
111
    """Returns the SshRunner object
112

113
    """
114
    if not self.__ssh:
115
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
116
    return self.__ssh
117

    
118
  ssh = property(fget=__GetSSH)
119

    
120
  def CheckArguments(self):
121
    """Check syntactic validity for the opcode arguments.
122

123
    This method is for doing a simple syntactic check and ensuring the
124
    validity of opcode parameters, without any cluster-related
125
    checks. While the same can be accomplished in ExpandNames and/or
126
    CheckPrereq, doing these separately is better because:
127

128
      - ExpandNames is left as purely a lock-related function
129
      - CheckPrereq is run after we have acquired locks (and possibly
130
        waited for them)
131

132
    The function is allowed to change the self.op attribute so that
133
    later methods no longer need to worry about missing parameters.
134

135
    """
136
    pass
137

    
138
  def ExpandNames(self):
139
    """Expand names for this LU.
140

141
    This method is called before starting to execute the opcode, and it should
142
    update all the parameters of the opcode to their canonical form (e.g. a
143
    short node name must be fully expanded after this method has successfully
144
    completed). This way locking, hooks, logging, etc. can work correctly.
145

146
    LUs which implement this method must also populate the self.needed_locks
147
    member, as a dict with lock levels as keys, and a list of needed lock names
148
    as values. Rules:
149

150
      - use an empty dict if you don't need any lock
151
      - if you don't need any lock at a particular level omit that level
152
      - don't put anything for the BGL level
153
      - if you want all locks at a level use locking.ALL_SET as a value
154

155
    If you need to share locks (rather than acquire them exclusively) at one
156
    level you can modify self.share_locks, setting a true value (usually 1) for
157
    that level. By default locks are not shared.
158

159
    This function can also define a list of tasklets, which then will be
160
    executed in order instead of the usual LU-level CheckPrereq and Exec
161
    functions, if those are not defined by the LU.
162

163
    Examples::
164

165
      # Acquire all nodes and one instance
166
      self.needed_locks = {
167
        locking.LEVEL_NODE: locking.ALL_SET,
168
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
169
      }
170
      # Acquire just two nodes
171
      self.needed_locks = {
172
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
173
      }
174
      # Acquire no locks
175
      self.needed_locks = {} # No, you can't leave it to the default value None
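      # Acquire all node locks, but in shared (read-only) mode; locks are
      # exclusive unless self.share_locks is modified (minimal sketch)
      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
      self.share_locks[locking.LEVEL_NODE] = 1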
176

177
    """
178
    # The implementation of this method is mandatory only if the new LU is
179
    # concurrent, so that old LUs don't need to be changed all at the same
180
    # time.
181
    if self.REQ_BGL:
182
      self.needed_locks = {} # Exclusive LUs don't need locks.
183
    else:
184
      raise NotImplementedError
185

    
186
  def DeclareLocks(self, level):
187
    """Declare LU locking needs for a level
188

189
    While most LUs can just declare their locking needs at ExpandNames time,
190
    sometimes there is a need to calculate some locks after having acquired
191
    the ones before. This function is called just before acquiring locks at a
192
    particular level, but after acquiring the ones at lower levels, and permits
193
    such calculations. It can be used to modify self.needed_locks, and by
194
    default it does nothing.
195

196
    This function is only called if you have something already set in
197
    self.needed_locks for the level.
198

199
    @param level: Locking level which is going to be locked
200
    @type level: member of ganeti.locking.LEVELS
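
    Example (minimal sketch of a typical implementation, which computes the
    node locks only once the instance locks are already held)::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()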
201

202
    """
203

    
204
  def CheckPrereq(self):
205
    """Check prerequisites for this LU.
206

207
    This method should check that the prerequisites for the execution
208
    of this LU are fulfilled. It can do internode communication, but
209
    it should be idempotent - no cluster or system changes are
210
    allowed.
211

212
    The method should raise errors.OpPrereqError in case something is
213
    not fulfilled. Its return value is ignored.
214

215
    This method should also update all the parameters of the opcode to
216
    their canonical form if it hasn't been done by ExpandNames before.
217

218
    """
219
    if self.tasklets is not None:
220
      for (idx, tl) in enumerate(self.tasklets):
221
        logging.debug("Checking prerequisites for tasklet %s/%s",
222
                      idx + 1, len(self.tasklets))
223
        tl.CheckPrereq()
224
    else:
225
      raise NotImplementedError
226

    
227
  def Exec(self, feedback_fn):
228
    """Execute the LU.
229

230
    This method should implement the actual work. It should raise
231
    errors.OpExecError for failures that are somewhat dealt with in
232
    code, or expected.
233

234
    """
235
    if self.tasklets is not None:
236
      for (idx, tl) in enumerate(self.tasklets):
237
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
238
        tl.Exec(feedback_fn)
239
    else:
240
      raise NotImplementedError
241

    
242
  def BuildHooksEnv(self):
243
    """Build hooks environment for this LU.
244

245
    This method should return a three-element tuple consisting of: a dict
246
    containing the environment that will be used for running the
247
    specific hook for this LU, a list of node names on which the hook
248
    should run before the execution, and a list of node names on which
249
    the hook should run after the execution.
250

251
    The keys of the dict must not have 'GANETI_' prefixed as this will
252
    be handled in the hooks runner. Also note additional keys will be
253
    added by the hooks runner. If the LU doesn't define any
254
    environment, an empty dict (and not None) should be returned.
255

256
    If there are no nodes to return, use an empty list (and not None).
257

258
    Note that if the HPATH for a LU class is None, this function will
259
    not be called.
260

261
    """
262
    raise NotImplementedError
263

    
264
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
265
    """Notify the LU about the results of its hooks.
266

267
    This method is called every time a hooks phase is executed, and notifies
268
    the Logical Unit about the hooks' result. The LU can then use it to alter
269
    its result based on the hooks.  By default the method does nothing and the
270
    previous result is passed back unchanged but any LU can define it if it
271
    wants to use the local cluster hook-scripts somehow.
272

273
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
274
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
275
    @param hook_results: the results of the multi-node hooks rpc call
276
    @param feedback_fn: function used to send feedback back to the caller
277
    @param lu_result: the previous Exec result this LU had, or None
278
        in the PRE phase
279
    @return: the new Exec result, based on the previous result
280
        and hook results
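
    Example (hypothetical override that reports post-phase hook failures
    while passing the previous result through unchanged)::

      def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
        if phase == constants.HOOKS_PHASE_POST:
          for node, res in hook_results.items():
            if res.fail_msg:
              feedback_fn("hook on %s failed: %s" % (node, res.fail_msg))
        return lu_result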
281

282
    """
283
    return lu_result
284

    
285
  def _ExpandAndLockInstance(self):
286
    """Helper function to expand and lock an instance.
287

288
    Many LUs that work on an instance take its name in self.op.instance_name
289
    and need to expand it and then declare the expanded name for locking. This
290
    function does it, and then updates self.op.instance_name to the expanded
291
    name. It also initializes needed_locks as a dict, if this hasn't been done
292
    before.
293

294
    """
295
    if self.needed_locks is None:
296
      self.needed_locks = {}
297
    else:
298
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
299
        "_ExpandAndLockInstance called with instance-level locks set"
300
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
301
    if expanded_name is None:
302
      raise errors.OpPrereqError("Instance '%s' not known" %
303
                                 self.op.instance_name, errors.ECODE_NOENT)
304
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
305
    self.op.instance_name = expanded_name
306

    
307
  def _LockInstancesNodes(self, primary_only=False):
308
    """Helper function to declare instances' nodes for locking.
309

310
    This function should be called after locking one or more instances to lock
311
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
312
    with all primary or secondary nodes for instances already locked and
313
    present in self.needed_locks[locking.LEVEL_INSTANCE].
314

315
    It should be called from DeclareLocks, and for safety only works if
316
    self.recalculate_locks[locking.LEVEL_NODE] is set.
317

318
    In the future it may grow parameters to just lock some instance's nodes, or
319
    to just lock primaries or secondary nodes, if needed.
320

321
    It should be called in DeclareLocks in a way similar to::
322

323
      if level == locking.LEVEL_NODE:
324
        self._LockInstancesNodes()
325

326
    @type primary_only: boolean
327
    @param primary_only: only lock primary nodes of locked instances
328

329
    """
330
    assert locking.LEVEL_NODE in self.recalculate_locks, \
331
      "_LockInstancesNodes helper function called with no nodes to recalculate"
332

    
333
    # TODO: check if we have really been called with the instance locks held
334

    
335
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
336
    # future we might want to have different behaviors depending on the value
337
    # of self.recalculate_locks[locking.LEVEL_NODE]
338
    wanted_nodes = []
339
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
340
      instance = self.context.cfg.GetInstanceInfo(instance_name)
341
      wanted_nodes.append(instance.primary_node)
342
      if not primary_only:
343
        wanted_nodes.extend(instance.secondary_nodes)
344

    
345
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
346
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
347
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
348
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
349

    
350
    del self.recalculate_locks[locking.LEVEL_NODE]
351

    
352

    
353
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
354
  """Simple LU which runs no hooks.
355

356
  This LU is intended as a parent for other LogicalUnits which will
357
  run no hooks, in order to reduce duplicate code.
358

359
  """
360
  HPATH = None
361
  HTYPE = None
362

    
363
  def BuildHooksEnv(self):
364
    """Empty BuildHooksEnv for NoHooksLu.
365

366
    This just raises an error.
367

368
    """
369
    assert False, "BuildHooksEnv called for NoHooksLUs"
370

    
371

    
372
class Tasklet:
373
  """Tasklet base class.
374

375
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
376
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
377
  tasklets know nothing about locks.
378

379
  Subclasses must follow these rules:
380
    - Implement CheckPrereq
381
    - Implement Exec
382

383
  """
384
  def __init__(self, lu):
385
    self.lu = lu
386

    
387
    # Shortcuts
388
    self.cfg = lu.cfg
389
    self.rpc = lu.rpc
390

    
391
  def CheckPrereq(self):
392
    """Check prerequisites for this tasklets.
393

394
    This method should check whether the prerequisites for the execution of
395
    this tasklet are fulfilled. It can do internode communication, but it
396
    should be idempotent - no cluster or system changes are allowed.
397

398
    The method should raise errors.OpPrereqError in case something is not
399
    fulfilled. Its return value is ignored.
400

401
    This method should also update all parameters to their canonical form if it
402
    hasn't been done before.
403

404
    """
405
    raise NotImplementedError
406

    
407
  def Exec(self, feedback_fn):
408
    """Execute the tasklet.
409

410
    This method should implement the actual work. It should raise
411
    errors.OpExecError for failures that are somewhat dealt with in code, or
412
    expected.
413

414
    """
415
    raise NotImplementedError
416

    
417

    
418
def _GetWantedNodes(lu, nodes):
419
  """Returns list of checked and expanded node names.
420

421
  @type lu: L{LogicalUnit}
422
  @param lu: the logical unit on whose behalf we execute
423
  @type nodes: list
424
  @param nodes: list of node names or None for all nodes
425
  @rtype: list
426
  @return: the list of nodes, sorted
427
  @raise errors.OpPrereqError: if the nodes parameter is wrong type
428

429
  """
430
  if not isinstance(nodes, list):
431
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
432
                               errors.ECODE_INVAL)
433

    
434
  if not nodes:
435
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
436
      " non-empty list of nodes whose name is to be expanded.")
437

    
438
  wanted = []
439
  for name in nodes:
440
    node = lu.cfg.ExpandNodeName(name)
441
    if node is None:
442
      raise errors.OpPrereqError("No such node name '%s'" % name,
443
                                 errors.ECODE_NOENT)
444
    wanted.append(node)
445

    
446
  return utils.NiceSort(wanted)
447

    
448

    
449
def _GetWantedInstances(lu, instances):
450
  """Returns list of checked and expanded instance names.
451

452
  @type lu: L{LogicalUnit}
453
  @param lu: the logical unit on whose behalf we execute
454
  @type instances: list
455
  @param instances: list of instance names or None for all instances
456
  @rtype: list
457
  @return: the list of instances, sorted
458
  @raise errors.OpPrereqError: if the instances parameter is wrong type
459
  @raise errors.OpPrereqError: if any of the passed instances is not found
460

461
  """
462
  if not isinstance(instances, list):
463
    raise errors.OpPrereqError("Invalid argument type 'instances'",
464
                               errors.ECODE_INVAL)
465

    
466
  if instances:
467
    wanted = []
468

    
469
    for name in instances:
470
      instance = lu.cfg.ExpandInstanceName(name)
471
      if instance is None:
472
        raise errors.OpPrereqError("No such instance name '%s'" % name,
473
                                   errors.ECODE_NOENT)
474
      wanted.append(instance)
475

    
476
  else:
477
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
478
  return wanted
479

    
480

    
481
def _CheckOutputFields(static, dynamic, selected):
482
  """Checks whether all selected fields are valid.
483

484
  @type static: L{utils.FieldSet}
485
  @param static: static fields set
486
  @type dynamic: L{utils.FieldSet}
487
  @param dynamic: dynamic fields set
488

489
  """
490
  f = utils.FieldSet()
491
  f.Extend(static)
492
  f.Extend(dynamic)
493

    
494
  delta = f.NonMatching(selected)
495
  if delta:
496
    raise errors.OpPrereqError("Unknown output fields selected: %s"
497
                               % ",".join(delta), errors.ECODE_INVAL)
498

    
499

    
500
def _CheckBooleanOpField(op, name):
501
  """Validates boolean opcode parameters.
502

503
  This will ensure that an opcode parameter is either a boolean value,
504
  or None (but that it always exists).
505

506
  """
507
  val = getattr(op, name, None)
508
  if not (val is None or isinstance(val, bool)):
509
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
510
                               (name, str(val)), errors.ECODE_INVAL)
511
  setattr(op, name, val)
512

    
513

    
514
def _CheckGlobalHvParams(params):
515
  """Validates that given hypervisor params are not global ones.
516

517
  This will ensure that instances don't get customised versions of
518
  global params.
519

520
  """
521
  used_globals = constants.HVC_GLOBALS.intersection(params)
522
  if used_globals:
523
    msg = ("The following hypervisor parameters are global and cannot"
524
           " be customized at instance level, please modify them at"
525
           " cluster level: %s" % utils.CommaJoin(used_globals))
526
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
527

    
528

    
529
def _CheckNodeOnline(lu, node):
530
  """Ensure that a given node is online.
531

532
  @param lu: the LU on behalf of which we make the check
533
  @param node: the node to check
534
  @raise errors.OpPrereqError: if the node is offline
535

536
  """
537
  if lu.cfg.GetNodeInfo(node).offline:
538
    raise errors.OpPrereqError("Can't use offline node %s" % node,
539
                               errors.ECODE_INVAL)
540

    
541

    
542
def _CheckNodeNotDrained(lu, node):
543
  """Ensure that a given node is not drained.
544

545
  @param lu: the LU on behalf of which we make the check
546
  @param node: the node to check
547
  @raise errors.OpPrereqError: if the node is drained
548

549
  """
550
  if lu.cfg.GetNodeInfo(node).drained:
551
    raise errors.OpPrereqError("Can't use drained node %s" % node,
552
                               errors.ECODE_INVAL)
553

    
554

    
555
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
556
                          memory, vcpus, nics, disk_template, disks,
557
                          bep, hvp, hypervisor_name):
558
  """Builds instance related env variables for hooks
559

560
  This builds the hook environment from individual variables.
561

562
  @type name: string
563
  @param name: the name of the instance
564
  @type primary_node: string
565
  @param primary_node: the name of the instance's primary node
566
  @type secondary_nodes: list
567
  @param secondary_nodes: list of secondary nodes as strings
568
  @type os_type: string
569
  @param os_type: the name of the instance's OS
570
  @type status: boolean
571
  @param status: the should_run status of the instance
572
  @type memory: string
573
  @param memory: the memory size of the instance
574
  @type vcpus: string
575
  @param vcpus: the count of VCPUs the instance has
576
  @type nics: list
577
  @param nics: list of tuples (ip, mac, mode, link) representing
578
      the NICs the instance has
579
  @type disk_template: string
580
  @param disk_template: the disk template of the instance
581
  @type disks: list
582
  @param disks: the list of (size, mode) pairs
583
  @type bep: dict
584
  @param bep: the backend parameters for the instance
585
  @type hvp: dict
586
  @param hvp: the hypervisor parameters for the instance
587
  @type hypervisor_name: string
588
  @param hypervisor_name: the hypervisor for the instance
589
  @rtype: dict
590
  @return: the hook environment for this instance
591

592
  """
593
  if status:
594
    str_status = "up"
595
  else:
596
    str_status = "down"
597
  env = {
598
    "OP_TARGET": name,
599
    "INSTANCE_NAME": name,
600
    "INSTANCE_PRIMARY": primary_node,
601
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
602
    "INSTANCE_OS_TYPE": os_type,
603
    "INSTANCE_STATUS": str_status,
604
    "INSTANCE_MEMORY": memory,
605
    "INSTANCE_VCPUS": vcpus,
606
    "INSTANCE_DISK_TEMPLATE": disk_template,
607
    "INSTANCE_HYPERVISOR": hypervisor_name,
608
  }
609

    
610
  if nics:
611
    nic_count = len(nics)
612
    for idx, (ip, mac, mode, link) in enumerate(nics):
613
      if ip is None:
614
        ip = ""
615
      env["INSTANCE_NIC%d_IP" % idx] = ip
616
      env["INSTANCE_NIC%d_MAC" % idx] = mac
617
      env["INSTANCE_NIC%d_MODE" % idx] = mode
618
      env["INSTANCE_NIC%d_LINK" % idx] = link
619
      if mode == constants.NIC_MODE_BRIDGED:
620
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
621
  else:
622
    nic_count = 0
623

    
624
  env["INSTANCE_NIC_COUNT"] = nic_count
625

    
626
  if disks:
627
    disk_count = len(disks)
628
    for idx, (size, mode) in enumerate(disks):
629
      env["INSTANCE_DISK%d_SIZE" % idx] = size
630
      env["INSTANCE_DISK%d_MODE" % idx] = mode
631
  else:
632
    disk_count = 0
633

    
634
  env["INSTANCE_DISK_COUNT"] = disk_count
635

    
636
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
637
    for key, value in source.items():
638
      env["INSTANCE_%s_%s" % (kind, key)] = value
639

    
640
  return env
641

    
642

    
643
def _NICListToTuple(lu, nics):
644
  """Build a list of nic information tuples.
645

646
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
647
  value in LUQueryInstanceData.
648

649
  @type lu:  L{LogicalUnit}
650
  @param lu: the logical unit on whose behalf we execute
651
  @type nics: list of L{objects.NIC}
652
  @param nics: list of nics to convert to hooks tuples
653

654
  """
655
  hooks_nics = []
656
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
657
  for nic in nics:
658
    ip = nic.ip
659
    mac = nic.mac
660
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
661
    mode = filled_params[constants.NIC_MODE]
662
    link = filled_params[constants.NIC_LINK]
663
    hooks_nics.append((ip, mac, mode, link))
664
  return hooks_nics
665

    
666

    
667
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
668
  """Builds instance related env variables for hooks from an object.
669

670
  @type lu: L{LogicalUnit}
671
  @param lu: the logical unit on whose behalf we execute
672
  @type instance: L{objects.Instance}
673
  @param instance: the instance for which we should build the
674
      environment
675
  @type override: dict
676
  @param override: dictionary with key/values that will override
677
      our values
678
  @rtype: dict
679
  @return: the hook environment dictionary
680

681
  """
682
  cluster = lu.cfg.GetClusterInfo()
683
  bep = cluster.FillBE(instance)
684
  hvp = cluster.FillHV(instance)
685
  args = {
686
    'name': instance.name,
687
    'primary_node': instance.primary_node,
688
    'secondary_nodes': instance.secondary_nodes,
689
    'os_type': instance.os,
690
    'status': instance.admin_up,
691
    'memory': bep[constants.BE_MEMORY],
692
    'vcpus': bep[constants.BE_VCPUS],
693
    'nics': _NICListToTuple(lu, instance.nics),
694
    'disk_template': instance.disk_template,
695
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
696
    'bep': bep,
697
    'hvp': hvp,
698
    'hypervisor_name': instance.hypervisor,
699
  }
700
  if override:
701
    args.update(override)
702
  return _BuildInstanceHookEnv(**args)
703

    
704

    
705
def _AdjustCandidatePool(lu, exceptions):
706
  """Adjust the candidate pool after node operations.
707

708
  """
709
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
710
  if mod_list:
711
    lu.LogInfo("Promoted nodes to master candidate role: %s",
712
               utils.CommaJoin(node.name for node in mod_list))
713
    for name in mod_list:
714
      lu.context.ReaddNode(name)
715
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
716
  if mc_now > mc_max:
717
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
718
               (mc_now, mc_max))
719

    
720

    
721
def _DecideSelfPromotion(lu, exceptions=None):
722
  """Decide whether I should promote myself as a master candidate.
723

724
  """
725
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
726
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
727
  # the new node will increase mc_max by one, so:
728
  mc_should = min(mc_should + 1, cp_size)
729
  return mc_now < mc_should
730

    
731

    
732
def _CheckNicsBridgesExist(lu, target_nics, target_node,
733
                               profile=constants.PP_DEFAULT):
734
  """Check that the brigdes needed by a list of nics exist.
735

736
  """
737
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
738
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
739
                for nic in target_nics]
740
  brlist = [params[constants.NIC_LINK] for params in paramslist
741
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
742
  if brlist:
743
    result = lu.rpc.call_bridges_exist(target_node, brlist)
744
    result.Raise("Error checking bridges on destination node '%s'" %
745
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
746

    
747

    
748
def _CheckInstanceBridgesExist(lu, instance, node=None):
749
  """Check that the brigdes needed by an instance exist.
750

751
  """
752
  if node is None:
753
    node = instance.primary_node
754
  _CheckNicsBridgesExist(lu, instance.nics, node)
755

    
756

    
757
def _CheckOSVariant(os_obj, name):
758
  """Check whether an OS name conforms to the os variants specification.
759

760
  @type os_obj: L{objects.OS}
761
  @param os_obj: OS object to check
762
  @type name: string
763
  @param name: OS name passed by the user, to check for validity
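
  For example, a (hypothetical) name such as "myos+flavor1" selects variant
  "flavor1" of the "myos" OS definition.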
764

765
  """
766
  if not os_obj.supported_variants:
767
    return
768
  try:
769
    variant = name.split("+", 1)[1]
770
  except IndexError:
771
    raise errors.OpPrereqError("OS name must include a variant",
772
                               errors.ECODE_INVAL)
773

    
774
  if variant not in os_obj.supported_variants:
775
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
776

    
777

    
778
def _GetNodeInstancesInner(cfg, fn):
779
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
780

    
781

    
782
def _GetNodeInstances(cfg, node_name):
783
  """Returns a list of all primary and secondary instances on a node.
784

785
  """
786

    
787
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
788

    
789

    
790
def _GetNodePrimaryInstances(cfg, node_name):
791
  """Returns primary instances on a node.
792

793
  """
794
  return _GetNodeInstancesInner(cfg,
795
                                lambda inst: node_name == inst.primary_node)
796

    
797

    
798
def _GetNodeSecondaryInstances(cfg, node_name):
799
  """Returns secondary instances on a node.
800

801
  """
802
  return _GetNodeInstancesInner(cfg,
803
                                lambda inst: node_name in inst.secondary_nodes)
804

    
805

    
806
def _GetStorageTypeArgs(cfg, storage_type):
807
  """Returns the arguments for a storage type.
808

809
  """
810
  # Special case for file storage
811
  if storage_type == constants.ST_FILE:
812
    # storage.FileStorage wants a list of storage directories
813
    return [[cfg.GetFileStorageDir()]]
814

    
815
  return []
816

    
817

    
818
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
819
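  """Return the indices of an instance's disks found faulty on a node.

  The mirror status is queried via RPC on the given node; the result lists
  the indexes (relative to instance.disks) whose local disk status is
  reported as faulty.

  """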
  faulty = []
820

    
821
  for dev in instance.disks:
822
    cfg.SetDiskID(dev, node_name)
823

    
824
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
825
  result.Raise("Failed to get disk status from node %s" % node_name,
826
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
827

    
828
  for idx, bdev_status in enumerate(result.payload):
829
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
830
      faulty.append(idx)
831

    
832
  return faulty
833

    
834

    
835
class LUPostInitCluster(LogicalUnit):
836
  """Logical unit for running hooks after cluster initialization.
837

838
  """
839
  HPATH = "cluster-init"
840
  HTYPE = constants.HTYPE_CLUSTER
841
  _OP_REQP = []
842

    
843
  def BuildHooksEnv(self):
844
    """Build hooks env.
845

846
    """
847
    env = {"OP_TARGET": self.cfg.GetClusterName()}
848
    mn = self.cfg.GetMasterNode()
849
    return env, [], [mn]
850

    
851
  def CheckPrereq(self):
852
    """No prerequisites to check.
853

854
    """
855
    return True
856

    
857
  def Exec(self, feedback_fn):
858
    """Nothing to do.
859

860
    """
861
    return True
862

    
863

    
864
class LUDestroyCluster(LogicalUnit):
865
  """Logical unit for destroying the cluster.
866

867
  """
868
  HPATH = "cluster-destroy"
869
  HTYPE = constants.HTYPE_CLUSTER
870
  _OP_REQP = []
871

    
872
  def BuildHooksEnv(self):
873
    """Build hooks env.
874

875
    """
876
    env = {"OP_TARGET": self.cfg.GetClusterName()}
877
    return env, [], []
878

    
879
  def CheckPrereq(self):
880
    """Check prerequisites.
881

882
    This checks whether the cluster is empty.
883

884
    Any errors are signaled by raising errors.OpPrereqError.
885

886
    """
887
    master = self.cfg.GetMasterNode()
888

    
889
    nodelist = self.cfg.GetNodeList()
890
    if len(nodelist) != 1 or nodelist[0] != master:
891
      raise errors.OpPrereqError("There are still %d node(s) in"
892
                                 " this cluster." % (len(nodelist) - 1),
893
                                 errors.ECODE_INVAL)
894
    instancelist = self.cfg.GetInstanceList()
895
    if instancelist:
896
      raise errors.OpPrereqError("There are still %d instance(s) in"
897
                                 " this cluster." % len(instancelist),
898
                                 errors.ECODE_INVAL)
899

    
900
  def Exec(self, feedback_fn):
901
    """Destroys the cluster.
902

903
    """
904
    master = self.cfg.GetMasterNode()
905
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
906

    
907
    # Run post hooks on master node before it's removed
908
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
909
    try:
910
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
911
    except:
912
      self.LogWarning("Errors occurred running hooks on %s" % master)
913

    
914
    result = self.rpc.call_node_stop_master(master, False)
915
    result.Raise("Could not disable the master role")
916

    
917
    if modify_ssh_setup:
918
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
919
      utils.CreateBackup(priv_key)
920
      utils.CreateBackup(pub_key)
921

    
922
    return master
923

    
924

    
925
class LUVerifyCluster(LogicalUnit):
926
  """Verifies the cluster status.
927

928
  """
929
  HPATH = "cluster-verify"
930
  HTYPE = constants.HTYPE_CLUSTER
931
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
932
  REQ_BGL = False
933

    
934
  TCLUSTER = "cluster"
935
  TNODE = "node"
936
  TINSTANCE = "instance"
937

    
938
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
939
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
940
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
941
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
942
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
943
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
944
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
945
  ENODEDRBD = (TNODE, "ENODEDRBD")
946
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
947
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
948
  ENODEHV = (TNODE, "ENODEHV")
949
  ENODELVM = (TNODE, "ENODELVM")
950
  ENODEN1 = (TNODE, "ENODEN1")
951
  ENODENET = (TNODE, "ENODENET")
952
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
953
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
954
  ENODERPC = (TNODE, "ENODERPC")
955
  ENODESSH = (TNODE, "ENODESSH")
956
  ENODEVERSION = (TNODE, "ENODEVERSION")
957
  ENODESETUP = (TNODE, "ENODESETUP")
958
  ENODETIME = (TNODE, "ENODETIME")
959

    
960
  ETYPE_FIELD = "code"
961
  ETYPE_ERROR = "ERROR"
962
  ETYPE_WARNING = "WARNING"
963

    
964
  def ExpandNames(self):
965
    self.needed_locks = {
966
      locking.LEVEL_NODE: locking.ALL_SET,
967
      locking.LEVEL_INSTANCE: locking.ALL_SET,
968
    }
969
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
970

    
971
  def _Error(self, ecode, item, msg, *args, **kwargs):
972
    """Format an error message.
973

974
    Based on the opcode's error_codes parameter, either format a
975
    parseable error code, or a simpler error string.
976

977
    This must be called only from Exec and functions called from Exec.
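
    Illustrative formatted messages for both modes (node name and message
    are hypothetical)::

      # with the opcode's error_codes set:
      ERROR:ENODELVM:node:node1.example.com:unable to check volume groups
      # without error_codes:
      ERROR: node node1.example.com: unable to check volume groups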
978

979
    """
980
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
981
    itype, etxt = ecode
982
    # first complete the msg
983
    if args:
984
      msg = msg % args
985
    # then format the whole message
986
    if self.op.error_codes:
987
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
988
    else:
989
      if item:
990
        item = " " + item
991
      else:
992
        item = ""
993
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
994
    # and finally report it via the feedback_fn
995
    self._feedback_fn("  - %s" % msg)
996

    
997
  def _ErrorIf(self, cond, *args, **kwargs):
998
    """Log an error message if the passed condition is True.
999

1000
    """
1001
    cond = bool(cond) or self.op.debug_simulate_errors
1002
    if cond:
1003
      self._Error(*args, **kwargs)
1004
    # do not mark the operation as failed for WARN cases only
1005
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1006
      self.bad = self.bad or cond
1007

    
1008
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
1009
                  node_result, master_files, drbd_map, vg_name):
1010
    """Run multiple tests against a node.
1011

1012
    Test list:
1013

1014
      - compares ganeti version
1015
      - checks vg existence and size > 20G
1016
      - checks config file checksum
1017
      - checks ssh to other nodes
1018

1019
    @type nodeinfo: L{objects.Node}
1020
    @param nodeinfo: the node to check
1021
    @param file_list: required list of files
1022
    @param local_cksum: dictionary of local files and their checksums
1023
    @param node_result: the results from the node
1024
    @param master_files: list of files that only masters should have
1025
    @param drbd_map: the used DRBD minors for this node, in
1026
        form of minor: (instance, must_exist) which correspond to instances
1027
        and their running status
1028
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
1029

1030
    """
1031
    node = nodeinfo.name
1032
    _ErrorIf = self._ErrorIf
1033

    
1034
    # main result, node_result should be a non-empty dict
1035
    test = not node_result or not isinstance(node_result, dict)
1036
    _ErrorIf(test, self.ENODERPC, node,
1037
                  "unable to verify node: no data returned")
1038
    if test:
1039
      return
1040

    
1041
    # compares ganeti version
1042
    local_version = constants.PROTOCOL_VERSION
1043
    remote_version = node_result.get('version', None)
1044
    test = not (remote_version and
1045
                isinstance(remote_version, (list, tuple)) and
1046
                len(remote_version) == 2)
1047
    _ErrorIf(test, self.ENODERPC, node,
1048
             "connection to node returned invalid data")
1049
    if test:
1050
      return
1051

    
1052
    test = local_version != remote_version[0]
1053
    _ErrorIf(test, self.ENODEVERSION, node,
1054
             "incompatible protocol versions: master %s,"
1055
             " node %s", local_version, remote_version[0])
1056
    if test:
1057
      return
1058

    
1059
    # node seems compatible, we can actually try to look into its results
1060

    
1061
    # full package version
1062
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1063
                  self.ENODEVERSION, node,
1064
                  "software version mismatch: master %s, node %s",
1065
                  constants.RELEASE_VERSION, remote_version[1],
1066
                  code=self.ETYPE_WARNING)
1067

    
1068
    # checks vg existence and size > 20G
1069
    if vg_name is not None:
1070
      vglist = node_result.get(constants.NV_VGLIST, None)
1071
      test = not vglist
1072
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1073
      if not test:
1074
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1075
                                              constants.MIN_VG_SIZE)
1076
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1077

    
1078
    # checks config file checksum
1079

    
1080
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
1081
    test = not isinstance(remote_cksum, dict)
1082
    _ErrorIf(test, self.ENODEFILECHECK, node,
1083
             "node hasn't returned file checksum data")
1084
    if not test:
1085
      for file_name in file_list:
1086
        node_is_mc = nodeinfo.master_candidate
1087
        must_have = (file_name not in master_files) or node_is_mc
1088
        # missing
1089
        test1 = file_name not in remote_cksum
1090
        # invalid checksum
1091
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1092
        # existing and good
1093
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1094
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1095
                 "file '%s' missing", file_name)
1096
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1097
                 "file '%s' has wrong checksum", file_name)
1098
        # not candidate and this is not a must-have file
1099
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1100
                 "file '%s' should not exist on non master"
1101
                 " candidates (and the file is outdated)", file_name)
1102
        # all good, except non-master/non-must have combination
1103
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1104
                 "file '%s' should not exist"
1105
                 " on non master candidates", file_name)
1106

    
1107
    # checks ssh to any
1108

    
1109
    test = constants.NV_NODELIST not in node_result
1110
    _ErrorIf(test, self.ENODESSH, node,
1111
             "node hasn't returned node ssh connectivity data")
1112
    if not test:
1113
      if node_result[constants.NV_NODELIST]:
1114
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1115
          _ErrorIf(True, self.ENODESSH, node,
1116
                   "ssh communication with node '%s': %s", a_node, a_msg)
1117

    
1118
    test = constants.NV_NODENETTEST not in node_result
1119
    _ErrorIf(test, self.ENODENET, node,
1120
             "node hasn't returned node tcp connectivity data")
1121
    if not test:
1122
      if node_result[constants.NV_NODENETTEST]:
1123
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1124
        for anode in nlist:
1125
          _ErrorIf(True, self.ENODENET, node,
1126
                   "tcp communication with node '%s': %s",
1127
                   anode, node_result[constants.NV_NODENETTEST][anode])
1128

    
1129
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1130
    if isinstance(hyp_result, dict):
1131
      for hv_name, hv_result in hyp_result.iteritems():
1132
        test = hv_result is not None
1133
        _ErrorIf(test, self.ENODEHV, node,
1134
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1135

    
1136
    # check used drbd list
1137
    if vg_name is not None:
1138
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
1139
      test = not isinstance(used_minors, (tuple, list))
1140
      _ErrorIf(test, self.ENODEDRBD, node,
1141
               "cannot parse drbd status file: %s", str(used_minors))
1142
      if not test:
1143
        for minor, (iname, must_exist) in drbd_map.items():
1144
          test = minor not in used_minors and must_exist
1145
          _ErrorIf(test, self.ENODEDRBD, node,
1146
                   "drbd minor %d of instance %s is not active",
1147
                   minor, iname)
1148
        for minor in used_minors:
1149
          test = minor not in drbd_map
1150
          _ErrorIf(test, self.ENODEDRBD, node,
1151
                   "unallocated drbd minor %d is in use", minor)
1152
    test = node_result.get(constants.NV_NODESETUP,
1153
                           ["Missing NODESETUP results"])
1154
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1155
             "; ".join(test))
1156

    
1157
    # check pv names
1158
    if vg_name is not None:
1159
      pvlist = node_result.get(constants.NV_PVLIST, None)
1160
      test = pvlist is None
1161
      _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1162
      if not test:
1163
        # check that ':' is not present in PV names, since it's a
1164
        # special character for lvcreate (denotes the range of PEs to
1165
        # use on the PV)
1166
        for _, pvname, owner_vg in pvlist:
1167
          test = ":" in pvname
1168
          _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1169
                   " '%s' of VG '%s'", pvname, owner_vg)
1170

    
1171
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1172
                      node_instance, n_offline):
1173
    """Verify an instance.
1174

1175
    This function checks to see if the required block devices are
1176
    available on the instance's node.
1177

1178
    """
1179
    _ErrorIf = self._ErrorIf
1180
    node_current = instanceconfig.primary_node
1181

    
1182
    node_vol_should = {}
1183
    instanceconfig.MapLVsByNode(node_vol_should)
1184

    
1185
    for node in node_vol_should:
1186
      if node in n_offline:
1187
        # ignore missing volumes on offline nodes
1188
        continue
1189
      for volume in node_vol_should[node]:
1190
        test = node not in node_vol_is or volume not in node_vol_is[node]
1191
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1192
                 "volume %s missing on node %s", volume, node)
1193

    
1194
    if instanceconfig.admin_up:
1195
      test = ((node_current not in node_instance or
1196
               not instance in node_instance[node_current]) and
1197
              node_current not in n_offline)
1198
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1199
               "instance not running on its primary node %s",
1200
               node_current)
1201

    
1202
    for node in node_instance:
1203
      if node != node_current:
1204
        test = instance in node_instance[node]
1205
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1206
                 "instance should not run on node %s", node)
1207

    
1208
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1209
    """Verify if there are any unknown volumes in the cluster.
1210

1211
    The .os, .swap and backup volumes are ignored. All other volumes are
1212
    reported as unknown.
1213

1214
    """
1215
    for node in node_vol_is:
1216
      for volume in node_vol_is[node]:
1217
        test = (node not in node_vol_should or
1218
                volume not in node_vol_should[node])
1219
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1220
                      "volume %s is unknown", volume)
1221

    
1222
  def _VerifyOrphanInstances(self, instancelist, node_instance):
1223
    """Verify the list of running instances.
1224

1225
    This checks what instances are running but unknown to the cluster.
1226

1227
    """
1228
    for node in node_instance:
1229
      for o_inst in node_instance[node]:
1230
        test = o_inst not in instancelist
1231
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1232
                      "instance %s on node %s should not exist", o_inst, node)
1233

    
1234
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1235
    """Verify N+1 Memory Resilience.
1236

1237
    Check that if one single node dies we can still start all the instances it
1238
    was primary for.
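
    For example (hypothetical numbers): if this node is secondary for two
    auto-balanced instances whose primary is node A, needing 1024 MB and
    2048 MB of memory respectively, it must have at least 3072 MB free for
    the check to pass.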
1239

1240
    """
1241
    for node, nodeinfo in node_info.iteritems():
1242
      # This code checks that every node which is now listed as secondary has
1243
      # enough memory to host all instances it is supposed to host, should a
1244
      # single other node in the cluster fail.
1245
      # FIXME: not ready for failover to an arbitrary node
1246
      # FIXME: does not support file-backed instances
1247
      # WARNING: we currently take into account down instances as well as up
1248
      # ones, considering that even if they're down someone might want to start
1249
      # them even in the event of a node failure.
1250
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1251
        needed_mem = 0
1252
        for instance in instances:
1253
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1254
          if bep[constants.BE_AUTO_BALANCE]:
1255
            needed_mem += bep[constants.BE_MEMORY]
1256
        test = nodeinfo['mfree'] < needed_mem
1257
        self._ErrorIf(test, self.ENODEN1, node,
1258
                      "not enough memory on to accommodate"
1259
                      " failovers should peer node %s fail", prinode)
1260

    
1261
  def CheckPrereq(self):
1262
    """Check prerequisites.
1263

1264
    Transform the list of checks we're going to skip into a set and check that
1265
    all its members are valid.
1266

1267
    """
1268
    self.skip_set = frozenset(self.op.skip_checks)
1269
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1270
      raise errors.OpPrereqError("Invalid checks to be skipped specified",
1271
                                 errors.ECODE_INVAL)
1272

    
1273
  def BuildHooksEnv(self):
1274
    """Build hooks env.
1275

1276
    Cluster-Verify hooks are run in the post phase only; their failure makes
1277
    the output be logged in the verify output and the verification fail.
1278

1279
    """
1280
    all_nodes = self.cfg.GetNodeList()
1281
    env = {
1282
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1283
      }
1284
    for node in self.cfg.GetAllNodesInfo().values():
1285
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1286

    
1287
    return env, [], all_nodes
1288

    
1289
  def Exec(self, feedback_fn):
1290
    """Verify integrity of cluster, performing various test on nodes.
1291

1292
    """
1293
    self.bad = False
1294
    _ErrorIf = self._ErrorIf
1295
    verbose = self.op.verbose
1296
    self._feedback_fn = feedback_fn
1297
    feedback_fn("* Verifying global settings")
1298
    for msg in self.cfg.VerifyConfig():
1299
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1300

    
1301
    vg_name = self.cfg.GetVGName()
1302
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1303
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
1304
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1305
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1306
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1307
                        for iname in instancelist)
1308
    i_non_redundant = [] # Non redundant instances
1309
    i_non_a_balanced = [] # Non auto-balanced instances
1310
    n_offline = [] # List of offline nodes
1311
    n_drained = [] # List of nodes being drained
1312
    node_volume = {}
1313
    node_instance = {}
1314
    node_info = {}
1315
    instance_cfg = {}
1316

    
1317
    # FIXME: verify OS list
1318
    # do local checksums
1319
    master_files = [constants.CLUSTER_CONF_FILE]
1320

    
1321
    file_names = ssconf.SimpleStore().GetFileList()
1322
    file_names.append(constants.SSL_CERT_FILE)
1323
    file_names.append(constants.RAPI_CERT_FILE)
1324
    file_names.extend(master_files)
1325

    
1326
    local_checksums = utils.FingerprintFiles(file_names)
1327

    
1328
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1329
    node_verify_param = {
1330
      constants.NV_FILELIST: file_names,
1331
      constants.NV_NODELIST: [node.name for node in nodeinfo
1332
                              if not node.offline],
1333
      constants.NV_HYPERVISOR: hypervisors,
1334
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1335
                                  node.secondary_ip) for node in nodeinfo
1336
                                 if not node.offline],
1337
      constants.NV_INSTANCELIST: hypervisors,
1338
      constants.NV_VERSION: None,
1339
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1340
      constants.NV_NODESETUP: None,
1341
      constants.NV_TIME: None,
1342
      }
1343

    
1344
    if vg_name is not None:
1345
      node_verify_param[constants.NV_VGLIST] = None
1346
      node_verify_param[constants.NV_LVLIST] = vg_name
1347
      node_verify_param[constants.NV_PVLIST] = [vg_name]
1348
      node_verify_param[constants.NV_DRBDLIST] = None
1349

    
1350
    # Due to the way our RPC system works, exact response times cannot be
1351
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1352
    # time before and after executing the request, we can at least have a time
1353
    # window.
1354
    nvinfo_starttime = time.time()
1355
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1356
                                           self.cfg.GetClusterName())
1357
    nvinfo_endtime = time.time()
1358

    
1359
    cluster = self.cfg.GetClusterInfo()
1360
    master_node = self.cfg.GetMasterNode()
1361
    all_drbd_map = self.cfg.ComputeDRBDMap()
1362

    
1363
    feedback_fn("* Verifying node status")
1364
    for node_i in nodeinfo:
1365
      node = node_i.name
1366

    
1367
      if node_i.offline:
1368
        if verbose:
1369
          feedback_fn("* Skipping offline node %s" % (node,))
1370
        n_offline.append(node)
1371
        continue
1372

    
1373
      if node == master_node:
1374
        ntype = "master"
1375
      elif node_i.master_candidate:
1376
        ntype = "master candidate"
1377
      elif node_i.drained:
1378
        ntype = "drained"
1379
        n_drained.append(node)
1380
      else:
1381
        ntype = "regular"
1382
      if verbose:
1383
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1384

    
1385
      msg = all_nvinfo[node].fail_msg
1386
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1387
      if msg:
1388
        continue
1389

    
1390
      nresult = all_nvinfo[node].payload
1391
      node_drbd = {}
1392
      for minor, instance in all_drbd_map[node].items():
1393
        test = instance not in instanceinfo
1394
        _ErrorIf(test, self.ECLUSTERCFG, None,
1395
                 "ghost instance '%s' in temporary DRBD map", instance)
1396
          # ghost instance should not be running, but otherwise we
1397
          # don't give double warnings (both ghost instance and
1398
          # unallocated minor in use)
1399
        if test:
1400
          node_drbd[minor] = (instance, False)
1401
        else:
1402
          instance = instanceinfo[instance]
1403
          node_drbd[minor] = (instance.name, instance.admin_up)
1404

    
1405
      self._VerifyNode(node_i, file_names, local_checksums,
1406
                       nresult, master_files, node_drbd, vg_name)
1407

    
1408
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1409
      if vg_name is None:
1410
        node_volume[node] = {}
1411
      elif isinstance(lvdata, basestring):
1412
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1413
                 utils.SafeEncode(lvdata))
1414
        node_volume[node] = {}
1415
      elif not isinstance(lvdata, dict):
1416
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1417
        continue
1418
      else:
1419
        node_volume[node] = lvdata
1420

    
1421
      # node_instance
1422
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1423
      test = not isinstance(idata, list)
1424
      _ErrorIf(test, self.ENODEHV, node,
1425
               "rpc call to node failed (instancelist)")
1426
      if test:
1427
        continue
1428

    
1429
      node_instance[node] = idata
1430

    
1431
      # node_info
1432
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1433
      test = not isinstance(nodeinfo, dict)
1434
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1435
      if test:
1436
        continue
1437

    
1438
      # Node time
1439
      ntime = nresult.get(constants.NV_TIME, None)
1440
      try:
1441
        ntime_merged = utils.MergeTime(ntime)
1442
      except (ValueError, TypeError):
1443
        _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
        continue
1444

    
1445
      if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1446
        ntime_diff = abs(nvinfo_starttime - ntime_merged)
1447
      elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1448
        ntime_diff = abs(ntime_merged - nvinfo_endtime)
1449
      else:
1450
        ntime_diff = None
1451

    
1452
      _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
1453
               "Node time diverges by at least %0.1fs from master node time",
1454
               ntime_diff)
1455

    
1456
      if ntime_diff is not None:
1457
        continue
1458

    
1459
      try:
1460
        node_info[node] = {
1461
          "mfree": int(nodeinfo['memory_free']),
1462
          "pinst": [],
1463
          "sinst": [],
1464
          # dictionary holding all instances this node is secondary for,
1465
          # grouped by their primary node. Each key is a cluster node, and each
1466
          # value is a list of instances which have the key as primary and the
1467
          # current node as secondary. This is handy to calculate N+1 memory
1468
          # availability if you can only failover from a primary to its
1469
          # secondary.
1470
          "sinst-by-pnode": {},
1471
        }
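        # illustrative shape of a node_info entry (values are made up; the
        # "dfree" key is only added further down when LVM data is available):
        #   {"mfree": 2048, "pinst": ["inst1"], "sinst": ["inst2"],
        #    "sinst-by-pnode": {"node3": ["inst2"]}, "dfree": 10240}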
1472
        # FIXME: devise a free space model for file based instances as well
1473
        if vg_name is not None:
1474
          test = (constants.NV_VGLIST not in nresult or
1475
                  vg_name not in nresult[constants.NV_VGLIST])
1476
          _ErrorIf(test, self.ENODELVM, node,
1477
                   "node didn't return data for the volume group '%s'"
1478
                   " - it is either missing or broken", vg_name)
1479
          if test:
1480
            continue
1481
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1482
      except (ValueError, KeyError):
1483
        _ErrorIf(True, self.ENODERPC, node,
1484
                 "node returned invalid nodeinfo, check lvm/hypervisor")
1485
        continue
1486

    
1487
    node_vol_should = {}
1488

    
1489
    feedback_fn("* Verifying instance status")
1490
    for instance in instancelist:
1491
      if verbose:
1492
        feedback_fn("* Verifying instance %s" % instance)
1493
      inst_config = instanceinfo[instance]
1494
      self._VerifyInstance(instance, inst_config, node_volume,
1495
                           node_instance, n_offline)
1496
      inst_nodes_offline = []
1497

    
1498
      inst_config.MapLVsByNode(node_vol_should)
1499

    
1500
      instance_cfg[instance] = inst_config
1501

    
1502
      pnode = inst_config.primary_node
1503
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
1504
               self.ENODERPC, pnode, "instance %s, connection to"
1505
               " primary node failed", instance)
1506
      if pnode in node_info:
1507
        node_info[pnode]['pinst'].append(instance)
1508

    
1509
      if pnode in n_offline:
1510
        inst_nodes_offline.append(pnode)
1511

    
1512
      # If the instance is non-redundant we cannot survive losing its primary
1513
      # node, so we are not N+1 compliant. On the other hand we have no disk
1514
      # templates with more than one secondary so that situation is not well
1515
      # supported either.
1516
      # FIXME: does not support file-backed instances
1517
      if len(inst_config.secondary_nodes) == 0:
1518
        i_non_redundant.append(instance)
1519
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
1520
               self.EINSTANCELAYOUT, instance,
1521
               "instance has multiple secondary nodes", code="WARNING")
1522

    
1523
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1524
        i_non_a_balanced.append(instance)
1525

    
1526
      for snode in inst_config.secondary_nodes:
1527
        _ErrorIf(snode not in node_info and snode not in n_offline,
1528
                 self.ENODERPC, snode,
1529
                 "instance %s, connection to secondary node"
1530
                 "failed", instance)
1531

    
1532
        if snode in node_info:
1533
          node_info[snode]['sinst'].append(instance)
1534
          if pnode not in node_info[snode]['sinst-by-pnode']:
1535
            node_info[snode]['sinst-by-pnode'][pnode] = []
1536
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1537

    
1538
        if snode in n_offline:
1539
          inst_nodes_offline.append(snode)
1540

    
1541
      # warn that the instance lives on offline nodes
1542
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1543
               "instance lives on offline node(s) %s",
1544
               utils.CommaJoin(inst_nodes_offline))
1545

    
1546
    feedback_fn("* Verifying orphan volumes")
1547
    self._VerifyOrphanVolumes(node_vol_should, node_volume)
1548

    
1549
    feedback_fn("* Verifying remaining instances")
1550
    self._VerifyOrphanInstances(instancelist, node_instance)
1551

    
1552
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1553
      feedback_fn("* Verifying N+1 Memory redundancy")
1554
      self._VerifyNPlusOneMemory(node_info, instance_cfg)
1555

    
1556
    feedback_fn("* Other Notes")
1557
    if i_non_redundant:
1558
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1559
                  % len(i_non_redundant))
1560

    
1561
    if i_non_a_balanced:
1562
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1563
                  % len(i_non_a_balanced))
1564

    
1565
    if n_offline:
1566
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1567

    
1568
    if n_drained:
1569
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1570

    
1571
    return not self.bad
1572

    
1573
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1574
    """Analyze the post-hooks' result
1575

1576
    This method analyses the hook result, handles it, and sends some
1577
    nicely-formatted feedback back to the user.
1578

1579
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1580
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1581
    @param hooks_results: the results of the multi-node hooks rpc call
1582
    @param feedback_fn: function used to send feedback back to the caller
1583
    @param lu_result: previous Exec result
1584
    @return: the new Exec result, based on the previous result
1585
        and hook results
1586

1587
    """
1588
    # We only really run POST phase hooks, and are only interested in
1589
    # their results
1590
    if phase == constants.HOOKS_PHASE_POST:
1591
      # Used to change hooks' output to proper indentation
1592
      indent_re = re.compile('^', re.M)
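      # with re.M, '^' matches at the start of every line, so the sub()
      # below prefixes each line of the hook output with extra indentation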
1593
      feedback_fn("* Hooks Results")
1594
      assert hooks_results, "invalid result from hooks"
1595

    
1596
      for node_name in hooks_results:
1597
        res = hooks_results[node_name]
1598
        msg = res.fail_msg
1599
        test = msg and not res.offline
1600
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
1601
                      "Communication failure in hooks execution: %s", msg)
1602
        if test:
1603
          # override manually lu_result here as _ErrorIf only
1604
          # overrides self.bad
1605
          lu_result = 1
1606
          continue
1607
        for script, hkr, output in res.payload:
1608
          test = hkr == constants.HKR_FAIL
1609
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
1610
                        "Script %s failed, output:", script)
1611
          if test:
1612
            output = indent_re.sub('      ', output)
1613
            feedback_fn("%s" % output)
1614
            lu_result = 1
1615

    
1616
      return lu_result
1617

    
1618

    
1619
class LUVerifyDisks(NoHooksLU):
1620
  """Verifies the cluster disks status.
1621

1622
  """
1623
  _OP_REQP = []
1624
  REQ_BGL = False
1625

    
1626
  def ExpandNames(self):
1627
    self.needed_locks = {
1628
      locking.LEVEL_NODE: locking.ALL_SET,
1629
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1630
    }
1631
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
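    # shared locks are sufficient here: this LU only queries LV status via
    # RPC and never modifies the cluster configuration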
1632

    
1633
  def CheckPrereq(self):
1634
    """Check prerequisites.
1635

1636
    This has no prerequisites.
1637

1638
    """
1639
    pass
1640

    
1641
  def Exec(self, feedback_fn):
1642
    """Verify integrity of cluster disks.
1643

1644
    @rtype: tuple of three items
1645
    @return: a tuple of (dict of node-to-node_error, list of instances
1646
        which need activate-disks, dict of instance: (node, volume) for
1647
        missing volumes)
1648

1649
    """
1650
    result = res_nodes, res_instances, res_missing = {}, [], {}
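    # res_nodes, res_instances and res_missing alias the members of
    # 'result', so filling them in below also fills in the return value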
1651

    
1652
    vg_name = self.cfg.GetVGName()
1653
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1654
    instances = [self.cfg.GetInstanceInfo(name)
1655
                 for name in self.cfg.GetInstanceList()]
1656

    
1657
    nv_dict = {}
1658
    for inst in instances:
1659
      inst_lvs = {}
1660
      if (not inst.admin_up or
1661
          inst.disk_template not in constants.DTS_NET_MIRROR):
1662
        continue
1663
      inst.MapLVsByNode(inst_lvs)
1664
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1665
      for node, vol_list in inst_lvs.iteritems():
1666
        for vol in vol_list:
1667
          nv_dict[(node, vol)] = inst
1668

    
1669
    if not nv_dict:
1670
      return result
1671

    
1672
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1673

    
1674
    for node in nodes:
1675
      # node_volume
1676
      node_res = node_lvs[node]
1677
      if node_res.offline:
1678
        continue
1679
      msg = node_res.fail_msg
1680
      if msg:
1681
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1682
        res_nodes[node] = msg
1683
        continue
1684

    
1685
      lvs = node_res.payload
1686
      for lv_name, (_, _, lv_online) in lvs.items():
1687
        inst = nv_dict.pop((node, lv_name), None)
1688
        if (not lv_online and inst is not None
1689
            and inst.name not in res_instances):
1690
          res_instances.append(inst.name)
1691

    
1692
    # any leftover items in nv_dict are missing LVs, let's arrange the
1693
    # data better
1694
    for key, inst in nv_dict.iteritems():
1695
      if inst.name not in res_missing:
1696
        res_missing[inst.name] = []
1697
      res_missing[inst.name].append(key)
1698

    
1699
    return result
1700

    
1701

    
1702
class LURepairDiskSizes(NoHooksLU):
1703
  """Verifies the cluster disks sizes.
1704

1705
  """
1706
  _OP_REQP = ["instances"]
1707
  REQ_BGL = False
1708

    
1709
  def ExpandNames(self):
1710
    if not isinstance(self.op.instances, list):
1711
      raise errors.OpPrereqError("Invalid argument type 'instances'",
1712
                                 errors.ECODE_INVAL)
1713

    
1714
    if self.op.instances:
1715
      self.wanted_names = []
1716
      for name in self.op.instances:
1717
        full_name = self.cfg.ExpandInstanceName(name)
1718
        if full_name is None:
1719
          raise errors.OpPrereqError("Instance '%s' not known" % name,
1720
                                     errors.ECODE_NOENT)
1721
        self.wanted_names.append(full_name)
1722
      self.needed_locks = {
1723
        locking.LEVEL_NODE: [],
1724
        locking.LEVEL_INSTANCE: self.wanted_names,
1725
        }
1726
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1727
    else:
1728
      self.wanted_names = None
1729
      self.needed_locks = {
1730
        locking.LEVEL_NODE: locking.ALL_SET,
1731
        locking.LEVEL_INSTANCE: locking.ALL_SET,
1732
        }
1733
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1734

    
1735
  def DeclareLocks(self, level):
1736
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
1737
      self._LockInstancesNodes(primary_only=True)
1738

    
1739
  def CheckPrereq(self):
1740
    """Check prerequisites.
1741

1742
    This only checks the optional instance list against the existing names.
1743

1744
    """
1745
    if self.wanted_names is None:
1746
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1747

    
1748
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1749
                             in self.wanted_names]
1750

    
1751
  def _EnsureChildSizes(self, disk):
1752
    """Ensure children of the disk have the needed disk size.
1753

1754
    This is valid mainly for DRBD8 and fixes an issue where the
1755
    children have smaller disk size.
1756

1757
    @param disk: an L{ganeti.objects.Disk} object
1758

1759
    """
1760
    if disk.dev_type == constants.LD_DRBD8:
1761
      assert disk.children, "Empty children for DRBD8?"
1762
      fchild = disk.children[0]
1763
      mismatch = fchild.size < disk.size
1764
      if mismatch:
1765
        self.LogInfo("Child disk has size %d, parent %d, fixing",
1766
                     fchild.size, disk.size)
1767
        fchild.size = disk.size
1768

    
1769
      # and we recurse on this child only, not on the metadev
1770
      return self._EnsureChildSizes(fchild) or mismatch
1771
    else:
1772
      return False
1773

    
1774
  def Exec(self, feedback_fn):
1775
    """Verify the size of cluster disks.
1776

1777
    """
1778
    # TODO: check child disks too
1779
    # TODO: check differences in size between primary/secondary nodes
1780
    per_node_disks = {}
1781
    for instance in self.wanted_instances:
1782
      pnode = instance.primary_node
1783
      if pnode not in per_node_disks:
1784
        per_node_disks[pnode] = []
1785
      for idx, disk in enumerate(instance.disks):
1786
        per_node_disks[pnode].append((instance, idx, disk))
1787

    
1788
    changed = []
1789
    for node, dskl in per_node_disks.items():
1790
      newl = [v[2].Copy() for v in dskl]
1791
      for dsk in newl:
1792
        self.cfg.SetDiskID(dsk, node)
1793
      result = self.rpc.call_blockdev_getsizes(node, newl)
1794
      if result.fail_msg:
1795
        self.LogWarning("Failure in blockdev_getsizes call to node"
1796
                        " %s, ignoring", node)
1797
        continue
1798
      if len(result.data) != len(dskl):
1799
        self.LogWarning("Invalid result from node %s, ignoring node results",
1800
                        node)
1801
        continue
1802
      for ((instance, idx, disk), size) in zip(dskl, result.data):
1803
        if size is None:
1804
          self.LogWarning("Disk %d of instance %s did not return size"
1805
                          " information, ignoring", idx, instance.name)
1806
          continue
1807
        if not isinstance(size, (int, long)):
1808
          self.LogWarning("Disk %d of instance %s did not return valid"
1809
                          " size information, ignoring", idx, instance.name)
1810
          continue
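        # the node reports sizes in bytes; shift by 20 bits to get MiB,
        # the unit used for disk sizes in the cluster configuration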
1811
        size = size >> 20
1812
        if size != disk.size:
1813
          self.LogInfo("Disk %d of instance %s has mismatched size,"
1814
                       " correcting: recorded %d, actual %d", idx,
1815
                       instance.name, disk.size, size)
1816
          disk.size = size
1817
          self.cfg.Update(instance, feedback_fn)
1818
          changed.append((instance.name, idx, size))
1819
        if self._EnsureChildSizes(disk):
1820
          self.cfg.Update(instance, feedback_fn)
1821
          changed.append((instance.name, idx, disk.size))
1822
    return changed
1823

    
1824

    
1825
class LURenameCluster(LogicalUnit):
1826
  """Rename the cluster.
1827

1828
  """
1829
  HPATH = "cluster-rename"
1830
  HTYPE = constants.HTYPE_CLUSTER
1831
  _OP_REQP = ["name"]
1832

    
1833
  def BuildHooksEnv(self):
1834
    """Build hooks env.
1835

1836
    """
1837
    env = {
1838
      "OP_TARGET": self.cfg.GetClusterName(),
1839
      "NEW_NAME": self.op.name,
1840
      }
1841
    mn = self.cfg.GetMasterNode()
1842
    return env, [mn], [mn]
1843

    
1844
  def CheckPrereq(self):
1845
    """Verify that the passed name is a valid one.
1846

1847
    """
1848
    hostname = utils.GetHostInfo(self.op.name)
1849

    
1850
    new_name = hostname.name
1851
    self.ip = new_ip = hostname.ip
1852
    old_name = self.cfg.GetClusterName()
1853
    old_ip = self.cfg.GetMasterIP()
1854
    if new_name == old_name and new_ip == old_ip:
1855
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1856
                                 " cluster has changed",
1857
                                 errors.ECODE_INVAL)
1858
    if new_ip != old_ip:
1859
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1860
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1861
                                   " reachable on the network. Aborting." %
1862
                                   new_ip, errors.ECODE_NOTUNIQUE)
1863

    
1864
    self.op.name = new_name
1865

    
1866
  def Exec(self, feedback_fn):
1867
    """Rename the cluster.
1868

1869
    """
1870
    clustername = self.op.name
1871
    ip = self.ip
1872

    
1873
    # shutdown the master IP
1874
    master = self.cfg.GetMasterNode()
1875
    result = self.rpc.call_node_stop_master(master, False)
1876
    result.Raise("Could not disable the master role")
1877

    
1878
    try:
1879
      cluster = self.cfg.GetClusterInfo()
1880
      cluster.cluster_name = clustername
1881
      cluster.master_ip = ip
1882
      self.cfg.Update(cluster, feedback_fn)
1883

    
1884
      # update the known hosts file
1885
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1886
      node_list = self.cfg.GetNodeList()
1887
      try:
1888
        node_list.remove(master)
1889
      except ValueError:
1890
        pass
1891
      result = self.rpc.call_upload_file(node_list,
1892
                                         constants.SSH_KNOWN_HOSTS_FILE)
1893
      for to_node, to_result in result.iteritems():
1894
        msg = to_result.fail_msg
1895
        if msg:
1896
          msg = ("Copy of file %s to node %s failed: %s" %
1897
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1898
          self.proc.LogWarning(msg)
1899

    
1900
    finally:
1901
      result = self.rpc.call_node_start_master(master, False, False)
1902
      msg = result.fail_msg
1903
      if msg:
1904
        self.LogWarning("Could not re-enable the master role on"
1905
                        " the master, please restart manually: %s", msg)
1906

    
1907

    
1908
def _RecursiveCheckIfLVMBased(disk):
1909
  """Check if the given disk or its children are lvm-based.
1910

1911
  @type disk: L{objects.Disk}
1912
  @param disk: the disk to check
1913
  @rtype: boolean
1914
  @return: boolean indicating whether a LD_LV dev_type was found or not
1915

1916
  """
1917
  if disk.children:
1918
    for chdisk in disk.children:
1919
      if _RecursiveCheckIfLVMBased(chdisk):
1920
        return True
1921
  return disk.dev_type == constants.LD_LV
1922

    
1923

    
1924
class LUSetClusterParams(LogicalUnit):
1925
  """Change the parameters of the cluster.
1926

1927
  """
1928
  HPATH = "cluster-modify"
1929
  HTYPE = constants.HTYPE_CLUSTER
1930
  _OP_REQP = []
1931
  REQ_BGL = False
1932

    
1933
  def CheckArguments(self):
1934
    """Check parameters
1935

1936
    """
1937
    if not hasattr(self.op, "candidate_pool_size"):
1938
      self.op.candidate_pool_size = None
1939
    if self.op.candidate_pool_size is not None:
1940
      try:
1941
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1942
      except (ValueError, TypeError), err:
1943
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1944
                                   str(err), errors.ECODE_INVAL)
1945
      if self.op.candidate_pool_size < 1:
1946
        raise errors.OpPrereqError("At least one master candidate needed",
1947
                                   errors.ECODE_INVAL)
1948

    
1949
  def ExpandNames(self):
1950
    # FIXME: in the future, modifying other cluster params may not require
1951
    # checking all nodes
1952
    self.needed_locks = {
1953
      locking.LEVEL_NODE: locking.ALL_SET,
1954
    }
1955
    self.share_locks[locking.LEVEL_NODE] = 1
1956

    
1957
  def BuildHooksEnv(self):
1958
    """Build hooks env.
1959

1960
    """
1961
    env = {
1962
      "OP_TARGET": self.cfg.GetClusterName(),
1963
      "NEW_VG_NAME": self.op.vg_name,
1964
      }
1965
    mn = self.cfg.GetMasterNode()
1966
    return env, [mn], [mn]
1967

    
1968
  def CheckPrereq(self):
1969
    """Check prerequisites.
1970

1971
    This checks whether the given params don't conflict and
1972
    if the given volume group is valid.
1973

1974
    """
1975
    if self.op.vg_name is not None and not self.op.vg_name:
1976
      instances = self.cfg.GetAllInstancesInfo().values()
1977
      for inst in instances:
1978
        for disk in inst.disks:
1979
          if _RecursiveCheckIfLVMBased(disk):
1980
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1981
                                       " lvm-based instances exist",
1982
                                       errors.ECODE_INVAL)
1983

    
1984
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1985

    
1986
    # if vg_name not None, checks given volume group on all nodes
1987
    if self.op.vg_name:
1988
      vglist = self.rpc.call_vg_list(node_list)
1989
      for node in node_list:
1990
        msg = vglist[node].fail_msg
1991
        if msg:
1992
          # ignoring down node
1993
          self.LogWarning("Error while gathering data on node %s"
1994
                          " (ignoring node): %s", node, msg)
1995
          continue
1996
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1997
                                              self.op.vg_name,
1998
                                              constants.MIN_VG_SIZE)
1999
        if vgstatus:
2000
          raise errors.OpPrereqError("Error on node '%s': %s" %
2001
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2002

    
2003
    self.cluster = cluster = self.cfg.GetClusterInfo()
2004
    # validate params changes
2005
    if self.op.beparams:
2006
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2007
      self.new_beparams = objects.FillDict(
2008
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2009

    
2010
    if self.op.nicparams:
2011
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2012
      self.new_nicparams = objects.FillDict(
2013
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2014
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2015
      nic_errors = []
2016

    
2017
      # check all instances for consistency
2018
      for instance in self.cfg.GetAllInstancesInfo().values():
2019
        for nic_idx, nic in enumerate(instance.nics):
2020
          params_copy = copy.deepcopy(nic.nicparams)
2021
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
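          # params_filled: the proposed cluster-level NIC defaults with this
          # NIC's own overrides applied on top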
2022

    
2023
          # check parameter syntax
2024
          try:
2025
            objects.NIC.CheckParameterSyntax(params_filled)
2026
          except errors.ConfigurationError, err:
2027
            nic_errors.append("Instance %s, nic/%d: %s" %
2028
                              (instance.name, nic_idx, err))
2029

    
2030
          # if we're moving instances to routed, check that they have an ip
2031
          target_mode = params_filled[constants.NIC_MODE]
2032
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2033
            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
2034
                              (instance.name, nic_idx))
2035
      if nic_errors:
2036
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2037
                                   "\n".join(nic_errors))
2038

    
2039
    # hypervisor list/parameters
2040
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
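    # start from a copy of the current per-hypervisor parameters; any
    # requested changes are merged on top of it below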
2041
    if self.op.hvparams:
2042
      if not isinstance(self.op.hvparams, dict):
2043
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2044
                                   errors.ECODE_INVAL)
2045
      for hv_name, hv_dict in self.op.hvparams.items():
2046
        if hv_name not in self.new_hvparams:
2047
          self.new_hvparams[hv_name] = hv_dict
2048
        else:
2049
          self.new_hvparams[hv_name].update(hv_dict)
2050

    
2051
    if self.op.enabled_hypervisors is not None:
2052
      self.hv_list = self.op.enabled_hypervisors
2053
      if not self.hv_list:
2054
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2055
                                   " least one member",
2056
                                   errors.ECODE_INVAL)
2057
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2058
      if invalid_hvs:
2059
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2060
                                   " entries: %s" %
2061
                                   utils.CommaJoin(invalid_hvs),
2062
                                   errors.ECODE_INVAL)
2063
    else:
2064
      self.hv_list = cluster.enabled_hypervisors
2065

    
2066
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
2067
      # either the enabled list has changed, or the parameters have, validate
2068
      for hv_name, hv_params in self.new_hvparams.items():
2069
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
2070
            (self.op.enabled_hypervisors and
2071
             hv_name in self.op.enabled_hypervisors)):
2072
          # either this is a new hypervisor, or its parameters have changed
2073
          hv_class = hypervisor.GetHypervisor(hv_name)
2074
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2075
          hv_class.CheckParameterSyntax(hv_params)
2076
          _CheckHVParams(self, node_list, hv_name, hv_params)
2077

    
2078
  def Exec(self, feedback_fn):
2079
    """Change the parameters of the cluster.
2080

2081
    """
2082
    if self.op.vg_name is not None:
2083
      new_volume = self.op.vg_name
2084
      if not new_volume:
2085
        new_volume = None
2086
      if new_volume != self.cfg.GetVGName():
2087
        self.cfg.SetVGName(new_volume)
2088
      else:
2089
        feedback_fn("Cluster LVM configuration already in desired"
2090
                    " state, not changing")
2091
    if self.op.hvparams:
2092
      self.cluster.hvparams = self.new_hvparams
2093
    if self.op.enabled_hypervisors is not None:
2094
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2095
    if self.op.beparams:
2096
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2097
    if self.op.nicparams:
2098
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2099

    
2100
    if self.op.candidate_pool_size is not None:
2101
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
2102
      # we need to update the pool size here, otherwise the save will fail
2103
      _AdjustCandidatePool(self, [])
2104

    
2105
    self.cfg.Update(self.cluster, feedback_fn)
2106

    
2107

    
2108
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2109
  """Distribute additional files which are part of the cluster configuration.
2110

2111
  ConfigWriter takes care of distributing the config and ssconf files, but
2112
  there are more files which should be distributed to all nodes. This function
2113
  makes sure those are copied.
2114

2115
  @param lu: calling logical unit
2116
  @param additional_nodes: list of nodes not in the config to distribute to
2117

2118
  """
2119
  # 1. Gather target nodes
2120
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2121
  dist_nodes = lu.cfg.GetNodeList()
2122
  if additional_nodes is not None:
2123
    dist_nodes.extend(additional_nodes)
2124
  if myself.name in dist_nodes:
2125
    dist_nodes.remove(myself.name)
2126

    
2127
  # 2. Gather files to distribute
2128
  dist_files = set([constants.ETC_HOSTS,
2129
                    constants.SSH_KNOWN_HOSTS_FILE,
2130
                    constants.RAPI_CERT_FILE,
2131
                    constants.RAPI_USERS_FILE,
2132
                    constants.HMAC_CLUSTER_KEY,
2133
                   ])
2134

    
2135
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2136
  for hv_name in enabled_hypervisors:
2137
    hv_class = hypervisor.GetHypervisor(hv_name)
2138
    dist_files.update(hv_class.GetAncillaryFiles())
2139

    
2140
  # 3. Perform the files upload
2141
  for fname in dist_files:
2142
    if os.path.exists(fname):
2143
      result = lu.rpc.call_upload_file(dist_nodes, fname)
2144
      for to_node, to_result in result.items():
2145
        msg = to_result.fail_msg
2146
        if msg:
2147
          msg = ("Copy of file %s to node %s failed: %s" %
2148
                 (fname, to_node, msg))
2149
          lu.proc.LogWarning(msg)
2150

    
2151

    
2152
class LURedistributeConfig(NoHooksLU):
2153
  """Force the redistribution of cluster configuration.
2154

2155
  This is a very simple LU.
2156

2157
  """
2158
  _OP_REQP = []
2159
  REQ_BGL = False
2160

    
2161
  def ExpandNames(self):
2162
    self.needed_locks = {
2163
      locking.LEVEL_NODE: locking.ALL_SET,
2164
    }
2165
    self.share_locks[locking.LEVEL_NODE] = 1
2166

    
2167
  def CheckPrereq(self):
2168
    """Check prerequisites.
2169

2170
    """
2171

    
2172
  def Exec(self, feedback_fn):
2173
    """Redistribute the configuration.
2174

2175
    """
2176
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2177
    _RedistributeAncillaryFiles(self)
2178

    
2179

    
2180
def _WaitForSync(lu, instance, oneshot=False):
2181
  """Sleep and poll for an instance's disk to sync.
2182

2183
  """
2184
  if not instance.disks:
2185
    return True
2186

    
2187
  if not oneshot:
2188
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2189

    
2190
  node = instance.primary_node
2191

    
2192
  for dev in instance.disks:
2193
    lu.cfg.SetDiskID(dev, node)
2194

    
2195
  # TODO: Convert to utils.Retry
2196

    
2197
  retries = 0
2198
  degr_retries = 10 # in seconds, as we sleep 1 second each time
2199
  while True:
2200
    max_time = 0
2201
    done = True
2202
    cumul_degraded = False
2203
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2204
    msg = rstats.fail_msg
2205
    if msg:
2206
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2207
      retries += 1
2208
      if retries >= 10:
2209
        raise errors.RemoteError("Can't contact node %s for mirror data,"
2210
                                 " aborting." % node)
2211
      time.sleep(6)
2212
      continue
2213
    rstats = rstats.payload
2214
    retries = 0
2215
    for i, mstat in enumerate(rstats):
2216
      if mstat is None:
2217
        lu.LogWarning("Can't compute data for node %s/%s",
2218
                           node, instance.disks[i].iv_name)
2219
        continue
2220

    
2221
      cumul_degraded = (cumul_degraded or
2222
                        (mstat.is_degraded and mstat.sync_percent is None))
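      # (a disk only counts as degraded here if it is degraded *and* no
      # resync is in progress, i.e. sync_percent is None)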
2223
      if mstat.sync_percent is not None:
2224
        done = False
2225
        if mstat.estimated_time is not None:
2226
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2227
          max_time = mstat.estimated_time
2228
        else:
2229
          rem_time = "no time estimate"
2230
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2231
                        (instance.disks[i].iv_name, mstat.sync_percent,
2232
                         rem_time))
2233

    
2234
    # if we're done but degraded, let's do a few small retries, to
2235
    # make sure we see a stable and not transient situation; therefore
2236
    # we force restart of the loop
2237
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
2238
      logging.info("Degraded disks found, %d retries left", degr_retries)
2239
      degr_retries -= 1
2240
      time.sleep(1)
2241
      continue
2242

    
2243
    if done or oneshot:
2244
      break
2245

    
2246
    time.sleep(min(60, max_time))
2247

    
2248
  if done:
2249
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2250
  return not cumul_degraded
2251

    
2252

    
2253
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2254
  """Check that mirrors are not degraded.
2255

2256
  The ldisk parameter, if True, will change the test from the
2257
  is_degraded attribute (which represents overall non-ok status for
2258
  the device(s)) to the ldisk (representing the local storage status).
2259

2260
  """
2261
  lu.cfg.SetDiskID(dev, node)
2262

    
2263
  result = True
2264

    
2265
  if on_primary or dev.AssembleOnSecondary():
2266
    rstats = lu.rpc.call_blockdev_find(node, dev)
2267
    msg = rstats.fail_msg
2268
    if msg:
2269
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2270
      result = False
2271
    elif not rstats.payload:
2272
      lu.LogWarning("Can't find disk on node %s", node)
2273
      result = False
2274
    else:
2275
      if ldisk:
2276
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2277
      else:
2278
        result = result and not rstats.payload.is_degraded
2279

    
2280
  if dev.children:
2281
    for child in dev.children:
2282
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2283

    
2284
  return result
2285

    
2286

    
2287
class LUDiagnoseOS(NoHooksLU):
2288
  """Logical unit for OS diagnose/query.
2289

2290
  """
2291
  _OP_REQP = ["output_fields", "names"]
2292
  REQ_BGL = False
2293
  _FIELDS_STATIC = utils.FieldSet()
2294
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2295
  # Fields that need calculation of global os validity
2296
  _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2297

    
2298
  def ExpandNames(self):
2299
    if self.op.names:
2300
      raise errors.OpPrereqError("Selective OS query not supported",
2301
                                 errors.ECODE_INVAL)
2302

    
2303
    _CheckOutputFields(static=self._FIELDS_STATIC,
2304
                       dynamic=self._FIELDS_DYNAMIC,
2305
                       selected=self.op.output_fields)
2306

    
2307
    # Lock all nodes, in shared mode
2308
    # Temporary removal of locks, should be reverted later
2309
    # TODO: reintroduce locks when they are lighter-weight
2310
    self.needed_locks = {}
2311
    #self.share_locks[locking.LEVEL_NODE] = 1
2312
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2313

    
2314
  def CheckPrereq(self):
2315
    """Check prerequisites.
2316

2317
    """
2318

    
2319
  @staticmethod
2320
  def _DiagnoseByOS(node_list, rlist):
2321
    """Remaps a per-node return list into an a per-os per-node dictionary
2322

2323
    @param node_list: a list with the names of all nodes
2324
    @param rlist: a map with node names as keys and OS objects as values
2325

2326
    @rtype: dict
2327
    @return: a dictionary with osnames as keys and as value another map, with
2328
        nodes as keys and tuples of (path, status, diagnose, variants) as
        values, eg::
2329

2330
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2331
                                     (/srv/..., False, "invalid api")],
2332
                           "node2": [(/srv/..., True, "")]}
2333
          }
2334

2335
    """
2336
    all_os = {}
2337
    # we build here the list of nodes that didn't fail the RPC (at RPC
2338
    # level), so that nodes with a non-responding node daemon don't
2339
    # make all OSes invalid
2340
    good_nodes = [node_name for node_name in rlist
2341
                  if not rlist[node_name].fail_msg]
2342
    for node_name, nr in rlist.items():
2343
      if nr.fail_msg or not nr.payload:
2344
        continue
2345
      for name, path, status, diagnose, variants in nr.payload:
2346
        if name not in all_os:
2347
          # build a list of nodes for this os containing empty lists
2348
          # for each node in node_list
2349
          all_os[name] = {}
2350
          for nname in good_nodes:
2351
            all_os[name][nname] = []
2352
        all_os[name][node_name].append((path, status, diagnose, variants))
2353
    return all_os
2354

    
2355
  def Exec(self, feedback_fn):
2356
    """Compute the list of OSes.
2357

2358
    """
2359
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2360
    node_data = self.rpc.call_os_diagnose(valid_nodes)
2361
    pol = self._DiagnoseByOS(valid_nodes, node_data)
2362
    output = []
2363
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2364
    calc_variants = "variants" in self.op.output_fields
2365

    
2366
    for os_name, os_data in pol.items():
2367
      row = []
2368
      if calc_valid:
2369
        valid = True
2370
        variants = None
2371
        for osl in os_data.values():
2372
          valid = valid and osl and osl[0][1]
2373
          if not valid:
2374
            variants = None
2375
            break
2376
          if calc_variants:
2377
            node_variants = osl[0][3]
2378
            if variants is None:
2379
              variants = node_variants
2380
            else:
2381
              variants = [v for v in variants if v in node_variants]
2382

    
2383
      for field in self.op.output_fields:
2384
        if field == "name":
2385
          val = os_name
2386
        elif field == "valid":
2387
          val = valid
2388
        elif field == "node_status":
2389
          # this is just a copy of the dict
2390
          val = {}
2391
          for node_name, nos_list in os_data.items():
2392
            val[node_name] = nos_list
2393
        elif field == "variants":
2394
          val = variants
2395
        else:
2396
          raise errors.ParameterError(field)
2397
        row.append(val)
2398
      output.append(row)
2399

    
2400
    return output
2401

    
2402

    
2403
class LURemoveNode(LogicalUnit):
2404
  """Logical unit for removing a node.
2405

2406
  """
2407
  HPATH = "node-remove"
2408
  HTYPE = constants.HTYPE_NODE
2409
  _OP_REQP = ["node_name"]
2410

    
2411
  def BuildHooksEnv(self):
2412
    """Build hooks env.
2413

2414
    This doesn't run on the target node in the pre phase as a failed
2415
    node would then be impossible to remove.
2416

2417
    """
2418
    env = {
2419
      "OP_TARGET": self.op.node_name,
2420
      "NODE_NAME": self.op.node_name,
2421
      }
2422
    all_nodes = self.cfg.GetNodeList()
2423
    if self.op.node_name in all_nodes:
2424
      all_nodes.remove(self.op.node_name)
2425
    return env, all_nodes, all_nodes
2426

    
2427
  def CheckPrereq(self):
2428
    """Check prerequisites.
2429

2430
    This checks:
2431
     - the node exists in the configuration
2432
     - it does not have primary or secondary instances
2433
     - it's not the master
2434

2435
    Any errors are signaled by raising errors.OpPrereqError.
2436

2437
    """
2438
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2439
    if node is None:
2440
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name,
2441
                                 errors.ECODE_NOENT)
2442

    
2443
    instance_list = self.cfg.GetInstanceList()
2444

    
2445
    masternode = self.cfg.GetMasterNode()
2446
    if node.name == masternode:
2447
      raise errors.OpPrereqError("Node is the master node,"
2448
                                 " you need to failover first.",
2449
                                 errors.ECODE_INVAL)
2450

    
2451
    for instance_name in instance_list:
2452
      instance = self.cfg.GetInstanceInfo(instance_name)
2453
      if node.name in instance.all_nodes:
2454
        raise errors.OpPrereqError("Instance %s is still running on the node,"
2455
                                   " please remove first." % instance_name,
2456
                                   errors.ECODE_INVAL)
2457
    self.op.node_name = node.name
2458
    self.node = node
2459

    
2460
  def Exec(self, feedback_fn):
2461
    """Removes the node from the cluster.
2462

2463
    """
2464
    node = self.node
2465
    logging.info("Stopping the node daemon and removing configs from node %s",
2466
                 node.name)
2467

    
2468
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2469

    
2470
    # Promote nodes to master candidate as needed
2471
    _AdjustCandidatePool(self, exceptions=[node.name])
2472
    self.context.RemoveNode(node.name)
2473

    
2474
    # Run post hooks on the node before it's removed
2475
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2476
    try:
2477
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2478
    except:
2479
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
2480

    
2481
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2482
    msg = result.fail_msg
2483
    if msg:
2484
      self.LogWarning("Errors encountered on the remote node while leaving"
2485
                      " the cluster: %s", msg)
2486

    
2487

    
2488
class LUQueryNodes(NoHooksLU):
2489
  """Logical unit for querying nodes.
2490

2491
  """
2492
  _OP_REQP = ["output_fields", "names", "use_locking"]
2493
  REQ_BGL = False
2494

    
2495
  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2496
                    "master_candidate", "offline", "drained"]
2497

    
2498
  _FIELDS_DYNAMIC = utils.FieldSet(
2499
    "dtotal", "dfree",
2500
    "mtotal", "mnode", "mfree",
2501
    "bootid",
2502
    "ctotal", "cnodes", "csockets",
2503
    )
2504

    
2505
  _FIELDS_STATIC = utils.FieldSet(*[
2506
    "pinst_cnt", "sinst_cnt",
2507
    "pinst_list", "sinst_list",
2508
    "pip", "sip", "tags",
2509
    "master",
2510
    "role"] + _SIMPLE_FIELDS
2511
    )
2512

    
2513
  def ExpandNames(self):
2514
    _CheckOutputFields(static=self._FIELDS_STATIC,
2515
                       dynamic=self._FIELDS_DYNAMIC,
2516
                       selected=self.op.output_fields)
2517

    
2518
    self.needed_locks = {}
2519
    self.share_locks[locking.LEVEL_NODE] = 1
2520

    
2521
    if self.op.names:
2522
      self.wanted = _GetWantedNodes(self, self.op.names)
2523
    else:
2524
      self.wanted = locking.ALL_SET
2525

    
2526
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2527
    self.do_locking = self.do_node_query and self.op.use_locking
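    # node locks are only acquired when at least one non-static field was
    # requested (do_node_query) and the caller explicitly asked for locking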
2528
    if self.do_locking:
2529
      # if we don't request only static fields, we need to lock the nodes
2530
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
2531

    
2532
  def CheckPrereq(self):
2533
    """Check prerequisites.
2534

2535
    """
2536
    # The node list was already validated in _GetWantedNodes (if it was
2537
    # non-empty); an empty list needs no validation
2538
    pass
2539

    
2540
  def Exec(self, feedback_fn):
2541
    """Computes the list of nodes and their attributes.
2542

2543
    """
2544
    all_info = self.cfg.GetAllNodesInfo()
2545
    if self.do_locking:
2546
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
2547
    elif self.wanted != locking.ALL_SET:
2548
      nodenames = self.wanted
2549
      missing = set(nodenames).difference(all_info.keys())
2550
      if missing:
2551
        raise errors.OpExecError(
2552
          "Some nodes were removed before retrieving their data: %s" % missing)
2553
    else:
2554
      nodenames = all_info.keys()
2555

    
2556
    nodenames = utils.NiceSort(nodenames)
2557
    nodelist = [all_info[name] for name in nodenames]
2558

    
2559
    # begin data gathering
2560

    
2561
    if self.do_node_query:
2562
      live_data = {}
2563
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2564
                                          self.cfg.GetHypervisorType())
2565
      for name in nodenames:
2566
        nodeinfo = node_data[name]
2567
        if not nodeinfo.fail_msg and nodeinfo.payload:
2568
          nodeinfo = nodeinfo.payload
2569
          fn = utils.TryConvert
2570
          live_data[name] = {
2571
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2572
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2573
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2574
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2575
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2576
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2577
            "bootid": nodeinfo.get('bootid', None),
2578
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2579
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2580
            }
2581
        else:
2582
          live_data[name] = {}
2583
    else:
2584
      live_data = dict.fromkeys(nodenames, {})
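      # note: all keys share the same (empty) dict object here, which is
      # fine because this path never modifies the per-node dicts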
2585

    
2586
    node_to_primary = dict([(name, set()) for name in nodenames])
2587
    node_to_secondary = dict([(name, set()) for name in nodenames])
2588

    
2589
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2590
                             "sinst_cnt", "sinst_list"))
2591
    if inst_fields & frozenset(self.op.output_fields):
2592
      inst_data = self.cfg.GetAllInstancesInfo()
2593

    
2594
      for inst in inst_data.values():
2595
        if inst.primary_node in node_to_primary:
2596
          node_to_primary[inst.primary_node].add(inst.name)
2597
        for secnode in inst.secondary_nodes:
2598
          if secnode in node_to_secondary:
2599
            node_to_secondary[secnode].add(inst.name)
2600

    
2601
    master_node = self.cfg.GetMasterNode()
2602

    
2603
    # end data gathering
2604

    
2605
    output = []
2606
    for node in nodelist:
2607
      node_output = []
2608
      for field in self.op.output_fields:
2609
        if field in self._SIMPLE_FIELDS:
2610
          val = getattr(node, field)
2611
        elif field == "pinst_list":
2612
          val = list(node_to_primary[node.name])
2613
        elif field == "sinst_list":
2614
          val = list(node_to_secondary[node.name])
2615
        elif field == "pinst_cnt":
2616
          val = len(node_to_primary[node.name])
2617
        elif field == "sinst_cnt":
2618
          val = len(node_to_secondary[node.name])
2619
        elif field == "pip":
2620
          val = node.primary_ip
2621
        elif field == "sip":
2622
          val = node.secondary_ip
2623
        elif field == "tags":
2624
          val = list(node.GetTags())
2625
        elif field == "master":
2626
          val = node.name == master_node
2627
        elif self._FIELDS_DYNAMIC.Matches(field):
2628
          val = live_data[node.name].get(field, None)
2629
        elif field == "role":
2630
          if node.name == master_node:
2631
            val = "M"
2632
          elif node.master_candidate:
2633
            val = "C"
2634
          elif node.drained:
2635
            val = "D"
2636
          elif node.offline:
2637
            val = "O"
2638
          else:
2639
            val = "R"
2640
        else:
2641
          raise errors.ParameterError(field)
2642
        node_output.append(val)
2643
      output.append(node_output)
2644

    
2645
    return output
2646

    
2647

    
2648
class LUQueryNodeVolumes(NoHooksLU):
2649
  """Logical unit for getting volumes on node(s).
2650

2651
  """
2652
  _OP_REQP = ["nodes", "output_fields"]
2653
  REQ_BGL = False
2654
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2655
  _FIELDS_STATIC = utils.FieldSet("node")
2656

    
2657
  def ExpandNames(self):
2658
    _CheckOutputFields(static=self._FIELDS_STATIC,
2659
                       dynamic=self._FIELDS_DYNAMIC,
2660
                       selected=self.op.output_fields)
2661

    
2662
    self.needed_locks = {}
2663
    self.share_locks[locking.LEVEL_NODE] = 1
2664
    if not self.op.nodes:
2665
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2666
    else:
2667
      self.needed_locks[locking.LEVEL_NODE] = \
2668
        _GetWantedNodes(self, self.op.nodes)
2669

    
2670
  def CheckPrereq(self):
2671
    """Check prerequisites.
2672

2673
    This checks that the fields required are valid output fields.
2674

2675
    """
2676
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2677

    
2678
  def Exec(self, feedback_fn):
2679
    """Computes the list of nodes and their attributes.
2680

2681
    """
2682
    nodenames = self.nodes
2683
    volumes = self.rpc.call_node_volumes(nodenames)
2684

    
2685
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2686
             in self.cfg.GetInstanceList()]
2687

    
2688
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2689

    
2690
    output = []
2691
    for node in nodenames:
2692
      nresult = volumes[node]
2693
      if nresult.offline:
2694
        continue
2695
      msg = nresult.fail_msg
2696
      if msg:
2697
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2698
        continue
2699

    
2700
      node_vols = nresult.payload[:]
2701
      node_vols.sort(key=lambda vol: vol['dev'])
2702

    
2703
      for vol in node_vols:
2704
        node_output = []
2705
        for field in self.op.output_fields:
2706
          if field == "node":
2707
            val = node
2708
          elif field == "phys":
2709
            val = vol['dev']
2710
          elif field == "vg":
2711
            val = vol['vg']
2712
          elif field == "name":
2713
            val = vol['name']
2714
          elif field == "size":
2715
            val = int(float(vol['size']))
2716
          elif field == "instance":
2717
            for inst in ilist:
2718
              if node not in lv_by_node[inst]:
2719
                continue
2720
              if vol['name'] in lv_by_node[inst][node]:
2721
                val = inst.name
2722
                break
2723
            else:
2724
              val = '-'
2725
          else:
2726
            raise errors.ParameterError(field)
2727
          node_output.append(str(val))
2728

    
2729
        output.append(node_output)
2730

    
2731
    return output
2732

    
2733

    
2734
class LUQueryNodeStorage(NoHooksLU):
2735
  """Logical unit for getting information on storage units on node(s).
2736

2737
  """
2738
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
2739
  REQ_BGL = False
2740
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2741

    
2742
  def ExpandNames(self):
2743
    storage_type = self.op.storage_type
2744

    
2745
    if storage_type not in constants.VALID_STORAGE_TYPES:
2746
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2747
                                 errors.ECODE_INVAL)
2748

    
2749
    _CheckOutputFields(static=self._FIELDS_STATIC,
2750
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2751
                       selected=self.op.output_fields)
2752

    
2753
    self.needed_locks = {}
2754
    self.share_locks[locking.LEVEL_NODE] = 1
2755

    
2756
    if self.op.nodes:
2757
      self.needed_locks[locking.LEVEL_NODE] = \
2758
        _GetWantedNodes(self, self.op.nodes)
2759
    else:
2760
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2761

    
2762
  def CheckPrereq(self):
2763
    """Check prerequisites.
2764

2765
    This checks that the fields required are valid output fields.
2766

2767
    """
2768
    self.op.name = getattr(self.op, "name", None)
2769

    
2770
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2771

    
2772
  def Exec(self, feedback_fn):
2773
    """Computes the list of nodes and their attributes.
2774

2775
    """
2776
    # Always get name to sort by
2777
    if constants.SF_NAME in self.op.output_fields:
2778
      fields = self.op.output_fields[:]
2779
    else:
2780
      fields = [constants.SF_NAME] + self.op.output_fields
2781

    
2782
    # Never ask for node or type as it's only known to the LU
2783
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
2784
      while extra in fields:
2785
        fields.remove(extra)
2786

    
2787
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2788
    name_idx = field_idx[constants.SF_NAME]
2789

    
2790
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2791
    data = self.rpc.call_storage_list(self.nodes,
2792
                                      self.op.storage_type, st_args,
2793
                                      self.op.name, fields)
2794

    
2795
    result = []
2796

    
2797
    for node in utils.NiceSort(self.nodes):
2798
      nresult = data[node]
2799
      if nresult.offline:
2800
        continue
2801

    
2802
      msg = nresult.fail_msg
2803
      if msg:
2804
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2805
        continue
2806

    
2807
      rows = dict([(row[name_idx], row) for row in nresult.payload])
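      # index the returned rows by storage unit name so they can be emitted
      # in NiceSort order below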
2808

    
2809
      for name in utils.NiceSort(rows.keys()):
2810
        row = rows[name]
2811

    
2812
        out = []
2813

    
2814
        for field in self.op.output_fields:
2815
          if field == constants.SF_NODE:
2816
            val = node
2817
          elif field == constants.SF_TYPE:
2818
            val = self.op.storage_type
2819
          elif field in field_idx:
2820
            val = row[field_idx[field]]
2821
          else:
2822
            raise errors.ParameterError(field)
2823

    
2824
          out.append(val)
2825

    
2826
        result.append(out)
2827

    
2828
    return result
2829

    
2830

    
2831
class LUModifyNodeStorage(NoHooksLU):
2832
  """Logical unit for modifying a storage volume on a node.
2833

2834
  """
2835
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2836
  REQ_BGL = False
2837

    
2838
  def CheckArguments(self):
2839
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2840
    if node_name is None:
2841
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
2842
                                 errors.ECODE_NOENT)
2843

    
2844
    self.op.node_name = node_name
2845

    
2846
    storage_type = self.op.storage_type
2847
    if storage_type not in constants.VALID_STORAGE_TYPES:
2848
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2849
                                 errors.ECODE_INVAL)
2850

    
2851
  def ExpandNames(self):
2852
    self.needed_locks = {
2853
      locking.LEVEL_NODE: self.op.node_name,
2854
      }
2855

    
2856
  def CheckPrereq(self):
2857
    """Check prerequisites.
2858

2859
    """
2860
    storage_type = self.op.storage_type
2861

    
2862
    try:
2863
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2864
    except KeyError:
2865
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
2866
                                 " modified" % storage_type,
2867
                                 errors.ECODE_INVAL)
2868

    
2869
    diff = set(self.op.changes.keys()) - modifiable
2870
    if diff:
2871
      raise errors.OpPrereqError("The following fields can not be modified for"
2872
                                 " storage units of type '%s': %r" %
2873
                                 (storage_type, list(diff)),
2874
                                 errors.ECODE_INVAL)
2875

    
2876
  def Exec(self, feedback_fn):
2877
    """Computes the list of nodes and their attributes.
2878

2879
    """
2880
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2881
    result = self.rpc.call_storage_modify(self.op.node_name,
2882
                                          self.op.storage_type, st_args,
2883
                                          self.op.name, self.op.changes)
2884
    result.Raise("Failed to modify storage unit '%s' on %s" %
2885
                 (self.op.name, self.op.node_name))


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.GetHostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given",
                                 errors.ECODE_INVAL)
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True, ecode=errors.ECODE_ENVIRON)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())
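
# Illustrative note (not in the original module): clients reach this LU
# through the matching opcode in the opcodes module (assumed here to be
# OpAddNode); the attribute names follow _OP_REQP/CheckPrereq above and the
# host name is made up:
#
#   op = opcodes.OpAddNode(node_name="node4.example.com", readd=False)
#
# secondary_ip may be passed explicitly on dual-homed clusters; otherwise
# CheckPrereq falls back to the resolved primary IP.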


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
                                 errors.ECODE_INVAL)
    self.op.node_name = node_name
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the requested flag changes against the node's current
    state and the cluster's master candidate pool.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover",
                                   errors.ECODE_INVAL)

    # Boolean values that tell us whether we're offlining/draining or
    # de-offlining/de-draining the node
    offline_or_drain = self.op.offline == True or self.op.drained == True
    deoffline_or_drain = self.op.offline == False or self.op.drained == False

    if (node.master_candidate and
        (self.op.master_candidate == False or offline_or_drain)):
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
      if mc_now <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, mc_now-1))
        # Only allow forcing the operation if it's an offline/drain operation,
        # and we could not possibly promote more nodes.
        # FIXME: this can still lead to issues if in any way another node which
        # could be promoted appears in the meantime.
        if self.op.force and offline_or_drain and mc_should == mc_max:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (deoffline_or_drain and not offline_or_drain and not
        self.op.master_candidate == True):
      self.op.master_candidate = _DecideSelfPromotion(self)
      if self.op.master_candidate:
        self.LogInfo("Autopromoting node to master candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result
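
# Illustrative note (not in the original module): the list returned by Exec
# above is used as job feedback and pairs each changed parameter with its new
# value; offlining a node that was a master candidate would yield roughly:
#
#   [("offline", "True"),
#    ("master_candidate", "auto-demotion due to offline")]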


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
                                 errors.ECODE_NOENT)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Collect and return the requested configuration values.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values
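
# Illustrative note (not in the original module): values are returned in the
# same order as the requested fields, so output_fields=["cluster_name",
# "master_node", "drain_flag"] would yield something like
# ["cluster.example.com", "node1.example.com", False] (example values).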


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a tuple of (disks_ok, device_info); disks_ok is False if the
      operation failed, and device_info is a list of
      (host, instance_visible_name, node_visible_name) tuples with the
      mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
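
# Illustrative note (not in the original module): for a two-disk DRBD
# instance the device_info list built above looks roughly like (made-up
# device paths):
#
#   [("node1.example.com", "disk/0", "/dev/drbd0"),
#    ("node1.example.com", "disk/1", "/dev/drbd1")]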


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node,
                       prereq=True, ecode=errors.ECODE_ENVIRON)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem),
                               errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem),
                               errors.ECODE_NORES)
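

# Illustrative sketch, not part of the original module: how a LU would
# typically combine the cluster-filled backend parameters with
# _CheckNodeFreeMemory before starting an instance (this mirrors
# LUStartupInstance.CheckPrereq below).
def _ExampleCheckStartupMemory(lu, instance):
  """Example only: verify the primary node can hold the instance's memory."""
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  _CheckNodeFreeMemory(lu, instance.primary_node,
                       "starting instance %s" % instance.name,
                       bep[constants.BE_MEMORY], instance.hypervisor)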


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ),
                                   errors.ECODE_INVAL)
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ),
                                   errors.ECODE_INVAL)

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True, ecode=errors.ECODE_ENVIRON)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)
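
# Illustrative note (not in the original module): the hvparams/beparams
# accepted by this LU are one-off overrides for a single start; CheckPrereq
# layers them as cluster defaults, then the instance's own hvparams, then the
# per-start overrides, conceptually:
#
#   filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
#                                 instance.hvparams)
#   filled_hvp.update(self.hvparams)   # the per-start values win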


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type,
                                             self.shutdown_timeout)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance,
                                               self.shutdown_timeout)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)
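
# Illustrative note (not in the original module): the three reboot types take
# different paths in Exec above: INSTANCE_REBOOT_SOFT and INSTANCE_REBOOT_HARD
# are delegated to the hypervisor on the primary node via
# call_instance_reboot, while INSTANCE_REBOOT_FULL shuts the instance down,
# recycles its disks and starts it again from the master side.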


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.timeout = getattr(self.op, "timeout",
                           constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["TIMEOUT"] = self.timeout
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    timeout = self.timeout
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name,
                                 errors.ECODE_INVAL)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name,
                                 errors.ECODE_STATE)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True, ecode=errors.ECODE_ENVIRON)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node),
                                 errors.ECODE_STATE)

    self.op.os_type = getattr(self.op, "os_type", None)
    self.op.force_variant = getattr(self.op, "force_variant", False)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode, errors.ECODE_NOENT)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name),
                   prereq=True, ecode=errors.ECODE_INVAL)
      if not self.op.force_variant:
        _CheckOSVariant(result.payload, self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst, feedback_fn)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)
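
# Illustrative note (not in the original module): a reinstall keeps the
# instance's disks and configuration and only re-runs the OS create scripts;
# passing os_type (e.g. via "gnt-instance reinstall -o <os>") additionally
# switches the recorded OS before the scripts run.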


class LURecreateInstanceDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    if not isinstance(self.op.disks, list):
      raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
    for item in self.op.disks:
      if (not isinstance(item, int) or
          item < 0):
        raise errors.OpPrereqError("Invalid disk specification '%s'" %
                                   str(item), errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the c