Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 159d4ec6

History | View | Annotate | Download (296 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import time
29
import re
30
import platform
31
import logging
32
import copy
33

    
34
from ganeti import ssh
35
from ganeti import utils
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import serializer
42
from ganeti import ssconf
43

    
44

    
45
class LogicalUnit(object):
46
  """Logical Unit base class.
47

48
  Subclasses must follow these rules:
49
    - implement ExpandNames
50
    - implement CheckPrereq (except when tasklets are used)
51
    - implement Exec (except when tasklets are used)
52
    - implement BuildHooksEnv
53
    - redefine HPATH and HTYPE
54
    - optionally redefine their run requirements:
55
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56

57
  Note that all commands require root permissions.
58

59
  @ivar dry_run_result: the value (if any) that will be returned to the caller
60
      in dry-run mode (signalled by opcode dry_run parameter)
61

62
  """
63
  HPATH = None
64
  HTYPE = None
65
  _OP_REQP = []
66
  REQ_BGL = True
67

    
68
  def __init__(self, processor, op, context, rpc):
69
    """Constructor for LogicalUnit.
70

71
    This needs to be overridden in derived classes in order to check op
72
    validity.
73

74
    """
75
    self.proc = processor
76
    self.op = op
77
    self.cfg = context.cfg
78
    self.context = context
79
    self.rpc = rpc
80
    # Dicts used to declare locking needs to mcpu
81
    self.needed_locks = None
82
    self.acquired_locks = {}
83
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
84
    self.add_locks = {}
85
    self.remove_locks = {}
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89
    # logging
90
    self.LogWarning = processor.LogWarning
91
    self.LogInfo = processor.LogInfo
92
    self.LogStep = processor.LogStep
93
    # support for dry-run
94
    self.dry_run_result = None
95

    
96
    # Tasklets
97
    self.tasklets = None
98

    
99
    for attr_name in self._OP_REQP:
100
      attr_val = getattr(op, attr_name, None)
101
      if attr_val is None:
102
        raise errors.OpPrereqError("Required parameter '%s' missing" %
103
                                   attr_name)
104

    
105
    self.CheckArguments()
106

    
107
  def __GetSSH(self):
108
    """Returns the SshRunner object
109

110
    """
111
    if not self.__ssh:
112
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113
    return self.__ssh
114

    
115
  ssh = property(fget=__GetSSH)
116

    
117
  def CheckArguments(self):
118
    """Check syntactic validity for the opcode arguments.
119

120
    This method is for doing a simple syntactic check and ensure
121
    validity of opcode parameters, without any cluster-related
122
    checks. While the same can be accomplished in ExpandNames and/or
123
    CheckPrereq, doing these separate is better because:
124

125
      - ExpandNames is left as as purely a lock-related function
126
      - CheckPrereq is run after we have acquired locks (and possible
127
        waited for them)
128

129
    The function is allowed to change the self.op attribute so that
130
    later methods can no longer worry about missing parameters.
131

132
    """
133
    pass
134

    
135
  def ExpandNames(self):
136
    """Expand names for this LU.
137

138
    This method is called before starting to execute the opcode, and it should
139
    update all the parameters of the opcode to their canonical form (e.g. a
140
    short node name must be fully expanded after this method has successfully
141
    completed). This way locking, hooks, logging, ecc. can work correctly.
142

143
    LUs which implement this method must also populate the self.needed_locks
144
    member, as a dict with lock levels as keys, and a list of needed lock names
145
    as values. Rules:
146

147
      - use an empty dict if you don't need any lock
148
      - if you don't need any lock at a particular level omit that level
149
      - don't put anything for the BGL level
150
      - if you want all locks at a level use locking.ALL_SET as a value
151

152
    If you need to share locks (rather than acquire them exclusively) at one
153
    level you can modify self.share_locks, setting a true value (usually 1) for
154
    that level. By default locks are not shared.
155

156
    This function can also define a list of tasklets, which then will be
157
    executed in order instead of the usual LU-level CheckPrereq and Exec
158
    functions, if those are not defined by the LU.
159

160
    Examples::
161

162
      # Acquire all nodes and one instance
163
      self.needed_locks = {
164
        locking.LEVEL_NODE: locking.ALL_SET,
165
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
166
      }
167
      # Acquire just two nodes
168
      self.needed_locks = {
169
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
170
      }
171
      # Acquire no locks
172
      self.needed_locks = {} # No, you can't leave it to the default value None
173

174
    """
175
    # The implementation of this method is mandatory only if the new LU is
176
    # concurrent, so that old LUs don't need to be changed all at the same
177
    # time.
178
    if self.REQ_BGL:
179
      self.needed_locks = {} # Exclusive LUs don't need locks.
180
    else:
181
      raise NotImplementedError
182

    
183
  def DeclareLocks(self, level):
184
    """Declare LU locking needs for a level
185

186
    While most LUs can just declare their locking needs at ExpandNames time,
187
    sometimes there's the need to calculate some locks after having acquired
188
    the ones before. This function is called just before acquiring locks at a
189
    particular level, but after acquiring the ones at lower levels, and permits
190
    such calculations. It can be used to modify self.needed_locks, and by
191
    default it does nothing.
192

193
    This function is only called if you have something already set in
194
    self.needed_locks for the level.
195

196
    @param level: Locking level which is going to be locked
197
    @type level: member of ganeti.locking.LEVELS
198

199
    """
200

    
201
  def CheckPrereq(self):
202
    """Check prerequisites for this LU.
203

204
    This method should check that the prerequisites for the execution
205
    of this LU are fulfilled. It can do internode communication, but
206
    it should be idempotent - no cluster or system changes are
207
    allowed.
208

209
    The method should raise errors.OpPrereqError in case something is
210
    not fulfilled. Its return value is ignored.
211

212
    This method should also update all the parameters of the opcode to
213
    their canonical form if it hasn't been done by ExpandNames before.
214

215
    """
216
    if self.tasklets is not None:
217
      for (idx, tl) in enumerate(self.tasklets):
218
        logging.debug("Checking prerequisites for tasklet %s/%s",
219
                      idx + 1, len(self.tasklets))
220
        tl.CheckPrereq()
221
    else:
222
      raise NotImplementedError
223

    
224
  def Exec(self, feedback_fn):
225
    """Execute the LU.
226

227
    This method should implement the actual work. It should raise
228
    errors.OpExecError for failures that are somewhat dealt with in
229
    code, or expected.
230

231
    """
232
    if self.tasklets is not None:
233
      for (idx, tl) in enumerate(self.tasklets):
234
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
235
        tl.Exec(feedback_fn)
236
    else:
237
      raise NotImplementedError
238

    
239
  def BuildHooksEnv(self):
240
    """Build hooks environment for this LU.
241

242
    This method should return a three-node tuple consisting of: a dict
243
    containing the environment that will be used for running the
244
    specific hook for this LU, a list of node names on which the hook
245
    should run before the execution, and a list of node names on which
246
    the hook should run after the execution.
247

248
    The keys of the dict must not have 'GANETI_' prefixed as this will
249
    be handled in the hooks runner. Also note additional keys will be
250
    added by the hooks runner. If the LU doesn't define any
251
    environment, an empty dict (and not None) should be returned.
252

253
    No nodes should be returned as an empty list (and not None).
254

255
    Note that if the HPATH for a LU class is None, this function will
256
    not be called.
257

258
    """
259
    raise NotImplementedError
260

    
261
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262
    """Notify the LU about the results of its hooks.
263

264
    This method is called every time a hooks phase is executed, and notifies
265
    the Logical Unit about the hooks' result. The LU can then use it to alter
266
    its result based on the hooks.  By default the method does nothing and the
267
    previous result is passed back unchanged but any LU can define it if it
268
    wants to use the local cluster hook-scripts somehow.
269

270
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
271
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272
    @param hook_results: the results of the multi-node hooks rpc call
273
    @param feedback_fn: function used send feedback back to the caller
274
    @param lu_result: the previous Exec result this LU had, or None
275
        in the PRE phase
276
    @return: the new Exec result, based on the previous result
277
        and hook results
278

279
    """
280
    return lu_result
281

    
282
  def _ExpandAndLockInstance(self):
283
    """Helper function to expand and lock an instance.
284

285
    Many LUs that work on an instance take its name in self.op.instance_name
286
    and need to expand it and then declare the expanded name for locking. This
287
    function does it, and then updates self.op.instance_name to the expanded
288
    name. It also initializes needed_locks as a dict, if this hasn't been done
289
    before.
290

291
    """
292
    if self.needed_locks is None:
293
      self.needed_locks = {}
294
    else:
295
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296
        "_ExpandAndLockInstance called with instance-level locks set"
297
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298
    if expanded_name is None:
299
      raise errors.OpPrereqError("Instance '%s' not known" %
300
                                  self.op.instance_name)
301
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302
    self.op.instance_name = expanded_name
303

    
304
  def _LockInstancesNodes(self, primary_only=False):
305
    """Helper function to declare instances' nodes for locking.
306

307
    This function should be called after locking one or more instances to lock
308
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309
    with all primary or secondary nodes for instances already locked and
310
    present in self.needed_locks[locking.LEVEL_INSTANCE].
311

312
    It should be called from DeclareLocks, and for safety only works if
313
    self.recalculate_locks[locking.LEVEL_NODE] is set.
314

315
    In the future it may grow parameters to just lock some instance's nodes, or
316
    to just lock primaries or secondary nodes, if needed.
317

318
    If should be called in DeclareLocks in a way similar to::
319

320
      if level == locking.LEVEL_NODE:
321
        self._LockInstancesNodes()
322

323
    @type primary_only: boolean
324
    @param primary_only: only lock primary nodes of locked instances
325

326
    """
327
    assert locking.LEVEL_NODE in self.recalculate_locks, \
328
      "_LockInstancesNodes helper function called with no nodes to recalculate"
329

    
330
    # TODO: check if we're really been called with the instance locks held
331

    
332
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333
    # future we might want to have different behaviors depending on the value
334
    # of self.recalculate_locks[locking.LEVEL_NODE]
335
    wanted_nodes = []
336
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337
      instance = self.context.cfg.GetInstanceInfo(instance_name)
338
      wanted_nodes.append(instance.primary_node)
339
      if not primary_only:
340
        wanted_nodes.extend(instance.secondary_nodes)
341

    
342
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
346

    
347
    del self.recalculate_locks[locking.LEVEL_NODE]
348

    
349

    
350
class NoHooksLU(LogicalUnit):
351
  """Simple LU which runs no hooks.
352

353
  This LU is intended as a parent for other LogicalUnits which will
354
  run no hooks, in order to reduce duplicate code.
355

356
  """
357
  HPATH = None
358
  HTYPE = None
359

    
360

    
361
class Tasklet:
362
  """Tasklet base class.
363

364
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
366
  tasklets know nothing about locks.
367

368
  Subclasses must follow these rules:
369
    - Implement CheckPrereq
370
    - Implement Exec
371

372
  """
373
  def __init__(self, lu):
374
    self.lu = lu
375

    
376
    # Shortcuts
377
    self.cfg = lu.cfg
378
    self.rpc = lu.rpc
379

    
380
  def CheckPrereq(self):
381
    """Check prerequisites for this tasklets.
382

383
    This method should check whether the prerequisites for the execution of
384
    this tasklet are fulfilled. It can do internode communication, but it
385
    should be idempotent - no cluster or system changes are allowed.
386

387
    The method should raise errors.OpPrereqError in case something is not
388
    fulfilled. Its return value is ignored.
389

390
    This method should also update all parameters to their canonical form if it
391
    hasn't been done before.
392

393
    """
394
    raise NotImplementedError
395

    
396
  def Exec(self, feedback_fn):
397
    """Execute the tasklet.
398

399
    This method should implement the actual work. It should raise
400
    errors.OpExecError for failures that are somewhat dealt with in code, or
401
    expected.
402

403
    """
404
    raise NotImplementedError
405

    
406

    
407
def _GetWantedNodes(lu, nodes):
408
  """Returns list of checked and expanded node names.
409

410
  @type lu: L{LogicalUnit}
411
  @param lu: the logical unit on whose behalf we execute
412
  @type nodes: list
413
  @param nodes: list of node names or None for all nodes
414
  @rtype: list
415
  @return: the list of nodes, sorted
416
  @raise errors.OpProgrammerError: if the nodes parameter is wrong type
417

418
  """
419
  if not isinstance(nodes, list):
420
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
421

    
422
  if not nodes:
423
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424
      " non-empty list of nodes whose name is to be expanded.")
425

    
426
  wanted = []
427
  for name in nodes:
428
    node = lu.cfg.ExpandNodeName(name)
429
    if node is None:
430
      raise errors.OpPrereqError("No such node name '%s'" % name)
431
    wanted.append(node)
432

    
433
  return utils.NiceSort(wanted)
434

    
435

    
436
def _GetWantedInstances(lu, instances):
437
  """Returns list of checked and expanded instance names.
438

439
  @type lu: L{LogicalUnit}
440
  @param lu: the logical unit on whose behalf we execute
441
  @type instances: list
442
  @param instances: list of instance names or None for all instances
443
  @rtype: list
444
  @return: the list of instances, sorted
445
  @raise errors.OpPrereqError: if the instances parameter is wrong type
446
  @raise errors.OpPrereqError: if any of the passed instances is not found
447

448
  """
449
  if not isinstance(instances, list):
450
    raise errors.OpPrereqError("Invalid argument type 'instances'")
451

    
452
  if instances:
453
    wanted = []
454

    
455
    for name in instances:
456
      instance = lu.cfg.ExpandInstanceName(name)
457
      if instance is None:
458
        raise errors.OpPrereqError("No such instance name '%s'" % name)
459
      wanted.append(instance)
460

    
461
  else:
462
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
463
  return wanted
464

    
465

    
466
def _CheckOutputFields(static, dynamic, selected):
467
  """Checks whether all selected fields are valid.
468

469
  @type static: L{utils.FieldSet}
470
  @param static: static fields set
471
  @type dynamic: L{utils.FieldSet}
472
  @param dynamic: dynamic fields set
473

474
  """
475
  f = utils.FieldSet()
476
  f.Extend(static)
477
  f.Extend(dynamic)
478

    
479
  delta = f.NonMatching(selected)
480
  if delta:
481
    raise errors.OpPrereqError("Unknown output fields selected: %s"
482
                               % ",".join(delta))
483

    
484

    
485
def _CheckBooleanOpField(op, name):
486
  """Validates boolean opcode parameters.
487

488
  This will ensure that an opcode parameter is either a boolean value,
489
  or None (but that it always exists).
490

491
  """
492
  val = getattr(op, name, None)
493
  if not (val is None or isinstance(val, bool)):
494
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
495
                               (name, str(val)))
496
  setattr(op, name, val)
497

    
498

    
499
def _CheckNodeOnline(lu, node):
500
  """Ensure that a given node is online.
501

502
  @param lu: the LU on behalf of which we make the check
503
  @param node: the node to check
504
  @raise errors.OpPrereqError: if the node is offline
505

506
  """
507
  if lu.cfg.GetNodeInfo(node).offline:
508
    raise errors.OpPrereqError("Can't use offline node %s" % node)
509

    
510

    
511
def _CheckNodeNotDrained(lu, node):
512
  """Ensure that a given node is not drained.
513

514
  @param lu: the LU on behalf of which we make the check
515
  @param node: the node to check
516
  @raise errors.OpPrereqError: if the node is drained
517

518
  """
519
  if lu.cfg.GetNodeInfo(node).drained:
520
    raise errors.OpPrereqError("Can't use drained node %s" % node)
521

    
522

    
523
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524
                          memory, vcpus, nics, disk_template, disks,
525
                          bep, hvp, hypervisor_name):
526
  """Builds instance related env variables for hooks
527

528
  This builds the hook environment from individual variables.
529

530
  @type name: string
531
  @param name: the name of the instance
532
  @type primary_node: string
533
  @param primary_node: the name of the instance's primary node
534
  @type secondary_nodes: list
535
  @param secondary_nodes: list of secondary nodes as strings
536
  @type os_type: string
537
  @param os_type: the name of the instance's OS
538
  @type status: boolean
539
  @param status: the should_run status of the instance
540
  @type memory: string
541
  @param memory: the memory size of the instance
542
  @type vcpus: string
543
  @param vcpus: the count of VCPUs the instance has
544
  @type nics: list
545
  @param nics: list of tuples (ip, mac, mode, link) representing
546
      the NICs the instance has
547
  @type disk_template: string
548
  @param disk_template: the disk template of the instance
549
  @type disks: list
550
  @param disks: the list of (size, mode) pairs
551
  @type bep: dict
552
  @param bep: the backend parameters for the instance
553
  @type hvp: dict
554
  @param hvp: the hypervisor parameters for the instance
555
  @type hypervisor_name: string
556
  @param hypervisor_name: the hypervisor for the instance
557
  @rtype: dict
558
  @return: the hook environment for this instance
559

560
  """
561
  if status:
562
    str_status = "up"
563
  else:
564
    str_status = "down"
565
  env = {
566
    "OP_TARGET": name,
567
    "INSTANCE_NAME": name,
568
    "INSTANCE_PRIMARY": primary_node,
569
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570
    "INSTANCE_OS_TYPE": os_type,
571
    "INSTANCE_STATUS": str_status,
572
    "INSTANCE_MEMORY": memory,
573
    "INSTANCE_VCPUS": vcpus,
574
    "INSTANCE_DISK_TEMPLATE": disk_template,
575
    "INSTANCE_HYPERVISOR": hypervisor_name,
576
  }
577

    
578
  if nics:
579
    nic_count = len(nics)
580
    for idx, (ip, mac, mode, link) in enumerate(nics):
581
      if ip is None:
582
        ip = ""
583
      env["INSTANCE_NIC%d_IP" % idx] = ip
584
      env["INSTANCE_NIC%d_MAC" % idx] = mac
585
      env["INSTANCE_NIC%d_MODE" % idx] = mode
586
      env["INSTANCE_NIC%d_LINK" % idx] = link
587
      if mode == constants.NIC_MODE_BRIDGED:
588
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
589
  else:
590
    nic_count = 0
591

    
592
  env["INSTANCE_NIC_COUNT"] = nic_count
593

    
594
  if disks:
595
    disk_count = len(disks)
596
    for idx, (size, mode) in enumerate(disks):
597
      env["INSTANCE_DISK%d_SIZE" % idx] = size
598
      env["INSTANCE_DISK%d_MODE" % idx] = mode
599
  else:
600
    disk_count = 0
601

    
602
  env["INSTANCE_DISK_COUNT"] = disk_count
603

    
604
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
605
    for key, value in source.items():
606
      env["INSTANCE_%s_%s" % (kind, key)] = value
607

    
608
  return env
609

    
610

    
611
def _NICListToTuple(lu, nics):
612
  """Build a list of nic information tuples.
613

614
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615
  value in LUQueryInstanceData.
616

617
  @type lu:  L{LogicalUnit}
618
  @param lu: the logical unit on whose behalf we execute
619
  @type nics: list of L{objects.NIC}
620
  @param nics: list of nics to convert to hooks tuples
621

622
  """
623
  hooks_nics = []
624
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
625
  for nic in nics:
626
    ip = nic.ip
627
    mac = nic.mac
628
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
629
    mode = filled_params[constants.NIC_MODE]
630
    link = filled_params[constants.NIC_LINK]
631
    hooks_nics.append((ip, mac, mode, link))
632
  return hooks_nics
633

    
634

    
635
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636
  """Builds instance related env variables for hooks from an object.
637

638
  @type lu: L{LogicalUnit}
639
  @param lu: the logical unit on whose behalf we execute
640
  @type instance: L{objects.Instance}
641
  @param instance: the instance for which we should build the
642
      environment
643
  @type override: dict
644
  @param override: dictionary with key/values that will override
645
      our values
646
  @rtype: dict
647
  @return: the hook environment dictionary
648

649
  """
650
  cluster = lu.cfg.GetClusterInfo()
651
  bep = cluster.FillBE(instance)
652
  hvp = cluster.FillHV(instance)
653
  args = {
654
    'name': instance.name,
655
    'primary_node': instance.primary_node,
656
    'secondary_nodes': instance.secondary_nodes,
657
    'os_type': instance.os,
658
    'status': instance.admin_up,
659
    'memory': bep[constants.BE_MEMORY],
660
    'vcpus': bep[constants.BE_VCPUS],
661
    'nics': _NICListToTuple(lu, instance.nics),
662
    'disk_template': instance.disk_template,
663
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
664
    'bep': bep,
665
    'hvp': hvp,
666
    'hypervisor_name': instance.hypervisor,
667
  }
668
  if override:
669
    args.update(override)
670
  return _BuildInstanceHookEnv(**args)
671

    
672

    
673
def _AdjustCandidatePool(lu, exceptions):
674
  """Adjust the candidate pool after node operations.
675

676
  """
677
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
678
  if mod_list:
679
    lu.LogInfo("Promoted nodes to master candidate role: %s",
680
               ", ".join(node.name for node in mod_list))
681
    for name in mod_list:
682
      lu.context.ReaddNode(name)
683
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
684
  if mc_now > mc_max:
685
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
686
               (mc_now, mc_max))
687

    
688

    
689
def _DecideSelfPromotion(lu, exceptions=None):
690
  """Decide whether I should promote myself as a master candidate.
691

692
  """
693
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
694
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
695
  # the new node will increase mc_max with one, so:
696
  mc_should = min(mc_should + 1, cp_size)
697
  return mc_now < mc_should
698

    
699

    
700
def _CheckNicsBridgesExist(lu, target_nics, target_node,
701
                               profile=constants.PP_DEFAULT):
702
  """Check that the brigdes needed by a list of nics exist.
703

704
  """
705
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
706
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
707
                for nic in target_nics]
708
  brlist = [params[constants.NIC_LINK] for params in paramslist
709
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
710
  if brlist:
711
    result = lu.rpc.call_bridges_exist(target_node, brlist)
712
    result.Raise("Error checking bridges on destination node '%s'" %
713
                 target_node, prereq=True)
714

    
715

    
716
def _CheckInstanceBridgesExist(lu, instance, node=None):
717
  """Check that the brigdes needed by an instance exist.
718

719
  """
720
  if node is None:
721
    node = instance.primary_node
722
  _CheckNicsBridgesExist(lu, instance.nics, node)
723

    
724

    
725
def _CheckOSVariant(os_obj, name):
726
  """Check whether an OS name conforms to the os variants specification.
727

728
  @type os_obj: L{objects.OS}
729
  @param os_obj: OS object to check
730
  @type name: string
731
  @param name: OS name passed by the user, to check for validity
732

733
  """
734
  if not os_obj.supported_variants:
735
    return
736
  try:
737
    variant = name.split("+", 1)[1]
738
  except IndexError:
739
    raise errors.OpPrereqError("OS name must include a variant")
740

    
741
  if variant not in os_obj.supported_variants:
742
    raise errors.OpPrereqError("Unsupported OS variant")
743

    
744

    
745
def _GetNodeInstancesInner(cfg, fn):
746
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
747

    
748

    
749
def _GetNodeInstances(cfg, node_name):
750
  """Returns a list of all primary and secondary instances on a node.
751

752
  """
753

    
754
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
755

    
756

    
757
def _GetNodePrimaryInstances(cfg, node_name):
758
  """Returns primary instances on a node.
759

760
  """
761
  return _GetNodeInstancesInner(cfg,
762
                                lambda inst: node_name == inst.primary_node)
763

    
764

    
765
def _GetNodeSecondaryInstances(cfg, node_name):
766
  """Returns secondary instances on a node.
767

768
  """
769
  return _GetNodeInstancesInner(cfg,
770
                                lambda inst: node_name in inst.secondary_nodes)
771

    
772

    
773
def _GetStorageTypeArgs(cfg, storage_type):
774
  """Returns the arguments for a storage type.
775

776
  """
777
  # Special case for file storage
778
  if storage_type == constants.ST_FILE:
779
    # storage.FileStorage wants a list of storage directories
780
    return [[cfg.GetFileStorageDir()]]
781

    
782
  return []
783

    
784

    
785
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
786
  faulty = []
787

    
788
  for dev in instance.disks:
789
    cfg.SetDiskID(dev, node_name)
790

    
791
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
792
  result.Raise("Failed to get disk status from node %s" % node_name,
793
               prereq=prereq)
794

    
795
  for idx, bdev_status in enumerate(result.payload):
796
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
797
      faulty.append(idx)
798

    
799
  return faulty
800

    
801

    
802
class LUPostInitCluster(LogicalUnit):
803
  """Logical unit for running hooks after cluster initialization.
804

805
  """
806
  HPATH = "cluster-init"
807
  HTYPE = constants.HTYPE_CLUSTER
808
  _OP_REQP = []
809

    
810
  def BuildHooksEnv(self):
811
    """Build hooks env.
812

813
    """
814
    env = {"OP_TARGET": self.cfg.GetClusterName()}
815
    mn = self.cfg.GetMasterNode()
816
    return env, [], [mn]
817

    
818
  def CheckPrereq(self):
819
    """No prerequisites to check.
820

821
    """
822
    return True
823

    
824
  def Exec(self, feedback_fn):
825
    """Nothing to do.
826

827
    """
828
    return True
829

    
830

    
831
class LUDestroyCluster(LogicalUnit):
832
  """Logical unit for destroying the cluster.
833

834
  """
835
  HPATH = "cluster-destroy"
836
  HTYPE = constants.HTYPE_CLUSTER
837
  _OP_REQP = []
838

    
839
  def BuildHooksEnv(self):
840
    """Build hooks env.
841

842
    """
843
    env = {"OP_TARGET": self.cfg.GetClusterName()}
844
    return env, [], []
845

    
846
  def CheckPrereq(self):
847
    """Check prerequisites.
848

849
    This checks whether the cluster is empty.
850

851
    Any errors are signaled by raising errors.OpPrereqError.
852

853
    """
854
    master = self.cfg.GetMasterNode()
855

    
856
    nodelist = self.cfg.GetNodeList()
857
    if len(nodelist) != 1 or nodelist[0] != master:
858
      raise errors.OpPrereqError("There are still %d node(s) in"
859
                                 " this cluster." % (len(nodelist) - 1))
860
    instancelist = self.cfg.GetInstanceList()
861
    if instancelist:
862
      raise errors.OpPrereqError("There are still %d instance(s) in"
863
                                 " this cluster." % len(instancelist))
864

    
865
  def Exec(self, feedback_fn):
866
    """Destroys the cluster.
867

868
    """
869
    master = self.cfg.GetMasterNode()
870
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
871

    
872
    # Run post hooks on master node before it's removed
873
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
874
    try:
875
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
876
    except:
877
      self.LogWarning("Errors occurred running hooks on %s" % master)
878

    
879
    result = self.rpc.call_node_stop_master(master, False)
880
    result.Raise("Could not disable the master role")
881

    
882
    if modify_ssh_setup:
883
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
884
      utils.CreateBackup(priv_key)
885
      utils.CreateBackup(pub_key)
886

    
887
    return master
888

    
889

    
890
class LUVerifyCluster(LogicalUnit):
891
  """Verifies the cluster status.
892

893
  """
894
  HPATH = "cluster-verify"
895
  HTYPE = constants.HTYPE_CLUSTER
896
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
897
  REQ_BGL = False
898

    
899
  TCLUSTER = "cluster"
900
  TNODE = "node"
901
  TINSTANCE = "instance"
902

    
903
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
904
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
905
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
906
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
907
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
908
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
909
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
910
  ENODEDRBD = (TNODE, "ENODEDRBD")
911
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
912
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
913
  ENODEHV = (TNODE, "ENODEHV")
914
  ENODELVM = (TNODE, "ENODELVM")
915
  ENODEN1 = (TNODE, "ENODEN1")
916
  ENODENET = (TNODE, "ENODENET")
917
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
918
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
919
  ENODERPC = (TNODE, "ENODERPC")
920
  ENODESSH = (TNODE, "ENODESSH")
921
  ENODEVERSION = (TNODE, "ENODEVERSION")
922
  ENODESETUP = (TNODE, "ENODESETUP")
923

    
924
  ETYPE_FIELD = "code"
925
  ETYPE_ERROR = "ERROR"
926
  ETYPE_WARNING = "WARNING"
927

    
928
  def ExpandNames(self):
929
    self.needed_locks = {
930
      locking.LEVEL_NODE: locking.ALL_SET,
931
      locking.LEVEL_INSTANCE: locking.ALL_SET,
932
    }
933
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
934

    
935
  def _Error(self, ecode, item, msg, *args, **kwargs):
936
    """Format an error message.
937

938
    Based on the opcode's error_codes parameter, either format a
939
    parseable error code, or a simpler error string.
940

941
    This must be called only from Exec and functions called from Exec.
942

943
    """
944
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
945
    itype, etxt = ecode
946
    # first complete the msg
947
    if args:
948
      msg = msg % args
949
    # then format the whole message
950
    if self.op.error_codes:
951
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
952
    else:
953
      if item:
954
        item = " " + item
955
      else:
956
        item = ""
957
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
958
    # and finally report it via the feedback_fn
959
    self._feedback_fn("  - %s" % msg)
960

    
961
  def _ErrorIf(self, cond, *args, **kwargs):
962
    """Log an error message if the passed condition is True.
963

964
    """
965
    cond = bool(cond) or self.op.debug_simulate_errors
966
    if cond:
967
      self._Error(*args, **kwargs)
968
    # do not mark the operation as failed for WARN cases only
969
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
970
      self.bad = self.bad or cond
971

    
972
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
973
                  node_result, master_files, drbd_map, vg_name):
974
    """Run multiple tests against a node.
975

976
    Test list:
977

978
      - compares ganeti version
979
      - checks vg existence and size > 20G
980
      - checks config file checksum
981
      - checks ssh to other nodes
982

983
    @type nodeinfo: L{objects.Node}
984
    @param nodeinfo: the node to check
985
    @param file_list: required list of files
986
    @param local_cksum: dictionary of local files and their checksums
987
    @param node_result: the results from the node
988
    @param master_files: list of files that only masters should have
989
    @param drbd_map: the useddrbd minors for this node, in
990
        form of minor: (instance, must_exist) which correspond to instances
991
        and their running status
992
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
993

994
    """
995
    node = nodeinfo.name
996
    _ErrorIf = self._ErrorIf
997

    
998
    # main result, node_result should be a non-empty dict
999
    test = not node_result or not isinstance(node_result, dict)
1000
    _ErrorIf(test, self.ENODERPC, node,
1001
                  "unable to verify node: no data returned")
1002
    if test:
1003
      return
1004

    
1005
    # compares ganeti version
1006
    local_version = constants.PROTOCOL_VERSION
1007
    remote_version = node_result.get('version', None)
1008
    test = not (remote_version and
1009
                isinstance(remote_version, (list, tuple)) and
1010
                len(remote_version) == 2)
1011
    _ErrorIf(test, self.ENODERPC, node,
1012
             "connection to node returned invalid data")
1013
    if test:
1014
      return
1015

    
1016
    test = local_version != remote_version[0]
1017
    _ErrorIf(test, self.ENODEVERSION, node,
1018
             "incompatible protocol versions: master %s,"
1019
             " node %s", local_version, remote_version[0])
1020
    if test:
1021
      return
1022

    
1023
    # node seems compatible, we can actually try to look into its results
1024

    
1025
    # full package version
1026
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1027
                  self.ENODEVERSION, node,
1028
                  "software version mismatch: master %s, node %s",
1029
                  constants.RELEASE_VERSION, remote_version[1],
1030
                  code=self.ETYPE_WARNING)
1031

    
1032
    # checks vg existence and size > 20G
1033
    if vg_name is not None:
1034
      vglist = node_result.get(constants.NV_VGLIST, None)
1035
      test = not vglist
1036
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1037
      if not test:
1038
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1039
                                              constants.MIN_VG_SIZE)
1040
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1041

    
1042
    # checks config file checksum
1043

    
1044
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
1045
    test = not isinstance(remote_cksum, dict)
1046
    _ErrorIf(test, self.ENODEFILECHECK, node,
1047
             "node hasn't returned file checksum data")
1048
    if not test:
1049
      for file_name in file_list:
1050
        node_is_mc = nodeinfo.master_candidate
1051
        must_have = (file_name not in master_files) or node_is_mc
1052
        # missing
1053
        test1 = file_name not in remote_cksum
1054
        # invalid checksum
1055
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1056
        # existing and good
1057
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1058
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1059
                 "file '%s' missing", file_name)
1060
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1061
                 "file '%s' has wrong checksum", file_name)
1062
        # not candidate and this is not a must-have file
1063
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1064
                 "file '%s' should not exist on non master"
1065
                 " candidates (and the file is outdated)", file_name)
1066
        # all good, except non-master/non-must have combination
1067
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1068
                 "file '%s' should not exist"
1069
                 " on non master candidates", file_name)
1070

    
1071
    # checks ssh to any
1072

    
1073
    test = constants.NV_NODELIST not in node_result
1074
    _ErrorIf(test, self.ENODESSH, node,
1075
             "node hasn't returned node ssh connectivity data")
1076
    if not test:
1077
      if node_result[constants.NV_NODELIST]:
1078
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1079
          _ErrorIf(True, self.ENODESSH, node,
1080
                   "ssh communication with node '%s': %s", a_node, a_msg)
1081

    
1082
    test = constants.NV_NODENETTEST not in node_result
1083
    _ErrorIf(test, self.ENODENET, node,
1084
             "node hasn't returned node tcp connectivity data")
1085
    if not test:
1086
      if node_result[constants.NV_NODENETTEST]:
1087
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1088
        for anode in nlist:
1089
          _ErrorIf(True, self.ENODENET, node,
1090
                   "tcp communication with node '%s': %s",
1091
                   anode, node_result[constants.NV_NODENETTEST][anode])
1092

    
1093
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1094
    if isinstance(hyp_result, dict):
1095
      for hv_name, hv_result in hyp_result.iteritems():
1096
        test = hv_result is not None
1097
        _ErrorIf(test, self.ENODEHV, node,
1098
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1099

    
1100
    # check used drbd list
1101
    if vg_name is not None:
1102
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
1103
      test = not isinstance(used_minors, (tuple, list))
1104
      _ErrorIf(test, self.ENODEDRBD, node,
1105
               "cannot parse drbd status file: %s", str(used_minors))
1106
      if not test:
1107
        for minor, (iname, must_exist) in drbd_map.items():
1108
          test = minor not in used_minors and must_exist
1109
          _ErrorIf(test, self.ENODEDRBD, node,
1110
                   "drbd minor %d of instance %s is not active",
1111
                   minor, iname)
1112
        for minor in used_minors:
1113
          test = minor not in drbd_map
1114
          _ErrorIf(test, self.ENODEDRBD, node,
1115
                   "unallocated drbd minor %d is in use", minor)
1116
    test = node_result.get(constants.NV_NODESETUP,
1117
                           ["Missing NODESETUP results"])
1118
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1119
             "; ".join(test))
1120

    
1121
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1122
                      node_instance, n_offline):
1123
    """Verify an instance.
1124

1125
    This function checks to see if the required block devices are
1126
    available on the instance's node.
1127

1128
    """
1129
    _ErrorIf = self._ErrorIf
1130
    node_current = instanceconfig.primary_node
1131

    
1132
    node_vol_should = {}
1133
    instanceconfig.MapLVsByNode(node_vol_should)
1134

    
1135
    for node in node_vol_should:
1136
      if node in n_offline:
1137
        # ignore missing volumes on offline nodes
1138
        continue
1139
      for volume in node_vol_should[node]:
1140
        test = node not in node_vol_is or volume not in node_vol_is[node]
1141
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1142
                 "volume %s missing on node %s", volume, node)
1143

    
1144
    if instanceconfig.admin_up:
1145
      test = ((node_current not in node_instance or
1146
               not instance in node_instance[node_current]) and
1147
              node_current not in n_offline)
1148
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1149
               "instance not running on its primary node %s",
1150
               node_current)
1151

    
1152
    for node in node_instance:
1153
      if (not node == node_current):
1154
        test = instance in node_instance[node]
1155
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1156
                 "instance should not run on node %s", node)
1157

    
1158
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1159
    """Verify if there are any unknown volumes in the cluster.
1160

1161
    The .os, .swap and backup volumes are ignored. All other volumes are
1162
    reported as unknown.
1163

1164
    """
1165
    for node in node_vol_is:
1166
      for volume in node_vol_is[node]:
1167
        test = (node not in node_vol_should or
1168
                volume not in node_vol_should[node])
1169
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1170
                      "volume %s is unknown", volume)
1171

    
1172
  def _VerifyOrphanInstances(self, instancelist, node_instance):
1173
    """Verify the list of running instances.
1174

1175
    This checks what instances are running but unknown to the cluster.
1176

1177
    """
1178
    for node in node_instance:
1179
      for o_inst in node_instance[node]:
1180
        test = o_inst not in instancelist
1181
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1182
                      "instance %s on node %s should not exist", o_inst, node)
1183

    
1184
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1185
    """Verify N+1 Memory Resilience.
1186

1187
    Check that if one single node dies we can still start all the instances it
1188
    was primary for.
1189

1190
    """
1191
    for node, nodeinfo in node_info.iteritems():
1192
      # This code checks that every node which is now listed as secondary has
1193
      # enough memory to host all instances it is supposed to should a single
1194
      # other node in the cluster fail.
1195
      # FIXME: not ready for failover to an arbitrary node
1196
      # FIXME: does not support file-backed instances
1197
      # WARNING: we currently take into account down instances as well as up
1198
      # ones, considering that even if they're down someone might want to start
1199
      # them even in the event of a node failure.
1200
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1201
        needed_mem = 0
1202
        for instance in instances:
1203
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1204
          if bep[constants.BE_AUTO_BALANCE]:
1205
            needed_mem += bep[constants.BE_MEMORY]
1206
        test = nodeinfo['mfree'] < needed_mem
1207
        self._ErrorIf(test, self.ENODEN1, node,
1208
                      "not enough memory on to accommodate"
1209
                      " failovers should peer node %s fail", prinode)
1210

    
1211
  def CheckPrereq(self):
1212
    """Check prerequisites.
1213

1214
    Transform the list of checks we're going to skip into a set and check that
1215
    all its members are valid.
1216

1217
    """
1218
    self.skip_set = frozenset(self.op.skip_checks)
1219
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1220
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
1221

    
1222
  def BuildHooksEnv(self):
1223
    """Build hooks env.
1224

1225
    Cluster-Verify hooks just ran in the post phase and their failure makes
1226
    the output be logged in the verify output and the verification to fail.
1227

1228
    """
1229
    all_nodes = self.cfg.GetNodeList()
1230
    env = {
1231
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1232
      }
1233
    for node in self.cfg.GetAllNodesInfo().values():
1234
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1235

    
1236
    return env, [], all_nodes
1237

    
1238
  def Exec(self, feedback_fn):
1239
    """Verify integrity of cluster, performing various test on nodes.
1240

1241
    """
1242
    self.bad = False
1243
    _ErrorIf = self._ErrorIf
1244
    verbose = self.op.verbose
1245
    self._feedback_fn = feedback_fn
1246
    feedback_fn("* Verifying global settings")
1247
    for msg in self.cfg.VerifyConfig():
1248
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1249

    
1250
    vg_name = self.cfg.GetVGName()
1251
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1252
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
1253
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1254
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1255
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1256
                        for iname in instancelist)
1257
    i_non_redundant = [] # Non redundant instances
1258
    i_non_a_balanced = [] # Non auto-balanced instances
1259
    n_offline = [] # List of offline nodes
1260
    n_drained = [] # List of nodes being drained
1261
    node_volume = {}
1262
    node_instance = {}
1263
    node_info = {}
1264
    instance_cfg = {}
1265

    
1266
    # FIXME: verify OS list
1267
    # do local checksums
1268
    master_files = [constants.CLUSTER_CONF_FILE]
1269

    
1270
    file_names = ssconf.SimpleStore().GetFileList()
1271
    file_names.append(constants.SSL_CERT_FILE)
1272
    file_names.append(constants.RAPI_CERT_FILE)
1273
    file_names.extend(master_files)
1274

    
1275
    local_checksums = utils.FingerprintFiles(file_names)
1276

    
1277
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1278
    node_verify_param = {
1279
      constants.NV_FILELIST: file_names,
1280
      constants.NV_NODELIST: [node.name for node in nodeinfo
1281
                              if not node.offline],
1282
      constants.NV_HYPERVISOR: hypervisors,
1283
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1284
                                  node.secondary_ip) for node in nodeinfo
1285
                                 if not node.offline],
1286
      constants.NV_INSTANCELIST: hypervisors,
1287
      constants.NV_VERSION: None,
1288
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1289
      constants.NV_NODESETUP: None,
1290
      }
1291
    if vg_name is not None:
1292
      node_verify_param[constants.NV_VGLIST] = None
1293
      node_verify_param[constants.NV_LVLIST] = vg_name
1294
      node_verify_param[constants.NV_DRBDLIST] = None
1295
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1296
                                           self.cfg.GetClusterName())
1297

    
1298
    cluster = self.cfg.GetClusterInfo()
1299
    master_node = self.cfg.GetMasterNode()
1300
    all_drbd_map = self.cfg.ComputeDRBDMap()
1301

    
1302
    feedback_fn("* Verifying node status")
1303
    for node_i in nodeinfo:
1304
      node = node_i.name
1305

    
1306
      if node_i.offline:
1307
        if verbose:
1308
          feedback_fn("* Skipping offline node %s" % (node,))
1309
        n_offline.append(node)
1310
        continue
1311

    
1312
      if node == master_node:
1313
        ntype = "master"
1314
      elif node_i.master_candidate:
1315
        ntype = "master candidate"
1316
      elif node_i.drained:
1317
        ntype = "drained"
1318
        n_drained.append(node)
1319
      else:
1320
        ntype = "regular"
1321
      if verbose:
1322
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1323

    
1324
      msg = all_nvinfo[node].fail_msg
1325
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1326
      if msg:
1327
        continue
1328

    
1329
      nresult = all_nvinfo[node].payload
1330
      node_drbd = {}
1331
      for minor, instance in all_drbd_map[node].items():
1332
        test = instance not in instanceinfo
1333
        _ErrorIf(test, self.ECLUSTERCFG, None,
1334
                 "ghost instance '%s' in temporary DRBD map", instance)
1335
          # ghost instance should not be running, but otherwise we
1336
          # don't give double warnings (both ghost instance and
1337
          # unallocated minor in use)
1338
        if test:
1339
          node_drbd[minor] = (instance, False)
1340
        else:
1341
          instance = instanceinfo[instance]
1342
          node_drbd[minor] = (instance.name, instance.admin_up)
1343
      self._VerifyNode(node_i, file_names, local_checksums,
1344
                       nresult, master_files, node_drbd, vg_name)
1345

    
1346
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1347
      if vg_name is None:
1348
        node_volume[node] = {}
1349
      elif isinstance(lvdata, basestring):
1350
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1351
                 utils.SafeEncode(lvdata))
1352
        node_volume[node] = {}
1353
      elif not isinstance(lvdata, dict):
1354
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1355
        continue
1356
      else:
1357
        node_volume[node] = lvdata
1358

    
1359
      # node_instance
1360
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1361
      test = not isinstance(idata, list)
1362
      _ErrorIf(test, self.ENODEHV, node,
1363
               "rpc call to node failed (instancelist)")
1364
      if test:
1365
        continue
1366

    
1367
      node_instance[node] = idata
1368

    
1369
      # node_info
1370
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1371
      test = not isinstance(nodeinfo, dict)
1372
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1373
      if test:
1374
        continue
1375

    
1376
      try:
1377
        node_info[node] = {
1378
          "mfree": int(nodeinfo['memory_free']),
1379
          "pinst": [],
1380
          "sinst": [],
1381
          # dictionary holding all instances this node is secondary for,
1382
          # grouped by their primary node. Each key is a cluster node, and each
1383
          # value is a list of instances which have the key as primary and the
1384
          # current node as secondary.  this is handy to calculate N+1 memory
1385
          # availability if you can only failover from a primary to its
1386
          # secondary.
1387
          "sinst-by-pnode": {},
1388
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          test = (constants.NV_VGLIST not in nresult or
                  vg_name not in nresult[constants.NV_VGLIST])
          _ErrorIf(test, self.ENODELVM, node,
                   "node didn't return data for the volume group '%s'"
                   " - it is either missing or broken", vg_name)
          if test:
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check lvm/hypervisor")
        continue

    node_vol_should = {}

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_volume,
                           node_instance, n_offline)
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
               self.EINSTANCELAYOUT, instance,
               "instance has multiple secondary nodes", code="WARNING")

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        _ErrorIf(snode not in node_info and snode not in n_offline,
                 self.ENODERPC, snode,
                 "instance %s, connection to secondary node"
                 " failed", instance)

        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)

        if snode in n_offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               ", ".join(inst_nodes_offline))

    feedback_fn("* Verifying orphan volumes")
    self._VerifyOrphanVolumes(node_vol_should, node_volume)

    feedback_fn("* Verifying remaining instances")
    self._VerifyOrphanInstances(instancelist, node_instance)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_info, instance_cfg)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        show_node_header = True
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if test:
          # manually override lu_result here, as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURepairDiskSizes(NoHooksLU):
1621
  """Verifies the cluster disks sizes.
1622

1623
  """
1624
  _OP_REQP = ["instances"]
1625
  REQ_BGL = False
1626

    
1627
  def ExpandNames(self):
1628
    if not isinstance(self.op.instances, list):
1629
      raise errors.OpPrereqError("Invalid argument type 'instances'")
1630

    
1631
    if self.op.instances:
1632
      self.wanted_names = []
1633
      for name in self.op.instances:
1634
        full_name = self.cfg.ExpandInstanceName(name)
1635
        if full_name is None:
1636
          raise errors.OpPrereqError("Instance '%s' not known" % name)
1637
        self.wanted_names.append(full_name)
1638
      self.needed_locks = {
1639
        locking.LEVEL_NODE: [],
1640
        locking.LEVEL_INSTANCE: self.wanted_names,
1641
        }
1642
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1643
    else:
1644
      self.wanted_names = None
1645
      self.needed_locks = {
1646
        locking.LEVEL_NODE: locking.ALL_SET,
1647
        locking.LEVEL_INSTANCE: locking.ALL_SET,
1648
        }
1649
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1650

    
1651
  def DeclareLocks(self, level):
1652
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
1653
      self._LockInstancesNodes(primary_only=True)
1654

    
1655
  def CheckPrereq(self):
1656
    """Check prerequisites.
1657

1658
    This only checks the optional instance list against the existing names.
1659

1660
    """
1661
    if self.wanted_names is None:
1662
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1663

    
1664
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1665
                             in self.wanted_names]
1666

    
1667
  def _EnsureChildSizes(self, disk):
1668
    """Ensure children of the disk have the needed disk size.
1669

1670
    This is valid mainly for DRBD8 and fixes an issue where the
1671
    children have smaller disk size.
1672

1673
    @param disk: an L{ganeti.objects.Disk} object
1674

1675
    """
1676
    if disk.dev_type == constants.LD_DRBD8:
1677
      assert disk.children, "Empty children for DRBD8?"
1678
      fchild = disk.children[0]
1679
      mismatch = fchild.size < disk.size
1680
      if mismatch:
1681
        self.LogInfo("Child disk has size %d, parent %d, fixing",
1682
                     fchild.size, disk.size)
1683
        fchild.size = disk.size
1684

    
1685
      # and we recurse on this child only, not on the metadev
1686
      return self._EnsureChildSizes(fchild) or mismatch
1687
    else:
1688
      return False
1689

    
1690
  def Exec(self, feedback_fn):
1691
    """Verify the size of cluster disks.
1692

1693
    """
1694
    # TODO: check child disks too
1695
    # TODO: check differences in size between primary/secondary nodes
1696
    per_node_disks = {}
1697
    for instance in self.wanted_instances:
1698
      pnode = instance.primary_node
1699
      if pnode not in per_node_disks:
1700
        per_node_disks[pnode] = []
1701
      for idx, disk in enumerate(instance.disks):
1702
        per_node_disks[pnode].append((instance, idx, disk))
1703

    
1704
    changed = []
1705
    for node, dskl in per_node_disks.items():
1706
      newl = [v[2].Copy() for v in dskl]
1707
      for dsk in newl:
1708
        self.cfg.SetDiskID(dsk, node)
1709
      result = self.rpc.call_blockdev_getsizes(node, newl)
1710
      if result.fail_msg:
1711
        self.LogWarning("Failure in blockdev_getsizes call to node"
1712
                        " %s, ignoring", node)
1713
        continue
1714
      if len(result.data) != len(dskl):
1715
        self.LogWarning("Invalid result from node %s, ignoring node results",
1716
                        node)
1717
        continue
1718
      for ((instance, idx, disk), size) in zip(dskl, result.data):
1719
        if size is None:
1720
          self.LogWarning("Disk %d of instance %s did not return size"
1721
                          " information, ignoring", idx, instance.name)
1722
          continue
1723
        if not isinstance(size, (int, long)):
1724
          self.LogWarning("Disk %d of instance %s did not return valid"
1725
                          " size information, ignoring", idx, instance.name)
1726
          continue
1727
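        # The RPC is expected to return sizes in bytes; shifting right by
        # 20 bits converts them to MiB, matching the unit of disk.size.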
        size = size >> 20
1728
        if size != disk.size:
1729
          self.LogInfo("Disk %d of instance %s has mismatched size,"
1730
                       " correcting: recorded %d, actual %d", idx,
1731
                       instance.name, disk.size, size)
1732
          disk.size = size
1733
          self.cfg.Update(instance, feedback_fn)
1734
          changed.append((instance.name, idx, size))
1735
        if self._EnsureChildSizes(disk):
1736
          self.cfg.Update(instance, feedback_fn)
1737
          changed.append((instance.name, idx, disk.size))
1738
    return changed
1739

    
1740

    
1741
class LURenameCluster(LogicalUnit):
1742
  """Rename the cluster.
1743

1744
  """
1745
  HPATH = "cluster-rename"
1746
  HTYPE = constants.HTYPE_CLUSTER
1747
  _OP_REQP = ["name"]
1748

    
1749
  def BuildHooksEnv(self):
1750
    """Build hooks env.
1751

1752
    """
1753
    env = {
1754
      "OP_TARGET": self.cfg.GetClusterName(),
1755
      "NEW_NAME": self.op.name,
1756
      }
1757
    mn = self.cfg.GetMasterNode()
1758
    return env, [mn], [mn]
1759

    
1760
  def CheckPrereq(self):
1761
    """Verify that the passed name is a valid one.
1762

1763
    """
1764
    hostname = utils.HostInfo(self.op.name)
1765

    
1766
    new_name = hostname.name
1767
    self.ip = new_ip = hostname.ip
1768
    old_name = self.cfg.GetClusterName()
1769
    old_ip = self.cfg.GetMasterIP()
1770
    if new_name == old_name and new_ip == old_ip:
1771
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1772
                                 " cluster has changed")
1773
    if new_ip != old_ip:
1774
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1775
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1776
                                   " reachable on the network. Aborting." %
1777
                                   new_ip)
1778

    
1779
    self.op.name = new_name
1780

    
1781
  def Exec(self, feedback_fn):
1782
    """Rename the cluster.
1783

1784
    """
1785
    clustername = self.op.name
1786
    ip = self.ip
1787

    
1788
    # shutdown the master IP
1789
    master = self.cfg.GetMasterNode()
1790
    result = self.rpc.call_node_stop_master(master, False)
1791
    result.Raise("Could not disable the master role")
1792

    
1793
    try:
1794
      cluster = self.cfg.GetClusterInfo()
1795
      cluster.cluster_name = clustername
1796
      cluster.master_ip = ip
1797
      self.cfg.Update(cluster, feedback_fn)
1798

    
1799
      # update the known hosts file
1800
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1801
      node_list = self.cfg.GetNodeList()
1802
      try:
1803
        node_list.remove(master)
1804
      except ValueError:
1805
        pass
1806
      result = self.rpc.call_upload_file(node_list,
1807
                                         constants.SSH_KNOWN_HOSTS_FILE)
1808
      for to_node, to_result in result.iteritems():
1809
        msg = to_result.fail_msg
1810
        if msg:
1811
          msg = ("Copy of file %s to node %s failed: %s" %
1812
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1813
          self.proc.LogWarning(msg)
1814

    
1815
    finally:
1816
      result = self.rpc.call_node_start_master(master, False, False)
1817
      msg = result.fail_msg
1818
      if msg:
1819
        self.LogWarning("Could not re-enable the master role on"
1820
                        " the master, please restart manually: %s", msg)
1821

    
1822

    
1823
def _RecursiveCheckIfLVMBased(disk):
1824
  """Check if the given disk or its children are lvm-based.
1825

1826
  @type disk: L{objects.Disk}
1827
  @param disk: the disk to check
1828
  @rtype: boolean
1829
  @return: boolean indicating whether a LD_LV dev_type was found or not
1830

1831
  """
1832
  if disk.children:
1833
    for chdisk in disk.children:
1834
      if _RecursiveCheckIfLVMBased(chdisk):
1835
        return True
1836
  return disk.dev_type == constants.LD_LV
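# Illustrative behaviour (hypothetical disk objects): a plain LD_LV disk
# returns True directly, while a DRBD8 disk returns True as soon as one of
# its children (typically the data/meta logical volumes) is LVM-based.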
1837

    
1838

    
1839
class LUSetClusterParams(LogicalUnit):
1840
  """Change the parameters of the cluster.
1841

1842
  """
1843
  HPATH = "cluster-modify"
1844
  HTYPE = constants.HTYPE_CLUSTER
1845
  _OP_REQP = []
1846
  REQ_BGL = False
1847

    
1848
  def CheckArguments(self):
1849
    """Check parameters
1850

1851
    """
1852
    if not hasattr(self.op, "candidate_pool_size"):
1853
      self.op.candidate_pool_size = None
1854
    if self.op.candidate_pool_size is not None:
1855
      try:
1856
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1857
      except (ValueError, TypeError), err:
1858
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1859
                                   str(err))
1860
      if self.op.candidate_pool_size < 1:
1861
        raise errors.OpPrereqError("At least one master candidate needed")
1862

    
1863
  def ExpandNames(self):
1864
    # FIXME: in the future maybe other cluster params won't require checking on
1865
    # all nodes to be modified.
1866
    self.needed_locks = {
1867
      locking.LEVEL_NODE: locking.ALL_SET,
1868
    }
1869
    self.share_locks[locking.LEVEL_NODE] = 1
1870

    
1871
  def BuildHooksEnv(self):
1872
    """Build hooks env.
1873

1874
    """
1875
    env = {
1876
      "OP_TARGET": self.cfg.GetClusterName(),
1877
      "NEW_VG_NAME": self.op.vg_name,
1878
      }
1879
    mn = self.cfg.GetMasterNode()
1880
    return env, [mn], [mn]
1881

    
1882
  def CheckPrereq(self):
1883
    """Check prerequisites.
1884

1885
    This checks whether the given params don't conflict and
1886
    if the given volume group is valid.
1887

1888
    """
1889
    if self.op.vg_name is not None and not self.op.vg_name:
1890
      instances = self.cfg.GetAllInstancesInfo().values()
1891
      for inst in instances:
1892
        for disk in inst.disks:
1893
          if _RecursiveCheckIfLVMBased(disk):
1894
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1895
                                       " lvm-based instances exist")
1896

    
1897
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1898

    
1899
    # if vg_name not None, checks given volume group on all nodes
1900
    if self.op.vg_name:
1901
      vglist = self.rpc.call_vg_list(node_list)
1902
      for node in node_list:
1903
        msg = vglist[node].fail_msg
1904
        if msg:
1905
          # ignoring down node
1906
          self.LogWarning("Error while gathering data on node %s"
1907
                          " (ignoring node): %s", node, msg)
1908
          continue
1909
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1910
                                              self.op.vg_name,
1911
                                              constants.MIN_VG_SIZE)
1912
        if vgstatus:
1913
          raise errors.OpPrereqError("Error on node '%s': %s" %
1914
                                     (node, vgstatus))
1915

    
1916
    self.cluster = cluster = self.cfg.GetClusterInfo()
1917
    # validate params changes
1918
    if self.op.beparams:
1919
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1920
      self.new_beparams = objects.FillDict(
1921
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1922

    
1923
    if self.op.nicparams:
1924
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1925
      self.new_nicparams = objects.FillDict(
1926
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1927
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1928

    
1929
    # hypervisor list/parameters
1930
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1931
    if self.op.hvparams:
1932
      if not isinstance(self.op.hvparams, dict):
1933
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1934
      for hv_name, hv_dict in self.op.hvparams.items():
1935
        if hv_name not in self.new_hvparams:
1936
          self.new_hvparams[hv_name] = hv_dict
1937
        else:
1938
          self.new_hvparams[hv_name].update(hv_dict)
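      # Illustrative merge result (hypothetical values): if the cluster has
      # {"xen-pvm": {"kernel_path": "/boot/vmlinuz"}} and the opcode passes
      # {"xen-pvm": {"root_path": "/dev/sda1"}}, the merged entry keeps both
      # keys; hypervisors not known yet are added with the dict as given.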
1939

    
1940
    if self.op.enabled_hypervisors is not None:
1941
      self.hv_list = self.op.enabled_hypervisors
1942
      if not self.hv_list:
1943
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1944
                                   " least one member")
1945
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1946
      if invalid_hvs:
1947
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1948
                                   " entries: %s" % " ,".join(invalid_hvs))
1949
    else:
1950
      self.hv_list = cluster.enabled_hypervisors
1951

    
1952
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1953
      # either the enabled list has changed, or the parameters have, validate
1954
      for hv_name, hv_params in self.new_hvparams.items():
1955
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1956
            (self.op.enabled_hypervisors and
1957
             hv_name in self.op.enabled_hypervisors)):
1958
          # either this is a new hypervisor, or its parameters have changed
1959
          hv_class = hypervisor.GetHypervisor(hv_name)
1960
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1961
          hv_class.CheckParameterSyntax(hv_params)
1962
          _CheckHVParams(self, node_list, hv_name, hv_params)
1963

    
1964
  def Exec(self, feedback_fn):
1965
    """Change the parameters of the cluster.
1966

1967
    """
1968
    if self.op.vg_name is not None:
1969
      new_volume = self.op.vg_name
1970
      if not new_volume:
1971
        new_volume = None
1972
      if new_volume != self.cfg.GetVGName():
1973
        self.cfg.SetVGName(new_volume)
1974
      else:
1975
        feedback_fn("Cluster LVM configuration already in desired"
1976
                    " state, not changing")
1977
    if self.op.hvparams:
1978
      self.cluster.hvparams = self.new_hvparams
1979
    if self.op.enabled_hypervisors is not None:
1980
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1981
    if self.op.beparams:
1982
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1983
    if self.op.nicparams:
1984
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1985

    
1986
    if self.op.candidate_pool_size is not None:
1987
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1988
      # we need to update the pool size here, otherwise the save will fail
1989
      _AdjustCandidatePool(self, [])
1990

    
1991
    self.cfg.Update(self.cluster, feedback_fn)
1992

    
1993

    
1994
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1995
  """Distribute additional files which are part of the cluster configuration.
1996

1997
  ConfigWriter takes care of distributing the config and ssconf files, but
1998
  there are more files which should be distributed to all nodes. This function
1999
  makes sure those are copied.
2000

2001
  @param lu: calling logical unit
2002
  @param additional_nodes: list of nodes not in the config to distribute to
2003

2004
  """
2005
  # 1. Gather target nodes
2006
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2007
  dist_nodes = lu.cfg.GetNodeList()
2008
  if additional_nodes is not None:
2009
    dist_nodes.extend(additional_nodes)
2010
  if myself.name in dist_nodes:
2011
    dist_nodes.remove(myself.name)
2012

    
2013
  # 2. Gather files to distribute
2014
  dist_files = set([constants.ETC_HOSTS,
2015
                    constants.SSH_KNOWN_HOSTS_FILE,
2016
                    constants.RAPI_CERT_FILE,
2017
                    constants.RAPI_USERS_FILE,
2018
                    constants.HMAC_CLUSTER_KEY,
2019
                   ])
2020

    
2021
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2022
  for hv_name in enabled_hypervisors:
2023
    hv_class = hypervisor.GetHypervisor(hv_name)
2024
    dist_files.update(hv_class.GetAncillaryFiles())
2025

    
2026
  # 3. Perform the files upload
2027
  for fname in dist_files:
2028
    if os.path.exists(fname):
2029
      result = lu.rpc.call_upload_file(dist_nodes, fname)
2030
      for to_node, to_result in result.items():
2031
        msg = to_result.fail_msg
2032
        if msg:
2033
          msg = ("Copy of file %s to node %s failed: %s" %
2034
                 (fname, to_node, msg))
2035
          lu.proc.LogWarning(msg)
2036

    
2037

    
2038
class LURedistributeConfig(NoHooksLU):
2039
  """Force the redistribution of cluster configuration.
2040

2041
  This is a very simple LU.
2042

2043
  """
2044
  _OP_REQP = []
2045
  REQ_BGL = False
2046

    
2047
  def ExpandNames(self):
2048
    self.needed_locks = {
2049
      locking.LEVEL_NODE: locking.ALL_SET,
2050
    }
2051
    self.share_locks[locking.LEVEL_NODE] = 1
2052

    
2053
  def CheckPrereq(self):
2054
    """Check prerequisites.
2055

2056
    """
2057

    
2058
  def Exec(self, feedback_fn):
2059
    """Redistribute the configuration.
2060

2061
    """
2062
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2063
    _RedistributeAncillaryFiles(self)
2064

    
2065

    
2066
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
2067
  """Sleep and poll for an instance's disk to sync.
2068

2069
  """
2070
  if not instance.disks:
2071
    return True
2072

    
2073
  if not oneshot:
2074
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2075

    
2076
  node = instance.primary_node
2077

    
2078
  for dev in instance.disks:
2079
    lu.cfg.SetDiskID(dev, node)
2080

    
2081
  retries = 0
2082
  degr_retries = 10 # in seconds, as we sleep 1 second each time
2083
  while True:
2084
    max_time = 0
2085
    done = True
2086
    cumul_degraded = False
2087
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2088
    msg = rstats.fail_msg
2089
    if msg:
2090
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2091
      retries += 1
2092
      if retries >= 10:
2093
        raise errors.RemoteError("Can't contact node %s for mirror data,"
2094
                                 " aborting." % node)
2095
      time.sleep(6)
2096
      continue
2097
    rstats = rstats.payload
2098
    retries = 0
2099
    for i, mstat in enumerate(rstats):
2100
      if mstat is None:
2101
        lu.LogWarning("Can't compute data for node %s/%s",
2102
                           node, instance.disks[i].iv_name)
2103
        continue
2104

    
2105
      cumul_degraded = (cumul_degraded or
2106
                        (mstat.is_degraded and mstat.sync_percent is None))
2107
      if mstat.sync_percent is not None:
2108
        done = False
2109
        if mstat.estimated_time is not None:
2110
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2111
          max_time = mstat.estimated_time
2112
        else:
2113
          rem_time = "no time estimate"
2114
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2115
                        (instance.disks[i].iv_name, mstat.sync_percent,
2116
                         rem_time))
2117

    
2118
    # if we're done but degraded, let's do a few small retries, to
2119
    # make sure we see a stable and not transient situation; therefore
2120
    # we force restart of the loop
2121
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
2122
      logging.info("Degraded disks found, %d retries left", degr_retries)
2123
      degr_retries -= 1
2124
      time.sleep(1)
2125
      continue
2126

    
2127
    if done or oneshot:
2128
      break
2129

    
2130
    time.sleep(min(60, max_time))
2131

    
2132
  if done:
2133
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2134
  return not cumul_degraded
2135

    
2136

    
2137
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2138
  """Check that mirrors are not degraded.
2139

2140
  The ldisk parameter, if True, will change the test from the
2141
  is_degraded attribute (which represents overall non-ok status for
2142
  the device(s)) to the ldisk (representing the local storage status).
2143

2144
  """
2145
  lu.cfg.SetDiskID(dev, node)
2146

    
2147
  result = True
2148

    
2149
  if on_primary or dev.AssembleOnSecondary():
2150
    rstats = lu.rpc.call_blockdev_find(node, dev)
2151
    msg = rstats.fail_msg
2152
    if msg:
2153
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2154
      result = False
2155
    elif not rstats.payload:
2156
      lu.LogWarning("Can't find disk on node %s", node)
2157
      result = False
2158
    else:
2159
      if ldisk:
2160
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2161
      else:
2162
        result = result and not rstats.payload.is_degraded
2163

    
2164
  if dev.children:
2165
    for child in dev.children:
2166
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2167

    
2168
  return result
2169

    
2170

    
2171
class LUDiagnoseOS(NoHooksLU):
2172
  """Logical unit for OS diagnose/query.
2173

2174
  """
2175
  _OP_REQP = ["output_fields", "names"]
2176
  REQ_BGL = False
2177
  _FIELDS_STATIC = utils.FieldSet()
2178
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2179
  # Fields that need calculation of global os validity
2180
  _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2181

    
2182
  def ExpandNames(self):
2183
    if self.op.names:
2184
      raise errors.OpPrereqError("Selective OS query not supported")
2185

    
2186
    _CheckOutputFields(static=self._FIELDS_STATIC,
2187
                       dynamic=self._FIELDS_DYNAMIC,
2188
                       selected=self.op.output_fields)
2189

    
2190
    # Lock all nodes, in shared mode
2191
    # Temporary removal of locks, should be reverted later
2192
    # TODO: reintroduce locks when they are lighter-weight
2193
    self.needed_locks = {}
2194
    #self.share_locks[locking.LEVEL_NODE] = 1
2195
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2196

    
2197
  def CheckPrereq(self):
2198
    """Check prerequisites.
2199

2200
    """
2201

    
2202
  @staticmethod
2203
  def _DiagnoseByOS(node_list, rlist):
2204
    """Remaps a per-node return list into an a per-os per-node dictionary
2205

2206
    @param node_list: a list with the names of all nodes
2207
    @param rlist: a map with node names as keys and OS objects as values
2208

2209
    @rtype: dict
2210
    @return: a dictionary with osnames as keys and as value another map, with
2211
        nodes as keys and tuples of (path, status, diagnose, variants)
        as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, "", []),
                                     (/srv/..., False, "invalid api", [])],
                           "node2": [(/srv/..., True, "", [])]}
          }
2217

2218
    """
2219
    all_os = {}
2220
    # we build here the list of nodes that didn't fail the RPC (at RPC
2221
    # level), so that nodes with a non-responding node daemon don't
2222
    # make all OSes invalid
2223
    good_nodes = [node_name for node_name in rlist
2224
                  if not rlist[node_name].fail_msg]
2225
    for node_name, nr in rlist.items():
2226
      if nr.fail_msg or not nr.payload:
2227
        continue
2228
      for name, path, status, diagnose, variants in nr.payload:
2229
        if name not in all_os:
2230
          # build a list of nodes for this os containing empty lists
2231
          # for each node in node_list
2232
          all_os[name] = {}
2233
          for nname in good_nodes:
2234
            all_os[name][nname] = []
2235
        all_os[name][node_name].append((path, status, diagnose, variants))
2236
    return all_os
2237

    
2238
  def Exec(self, feedback_fn):
2239
    """Compute the list of OSes.
2240

2241
    """
2242
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2243
    node_data = self.rpc.call_os_diagnose(valid_nodes)
2244
    pol = self._DiagnoseByOS(valid_nodes, node_data)
2245
    output = []
2246
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2247
    calc_variants = "variants" in self.op.output_fields
2248

    
2249
    for os_name, os_data in pol.items():
2250
      row = []
2251
      if calc_valid:
2252
        valid = True
2253
        variants = None
2254
        for osl in os_data.values():
2255
          valid = valid and osl and osl[0][1]
2256
          if not valid:
2257
            variants = None
2258
            break
2259
          if calc_variants:
2260
            node_variants = osl[0][3]
2261
            if variants is None:
2262
              variants = node_variants
2263
            else:
2264
              variants = [v for v in variants if v in node_variants]
2265

    
2266
      for field in self.op.output_fields:
2267
        if field == "name":
2268
          val = os_name
2269
        elif field == "valid":
2270
          val = valid
2271
        elif field == "node_status":
2272
          # this is just a copy of the dict
2273
          val = {}
2274
          for node_name, nos_list in os_data.items():
2275
            val[node_name] = nos_list
2276
        elif field == "variants":
2277
          val = variants
2278
        else:
2279
          raise errors.ParameterError(field)
2280
        row.append(val)
2281
      output.append(row)
2282

    
2283
    return output
2284

    
2285

    
2286
class LURemoveNode(LogicalUnit):
2287
  """Logical unit for removing a node.
2288

2289
  """
2290
  HPATH = "node-remove"
2291
  HTYPE = constants.HTYPE_NODE
2292
  _OP_REQP = ["node_name"]
2293

    
2294
  def BuildHooksEnv(self):
2295
    """Build hooks env.
2296

2297
    This doesn't run on the target node in the pre phase as a failed
2298
    node would then be impossible to remove.
2299

2300
    """
2301
    env = {
2302
      "OP_TARGET": self.op.node_name,
2303
      "NODE_NAME": self.op.node_name,
2304
      }
2305
    all_nodes = self.cfg.GetNodeList()
2306
    if self.op.node_name in all_nodes:
2307
      all_nodes.remove(self.op.node_name)
2308
    return env, all_nodes, all_nodes
2309

    
2310
  def CheckPrereq(self):
2311
    """Check prerequisites.
2312

2313
    This checks:
2314
     - the node exists in the configuration
2315
     - it does not have primary or secondary instances
2316
     - it's not the master
2317

2318
    Any errors are signaled by raising errors.OpPrereqError.
2319

2320
    """
2321
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2322
    if node is None:
2323
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2324

    
2325
    instance_list = self.cfg.GetInstanceList()
2326

    
2327
    masternode = self.cfg.GetMasterNode()
2328
    if node.name == masternode:
2329
      raise errors.OpPrereqError("Node is the master node,"
2330
                                 " you need to failover first.")
2331

    
2332
    for instance_name in instance_list:
2333
      instance = self.cfg.GetInstanceInfo(instance_name)
2334
      if node.name in instance.all_nodes:
2335
        raise errors.OpPrereqError("Instance %s is still running on the node,"
2336
                                   " please remove first." % instance_name)
2337
    self.op.node_name = node.name
2338
    self.node = node
2339

    
2340
  def Exec(self, feedback_fn):
2341
    """Removes the node from the cluster.
2342

2343
    """
2344
    node = self.node
2345
    logging.info("Stopping the node daemon and removing configs from node %s",
2346
                 node.name)
2347

    
2348
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2349

    
2350
    # Promote nodes to master candidate as needed
2351
    _AdjustCandidatePool(self, exceptions=[node.name])
2352
    self.context.RemoveNode(node.name)
2353

    
2354
    # Run post hooks on the node before it's removed
2355
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2356
    try:
2357
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2358
    except:
2359
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
2360

    
2361
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2362
    msg = result.fail_msg
2363
    if msg:
2364
      self.LogWarning("Errors encountered on the remote node while leaving"
2365
                      " the cluster: %s", msg)
2366

    
2367

    
2368
class LUQueryNodes(NoHooksLU):
2369
  """Logical unit for querying nodes.
2370

2371
  """
2372
  _OP_REQP = ["output_fields", "names", "use_locking"]
2373
  REQ_BGL = False
2374

    
2375
  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2376
                    "master_candidate", "offline", "drained"]
2377

    
2378
  _FIELDS_DYNAMIC = utils.FieldSet(
2379
    "dtotal", "dfree",
2380
    "mtotal", "mnode", "mfree",
2381
    "bootid",
2382
    "ctotal", "cnodes", "csockets",
2383
    )
2384

    
2385
  _FIELDS_STATIC = utils.FieldSet(*[
2386
    "pinst_cnt", "sinst_cnt",
2387
    "pinst_list", "sinst_list",
2388
    "pip", "sip", "tags",
2389
    "master",
2390
    "role"] + _SIMPLE_FIELDS
2391
    )
2392

    
2393
  def ExpandNames(self):
2394
    _CheckOutputFields(static=self._FIELDS_STATIC,
2395
                       dynamic=self._FIELDS_DYNAMIC,
2396
                       selected=self.op.output_fields)
2397

    
2398
    self.needed_locks = {}
2399
    self.share_locks[locking.LEVEL_NODE] = 1
2400

    
2401
    if self.op.names:
2402
      self.wanted = _GetWantedNodes(self, self.op.names)
2403
    else:
2404
      self.wanted = locking.ALL_SET
2405

    
2406
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2407
    self.do_locking = self.do_node_query and self.op.use_locking
2408
    if self.do_locking:
2409
      # if we don't request only static fields, we need to lock the nodes
2410
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
2411

    
2412
  def CheckPrereq(self):
2413
    """Check prerequisites.
2414

2415
    """
2416
    # The validation of the node list is done in _GetWantedNodes, if non
    # empty; if empty, there is no validation to do
2418
    pass
2419

    
2420
  def Exec(self, feedback_fn):
2421
    """Computes the list of nodes and their attributes.
2422

2423
    """
2424
    all_info = self.cfg.GetAllNodesInfo()
2425
    if self.do_locking:
2426
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
2427
    elif self.wanted != locking.ALL_SET:
2428
      nodenames = self.wanted
2429
      missing = set(nodenames).difference(all_info.keys())
2430
      if missing:
2431
        raise errors.OpExecError(
2432
          "Some nodes were removed before retrieving their data: %s" % missing)
2433
    else:
2434
      nodenames = all_info.keys()
2435

    
2436
    nodenames = utils.NiceSort(nodenames)
2437
    nodelist = [all_info[name] for name in nodenames]
2438

    
2439
    # begin data gathering
2440

    
2441
    if self.do_node_query:
2442
      live_data = {}
2443
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2444
                                          self.cfg.GetHypervisorType())
2445
      for name in nodenames:
2446
        nodeinfo = node_data[name]
2447
        if not nodeinfo.fail_msg and nodeinfo.payload:
2448
          nodeinfo = nodeinfo.payload
2449
          fn = utils.TryConvert
2450
          live_data[name] = {
2451
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2452
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2453
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2454
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2455
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2456
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2457
            "bootid": nodeinfo.get('bootid', None),
2458
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2459
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2460
            }
2461
        else:
2462
          live_data[name] = {}
2463
    else:
2464
      live_data = dict.fromkeys(nodenames, {})
2465

    
2466
    node_to_primary = dict([(name, set()) for name in nodenames])
2467
    node_to_secondary = dict([(name, set()) for name in nodenames])
2468

    
2469
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2470
                             "sinst_cnt", "sinst_list"))
2471
    if inst_fields & frozenset(self.op.output_fields):
2472
      instancelist = self.cfg.GetInstanceList()
2473

    
2474
      for instance_name in instancelist:
2475
        inst = self.cfg.GetInstanceInfo(instance_name)
2476
        if inst.primary_node in node_to_primary:
2477
          node_to_primary[inst.primary_node].add(inst.name)
2478
        for secnode in inst.secondary_nodes:
2479
          if secnode in node_to_secondary:
2480
            node_to_secondary[secnode].add(inst.name)
2481

    
2482
    master_node = self.cfg.GetMasterNode()
2483

    
2484
    # end data gathering
2485

    
2486
    output = []
2487
    for node in nodelist:
2488
      node_output = []
2489
      for field in self.op.output_fields:
2490
        if field in self._SIMPLE_FIELDS:
2491
          val = getattr(node, field)
2492
        elif field == "pinst_list":
2493
          val = list(node_to_primary[node.name])
2494
        elif field == "sinst_list":
2495
          val = list(node_to_secondary[node.name])
2496
        elif field == "pinst_cnt":
2497
          val = len(node_to_primary[node.name])
2498
        elif field == "sinst_cnt":
2499
          val = len(node_to_secondary[node.name])
2500
        elif field == "pip":
2501
          val = node.primary_ip
2502
        elif field == "sip":
2503
          val = node.secondary_ip
2504
        elif field == "tags":
2505
          val = list(node.GetTags())
2506
        elif field == "master":
2507
          val = node.name == master_node
2508
        elif self._FIELDS_DYNAMIC.Matches(field):
2509
          val = live_data[node.name].get(field, None)
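        # "role" is condensed into a single letter: "M" (master),
        # "C" (master candidate), "D" (drained), "O" (offline),
        # "R" (regular node)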
2510
        elif field == "role":
2511
          if node.name == master_node:
2512
            val = "M"
2513
          elif node.master_candidate:
2514
            val = "C"
2515
          elif node.drained:
2516
            val = "D"
2517
          elif node.offline:
2518
            val = "O"
2519
          else:
2520
            val = "R"
2521
        else:
2522
          raise errors.ParameterError(field)
2523
        node_output.append(val)
2524
      output.append(node_output)
2525

    
2526
    return output
2527

    
2528

    
2529
class LUQueryNodeVolumes(NoHooksLU):
2530
  """Logical unit for getting volumes on node(s).
2531

2532
  """
2533
  _OP_REQP = ["nodes", "output_fields"]
2534
  REQ_BGL = False
2535
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2536
  _FIELDS_STATIC = utils.FieldSet("node")
2537

    
2538
  def ExpandNames(self):
2539
    _CheckOutputFields(static=self._FIELDS_STATIC,
2540
                       dynamic=self._FIELDS_DYNAMIC,
2541
                       selected=self.op.output_fields)
2542

    
2543
    self.needed_locks = {}
2544
    self.share_locks[locking.LEVEL_NODE] = 1
2545
    if not self.op.nodes:
2546
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2547
    else:
2548
      self.needed_locks[locking.LEVEL_NODE] = \
2549
        _GetWantedNodes(self, self.op.nodes)
2550

    
2551
  def CheckPrereq(self):
2552
    """Check prerequisites.
2553

2554
    This checks that the fields required are valid output fields.
2555

2556
    """
2557
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2558

    
2559
  def Exec(self, feedback_fn):
2560
    """Computes the list of nodes and their attributes.
2561

2562
    """
2563
    nodenames = self.nodes
2564
    volumes = self.rpc.call_node_volumes(nodenames)
2565

    
2566
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2567
             in self.cfg.GetInstanceList()]
2568

    
2569
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
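    # lv_by_node maps each instance object to {node_name: [lv_name, ...]};
    # the "instance" output field below is resolved against this mapping.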
2570

    
2571
    output = []
2572
    for node in nodenames:
2573
      nresult = volumes[node]
2574
      if nresult.offline:
2575
        continue
2576
      msg = nresult.fail_msg
2577
      if msg:
2578
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2579
        continue
2580

    
2581
      node_vols = nresult.payload[:]
2582
      node_vols.sort(key=lambda vol: vol['dev'])
2583

    
2584
      for vol in node_vols:
2585
        node_output = []
2586
        for field in self.op.output_fields:
2587
          if field == "node":
2588
            val = node
2589
          elif field == "phys":
2590
            val = vol['dev']
2591
          elif field == "vg":
2592
            val = vol['vg']
2593
          elif field == "name":
2594
            val = vol['name']
2595
          elif field == "size":
2596
            val = int(float(vol['size']))
2597
          elif field == "instance":
2598
            for inst in ilist:
2599
              if node not in lv_by_node[inst]:
2600
                continue
2601
              if vol['name'] in lv_by_node[inst][node]:
2602
                val = inst.name
2603
                break
2604
            else:
2605
              val = '-'
2606
          else:
2607
            raise errors.ParameterError(field)
2608
          node_output.append(str(val))
2609

    
2610
        output.append(node_output)
2611

    
2612
    return output
2613

    
2614

    
2615
class LUQueryNodeStorage(NoHooksLU):
2616
  """Logical unit for getting information on storage units on node(s).
2617

2618
  """
2619
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
2620
  REQ_BGL = False
2621
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2622

    
2623
  def ExpandNames(self):
2624
    storage_type = self.op.storage_type
2625

    
2626
    if storage_type not in constants.VALID_STORAGE_TYPES:
2627
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2628

    
2629
    _CheckOutputFields(static=self._FIELDS_STATIC,
2630
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2631
                       selected=self.op.output_fields)
2632

    
2633
    self.needed_locks = {}
2634
    self.share_locks[locking.LEVEL_NODE] = 1
2635

    
2636
    if self.op.nodes:
2637
      self.needed_locks[locking.LEVEL_NODE] = \
2638
        _GetWantedNodes(self, self.op.nodes)
2639
    else:
2640
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2641

    
2642
  def CheckPrereq(self):
2643
    """Check prerequisites.
2644

2645
    This checks that the fields required are valid output fields.
2646

2647
    """
2648
    self.op.name = getattr(self.op, "name", None)
2649

    
2650
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2651

    
2652
  def Exec(self, feedback_fn):
2653
    """Computes the list of nodes and their attributes.
2654

2655
    """
2656
    # Always get name to sort by
2657
    if constants.SF_NAME in self.op.output_fields:
2658
      fields = self.op.output_fields[:]
2659
    else:
2660
      fields = [constants.SF_NAME] + self.op.output_fields
2661

    
2662
    # Never ask for node or type as it's only known to the LU
2663
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
2664
      while extra in fields:
2665
        fields.remove(extra)
2666

    
2667
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2668
    name_idx = field_idx[constants.SF_NAME]
2669

    
2670
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2671
    data = self.rpc.call_storage_list(self.nodes,
2672
                                      self.op.storage_type, st_args,
2673
                                      self.op.name, fields)
2674

    
2675
    result = []
2676

    
2677
    for node in utils.NiceSort(self.nodes):
2678
      nresult = data[node]
2679
      if nresult.offline:
2680
        continue
2681

    
2682
      msg = nresult.fail_msg
2683
      if msg:
2684
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2685
        continue
2686

    
2687
      rows = dict([(row[name_idx], row) for row in nresult.payload])
2688

    
2689
      for name in utils.NiceSort(rows.keys()):
2690
        row = rows[name]
2691

    
2692
        out = []
2693

    
2694
        for field in self.op.output_fields:
2695
          if field == constants.SF_NODE:
2696
            val = node
2697
          elif field == constants.SF_TYPE:
2698
            val = self.op.storage_type
2699
          elif field in field_idx:
2700
            val = row[field_idx[field]]
2701
          else:
2702
            raise errors.ParameterError(field)
2703

    
2704
          out.append(val)
2705

    
2706
        result.append(out)
2707

    
2708
    return result
2709

    
2710

    
2711
class LUModifyNodeStorage(NoHooksLU):
2712
  """Logical unit for modifying a storage volume on a node.
2713

2714
  """
2715
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2716
  REQ_BGL = False
2717

    
2718
  def CheckArguments(self):
2719
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2720
    if node_name is None:
2721
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2722

    
2723
    self.op.node_name = node_name
2724

    
2725
    storage_type = self.op.storage_type
2726
    if storage_type not in constants.VALID_STORAGE_TYPES:
2727
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2728

    
2729
  def ExpandNames(self):
2730
    self.needed_locks = {
2731
      locking.LEVEL_NODE: self.op.node_name,
2732
      }
2733

    
2734
  def CheckPrereq(self):
2735
    """Check prerequisites.
2736

2737
    """
2738
    storage_type = self.op.storage_type
2739

    
2740
    try:
2741
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2742
    except KeyError:
2743
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
2744
                                 " modified" % storage_type)
2745

    
2746
    diff = set(self.op.changes.keys()) - modifiable
2747
    if diff:
2748
      raise errors.OpPrereqError("The following fields can not be modified for"
2749
                                 " storage units of type '%s': %r" %
2750
                                 (storage_type, list(diff)))
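    # Illustrative opcode input (hypothetical; the exact fields depend on
    # constants.MODIFIABLE_STORAGE_FIELDS): for an LVM physical volume one
    # would typically pass something like
    #   changes={constants.SF_ALLOCATABLE: True}
    # anything else is rejected by the check above.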
2751

    
2752
  def Exec(self, feedback_fn):
2753
    """Computes the list of nodes and their attributes.
2754

2755
    """
2756
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2757
    result = self.rpc.call_storage_modify(self.op.node_name,
2758
                                          self.op.storage_type, st_args,
2759
                                          self.op.name, self.op.changes)
2760
    result.Raise("Failed to modify storage unit '%s' on %s" %
2761
                 (self.op.name, self.op.node_name))
2762

    
2763

    
2764
class LUAddNode(LogicalUnit):
2765
  """Logical unit for adding node to the cluster.
2766

2767
  """
2768
  HPATH = "node-add"
2769
  HTYPE = constants.HTYPE_NODE
2770
  _OP_REQP = ["node_name"]
2771

    
2772
  def BuildHooksEnv(self):
2773
    """Build hooks env.
2774

2775
    This will run on all nodes before, and on all nodes + the new node after.
2776

2777
    """
2778
    env = {
2779
      "OP_TARGET": self.op.node_name,
2780
      "NODE_NAME": self.op.node_name,
2781
      "NODE_PIP": self.op.primary_ip,
2782
      "NODE_SIP": self.op.secondary_ip,
2783
      }
2784
    nodes_0 = self.cfg.GetNodeList()
2785
    nodes_1 = nodes_0 + [self.op.node_name, ]
2786
    return env, nodes_0, nodes_1
2787

    
2788
  def CheckPrereq(self):
2789
    """Check prerequisites.
2790

2791
    This checks:
2792
     - the new node is not already in the config
2793
     - it is resolvable
2794
     - its parameters (single/dual homed) matches the cluster
2795

2796
    Any errors are signaled by raising errors.OpPrereqError.
2797

2798
    """
2799
    node_name = self.op.node_name
2800
    cfg = self.cfg
2801

    
2802
    dns_data = utils.HostInfo(node_name)
2803

    
2804
    node = dns_data.name
2805
    primary_ip = self.op.primary_ip = dns_data.ip
2806
    secondary_ip = getattr(self.op, "secondary_ip", None)
2807
    if secondary_ip is None:
2808
      secondary_ip = primary_ip
2809
    if not utils.IsValidIP(secondary_ip):
2810
      raise errors.OpPrereqError("Invalid secondary IP given")
2811
    self.op.secondary_ip = secondary_ip
2812

    
2813
    node_list = cfg.GetNodeList()
2814
    if not self.op.readd and node in node_list:
2815
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2816
                                 node)
2817
    elif self.op.readd and node not in node_list:
2818
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2819

    
2820
    for existing_node_name in node_list:
2821
      existing_node = cfg.GetNodeInfo(existing_node_name)
2822

    
2823
      if self.op.readd and node == existing_node_name:
2824
        if (existing_node.primary_ip != primary_ip or
2825
            existing_node.secondary_ip != secondary_ip):
2826
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2827
                                     " address configuration as before")
2828
        continue
2829

    
2830
      if (existing_node.primary_ip == primary_ip or
2831
          existing_node.secondary_ip == primary_ip or
2832
          existing_node.primary_ip == secondary_ip or
2833
          existing_node.secondary_ip == secondary_ip):
2834
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2835
                                   " existing node %s" % existing_node.name)
2836

    
2837
    # check that the type of the node (single versus dual homed) is the
2838
    # same as for the master
2839
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2840
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2841
    newbie_singlehomed = secondary_ip == primary_ip
2842
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

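    # ask the master to verify that it can reach the new node over ssh and
    # that hostname resolution for it works, before the node is added to
    # the configuration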
    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node)


class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the requested flag changes against the node's current state.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover")

    # Boolean value that tells us whether we're offlining or draining the node
    offline_or_drain = self.op.offline == True or self.op.drained == True
    deoffline_or_drain = self.op.offline == False or self.op.drained == False

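    # demoting (or offlining/draining) a master candidate must not shrink
    # the candidate pool below its configured size, unless forced and no
    # other node could be promoted instead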
    if (node.master_candidate and
        (self.op.master_candidate == False or offline_or_drain)):
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
      if mc_now <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, mc_now-1))
        # Only allow forcing the operation if it's an offline/drain operation,
        # and we could not possibly promote more nodes.
        # FIXME: this can still lead to issues if in any way another node which
        # could be promoted appears in the meantime.
        if self.op.force and offline_or_drain and mc_should == mc_max:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (deoffline_or_drain and not offline_or_drain and not
        self.op.master_candidate == True):
      self.op.master_candidate = _DecideSelfPromotion(self)
      if self.op.master_candidate:
        self.LogInfo("Autopromoting node to master candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set")

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a tuple of (disks_ok, device_info), where disks_ok is a
      boolean denoting whether all devices could be assembled and
      device_info is a list of (node, instance_visible_name,
      node_visible_device) tuples with the mapping from node devices
      to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

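    # record the mapping for this disk: primary node, the instance-visible
    # device name and the node-level device path (None if assembly failed)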
    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
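  # if any device failed to assemble, roll back whatever was brought up
  # before reporting the error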
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (they only produce a warning and do not affect the return value).

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks whether a given node has the needed amount of
  free memory. If the node has less memory than requested or we cannot
  get the information from the node, this function raises an
  OpPrereqError exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
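      # merge order: cluster-level defaults, then the instance's own
      # hvparams, then the one-off overrides passed with this opcode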
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
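      # the start failed; tear down the disks we just activated before
      # reporting the error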
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

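    # soft and hard reboots are handled by the hypervisor on the node;
    # a full reboot is emulated as shutdown + disk restart + start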
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type,
                                             self.shutdown_timeout)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance,
                                               self.shutdown_timeout)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.timeout = getattr(self.op, "timeout",
                           constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["TIMEOUT"] = self.timeout
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    timeout = self.timeout
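    # record the new desired state first; a failing shutdown RPC is only
    # logged as a warning and the disks are torn down regardless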
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    self.op.force_variant = getattr(self.op, "force_variant", False)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)
      if not self.op.force_variant:
        _CheckOSVariant(result.payload, self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst, feedback_fn)

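    # the OS create scripts need the instance's disks to be assembled;
    # always shut them down again afterwards, even if the scripts fail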
    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURecreateInstanceDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    if not isinstance(self.op.disks, list):
      raise errors.OpPrereqError("Invalid disks parameter")
    for item in self.op.disks:
      if (not isinstance(item, int) or
          item < 0):
        raise errors.OpPrereqError("Invalid disk specification '%s'" %
                                   str(item))

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))
    else:
      for idx in self.op.disks:
        if idx >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
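    # build the list of disk indices that were not requested and must be
    # left untouched by the recreation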
    to_skip = []
    for idx, disk in enumerate(self.instance.disks):
      if idx not in self.op.disks: # disk idx has not been passed in
        to_skip.append(idx)
        continue

    _CreateDisks(self, self.instance, to_skip=to_skip)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name