
root / lib / cmdlib.py @ 60975797


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import time
29
import tempfile
30
import re
31
import platform
32
import logging
33
import copy
34
import random
35

    
36
from ganeti import ssh
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45
from ganeti import ssconf
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59

60
  Note that all commands require root permissions.
61

62
  """
63
  HPATH = None
64
  HTYPE = None
65
  _OP_REQP = []
66
  REQ_BGL = True
67

    
68
  def __init__(self, processor, op, context, rpc):
69
    """Constructor for LogicalUnit.
70

71
    This needs to be overridden in derived classes in order to check op
72
    validity.
73

74
    """
75
    self.proc = processor
76
    self.op = op
77
    self.cfg = context.cfg
78
    self.context = context
79
    self.rpc = rpc
80
    # Dicts used to declare locking needs to mcpu
81
    self.needed_locks = None
82
    self.acquired_locks = {}
83
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84
    self.add_locks = {}
85
    self.remove_locks = {}
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89
    # logging
90
    self.LogWarning = processor.LogWarning
91
    self.LogInfo = processor.LogInfo
92

    
93
    for attr_name in self._OP_REQP:
94
      attr_val = getattr(op, attr_name, None)
95
      if attr_val is None:
96
        raise errors.OpPrereqError("Required parameter '%s' missing" %
97
                                   attr_name)
98
    self.CheckArguments()
99

    
100
  def __GetSSH(self):
101
    """Returns the SshRunner object
102

103
    """
104
    if not self.__ssh:
105
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106
    return self.__ssh
107

    
108
  ssh = property(fget=__GetSSH)
109

    
110
  def CheckArguments(self):
111
    """Check syntactic validity for the opcode arguments.
112

113
    This method is for doing a simple syntactic check and ensuring the
114
    validity of opcode parameters, without any cluster-related
115
    checks. While the same can be accomplished in ExpandNames and/or
116
    CheckPrereq, doing it separately is better because:
117

118
      - ExpandNames is left as purely a lock-related function
119
      - CheckPrereq is run after we have acquired locks (and possibly
120
        waited for them)
121

122
    The function is allowed to change the self.op attribute so that
123
    later methods no longer need to worry about missing parameters.
124

125
    """
126
    pass
127

    
128
  def ExpandNames(self):
129
    """Expand names for this LU.
130

131
    This method is called before starting to execute the opcode, and it should
132
    update all the parameters of the opcode to their canonical form (e.g. a
133
    short node name must be fully expanded after this method has successfully
134
    completed). This way locking, hooks, logging, etc. can work correctly.
135

136
    LUs which implement this method must also populate the self.needed_locks
137
    member, as a dict with lock levels as keys, and a list of needed lock names
138
    as values. Rules:
139

140
      - use an empty dict if you don't need any lock
141
      - if you don't need any lock at a particular level omit that level
142
      - don't put anything for the BGL level
143
      - if you want all locks at a level use locking.ALL_SET as a value
144

145
    If you need to share locks (rather than acquire them exclusively) at one
146
    level you can modify self.share_locks, setting a true value (usually 1) for
147
    that level. By default locks are not shared.
148

149
    Examples::
150

151
      # Acquire all nodes and one instance
152
      self.needed_locks = {
153
        locking.LEVEL_NODE: locking.ALL_SET,
154
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155
      }
156
      # Acquire just two nodes
157
      self.needed_locks = {
158
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159
      }
160
      # Acquire no locks
161
      self.needed_locks = {} # No, you can't leave it to the default value None
162

163
    """
164
    # The implementation of this method is mandatory only if the new LU is
165
    # concurrent, so that old LUs don't need to be changed all at the same
166
    # time.
167
    if self.REQ_BGL:
168
      self.needed_locks = {} # Exclusive LUs don't need locks.
169
    else:
170
      raise NotImplementedError
171

    
172
  def DeclareLocks(self, level):
173
    """Declare LU locking needs for a level
174

175
    While most LUs can just declare their locking needs at ExpandNames time,
176
    sometimes there's the need to calculate some locks after having acquired
177
    the ones before. This function is called just before acquiring locks at a
178
    particular level, but after acquiring the ones at lower levels, and permits
179
    such calculations. It can be used to modify self.needed_locks, and by
180
    default it does nothing.
181

182
    This function is only called if you have something already set in
183
    self.needed_locks for the level.
184

185
    @param level: Locking level which is going to be locked
186
    @type level: member of ganeti.locking.LEVELS
187

188
    """
189

    
190
  def CheckPrereq(self):
191
    """Check prerequisites for this LU.
192

193
    This method should check that the prerequisites for the execution
194
    of this LU are fulfilled. It can do internode communication, but
195
    it should be idempotent - no cluster or system changes are
196
    allowed.
197

198
    The method should raise errors.OpPrereqError in case something is
199
    not fulfilled. Its return value is ignored.
200

201
    This method should also update all the parameters of the opcode to
202
    their canonical form if it hasn't been done by ExpandNames before.
203

204
    """
205
    raise NotImplementedError
206

    
207
  def Exec(self, feedback_fn):
208
    """Execute the LU.
209

210
    This method should implement the actual work. It should raise
211
    errors.OpExecError for failures that are somewhat dealt with in
212
    code, or expected.
213

214
    """
215
    raise NotImplementedError
216

    
217
  def BuildHooksEnv(self):
218
    """Build hooks environment for this LU.
219

220
    This method should return a three-element tuple consisting of: a dict
221
    containing the environment that will be used for running the
222
    specific hook for this LU, a list of node names on which the hook
223
    should run before the execution, and a list of node names on which
224
    the hook should run after the execution.
225

226
    The keys of the dict must not be prefixed with 'GANETI_', as this will
227
    be handled in the hooks runner. Also note additional keys will be
228
    added by the hooks runner. If the LU doesn't define any
229
    environment, an empty dict (and not None) should be returned.
230

231
    If no nodes are to be returned, an empty list (not None) should be used.
232

233
    Note that if the HPATH for a LU class is None, this function will
234
    not be called.
235

236
    """
237
    raise NotImplementedError
238

    
239
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240
    """Notify the LU about the results of its hooks.
241

242
    This method is called every time a hooks phase is executed, and notifies
243
    the Logical Unit about the hooks' result. The LU can then use it to alter
244
    its result based on the hooks.  By default the method does nothing and the
245
    previous result is passed back unchanged but any LU can define it if it
246
    wants to use the local cluster hook-scripts somehow.
247

248
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
249
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250
    @param hook_results: the results of the multi-node hooks rpc call
251
    @param feedback_fn: function used to send feedback back to the caller
252
    @param lu_result: the previous Exec result this LU had, or None
253
        in the PRE phase
254
    @return: the new Exec result, based on the previous result
255
        and hook results
256

257
    """
258
    return lu_result
259

    
260
  def _ExpandAndLockInstance(self):
261
    """Helper function to expand and lock an instance.
262

263
    Many LUs that work on an instance take its name in self.op.instance_name
264
    and need to expand it and then declare the expanded name for locking. This
265
    function does it, and then updates self.op.instance_name to the expanded
266
    name. It also initializes needed_locks as a dict, if this hasn't been done
267
    before.
268

269
    """
270
    if self.needed_locks is None:
271
      self.needed_locks = {}
272
    else:
273
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274
        "_ExpandAndLockInstance called with instance-level locks set"
275
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276
    if expanded_name is None:
277
      raise errors.OpPrereqError("Instance '%s' not known" %
278
                                  self.op.instance_name)
279
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280
    self.op.instance_name = expanded_name
281

    
282
  def _LockInstancesNodes(self, primary_only=False):
283
    """Helper function to declare instances' nodes for locking.
284

285
    This function should be called after locking one or more instances to lock
286
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287
    with all primary or secondary nodes for instances already locked and
288
    present in self.needed_locks[locking.LEVEL_INSTANCE].
289

290
    It should be called from DeclareLocks, and for safety only works if
291
    self.recalculate_locks[locking.LEVEL_NODE] is set.
292

293
    In the future it may grow parameters to just lock some instance's nodes, or
294
    to just lock primaries or secondary nodes, if needed.
295

296
    It should be called in DeclareLocks in a way similar to::
297

298
      if level == locking.LEVEL_NODE:
299
        self._LockInstancesNodes()
300

301
    @type primary_only: boolean
302
    @param primary_only: only lock primary nodes of locked instances
303

304
    """
305
    assert locking.LEVEL_NODE in self.recalculate_locks, \
306
      "_LockInstancesNodes helper function called with no nodes to recalculate"
307

    
308
    # TODO: check if we've really been called with the instance locks held
309

    
310
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311
    # future we might want to have different behaviors depending on the value
312
    # of self.recalculate_locks[locking.LEVEL_NODE]
313
    wanted_nodes = []
314
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315
      instance = self.context.cfg.GetInstanceInfo(instance_name)
316
      wanted_nodes.append(instance.primary_node)
317
      if not primary_only:
318
        wanted_nodes.extend(instance.secondary_nodes)
319

    
320
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324

    
325
    del self.recalculate_locks[locking.LEVEL_NODE]
326
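# Illustrative sketch only (not part of the original module): a hypothetical,
# minimal LU showing how the pieces documented above fit together --
# _OP_REQP, ExpandNames/_ExpandAndLockInstance, DeclareLocks with
# _LockInstancesNodes, CheckPrereq, BuildHooksEnv and Exec. The class name,
# opcode parameter and hook path below are invented for the example.
class _LUExampleInstanceNoop(LogicalUnit):
  """No-op LU operating on a single instance (example only).

  """
  HPATH = "instance-example-noop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # expands self.op.instance_name and declares the instance-level lock
    self._ExpandAndLockInstance()
    # node locks cannot be computed yet; declare an empty list and ask for a
    # recalculation in DeclareLocks, sharing them since we only read node data
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.share_locks[locking.LEVEL_NODE] = 1

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, self.instance.primary_node)

  def BuildHooksEnv(self):
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
    return env, nl, nl

  def Exec(self, feedback_fn):
    feedback_fn("Instance %s runs on node %s" %
                (self.instance.name, self.instance.primary_node))
    return self.instance.name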

    
327

    
328
class NoHooksLU(LogicalUnit):
329
  """Simple LU which runs no hooks.
330

331
  This LU is intended as a parent for other LogicalUnits which will
332
  run no hooks, in order to reduce duplicate code.
333

334
  """
335
  HPATH = None
336
  HTYPE = None
337

    
338

    
339
def _GetWantedNodes(lu, nodes):
340
  """Returns list of checked and expanded node names.
341

342
  @type lu: L{LogicalUnit}
343
  @param lu: the logical unit on whose behalf we execute
344
  @type nodes: list
345
  @param nodes: list of node names or None for all nodes
346
  @rtype: list
347
  @return: the list of nodes, sorted
348
  @raise errors.OpPrereqError: if the nodes parameter is wrong type
349

350
  """
351
  if not isinstance(nodes, list):
352
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
353

    
354
  if not nodes:
355
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356
      " non-empty list of nodes whose name is to be expanded.")
357

    
358
  wanted = []
359
  for name in nodes:
360
    node = lu.cfg.ExpandNodeName(name)
361
    if node is None:
362
      raise errors.OpPrereqError("No such node name '%s'" % name)
363
    wanted.append(node)
364

    
365
  return utils.NiceSort(wanted)
366

    
367

    
368
def _GetWantedInstances(lu, instances):
369
  """Returns list of checked and expanded instance names.
370

371
  @type lu: L{LogicalUnit}
372
  @param lu: the logical unit on whose behalf we execute
373
  @type instances: list
374
  @param instances: list of instance names or None for all instances
375
  @rtype: list
376
  @return: the list of instances, sorted
377
  @raise errors.OpPrereqError: if the instances parameter is wrong type
378
  @raise errors.OpPrereqError: if any of the passed instances is not found
379

380
  """
381
  if not isinstance(instances, list):
382
    raise errors.OpPrereqError("Invalid argument type 'instances'")
383

    
384
  if instances:
385
    wanted = []
386

    
387
    for name in instances:
388
      instance = lu.cfg.ExpandInstanceName(name)
389
      if instance is None:
390
        raise errors.OpPrereqError("No such instance name '%s'" % name)
391
      wanted.append(instance)
392

    
393
  else:
394
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395
  return wanted
396

    
397

    
398
def _CheckOutputFields(static, dynamic, selected):
399
  """Checks whether all selected fields are valid.
400

401
  @type static: L{utils.FieldSet}
402
  @param static: static fields set
403
  @type dynamic: L{utils.FieldSet}
404
  @param dynamic: dynamic fields set
405

406
  """
407
  f = utils.FieldSet()
408
  f.Extend(static)
409
  f.Extend(dynamic)
410

    
411
  delta = f.NonMatching(selected)
412
  if delta:
413
    raise errors.OpPrereqError("Unknown output fields selected: %s"
414
                               % ",".join(delta))
415

    
416
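# Illustrative usage sketch (hypothetical field names): a query LU would
# typically validate the requested output fields like
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dtotal", "dfree"),
#                      selected=self.op.output_fields)
#
# which raises OpPrereqError listing any selected field in neither set.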

    
417
def _CheckBooleanOpField(op, name):
418
  """Validates boolean opcode parameters.
419

420
  This will ensure that an opcode parameter is either a boolean value,
421
  or None (but that it always exists).
422

423
  """
424
  val = getattr(op, name, None)
425
  if not (val is None or isinstance(val, bool)):
426
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427
                               (name, str(val)))
428
  setattr(op, name, val)
429

    
430

    
431
def _CheckNodeOnline(lu, node):
432
  """Ensure that a given node is online.
433

434
  @param lu: the LU on behalf of which we make the check
435
  @param node: the node to check
436
  @raise errors.OpPrereqError: if the node is offline
437

438
  """
439
  if lu.cfg.GetNodeInfo(node).offline:
440
    raise errors.OpPrereqError("Can't use offline node %s" % node)
441

    
442

    
443
def _CheckNodeNotDrained(lu, node):
444
  """Ensure that a given node is not drained.
445

446
  @param lu: the LU on behalf of which we make the check
447
  @param node: the node to check
448
  @raise errors.OpPrereqError: if the node is drained
449

450
  """
451
  if lu.cfg.GetNodeInfo(node).drained:
452
    raise errors.OpPrereqError("Can't use drained node %s" % node)
453

    
454

    
455
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456
                          memory, vcpus, nics, disk_template, disks,
457
                          bep, hvp, hypervisor):
458
  """Builds instance related env variables for hooks
459

460
  This builds the hook environment from individual variables.
461

462
  @type name: string
463
  @param name: the name of the instance
464
  @type primary_node: string
465
  @param primary_node: the name of the instance's primary node
466
  @type secondary_nodes: list
467
  @param secondary_nodes: list of secondary nodes as strings
468
  @type os_type: string
469
  @param os_type: the name of the instance's OS
470
  @type status: boolean
471
  @param status: the should_run status of the instance
472
  @type memory: string
473
  @param memory: the memory size of the instance
474
  @type vcpus: string
475
  @param vcpus: the count of VCPUs the instance has
476
  @type nics: list
477
  @param nics: list of tuples (ip, bridge, mac) representing
478
      the NICs the instance has
479
  @type disk_template: string
480
  @param disk_template: the disk template of the instance
481
  @type disks: list
482
  @param disks: the list of (size, mode) pairs
483
  @type bep: dict
484
  @param bep: the backend parameters for the instance
485
  @type hvp: dict
486
  @param hvp: the hypervisor parameters for the instance
487
  @type hypervisor: string
488
  @param hypervisor: the hypervisor for the instance
489
  @rtype: dict
490
  @return: the hook environment for this instance
491

492
  """
493
  if status:
494
    str_status = "up"
495
  else:
496
    str_status = "down"
497
  env = {
498
    "OP_TARGET": name,
499
    "INSTANCE_NAME": name,
500
    "INSTANCE_PRIMARY": primary_node,
501
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502
    "INSTANCE_OS_TYPE": os_type,
503
    "INSTANCE_STATUS": str_status,
504
    "INSTANCE_MEMORY": memory,
505
    "INSTANCE_VCPUS": vcpus,
506
    "INSTANCE_DISK_TEMPLATE": disk_template,
507
    "INSTANCE_HYPERVISOR": hypervisor,
508
  }
509

    
510
  if nics:
511
    nic_count = len(nics)
512
    for idx, (ip, bridge, mac) in enumerate(nics):
513
      if ip is None:
514
        ip = ""
515
      env["INSTANCE_NIC%d_IP" % idx] = ip
516
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517
      env["INSTANCE_NIC%d_MAC" % idx] = mac
518
  else:
519
    nic_count = 0
520

    
521
  env["INSTANCE_NIC_COUNT"] = nic_count
522

    
523
  if disks:
524
    disk_count = len(disks)
525
    for idx, (size, mode) in enumerate(disks):
526
      env["INSTANCE_DISK%d_SIZE" % idx] = size
527
      env["INSTANCE_DISK%d_MODE" % idx] = mode
528
  else:
529
    disk_count = 0
530

    
531
  env["INSTANCE_DISK_COUNT"] = disk_count
532

    
533
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
534
    for key, value in source.items():
535
      env["INSTANCE_%s_%s" % (kind, key)] = value
536

    
537
  return env
538
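# Illustrative sketch only (not part of the original module): shows the shape
# of the environment built above for a hypothetical single-NIC, single-disk
# instance. All names and values below are invented for the example.
def _ExampleInstanceHookEnv():
  """Return a sample hook environment (example only)."""
  env = _BuildInstanceHookEnv(name="instance1.example.com",
                              primary_node="node1.example.com",
                              secondary_nodes=["node2.example.com"],
                              os_type="debian-etch",
                              status=True,
                              memory=512,
                              vcpus=1,
                              nics=[("198.51.100.10", "xen-br0",
                                     "aa:00:00:00:00:01")],
                              disk_template="drbd",
                              disks=[(10240, "rw")],
                              bep={"auto_balance": True},
                              hvp={"kernel_path": "/boot/vmlinuz"},
                              hypervisor="xen-pvm")
  # env now maps e.g. INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_STATUS ("up"),
  # INSTANCE_NIC_COUNT and INSTANCE_NIC0_*, INSTANCE_DISK_COUNT and
  # INSTANCE_DISK0_*, INSTANCE_BE_auto_balance and INSTANCE_HV_kernel_path
  # to their respective values.
  return env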

    
539

    
540
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541
  """Builds instance related env variables for hooks from an object.
542

543
  @type lu: L{LogicalUnit}
544
  @param lu: the logical unit on whose behalf we execute
545
  @type instance: L{objects.Instance}
546
  @param instance: the instance for which we should build the
547
      environment
548
  @type override: dict
549
  @param override: dictionary with key/values that will override
550
      our values
551
  @rtype: dict
552
  @return: the hook environment dictionary
553

554
  """
555
  cluster = lu.cfg.GetClusterInfo()
556
  bep = cluster.FillBE(instance)
557
  hvp = cluster.FillHV(instance)
558
  args = {
559
    'name': instance.name,
560
    'primary_node': instance.primary_node,
561
    'secondary_nodes': instance.secondary_nodes,
562
    'os_type': instance.os,
563
    'status': instance.admin_up,
564
    'memory': bep[constants.BE_MEMORY],
565
    'vcpus': bep[constants.BE_VCPUS],
566
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567
    'disk_template': instance.disk_template,
568
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
569
    'bep': bep,
570
    'hvp': hvp,
571
    'hypervisor': instance.hypervisor,
572
  }
573
  if override:
574
    args.update(override)
575
  return _BuildInstanceHookEnv(**args)
576

    
577

    
578
def _AdjustCandidatePool(lu):
579
  """Adjust the candidate pool after node operations.
580

581
  """
582
  mod_list = lu.cfg.MaintainCandidatePool()
583
  if mod_list:
584
    lu.LogInfo("Promoted nodes to master candidate role: %s",
585
               ", ".join(node.name for node in mod_list))
586
    for name in mod_list:
587
      lu.context.ReaddNode(name)
588
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
589
  if mc_now > mc_max:
590
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
591
               (mc_now, mc_max))
592

    
593

    
594
def _CheckInstanceBridgesExist(lu, instance):
595
  """Check that the brigdes needed by an instance exist.
596

597
  """
598
  # check bridges existence
599
  brlist = [nic.bridge for nic in instance.nics]
600
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
601
  result.Raise()
602
  if not result.data:
603
    raise errors.OpPrereqError("One or more target bridges %s does not"
604
                               " exist on destination node '%s'" %
605
                               (brlist, instance.primary_node))
606

    
607

    
608
class LUDestroyCluster(NoHooksLU):
609
  """Logical unit for destroying the cluster.
610

611
  """
612
  _OP_REQP = []
613

    
614
  def CheckPrereq(self):
615
    """Check prerequisites.
616

617
    This checks whether the cluster is empty.
618

619
    Any errors are signalled by raising errors.OpPrereqError.
620

621
    """
622
    master = self.cfg.GetMasterNode()
623

    
624
    nodelist = self.cfg.GetNodeList()
625
    if len(nodelist) != 1 or nodelist[0] != master:
626
      raise errors.OpPrereqError("There are still %d node(s) in"
627
                                 " this cluster." % (len(nodelist) - 1))
628
    instancelist = self.cfg.GetInstanceList()
629
    if instancelist:
630
      raise errors.OpPrereqError("There are still %d instance(s) in"
631
                                 " this cluster." % len(instancelist))
632

    
633
  def Exec(self, feedback_fn):
634
    """Destroys the cluster.
635

636
    """
637
    master = self.cfg.GetMasterNode()
638
    result = self.rpc.call_node_stop_master(master, False)
639
    result.Raise()
640
    if not result.data:
641
      raise errors.OpExecError("Could not disable the master role")
642
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643
    utils.CreateBackup(priv_key)
644
    utils.CreateBackup(pub_key)
645
    return master
646

    
647

    
648
class LUVerifyCluster(LogicalUnit):
649
  """Verifies the cluster status.
650

651
  """
652
  HPATH = "cluster-verify"
653
  HTYPE = constants.HTYPE_CLUSTER
654
  _OP_REQP = ["skip_checks"]
655
  REQ_BGL = False
656

    
657
  def ExpandNames(self):
658
    self.needed_locks = {
659
      locking.LEVEL_NODE: locking.ALL_SET,
660
      locking.LEVEL_INSTANCE: locking.ALL_SET,
661
    }
662
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
663

    
664
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665
                  node_result, feedback_fn, master_files,
666
                  drbd_map, vg_name):
667
    """Run multiple tests against a node.
668

669
    Test list:
670

671
      - compares ganeti version
672
      - checks vg existence and size > 20G
673
      - checks config file checksum
674
      - checks ssh to other nodes
675

676
    @type nodeinfo: L{objects.Node}
677
    @param nodeinfo: the node to check
678
    @param file_list: required list of files
679
    @param local_cksum: dictionary of local files and their checksums
680
    @param node_result: the results from the node
681
    @param feedback_fn: function used to accumulate results
682
    @param master_files: list of files that only masters should have
683
    @param drbd_map: the used DRBD minors for this node, in
684
        the form of minor: (instance, must_exist), corresponding to instances
685
        and their running status
686
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
687

688
    """
689
    node = nodeinfo.name
690

    
691
    # main result, node_result should be a non-empty dict
692
    if not node_result or not isinstance(node_result, dict):
693
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
694
      return True
695

    
696
    # compares ganeti version
697
    local_version = constants.PROTOCOL_VERSION
698
    remote_version = node_result.get('version', None)
699
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
700
            len(remote_version) == 2):
701
      feedback_fn("  - ERROR: connection to %s failed" % (node))
702
      return True
703

    
704
    if local_version != remote_version[0]:
705
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
706
                  " node %s %s" % (local_version, node, remote_version[0]))
707
      return True
708

    
709
    # node seems compatible, we can actually try to look into its results
710

    
711
    bad = False
712

    
713
    # full package version
714
    if constants.RELEASE_VERSION != remote_version[1]:
715
      feedback_fn("  - WARNING: software version mismatch: master %s,"
716
                  " node %s %s" %
717
                  (constants.RELEASE_VERSION, node, remote_version[1]))
718

    
719
    # checks vg existence and size > 20G
720
    if vg_name is not None:
721
      vglist = node_result.get(constants.NV_VGLIST, None)
722
      if not vglist:
723
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
724
                        (node,))
725
        bad = True
726
      else:
727
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728
                                              constants.MIN_VG_SIZE)
729
        if vgstatus:
730
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
731
          bad = True
732

    
733
    # checks config file checksum
734

    
735
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
736
    if not isinstance(remote_cksum, dict):
737
      bad = True
738
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
739
    else:
740
      for file_name in file_list:
741
        node_is_mc = nodeinfo.master_candidate
742
        must_have_file = file_name not in master_files
743
        if file_name not in remote_cksum:
744
          if node_is_mc or must_have_file:
745
            bad = True
746
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
747
        elif remote_cksum[file_name] != local_cksum[file_name]:
748
          if node_is_mc or must_have_file:
749
            bad = True
750
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
751
          else:
752
            # not candidate and this is not a must-have file
753
            bad = True
754
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
755
                        " candidates (and the file is outdated)" % file_name)
756
        else:
757
          # all good, except non-master/non-must have combination
758
          if not node_is_mc and not must_have_file:
759
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
760
                        " candidates" % file_name)
761

    
762
    # checks ssh to any
763

    
764
    if constants.NV_NODELIST not in node_result:
765
      bad = True
766
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
767
    else:
768
      if node_result[constants.NV_NODELIST]:
769
        bad = True
770
        for node in node_result[constants.NV_NODELIST]:
771
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
772
                          (node, node_result[constants.NV_NODELIST][node]))
773

    
774
    if constants.NV_NODENETTEST not in node_result:
775
      bad = True
776
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
777
    else:
778
      if node_result[constants.NV_NODENETTEST]:
779
        bad = True
780
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
781
        for node in nlist:
782
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
783
                          (node, node_result[constants.NV_NODENETTEST][node]))
784

    
785
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786
    if isinstance(hyp_result, dict):
787
      for hv_name, hv_result in hyp_result.iteritems():
788
        if hv_result is not None:
789
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
790
                      (hv_name, hv_result))
791

    
792
    # check used drbd list
793
    if vg_name is not None:
794
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
795
      if not isinstance(used_minors, (tuple, list)):
796
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
797
                    str(used_minors))
798
      else:
799
        for minor, (iname, must_exist) in drbd_map.items():
800
          if minor not in used_minors and must_exist:
801
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
802
                        " not active" % (minor, iname))
803
            bad = True
804
        for minor in used_minors:
805
          if minor not in drbd_map:
806
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
807
                        minor)
808
            bad = True
809

    
810
    return bad
811

    
812
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813
                      node_instance, feedback_fn, n_offline):
814
    """Verify an instance.
815

816
    This function checks to see if the required block devices are
817
    available on the instance's node.
818

819
    """
820
    bad = False
821

    
822
    node_current = instanceconfig.primary_node
823

    
824
    node_vol_should = {}
825
    instanceconfig.MapLVsByNode(node_vol_should)
826

    
827
    for node in node_vol_should:
828
      if node in n_offline:
829
        # ignore missing volumes on offline nodes
830
        continue
831
      for volume in node_vol_should[node]:
832
        if node not in node_vol_is or volume not in node_vol_is[node]:
833
          feedback_fn("  - ERROR: volume %s missing on node %s" %
834
                          (volume, node))
835
          bad = True
836

    
837
    if instanceconfig.admin_up:
838
      if ((node_current not in node_instance or
839
          not instance in node_instance[node_current]) and
840
          node_current not in n_offline):
841
        feedback_fn("  - ERROR: instance %s not running on node %s" %
842
                        (instance, node_current))
843
        bad = True
844

    
845
    for node in node_instance:
846
      if (not node == node_current):
847
        if instance in node_instance[node]:
848
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
849
                          (instance, node))
850
          bad = True
851

    
852
    return bad
853

    
854
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855
    """Verify if there are any unknown volumes in the cluster.
856

857
    The .os, .swap and backup volumes are ignored. All other volumes are
858
    reported as unknown.
859

860
    """
861
    bad = False
862

    
863
    for node in node_vol_is:
864
      for volume in node_vol_is[node]:
865
        if node not in node_vol_should or volume not in node_vol_should[node]:
866
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
867
                      (volume, node))
868
          bad = True
869
    return bad
870

    
871
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872
    """Verify the list of running instances.
873

874
    This checks what instances are running but unknown to the cluster.
875

876
    """
877
    bad = False
878
    for node in node_instance:
879
      for runninginstance in node_instance[node]:
880
        if runninginstance not in instancelist:
881
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
882
                          (runninginstance, node))
883
          bad = True
884
    return bad
885

    
886
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887
    """Verify N+1 Memory Resilience.
888

889
    Check that if one single node dies we can still start all the instances it
890
    was primary for.
891

892
    """
893
    bad = False
894

    
895
    for node, nodeinfo in node_info.iteritems():
896
      # This code checks that every node which is now listed as secondary has
897
      # enough memory to host all instances it is supposed to, should a single
898
      # other node in the cluster fail.
899
      # FIXME: not ready for failover to an arbitrary node
900
      # FIXME: does not support file-backed instances
901
      # WARNING: we currently take into account down instances as well as up
902
      # ones, considering that even if they're down someone might want to start
903
      # them even in the event of a node failure.
904
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
905
        needed_mem = 0
906
        for instance in instances:
907
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908
          if bep[constants.BE_AUTO_BALANCE]:
909
            needed_mem += bep[constants.BE_MEMORY]
910
        if nodeinfo['mfree'] < needed_mem:
911
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
912
                      " failovers should node %s fail" % (node, prinode))
913
          bad = True
914
    return bad
915
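  # Worked example (hypothetical numbers): if node3 is secondary for two
  # auto-balanced instances whose primary is node2, with 2048 MB and 1024 MB
  # of backend memory respectively, then node3 must report at least 3072 MB
  # of free memory ('mfree'); otherwise the check above reports that failovers
  # from node2 would not fit and flags the cluster as not N+1 compliant.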

    
916
  def CheckPrereq(self):
917
    """Check prerequisites.
918

919
    Transform the list of checks we're going to skip into a set and check that
920
    all its members are valid.
921

922
    """
923
    self.skip_set = frozenset(self.op.skip_checks)
924
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
926

    
927
  def BuildHooksEnv(self):
928
    """Build hooks env.
929

930
    Cluster-Verify hooks are only run in the post phase; their failure is
931
    logged in the verify output and makes the verification fail.
932

933
    """
934
    all_nodes = self.cfg.GetNodeList()
935
    env = {
936
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
937
      }
938
    for node in self.cfg.GetAllNodesInfo().values():
939
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
940

    
941
    return env, [], all_nodes
942

    
943
  def Exec(self, feedback_fn):
944
    """Verify integrity of cluster, performing various test on nodes.
945

946
    """
947
    bad = False
948
    feedback_fn("* Verifying global settings")
949
    for msg in self.cfg.VerifyConfig():
950
      feedback_fn("  - ERROR: %s" % msg)
951

    
952
    vg_name = self.cfg.GetVGName()
953
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
955
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958
                        for iname in instancelist)
959
    i_non_redundant = [] # Non redundant instances
960
    i_non_a_balanced = [] # Non auto-balanced instances
961
    n_offline = [] # List of offline nodes
962
    n_drained = [] # List of nodes being drained
963
    node_volume = {}
964
    node_instance = {}
965
    node_info = {}
966
    instance_cfg = {}
967

    
968
    # FIXME: verify OS list
969
    # do local checksums
970
    master_files = [constants.CLUSTER_CONF_FILE]
971

    
972
    file_names = ssconf.SimpleStore().GetFileList()
973
    file_names.append(constants.SSL_CERT_FILE)
974
    file_names.append(constants.RAPI_CERT_FILE)
975
    file_names.extend(master_files)
976

    
977
    local_checksums = utils.FingerprintFiles(file_names)
978

    
979
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980
    node_verify_param = {
981
      constants.NV_FILELIST: file_names,
982
      constants.NV_NODELIST: [node.name for node in nodeinfo
983
                              if not node.offline],
984
      constants.NV_HYPERVISOR: hypervisors,
985
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986
                                  node.secondary_ip) for node in nodeinfo
987
                                 if not node.offline],
988
      constants.NV_INSTANCELIST: hypervisors,
989
      constants.NV_VERSION: None,
990
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
991
      }
992
    if vg_name is not None:
993
      node_verify_param[constants.NV_VGLIST] = None
994
      node_verify_param[constants.NV_LVLIST] = vg_name
995
      node_verify_param[constants.NV_DRBDLIST] = None
996
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997
                                           self.cfg.GetClusterName())
998

    
999
    cluster = self.cfg.GetClusterInfo()
1000
    master_node = self.cfg.GetMasterNode()
1001
    all_drbd_map = self.cfg.ComputeDRBDMap()
1002

    
1003
    for node_i in nodeinfo:
1004
      node = node_i.name
1005
      nresult = all_nvinfo[node].data
1006

    
1007
      if node_i.offline:
1008
        feedback_fn("* Skipping offline node %s" % (node,))
1009
        n_offline.append(node)
1010
        continue
1011

    
1012
      if node == master_node:
1013
        ntype = "master"
1014
      elif node_i.master_candidate:
1015
        ntype = "master candidate"
1016
      elif node_i.drained:
1017
        ntype = "drained"
1018
        n_drained.append(node)
1019
      else:
1020
        ntype = "regular"
1021
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1022

    
1023
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
1025
        bad = True
1026
        continue
1027

    
1028
      node_drbd = {}
1029
      for minor, instance in all_drbd_map[node].items():
1030
        if instance not in instanceinfo:
1031
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1032
                      instance)
1033
          # ghost instance should not be running, but otherwise we
1034
          # don't give double warnings (both ghost instance and
1035
          # unallocated minor in use)
1036
          node_drbd[minor] = (instance, False)
1037
        else:
1038
          instance = instanceinfo[instance]
1039
          node_drbd[minor] = (instance.name, instance.admin_up)
1040
      result = self._VerifyNode(node_i, file_names, local_checksums,
1041
                                nresult, feedback_fn, master_files,
1042
                                node_drbd, vg_name)
1043
      bad = bad or result
1044

    
1045
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1046
      if vg_name is None:
1047
        node_volume[node] = {}
1048
      elif isinstance(lvdata, basestring):
1049
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1050
                    (node, utils.SafeEncode(lvdata)))
1051
        bad = True
1052
        node_volume[node] = {}
1053
      elif not isinstance(lvdata, dict):
1054
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1055
        bad = True
1056
        continue
1057
      else:
1058
        node_volume[node] = lvdata
1059

    
1060
      # node_instance
1061
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1062
      if not isinstance(idata, list):
1063
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1064
                    (node,))
1065
        bad = True
1066
        continue
1067

    
1068
      node_instance[node] = idata
1069

    
1070
      # node_info
1071
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072
      if not isinstance(nodeinfo, dict):
1073
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1074
        bad = True
1075
        continue
1076

    
1077
      try:
1078
        node_info[node] = {
1079
          "mfree": int(nodeinfo['memory_free']),
1080
          "pinst": [],
1081
          "sinst": [],
1082
          # dictionary holding all instances this node is secondary for,
1083
          # grouped by their primary node. Each key is a cluster node, and each
1084
          # value is a list of instances which have the key as primary and the
1085
          # current node as secondary. This is handy to calculate N+1 memory
1086
          # availability if you can only failover from a primary to its
1087
          # secondary.
1088
          "sinst-by-pnode": {},
1089
        }
1090
        # FIXME: devise a free space model for file based instances as well
1091
        if vg_name is not None:
1092
          if (constants.NV_VGLIST not in nresult or
1093
              vg_name not in nresult[constants.NV_VGLIST]):
1094
            feedback_fn("  - ERROR: node %s didn't return data for the"
1095
                        " volume group '%s' - it is either missing or broken" %
1096
                        (node, vg_name))
1097
            bad = True
1098
            continue
1099
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100
      except (ValueError, KeyError):
1101
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
1102
                    " from node %s" % (node,))
1103
        bad = True
1104
        continue
1105

    
1106
    node_vol_should = {}
1107

    
1108
    for instance in instancelist:
1109
      feedback_fn("* Verifying instance %s" % instance)
1110
      inst_config = instanceinfo[instance]
1111
      result =  self._VerifyInstance(instance, inst_config, node_volume,
1112
                                     node_instance, feedback_fn, n_offline)
1113
      bad = bad or result
1114
      inst_nodes_offline = []
1115

    
1116
      inst_config.MapLVsByNode(node_vol_should)
1117

    
1118
      instance_cfg[instance] = inst_config
1119

    
1120
      pnode = inst_config.primary_node
1121
      if pnode in node_info:
1122
        node_info[pnode]['pinst'].append(instance)
1123
      elif pnode not in n_offline:
1124
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1125
                    " %s failed" % (instance, pnode))
1126
        bad = True
1127

    
1128
      if pnode in n_offline:
1129
        inst_nodes_offline.append(pnode)
1130

    
1131
      # If the instance is non-redundant we cannot survive losing its primary
1132
      # node, so we are not N+1 compliant. On the other hand we have no disk
1133
      # templates with more than one secondary so that situation is not well
1134
      # supported either.
1135
      # FIXME: does not support file-backed instances
1136
      if len(inst_config.secondary_nodes) == 0:
1137
        i_non_redundant.append(instance)
1138
      elif len(inst_config.secondary_nodes) > 1:
1139
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1140
                    % instance)
1141

    
1142
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143
        i_non_a_balanced.append(instance)
1144

    
1145
      for snode in inst_config.secondary_nodes:
1146
        if snode in node_info:
1147
          node_info[snode]['sinst'].append(instance)
1148
          if pnode not in node_info[snode]['sinst-by-pnode']:
1149
            node_info[snode]['sinst-by-pnode'][pnode] = []
1150
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151
        elif snode not in n_offline:
1152
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1153
                      " %s failed" % (instance, snode))
1154
          bad = True
1155
        if snode in n_offline:
1156
          inst_nodes_offline.append(snode)
1157

    
1158
      if inst_nodes_offline:
1159
        # warn that the instance lives on offline nodes, and set bad=True
1160
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1161
                    ", ".join(inst_nodes_offline))
1162
        bad = True
1163

    
1164
    feedback_fn("* Verifying orphan volumes")
1165
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1166
                                       feedback_fn)
1167
    bad = bad or result
1168

    
1169
    feedback_fn("* Verifying remaining instances")
1170
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1171
                                         feedback_fn)
1172
    bad = bad or result
1173

    
1174
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175
      feedback_fn("* Verifying N+1 Memory redundancy")
1176
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1177
      bad = bad or result
1178

    
1179
    feedback_fn("* Other Notes")
1180
    if i_non_redundant:
1181
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1182
                  % len(i_non_redundant))
1183

    
1184
    if i_non_a_balanced:
1185
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1186
                  % len(i_non_a_balanced))
1187

    
1188
    if n_offline:
1189
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1190

    
1191
    if n_drained:
1192
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1193

    
1194
    return not bad
1195

    
1196
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197
    """Analize the post-hooks' result
1198

1199
    This method analyses the hook result, handles it, and sends some
1200
    nicely-formatted feedback back to the user.
1201

1202
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204
    @param hooks_results: the results of the multi-node hooks rpc call
1205
    @param feedback_fn: function used to send feedback back to the caller
1206
    @param lu_result: previous Exec result
1207
    @return: the new Exec result, based on the previous result
1208
        and hook results
1209

1210
    """
1211
    # We only really run POST phase hooks, and are only interested in
1212
    # their results
1213
    if phase == constants.HOOKS_PHASE_POST:
1214
      # Used to change hooks' output to proper indentation
1215
      indent_re = re.compile('^', re.M)
1216
      feedback_fn("* Hooks Results")
1217
      if not hooks_results:
1218
        feedback_fn("  - ERROR: general communication failure")
1219
        lu_result = 1
1220
      else:
1221
        for node_name in hooks_results:
1222
          show_node_header = True
1223
          res = hooks_results[node_name]
1224
          if res.failed or res.data is False or not isinstance(res.data, list):
1225
            if res.offline:
1226
              # no need to warn or set fail return value
1227
              continue
1228
            feedback_fn("    Communication failure in hooks execution")
1229
            lu_result = 1
1230
            continue
1231
          for script, hkr, output in res.data:
1232
            if hkr == constants.HKR_FAIL:
1233
              # The node header is only shown once, if there are
1234
              # failing hooks on that node
1235
              if show_node_header:
1236
                feedback_fn("  Node %s:" % node_name)
1237
                show_node_header = False
1238
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1239
              output = indent_re.sub('      ', output)
1240
              feedback_fn("%s" % output)
1241
              lu_result = 1
1242

    
1243
      return lu_result
1244

    
1245

    
1246
class LUVerifyDisks(NoHooksLU):
1247
  """Verifies the cluster disks status.
1248

1249
  """
1250
  _OP_REQP = []
1251
  REQ_BGL = False
1252

    
1253
  def ExpandNames(self):
1254
    self.needed_locks = {
1255
      locking.LEVEL_NODE: locking.ALL_SET,
1256
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1257
    }
1258
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1259

    
1260
  def CheckPrereq(self):
1261
    """Check prerequisites.
1262

1263
    This has no prerequisites.
1264

1265
    """
1266
    pass
1267

    
1268
  def Exec(self, feedback_fn):
1269
    """Verify integrity of cluster disks.
1270

1271
    """
1272
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
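    # Note: 'result' is bound to the same four objects as the names above, so
    # filling res_nodes, res_nlvm, res_instances and res_missing below also
    # builds the value that is eventually returned.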
1273

    
1274
    vg_name = self.cfg.GetVGName()
1275
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1276
    instances = [self.cfg.GetInstanceInfo(name)
1277
                 for name in self.cfg.GetInstanceList()]
1278

    
1279
    nv_dict = {}
1280
    for inst in instances:
1281
      inst_lvs = {}
1282
      if (not inst.admin_up or
1283
          inst.disk_template not in constants.DTS_NET_MIRROR):
1284
        continue
1285
      inst.MapLVsByNode(inst_lvs)
1286
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287
      for node, vol_list in inst_lvs.iteritems():
1288
        for vol in vol_list:
1289
          nv_dict[(node, vol)] = inst
1290

    
1291
    if not nv_dict:
1292
      return result
1293

    
1294
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1295

    
1296
    to_act = set()
1297
    for node in nodes:
1298
      # node_volume
1299
      lvs = node_lvs[node]
1300
      if lvs.failed:
1301
        if not lvs.offline:
1302
          self.LogWarning("Connection to node %s failed: %s" %
1303
                          (node, lvs.data))
1304
        continue
1305
      lvs = lvs.data
1306
      if isinstance(lvs, basestring):
1307
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308
        res_nlvm[node] = lvs
1309
        continue
1310
      elif not isinstance(lvs, dict):
1311
        logging.warning("Connection to node %s failed or invalid data"
1312
                        " returned", node)
1313
        res_nodes.append(node)
1314
        continue
1315

    
1316
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317
        inst = nv_dict.pop((node, lv_name), None)
1318
        if (not lv_online and inst is not None
1319
            and inst.name not in res_instances):
1320
          res_instances.append(inst.name)
1321

    
1322
    # any leftover items in nv_dict are missing LVs, let's arrange the
1323
    # data better
1324
    for key, inst in nv_dict.iteritems():
1325
      if inst.name not in res_missing:
1326
        res_missing[inst.name] = []
1327
      res_missing[inst.name].append(key)
1328

    
1329
    return result
1330

    
1331

    
1332
class LURepairDiskSizes(NoHooksLU):
1333
  """Verifies the cluster disks sizes.
1334

1335
  """
1336
  _OP_REQP = ["instances"]
1337
  REQ_BGL = False
1338

    
1339
  def ExpandNames(self):
1340

    
1341
    if not isinstance(self.op.instances, list):
1342
      raise errors.OpPrereqError("Invalid argument type 'instances'")
1343

    
1344
    if self.op.instances:
1345
      self.wanted_names = []
1346
      for name in self.op.instances:
1347
        full_name = self.cfg.ExpandInstanceName(name)
1348
        if full_name is None:
1349
          raise errors.OpPrereqError("Instance '%s' not known" % name)
1350
        self.wanted_names.append(full_name)
1351
1352
      self.needed_locks = {
1353
        locking.LEVEL_NODE: [],
1354
        locking.LEVEL_INSTANCE: self.wanted_names,
1355
        }
1356
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1357
    else:
1358
      self.wanted_names = None
1359
      self.needed_locks = {
1360
        locking.LEVEL_NODE: locking.ALL_SET,
1361
        locking.LEVEL_INSTANCE: locking.ALL_SET,
1362
        }
1363
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1364

    
1365
  def DeclareLocks(self, level):
1366
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
1367
      self._LockInstancesNodes(primary_only=True)
1368

    
1369
  def CheckPrereq(self):
1370
    """Check prerequisites.
1371

1372
    This only checks the optional instance list against the existing names.
1373

1374
    """
1375
    if self.wanted_names is None:
1376
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1377

    
1378
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1379
                             in self.wanted_names]
1380

    
1381
  def Exec(self, feedback_fn):
1382
    """Verify the size of cluster disks.
1383

1384
    """
1385
    # TODO: check child disks too
1386
    # TODO: check differences in size between primary/secondary nodes
1387
    per_node_disks = {}
1388
    for instance in self.wanted_instances:
1389
      pnode = instance.primary_node
1390
      if pnode not in per_node_disks:
1391
        per_node_disks[pnode] = []
1392
      for idx, disk in enumerate(instance.disks):
1393
        per_node_disks[pnode].append((instance, idx, disk))
1394

    
1395
    changed = []
1396
    for node, dskl in per_node_disks.items():
1397
      result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1398
      if result.failed:
1399
        self.LogWarning("Failure in blockdev_getsizes call to node"
1400
                        " %s, ignoring", node)
1401
        continue
1402
      if len(result.data) != len(dskl):
1403
        self.LogWarning("Invalid result from node %s, ignoring node results",
1404
                        node)
1405
        continue
1406
      for ((instance, idx, disk), size) in zip(dskl, result.data):
1407
        if size is None:
1408
          self.LogWarning("Disk %d of instance %s did not return size"
1409
                          " information, ignoring", idx, instance.name)
1410
          continue
1411
        if not isinstance(size, (int, long)):
1412
          self.LogWarning("Disk %d of instance %s did not return valid"
1413
                          " size information, ignoring", idx, instance.name)
1414
          continue
1415
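        # the RPC reports sizes in bytes, while disk.size is recorded in MiB;
        # convert before comparing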
        size = size >> 20
1416
        if size != disk.size:
1417
          self.LogInfo("Disk %d of instance %s has mismatched size,"
1418
                       " correcting: recorded %d, actual %d", idx,
1419
                       instance.name, disk.size, size)
1420
          disk.size = size
1421
          self.cfg.Update(instance)
1422
          changed.append((instance.name, idx, size))
1423
    return changed
1424

    
1425

    
1426
class LURenameCluster(LogicalUnit):
1427
  """Rename the cluster.
1428

1429
  """
1430
  HPATH = "cluster-rename"
1431
  HTYPE = constants.HTYPE_CLUSTER
1432
  _OP_REQP = ["name"]
1433

    
1434
  def BuildHooksEnv(self):
1435
    """Build hooks env.
1436

1437
    """
1438
    env = {
1439
      "OP_TARGET": self.cfg.GetClusterName(),
1440
      "NEW_NAME": self.op.name,
1441
      }
1442
    mn = self.cfg.GetMasterNode()
1443
    return env, [mn], [mn]
1444

    
1445
  def CheckPrereq(self):
1446
    """Verify that the passed name is a valid one.
1447

1448
    """
1449
    hostname = utils.HostInfo(self.op.name)
1450

    
1451
    new_name = hostname.name
1452
    self.ip = new_ip = hostname.ip
1453
    old_name = self.cfg.GetClusterName()
1454
    old_ip = self.cfg.GetMasterIP()
1455
    if new_name == old_name and new_ip == old_ip:
1456
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1457
                                 " cluster has changed")
1458
    if new_ip != old_ip:
1459
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1460
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1461
                                   " reachable on the network. Aborting." %
1462
                                   new_ip)
1463

    
1464
    self.op.name = new_name
1465

    
1466
  def Exec(self, feedback_fn):
1467
    """Rename the cluster.
1468

1469
    """
1470
    clustername = self.op.name
1471
    ip = self.ip
1472

    
1473
    # shutdown the master IP
1474
    master = self.cfg.GetMasterNode()
1475
    result = self.rpc.call_node_stop_master(master, False)
1476
    if result.failed or not result.data:
1477
      raise errors.OpExecError("Could not disable the master role")
1478

    
1479
    try:
1480
      cluster = self.cfg.GetClusterInfo()
1481
      cluster.cluster_name = clustername
1482
      cluster.master_ip = ip
1483
      self.cfg.Update(cluster)
1484

    
1485
      # update the known hosts file
1486
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1487
      node_list = self.cfg.GetNodeList()
1488
      try:
1489
        node_list.remove(master)
1490
      except ValueError:
1491
        pass
1492
      result = self.rpc.call_upload_file(node_list,
1493
                                         constants.SSH_KNOWN_HOSTS_FILE)
1494
      for to_node, to_result in result.iteritems():
1495
        if to_result.failed or not to_result.data:
1496
          logging.error("Copy of file %s to node %s failed",
1497
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
1498

    
1499
    finally:
1500
      result = self.rpc.call_node_start_master(master, False, False)
1501
      if result.failed or not result.data:
1502
        self.LogWarning("Could not re-enable the master role on"
1503
                        " the master, please restart manually.")
1504

    
1505

    
1506
def _RecursiveCheckIfLVMBased(disk):
1507
  """Check if the given disk or its children are lvm-based.
1508

1509
  @type disk: L{objects.Disk}
1510
  @param disk: the disk to check
1511
  @rtype: boolean
1512
  @return: boolean indicating whether an LD_LV dev_type was found or not
1513

1514
  """
1515
  if disk.children:
1516
    for chdisk in disk.children:
1517
      if _RecursiveCheckIfLVMBased(chdisk):
1518
        return True
1519
  return disk.dev_type == constants.LD_LV
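# Informal example: for a DRBD disk whose children are two logical volumes,
# the recursion above returns True because a child has dev_type LD_LV.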
1520

    
1521

    
1522
class LUSetClusterParams(LogicalUnit):
1523
  """Change the parameters of the cluster.
1524

1525
  """
1526
  HPATH = "cluster-modify"
1527
  HTYPE = constants.HTYPE_CLUSTER
1528
  _OP_REQP = []
1529
  REQ_BGL = False
1530

    
1531
  def CheckArguments(self):
1532
    """Check parameters
1533

1534
    """
1535
    if not hasattr(self.op, "candidate_pool_size"):
1536
      self.op.candidate_pool_size = None
1537
    if self.op.candidate_pool_size is not None:
1538
      try:
1539
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1540
      except (ValueError, TypeError), err:
1541
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1542
                                   str(err))
1543
      if self.op.candidate_pool_size < 1:
1544
        raise errors.OpPrereqError("At least one master candidate needed")
1545

    
1546
  def ExpandNames(self):
1547
    # FIXME: in the future maybe other cluster params won't require checking on
1548
    # all nodes to be modified.
1549
    self.needed_locks = {
1550
      locking.LEVEL_NODE: locking.ALL_SET,
1551
    }
1552
    self.share_locks[locking.LEVEL_NODE] = 1
1553

    
1554
  def BuildHooksEnv(self):
1555
    """Build hooks env.
1556

1557
    """
1558
    env = {
1559
      "OP_TARGET": self.cfg.GetClusterName(),
1560
      "NEW_VG_NAME": self.op.vg_name,
1561
      }
1562
    mn = self.cfg.GetMasterNode()
1563
    return env, [mn], [mn]
1564

    
1565
  def CheckPrereq(self):
1566
    """Check prerequisites.
1567

1568
    This checks whether the given params don't conflict and
1569
    if the given volume group is valid.
1570

1571
    """
1572
    if self.op.vg_name is not None and not self.op.vg_name:
1573
      instances = self.cfg.GetAllInstancesInfo().values()
1574
      for inst in instances:
1575
        for disk in inst.disks:
1576
          if _RecursiveCheckIfLVMBased(disk):
1577
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1578
                                       " lvm-based instances exist")
1579

    
1580
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1581

    
1582
    # if vg_name is not None, check the given volume group on all nodes
1583
    if self.op.vg_name:
1584
      vglist = self.rpc.call_vg_list(node_list)
1585
      for node in node_list:
1586
        if vglist[node].failed:
1587
          # ignoring down node
1588
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1589
          continue
1590
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1591
                                              self.op.vg_name,
1592
                                              constants.MIN_VG_SIZE)
1593
        if vgstatus:
1594
          raise errors.OpPrereqError("Error on node '%s': %s" %
1595
                                     (node, vgstatus))
1596

    
1597
    self.cluster = cluster = self.cfg.GetClusterInfo()
1598
    # validate beparams changes
1599
    if self.op.beparams:
1600
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1601
      self.new_beparams = cluster.FillDict(
1602
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1603

    
1604
    # hypervisor list/parameters
1605
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1606
    if self.op.hvparams:
1607
      if not isinstance(self.op.hvparams, dict):
1608
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1609
      for hv_name, hv_dict in self.op.hvparams.items():
1610
        if hv_name not in self.new_hvparams:
1611
          self.new_hvparams[hv_name] = hv_dict
1612
        else:
1613
          self.new_hvparams[hv_name].update(hv_dict)
1614

    
1615
    if self.op.enabled_hypervisors is not None:
1616
      self.hv_list = self.op.enabled_hypervisors
1617
    else:
1618
      self.hv_list = cluster.enabled_hypervisors
1619

    
1620
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1621
      # either the enabled list has changed, or the parameters have, validate
1622
      for hv_name, hv_params in self.new_hvparams.items():
1623
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1624
            (self.op.enabled_hypervisors and
1625
             hv_name in self.op.enabled_hypervisors)):
1626
          # either this is a new hypervisor, or its parameters have changed
1627
          hv_class = hypervisor.GetHypervisor(hv_name)
1628
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1629
          hv_class.CheckParameterSyntax(hv_params)
1630
          _CheckHVParams(self, node_list, hv_name, hv_params)
1631

    
1632
  def Exec(self, feedback_fn):
1633
    """Change the parameters of the cluster.
1634

1635
    """
1636
    if self.op.vg_name is not None:
1637
      new_volume = self.op.vg_name
1638
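      # an empty vg_name means "disable LVM storage"; normalize it to None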
      if not new_volume:
1639
        new_volume = None
1640
      if new_volume != self.cfg.GetVGName():
1641
        self.cfg.SetVGName(new_volume)
1642
      else:
1643
        feedback_fn("Cluster LVM configuration already in desired"
1644
                    " state, not changing")
1645
    if self.op.hvparams:
1646
      self.cluster.hvparams = self.new_hvparams
1647
    if self.op.enabled_hypervisors is not None:
1648
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1649
    if self.op.beparams:
1650
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1651
    if self.op.candidate_pool_size is not None:
1652
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1653
      # we need to update the pool size here, otherwise the save will fail
1654
      _AdjustCandidatePool(self)
1655

    
1656
    self.cfg.Update(self.cluster)
1657

    
1658

    
1659
class LURedistributeConfig(NoHooksLU):
1660
  """Force the redistribution of cluster configuration.
1661

1662
  This is a very simple LU.
1663

1664
  """
1665
  _OP_REQP = []
1666
  REQ_BGL = False
1667

    
1668
  def ExpandNames(self):
1669
    self.needed_locks = {
1670
      locking.LEVEL_NODE: locking.ALL_SET,
1671
    }
1672
    self.share_locks[locking.LEVEL_NODE] = 1
1673

    
1674
  def CheckPrereq(self):
1675
    """Check prerequisites.
1676

1677
    """
1678

    
1679
  def Exec(self, feedback_fn):
1680
    """Redistribute the configuration.
1681

1682
    """
1683
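    # write back the (unchanged) cluster object; cfg.Update() is relied upon
    # to push the configuration out to the other nodes as a side effect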
    self.cfg.Update(self.cfg.GetClusterInfo())
1684

    
1685

    
1686
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1687
  """Sleep and poll for an instance's disk to sync.
1688

1689
  """
1690
  if not instance.disks:
1691
    return True
1692

    
1693
  if not oneshot:
1694
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1695

    
1696
  node = instance.primary_node
1697

    
1698
  for dev in instance.disks:
1699
    lu.cfg.SetDiskID(dev, node)
1700

    
1701
  retries = 0
1702
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1703
  while True:
1704
    max_time = 0
1705
    done = True
1706
    cumul_degraded = False
1707
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1708
    if rstats.failed or not rstats.data:
1709
      lu.LogWarning("Can't get any data from node %s", node)
1710
      retries += 1
1711
      if retries >= 10:
1712
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1713
                                 " aborting." % node)
1714
      time.sleep(6)
1715
      continue
1716
    rstats = rstats.data
1717
    retries = 0
1718
    for i, mstat in enumerate(rstats):
1719
      if mstat is None:
1720
        lu.LogWarning("Can't compute data for node %s/%s",
1721
                           node, instance.disks[i].iv_name)
1722
        continue
1723
      # we ignore the ldisk parameter
1724
      perc_done, est_time, is_degraded, _ = mstat
1725
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1726
      if perc_done is not None:
1727
        done = False
1728
        if est_time is not None:
1729
          rem_time = "%d estimated seconds remaining" % est_time
1730
          max_time = est_time
1731
        else:
1732
          rem_time = "no time estimate"
1733
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1734
                        (instance.disks[i].iv_name, perc_done, rem_time))
1735

    
1736
    # if we're done but degraded, let's do a few small retries, to
1737
    # make sure we see a stable and not transient situation; therefore
1738
    # we force restart of the loop
1739
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1740
      logging.info("Degraded disks found, %d retries left", degr_retries)
1741
      degr_retries -= 1
1742
      time.sleep(1)
1743
      continue
1744

    
1745
    if done or oneshot:
1746
      break
1747

    
1748
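    # wait before the next poll: at most 60 seconds, or less if the longest
    # estimated sync time reported above is shorter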
    time.sleep(min(60, max_time))
1749

    
1750
  if done:
1751
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1752
  return not cumul_degraded
1753

    
1754

    
1755
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1756
  """Check that mirrors are not degraded.
1757

1758
  The ldisk parameter, if True, will change the test from the
1759
  is_degraded attribute (which represents overall non-ok status for
1760
  the device(s)) to the ldisk (representing the local storage status).
1761

1762
  """
1763
  lu.cfg.SetDiskID(dev, node)
1764
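  # index into the status tuple returned by blockdev_find: position 5 appears
  # to be the overall is_degraded flag and position 6 the local-disk (ldisk)
  # status, matching the docstring above (assumption from context)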
  if ldisk:
1765
    idx = 6
1766
  else:
1767
    idx = 5
1768

    
1769
  result = True
1770
  if on_primary or dev.AssembleOnSecondary():
1771
    rstats = lu.rpc.call_blockdev_find(node, dev)
1772
    msg = rstats.RemoteFailMsg()
1773
    if msg:
1774
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1775
      result = False
1776
    elif not rstats.payload:
1777
      lu.LogWarning("Can't find disk on node %s", node)
1778
      result = False
1779
    else:
1780
      result = result and (not rstats.payload[idx])
1781
  if dev.children:
1782
    for child in dev.children:
1783
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1784

    
1785
  return result
1786

    
1787

    
1788
class LUDiagnoseOS(NoHooksLU):
1789
  """Logical unit for OS diagnose/query.
1790

1791
  """
1792
  _OP_REQP = ["output_fields", "names"]
1793
  REQ_BGL = False
1794
  _FIELDS_STATIC = utils.FieldSet()
1795
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1796

    
1797
  def ExpandNames(self):
1798
    if self.op.names:
1799
      raise errors.OpPrereqError("Selective OS query not supported")
1800

    
1801
    _CheckOutputFields(static=self._FIELDS_STATIC,
1802
                       dynamic=self._FIELDS_DYNAMIC,
1803
                       selected=self.op.output_fields)
1804

    
1805
    # Lock all nodes, in shared mode
1806
    # Temporary removal of locks, should be reverted later
1807
    # TODO: reintroduce locks when they are lighter-weight
1808
    self.needed_locks = {}
1809
    #self.share_locks[locking.LEVEL_NODE] = 1
1810
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1811

    
1812
  def CheckPrereq(self):
1813
    """Check prerequisites.
1814

1815
    """
1816

    
1817
  @staticmethod
1818
  def _DiagnoseByOS(node_list, rlist):
1819
    """Remaps a per-node return list into an a per-os per-node dictionary
1820

1821
    @param node_list: a list with the names of all nodes
1822
    @param rlist: a map with node names as keys and OS objects as values
1823

1824
    @rtype: dict
1825
    @return: a dictionary with OS names as keys and, as values, another map with
1826
        nodes as keys and lists of OS objects as values, e.g.::
1827

1828
          {"debian-etch": {"node1": [<object>,...],
1829
                           "node2": [<object>,]}
1830
          }
1831

1832
    """
1833
    all_os = {}
1834
    # we build here the list of nodes that didn't fail the RPC (at RPC
1835
    # level), so that nodes with a non-responding node daemon don't
1836
    # make all OSes invalid
1837
    good_nodes = [node_name for node_name in rlist
1838
                  if not rlist[node_name].failed]
1839
    for node_name, nr in rlist.iteritems():
1840
      if nr.failed or not nr.data:
1841
        continue
1842
      for os_obj in nr.data:
1843
        if os_obj.name not in all_os:
1844
          # build a list of nodes for this os containing empty lists
1845
          # for each node in node_list
1846
          all_os[os_obj.name] = {}
1847
          for nname in good_nodes:
1848
            all_os[os_obj.name][nname] = []
1849
        all_os[os_obj.name][node_name].append(os_obj)
1850
    return all_os
1851

    
1852
  def Exec(self, feedback_fn):
1853
    """Compute the list of OSes.
1854

1855
    """
1856
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1857
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1858
    if node_data == False:
1859
      raise errors.OpExecError("Can't gather the list of OSes")
1860
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1861
    output = []
1862
    for os_name, os_data in pol.iteritems():
1863
      row = []
1864
      for field in self.op.output_fields:
1865
        if field == "name":
1866
          val = os_name
1867
        elif field == "valid":
1868
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1869
        elif field == "node_status":
1870
          val = {}
1871
          for node_name, nos_list in os_data.iteritems():
1872
            val[node_name] = [(v.status, v.path) for v in nos_list]
1873
        else:
1874
          raise errors.ParameterError(field)
1875
        row.append(val)
1876
      output.append(row)
1877

    
1878
    return output
1879

    
1880

    
1881
class LURemoveNode(LogicalUnit):
1882
  """Logical unit for removing a node.
1883

1884
  """
1885
  HPATH = "node-remove"
1886
  HTYPE = constants.HTYPE_NODE
1887
  _OP_REQP = ["node_name"]
1888

    
1889
  def BuildHooksEnv(self):
1890
    """Build hooks env.
1891

1892
    This doesn't run on the target node in the pre phase as a failed
1893
    node would then be impossible to remove.
1894

1895
    """
1896
    env = {
1897
      "OP_TARGET": self.op.node_name,
1898
      "NODE_NAME": self.op.node_name,
1899
      }
1900
    all_nodes = self.cfg.GetNodeList()
1901
    all_nodes.remove(self.op.node_name)
1902
    return env, all_nodes, all_nodes
1903

    
1904
  def CheckPrereq(self):
1905
    """Check prerequisites.
1906

1907
    This checks:
1908
     - the node exists in the configuration
1909
     - it does not have primary or secondary instances
1910
     - it's not the master
1911

1912
    Any errors are signalled by raising errors.OpPrereqError.
1913

1914
    """
1915
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1916
    if node is None:
1917
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1918

    
1919
    instance_list = self.cfg.GetInstanceList()
1920

    
1921
    masternode = self.cfg.GetMasterNode()
1922
    if node.name == masternode:
1923
      raise errors.OpPrereqError("Node is the master node,"
1924
                                 " you need to failover first.")
1925

    
1926
    for instance_name in instance_list:
1927
      instance = self.cfg.GetInstanceInfo(instance_name)
1928
      if node.name in instance.all_nodes:
1929
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1930
                                   " please remove first." % instance_name)
1931
    self.op.node_name = node.name
1932
    self.node = node
1933

    
1934
  def Exec(self, feedback_fn):
1935
    """Removes the node from the cluster.
1936

1937
    """
1938
    node = self.node
1939
    logging.info("Stopping the node daemon and removing configs from node %s",
1940
                 node.name)
1941

    
1942
    self.context.RemoveNode(node.name)
1943

    
1944
    self.rpc.call_node_leave_cluster(node.name)
1945

    
1946
    # Promote nodes to master candidate as needed
1947
    _AdjustCandidatePool(self)
1948

    
1949

    
1950
class LUQueryNodes(NoHooksLU):
1951
  """Logical unit for querying nodes.
1952

1953
  """
1954
  _OP_REQP = ["output_fields", "names", "use_locking"]
1955
  REQ_BGL = False
1956
  _FIELDS_DYNAMIC = utils.FieldSet(
1957
    "dtotal", "dfree",
1958
    "mtotal", "mnode", "mfree",
1959
    "bootid",
1960
    "ctotal", "cnodes", "csockets",
1961
    )
1962

    
1963
  _FIELDS_STATIC = utils.FieldSet(
1964
    "name", "pinst_cnt", "sinst_cnt",
1965
    "pinst_list", "sinst_list",
1966
    "pip", "sip", "tags",
1967
    "serial_no",
1968
    "master_candidate",
1969
    "master",
1970
    "offline",
1971
    "drained",
1972
    "role",
1973
    )
1974

    
1975
  def ExpandNames(self):
1976
    _CheckOutputFields(static=self._FIELDS_STATIC,
1977
                       dynamic=self._FIELDS_DYNAMIC,
1978
                       selected=self.op.output_fields)
1979

    
1980
    self.needed_locks = {}
1981
    self.share_locks[locking.LEVEL_NODE] = 1
1982

    
1983
    if self.op.names:
1984
      self.wanted = _GetWantedNodes(self, self.op.names)
1985
    else:
1986
      self.wanted = locking.ALL_SET
1987

    
1988
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1989
    self.do_locking = self.do_node_query and self.op.use_locking
1990
    if self.do_locking:
1991
      # if we don't request only static fields, we need to lock the nodes
1992
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1993

    
1994

    
1995
  def CheckPrereq(self):
1996
    """Check prerequisites.
1997

1998
    """
1999
    # The validation of the node list is done in _GetWantedNodes
2000
    # if non-empty; if empty, there's no validation to do
2001
    pass
2002

    
2003
  def Exec(self, feedback_fn):
2004
    """Computes the list of nodes and their attributes.
2005

2006
    """
2007
    all_info = self.cfg.GetAllNodesInfo()
2008
    if self.do_locking:
2009
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
2010
    elif self.wanted != locking.ALL_SET:
2011
      nodenames = self.wanted
2012
      missing = set(nodenames).difference(all_info.keys())
2013
      if missing:
2014
        raise errors.OpExecError(
2015
          "Some nodes were removed before retrieving their data: %s" % missing)
2016
    else:
2017
      nodenames = all_info.keys()
2018

    
2019
    nodenames = utils.NiceSort(nodenames)
2020
    nodelist = [all_info[name] for name in nodenames]
2021

    
2022
    # begin data gathering
2023

    
2024
    if self.do_node_query:
2025
      live_data = {}
2026
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2027
                                          self.cfg.GetHypervisorType())
2028
      for name in nodenames:
2029
        nodeinfo = node_data[name]
2030
        if not nodeinfo.failed and nodeinfo.data:
2031
          nodeinfo = nodeinfo.data
2032
          fn = utils.TryConvert
2033
          live_data[name] = {
2034
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2035
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2036
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2037
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2038
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2039
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2040
            "bootid": nodeinfo.get('bootid', None),
2041
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2042
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2043
            }
2044
        else:
2045
          live_data[name] = {}
2046
    else:
2047
      live_data = dict.fromkeys(nodenames, {})
2048

    
2049
    node_to_primary = dict([(name, set()) for name in nodenames])
2050
    node_to_secondary = dict([(name, set()) for name in nodenames])
2051

    
2052
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2053
                             "sinst_cnt", "sinst_list"))
2054
    if inst_fields & frozenset(self.op.output_fields):
2055
      instancelist = self.cfg.GetInstanceList()
2056

    
2057
      for instance_name in instancelist:
2058
        inst = self.cfg.GetInstanceInfo(instance_name)
2059
        if inst.primary_node in node_to_primary:
2060
          node_to_primary[inst.primary_node].add(inst.name)
2061
        for secnode in inst.secondary_nodes:
2062
          if secnode in node_to_secondary:
2063
            node_to_secondary[secnode].add(inst.name)
2064

    
2065
    master_node = self.cfg.GetMasterNode()
2066

    
2067
    # end data gathering
2068

    
2069
    output = []
2070
    for node in nodelist:
2071
      node_output = []
2072
      for field in self.op.output_fields:
2073
        if field == "name":
2074
          val = node.name
2075
        elif field == "pinst_list":
2076
          val = list(node_to_primary[node.name])
2077
        elif field == "sinst_list":
2078
          val = list(node_to_secondary[node.name])
2079
        elif field == "pinst_cnt":
2080
          val = len(node_to_primary[node.name])
2081
        elif field == "sinst_cnt":
2082
          val = len(node_to_secondary[node.name])
2083
        elif field == "pip":
2084
          val = node.primary_ip
2085
        elif field == "sip":
2086
          val = node.secondary_ip
2087
        elif field == "tags":
2088
          val = list(node.GetTags())
2089
        elif field == "serial_no":
2090
          val = node.serial_no
2091
        elif field == "master_candidate":
2092
          val = node.master_candidate
2093
        elif field == "master":
2094
          val = node.name == master_node
2095
        elif field == "offline":
2096
          val = node.offline
2097
        elif field == "drained":
2098
          val = node.drained
2099
        elif self._FIELDS_DYNAMIC.Matches(field):
2100
          val = live_data[node.name].get(field, None)
2101
        elif field == "role":
2102
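          # single-letter role: M = master, C = master candidate, D = drained,
          # O = offline, R = regular (none of the above)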
          if node.name == master_node:
2103
            val = "M"
2104
          elif node.master_candidate:
2105
            val = "C"
2106
          elif node.drained:
2107
            val = "D"
2108
          elif node.offline:
2109
            val = "O"
2110
          else:
2111
            val = "R"
2112
        else:
2113
          raise errors.ParameterError(field)
2114
        node_output.append(val)
2115
      output.append(node_output)
2116

    
2117
    return output
2118

    
2119

    
2120
class LUQueryNodeVolumes(NoHooksLU):
2121
  """Logical unit for getting volumes on node(s).
2122

2123
  """
2124
  _OP_REQP = ["nodes", "output_fields"]
2125
  REQ_BGL = False
2126
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2127
  _FIELDS_STATIC = utils.FieldSet("node")
2128

    
2129
  def ExpandNames(self):
2130
    _CheckOutputFields(static=self._FIELDS_STATIC,
2131
                       dynamic=self._FIELDS_DYNAMIC,
2132
                       selected=self.op.output_fields)
2133

    
2134
    self.needed_locks = {}
2135
    self.share_locks[locking.LEVEL_NODE] = 1
2136
    if not self.op.nodes:
2137
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2138
    else:
2139
      self.needed_locks[locking.LEVEL_NODE] = \
2140
        _GetWantedNodes(self, self.op.nodes)
2141

    
2142
  def CheckPrereq(self):
2143
    """Check prerequisites.
2144

2145
    This checks that the fields required are valid output fields.
2146

2147
    """
2148
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2149

    
2150
  def Exec(self, feedback_fn):
2151
    """Computes the list of nodes and their attributes.
2152

2153
    """
2154
    nodenames = self.nodes
2155
    volumes = self.rpc.call_node_volumes(nodenames)
2156

    
2157
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2158
             in self.cfg.GetInstanceList()]
2159

    
2160
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2161

    
2162
    output = []
2163
    for node in nodenames:
2164
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2165
        continue
2166

    
2167
      node_vols = volumes[node].data[:]
2168
      node_vols.sort(key=lambda vol: vol['dev'])
2169

    
2170
      for vol in node_vols:
2171
        node_output = []
2172
        for field in self.op.output_fields:
2173
          if field == "node":
2174
            val = node
2175
          elif field == "phys":
2176
            val = vol['dev']
2177
          elif field == "vg":
2178
            val = vol['vg']
2179
          elif field == "name":
2180
            val = vol['name']
2181
          elif field == "size":
2182
            val = int(float(vol['size']))
2183
          elif field == "instance":
2184
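            # for/else below: val falls back to '-' if no instance owns this LV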
            for inst in ilist:
2185
              if node not in lv_by_node[inst]:
2186
                continue
2187
              if vol['name'] in lv_by_node[inst][node]:
2188
                val = inst.name
2189
                break
2190
            else:
2191
              val = '-'
2192
          else:
2193
            raise errors.ParameterError(field)
2194
          node_output.append(str(val))
2195

    
2196
        output.append(node_output)
2197

    
2198
    return output
2199

    
2200

    
2201
class LUAddNode(LogicalUnit):
2202
  """Logical unit for adding node to the cluster.
2203

2204
  """
2205
  HPATH = "node-add"
2206
  HTYPE = constants.HTYPE_NODE
2207
  _OP_REQP = ["node_name"]
2208

    
2209
  def BuildHooksEnv(self):
2210
    """Build hooks env.
2211

2212
    This will run on all nodes before, and on all nodes + the new node after.
2213

2214
    """
2215
    env = {
2216
      "OP_TARGET": self.op.node_name,
2217
      "NODE_NAME": self.op.node_name,
2218
      "NODE_PIP": self.op.primary_ip,
2219
      "NODE_SIP": self.op.secondary_ip,
2220
      }
2221
    nodes_0 = self.cfg.GetNodeList()
2222
    nodes_1 = nodes_0 + [self.op.node_name, ]
2223
    return env, nodes_0, nodes_1
2224

    
2225
  def CheckPrereq(self):
2226
    """Check prerequisites.
2227

2228
    This checks:
2229
     - the new node is not already in the config
2230
     - it is resolvable
2231
     - its parameters (single/dual homed) matches the cluster
2232

2233
    Any errors are signalled by raising errors.OpPrereqError.
2234

2235
    """
2236
    node_name = self.op.node_name
2237
    cfg = self.cfg
2238

    
2239
    dns_data = utils.HostInfo(node_name)
2240

    
2241
    node = dns_data.name
2242
    primary_ip = self.op.primary_ip = dns_data.ip
2243
    secondary_ip = getattr(self.op, "secondary_ip", None)
2244
    if secondary_ip is None:
2245
      secondary_ip = primary_ip
2246
    if not utils.IsValidIP(secondary_ip):
2247
      raise errors.OpPrereqError("Invalid secondary IP given")
2248
    self.op.secondary_ip = secondary_ip
2249

    
2250
    node_list = cfg.GetNodeList()
2251
    if not self.op.readd and node in node_list:
2252
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2253
                                 node)
2254
    elif self.op.readd and node not in node_list:
2255
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2256

    
2257
    for existing_node_name in node_list:
2258
      existing_node = cfg.GetNodeInfo(existing_node_name)
2259

    
2260
      if self.op.readd and node == existing_node_name:
2261
        if (existing_node.primary_ip != primary_ip or
2262
            existing_node.secondary_ip != secondary_ip):
2263
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2264
                                     " address configuration as before")
2265
        continue
2266

    
2267
      if (existing_node.primary_ip == primary_ip or
2268
          existing_node.secondary_ip == primary_ip or
2269
          existing_node.primary_ip == secondary_ip or
2270
          existing_node.secondary_ip == secondary_ip):
2271
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2272
                                   " existing node %s" % existing_node.name)
2273

    
2274
    # check that the type of the node (single versus dual homed) is the
2275
    # same as for the master
2276
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2277
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2278
    newbie_singlehomed = secondary_ip == primary_ip
2279
    if master_singlehomed != newbie_singlehomed:
2280
      if master_singlehomed:
2281
        raise errors.OpPrereqError("The master has no private ip but the"
2282
                                   " new node has one")
2283
      else:
2284
        raise errors.OpPrereqError("The master has a private ip but the"
2285
                                   " new node doesn't have one")
2286

    
2287
    # checks reachability
2288
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2289
      raise errors.OpPrereqError("Node not reachable by ping")
2290

    
2291
    if not newbie_singlehomed:
2292
      # check reachability from my secondary ip to newbie's secondary ip
2293
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2294
                           source=myself.secondary_ip):
2295
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2296
                                   " based ping to noded port")
2297

    
2298
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2299
    if self.op.readd:
2300
      exceptions = [node]
2301
    else:
2302
      exceptions = []
2303
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2304
    # the new node will increase mc_max by one, so:
2305
    mc_max = min(mc_max + 1, cp_size)
2306
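    # promote the new node to master candidate only if the candidate pool
    # still has room once this node is counted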
    self.master_candidate = mc_now < mc_max
2307

    
2308
    if self.op.readd:
2309
      self.new_node = self.cfg.GetNodeInfo(node)
2310
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
2311
    else:
2312
      self.new_node = objects.Node(name=node,
2313
                                   primary_ip=primary_ip,
2314
                                   secondary_ip=secondary_ip,
2315
                                   master_candidate=self.master_candidate,
2316
                                   offline=False, drained=False)
2317

    
2318
  def Exec(self, feedback_fn):
2319
    """Adds the new node to the cluster.
2320

2321
    """
2322
    new_node = self.new_node
2323
    node = new_node.name
2324

    
2325
    # for re-adds, reset the offline/drained/master-candidate flags;
2326
    # we need to reset here, otherwise offline would prevent RPC calls
2327
    # later in the procedure; this also means that if the re-add
2328
    # fails, we are left with a non-offlined, broken node
2329
    if self.op.readd:
2330
      new_node.drained = new_node.offline = False
2331
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2332
      # if we demote the node, we do cleanup later in the procedure
2333
      new_node.master_candidate = self.master_candidate
2334

    
2335
    # notify the user about any possible mc promotion
2336
    if new_node.master_candidate:
2337
      self.LogInfo("Node will be a master candidate")
2338

    
2339
    # check connectivity
2340
    result = self.rpc.call_version([node])[node]
2341
    result.Raise()
2342
    if result.data:
2343
      if constants.PROTOCOL_VERSION == result.data:
2344
        logging.info("Communication to node %s fine, sw version %s match",
2345
                     node, result.data)
2346
      else:
2347
        raise errors.OpExecError("Version mismatch master version %s,"
2348
                                 " node version %s" %
2349
                                 (constants.PROTOCOL_VERSION, result.data))
2350
    else:
2351
      raise errors.OpExecError("Cannot get version from the new node")
2352

    
2353
    # setup ssh on node
2354
    logging.info("Copy ssh key to node %s", node)
2355
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2356
    keyarray = []
2357
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2358
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2359
                priv_key, pub_key]
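    # the key files are read in this order because call_node_add below takes
    # them as positional arguments (DSA priv/pub, RSA priv/pub, user priv/pub)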
2360

    
2361
    for i in keyfiles:
2362
      f = open(i, 'r')
2363
      try:
2364
        keyarray.append(f.read())
2365
      finally:
2366
        f.close()
2367

    
2368
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2369
                                    keyarray[2],
2370
                                    keyarray[3], keyarray[4], keyarray[5])
2371

    
2372
    msg = result.RemoteFailMsg()
2373
    if msg:
2374
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2375
                               " new node: %s" % msg)
2376

    
2377
    # Add node to our /etc/hosts, and add key to known_hosts
2378
    utils.AddHostToEtcHosts(new_node.name)
2379

    
2380
    if new_node.secondary_ip != new_node.primary_ip:
2381
      result = self.rpc.call_node_has_ip_address(new_node.name,
2382
                                                 new_node.secondary_ip)
2383
      if result.failed or not result.data:
2384
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2385
                                 " you gave (%s). Please fix and re-run this"
2386
                                 " command." % new_node.secondary_ip)
2387

    
2388
    node_verify_list = [self.cfg.GetMasterNode()]
2389
    node_verify_param = {
2390
      'nodelist': [node],
2391
      # TODO: do a node-net-test as well?
2392
    }
2393

    
2394
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2395
                                       self.cfg.GetClusterName())
2396
    for verifier in node_verify_list:
2397
      if result[verifier].failed or not result[verifier].data:
2398
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2399
                                 " for remote verification" % verifier)
2400
      if result[verifier].data['nodelist']:
2401
        for failed in result[verifier].data['nodelist']:
2402
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2403
                      (verifier, result[verifier].data['nodelist'][failed]))
2404
        raise errors.OpExecError("ssh/hostname verification failed.")
2405

    
2406
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2407
    # including the node just added
2408
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2409
    dist_nodes = self.cfg.GetNodeList()
2410
    if not self.op.readd:
2411
      dist_nodes.append(node)
2412
    if myself.name in dist_nodes:
2413
      dist_nodes.remove(myself.name)
2414

    
2415
    logging.debug("Copying hosts and known_hosts to all nodes")
2416
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2417
      result = self.rpc.call_upload_file(dist_nodes, fname)
2418
      for to_node, to_result in result.iteritems():
2419
        if to_result.failed or not to_result.data:
2420
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2421

    
2422
    to_copy = []
2423
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2424
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2425
      to_copy.append(constants.VNC_PASSWORD_FILE)
2426

    
2427
    for fname in to_copy:
2428
      result = self.rpc.call_upload_file([node], fname)
2429
      if result[node].failed or not result[node]:
2430
        logging.error("Could not copy file %s to node %s", fname, node)
2431

    
2432
    if self.op.readd:
2433
      self.context.ReaddNode(new_node)
2434
      # make sure we redistribute the config
2435
      self.cfg.Update(new_node)
2436
      # and make sure the new node will not have old files around
2437
      if not new_node.master_candidate:
2438
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2439
        msg = result.RemoteFailMsg()
2440
        if msg:
2441
          self.LogWarning("Node failed to demote itself from master"
2442
                          " candidate status: %s" % msg)
2443
    else:
2444
      self.context.AddNode(new_node)
2445

    
2446

    
2447
class LUSetNodeParams(LogicalUnit):
2448
  """Modifies the parameters of a node.
2449

2450
  """
2451
  HPATH = "node-modify"
2452
  HTYPE = constants.HTYPE_NODE
2453
  _OP_REQP = ["node_name"]
2454
  REQ_BGL = False
2455

    
2456
  def CheckArguments(self):
2457
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2458
    if node_name is None:
2459
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2460
    self.op.node_name = node_name
2461
    _CheckBooleanOpField(self.op, 'master_candidate')
2462
    _CheckBooleanOpField(self.op, 'offline')
2463
    _CheckBooleanOpField(self.op, 'drained')
2464
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2465
    if all_mods.count(None) == 3:
2466
      raise errors.OpPrereqError("Please pass at least one modification")
2467
    if all_mods.count(True) > 1:
2468
      raise errors.OpPrereqError("Can't set the node into more than one"
2469
                                 " state at the same time")
2470

    
2471
  def ExpandNames(self):
2472
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2473

    
2474
  def BuildHooksEnv(self):
2475
    """Build hooks env.
2476

2477
    This runs on the master node.
2478

2479
    """
2480
    env = {
2481
      "OP_TARGET": self.op.node_name,
2482
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2483
      "OFFLINE": str(self.op.offline),
2484
      "DRAINED": str(self.op.drained),
2485
      }
2486
    nl = [self.cfg.GetMasterNode(),
2487
          self.op.node_name]
2488
    return env, nl, nl
2489

    
2490
  def CheckPrereq(self):
2491
    """Check prerequisites.
2492

2493
    This checks that the node exists and validates the requested changes.
2494

2495
    """
2496
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2497

    
2498
    if ((self.op.master_candidate == False or self.op.offline == True or
2499
         self.op.drained == True) and node.master_candidate):
2500
      # we will demote the node from master_candidate
2501
      if self.op.node_name == self.cfg.GetMasterNode():
2502
        raise errors.OpPrereqError("The master node has to be a"
2503
                                   " master candidate, online and not drained")
2504
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2505
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2506
      if num_candidates <= cp_size:
2507
        msg = ("Not enough master candidates (desired"
2508
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2509
        if self.op.force:
2510
          self.LogWarning(msg)
2511
        else:
2512
          raise errors.OpPrereqError(msg)
2513

    
2514
    if (self.op.master_candidate == True and
2515
        ((node.offline and not self.op.offline == False) or
2516
         (node.drained and not self.op.drained == False))):
2517
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2518
                                 " to master_candidate" % node.name)
2519

    
2520
    return
2521

    
2522
  def Exec(self, feedback_fn):
2523
    """Modifies a node.
2524

2525
    """
2526
    node = self.node
2527

    
2528
    result = []
2529
    changed_mc = False
2530

    
2531
    if self.op.offline is not None:
2532
      node.offline = self.op.offline
2533
      result.append(("offline", str(self.op.offline)))
2534
      if self.op.offline == True:
2535
        if node.master_candidate:
2536
          node.master_candidate = False
2537
          changed_mc = True
2538
          result.append(("master_candidate", "auto-demotion due to offline"))
2539
        if node.drained:
2540
          node.drained = False
2541
          result.append(("drained", "clear drained status due to offline"))
2542

    
2543
    if self.op.master_candidate is not None:
2544
      node.master_candidate = self.op.master_candidate
2545
      changed_mc = True
2546
      result.append(("master_candidate", str(self.op.master_candidate)))
2547
      if self.op.master_candidate == False:
2548
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2549
        msg = rrc.RemoteFailMsg()
2550
        if msg:
2551
          self.LogWarning("Node failed to demote itself: %s" % msg)
2552

    
2553
    if self.op.drained is not None:
2554
      node.drained = self.op.drained
2555
      result.append(("drained", str(self.op.drained)))
2556
      if self.op.drained == True:
2557
        if node.master_candidate:
2558
          node.master_candidate = False
2559
          changed_mc = True
2560
          result.append(("master_candidate", "auto-demotion due to drain"))
2561
          rrc = self.rpc.call_node_demote_from_mc(node.name)
2562
          msg = rrc.RemoteFailMsg()
2563
          if msg:
2564
            self.LogWarning("Node failed to demote itself: %s" % msg)
2565
        if node.offline:
2566
          node.offline = False
2567
          result.append(("offline", "clear offline status due to drain"))
2568

    
2569
    # this will trigger configuration file update, if needed
2570
    self.cfg.Update(node)
2571
    # this will trigger job queue propagation or cleanup
2572
    if changed_mc:
2573
      self.context.ReaddNode(node)
2574

    
2575
    return result
2576

    
2577

    
2578
class LUQueryClusterInfo(NoHooksLU):
2579
  """Query cluster configuration.
2580

2581
  """
2582
  _OP_REQP = []
2583
  REQ_BGL = False
2584

    
2585
  def ExpandNames(self):
2586
    self.needed_locks = {}
2587

    
2588
  def CheckPrereq(self):
2589
    """No prerequsites needed for this LU.
2590

2591
    """
2592
    pass
2593

    
2594
  def Exec(self, feedback_fn):
2595
    """Return cluster config.
2596

2597
    """
2598
    cluster = self.cfg.GetClusterInfo()
2599
    result = {
2600
      "software_version": constants.RELEASE_VERSION,
2601
      "protocol_version": constants.PROTOCOL_VERSION,
2602
      "config_version": constants.CONFIG_VERSION,
2603
      "os_api_version": constants.OS_API_VERSION,
2604
      "export_version": constants.EXPORT_VERSION,
2605
      "architecture": (platform.architecture()[0], platform.machine()),
2606
      "name": cluster.cluster_name,
2607
      "master": cluster.master_node,
2608
      "default_hypervisor": cluster.default_hypervisor,
2609
      "enabled_hypervisors": cluster.enabled_hypervisors,
2610
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2611
                        for hypervisor in cluster.enabled_hypervisors]),
2612
      "beparams": cluster.beparams,
2613
      "candidate_pool_size": cluster.candidate_pool_size,
2614
      "default_bridge": cluster.default_bridge,
2615
      "master_netdev": cluster.master_netdev,
2616
      "volume_group_name": cluster.volume_group_name,
2617
      "file_storage_dir": cluster.file_storage_dir,
2618
      }
2619

    
2620
    return result
2621

    
2622

    
2623
class LUQueryConfigValues(NoHooksLU):
2624
  """Return configuration values.
2625

2626
  """
2627
  _OP_REQP = []
2628
  REQ_BGL = False
2629
  _FIELDS_DYNAMIC = utils.FieldSet()
2630
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2631

    
2632
  def ExpandNames(self):
2633
    self.needed_locks = {}
2634

    
2635
    _CheckOutputFields(static=self._FIELDS_STATIC,
2636
                       dynamic=self._FIELDS_DYNAMIC,
2637
                       selected=self.op.output_fields)
2638

    
2639
  def CheckPrereq(self):
2640
    """No prerequisites.
2641

2642
    """
2643
    pass
2644

    
2645
  def Exec(self, feedback_fn):
2646
    """Dump a representation of the cluster config to the standard output.
2647

2648
    """
2649
    values = []
2650
    for field in self.op.output_fields:
2651
      if field == "cluster_name":
2652
        entry = self.cfg.GetClusterName()
2653
      elif field == "master_node":
2654
        entry = self.cfg.GetMasterNode()
2655
      elif field == "drain_flag":
2656
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2657
      else:
2658
        raise errors.ParameterError(field)
2659
      values.append(entry)
2660
    return values
2661

    
2662

    
2663
class LUActivateInstanceDisks(NoHooksLU):
2664
  """Bring up an instance's disks.
2665

2666
  """
2667
  _OP_REQP = ["instance_name"]
2668
  REQ_BGL = False
2669

    
2670
  def ExpandNames(self):
2671
    self._ExpandAndLockInstance()
2672
    self.needed_locks[locking.LEVEL_NODE] = []
2673
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2674

    
2675
  def DeclareLocks(self, level):
2676
    if level == locking.LEVEL_NODE:
2677
      self._LockInstancesNodes()
2678

    
2679
  def CheckPrereq(self):
2680
    """Check prerequisites.
2681

2682
    This checks that the instance is in the cluster.
2683

2684
    """
2685
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2686
    assert self.instance is not None, \
2687
      "Cannot retrieve locked instance %s" % self.op.instance_name
2688
    _CheckNodeOnline(self, self.instance.primary_node)
2689
    if not hasattr(self.op, "ignore_size"):
2690
      self.op.ignore_size = False
2691

    
2692
  def Exec(self, feedback_fn):
2693
    """Activate the disks.
2694

2695
    """
2696
    disks_ok, disks_info = \
2697
              _AssembleInstanceDisks(self, self.instance,
2698
                                     ignore_size=self.op.ignore_size)
2699
    if not disks_ok:
2700
      raise errors.OpExecError("Cannot activate block devices")
2701

    
2702
    return disks_info
2703

    
2704

    
2705
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
2706
                           ignore_size=False):
2707
  """Prepare the block devices for an instance.
2708

2709
  This sets up the block devices on all nodes.
2710

2711
  @type lu: L{LogicalUnit}
2712
  @param lu: the logical unit on whose behalf we execute
2713
  @type instance: L{objects.Instance}
2714
  @param instance: the instance for whose disks we assemble
2715
  @type ignore_secondaries: boolean
2716
  @param ignore_secondaries: if true, errors on secondary nodes
2717
      won't result in an error return from the function
2718
  @type ignore_size: boolean
2719
  @param ignore_size: if true, the current known size of the disk
2720
      will not be used during the disk activation, useful for cases
2721
      when the size is wrong
2722
  @return: a tuple of (disks_ok, device_info); device_info is a list of
2723
      (host, instance_visible_name, node_visible_name)
2724
      with the mapping from node devices to instance devices
2725

2726
  """
2727
  device_info = []
2728
  disks_ok = True
2729
  iname = instance.name
2730
  # With the two passes mechanism we try to reduce the window of
2731
  # opportunity for the race condition of switching DRBD to primary
2732
  # before handshaking occured, but we do not eliminate it
2733

    
2734
  # The proper fix would be to wait (with some limits) until the
2735
  # connection has been made and drbd transitions from WFConnection
2736
  # into any other network-connected state (Connected, SyncTarget,
2737
  # SyncSource, etc.)
2738

    
2739
  # 1st pass, assemble on all nodes in secondary mode
2740
  for inst_disk in instance.disks:
2741
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2742
      if ignore_size:
2743
        node_disk = node_disk.Copy()
2744
        node_disk.UnsetSize()
2745
      lu.cfg.SetDiskID(node_disk, node)
2746
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2747
      msg = result.RemoteFailMsg()
2748
      if msg:
2749
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2750
                           " (is_primary=False, pass=1): %s",
2751
                           inst_disk.iv_name, node, msg)
2752
        if not ignore_secondaries:
2753
          disks_ok = False
2754

    
2755
  # FIXME: race condition on drbd migration to primary
2756

    
2757
  # 2nd pass, do only the primary node
2758
  for inst_disk in instance.disks:
2759
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2760
      if node != instance.primary_node:
2761
        continue
2762
      if ignore_size:
2763
        node_disk = node_disk.Copy()
2764
        node_disk.UnsetSize()
2765
      lu.cfg.SetDiskID(node_disk, node)
2766
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2767
      msg = result.RemoteFailMsg()
2768
      if msg:
2769
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2770
                           " (is_primary=True, pass=2): %s",
2771
                           inst_disk.iv_name, node, msg)
2772
        disks_ok = False
2773
    device_info.append((instance.primary_node, inst_disk.iv_name,
2774
                        result.payload))
2775

    
2776
  # leave the disks configured for the primary node
2777
  # this is a workaround that would be fixed better by
2778
  # improving the logical/physical id handling
2779
  for disk in instance.disks:
2780
    lu.cfg.SetDiskID(disk, instance.primary_node)
2781

    
2782
  return disks_ok, device_info
2783

    
2784

    
2785
def _StartInstanceDisks(lu, instance, force):
2786
  """Start the disks of an instance.
2787

2788
  """
2789
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2790
                                           ignore_secondaries=force)
2791
  if not disks_ok:
2792
    _ShutdownInstanceDisks(lu, instance)
2793
    if force is not None and not force:
2794
      lu.proc.LogWarning("", hint="If the message above refers to a"
2795
                         " secondary node,"
2796
                         " you can retry the operation using '--force'.")
2797
    raise errors.OpExecError("Disk consistency error")
2798

    
2799

    
2800
class LUDeactivateInstanceDisks(NoHooksLU):
2801
  """Shutdown an instance's disks.
2802

2803
  """
2804
  _OP_REQP = ["instance_name"]
2805
  REQ_BGL = False
2806

    
2807
  def ExpandNames(self):
2808
    self._ExpandAndLockInstance()
2809
    self.needed_locks[locking.LEVEL_NODE] = []
2810
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2811

    
2812
  def DeclareLocks(self, level):
2813
    if level == locking.LEVEL_NODE:
2814
      self._LockInstancesNodes()
2815

    
2816
  def CheckPrereq(self):
2817
    """Check prerequisites.
2818

2819
    This checks that the instance is in the cluster.
2820

2821
    """
2822
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2823
    assert self.instance is not None, \
2824
      "Cannot retrieve locked instance %s" % self.op.instance_name
2825

    
2826
  def Exec(self, feedback_fn):
2827
    """Deactivate the disks
2828

2829
    """
2830
    instance = self.instance
2831
    _SafeShutdownInstanceDisks(self, instance)
2832

    
2833

    
2834
def _SafeShutdownInstanceDisks(lu, instance):
2835
  """Shutdown block devices of an instance.
2836

2837
  This function checks if an instance is running, before calling
2838
  _ShutdownInstanceDisks.
2839

2840
  """
2841
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2842
                                      [instance.hypervisor])
2843
  ins_l = ins_l[instance.primary_node]
2844
  if ins_l.failed or not isinstance(ins_l.data, list):
2845
    raise errors.OpExecError("Can't contact node '%s'" %
2846
                             instance.primary_node)
2847

    
2848
  if instance.name in ins_l.data:
2849
    raise errors.OpExecError("Instance is running, can't shutdown"
2850
                             " block devices.")
2851

    
2852
  _ShutdownInstanceDisks(lu, instance)
2853

    
2854

    
2855
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2856
  """Shutdown block devices of an instance.
2857

2858
  This does the shutdown on all nodes of the instance.
2859

2860
  If ignore_primary is true, errors on the primary node are
2861
  ignored.
2862

2863
  """
2864
  all_result = True
2865
  for disk in instance.disks:
2866
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2867
      lu.cfg.SetDiskID(top_disk, node)
2868
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2869
      msg = result.RemoteFailMsg()
2870
      if msg:
2871
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2872
                      disk.iv_name, node, msg)
2873
        if not ignore_primary or node != instance.primary_node:
2874
          all_result = False
2875
  return all_result
2876

    
2877

    
2878
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2879
  """Checks if a node has enough free memory.
2880

2881
  This function checks if a given node has the needed amount of free
2882
  memory. In case the node has less memory or we cannot get the
2883
  information from the node, this function raises an OpPrereqError
2884
  exception.
2885

2886
  @type lu: C{LogicalUnit}
2887
  @param lu: a logical unit from which we get configuration data
2888
  @type node: C{str}
2889
  @param node: the node to check
2890
  @type reason: C{str}
2891
  @param reason: string to use in the error message
2892
  @type requested: C{int}
2893
  @param requested: the amount of memory in MiB to check for
2894
  @type hypervisor_name: C{str}
2895
  @param hypervisor_name: the hypervisor to ask for memory stats
2896
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2897
      we cannot check the node
2898

2899
  """
2900
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2901
  nodeinfo[node].Raise()
2902
  free_mem = nodeinfo[node].data.get('memory_free')
2903
  if not isinstance(free_mem, int):
2904
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2905
                             " was '%s'" % (node, free_mem))
2906
  if requested > free_mem:
2907
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2908
                             " needed %s MiB, available %s MiB" %
2909
                             (node, reason, requested, free_mem))
2910

    
2911

    
2912
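# Illustrative sketch (not part of the original code path): a typical call
# to _CheckNodeFreeMemory from a LogicalUnit's CheckPrereq looks like the
# following; 'self', 'instance' and 'bep' are assumed to exist in the
# caller, as in the startup and failover LUs below:
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)
#
# On success the call simply returns; otherwise it raises OpPrereqError,
# aborting the opcode before any cluster state is modified.
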
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ))
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ))

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if not remote_info.data:
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.RemoteFailMsg()
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


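# Illustrative note (not part of the original code): LUStartupInstance.Exec
# above is careful to undo its disk activation when the final RPC fails; a
# hedged sketch of that pattern, which several LUs in this module reuse, is:
#
#   _StartInstanceDisks(self, instance, force)
#   result = self.rpc.call_instance_start(node_current, instance,
#                                         self.hvparams, self.beparams)
#   if result.RemoteFailMsg():
#     _ShutdownInstanceDisks(self, instance)   # roll back disk activation
#     raise errors.OpExecError(...)
#
# Note that MarkInstanceUp runs before the RPC, so a failed start leaves the
# instance flagged as administratively up in the configuration.
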
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


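# Illustrative summary (not part of the original code) of how the reboot
# types above are dispatched by LURebootInstance.Exec:
#
#   INSTANCE_REBOOT_SOFT / INSTANCE_REBOOT_HARD -> a single
#       call_instance_reboot RPC on the primary node
#   INSTANCE_REBOOT_FULL -> call_instance_shutdown, then
#       _ShutdownInstanceDisks + _StartInstanceDisks, then
#       call_instance_start (rolling the disks back down on failure)
#
# In every branch the instance is re-marked as up in the configuration once
# the reboot sequence has completed.
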
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "vcpus":
          val = i_be[constants.BE_VCPUS]
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          if instance.nics:
            val = instance.nics[0].ip
          else:
            val = None
        elif field == "bridge":
          if instance.nics:
            val = instance.nics[0].bridge
          else:
            val = None
        elif field == "mac":
          if instance.nics:
            val = instance.nics[0].mac
          else:
            val = None
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, ("Declared but unhandled variable parameter '%s'" %
                           field)
        else:
          assert False, "Declared but unhandled parameter '%s'" % field
        iout.append(val)
      output.append(iout)

    return output


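# Illustrative examples (not part of the original code) of field names
# accepted by LUQueryInstances above, as declared in _FIELDS_STATIC and
# _FIELDS_DYNAMIC; "be/memory" and "be/vcpus" assume those names are among
# constants.BES_PARAMETERS, matching the BE_MEMORY/BE_VCPUS constants used
# in this module:
#
#   "name", "os", "pnode", "snodes"      -- simple static fields
#   "disk.sizes", "nic.macs"             -- per-instance list fields
#   "disk.size/0", "nic.mac/1"           -- index-based regexp fields
#   "be/memory", "be/vcpus", "hv/<name>" -- filled backend/hypervisor params
#   "oper_state", "oper_ram", "status"   -- dynamic fields needing live data
#
# Requesting only static fields skips the per-node RPCs entirely (see
# do_node_query in ExpandNames).
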
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)

    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
    else:
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():